Motivation

Flink applications frequently need to evolve their state schema as business requirements change. Currently, when users update a Table API or SQL job with schema changes involving RowData types (particularly nested structures), state restoration fails with a serializer compatibility error and the job cannot start. The issue occurs because the existing state migration mechanisms don't handle RowData types during schema evolution, which prevents users from making backward-compatible changes such as:

  • Adding nullable fields to existing structures
  • Reordering fields within a row while preserving field names
  • Evolving nested row structures

This limitation impacts production applications using Flink's Table API, as the RowData type is central to this interface. Users are forced to choose between maintaining outdated schemas or reprocessing all state data when schema changes are required.

Problem Example

Consider a Flink Table API job that stores metadata in a nested row structure:

Initial Schema

CREATE TABLE Events (
  eventId BIGINT,
  metadata ROW<
    userId INT,
    `timestamp` BIGINT,
    deviceType STRING
  >
) WITH (...);

Schema Evolution

Later, the application evolves to include more metadata fields:

CREATE TABLE Events (
  eventId BIGINT,
  metadata ROW<
    deviceType STRING,       -- Reordered field
    location STRING,         -- New nullable field
    userId INT,              -- Original field
    `timestamp` BIGINT,      -- Original field
    appVersion STRING,       -- New nullable field
    sessionId BIGINT         -- New nullable field
  >
) WITH (...);
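The reordering in this example reduces to a name-based mapping from each new field position to its old position, which is what the proposal's createFieldMapping() helper is described as computing. The following is a minimal standalone sketch, assuming field names are the only matching key and using plain Java lists instead of Flink's RowDataSerializer; the class and method names are illustrative, not Flink API.

```java
import java.util.Arrays;
import java.util.List;

public class FieldMappingExample {

    // Returns mapping[newIndex] = oldIndex, or -1 when the field did not exist before.
    // Matching is purely by field name (illustrative sketch, not Flink code).
    static int[] createFieldMapping(List<String> oldNames, List<String> newNames) {
        int[] mapping = new int[newNames.size()];
        for (int newPos = 0; newPos < newNames.size(); newPos++) {
            // indexOf returns -1 if the name is absent from the old schema
            mapping[newPos] = oldNames.indexOf(newNames.get(newPos));
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Field names of the metadata ROW before and after the schema change
        List<String> oldNames = List.of("userId", "timestamp", "deviceType");
        List<String> newNames = List.of(
            "deviceType", "location", "userId", "timestamp", "appVersion", "sessionId");

        int[] mapping = createFieldMapping(oldNames, newNames);
        System.out.println(Arrays.toString(mapping)); // prints [2, -1, 0, 1, -1, -1]
    }
}
```

Read this as: new position 0 (deviceType) takes the value from old position 2, and positions 1, 4 and 5 (location, appVersion, sessionId) have no source and are filled with null.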

Current Behavior

When the job is redeployed with this updated schema and restored from a savepoint, it fails with the following error:

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer must not be incompatible with the old state serializer.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(...)
    ...

This occurs because the current implementation cannot migrate RowData state when the row schema changes, so users cannot evolve their applications without discarding existing state.

Public Interfaces

The proposal introduces the following changes to public interfaces:

  • Extensions to the TypeSerializerSnapshot interface in the org.apache.flink.api.common.typeutils package:
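    • New method: migrateState() - migrates a single serialized state value from the prior serializer's format to the new serializer's format
    • New method: migrateElement() - migrates an element that has already been deserialized (for example, a list element or a map value)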
  • Modifications to RowData-related classes in the Table API:
    • Enhanced RowDataSerializerSnapshot to support schema evolution with nullable fields
    • Updates to serialization compatibility logic in RowDataSerializer

Proposed Changes

The implementation introduces specialized state migration support for RowData types by:

  • Enhanced TypeSerializerSnapshot Interface:
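   import java.io.IOException;
   import org.apache.flink.core.memory.DataInputDeserializer;
   import org.apache.flink.core.memory.DataOutputSerializer;

   public interface TypeSerializerSnapshot<T> {
       // Existing methods...

       // New methods:

       // Migrates one serialized value. By default, it is deserialized with the
       // prior serializer and re-serialized with the new serializer.
       default void migrateState(
               TypeSerializer<T> priorSerializer,
               TypeSerializer<T> newSerializer,
               DataInputDeserializer serializedOldValueInput,
               DataOutputSerializer serializedMigratedValueOutput) throws IOException {
           T value = priorSerializer.deserialize(serializedOldValueInput);
           newSerializer.serialize(value, serializedMigratedValueOutput);
       }

       // Migrates an element already deserialized with the prior serializer
       // (e.g. a list element). By default, it is re-serialized with the new serializer.
       default void migrateElement(
               TypeSerializer<T> priorSerializer,
               TypeSerializer<T> newSerializer,
               T element,
               DataOutputSerializer serializedMigratedValueOutput) throws IOException {
           newSerializer.serialize(element, serializedMigratedValueOutput);
       }
   }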
  • Updated RocksDB State Backend:
    • Modify AbstractRocksDBState to use the new migration methods
    • Update RocksDBListState for element-wise RowData migration
    • Update RocksDBMapState for value-specific RowData migration
  • Backward-Compatible Schema Evolution Support:
    • Add case ROW in InternalSerializers

      case ROW:
          return new RowDataSerializer((RowType) type);

    • Add a new field originalRowType in RowDataSerializer

    • Enhanced RowDataSerializerSnapshot in RowDataSerializer 
      • Support for adding nullable fields (defaulting to null for existing records)
      • Support for field reordering based on field names
      • Support for nested structure evolution following the same compatibility rules
   import java.io.IOException;
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
   import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
   import org.apache.flink.core.memory.DataInputDeserializer;
   import org.apache.flink.core.memory.DataOutputSerializer;
   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.types.logical.LogicalType;

   public class RowDataSerializerSnapshot implements TypeSerializerSnapshot<RowData> {
       // Existing methods...

       // Enhanced compatibility resolution to support schema evolution
       @Override
       public TypeSerializerSchemaCompatibility<RowData> resolveSchemaCompatibility(
               TypeSerializer<RowData> newSerializer) {
           // Implementation checks for compatible schema changes:
           // - Adding nullable fields
           // - Field reordering with preserved names
           // - Nested structure changes following the same rules
           // Returns compatibleAfterMigration() for supported changes and
           // incompatible() for everything else (body elided in this proposal)
       }

       // Specialized implementation for RowData migration
       @Override
       public void migrateState(
               TypeSerializer<RowData> priorSerializer,
               TypeSerializer<RowData> newSerializer,
               DataInputDeserializer in,
               DataOutputSerializer out) throws IOException {
           // 1. Deserialize the old row data with the prior (restored) serializer
           RowData oldRowData = priorSerializer.deserialize(in);

           // 2-5. Remap the fields and serialize with the new serializer
           migrateElement(priorSerializer, newSerializer, oldRowData, out);
       }

       // Element-wise migration, starting from an already deserialized row
       @Override
       public void migrateElement(
               TypeSerializer<RowData> priorSerializer,
               TypeSerializer<RowData> newSerializer,
               RowData element,
               DataOutputSerializer out) throws IOException {
           RowDataSerializer oldRowSerializer = (RowDataSerializer) priorSerializer;
           RowDataSerializer newRowSerializer = (RowDataSerializer) newSerializer;
           // getFieldTypes()/getFieldNames() are assumed accessors backed by originalRowType
           LogicalType[] oldTypes = oldRowSerializer.getFieldTypes();

           // 2. Create a new GenericRowData with the new schema's arity (all fields start as null)
           GenericRowData newRowData = new GenericRowData(newRowSerializer.getArity());

           // 3. Map fields from old to new positions (handling reordering);
           //    the mapping could be computed once and cached instead of per record
           int[] fieldMapping = createFieldMapping(
               oldTypes,
               oldRowSerializer.getFieldNames(),
               newRowSerializer.getFieldTypes(),
               newRowSerializer.getFieldNames());

           // 4. Copy existing fields to their new positions; added fields stay null.
           //    RowData has no generic getField(), so a type-aware FieldGetter is used.
           //    Nested ROW fields whose type changed additionally need a recursive
           //    migration (not shown).
           for (int newPos = 0; newPos < newRowSerializer.getArity(); newPos++) {
               int oldPos = fieldMapping[newPos];
               if (oldPos >= 0 && oldPos < oldRowSerializer.getArity()) {
                   RowData.FieldGetter getter = RowData.createFieldGetter(oldTypes[oldPos], oldPos);
                   newRowData.setField(newPos, getter.getFieldOrNull(element));
               }
           }

           // 5. Serialize the migrated row data
           newRowSerializer.serialize(newRowData, out);
       }

       // Helper method to create the field mapping between schemas:
       // result[newIndex] = oldIndex, or -1 if the field is new
       private int[] createFieldMapping(
               LogicalType[] oldTypes, String[] oldNames,
               LogicalType[] newTypes, String[] newNames) {
           int[] mapping = new int[newTypes.length];
           for (int newPos = 0; newPos < newTypes.length; newPos++) {
               mapping[newPos] = -1;
               if (oldNames != null && newNames != null) {
                   // Name-based matching when names are available
                   for (int oldPos = 0; oldPos < oldNames.length; oldPos++) {
                       if (oldNames[oldPos].equals(newNames[newPos])) {
                           mapping[newPos] = oldPos;
                           break;
                       }
                   }
               } else if (newPos < oldTypes.length) {
                   // Position-based fallback when names aren't available
                   mapping[newPos] = newPos;
               }
           }
           return mapping;
       }
   }
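The nested-structure rule means a ROW-typed field whose schema changed has to be remapped recursively rather than copied as-is. The self-contained sketch below models that in plain Java, assuming rows are Object[] arrays and schemas are a hypothetical Field record; it is not Flink's RowData API, only an illustration of the remapping applied to the Events example.

```java
import java.util.Arrays;
import java.util.List;

public class NestedRowMigration {

    // Hypothetical, simplified schema model: a field is either atomic or a nested row.
    record Field(String name, String atomicType, List<Field> nested) {
        static Field atomic(String name, String type) { return new Field(name, type, null); }
        static Field row(String name, List<Field> children) { return new Field(name, null, children); }
        boolean isRow() { return nested != null; }
    }

    // Rows (including nested rows) are modelled as Object[].
    static Object[] migrate(Object[] oldRow, List<Field> oldSchema, List<Field> newSchema) {
        Object[] newRow = new Object[newSchema.size()]; // added fields default to null
        for (int newPos = 0; newPos < newSchema.size(); newPos++) {
            Field newField = newSchema.get(newPos);
            int oldPos = indexOf(oldSchema, newField.name());
            if (oldPos < 0 || oldRow[oldPos] == null) {
                continue; // new field or null value: stays null
            }
            Field oldField = oldSchema.get(oldPos);
            Object value = oldRow[oldPos];
            // Type changes are assumed to be rejected earlier by the compatibility check,
            // so a matched field is either a row on both sides or atomic on both sides.
            if (newField.isRow() && oldField.isRow()) {
                // Nested rows follow the same rules, applied recursively
                value = migrate((Object[]) value, oldField.nested(), newField.nested());
            }
            newRow[newPos] = value;
        }
        return newRow;
    }

    // Name-based lookup; returns -1 if the field does not exist in the schema
    static int indexOf(List<Field> schema, String name) {
        for (int i = 0; i < schema.size(); i++) {
            if (schema.get(i).name().equals(name)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Field> oldMeta = List.of(
            Field.atomic("userId", "INT"), Field.atomic("timestamp", "BIGINT"),
            Field.atomic("deviceType", "STRING"));
        List<Field> newMeta = List.of(
            Field.atomic("deviceType", "STRING"), Field.atomic("location", "STRING"),
            Field.atomic("userId", "INT"), Field.atomic("timestamp", "BIGINT"),
            Field.atomic("appVersion", "STRING"), Field.atomic("sessionId", "BIGINT"));
        List<Field> oldSchema = List.of(Field.atomic("eventId", "BIGINT"), Field.row("metadata", oldMeta));
        List<Field> newSchema = List.of(Field.atomic("eventId", "BIGINT"), Field.row("metadata", newMeta));

        Object[] oldRow = {1L, new Object[] {42, 1700000000000L, "mobile"}};
        Object[] newRow = migrate(oldRow, oldSchema, newSchema);
        System.out.println(Arrays.deepToString(newRow));
        // prints [1, [mobile, null, 42, 1700000000000, null, null]]
    }
}
```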

Compatibility, Deprecation, and Migration Plan

Impact on existing users:

  • The changes are backward compatible: jobs whose state schema does not change behave exactly as before
  • No migration required for users who don't need schema evolution
  • Users who previously couldn't evolve their RowData schemas can now do so with a savepoint

Migration process:

  • Take a savepoint of the current job
  • Update the application with the new schema
  • Restore from the savepoint (migration happens automatically)

Compatibility guarantees:

  • Only backward-compatible schema changes are supported:
    • Adding nullable fields (adding non-nullable fields is still unsupported)
    • Reordering fields with preserved names
    • Changes to nested structures following these same rules
  • Incompatible changes (removing fields, changing field types) will still be rejected with clear error messages
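These guarantees amount to a small decision procedure. Below is a minimal sketch for flat rows, assuming a hypothetical Field record (name, type name, nullability) in place of Flink's LogicalType. The three outcomes mirror compatibleAsIs(), compatibleAfterMigration() and incompatible() in TypeSerializerSchemaCompatibility. Nested rows would apply the same check recursively, which is not shown.

```java
import java.util.List;

public class RowCompatibilityCheck {

    enum Result { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    // Hypothetical simplified field model (not Flink's LogicalType)
    record Field(String name, String type, boolean nullable) {}

    static Result check(List<Field> oldFields, List<Field> newFields) {
        if (oldFields.equals(newFields)) {
            return Result.COMPATIBLE_AS_IS; // identical schema, no migration needed
        }
        // Every old field must still exist with the same type and nullability:
        // removed fields and type changes are rejected
        for (Field oldField : oldFields) {
            Field match = find(newFields, oldField.name());
            if (match == null || !match.equals(oldField)) {
                return Result.INCOMPATIBLE;
            }
        }
        // Every added field must be nullable, because old records carry no value for it
        for (Field newField : newFields) {
            if (find(oldFields, newField.name()) == null && !newField.nullable()) {
                return Result.INCOMPATIBLE;
            }
        }
        // Only reorderings and nullable additions remain
        return Result.COMPATIBLE_AFTER_MIGRATION;
    }

    static Field find(List<Field> fields, String name) {
        for (Field f : fields) {
            if (f.name().equals(name)) {
                return f;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Field> v1 = List.of(new Field("userId", "INT", true), new Field("timestamp", "BIGINT", true));

        // Reordered plus a new nullable field
        System.out.println(check(v1, List.of(
            new Field("timestamp", "BIGINT", true), new Field("userId", "INT", true),
            new Field("location", "STRING", true)))); // prints COMPATIBLE_AFTER_MIGRATION
        // Removed field
        System.out.println(check(v1, List.of(new Field("userId", "INT", true)))); // prints INCOMPATIBLE
        // Changed field type
        System.out.println(check(v1, List.of(
            new Field("userId", "BIGINT", true), new Field("timestamp", "BIGINT", true)))); // prints INCOMPATIBLE
    }
}
```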

Test Plan

The implementation is tested through:

  • Unit tests covering various schema evolution scenarios:
    • Adding nullable fields at different positions
    • Reordering fields
    • Nested structure evolution
    • Combination of multiple compatible changes
  • Integration tests (in a new SchemaEvolutionWithStateITCase) verifying:
    • End-to-end behavior with savepoints
    • Proper rejection of incompatible changes

The tests ensure that:

  • Compatible schema changes succeed
  • Incompatible schema changes are rejected with clear errors
  • Migration works correctly with the RocksDB state backend

Rejected Alternatives

  • Create a custom state serializer instead of updating RowDataSerializer:
    • Considered creating a separate SchemaEvolutionRowDataSerializer that would handle schema evolution
    • Rejected for several reasons:
      • Would require users to explicitly use this serializer instead of getting automatic support
      • Would introduce divergent code paths and potential maintenance issues
      • Would not integrate well with Flink's existing serialization framework
    • The chosen approach of enhancing the existing serializers provides a more seamless experience and maintains backward compatibility