
Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread.html/reb368f095ec13638b95cd5d885a0aa8e69af06d6e982a5f045f50022%40%3Cdev.flink.apache.org%3E

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)


Motivation

As described in FLIP-131, we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. Users should be able to write a program using the DataStream API that will execute efficiently on both bounded and unbounded input data. But before we reach this point, it is worth discussing and agreeing on the semantics of some operations as we transition from the streaming world to the batch one. 

Does processing time windowing make sense in the batch world? How should watermarks be generated in a batch job operating on event time?

This document discusses questions about semantics like the ones above and sketches a plan about “cleaning up” the current DataStream API by deprecating methods that were introduced in the past but have ended up being more of a maintenance burden. 

Batch vs Streaming Scheduling: explicit or derived?

The user code of an application in Flink is translated into a graph of tasks similar to the one shown here. Flink’s runtime will then schedule these tasks differently depending on whether they belong to a batch or a streaming application.

In Streaming, all tasks are scheduled before any data can flow through the pipeline. Records are processed as they come and the output of each task is aggressively pushed to the next task downstream with almost no buffering in between. Mechanisms like Checkpointing build on the fact that all tasks are available throughout the lifecycle of a job in order to provide fault tolerance.

In Batch, the task graph is split into independent regions which either run independently, e.g. “embarrassingly parallel” jobs, or communicate with blocking shuffles, e.g. pipelines containing keyBy(). Each of these regions processes its (bounded) input from start to finish and materialises its output. The lifecycle of that output is independent from that of the task that produced it. This allows for more flexible scheduling where the task can be de-allocated and the freed resources can be allocated to the next ready task. In addition, if a task fails, its predecessors do not necessarily need to be re-executed as their output is already materialised. For more details please see FLIP-119.

Each of the above scheduling algorithms has benefits over the other for its particular setting, but each has its own implications about fault-tolerance and other operational topics such as availability of savepoints. Given the above, the question arises: should Flink automatically pick batch scheduling when it detects that all sources of a pipeline are bounded, or should we require the user to explicitly state his or her preference? 

Proposal: Ideally, we would like to let the user explicitly specify the preferred scheduling algorithm. If it is not explicitly set, Flink would pick Batch scheduling if all sources are bounded, and Streaming scheduling otherwise.

But, given that some already existing sources in DataStream are bounded, e.g. fromCollection() or readFile() with FileProcessingMode set to PROCESS_ONCE, and this would break existing behaviour, we can go with a scheme where the user can set one of three options: BATCH, STREAMING and AUTOMATIC, and set the default to STREAMING, which is the current behaviour.

In more detail, the above modes would mean:

  • BATCH
    • We divide the graph into independent regions and schedule them individually
    • The boundaries of these regions are the keyBy()’s which are set to blocking
  • STREAMING
    • All tasks are scheduled before elements start flowing in the pipeline
    • keyBy()’s are hash partitions whose elements are forwarded aggressively
  • AUTOMATIC
    • If ALL Sources are bounded, then pick BATCH
    • STREAMING otherwise
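
The resolution rule for AUTOMATIC can be sketched in plain Java as follows. This is only an illustration under stated assumptions: RuntimeModeResolver, its nested enums and the resolve() method are hypothetical names and not part of Flink, and the sketch covers only the choice of mode, not the scheduling itself.

```java
import java.util.List;

// Hypothetical sketch; none of these types are Flink classes.
final class RuntimeModeResolver {

    enum RuntimeMode { BATCH, STREAMING, AUTOMATIC }

    enum Boundedness { BOUNDED, UNBOUNDED }

    /** Resolves the configured mode to the mode that would actually be used. */
    static RuntimeMode resolve(RuntimeMode configured, List<Boundedness> sources) {
        if (configured != RuntimeMode.AUTOMATIC) {
            // An explicit BATCH or STREAMING choice is kept as-is.
            return configured;
        }
        // AUTOMATIC: BATCH only if ALL sources are bounded, STREAMING otherwise.
        // Note: with an empty source list allMatch is vacuously true, so this
        // sketch resolves to BATCH; that corner case is an artifact of the sketch.
        boolean allBounded = sources.stream().allMatch(b -> b == Boundedness.BOUNDED);
        return allBounded ? RuntimeMode.BATCH : RuntimeMode.STREAMING;
    }

    public static void main(String[] args) {
        System.out.println(resolve(RuntimeMode.AUTOMATIC,
                List.of(Boundedness.BOUNDED, Boundedness.BOUNDED)));
        System.out.println(resolve(RuntimeMode.AUTOMATIC,
                List.of(Boundedness.BOUNDED, Boundedness.UNBOUNDED)));
    }
}
```

With STREAMING as the default value of the option, existing jobs that use bounded sources such as fromCollection() would never reach the AUTOMATIC branch unless the user opts in.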


How to expose that:

We suggest introducing two new configuration options:

  1. The general option for choosing the overall runtime mode
    1. It will be exposed via a configuration option under the execution.runtime-mode key, with an enum value of BATCH, STREAMING or AUTOMATIC
    2. It will also be exposed through StreamExecutionEnvironment#setRuntimeMode(RuntimeMode mode)
    3. Each of the modes will set default values for other settings such as ScheduleMode, ShuffleMode, buffer timeout, etc. It will be possible to override the defaults for particular options, but only as long as the value is compatible with the chosen mode, e.g. a shuffle-mode other than ALL_EDGES_PIPELINED is illegal for STREAMING mode. Illegal combinations will result in an exception.
  2. An option for choosing the shuffle-mode
    1. exposed only through a configuration option under the execution.shuffle-mode key. We will not expose

Examples:

Users will be able to choose BATCH mode:

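import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// RuntimeMode and setRuntimeMode() are the additions proposed above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
// Set the shuffle mode explicitly through the configuration option.
Configuration conf = new Configuration();
conf.setString("execution.shuffle-mode", "ALL_EDGES_BLOCKING");
env.configure(conf);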
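
The compatibility check between the runtime mode and an overridden shuffle mode, described above, could look roughly like the following plain-Java sketch. ShuffleModeValidator and its methods are hypothetical names. The assumptions that BATCH defaults to ALL_EDGES_BLOCKING and that BATCH accepts either shuffle mode are ours and are not stated in this proposal. AUTOMATIC is assumed to have been resolved to BATCH or STREAMING beforehand.

```java
// Hypothetical sketch; none of these types are Flink classes.
final class ShuffleModeValidator {

    enum RuntimeMode { BATCH, STREAMING }

    enum ShuffleMode { ALL_EDGES_PIPELINED, ALL_EDGES_BLOCKING }

    /** Default shuffle mode implied by a runtime mode (the BATCH default is an assumption). */
    static ShuffleMode defaultFor(RuntimeMode mode) {
        return mode == RuntimeMode.BATCH
                ? ShuffleMode.ALL_EDGES_BLOCKING
                : ShuffleMode.ALL_EDGES_PIPELINED;
    }

    /** Returns the shuffle mode to use; userOverride is null when the option is not set. */
    static ShuffleMode effectiveShuffleMode(RuntimeMode mode, ShuffleMode userOverride) {
        if (userOverride == null) {
            return defaultFor(mode);
        }
        // The one illegal combination named in the text: STREAMING requires pipelined edges.
        if (mode == RuntimeMode.STREAMING && userOverride != ShuffleMode.ALL_EDGES_PIPELINED) {
            throw new IllegalArgumentException(
                    "shuffle-mode " + userOverride + " is not compatible with STREAMING mode");
        }
        return userOverride;
    }
}
```

Failing fast at configuration time keeps an incompatible override from silently changing how the job is scheduled.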


The runtime mode is exposed as a configuration option (execution.runtime-mode). This will allow users to set it through:

  • the environment constructors, 
  • through the CLI, and 
  • through the env.configure() method.


Rejected Alternative: An alternative could be to expose it through the executionConfig or directly in the Environments but those were rejected because we can achieve the same through a configuration option.

Processing Time Support in Batch 

The notion of time is crucial in Stream Processing. Time in Streaming comes in two flavours, Processing Time and Event Time. In a nutshell, Processing Time is the wall-clock time on the machine where the record is processed, at the specific instant it is processed, while Event Time is a timestamp usually embedded in the record itself, indicating, for example, when a transaction happened or when a measurement was taken by a sensor. For more details, please refer to the corresponding documentation page.

Based on the definition of Processing Time, we see that the results of a computation that is based on processing time are not reproducible. This is because the same record processed twice will have two different timestamps. 

Despite the above, using processing time can be useful in streaming. The reason is that streaming pipelines often ingest their unbounded input in “real time” and jobs are expected to run for a long period of time. As an example, imagine a computation that counts incoming records of a certain type in 1h (event time) windows. In this example, the user may need to wait ~1h to have the result for a window, as data is ingested as fast as it arrives (real time). In this case, to have an early (incomplete) indication of the trend of the counter (is it increasing or decreasing), the user may want to use “early firings” every 5 min in processing time.

In essence, the “real time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e. how long we have to wait until we get results for our 1h windowing computation, and the wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.

Proposal: Given the above discussion, and the fact that in batch processing there is no fundamental reason to sacrifice completeness, correctness and reproducibility of the result for performance, we argue that processing time can be ignored in batch.

Practically this means that in bounded stream execution:

  1. When trying to apply processing time windowing on batch workloads, Flink could throw an exception warning the user.
  2. When the user tries to set Processing Time timers, e.g. in a ProcessFunction, or to specify a processing time-based policy, e.g. a rolling policy in the StreamingFileSink, Flink will ignore these timers when executed in batch.
  3. Custom triggers, both in event and processing time, should be treated as optional and potentially be ignored in batch. The reason is that early triggers are simply a means to bypass the previously mentioned correlation between responsiveness and processing time, which does not exist in batch processing.
  4. If event time is the only sensible option for batch, then we may need to consider changing the default value of the TimeCharacteristic from ProcessingTime to EventTime. In this case, if no WatermarkGenerator and TimestampAssigner are set, we can go with INGESTION_TIME semantics.
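
Points 1 and 2 above can be illustrated with a small plain-Java model. This is a sketch only: BoundedTimePolicySketch and its methods are hypothetical and do not correspond to Flink's timer or windowing classes. It shows processing-time windowing being rejected and processing-time timers being dropped when the execution is bounded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not Flink's timer service or window assigner.
final class BoundedTimePolicySketch {

    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    private final boolean boundedExecution;
    private final List<Long> registeredTimers = new ArrayList<>();

    BoundedTimePolicySketch(boolean boundedExecution) {
        this.boundedExecution = boundedExecution;
    }

    /** Point 1: processing-time windowing is rejected in bounded execution. */
    void validateWindowing(TimeDomain domain) {
        if (boundedExecution && domain == TimeDomain.PROCESSING_TIME) {
            throw new UnsupportedOperationException(
                    "Processing-time windowing is not supported in bounded execution");
        }
    }

    /** Point 2: processing-time timers are ignored in bounded execution. Returns true if kept. */
    boolean registerTimer(TimeDomain domain, long timestamp) {
        if (boundedExecution && domain == TimeDomain.PROCESSING_TIME) {
            return false; // dropped without an error
        }
        registeredTimers.add(timestamp);
        return true;
    }

    int timerCount() {
        return registeredTimers.size();
    }
}
```

The asymmetry is deliberate in this sketch: windowing on processing time would change the result of the job, so it fails loudly, while an ignored timer only removes an early or periodic action.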


IMPORTANT PROPOSAL:

If we decide to change the default value of the TimeCharacteristic from ProcessingTime to EventTime and no WatermarkGenerator and TimestampAssigner are set, then we will go with INGESTION_TIME semantics.


Future Work: In the future we may consider adding as options the capability of:

  • firing all the registered processing time timers at the end of a job (at close()).

These options refer to BOTH batch and streaming and they will make sure that the same job written for streaming can also run for batch.

Event Time Support in Batch

Flink’s streaming runtime builds on the pessimistic assumption that there are no guarantees about the order of the events. This means that events may come out-of-order, i.e. an event with timestamp t may come after an event with timestamp t+1. In addition, the system can never be sure that no more elements with timestamp t < T can come in the future. To amortise the impact of this out-of-orderness, Flink, along with other frameworks in the streaming space, uses a heuristic called Watermarks. A watermark with timestamp T signals that no element with timestamp t < T will follow.

In the batch world, where the input dataset is assumed to be static and known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order. For readers familiar with streaming, in batch we can assume “perfect watermarks”.

Implications: Given the above, in batch we only need to send a MAX_WATERMARK when the end of input is reached or at the end of each key, if we decide to process each key independently, as done in DataSet. This will allow all registered timers to fire. This means that user-defined WatermarkAssigners will be ignored.

Although the above does not directly imply any user-visible change, it has to be stressed because, in some cases, the same application executed in batch and in streaming may lead to different results, due to late elements being left out in the streaming case.
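
The difference in results can be reproduced with a small plain-Java simulation. It is a sketch under several assumptions that are ours and not part of this proposal: tumbling windows of size 10, a streaming watermark that trails the highest timestamp seen by a fixed maxOutOfOrderness, and elements being dropped once the watermark has passed the end of their window. LateElementsSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; counts elements per tumbling event-time window.
final class LateElementsSketch {

    static final long WINDOW_SIZE = 10;

    static long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
    }

    /** Streaming: elements arrive in the given order; late elements are dropped. */
    static Map<Long, Integer> countStreaming(List<Long> arrivalOrder, long maxOutOfOrderness) {
        Map<Long, Integer> counts = new TreeMap<>();
        long watermark = Long.MIN_VALUE;
        for (long timestamp : arrivalOrder) {
            long start = windowStart(timestamp);
            if (start + WINDOW_SIZE <= watermark) {
                continue; // the window is already complete, so the element is late
            }
            counts.merge(start, 1, Integer::sum);
            // Heuristic watermark: no element older than this is expected to follow.
            watermark = Math.max(watermark, timestamp - maxOutOfOrderness);
        }
        return counts;
    }

    /** Batch: the input is sorted first, so no element can be late. */
    static Map<Long, Integer> countBatch(List<Long> input) {
        List<Long> sorted = new ArrayList<>(input);
        Collections.sort(sorted);
        Map<Long, Integer> counts = new TreeMap<>();
        for (long timestamp : sorted) {
            counts.merge(windowStart(timestamp), 1, Integer::sum);
        }
        // A single MAX_WATERMARK at the end of input would fire every window here.
        return counts;
    }
}
```

For the arrival order 1, 4, 12, 3, 15 with maxOutOfOrderness = 0, the streaming variant counts {0=2, 10=2} because the element with timestamp 3 arrives after the watermark has reached 12, while the batch variant counts {0=3, 10=2}.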

Broadcast State

A powerful feature of the DataStream API is the Broadcast State pattern. This feature/pattern was introduced to allow users to implement use-cases where a “control” stream needs to be broadcast to all downstream tasks, and its broadcast elements, e.g. rules, need to be applied to all incoming elements from another stream. An example can be a fraud detection use-case where the rules evolve over time.

In this pattern, Flink provides no guarantees about the order in which the inputs are read. Essentially, there is no way to force the broadcast side to be read before the non-broadcast side. Use-cases like the one above make sense in the streaming world where jobs are expected to run for a long period of time with input data that are not known in advance. In these settings, requirements may change over time depending on the incoming data. 

In the batch world though, we believe that such use-cases do not make much sense, as the input (both the elements and the control stream) is expected to be static and known in advance.

Proposal (potentially not in 1.12): Build custom support for broadcast state pattern where the broadcast side is read first.
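
The effect of reading the broadcast side first can be sketched in plain Java. The model below is hypothetical and much simpler than the actual Broadcast State API: a rule is just a string, and an element matches if a rule with the same value is already known when the element is processed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the Broadcast State API.
final class BroadcastFirstSketch {

    static final class Event {
        final boolean isRule;
        final String value;

        private Event(boolean isRule, String value) {
            this.isRule = isRule;
            this.value = value;
        }

        static Event rule(String value) { return new Event(true, value); }

        static Event element(String value) { return new Event(false, value); }
    }

    /** Streaming style: both inputs are consumed in whatever order they arrive. */
    static List<String> matchInArrivalOrder(List<Event> events) {
        Set<String> rules = new HashSet<>();
        List<String> matches = new ArrayList<>();
        for (Event event : events) {
            if (event.isRule) {
                rules.add(event.value);
            } else if (rules.contains(event.value)) {
                matches.add(event.value);
            }
        }
        return matches;
    }

    /** Proposed batch style: the whole broadcast side is read before any element. */
    static List<String> matchBroadcastFirst(List<Event> events) {
        List<Event> reordered = new ArrayList<>();
        for (Event event : events) {
            if (event.isRule) {
                reordered.add(event);
            }
        }
        for (Event event : events) {
            if (!event.isRule) {
                reordered.add(event);
            }
        }
        return matchInArrivalOrder(reordered);
    }
}
```

In arrival order the result depends on how the two inputs happen to interleave, whereas reading the broadcast side first makes the result a function of the input data only.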

Relational methods in DataStream

As discussed in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we see the Table API/SQL as the relational API, where we expect users to work with schemas and fields. Going forward, we envision the DataStream API to be a "slightly" lower-level API, with more explicit control over the execution graph, operations, and state. Having said that, we think it is worth deprecating, and removing in the future, all relational-style methods in DataStream. These methods often use reflection to access the fields and are thus less performant than providing explicit extractors. They include:

  • DataStream#project
  • Windowed/KeyedStream#sum,min,max,minBy,maxBy
  • DataStream#keyBy where the key is specified with a field name or index (including ConnectedStreams#keyBy)

Moreover, some of the operations have semantics that might make sense for stream processing but should behave differently for batch. For example, KeyedStream.reduce() is essentially a reduce on a GlobalWindow with a Trigger that fires on every element. In DB terms, it produces an UPSERT stream as its output: if you get ten input elements for a key, you also get ten output records. For batch processing, it makes more sense to instead produce only one output record per key, containing the result of the aggregation when we reach the end of the stream/key. This will be correct for downstream consumers that expect an UPSERT stream, but it will change the actual physical output stream that they see. The methods whose behaviour we suggest changing when run in a bounded execution mode include:

  • KeyedStream#reduce
  • KeyedStream#sum,min,max,minBy,maxBy
  • KeyedStream#fold
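
The two output shapes of a keyed reduce can be compared in a short plain-Java sketch. It does not use the DataStream API: KeyedReduceSketch and its methods are hypothetical and only model what is emitted downstream in each mode.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch; models the output of a keyed reduce, not KeyedStream#reduce itself.
final class KeyedReduceSketch {

    /** Streaming: one updated (key, aggregate) record is emitted per input element. */
    static List<Map.Entry<String, Integer>> reduceStreaming(
            List<Map.Entry<String, Integer>> input, BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new LinkedHashMap<>();
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            Integer updated = state.merge(record.getKey(), record.getValue(), reducer);
            output.add(Map.entry(record.getKey(), updated)); // UPSERT-style update
        }
        return output;
    }

    /** Bounded execution: only the final aggregate per key is emitted at end of input. */
    static Map<String, Integer> reduceBatch(
            List<Map.Entry<String, Integer>> input, BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : input) {
            state.merge(record.getKey(), record.getValue(), reducer);
        }
        return state;
    }
}
```

For the input (a,1), (b,5), (a,2), (a,3) and a sum reducer, the streaming variant emits (a,1), (b,5), (a,3), (a,6), while the bounded variant emits only a=6 and b=5. A consumer that keeps the latest value per key ends up with the same state in both cases.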

Sinks

Current exactly-once sinks in DataStream rely heavily on Flink’s checkpointing mechanism and will not work with batch scheduling. Support for exactly-once sinks is outside the scope of this FLIP and there will be a separate one coming soon.

Iterations

Iterations are outside the scope of this FLIP and there will be a separate one in the future.
