Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Serialization is involved in data exchange across JVMs, off-heap state access, and state persistence. If not done right, it can significantly degrade your job’s performance and break state compatibility when you upgrade the job.
Flink comes with its own type serialization framework, which handles most common types efficiently with built-in serializers and leaves the remaining types to a generic type serializer that is currently hardcoded to Kryo. We propose supporting custom generic type serializers to give users more flexibility in maintaining state compatibility when upgrading to Flink 2.x or in aiming for higher performance:

  1. State compatibility: Flink 2.0 upgraded Kryo from v2.24.0 to v5.6.2 with FLIP-317 and, as a result, dropped state compatibility with 1.x jobs that rely on Kryo v2.24.0.
  2. Performance: some newer serialization frameworks offer higher performance. For example, Apache Fory provided a 3x speedup over Kryo in our internal testing.

Public Interfaces

  1. Introduce a new job-level option to specify the generic type serializer, with the built-in Kryo v5.6.2 serializer as the default value:

    Key:          pipeline.generic-type-serializer
    Default:      org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
    Type:         String
    Description:  The fully qualified class name of the generic type serializer to be used.

  2. Promote the TypeSerializer and TypeSerializerSnapshot APIs to @Public. Both APIs have been annotated @PublicEvolving for years and should be promoted according to FLIP-321 for a stronger API stability guarantee.

  3. A custom generic type serializer must extend the abstract TypeSerializer class and have a public constructor with the following parameters:

    public class CustomGenericSerializer<T> extends TypeSerializer<T> {
        public CustomGenericSerializer(Class<T> type, SerializerConfig serializerConfig) {
            ...
        }
        ...
    }

Proposed Changes

  1. Make GenericTypeInfo create the serializer by loading the serializer class specified by the option pipeline.generic-type-serializer.

  2. We plan to cooperate with the Apache Fory community to provide a reference implementation, maintained under the Apache Fory project.

  3. For a Kryo v2.24.0 compatible implementation, users can copy the Kryo serializer from previous Flink releases. The community is also welcome to maintain one under the Flink-extensions project. Note that since Flink already bundles Kryo v5.6.2, users have to explicitly bundle Kryo v2.24.0 with their user jar and place the jar at the front of the classpath to override the bundled Kryo dependencies.
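
As an illustration of item 1 above, the following is a minimal, self-contained sketch of how a serializer class named by pipeline.generic-type-serializer could be loaded reflectively through the (Class<T>, SerializerConfig) constructor described under Public Interfaces. It is not the actual GenericTypeInfo change: SerializerConfigStub, TypeSerializerStub, DemoSerializer and the load helper are hypothetical stand-ins introduced only so the example runs without Flink on the classpath.

```java
import java.lang.reflect.Constructor;

public class GenericSerializerLoading {

    /** Hypothetical stand-in for Flink's SerializerConfig; not the real interface. */
    public static class SerializerConfigStub {}

    /** Hypothetical stand-in for Flink's abstract TypeSerializer<T>. */
    public abstract static class TypeSerializerStub<T> {
        public abstract Class<T> getType();
    }

    /** Example custom serializer exposing the required two-argument public constructor. */
    public static class DemoSerializer<T> extends TypeSerializerStub<T> {
        private final Class<T> type;

        public DemoSerializer(Class<T> type, SerializerConfigStub config) {
            this.type = type;
        }

        @Override
        public Class<T> getType() {
            return type;
        }
    }

    /**
     * Loads the configured class by name, checks that it extends the serializer
     * base class, and invokes its public (Class, config) constructor.
     */
    @SuppressWarnings("unchecked")
    public static <T> TypeSerializerStub<T> load(
            String className, Class<T> type, SerializerConfigStub config, ClassLoader classLoader) {
        try {
            Class<?> raw = Class.forName(className, true, classLoader);
            if (!TypeSerializerStub.class.isAssignableFrom(raw)) {
                throw new IllegalArgumentException(
                        className + " does not extend TypeSerializerStub");
            }
            Constructor<?> ctor = raw.getConstructor(Class.class, SerializerConfigStub.class);
            return (TypeSerializerStub<T>) ctor.newInstance(type, config);
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, and constructor failures.
            throw new IllegalStateException(
                    "Could not instantiate generic type serializer " + className, e);
        }
    }

    public static void main(String[] args) {
        // In Flink, this value would come from pipeline.generic-type-serializer.
        String configured = DemoSerializer.class.getName();
        TypeSerializerStub<String> serializer = load(
                configured,
                String.class,
                new SerializerConfigStub(),
                GenericSerializerLoading.class.getClassLoader());
        System.out.println(
                serializer.getClass().getSimpleName() + " for " + serializer.getType().getSimpleName());
    }
}
```

Failing fast with a descriptive exception when the class is missing, has the wrong supertype, or lacks the expected constructor is one possible design choice; it would surface a misconfigured option when the serializer is created rather than when the first record is serialized.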

Compatibility, Deprecation, and Migration Plan

No compatibility issues are involved. The new feature will be documented.

Test Plan

The changes will be covered by unit tests.

Rejected Alternatives

N/A