Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/cv4x3372g5txw1j20r5l1vwlqrvjoqq5

JIRA: FLINK-33712

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Flink RuntimeContext connects user functions with the underlying runtime. It provides essential runtime information, such as taskName, subTaskIndex, and attemptNumber, which user functions can access during job execution.

However, the current Flink RuntimeContext exposes the ExecutionConfig to users, which leads to two issues.

First, the ExecutionConfig contains many settings that are unrelated to user functions, such as the task cancellation interval. Exposing this unnecessary information to user functions can confuse users and makes the interface harder to manage.

Second, exposing the ExecutionConfig to user functions lets users access and modify it directly during job execution. This can lead to inconsistencies, particularly when operator chaining is enabled. For example, if operators are chained together and a user modifies the ExecutionConfig retrieved from one function, the change also affects the other functions within the same task.
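The chaining problem can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: MutableConfig and UserFunction are hypothetical stand-ins for a task-wide config object and a user function, chosen only to show how a mutation made through one function becomes visible to another function that shares the same instance.

```java
public class SharedConfigDemo {

    /** Hypothetical stand-in for a mutable, task-wide config (not Flink's ExecutionConfig). */
    static final class MutableConfig {
        private boolean objectReuse = false;

        boolean isObjectReuseEnabled() {
            return objectReuse;
        }

        void enableObjectReuse() {
            objectReuse = true;
        }
    }

    /** Hypothetical stand-in for a user function that is handed the shared config. */
    interface UserFunction {
        void open(MutableConfig config);
    }

    /**
     * Opens two "chained" functions with the same config instance and
     * returns the object-reuse flag as observed by the second function.
     */
    static boolean secondFunctionSeesObjectReuse() {
        MutableConfig shared = new MutableConfig();
        boolean[] observed = new boolean[1];

        // The first function mutates the shared config.
        UserFunction first = config -> config.enableObjectReuse();
        // The second function only reads it, but sees the first function's change.
        UserFunction second = config -> observed[0] = config.isObjectReuseEnabled();

        first.open(shared);
        second.open(shared);
        return observed[0];
    }

    public static void main(String[] args) {
        System.out.println("second function sees object reuse: "
                + secondFunctionSeesObjectReuse());
    }
}
```

The second function never asked for object reuse, yet it observes the flag as enabled. Narrow, read-only getters avoid handing out the shared mutable object in the first place.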

To address these problems, we propose deprecating RuntimeContext#getExecutionConfig in the Flink RuntimeContext. In the upcoming Flink 2.0 release, we plan to remove the RuntimeContext#getExecutionConfig method completely. We will also introduce alternative getter methods that give users access to specific information without exposing unnecessary runtime details.

Public Interfaces

Adding @Deprecated to RuntimeContext#getExecutionConfig()

Adding the following three getter methods with @PublicEvolving in RuntimeContext:

    /**
     * Create a serializer for a given type.
     *
     * @param typeInformation the type information of the object to be serialized
     * @return the serializer for the given type
     */
    @PublicEvolving
    <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation);

    /**
     * Get global job parameters.
     *
     * @return the global job parameters
     */
    @PublicEvolving
    Map<String, String> getGlobalJobParameters();

    /**
     * Check whether object reuse is enabled.
     *
     * @return true if object reuse is enabled, false otherwise
     */
    @PublicEvolving
    boolean isObjectReuseEnabled();

Proposed Changes

We propose deprecating the RuntimeContext#getExecutionConfig method and migrating all its usages to the following new getter methods:

  1. Migrate the usage of TypeInformation#createSerializer(RuntimeContext#getExecutionConfig) to RuntimeContext#createSerializer(TypeInformation).

  2. Migrate the usage of RuntimeContext#getExecutionConfig.getGlobalJobParameters to RuntimeContext#getGlobalJobParameters.

  3. Migrate the usage of RuntimeContext#getExecutionConfig.isObjectReuseEnabled to RuntimeContext#isObjectReuseEnabled.
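As a rough illustration of migrations 2 and 3, the sketch below contrasts the old and new call sites. It is self-contained and deliberately does not depend on Flink: StandInExecutionConfig and StandInRuntimeContext are hypothetical stand-ins, and only the method names getExecutionConfig, getGlobalJobParameters, and isObjectReuseEnabled are taken from this proposal. Returning an unmodifiable map is an assumption of the sketch, not a statement about the actual implementation. Migration 1 is omitted because it would require stand-ins for TypeInformation and TypeSerializer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MigrationSketch {

    /** Hypothetical stand-in for the config reachable through the deprecated getter. */
    static final class StandInExecutionConfig {
        private final Map<String, String> globalJobParameters;
        private final boolean objectReuse;

        StandInExecutionConfig(Map<String, String> params, boolean objectReuse) {
            this.globalJobParameters = new HashMap<>(params);
            this.objectReuse = objectReuse;
        }

        Map<String, String> getGlobalJobParameters() {
            return globalJobParameters;
        }

        boolean isObjectReuseEnabled() {
            return objectReuse;
        }
    }

    /** Hypothetical stand-in for RuntimeContext with both the old and the new access paths. */
    static final class StandInRuntimeContext {
        private final StandInExecutionConfig config;

        StandInRuntimeContext(StandInExecutionConfig config) {
            this.config = config;
        }

        /** Old path: hands out the whole config object. */
        @Deprecated
        StandInExecutionConfig getExecutionConfig() {
            return config;
        }

        /** New path: read-only view of the job parameters (read-only is an assumption here). */
        Map<String, String> getGlobalJobParameters() {
            return Collections.unmodifiableMap(config.getGlobalJobParameters());
        }

        /** New path: exposes a single flag instead of the whole config. */
        boolean isObjectReuseEnabled() {
            return config.isObjectReuseEnabled();
        }
    }

    /** Builds a small sample context for the demo. */
    static StandInRuntimeContext sampleContext() {
        Map<String, String> params = new HashMap<>();
        params.put("input.path", "/tmp/in");
        return new StandInRuntimeContext(new StandInExecutionConfig(params, true));
    }

    /** Call site before migration: goes through the full config object. */
    static String readBefore(StandInRuntimeContext ctx, String key) {
        return ctx.getExecutionConfig().getGlobalJobParameters().get(key);
    }

    /** Call site after migration: uses the dedicated getter. */
    static String readAfter(StandInRuntimeContext ctx, String key) {
        return ctx.getGlobalJobParameters().get(key);
    }

    public static void main(String[] args) {
        StandInRuntimeContext ctx = sampleContext();
        System.out.println(readBefore(ctx, "input.path"));
        System.out.println(readAfter(ctx, "input.path"));
        System.out.println(ctx.isObjectReuseEnabled());
    }
}
```

Both call sites return the same value; the migrated one simply no longer touches the config object.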

Compatibility, Deprecation, and Migration Plan

The RuntimeContext#getExecutionConfig method will be deprecated in Flink 1.19 and removed in Flink 2.0.

Test Plan

N.A.

Rejected Alternatives

Do not add createKryoSerializer and getAutoWatermarkInterval to RuntimeContext for the Kafka and Kinesis connectors:

The versions of the Kafka and Kinesis connectors that use the legacy Source call RuntimeContext#getExecutionConfig to create a KryoSerializer and to retrieve the interval for automatic watermark emission. However, Flink 2.0 requires all connectors to migrate to the new Source API (FLIP-27). As a result, there is no need to add the methods createKryoSerializer and getAutoWatermarkInterval to the RuntimeContext.
