Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-43-Savepoint-Connector-td29233.html

JIRA: FLINK-12047

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink offers state abstractions for user functions to guarantee fault-tolerant processing of streams. Users can work with both non-partitioned and partitioned state.

...

Currently, all of this state is internal to Flink and is used to provide processing guarantees in failure cases (e.g., exactly-once processing). The only way to access state externally is through Queryable State, which is limited to read-only, one-key-at-a-time operations.

The State Processor API (previously referred to as the savepoint connector) provides powerful functionality for reading, writing, and modifying savepoints using Flink’s batch DataSet API.

...

  • Analyzing state for interesting patterns
  • Troubleshooting or auditing jobs by checking for discrepancies in state
  • Bootstrapping state for new applications
  • Modifying savepoints such as:
    • Changing max parallelism
    • Making breaking schema changes
    • Correcting invalid state

Abstraction

To understand how to best interact with savepoints in a batch context it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.

...

Database

  • Namespace1
    • TableA
      • Column1
      • Column2
    • TableB
  • Namespace2
    • TableA
    • TableB

Savepoint

  • Uid1
    • Operator_state_1
      • Value
    • Operator_state_2
      • Value
    • Keyed_state
      • Key
      • Column1
      • Column2

Public Interfaces

Reading an existing savepoint:

Load an existing savepoint:

...
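
A minimal sketch of what loading might look like, assuming a Savepoint#load factory that takes the batch ExecutionEnvironment, the savepoint path, and a state backend; the path and backend below are placeholders for illustration:

Code Block
languagejava
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

// Placeholder path and backend; the factory shape is an assumption based on the
// interfaces proposed in this FLIP.
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs:///path/to/savepoint", new MemoryStateBackend());

Keyed state is then read by supplying a KeyedStateReaderFunction, as in the example below.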

Code Block
languagejava
class MyPojo {
  Integer key;
  Integer value;
  Set<Long> timers;
}

class ReaderFunction extends KeyedStateReaderFunction<Integer, MyPojo> {

  // Descriptor for the value state registered by the original job; the state name
  // "state" and the Integer type are assumptions for illustration.
  private final ValueStateDescriptor<Integer> stateDescriptor =
      new ValueStateDescriptor<>("state", Types.INT);

  private ValueState<Integer> state;

  @Override
  public void open(Configuration parameters) {
     state = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void processKey(
      Integer key,
      Context ctx,
      Collector<MyPojo> out) throws Exception {

     // Emit one record per key, containing its value state and registered event-time timers.
     MyPojo pojo = new MyPojo();
     pojo.key    = key;
     pojo.value  = state.value();
     pojo.timers = ctx.getEventTimeTimers();
     out.collect(pojo);
  }
}

DataSet<MyPojo> keyedState = savepoint.readKeyedState("uid", new ReaderFunction());


Creating state / savepoint from scratch:

Define how to bootstrap a new operator’s state with a given DataSet:

...
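
One possible shape for such a transformation is sketched below; the StateBootstrapFunction and OperatorTransformation names, as well as the state name and types, are assumptions for illustration and not necessarily the final interfaces:

Code Block
languagejava
// Illustrative sketch only: function and factory names are assumptions, not the final API.
public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {

    private ListState<Integer> state;

    @Override
    public void processElement(Integer value, Context ctx) throws Exception {
        // Write each input element into operator list state.
        state.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("state", Types.INT));
    }
}

DataSet<Integer> data = bEnv.fromElements(1, 2, 3);

BootstrapTransformation<Integer> transformation = OperatorTransformation
    .bootstrapWith(data)
    .transform(new SimpleBootstrapFunction());

The transformation is then attached to a new savepoint as shown in the block below.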

Code Block
languagejava
Savepoint
	.create(stateBackend, maxParallelism)
	.withOperator("uid", transformation)
	.withOperator(...)
	.write(path);


Modifying an existing savepoint

Load a new savepoint based on an existing savepoint and add, overwrite, or remove operators:

...

Code Block
languagejava
existingSavepoint
    // Overwrite the state of an existing operator: drop the old state,
    // then register new state under the same uid.
    .removeOperator(oldOperatorUid)
    .withOperator(oldOperatorUid, transformation)
    .write(path);


Proposed Changes

The key goal for the implementation is to use only the available savepoint APIs, so that the implementation is trivially correct and simple to maintain. As savepoint formats change or new features such as TTL or state migration are added, the connector will continue to work without modification.

Querying Timers

The only prerequisite work will be minor modifications to the internal timer service, which provides efficient mappings from timestamps to the keys registered for that point in time. Efficient querying of registered timers requires the inverted mapping, from keys to registered timestamps. Because the timer service sits on the per-record execution path, we do not want to make any changes to how timers are managed. Instead, two methods will be added to the InternalTimerService interface: forEachProcessingTimeTimer and forEachEventTimeTimer. These methods allow all registered timers to be copied, prior to reading, into a data structure that supports efficient querying, without touching any per-record code paths.
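
A minimal sketch of what these additions could look like; only the two method names come from this FLIP, and the consumer-style signature shown here is an assumption for illustration:

Code Block
languagejava
public interface InternalTimerService<N> {

    // ... existing timer registration and deletion methods ...

    /**
     * Performs the given action for every registered event-time timer, e.g. to copy
     * all timers into a structure that supports efficient key-to-timestamp lookups.
     * Intended for bulk access outside the per-record path.
     */
    void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;

    /**
     * Performs the given action for every registered processing-time timer.
     */
    void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
}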

State Input

Reading state from an existing savepoint is built around a series of input formats, where each split corresponds to a single execution vertex in a data stream execution graph. That means that if ten input splits are requested, the state is partitioned identically to how it would be if that savepoint were restored into a DataStream application with a parallelism of ten (using the methods in StateAssignmentOperation). On open, each split restores a local state backend and iterates through all of the restored data.
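
As an illustration of that split-to-state mapping (not part of the proposed API), Flink's existing KeyGroupRangeAssignment utility shows which key groups each split would own:

Code Block
languagejava
// Illustrative only: mirrors what StateAssignmentOperation does on restore.
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class SplitAssignmentExample {
    public static void main(String[] args) {
        int maxParallelism = 128;   // taken from the savepoint metadata
        int requestedSplits = 10;   // parallelism of the batch read

        for (int split = 0; split < requestedSplits; split++) {
            KeyGroupRange range = KeyGroupRangeAssignment
                .computeKeyGroupRangeForOperatorIndex(maxParallelism, requestedSplits, split);
            // Each split restores a local state backend holding exactly these key groups
            // and then iterates over the restored data.
            System.out.println("Split " + split + " reads key groups " + range);
        }
    }
}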

Writing New Savepoints

Savepoint writing is based around three interfaces:

...

This means that all checkpointing logic is reused and, as with the input format, the library supports all savepoint features for free. Finally, the BoundedStreamTask will run inside a DataSet#mapPartition that takes in the bootstrap data and outputs the OperatorSubtaskState instances for the snapshot. Afterwards, the snapshot handles can be aggregated down and written out as a savepoint metadata file.
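
A rough sketch of that write path is shown below; apart from DataSet#mapPartition and OperatorSubtaskState, every name here (the runner, the merger, and the output format) is a hypothetical placeholder, not the proposed API:

Code Block
languagejava
// Hypothetical sketch only: BoundedStreamTaskRunner, MergeOperatorSubtaskStates and
// SavepointMetadataOutputFormat are illustrative placeholder names.
DataSet<OperatorSubtaskState> subtaskStates = bootstrapData
	.mapPartition(new BoundedStreamTaskRunner<>(transformation, maxParallelism));

subtaskStates
	.reduceGroup(new MergeOperatorSubtaskStates())        // aggregate the snapshot handles
	.output(new SavepointMetadataOutputFormat(path));     // write the savepoint _metadata file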

Appendix A: Why use the DataSet API

With all of the ongoing work within the community to improve Flink’s batch support around the Table API and an eventual BoundedStream API, the question arises of why the DataSet API should be used now. In theory, there are three other APIs that this functionality could be built on top of:

...