Discussion thread
Vote thread

JIRA: FLINK-12047


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Flink offers state abstractions for user functions to guarantee fault-tolerant processing of streams. Users can work with both non-partitioned and partitioned state.

The partitioned state interface provides access to different types of state that are all scoped to the key of the current input element. This type of state is only available inside a keyed stream, which is created via stream.keyBy().

Currently, all of this state is internal to Flink and used to provide processing guarantees in failure cases (e.g., exactly-once processing). The only way to access state externally is through Queryable state, but that is limited to read-only, one key at a time operations.

The State Processor API provides functionality for reading, writing, and modifying savepoints using Flink’s batch DataSet API.

This is useful for:

  • Analyzing state for interesting patterns
  • Troubleshooting or auditing jobs by checking for discrepancies in state
  • Bootstrapping state for new applications
  • Modifying savepoints such as:
    • Changing max parallelism
    • Making breaking schema changes
    • Correcting invalid state


To understand how to best interact with savepoints in a batch context it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.

A database can be thought of as one or more namespaces, each containing a collection of tables. Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key.

A savepoint represents the state of a Flink job at a particular point in time. A job is made up of many operators, and those operators contain various kinds of state: partitioned (keyed) state and non-partitioned (operator) state.

MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);

class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> {

  @Override
  public void processElement(
		Transaction value,
		ReadOnlyContext ctx,
		Collector<Transaction> out) throws Exception {

     // Convert the amount if a rate for this currency has been broadcast.
     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
     if (rate != null) {
        value.amount *= rate;
     }
     out.collect(value);
  }

  @Override
  public void processBroadcastElement(
		CurrencyRate value,
		Context ctx,
		Collector<Transaction> out) throws Exception {
		ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate);
  }
}

class Summarize extends RichFlatMapFunction<Transaction, Summary> {
  transient ValueState<Double> totalState;
  transient ValueState<Integer> countState;

  @Override
  public void open(Configuration configuration) throws Exception {
     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
  }

  @Override
  public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
     Summary summary = new Summary();
     summary.total = value.amount;
     summary.count = 1;

     // Add the running totals for this key, if any exist yet.
     Double currentTotal = totalState.value();
     if (currentTotal != null) {
        summary.total += currentTotal;
     }

     Integer currentCount = countState.value();
     if (currentCount != null) {
        summary.count += currentCount;
     }

     totalState.update(summary.total);
     countState.update(summary.count);

     out.collect(summary);
  }
}


DataStream<Transaction> transactions = . . .
BroadcastStream<CurrencyRate> rates = . . .

transactions
  .connect(rates)
  .process(new CurrencyConverter())
  .uid("currency_converter")
  .keyBy(transaction -> transaction.accountId)
  .flatMap(new Summarize())
  .uid("summarize");

This job contains multiple operators along with various kinds of state. When analyzing that state we can first scope data by its operator, named by setting its uid. Within each operator we can look at the registered states. CurrencyConverter has a broadcast state, which is a type of non-partitioned operator state. In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row. Contrast this with Summarize, which contains two keyed states. Because both states are scoped under the same key we can safely assume there exists some relationship between the two values. Therefore, keyed state is best understood as a single table per operator containing one “key” column along with n value columns, one for each registered state. All of this means that the state for this job could be described using the following pseudo-sql commands.

CREATE NAMESPACE currency_converter;

CREATE TABLE currency_converter.rates (
   value Tuple2<Integer, Double>
);

CREATE NAMESPACE summarize;

CREATE TABLE summarize.keyed_state (
   key   INTEGER PRIMARY KEY,
   total DOUBLE,
   count INTEGER
);

In general, the savepoint ↔ database relationship can be summarized as:

  • A savepoint is a database
  • An operator is a namespace named by its uid
  • Each operator state represents a single table
    • Each element in an operator state represents a single row in that table
  • Each operator containing keyed state has a single “keyed_state” table
    • Each keyed_state table has one key column mapping the key value of the operator
    • Each registered state represents a single column in the table
    • Each row in the table maps to a single key
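
The keyed-state half of this mapping can be sketched in plain Java without any Flink dependency. The class `KeyedStateTable` and its methods `put` and `toRows` are hypothetical names chosen for illustration. The sketch only shows how several states registered under the same key line up as the columns of one table, with a missing value appearing as null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical illustration only: models one operator's keyed state as a table.
class KeyedStateTable {

    // One map per registered state: state name -> (key -> value).
    private final Map<String, Map<Integer, Object>> columns = new LinkedHashMap<>();

    void put(String stateName, int key, Object value) {
        columns.computeIfAbsent(stateName, n -> new LinkedHashMap<>()).put(key, value);
    }

    // Each row holds the key first, then one value per registered state (null if unset).
    List<List<Object>> toRows() {
        TreeSet<Integer> keys = new TreeSet<>();
        for (Map<Integer, Object> column : columns.values()) {
            keys.addAll(column.keySet());
        }
        List<List<Object>> rows = new ArrayList<>();
        for (Integer key : keys) {
            List<Object> row = new ArrayList<>();
            row.add(key);
            for (Map<Integer, Object> column : columns.values()) {
                row.add(column.get(key));
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Mirrors the "total" and "count" states of the Summarize function.
        KeyedStateTable summarize = new KeyedStateTable();
        summarize.put("total", 1, 10.5);
        summarize.put("count", 1, 2);
        summarize.put("total", 2, 3.0);
        summarize.put("count", 2, 1);
        for (List<Object> row : summarize.toRows()) {
            System.out.println(row);
        }
    }
}
```

Each printed row corresponds to one key, which is the shape a batch job sees when it reads the keyed state of an operator.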


A database is laid out as:

  • Namespace1
    • TableA
      • Column1
      • Column2
    • TableB
  • Namespace2
    • TableA
    • TableB

A savepoint is laid out as:

  • Uid1
    • Operator_state_1
      • Value
    • Operator_state_2
      • Value
    • Keyed_state
      • Key
      • Column1
      • Column2

Public Interfaces

Reading an existing savepoint:

Load an existing savepoint:

ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", stateBackend);

When reading operator state, simply specify the operator uid, state name, and type information.

DataSet<Integer> listState  = savepoint.readListState("uid", "state", Types.INT);
DataSet<Integer> unionState = savepoint.readUnionState("uid", "state", Types.INT);

DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState("uid", "state", Types.INT, Types.INT);

Additionally, a custom type serializer may be provided if the state uses custom serialization.

DataSet<Integer> listState = savepoint.readListState("uid", "state", Types.INT, new MyCustomIntSerializer());

When reading keyed state, users specify a KeyedStateReaderFunction to allow reading arbitrary columns and complex state types such as ListState, MapState, and AggregatingState plus timers. This means if an operator contains a stateful process function such as:

class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> {

   ValueState<Integer> state;

   @Override
   public void open(Configuration parameters) {
      state = getRuntimeContext().getState(stateDescriptor);
   }

   @Override
   public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
      state.update(value + 1);
   }
}

Then users can read this state by first defining an output type and a corresponding KeyedStateReaderFunction.

class MyPojo {
  Integer key;
  Integer value;
  Set<Long> timers;
}

class ReaderFunction extends KeyedStateReaderFunction<Integer, MyPojo> {
  ValueState<Integer> state;

  @Override
  public void open(Configuration parameters) {
     state = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void processKey(
	Integer key,
	Context ctx,
	Collector<MyPojo> out) throws Exception {

     // Emit one record per key, combining its state value and registered timers.
     MyPojo pojo = new MyPojo();
     pojo.key    = key;
     pojo.value  = state.value();
     pojo.timers = ctx.getEventTimeTimers();
     out.collect(pojo);
  }
}

DataSet<MyPojo> keyedState = savepoint.readKeyedState("uid", new ReaderFunction());

Creating state / savepoint from scratch:

Define how to bootstrap a new operator’s state with a given DataSet:

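// accountDataSet is assumed to be a DataSet<Account> holding the bootstrap data.
BootstrapTransformation<Account> transformation = OperatorTransformation
  .bootstrapWith(accountDataSet)
  .assignTimestamps(account -> account.timestamp)
  .keyBy(acc -> acc.id)
  .transform(new AccountBootstrapper());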

class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
    ValueState<Double> state;

    @Override
    public void open(Configuration parameters) throws Exception {
       ValueStateDescriptor<Double> descriptor =
         new ValueStateDescriptor<>("total", Types.DOUBLE);
       state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Account value, Context ctx) throws Exception {
       // Assumes Account exposes a numeric `amount` field; the original snippet elides the body.
       state.update(value.amount);
    }
}

Create a new savepoint, specifying state backend type, max parallelism, multiple operators and an output path.

Savepoint
	.create(stateBackend, maxParallelism)
	.withOperator("uid", transformation)
	.write(path);

Modifying an existing savepoint

Load an existing savepoint as the basis for a new one, then add, overwrite, or remove operators:

ExistingSavepoint existingSavepoint = Savepoint.load(bEnv, oldSavepointPath, stateBackend);

Add a new bootstrapped operator

existingSavepoint
    .withOperator("newUid", transformation)

Remove or overwrite operator state in the existing savepoint, then write it out. Modified savepoints retain the max parallelism of the original savepoint.

existingSavepoint
    .removeOperator(oldOperatorUid)
    .withOperator(oldOperatorUid, transformation)
    .write(path);

Proposed Changes

The key goal for the implementation is to use only the available savepoint APIs so that the implementation can be trivially correct and simple to maintain. As savepoint formats change or new features such as TTL or state migration are added, the connector will continue to work without modification.

Querying Timers

The only prerequisite work is a minor modification to the internal timer service, which provides an efficient mapping from timestamps to the keys registered for that point in time. Efficient querying of registered timers requires the inverse mapping, from keys to registered timestamps. Because the timer service resides in the per-record execution path, we do not want to change how timers are managed. Instead, two methods will be added to the InternalTimerService interface: forEachProcessingTimeTimer and forEachEventTimeTimer. These methods allow copying all registered timers, prior to reading, into a data structure that supports efficient querying, without touching any per-record code paths.
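
The inversion described here can be sketched in plain Java. `TimerIndex` is a hypothetical stand-in, not Flink's InternalTimerService, and the BiConsumer-based signature of `forEachEventTimeTimer` is an assumption made for illustration; the proposal does not specify the exact signature. The sketch keeps the timestamp-to-keys layout untouched and builds the key-to-timestamps view once, on demand.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a timer service; not Flink's InternalTimerService.
class TimerIndex<K> {

    // Runtime layout: timestamp -> keys with a timer registered at that time.
    private final TreeMap<Long, Set<K>> eventTimeTimers = new TreeMap<>();

    void registerEventTimeTimer(K key, long timestamp) {
        eventTimeTimers.computeIfAbsent(timestamp, t -> new HashSet<>()).add(key);
    }

    // Visits every registered (key, timestamp) pair without changing the runtime layout.
    void forEachEventTimeTimer(BiConsumer<K, Long> visitor) {
        eventTimeTimers.forEach((timestamp, keys) ->
            keys.forEach(key -> visitor.accept(key, timestamp)));
    }

    // Builds the inverted view once, before reading: key -> registered timestamps.
    Map<K, Set<Long>> invert() {
        Map<K, Set<Long>> byKey = new HashMap<>();
        forEachEventTimeTimer((key, timestamp) ->
            byKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp));
        return byKey;
    }

    public static void main(String[] args) {
        TimerIndex<Integer> timers = new TimerIndex<>();
        timers.registerEventTimeTimer(1, 100L);
        timers.registerEventTimeTimer(2, 100L);
        timers.registerEventTimeTimer(1, 250L);
        // All timestamps registered for key 1.
        System.out.println(timers.invert().get(1));
    }
}
```

Because the inverted map is built only when a reader asks for it, the registration path stays exactly as it was.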

State Input

Reading state from an existing savepoint is built around a series of input formats, where each split corresponds to a single execution vertex in a DataStream execution graph. That means if ten input splits are requested, the state is partitioned exactly as if that savepoint were restored in a DataStream application with a parallelism of ten (using the methods in StateAssignmentOperation). On open, each split restores a local state backend and iterates through all of the restored data.
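
For keyed state, this partitioning comes down to giving each split a contiguous range of key groups. The sketch below is plain Java and illustrative only: `SplitRanges` and `keyGroupRange` are hypothetical names, and the arithmetic is an assumption modeled on how Flink divides key groups among subtasks, not a copy of StateAssignmentOperation.

```java
// Illustration only: assigns each input split a contiguous, inclusive key-group range.
class SplitRanges {

    // Returns {firstKeyGroup, lastKeyGroup} for the split at splitIndex.
    static int[] keyGroupRange(int maxParallelism, int numSplits, int splitIndex) {
        if (numSplits <= 0 || numSplits > maxParallelism) {
            throw new IllegalArgumentException("numSplits must be in [1, maxParallelism]");
        }
        if (splitIndex < 0 || splitIndex >= numSplits) {
            throw new IllegalArgumentException("splitIndex must be in [0, numSplits)");
        }
        // Integer division rounds the start up and the end down, so ranges never overlap.
        int start = (splitIndex * maxParallelism + numSplits - 1) / numSplits;
        int end = ((splitIndex + 1) * maxParallelism - 1) / numSplits;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int numSplits = 10;
        for (int i = 0; i < numSplits; i++) {
            int[] range = keyGroupRange(maxParallelism, numSplits, i);
            System.out.println("split " + i + " -> key groups " + range[0] + ".." + range[1]);
        }
    }
}
```

With a max parallelism of 128 and ten splits, the ranges cover key groups 0 through 127 with no gaps, so every key is read by exactly one split.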

Writing New Savepoints

Savepoint writing is based around three interfaces:

  • StateBootstrapFunction for writing non-partitioned operator state
  • BroadcastStateBootstrapFunction for writing broadcast state
  • KeyedStateBootstrapFunction for writing keyed operator state

Each interface is structured similarly to a process function, except that it does not include a Collector, because writing is a terminal operation in a dataflow. The interfaces are backed by corresponding StreamOperators, but the operators do not contain any special logic. The actual snapshotting occurs within a new subclass of StreamTask called BoundedStreamTask. This class is identical to OneInputStreamTask except:

  • The input is powered by an iterator instead of the network stack
  • Once all data has been processed it will take a snapshot of the subtask

This means that all checkpointing logic is reused. As with the input formats, the library therefore supports all savepoint features for free. Finally, the BoundedStreamTask runs inside a DataSet#mapPartition that takes in the bootstrap data and outputs the OperatorSubtaskStates for the snapshot. Afterwards, the snapshot handles can be aggregated and written out as a savepoint metadata file.
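
The control flow of such a task can be sketched in plain Java. None of the types below are Flink classes: `BoundedTaskSketch`, `BootstrapFunction`, and `runSubtask` are hypothetical names, and a simple map stands in for the state backend. The sketch shows only the two properties listed above: input comes from an iterator, and a single snapshot is taken once the input is exhausted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these types are Flink classes.
class BoundedTaskSketch {

    // Stand-in for a bootstrap function: consumes elements and has no Collector.
    interface BootstrapFunction<IN> {
        void processElement(IN value, Map<String, Long> state);
    }

    // Drains the iterator through the function, then snapshots the state exactly once.
    static <IN> Map<String, Long> runSubtask(
            Iterator<IN> input, BootstrapFunction<IN> function) {
        Map<String, Long> state = new HashMap<>();
        while (input.hasNext()) {
            function.processElement(input.next(), state);
        }
        // The "snapshot" here is just a read-only copy taken after the last element.
        return Collections.unmodifiableMap(new HashMap<>(state));
    }

    public static void main(String[] args) {
        // One partition of bootstrap data, as a mapPartition call would hand it over.
        List<Integer> partition = Arrays.asList(1, 2, 3, 4);
        Map<String, Long> snapshot = runSubtask(
            partition.iterator(),
            (value, state) -> state.merge("sum", value.longValue(), Long::sum));
        System.out.println(snapshot);
    }
}
```

In the proposal, the value returned per partition would be the subtask's snapshot handles rather than the state itself.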

Appendix A: Why use the DataSet API

With all the ongoing work within the community to improve Flink’s batch support around the Table API and an eventual BoundedStream API, the question arises of why to use the DataSet API now. There are theoretically three other APIs that this functionality could be built on top of:

  • BoundedStream
    • Does not currently exist
  • DataStream
    • The DataStream API was considered but it is missing key functionality that would require core runtime changes and seemed out of scope.
  • Table API
    • The current table runner requires sources / sinks for batch applications to be implemented using the DataSet API
    • The new table runner being contributed by Alibaba is under active development and requires batch table sources / sinks to be implemented using the DataStream API

At the same time, we do appreciate that any new functionality built using the DataSet API will require updating when a proper BoundedStream API is implemented and DataSet is deprecated. That is why the savepoint connector wraps its functionality in the API shown above and does not expose any internals such as input and output formats. From a user perspective, the only thing that will change is that calls to readListState will return a BoundedStream<> instead of a DataSet<>. Internally, the usage of DataSet is trivial, as core functionality is derived from the savepoint APIs exposed by the flink-streaming-java module, and migration should just be a matter of changing types.
