Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink state (checkpoints and savepoints) can be accessed through the State Processor Table API.

It supports atomic and POJO types, but state of a more complex type that requires a custom serializer (for example Avro) cannot be read. This FLIP proposes adding that support.

Public Interfaces

Newly added API:

package org.apache.flink.state.table;

/** {@link TypeInformation} factory for decoding savepoint value data. */
@Experimental
public interface SavepointTypeInformationFactory {
   /** Returns {@link TypeInformation} for data deserialization. */
   TypeInformation<?> getTypeInformation();
}

Proposed Changes

Add the following type information factory implementation:

package org.apache.flink.state.table;

/** {@link SavepointTypeInformationFactory} for a specific Avro record. */
public class AvroSavepointTypeInformationFactory implements SavepointTypeInformationFactory {
   @Override
   public TypeInformation<?> getTypeInformation() {
       return new AvroTypeInfo<>(AvroRecord.class);
   }
}

Suppose a DataStream application has the following Avro state:

public class KeyedStateProcessFunction extends KeyedProcessFunction<Long, Long, Long> {
  private ValueState<AvroRecord> myAvroRecordState;

  @Override
  public void open(OpenContext openContext) {
    // Same type information that AvroSavepointTypeInformationFactory returns.
    myAvroRecordState =
        getRuntimeContext()
            .getState(new ValueStateDescriptor<>("KeyedAvroValue", new AvroTypeInfo<>(AvroRecord.class)));
  }

  // processElement(...) omitted for brevity.
}

The state can then be read by querying a table created with the following SQL statement:

CREATE TABLE state_table (
  `k` BIGINT,
  `operator-uid` STRING,
  `KeyedAvroValue` ROW<myLong1 BIGINT, myLong2 BIGINT>,
  PRIMARY KEY (k) NOT ENFORCED
)
WITH (
  'connector' = 'savepoint',
  'state.path' = '/root/dir/of/checkpoint-data',
  'operator.uid' = 'keyed-state-process-uid',
  'fields.KeyedAvroValue.value-type-factory' = 'org.apache.flink.state.table.AvroSavepointTypeInformationFactory'
);


SELECT * FROM state_table;
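
The 'fields.KeyedAvroValue.value-type-factory' option above names a factory class as a string, so the connector has to turn that string into a factory instance at some point. The following is a minimal sketch of one way this could work, using plain reflection and a public no-argument constructor. It is not the actual connector implementation. To keep the example self-contained, TypeInfoStub and TypeFactoryStub are hypothetical stand-ins for TypeInformation and SavepointTypeInformationFactory, and FactoryOptionSketch, LongFactory, optionKey and resolve are names invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class FactoryOptionSketch {

    /** Hypothetical, simplified stand-in for Flink's TypeInformation. */
    public static final class TypeInfoStub {
        private final Class<?> typeClass;

        public TypeInfoStub(Class<?> typeClass) {
            this.typeClass = typeClass;
        }

        public Class<?> getTypeClass() {
            return typeClass;
        }
    }

    /** Hypothetical stand-in with the same shape as SavepointTypeInformationFactory. */
    public interface TypeFactoryStub {
        TypeInfoStub getTypeInformation();
    }

    /** Example user-provided factory; it relies on the implicit public no-arg constructor. */
    public static final class LongFactory implements TypeFactoryStub {
        @Override
        public TypeInfoStub getTypeInformation() {
            return new TypeInfoStub(Long.class);
        }
    }

    /** Builds the per-column option key in the form used by the DDL above. */
    public static String optionKey(String fieldName) {
        return "fields." + fieldName + ".value-type-factory";
    }

    /**
     * Looks up the factory class name for the given column, instantiates it
     * reflectively and returns the type information it provides.
     * Returns null when no factory is configured for the column.
     */
    public static TypeInfoStub resolve(Map<String, String> options, String fieldName) {
        String className = options.get(optionKey(fieldName));
        if (className == null) {
            return null;
        }
        try {
            Class<?> clazz = Class.forName(className);
            if (!TypeFactoryStub.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement " + TypeFactoryStub.class.getName());
            }
            TypeFactoryStub factory =
                    (TypeFactoryStub) clazz.getDeclaredConstructor().newInstance();
            return factory.getTypeInformation();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate factory " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(optionKey("KeyedAvroValue"), LongFactory.class.getName());

        TypeInfoStub typeInfo = resolve(options, "KeyedAvroValue");
        System.out.println(typeInfo.getTypeClass().getName());
    }
}
```

Under this approach the user's factory class only needs to be on the classpath and expose a public no-argument constructor. Whether the real connector imposes the same requirements is an assumption of this sketch.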

Compatibility, Deprecation, and Migration Plan

No migration needed.

Test Plan

Automated integration tests will be added.

Rejected Alternatives

None.