Discussion thread | https://lists.apache.org/thread/m67s4qfrh660lktpq7yqf9docvvf5o9l |
---|---|
Vote thread | https://lists.apache.org/thread/2xmcxs67xxzwool554fglrnklyvw348h |
JIRA | |
Release | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink supports different serializers, such as those for Java basic types, Pojo, and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig. They can also configure Pojo and Kryo serialized data classes through three configuration options: pipeline.default-kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types. Flink looks up and creates a serializer for each data type based on the data type and the registered serializers. This approach has the following issues:
1) Users register data types and serializers in code, so they have to modify the code when upgrading the job version; this cannot be done through configuration alone. As mentioned in [1], we would like to unify job-related settings in the configuration file.
2) Serializer configuration is currently less capable than the API. The three options can configure data types and serializers, but custom serializers are not supported. In addition, there is no clear priority among the three options, so configuring the same data type in more than one of them causes conflicts.
3) Kryo serializer compatibility issues. Users can create custom data types in Flink jobs, and Flink chooses the corresponding serializer for each of them. If no serializer is found, the Kryo serializer is used. Using the Kryo serializer has the following issues:
a) Different Kryo versions have compatibility issues with each other, which can make old and new versions of a job incompatible when Flink upgrades the JVM or the Kryo version.
b) Schema evolution of state: when a job uses the Kryo serializer and the state schema changes, for example by adding or removing columns, the new job cannot recover from the original state.
Flink already has options to turn the Kryo serializer on or off and to configure Kryo and Pojo serializers. Building on them, this FLIP would like to improve serialization configuration and usage in Flink as follows.
1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory to create customized serializers for user-defined data types, see [2]. This FLIP proposes to configure customized serializers in the configuration, so that users can update them for jobs without modifying any code.
2) Provide uniform configuration for Kryo, Pojo and customized serializers. Flink currently configures Kryo and Pojo serializers in different options. Combining them with customized serializers in a single option will simplify usage and resolve the priority issues that arise when the same data type is configured in different options.
3) Add more built-in serializers to Flink, such as serializers for List, Map and Set. Flink has many serializers for basic data types, but for composite data types such as List<String> or Map<String, Integer>, Flink can only use Kryo or user-defined serializers. Flink should add more built-in serializers for these types to improve performance.
Public Interface
Deprecated Methods
The following methods in ExecutionConfig will be deprecated and removed in flink-2.0.
Method | Annotation |
---|---|
org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type) | @Public |
org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type) | |
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) | |
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer) | |
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) | |
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, T serializer) | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes() | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes() | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses() | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers() | |
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses() | |
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers() | |
org.apache.flink.api.common.ExecutionConfig#enableForceAvro() | |
org.apache.flink.api.common.ExecutionConfig#enableForceKryo() | |
org.apache.flink.api.common.ExecutionConfig#enableGenericTypes() | |
org.apache.flink.api.common.ExecutionConfig#disableForceAvro() | |
org.apache.flink.api.common.ExecutionConfig#disableForceKryo() | |
org.apache.flink.api.common.ExecutionConfig#disableGenericTypes() | |
org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled() | |
org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled() | |
org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled() | |
The following methods in StreamExecutionEnvironment will be deprecated and removed in flink-2.0.
Method | Annotation |
---|---|
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type) | @Public |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) | |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer) | |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) | |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer) | |
The following method in TypeInformation and its subclasses will be deprecated and removed in the next release.
Method | Annotation |
---|---|
org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config) | @PublicEvolving |
After enableForceAvro, enableForceKryo and enableGenericTypes are deprecated and removed, users can set the options pipeline.force-avro, pipeline.force-kryo and pipeline.generic-types in the job configuration instead, as follows:
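```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationOptionsExample {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.set(PipelineOptions.GENERIC_TYPES, false); // replaces disableGenericTypes()
        configuration.set(PipelineOptions.FORCE_KRYO, true); // replaces enableForceKryo()
        configuration.set(PipelineOptions.FORCE_AVRO, true); // replaces enableForceAvro()
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Build the Flink job with env.
    }
}
```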
Serialization Options
The options pipeline.default-kryo-serializers, pipeline.registered-kryo-types and pipeline.registered-pojo-types currently configure Kryo and Pojo serializers in Flink; they will be deprecated and removed in flink-2.0. This FLIP introduces a new option, pipeline.serialization-config, for all Kryo, Pojo and customized serializers.
```java
package org.apache.flink.configuration;

import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.util.List;

@PublicEvolving
public class PipelineOptions {

    // Other existing options are omitted.

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
            key("pipeline.default-kryo-serializers")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                    + " as Kryo default serializers")
                                    .linebreak()
                                    .linebreak()
                                    .text("Example:")
                                    .linebreak()
                                    .add(
                                            TextElement.code(
                                                    "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                            + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                    .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
            key("pipeline.registered-kryo-types")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                    + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                    + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                    + " sure that only tags are written.")
                                    .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
            key("pipeline.registered-pojo-types")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                    + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                    + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                    + " sure that only tags are written.")
                                    .build());

    /** New option that replaces the three deprecated options above. */
    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
            key("pipeline.serialization-config")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "List of pairs of class names and serializer config to be used. There is a `type` field in"
                                                    + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo' and each `type` has its own"
                                                    + " configuration.")
                                    .linebreak()
                                    .linebreak()
                                    .text("Example:")
                                    .linebreak()
                                    .add(
                                            TextElement.code(
                                                    "[org.example.ExampleClass1: {type: pojo},"
                                                            + " org.example.ExampleClass2: {type: kryo},"
                                                            + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer},"
                                                            + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer},"
                                                            + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]"))
                                    .build());
}
```
The value of pipeline.serialization-config is a list of key-value pairs: each key is a data class name, and each value is the serializer configuration for that class. The serializer configuration has the following fields.
1) type: the serializer type, which is one of pojo, kryo or typeinfo. If type is pojo, or kryo without kryo-type, the data type uses the Pojo or Kryo serializer directly.
2) kryo-type: the kind of Kryo registration, which is either default or registered. When kryo-type is default, Kryo uses the configured serializer as the default serializer for the data type; when kryo-type is registered, the data type and its serializer are registered with Kryo. When kryo-type is present, the class field must also be set to the specific serializer class name.
3) class: the serializer class name for kryo or typeinfo. For kryo, it should be a subclass of com.esotericsoftware.kryo.Serializer; for typeinfo, it should be a subclass of org.apache.flink.api.common.typeinfo.TypeInfoFactory.
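The rules for type, kryo-type and class can be checked mechanically once an entry has been parsed into a field map. The Java sketch below does that for a single pipeline.serialization-config entry. SerializerEntryValidator is a hypothetical name, not a Flink class, and two of its rules are assumptions rather than statements of this FLIP: kryo-type is rejected on entries whose type is not kryo, and typeinfo entries must carry a class.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical validator for one pipeline.serialization-config entry; not part of Flink.
final class SerializerEntryValidator {
    private static final Set<String> TYPES = Set.of("pojo", "kryo", "typeinfo");
    private static final Set<String> KRYO_TYPES = Set.of("default", "registered");

    private SerializerEntryValidator() {}

    /** Throws IllegalArgumentException if the fields break the rules for type, kryo-type and class. */
    static void validate(String dataClass, Map<String, String> fields) {
        String type = fields.get("type");
        String kryoType = fields.get("kryo-type");
        String serializerClass = fields.get("class");

        // Rule 1: type is mandatory and must be pojo, kryo or typeinfo.
        if (type == null || !TYPES.contains(type)) {
            throw new IllegalArgumentException(dataClass + ": type must be pojo, kryo or typeinfo");
        }
        // Rule 2: kryo-type must be default or registered and needs a class.
        if (kryoType != null) {
            // Assumption: kryo-type is only meaningful for kryo entries.
            if (!type.equals("kryo")) {
                throw new IllegalArgumentException(dataClass + ": kryo-type requires type kryo");
            }
            if (!KRYO_TYPES.contains(kryoType)) {
                throw new IllegalArgumentException(dataClass + ": kryo-type must be default or registered");
            }
            if (serializerClass == null) {
                throw new IllegalArgumentException(dataClass + ": kryo-type requires class");
            }
        }
        // Assumption: a typeinfo entry always names its TypeInfoFactory class.
        if (type.equals("typeinfo") && serializerClass == null) {
            throw new IllegalArgumentException(dataClass + ": type typeinfo requires class");
        }
    }

    public static void main(String[] args) {
        validate("org.example.ExampleClass1", Map.of("type", "pojo"));
        validate("org.example.ExampleClass4",
                Map.of("type", "kryo", "kryo-type", "registered", "class", "org.example.Class4KryoSerializer"));
        try {
            validate("org.example.ExampleClass3", Map.of("type", "kryo", "kryo-type", "default"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // kryo-type without class is rejected
        }
    }
}
```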
Currently, when Flink initializes the Kryo serializer, it automatically detects whether flink-avro is in the job classpath: if it is not, a dummy Avro serializer is created and registered with Kryo; otherwise, the Avro serializer is loaded from flink-avro and registered with Kryo. We introduce a new option, pipeline.force-kryo-avro, with a default value of false. Flink will detect the flink-avro module and register the Avro serializer with Kryo only when the option is set to true.
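```java
package org.apache.flink.configuration;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.description.Description;

@PublicEvolving
public class PipelineOptions {

    // Other existing options are omitted.

    public static final ConfigOption<Boolean> FORCE_KRYO_AVRO =
            key("pipeline.force-kryo-avro")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            Description.builder()
                                    .text("Force register avro classes in kryo serializer.")
                                    .linebreak()
                                    .linebreak()
                                    .text(
                                            "Important: Make sure to include the %s module.",
                                            code("flink-avro"))
                                    .build());
}
```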
Introduce SerializerConfig
This FLIP introduces SerializerConfig to decouple serializers in Flink from ExecutionConfig. SerializerConfig can be created from a Configuration and provides the methods that serializers need. TypeExtractor and TypeInformation will use SerializerConfig instead of ExecutionConfig.
```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.configuration.Configuration;

import com.esotericsoftware.kryo.Serializer;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;

/**
 * A config that defines the behavior of serializers in a Flink job; it manages the registered
 * types and serializers. The config is created from the job configuration and used by Flink to
 * create serializers for data types.
 */
@PublicEvolving
public final class SerializerConfig implements Serializable {

    // Interface sketch: only the public signatures are shown, method bodies are omitted.

    /** Creates a serializer config instance from the configuration. */
    public SerializerConfig(Configuration configuration) { ... }

    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getRegisteredTypesWithKryoSerializerClasses();

    /** Returns the registered default Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses();

    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getRegisteredKryoTypes();

    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getRegisteredPojoTypes();

    /** Returns the registered type info factories. */
    public Map<Class<?>, Class<? extends TypeInfoFactory<?>>> getRegisteredTypeInfoFactories();

    /**
     * Checks whether generic types are disabled. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default; they are controlled by {@code pipeline.generic-types}.
     */
    public boolean hasGenericTypesDisabled();

    /** Returns whether Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();

    /** Returns whether Kryo is the serializer for POJOs. */
    public boolean isForceKryoEnabled();
}
```
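To illustrate how the new option could feed SerializerConfig, the sketch below sorts parsed pipeline.serialization-config entries into the five collections behind its getters, following the mapping in the migration section (pojo to registered POJO types, kryo to registered Kryo types, kryo-type registered or default to the two Kryo serializer maps, typeinfo to type info factories). It is a hypothetical model: SerializerRegistries is a made-up name, and it keeps class names as strings, whereas SerializerConfig returns Class<?> objects.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;

// Hypothetical model of the registries behind SerializerConfig's getters; class names stay strings.
final class SerializerRegistries {
    final LinkedHashSet<String> registeredPojoTypes = new LinkedHashSet<>();
    final LinkedHashSet<String> registeredKryoTypes = new LinkedHashSet<>();
    final LinkedHashMap<String, String> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    final LinkedHashMap<String, String> defaultKryoSerializerClasses = new LinkedHashMap<>();
    final LinkedHashMap<String, String> registeredTypeInfoFactories = new LinkedHashMap<>();

    /** Keys are data class names; values hold type, kryo-type and class (assumed already validated). */
    static SerializerRegistries from(Map<String, Map<String, String>> entries) {
        SerializerRegistries r = new SerializerRegistries();
        for (Map.Entry<String, Map<String, String>> entry : entries.entrySet()) {
            String dataClass = entry.getKey();
            Map<String, String> fields = entry.getValue();
            String type = fields.get("type");
            String kryoType = fields.get("kryo-type");
            String serializerClass = fields.get("class");
            if ("pojo".equals(type)) {
                r.registeredPojoTypes.add(dataClass); // replaces registerPojoType
            } else if ("typeinfo".equals(type)) {
                r.registeredTypeInfoFactories.put(dataClass, serializerClass);
            } else if ("kryo".equals(type) && kryoType == null) {
                r.registeredKryoTypes.add(dataClass); // replaces registerKryoType
            } else if ("kryo".equals(type) && "registered".equals(kryoType)) {
                r.registeredTypesWithKryoSerializerClasses.put(dataClass, serializerClass);
            } else if ("kryo".equals(type) && "default".equals(kryoType)) {
                r.defaultKryoSerializerClasses.put(dataClass, serializerClass);
            } else {
                throw new IllegalArgumentException("Unsupported serializer config for " + dataClass + ": " + fields);
            }
        }
        return r;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> entries = new LinkedHashMap<>();
        entries.put("org.example.ExampleClass1", Map.of("type", "pojo"));
        entries.put("org.example.ExampleClass3",
                Map.of("type", "kryo", "kryo-type", "default", "class", "org.example.Class3KryoSerializer"));
        SerializerRegistries registries = from(entries);
        System.out.println(registries.registeredPojoTypes); // [org.example.ExampleClass1]
        System.out.println(registries.defaultKryoSerializerClasses); // {org.example.ExampleClass3=org.example.Class3KryoSerializer}
    }
}
```

Using insertion-ordered collections mirrors the LinkedHashMap and LinkedHashSet return types of the SerializerConfig getters, so registration order follows the order of entries in the option.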
The corresponding new method in TypeInformation:
Method | Annotation |
---|---|
org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(SerializerConfig config) | @PublicEvolving |
Disable Kryo By Default
Currently, the option pipeline.generic-types enables or disables the Kryo serializer in Flink. Its default value is true, which means Flink uses Kryo as the fallback serializer. As a result, Kryo serializes some user-defined data by default without users being aware of it, which can make job state incompatible between old and new job versions during upgrades. To avoid this issue, the default value of pipeline.generic-types will be changed to false in Flink-2.0, preventing the Kryo serializer from being used without the user's knowledge.
Proposed Changes
Create Data Serializer
When users create a Flink job, TypeExtractor chooses an appropriate serializer for each data type and creates its TypeInformation. With the data types and serializer configuration described above, Flink uses the following priority when it creates a serializer and TypeInformation for a data type:
1) Data types and serializers in the configuration have the highest priority.
2) For data types that are not in the configuration, Flink checks, in turn, the @TypeInfo annotation, basic data types, Tuple, composite data types, Pojo and generic data types, and creates the serializer accordingly.
Besides the current built-in serializers, this FLIP will add serializers for composite data types such as List, Map and Set. Flink already has TypeInformation for these data types, such as ListTypeInfo, MapTypeInfo and MultisetTypeInfo; however, TypeExtractor cannot recognize these composite data types and therefore does not use these serializers. We would like to support them in this FLIP.
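The priority order above can be modeled as a simple chain. The sketch below is a hypothetical simplification, not TypeExtractor's code: serializers are represented by name, each built-in check (@TypeInfo, basic, Tuple, composite, Pojo) is a function that may or may not recognize a type, and the final Kryo fallback is guarded by pipeline.generic-types, whose default this FLIP proposes to change to false in Flink-2.0.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of the serializer lookup order; not TypeExtractor's real code.
final class SerializerResolution {

    private SerializerResolution() {}

    /**
     * @param configured data class -> serializer name taken from pipeline.serialization-config
     * @param builtInSteps checks for @TypeInfo, basic, Tuple, composite and Pojo types, in that order
     * @param genericTypesEnabled value of pipeline.generic-types
     */
    static String resolve(
            Class<?> type,
            Map<Class<?>, String> configured,
            List<Function<Class<?>, Optional<String>>> builtInSteps,
            boolean genericTypesEnabled) {
        // 1) Configured entries have the highest priority.
        String fromConfig = configured.get(type);
        if (fromConfig != null) {
            return fromConfig;
        }
        // 2) The first built-in step that recognizes the type wins.
        for (Function<Class<?>, Optional<String>> step : builtInSteps) {
            Optional<String> serializer = step.apply(type);
            if (serializer.isPresent()) {
                return serializer.get();
            }
        }
        // 3) Generic fallback to Kryo, only if pipeline.generic-types allows it.
        if (!genericTypesEnabled) {
            throw new UnsupportedOperationException(
                    type.getName() + " would be serialized with Kryo, but generic types are disabled");
        }
        return "KryoSerializer";
    }

    public static void main(String[] args) {
        List<Function<Class<?>, Optional<String>>> builtIn =
                List.of(t -> t == String.class ? Optional.of("StringSerializer") : Optional.empty());
        System.out.println(resolve(String.class, Map.of(), builtIn, false)); // StringSerializer
    }
}
```

Throwing instead of silently returning Kryo matches the behavior described in the compatibility section: with generic types disabled, a type that would otherwise fall back to Kryo surfaces as an error the user must resolve.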
Configure Avro Serializer
Flink uses AvroUtils to manage Avro serializers. When flink-avro is in the job classpath, it creates the org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils utility class; otherwise, it uses the built-in org.apache.flink.api.java.typeutils.AvroUtils$DefaultAvroUtils. AvroUtils has the following methods, which are used in different places.
1) void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type)
Serializers calls this method to register Avro data types with the Kryo serializer through ExecutionConfig#registerTypeWithKryoSerializer and ExecutionConfig#addDefaultKryoSerializer. The top-level entry point for this method is ExecutionEnvironment, which the DataSet API uses to create jobs. Since the DataSet API is deprecated and will be removed in the future, this method in AvroUtils can be marked as @Deprecated and removed in flink-2.0.
2) void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations)
KryoSerializer registers a serializer for GenericData.Array in the Kryo registrations as follows:
a) If flink-avro exists in the job classpath, Flink automatically creates AvroKryoSerializerUtils and registers GenericData.Array and its serializer with the Kryo serializer.
b) If flink-avro is not in the job classpath, a DefaultAvroUtils is created and a dummy serializer is registered. As described above, we will add a pipeline.force-kryo-avro option with a default value of false. Only when the option is set to true will Flink look up the flink-avro module and register the Avro serializer in KryoSerializer.
3) TypeSerializer<T> createAvroSerializer(Class<T> type)
The default value of the Flink job option pipeline.force-avro is false. When users set it to true, they need to add flink-avro to the job classpath. Pojo types are then serialized with Avro, and PojoTypeInfo creates an AvroSerializer instance for the data type. This part of the processing can remain unchanged.
4) TypeInformation<T> createAvroTypeInfo(Class<T> type)
When a Flink job creates a serializer for a data type that is a subclass of org.apache.avro.specific.SpecificRecordBase, flink-avro must be in the job classpath. This part of the processing can remain unchanged.
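The effect of pipeline.force-kryo-avro on the GenericData.Array registration described in 2) can be sketched as a small decision function. KryoAvroRegistration and its Outcome values are hypothetical names, not Flink code. This FLIP does not specify what happens when the option is true but flink-avro is missing; the sketch assumes today's dummy serializer is kept in that case.

```java
// Hypothetical model of the pipeline.force-kryo-avro decision; not Flink's KryoSerializer code.
final class KryoAvroRegistration {

    /** What would be registered for GenericData.Array in this sketch. */
    enum Outcome { SKIPPED, AVRO_SERIALIZER, DUMMY_SERIALIZER }

    // Utility class shipped by flink-avro, as named in the text above.
    static final String AVRO_UTILS_CLASS =
            "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";

    private KryoAvroRegistration() {}

    /** Checks whether a class is visible to the loader without initializing it. */
    static boolean onClasspath(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    static Outcome decide(boolean forceKryoAvro, boolean flinkAvroOnClasspath) {
        if (!forceKryoAvro) {
            // Default (false): skip the flink-avro lookup and the Avro registration.
            return Outcome.SKIPPED;
        }
        // Assumption: option on but flink-avro missing keeps today's dummy serializer.
        return flinkAvroOnClasspath ? Outcome.AVRO_SERIALIZER : Outcome.DUMMY_SERIALIZER;
    }

    public static void main(String[] args) {
        ClassLoader loader = KryoAvroRegistration.class.getClassLoader();
        boolean forceKryoAvro = false; // value of pipeline.force-kryo-avro
        // The classpath lookup only happens when the option is true.
        boolean avroPresent = forceKryoAvro && onClasspath(AVRO_UTILS_CLASS, loader);
        System.out.println(decide(forceKryoAvro, avroPresent)); // SKIPPED
    }
}
```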
Compatibility, Deprecation, and Migration Plan
1) Usage Of Fallback Kryo Serializer
Since the Kryo serializer has issues with schema evolution and version upgrades, the Kryo serializer will be disabled by default in Flink-2.0. Unless users turn it on explicitly, a Flink job that needs Kryo will throw an exception reminding users to set pipeline.generic-types to true, or to develop a custom serializer and add it to the configuration.
2) Existing Serializer Registration and Configuration
The serialization-related methods in StreamExecutionEnvironment and ExecutionConfig will be deprecated and removed in flink-2.0. The newly added SerializerConfig is compatible with the current ExecutionConfig interface. Existing Flink jobs that register data types and serializers can migrate as follows to stay compatible with flink-2.0.
a) StreamExecutionEnvironment#registerType(Class<?> type), ExecutionConfig#registerKryoType(Class<?> type) and ExecutionConfig#registerPojoType(Class<?> type)
Configure the specified data types to use the Pojo and Kryo serializers as follows:
pipeline.serialization-config: org.example.PojoDataClass: {type: pojo}, org.example.KryoDataClass: {type: kryo}
b) StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) and ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
Configure the specified data types to use the Kryo serializer class as follows:
pipeline.serialization-config: org.example.KryoDataClass: {type: kryo, kryo-type: registered, class: org.example.KryoSerializerClass}
c) ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
Configure the specified data types to use default Kryo serializer classes as follows:
pipeline.serialization-config: org.example.KryoDataClass: {type: kryo, kryo-type: default, class: org.example.KryoSerializerClass}
d) Data types annotated with @TypeInfo to use a TypeInfoFactory can be configured as follows:
pipeline.serialization-config: org.example.TypeDataClass: {type: typeinfo, class: org.example.TypeDataInfoFactory}
e) StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer) and ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)
These methods will not be supported in flink-2.0. Instead, users can implement the serializer as a class and configure that Kryo Serializer class as described above.
f) Data types and serializer classes configured with pipeline.default-kryo-serializers, pipeline.registered-kryo-types and pipeline.registered-pojo-types need to be reconfigured in the new format.
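Migrating the legacy options is mostly a mechanical rewrite. The sketch below, a hypothetical helper that Flink does not provide, turns the values of pipeline.registered-pojo-types, pipeline.registered-kryo-types and pipeline.default-kryo-serializers (in the class:...,serializer:... pair format from the option description) into pipeline.serialization-config entries in the format shown above. Rejecting a class that appears in more than one legacy option is our assumption, chosen because the new option holds exactly one serializer configuration per class.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical migration helper; Flink does not ship this class.
final class LegacySerializationOptionsMigrator {

    private LegacySerializationOptionsMigrator() {}

    /** Each argument is the already-split list value of the matching legacy option. */
    static List<String> migrate(
            List<String> pojoTypes, List<String> kryoTypes, List<String> defaultKryoSerializers) {
        List<String> entries = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String dataClass : pojoTypes) {
            add(entries, seen, dataClass.trim(), "{type: pojo}");
        }
        for (String dataClass : kryoTypes) {
            add(entries, seen, dataClass.trim(), "{type: kryo}");
        }
        for (String pair : defaultKryoSerializers) {
            // Legacy pair format: class:org.example.A,serializer:org.example.ASerializer
            String dataClass = null;
            String serializer = null;
            for (String part : pair.split(",")) {
                String p = part.trim();
                if (p.startsWith("class:")) {
                    dataClass = p.substring("class:".length());
                } else if (p.startsWith("serializer:")) {
                    serializer = p.substring("serializer:".length());
                }
            }
            if (dataClass == null || serializer == null) {
                throw new IllegalArgumentException("Malformed default Kryo serializer entry: " + pair);
            }
            add(entries, seen, dataClass, "{type: kryo, kryo-type: default, class: " + serializer + "}");
        }
        return entries;
    }

    // Assumption: one entry per class, so classes listed in several legacy options need a manual choice.
    private static void add(List<String> entries, Set<String> seen, String dataClass, String config) {
        if (!seen.add(dataClass)) {
            throw new IllegalArgumentException(dataClass + " is configured in more than one legacy option");
        }
        entries.add(dataClass + ": " + config);
    }

    public static void main(String[] args) {
        List<String> entries = migrate(
                List.of("org.example.PojoDataClass"),
                List.of("org.example.KryoDataClass"),
                List.of("class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1"));
        entries.forEach(System.out::println);
    }
}
```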
Test Plan
UT & E2E
Rejected Alternatives
NONE
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory