Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink state from checkpoints and savepoints can be read through the State Processor Table API.
It supports atomic and POJO types, but reading any more complex type that requires a custom serializer (for example, Avro) is not possible. This FLIP adds support for such types.
Public Interfaces
Newly added API:
package org.apache.flink.state.table;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** {@link TypeInformation} factory for decoding savepoint value data. */
@Experimental
public interface SavepointTypeInformationFactory {
    /** Returns {@link TypeInformation} for data deserialization. */
    TypeInformation<?> getTypeInformation();
}
Proposed Changes
Add the following type information factory implementation:
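package org.apache.flink.state.table;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

/** {@link SavepointTypeInformationFactory} for a specific Avro record. */
public class AvroSavepointTypeInformationFactory implements SavepointTypeInformationFactory {
    @Override
    public TypeInformation<?> getTypeInformation() {
        // AvroRecord is the Avro specific record class (a SpecificRecordBase subclass) kept in state.
        return new AvroTypeInfo<>(AvroRecord.class);
    }
}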
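Because the factory is referenced from the table options by its fully qualified class name, the connector has to instantiate it at runtime. The following is a minimal sketch of that step, assuming reflective loading through a public no-argument constructor (AvroSavepointTypeInformationFactory above has one implicitly); this FLIP does not prescribe the mechanism. ValueTypeFactoryLoader, TypeFactory, and LongTypeFactory are hypothetical names, and TypeFactory returns a Class<?> instead of Flink's TypeInformation<?> so that the sketch runs without Flink on the classpath.

```java
/**
 * Hypothetical helper that instantiates a value type factory from the class name
 * configured in a table option. TypeFactory stands in for SavepointTypeInformationFactory
 * and returns a Class<?> instead of TypeInformation<?> so that no Flink dependency is needed.
 */
final class ValueTypeFactoryLoader {

    /** Hypothetical stand-in for SavepointTypeInformationFactory. */
    interface TypeFactory {
        Class<?> getType();
    }

    /** Hypothetical example factory; the loader calls its implicit public no-arg constructor. */
    public static final class LongTypeFactory implements TypeFactory {
        @Override
        public Class<?> getType() {
            return Long.class;
        }
    }

    private ValueTypeFactoryLoader() {}

    /** Loads className, checks that it implements TypeFactory and calls its no-arg constructor. */
    static TypeFactory load(String className, ClassLoader classLoader) {
        final Class<?> clazz;
        try {
            clazz = Class.forName(className, true, classLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Value type factory class not found: " + className, e);
        }
        // Fail with a configuration error instead of a ClassCastException further down.
        if (!TypeFactory.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                    className + " does not implement " + TypeFactory.class.getName());
        }
        try {
            return (TypeFactory) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate value type factory: " + className, e);
        }
    }
}
```

Checking the interface before instantiating turns a misconfigured class name into an immediate, descriptive error rather than a failure during state decoding.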
Suppose a DataStream application has the following Avro state:
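public class KeyedStateProcessFunction extends KeyedProcessFunction<Long, Long, Long> {
    private ValueState<AvroRecord> myAvroRecordState;

    @Override
    @SuppressWarnings("unchecked")
    public void open(OpenContext openContext) {
        // Reuse the factory so the job and the savepoint connector agree on the state's type information.
        TypeInformation<AvroRecord> avroTypeInfo = (TypeInformation<AvroRecord>) new AvroSavepointTypeInformationFactory().getTypeInformation();
        myAvroRecordState = getRuntimeContext().getState(new ValueStateDescriptor<>("KeyedAvroValue", avroTypeInfo));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) { /* application logic using myAvroRecordState */ }
}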
The state can then be read by querying a table created with the following SQL statements:
CREATE TABLE state_table (
  `k` BIGINT,
  `operator-uid` STRING,
  `KeyedAvroValue` ROW<myLong1 BIGINT, myLong2 BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.KeyedAvroValue.value-type-factory' = 'org.apache.flink.state.table.AvroSavepointTypeInformationFactory'
);

SELECT * FROM state_table;
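In the table definition above, the column name KeyedAvroValue is embedded in the option key, which is what ties the factory to that one column. A minimal sketch of collecting these per-column settings from the table options might look as follows; ValueTypeFactoryOptions and factoryClassNamesByColumn are hypothetical names, not part of the connector's API, and the real connector may validate options differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: collects per-column value type factory class names from
 * table options of the form 'fields.<column>.value-type-factory'.
 */
final class ValueTypeFactoryOptions {
    private static final String PREFIX = "fields.";
    private static final String SUFFIX = ".value-type-factory";

    private ValueTypeFactoryOptions() {}

    /** Returns column name -> factory class name, in the iteration order of tableOptions. */
    static Map<String, String> factoryClassNamesByColumn(Map<String, String> tableOptions) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            String key = option.getKey();
            // Require a non-empty column name between the prefix and the suffix.
            if (key.startsWith(PREFIX)
                    && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length()) {
                String column = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(column, option.getValue());
            }
        }
        return result;
    }
}
```

Stripping a fixed prefix and suffix, rather than splitting the key on '.', keeps column names that themselves contain dots intact.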
Compatibility, Deprecation, and Migration Plan
No migration needed.
Test Plan
Automated integration tests are planned.
Rejected Alternatives
None.