Status

Motivation

Currently, Flink state information (checkpoints/savepoints) can be accessed through the State Processor API. In order to use this API, one must implement a custom Flink application in Java, which is not convenient for every user.

This situation created the need for a SQL source that makes state data access easier and more convenient. There are multiple types of state information which can be read from checkpoint data:

  • Metadata (high level information about operator states, operator subtask states, master states, etc…)
  • Keyed state data
  • Non-keyed state data

The target of this implementation proposal is to provide SQL support for keyed states. Other types are going to be covered in further FLIP documents.

An important note is that in the first phase only primitive and POJO types are supported; state data in Avro format, for example, is not covered here.

Just a small recap of how state information is structured: Every Flink operator (identified by its UID) represents a namespace. Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state’s data of all tasks. All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state. The following figure shows how a savepoint of MyApp is mapped to a database.
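
As a rough conceptual sketch of that mapping (the operator and state names below are made up for illustration), the keyed states of a single keyed operator can be thought of as one table with a key column plus one column per keyed state:

-- Conceptual view only: one row per key of a hypothetical operator 'my-keyed-operator',
-- one column for the key and one column per keyed state
CREATE TABLE my_keyed_operator (
  k BIGINT,                    -- the key produced by keyBy
  my_value_state BIGINT,       -- a ValueState<Long> of the operator (illustrative)
  my_list_state ARRAY<BIGINT>  -- a ListState<Long> of the operator (illustrative)
);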

The intention is to be able to access keyed value, list and map states with SQL statements.

Proposed Changes

The proposal is to add a new SQL connector with the following syntax:

CREATE TABLE state_table (
  k BIGINT,
  KeyedPrimitiveValue BIGINT,
  KeyedPojoValue ROW<myLong1 BIGINT, myLong2 BIGINT>,
  KeyedPrimitiveValueList ARRAY<BIGINT>,
  KeyedPrimitiveValueMap MAP<BIGINT, BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
)
WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb', -- required; must match the backend of the job that created the snapshot (rocksdb is just an example)
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.KeyedPojoValue.value-format' = 'com.example.schema.PojoData'
);

SELECT * FROM state_table;

The main design is quite simple: each column represents either the key (defined by the keyBy statement in the Flink application) or a keyed value (ValueState, ListState or MapState).
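
Once such a table is declared, the keyed state can be queried with regular SQL. The query below is a hedged sketch using the columns from the DDL above; the concrete map key and threshold values are arbitrary.

-- Read one POJO field and one map entry for keys whose value state exceeds a threshold
SELECT
  t.k,
  t.KeyedPojoValue.myLong1 AS pojo_long1,
  t.KeyedPrimitiveValueMap[1] AS map_entry_1
FROM state_table AS t
WHERE t.KeyedPrimitiveValue > 100;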

From a high-level perspective, the following happens under the hood:

  • The checkpoint or savepoint defined by state.path is opened
  • The operator defined by either operator.uid or operator.uid.hash is opened
  • The keyed state data identified by the column name, or overridden by fields.#.state-name (when the state name is not SQL column compliant), is read (see the sketch after this list)
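
For example, the sketch below addresses the operator by its UID hash and maps a state whose name is not a valid SQL column name onto a column. This is a hedged illustration: the UID hash value and the state name my-state#1 are made up.

CREATE TABLE renamed_state_table (
  k BIGINT,
  MyStateValue BIGINT,
  PRIMARY KEY (k) NOT ENFORCED
)
WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid.hash' = '90bea66de1c231edf33913ecd54406c1', -- hypothetical operator UID hash
  'fields.MyStateValue.state-name' = 'my-state#1'           -- actual state name, not SQL column compliant
);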

The schema of the specific key or keyed value is inferred when nothing is defined. An important note is that schema inference only works for atomic types, so there is no inference for POJOs or any other complex types. When the inferred type is not satisfying, or the type is complex, fields.#.value-format can be used to provide the proper schema.
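
For instance, a keyed MapState whose key and value classes cannot be inferred could be declared roughly as follows. This is a sketch: java.lang.Long and com.example.schema.PojoData are reused from the examples in this document, while the table and column names are illustrative.

CREATE TABLE map_state_table (
  k BIGINT,
  KeyedPojoValueMap MAP<BIGINT, ROW<myLong1 BIGINT, myLong2 BIGINT>>,
  PRIMARY KEY (k) NOT ENFORCED
)
WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'hashmap',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.KeyedPojoValueMap.map-key-format' = 'java.lang.Long',           -- class of the map keys
  'fields.KeyedPojoValueMap.value-format' = 'com.example.schema.PojoData' -- class of the map values (POJO, not inferred)
);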

General options

Option             | Required | Default | Type                                    | Description
-------------------|----------|---------|-----------------------------------------|------------
connector          | required | (none)  | String                                  | Specify what connector to use; here it should be 'savepoint'.
state.backend.type | required | (none)  | Enum. Possible values: hashmap, rocksdb | Defines the state backend which must be used for state reading. This must match the value defined in the Flink job which created the savepoint or checkpoint.
state.path         | required | (none)  | String                                  | Defines the state path which must be used for state reading. All file systems supported by Flink can be used here.
operator.uid       | optional | (none)  | String                                  | Defines the operator UID which must be used for state reading (cannot be used together with operator.uid.hash).
operator.uid.hash  | optional | (none)  | String                                  | Defines the operator UID hash which must be used for state reading (cannot be used together with operator.uid).

Connector options for columns

Option                  | Required | Default | Type                                     | Description
------------------------|----------|---------|------------------------------------------|------------
fields.#.state-name     | optional | (none)  | String                                   | Overrides the state name which must be used for state reading. This can be useful when the state name contains characters which are not compliant with SQL column names.
fields.#.state-type     | optional | (none)  | Enum. Possible values: list, map, value  | Defines the state type which must be used for state reading, including value, list and map. When it is not provided, it is inferred from the SQL type (ARRAY=list, MAP=map, all others=value).
fields.#.map-key-format | optional | (none)  | String                                   | Defines the format class for decoding the map keys (for example java.lang.Long). When it is not provided, it is inferred from the SQL type (only primitive types are supported).
fields.#.value-format   | optional | (none)  | String                                   | Defines the format class for decoding value data (for example java.lang.Long). When it is not provided, it is inferred from the SQL type (only primitive types are supported).
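
Putting the column options together, the sketch below reads a ListState whose name is not a valid SQL column name. It is a hedged example: the state name, the element class and the column name are illustrative assumptions, and fields.#.value-format is assumed here to denote the element class of the list state.

CREATE TABLE list_state_table (
  k BIGINT,
  MyList ARRAY<BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
)
WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'hashmap',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.MyList.state-name' = 'my list state', -- actual state name containing spaces (illustrative)
  'fields.MyList.state-type' = 'list',          -- would also be inferred from the ARRAY column type
  'fields.MyList.value-format' = 'java.lang.Long'
);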

Public Interfaces

A new SQL connector will be added with the name: savepoint.

Compatibility, Deprecation, and Migration Plan

No migration is needed.

Test Plan

It’s planned to implement automated tests.

Rejected Alternatives

None.
