Status
Motivation
Currently, Flink state information (checkpoints and savepoints) can be accessed through the State Processor API. Using that API requires implementing a custom Flink application in Java, which is not convenient for every user.
This situation created the need for a SQL source that makes state data access easier and more convenient. There are multiple types of state information which can be read from checkpoint data:
- Metadata (high-level information about operator states, operator subtask states, master states, etc.)
- Keyed state data
- Non-keyed state data
The goal of this proposal is to provide SQL support for keyed state. The other types are going to be covered in further FLIP documents.
Note that in the first phase only primitive and POJO types are supported; state data in, for example, Avro format is not covered here.
Just a small recap of how state information is structured: Every Flink operator (identified by its UID) represents a namespace. Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state’s data of all tasks. All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state. The following figure shows how a savepoint of MyApp is mapped to a database.
The intention is to be able to reach keyed value, list and map states with SQL statements.
Proposed Changes
The proposal is to add a new SQL connector with the following syntax:
CREATE TABLE state_table (
  k BIGINT,
  KeyedPrimitiveValue BIGINT,
  KeyedPojoValue ROW<myLong1 BIGINT, myLong2 BIGINT>,
  KeyedPrimitiveValueList ARRAY<BIGINT>,
  KeyedPrimitiveValueMap MAP<BIGINT, BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.KeyedPojoValue.value-format' = 'com.example.schema.PojoData'
);

SELECT * FROM state_table;
The main design is simple: each column represents either the key (defined by the keyBy statement in the Flink application) or a keyed value (ValueState, ListState, or MapState).
From a high-level perspective, the following happens under the hood:
- The checkpoint defined by state.path is opened
- The operator defined by either operator.uid or operator.uid.hash is opened
- The keyed state data identified by the column name, or overridden by fields.#.state-name (when the state name is not a valid SQL column name), is read
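As a sketch of the state-name override mentioned in the last step, a column could be mapped to a state whose registered name is not a valid SQL identifier. The table, path, and state names below are illustrative, not taken from the proposal:

```sql
-- Hypothetical example: the state was registered in the Flink job as
-- "my-state#1", which is not a valid SQL column name, so the column
-- "MyState" is mapped to it explicitly.
CREATE TABLE state_table (
  k BIGINT,
  MyState BIGINT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.MyState.state-name' = 'my-state#1'
);
```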
The schema of a specific key or keyed value is inferred when nothing is defined. Note that schema inference works only for atomic types; there is no inference for POJO or other complex types. When the inferred type is not satisfactory, or the type is complex, fields.#.value-format can be used to provide the proper schema.
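For example, a POJO-typed value state, whose schema cannot be inferred, could be declared roughly as follows (the POJO class name and table name are illustrative):

```sql
-- Hypothetical example: inference does not work for the POJO-typed
-- ValueState, so the decoding class is provided explicitly.
CREATE TABLE pojo_state_table (
  k BIGINT,
  PojoValue ROW<myLong1 BIGINT, myLong2 BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'hashmap',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.PojoValue.value-format' = 'com.example.schema.PojoData'
);
```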
General options
Option | Required | Default | Type | Description |
connector | required | (none) | String | Specify what connector to use, here should be 'savepoint'. |
state.backend.type | required | (none) | Enum Possible values: hashmap, rocksdb | Defines the state backend which must be used for state reading. This must match the value defined in the Flink job which created the savepoint or checkpoint. |
state.path | required | (none) | String | Defines the state path which must be used for state reading. All filesystems supported by Flink can be used here. |
operator.uid | optional | (none) | String | Defines the operator UID which must be used for state reading (cannot be used together with operator.uid.hash). |
operator.uid.hash | optional | (none) | String | Defines the operator UID hash which must be used for state reading (cannot be used together with operator.uid). |
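When the original job set no explicit UID on the operator, only the generated UID hash is available; a table definition would then use operator.uid.hash instead of operator.uid, roughly as in this sketch (the hash value and names are illustrative):

```sql
-- Hypothetical example: the operator is identified by its UID hash
-- because no explicit UID was set in the job that wrote the state.
CREATE TABLE state_table (
  k BIGINT,
  KeyedValue BIGINT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid.hash' = 'cbc357ccb763df2852fee8c4fc7d55f2'
);
```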
Connector options for column
Option | Required | Default | Type | Description |
fields.#.state-name | optional | (none) | String | Overrides the state name which must be used for state reading. This can be useful when the state name contains characters which are not compliant with SQL column names. |
fields.#.state-type | optional | (none) | Enum Possible values: list, map, value | Defines the state type which must be used for state reading: value, list, or map. When it is not provided, the type is inferred from the SQL type (ARRAY=list, MAP=map, all others=value). |
fields.#.map-key-format | optional | (none) | String | Defines the format class scheme for decoding map key data (e.g. java.lang.Long). When it is not provided, the format is inferred from the SQL type (only primitive types are supported). |
fields.#.value-format | optional | (none) | String | Defines the format class scheme for decoding value data (e.g. java.lang.Long). When it is not provided, the format is inferred from the SQL type (only primitive types are supported). |
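Combining the column options above, a MapState column whose key and value formats are declared explicitly could look like this sketch (table and class names are illustrative):

```sql
-- Hypothetical example: a MapState column with explicitly declared
-- state type plus key and value decoding classes.
CREATE TABLE map_state_table (
  k BIGINT,
  KeyedMap MAP<BIGINT, BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'hashmap',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.KeyedMap.state-type' = 'map',
  'fields.KeyedMap.map-key-format' = 'java.lang.Long',
  'fields.KeyedMap.value-format' = 'java.lang.Long'
);
```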
Public Interfaces
A new SQL connector will be added with the name: savepoint.
Compatibility, Deprecation, and Migration Plan
No migration is needed.
Test Plan
Automated tests are planned for the new connector.
Rejected Alternatives
None.
