...

Currently, with our released versions [1], there are mainly two ways to finish a job, stop and cancel, and the differences between them are as follows [1]:

  • On a cancel call, the operators in a job immediately receive a cancel() method call to cancel them as soon as possible. If operators do not stop after the cancel call, Flink will start interrupting the thread periodically until it stops.
  • A “stop” call is a more graceful way of stopping a running streaming job. Stop is only available for jobs which use sources that implement the StoppableFunction interface. When the user requests to stop a job, all sources will receive a stop() method call. The job will keep running until all sources properly shut down. This allows the job to finish processing all in-flight data.
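The difference can be illustrated with a small, dependency-free Java model. It deliberately does not use Flink's real SourceFunction or StoppableFunction types: ToyPipeline and its methods are hypothetical stand-ins, and "in flight" simply means records the source has emitted but the downstream operator has not yet processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical toy model (not Flink API): a source feeding a buffer that a sink drains. */
final class ToyPipeline {
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private final List<Integer> processed = new ArrayList<>();
    private int nextRecord = 0;

    /** The source emits {@code n} new records into the in-flight buffer. */
    void emit(int n) {
        for (int i = 0; i < n; i++) {
            inFlight.addLast(nextRecord++);
        }
    }

    /** The sink processes up to {@code n} in-flight records, oldest first. */
    void process(int n) {
        for (int i = 0; i < n && !inFlight.isEmpty(); i++) {
            processed.add(inFlight.pollFirst());
        }
    }

    /** Cancel: in-flight records are dropped; returns what was processed. */
    List<Integer> cancel() {
        inFlight.clear();
        return new ArrayList<>(processed);
    }

    /** Stop: the source emits nothing more, but all in-flight records are processed. */
    List<Integer> stop() {
        process(inFlight.size());
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        ToyPipeline cancelled = new ToyPipeline();
        cancelled.emit(5);
        cancelled.process(2);
        System.out.println("cancel processed " + cancelled.cancel()); // [0, 1]

        ToyPipeline stopped = new ToyPipeline();
        stopped.emit(5);
        stopped.process(2);
        System.out.println("stop processed " + stopped.stop()); // [0, 1, 2, 3, 4]
    }
}
```

With the same five emitted records, cancel only keeps the two already processed, while stop drains the remaining three before finishing.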

However, for stateful operators with retained checkpointing, the stop call does not take any checkpoint. When the job is resumed, it therefore has to recover from the latest retained checkpoint and rewind the sources, which makes waiting for all in-flight data to be processed meaningless, since that data must be processed again anyway. In other words, there is no real difference between stop and cancel in this case.
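The cost can be made concrete with a toy model. RewindDemo below is hypothetical and independent of Flink: a stateful operator keeps a running sum, a retained checkpoint stores the source offset together with that sum, and restoring means resetting the state and rewinding the source to the stored offset. The finalCheckpointOnStop flag is an assumption used only to contrast today's stop with a stop that is itself checkpointed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of restoring a stateful job from a retained checkpoint. */
final class RewindDemo {
    /** A retained checkpoint: next source offset to read plus the operator state. */
    static final class Checkpoint {
        final int sourceOffset;
        final long runningSum;

        Checkpoint(int sourceOffset, long runningSum) {
            this.sourceOffset = sourceOffset;
            this.runningSum = runningSum;
        }
    }

    /**
     * Processes records [0, stopAt), taking a periodic checkpoint just before record
     * checkpointAt, then stops. Returns the record indices the source must re-read
     * when the job is resumed from the latest retained checkpoint.
     */
    static List<Integer> replayedOnResume(int[] input, int checkpointAt, int stopAt,
                                          boolean finalCheckpointOnStop) {
        long sum = 0;
        Checkpoint retained = new Checkpoint(0, 0);
        for (int i = 0; i < stopAt; i++) {
            if (i == checkpointAt) {
                retained = new Checkpoint(i, sum);
            }
            sum += input[i];
        }
        if (finalCheckpointOnStop) {
            // The stop itself is checkpointed, so the drained work is kept.
            retained = new Checkpoint(stopAt, sum);
        }

        // Resume: restore the state and rewind the source to the checkpointed offset.
        long restoredSum = retained.runningSum;
        List<Integer> replayed = new ArrayList<>();
        for (int i = retained.sourceOffset; i < stopAt; i++) {
            restoredSum += input[i];
            replayed.add(i);
        }
        // Replay rebuilds the pre-stop state; only the amount of repeated work differs.
        if (restoredSum != sum) {
            throw new IllegalStateException("replay produced a different state");
        }
        return replayed;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6};
        System.out.println(replayedOnResume(input, 2, 6, false)); // [2, 3, 4, 5]
        System.out.println(replayedOnResume(input, 2, 6, true));  // []
    }
}
```

Without a checkpoint at stop, every record after the last periodic checkpoint is read and processed a second time, even though the stop already waited for it to be processed once.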

On the other hand, in the latest master branch after FLIP-34, stopping a job is always accompanied by a savepoint, which causes the following problems:

  • It is an unexpected behavior change from the user's perspective: the old stop command will fail on jobs without a savepoint configuration.
  • It slows down the job stop procedure and might block launching new jobs when resources are contended.

To resolve these issues, this document aims to reinforce the semantics of job stopping: adding back support for the normal stop (without savepoint) and preventing unnecessary source rewinding when retained checkpointing is enabled. To achieve these goals, we will first discuss the conceptual difference between job stop and cancel, and correspondingly between checkpoint and savepoint, by analogy with the concepts in database systems. Then we will describe how to reinforce the job stop semantic and how to implement it.

Current Status

In the first section we illustrated the current difference between job cancellation and stop, as well as the issue and ambiguity when stopping a job with stateful operators and retained checkpointing. Let's look deeper at why this happens by aligning the concepts in Flink with those of a mature database system, and see what is missing in the existing job stop process.

Checkpoint 

Databases (take MySQL for example) also have a checkpoint concept [2]. As data is ingested, changes are made to data pages that are cached in the buffer pool; these pages are written to the data files some time later by a process known as flushing. The checkpoint is a record of the latest changes that have been successfully flushed to the data files.

...

  • During normal operation, it performs fuzzy checkpoints, just like the normal checkpointing process in Flink.
  • When the database shuts down, it performs a sharp checkpoint, which is missing in the current Flink job stopping process.
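A heavily simplified Java model of the two kinds of checkpoint may help. It is not how MySQL/InnoDB is actually implemented; ToyBufferPool and its methods are hypothetical. Each change appends a redo record with an increasing log sequence number (LSN), and the checkpoint LSN means every change up to it is already in the data files, so crash recovery only replays the redo records after it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, heavily simplified page cache with fuzzy and sharp checkpoints. */
final class ToyBufferPool {
    private long currentLsn = 0;
    // page id -> LSN of the oldest change to that page not yet flushed to the data file
    private final Map<Integer, Long> dirtyPages = new HashMap<>();
    // every change with LSN <= checkpointLsn is already in the data files
    private long checkpointLsn = 0;

    /** Appends a redo record for a change to {@code pageId} and marks the page dirty. */
    void modify(int pageId) {
        currentLsn++;
        dirtyPages.putIfAbsent(pageId, currentLsn);
    }

    /** Background flushing writes one cached page to the data file. */
    void flush(int pageId) {
        dirtyPages.remove(pageId);
    }

    /** Fuzzy checkpoint: nothing extra is flushed; it ends just before the oldest unflushed change. */
    void fuzzyCheckpoint() {
        checkpointLsn = dirtyPages.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(currentLsn + 1) - 1;
    }

    /** Sharp checkpoint (e.g. on shutdown): every dirty page is flushed first. */
    void sharpCheckpoint() {
        dirtyPages.clear();
        checkpointLsn = currentLsn;
    }

    /** Redo records that crash recovery would replay after the latest checkpoint. */
    long redoRecordsToReplay() {
        return currentLsn - checkpointLsn;
    }

    public static void main(String[] args) {
        ToyBufferPool pool = new ToyBufferPool();
        pool.modify(1);
        pool.modify(2);
        pool.modify(1);
        pool.flush(1);
        pool.fuzzyCheckpoint();
        System.out.println(pool.redoRecordsToReplay()); // 2
        pool.sharpCheckpoint();
        System.out.println(pool.redoRecordsToReplay()); // 0
    }
}
```

After a fuzzy checkpoint, recovery still has redo work because page 2 was never flushed; after a sharp checkpoint there is nothing left to replay, which is the property a Flink stop currently lacks.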

Please note that checkpoints in both Flink and databases are system-controlled processes.

Cancel, Stop and Failover

We could map the Flink job cancel and stop commands to database kill and normal shutdown, and the Flink job failover process to database crash-and-recover.

...

  • Resuming a Flink job after cancellation involves source rewinding, just as resuming a database after it has been killed requires a redo log replay.
  • Resuming a Flink job after stop should be able to load from the latest retained checkpoint without any source rewinding, just as no redo log replay is needed when a database resumes from a normal shutdown.
    • This is missing in the current implementation.
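The mapping can be summarized in a small Java enum. JobEnding and its fields are hypothetical names used only for illustration; the STOP entry records the intended behavior, which the current implementation does not yet provide.

```java
/** Hypothetical summary of the Flink-to-database analogy; not part of any Flink API. */
enum JobEnding {
    // Flink action, database analogue, whether resuming needs source rewinding (redo log replay)
    CANCEL("kill", true),
    FAILOVER("crash and recover", true),
    STOP("normal shutdown", false); // intended behavior; missing in the current implementation

    final String databaseAnalogue;
    final boolean resumeNeedsRewind;

    JobEnding(String databaseAnalogue, boolean resumeNeedsRewind) {
        this.databaseAnalogue = databaseAnalogue;
        this.resumeNeedsRewind = resumeNeedsRewind;
    }

    public static void main(String[] args) {
        for (JobEnding ending : values()) {
            System.out.printf("%-8s ~ %-17s rewind on resume: %b%n",
                    ending, ending.databaseAnalogue, ending.resumeNeedsRewind);
        }
    }
}
```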

Savepoint

We could map the Flink savepoint concept to database backups, since the use cases of savepoints in Flink [6] include:

...

To conclude, the analogies above make it clear that we should always (automatically) take a checkpoint (with retained checkpoints configured) when stopping a job, just as a database (automatically) performs a sharp checkpoint during shutdown. This is a necessary supplement to reinforce the job stop semantic.

Proposed Changes

With FLIP-34, stopping a job is always accompanied by a savepoint, which is an unexpected behavior change from the user's perspective.

After the changes in this FLIP, the normal stop command (without the "-s" option) will work again, with enhanced semantics:

...

Type      | Source Ops                                       | Task Status | Job Status
SUSPEND   | Checkpoint Barrier, End Of Stream                | Finished    | Finished
TERMINATE | MAX_WATERMARK, Checkpoint Barrier, End Of Stream | Finished    | Finished
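The event order in the SUSPEND and TERMINATE rows matters for stateful operators. The following Java sketch uses hypothetical types that are not Flink classes: it feeds each row's source events, in order, to a toy operator holding pending event-time timers (for example, open windows). Under TERMINATE, MAX_WATERMARK arrives before the checkpoint barrier, so every timer fires and the checkpoint holds none; under SUSPEND, the pending timers are captured in the checkpoint for a later resume.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the stop table; none of these types are Flink classes. */
final class StopTableDemo {
    /** Control events a source emits when the job is stopped. */
    enum SourceEvent { MAX_WATERMARK, CHECKPOINT_BARRIER, END_OF_STREAM }

    /** The two rows of the table, with the source events in emission order. */
    enum StopType {
        SUSPEND(List.of(SourceEvent.CHECKPOINT_BARRIER, SourceEvent.END_OF_STREAM)),
        TERMINATE(List.of(SourceEvent.MAX_WATERMARK, SourceEvent.CHECKPOINT_BARRIER,
                SourceEvent.END_OF_STREAM));

        final List<SourceEvent> emitted;

        StopType(List<SourceEvent> emitted) {
            this.emitted = emitted;
        }
    }

    /**
     * Feeds the events of {@code type} to a toy operator holding pending event-time
     * timers and returns the timers captured by the checkpoint.
     */
    static List<Long> checkpointedTimers(StopType type, List<Long> timers) {
        List<Long> pending = new ArrayList<>(timers);
        List<Long> checkpointed = List.of();
        boolean finished = false;
        for (SourceEvent event : type.emitted) {
            switch (event) {
                case MAX_WATERMARK:
                    // A watermark of Long.MAX_VALUE ends event time: every pending timer fires.
                    pending.clear();
                    break;
                case CHECKPOINT_BARRIER:
                    // Timers still pending become part of the checkpointed state.
                    checkpointed = List.copyOf(pending);
                    break;
                case END_OF_STREAM:
                    // No more input: the task, and eventually the job, reaches FINISHED.
                    finished = true;
                    break;
            }
        }
        if (!finished) {
            throw new IllegalStateException("source never signalled end of stream");
        }
        return checkpointed;
    }

    public static void main(String[] args) {
        List<Long> timers = List.of(1_000L, 2_000L);
        System.out.println("SUSPEND   -> " + checkpointedTimers(StopType.SUSPEND, timers));   // [1000, 2000]
        System.out.println("TERMINATE -> " + checkpointedTimers(StopType.TERMINATE, timers)); // []
    }
}
```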

Based on this, we need the following implementations to support performing a checkpoint when stopping the job (with retained checkpoints configured):

...