...

In view of this, we think a JM crash may cause a great regression for batch jobs, especially long-running batch jobs. This FLIP mainly aims to solve this problem so that batch jobs can recover most of their progress after a JM crash. Our goal in this FLIP is that most finished tasks do not need to be re-run.

Public Interfaces

We intend to introduce the following new configuration parameters.

Key: execution.batch.job-recovery.enabled
Type: Boolean
Default Value: false
Description: A flag to enable or disable job recovery.

Key: execution.batch.job-recovery.previous-worker.recovery.timeout
Type: Duration
Default Value: 30 s
Description: The timeout for a new job master to wait for the previous worker to reconnect.

Key: execution.batch.job-recovery.operator-coordinator-snapshot.min-pause
Type: Duration
Default Value: 3 min
Description: The minimal pause between operator coordinator snapshots.

Key: job-event.store.write-buffer.size
Type: MemorySize
Default Value: 1M
Description: The size of the write buffer of the JobEventStore. The content will be flushed to the external file system once the buffer is full.

Key: job-event.store.write-buffer.flush-interval
Type: Duration
Default Value: 1 s
Description: The flush interval of the write buffer of the JobEventStore. When this interval elapses, the content will be flushed to the external file system.
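For illustration, these options could be set programmatically via Flink's Configuration API. This is only a minimal sketch: the option keys are the ones proposed above, and the values shown are just the defaults.

```java
import org.apache.flink.configuration.Configuration;

public class EnableJobRecoveryExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on batch job recovery (proposed option, disabled by default).
        conf.setString("execution.batch.job-recovery.enabled", "true");
        // Example values for the other proposed options (these are the defaults).
        conf.setString("execution.batch.job-recovery.previous-worker.recovery.timeout", "30 s");
        conf.setString("execution.batch.job-recovery.operator-coordinator-snapshot.min-pause", "3 min");
        conf.setString("job-event.store.write-buffer.size", "1M");
        conf.setString("job-event.store.write-buffer.flush-interval", "1 s");
    }
}
```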

Proposed Changes

The Whole Workflow of Job Recovery

...

Record states during normal execution

In order to recover the previous state after a JM crash, we need to record the state of the JM. We will introduce an event-based method to record it: each event (JobEvent) records an increment of the JM state. After the JM crashes, we can recover the JM state by replaying the JobEvents.
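A minimal sketch of this idea is shown below. The JobEvent and JobEventStore names come from this FLIP, but the method names and the replay call are assumptions for illustration only.

```java
import java.util.List;

// Minimal sketch of the event-based recording idea; method names are illustrative.
interface JobEvent {
    // Each concrete event carries one increment of the JM state.
}

interface JobEventStore {
    // Called during normal execution to persist a state increment.
    void writeEvent(JobEvent event);

    // Called by the new JM after a failover to read back all recorded events.
    List<JobEvent> readAllEvents();
}

// Recovery then amounts to replaying the recorded events in order, e.g.:
// for (JobEvent event : store.readAllEvents()) { replay(event); }
```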

JobEvent

We will record 2 kinds of JobEvents:

  1. ExecutionJobVertexInitializedEvent: This event records the initialization information of an ExecutionJobVertex; its content contains the decided parallelism of this job vertex and its input information. This event will be triggered and written out when a job vertex is initialized.

  2. ExecutionVertexFinishedEvent: This event records the information of a finished task. Our goal is that all finished tasks don't need to re-run, so the simple idea is to trigger an event when a task finishes. The content of this event contains (see the sketch after this list):

    1. The state of the finished task/ExecutionVertex, including IO metrics, accumulators, etc. These contents can be easily obtained from ExecutionGraph.

    2. If the job vertex which this task belongs to has operator coordinators, the states of the operator coordinators also need to be recorded.
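The following sketch, building on the JobEvent interface sketched above, illustrates what the two events might carry. The class names are from this FLIP, but the field names and types are simplified assumptions.

```java
// Simplified payload sketch; the real field names/types may differ in the implementation.
class ExecutionJobVertexInitializedEvent implements JobEvent {
    String jobVertexId;
    int decidedParallelism;             // the parallelism decided for this job vertex
    byte[] serializedInputInfo;         // the input information of the job vertex
}

class ExecutionVertexFinishedEvent implements JobEvent {
    String executionVertexId;
    byte[] serializedTaskState;         // IO metrics, accumulators, etc. from the ExecutionGraph
    byte[] operatorCoordinatorSnapshot; // present only if the vertex has operator coordinators
}
```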

In order to obtain the state of operator coordinators, we will enrich the checkpointCoordinator method to let it accept -1 (NO_CHECKPOINT) as the value of checkpointId, to support snapshotting the state of operator coordinators in batch jobs. After the JM crashes, the operator coordinator can be restored from the previously recorded state. In addition to a simple restore (by the resetToCheckpoint method), it also needs to call subtaskReset for the non-finished tasks (which may be in running state before the JM crashes), because these tasks will be reset and re-run after the JM crashes.
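A rough sketch of the snapshot and restore calls, under the assumption that checkpointCoordinator accepts -1 as described above; the surrounding class and method names are illustrative only, not the final implementation.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

// Illustrative sketch of snapshotting/restoring an operator coordinator for batch jobs.
class OperatorCoordinatorSnapshotSketch {

    byte[] snapshot(OperatorCoordinator coordinator) throws Exception {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        // -1 (NO_CHECKPOINT) marks a batch-job snapshot, as proposed in this FLIP.
        coordinator.checkpointCoordinator(-1L, result);
        return result.get();
    }

    void restore(OperatorCoordinator coordinator, byte[] snapshot, int[] nonFinishedSubtasks)
            throws Exception {
        // Restore the coordinator from the previously recorded state ...
        coordinator.resetToCheckpoint(-1L, snapshot);
        // ... and reset the subtasks that were not finished before the JM crash,
        // since those tasks will be reset and re-run.
        for (int subtask : nonFinishedSubtasks) {
            coordinator.subtaskReset(subtask, -1L);
        }
    }
}
```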

...