
Motivation

Flink is a distributed processing engine. If some nodes are overloaded, subtask processing on those nodes may slow down, which in turn leads to backpressure and lag.

There are various data shuffle strategies in Flink; common ones include Forward, Rebalance, Rescale, Shuffle, Hash (KeyBy), customPartition, etc.


Partitioner      | Data bound to subtasks?
Rebalance        | no
Rescale          | no (within each downstream group)
Shuffle          | no
customPartition  | potentially (depends on the concrete implementation)
Forward          | yes
Hash             | yes
Global           | yes
etc.             | ...


For the [data (potentially) not bound to subtasks] scenarios (the four partition strategies Rebalance, Rescale, Shuffle, and customPartition), dynamically adjusting which subtask receives the data according to the processing load of the downstream operators achieves a peak-shaving and valley-filling effect and helps sustain the throughput of Flink jobs.

Public Interfaces

Introduce two options at configuration level:

taskmanager.network.adaptive-partitioner.enabled

  • default value: false
  • type: boolean
  • description: Whether to use the adaptive partition strategy


taskmanager.network.adaptive-partitioner.max-traverse-size

  • default value: 4
  • type: int
  • description: How many channels to traverse at most when looking for an idle channel.
  • Why use this parameter?
    • Traversing all sub-partition results on every partitioning decision is costly; bounding the traverse size gives a better trade-off between balancing quality and partitioning overhead.
    • The effects are shown in the table below [FLINK-31655].

In this way, whether it is an SQL job or a JAR job, as long as this feature is enabled together with the two options above, the adaptive partition capability is applied to the target partitioners.
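For example, enabling the feature cluster-wide would amount to a flink-conf.yaml fragment like the following (using the option names proposed above):

```yaml
taskmanager.network.adaptive-partitioner.enabled: true
taskmanager.network.adaptive-partitioner.max-traverse-size: 4
```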

Proposed Changes

Introduce the AdaptiveLoadBasedRecordWriter

Introduce the AdaptiveLoadBasedRecordWriter to partition adaptively based on the load of sub-partitions, for the adaptive RescalePartitioner & RebalancePartitioner.

AdaptiveLoadBasedRecordWriter
public final class AdaptiveLoadBasedRecordWriter<T extends IOReadableWritable>
        extends RecordWriter<T> {

    // Core attributes
    // Max number of channels to traverse when selecting an idle channel.
    private final int maxTraverseSize;
    // Stores the current channel index.
    private int currentChannel;
    // The number of available sub-partitions to send records to.
    private final int numberOfSubpartitions;

    // Placeholders for other auxiliary methods or attributes.

    void traverseToTheIdlestChannel() {
        // Main logic overview:
        // 1. Compute the number of bytes queued per sub-partition result for the
        //    next #maxTraverseSize channels. All partitions can be viewed as an
        //    abstract circular queue, where each calculation covers a window of
        //    #maxTraverseSize items traversed forward from the current channel.
        // 2. Take the channel whose number of queued bytes is the smallest among
        //    those #maxTraverseSize channels as the target channel.
        // 3. Assign currentChannel to the target channel (from step 2).
    }

    @Override
    public void emit(T record) throws IOException {
        // Placeholders for checks and preparation.
        // ...

        // Main calling logic.
        traverseToTheIdlestChannel();

        // Placeholders for sending the record.
        // ...
    }
}
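The traversal logic sketched above can be illustrated with a self-contained toy implementation. The class and method names below are illustrative only, not actual Flink APIs: instead of pure round-robin, the selector picks the channel with the fewest pending bytes among the next maxTraverseSize candidates.

```java
// Hypothetical sketch of load-based channel selection (illustrative names,
// not actual Flink code).
public final class LoadBasedSelector {
    private final long[] pendingBytes;   // pending bytes queued per channel
    private final int maxTraverseSize;   // how many candidate channels to inspect
    private int currentChannel;          // last selected channel index

    public LoadBasedSelector(int numChannels, int maxTraverseSize) {
        this.pendingBytes = new long[numChannels];
        this.maxTraverseSize = maxTraverseSize;
    }

    /** Scans the next maxTraverseSize channels and picks the least-loaded one. */
    public int selectChannel() {
        int best = (currentChannel + 1) % pendingBytes.length;
        for (int i = 2; i <= maxTraverseSize; i++) {
            int candidate = (currentChannel + i) % pendingBytes.length;
            if (pendingBytes[candidate] < pendingBytes[best]) {
                best = candidate;
            }
        }
        currentChannel = best;
        return best;
    }

    /** Book-keeping hook: called after a record of the given size is emitted. */
    public void recordSent(int channel, long bytes) {
        pendingBytes[channel] += bytes;
    }

    public static void main(String[] args) {
        LoadBasedSelector selector = new LoadBasedSelector(4, 3);
        selector.recordSent(1, 1000);                 // channel 1 is backlogged
        System.out.println(selector.selectChannel()); // skips channel 1, prints 2
    }
}
```

Note how a small window (maxTraverseSize = 3 here) is enough to route around a backlogged channel without scanning all sub-partitions on every record.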

When & where to use the adaptive partition strategy for the target partitioners?

Adjust the record writer creation logic in StreamTask#createRecordWriter:

When StreamPartitioner#isEnabledAdaptivePartitionTrait returns true, initialize a new AdaptiveLoadBasedRecordWriter as follows:

StreamTask
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
            NonChainedOutput streamOutput,
            int outputIndex,
            Environment environment,
            String taskNameWithSubtask,
            long bufferTimeout) {

        // The old lines placeholders.
        // ...

        // The new adjusted lines.
        Configuration conf = environment.getJobConfiguration();
        final boolean adaptable =
                conf.get(NettyShuffleEnvironmentOptions.ADAPTIVE_PARTITIONER_ENABLE)
                        && bufferWriter.getNumberOfSubpartitions() > 1;
        final int maxTraverseSize =
                conf.get(NettyShuffleEnvironmentOptions.ADAPTIVE_PARTITIONER_MAX_TRAVERSE_SIZE);

        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
                new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
                        .setChannelSelector(outputPartitioner)
                        .setTimeout(bufferTimeout)
                        .setTaskName(taskNameWithSubtask)
                        .setAdaptable(adaptable)
                        .setMaxTraverseSize(maxTraverseSize)
                        .build(bufferWriter);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
}

Benchmark result[link]



Benchmark                                                              (adaptivePartitioner)  (adaptivePartitionerMaxTraverseSize)   (sourceType)   Mode  Cnt  Score   Error   Units
InputOnSkewDownstreamProcessAvailabilityBenchmark.mapRebalanceMapSink                   true                                     2  F27_UNBOUNDED  thrpt   30  2.031 ± 0.285  ops/ms
InputOnSkewDownstreamProcessAvailabilityBenchmark.mapRebalanceMapSink                  false                                     2  F27_UNBOUNDED  thrpt   30  0.658 ± 0.017  ops/ms


Other candidate designs and comparisons

Candidate design 1 (now promoted to the main design)

Public Interfaces

Introduce the following options at configuration level:

taskmanager.network.adaptive-partitioner.enabled

  • default value: false
  • type: boolean
  • description: Whether to use the adaptive partition strategy

taskmanager.network.adaptive-partitioner.max-traverse-size

  • default value: 4
  • type: int
  • description: How many channels to traverse at most when looking for an idle channel.

In this way, whether it is an SQL job or a JAR job, as long as this feature is enabled together with the two options above, the adaptive partition capability is applied to the target partitioners.

Candidate design 2

Public Interfaces

Introduce the following options at configuration level:

taskmanager.network.adaptive-partitioner.max-traverse-size

  • default value: 4
  • type: int
  • description: How many channels to traverse at most when looking for an idler channel.

Introduce the following new methods at DataStream:

  • public <K> DataStream<T> adaptivePartitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector)
  • public DataStream<T> adaptiveShuffle() 
  • public DataStream<T> adaptiveRebalance()
  • public DataStream<T> adaptiveRescale()

In this way:

  • In SQL jobs, the SQL hint syntax can be used to enable the Adaptive Partitioning feature for the corresponding partitioner of the target job. In this case, the SQL planner will replace the partitioning method within the target scope with the aforementioned API.
  • In JAR jobs, developers need to actively call the above method to manually enable the Adaptive Partitioning feature.
  • All partitioners that use adaptive partitioning share the same configuration value of taskmanager.network.adaptive-partitioner.max-traverse-size.

Comparisons


Each design is rated along five dimensions: JAR jobs (compatibility to enable the feature; fine-grained operator partitioning), SQL jobs (compatibility to enable the feature; fine-grained operator partitioning), and implementation complexity.

The rejected (old-main) design
  • JAR jobs, compatibility to enable the feature: High
    • Use the config options
    • Or use the related new methods
  • JAR jobs, fine-grained operator partitioning: High
    • Use API & config options based on cases
    • Could support operator-level configuration (traverse size; whether the feature is enabled)
  • SQL jobs, compatibility to enable the feature: Middle
    • Use the config options
  • SQL jobs, fine-grained operator partitioning: Low
    • Use the config option to control the traverse size for all operators
    • Use the config option to enable the feature for all operators
  • Implementation complexity: Middle
    • Introduce the API-related logic & parameter passing
    • Adapt the common part in Runtime
    • No need to adapt the SQL Planner

(new) Main design (the original Candidate design 1)
  • JAR jobs, compatibility to enable the feature: Middle
    • Use the config options
  • JAR jobs, fine-grained operator partitioning: Middle
    • Use the config option to control the traverse size for all operators
  • SQL jobs, compatibility to enable the feature: Middle
    • Use the config options
  • SQL jobs, fine-grained operator partitioning: Low
    • Use the config option to control the traverse size for all operators
    • Use the config option to enable the feature for all operators
  • Implementation complexity: Low
    • Introduce the API-related logic & parameter passing
    • Adapt the common part in Runtime
    • No need to adapt the SQL Planner

Candidate design 2
  • JAR jobs, compatibility to enable the feature: Low
    • Need to call the new related methods (i.e. API source changes in jobs)
  • JAR jobs, fine-grained operator partitioning: Middle
    • Use the config option to control the traverse size for all operators
  • SQL jobs, compatibility to enable the feature: Low
    • Need to use SQL hint syntax (i.e. SQL source changes in jobs)
  • SQL jobs, fine-grained operator partitioning: Low
    • Use the config option to control the traverse size for all operators
    • Use the config option to enable the feature for all operators
  • Implementation complexity: High
    • Introduce the API-related logic & parameter passing
    • Adapt the common part in Runtime
    • Adapt the SQL Planner for SQL hint syntax processing


Compatibility, Deprecation, and Migration Plan

  • N/A

Test Plan

Add the corresponding test cases related to the change.

Rejected Alternatives

Public Interfaces

Introduce two options at configuration level:

taskmanager.network.adaptive-partitioner.enabled

  • default value: false
  • type: boolean
  • description: Whether to use the adaptive partition strategy


taskmanager.network.adaptive-partitioner.max-traverse-size

  • default value: 4
  • type: int
  • description: How many channels to traverse at most when looking for an idle channel.
  • Why use this parameter?
    • Traversing all sub-partition results on every partitioning decision is costly; bounding the traverse size gives a better trade-off between balancing quality and partitioning overhead.
    • The effects are shown in the table below [FLINK-31655].


Introduce the new methods at API level on DataStream:

  • Introduce the following new methods (overview list) and keep the original partitionCustom, shuffle, rebalance, and rescale methods:
    • public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector, boolean adaptable)
    • public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector, 
      int enabledAdaptableAndMaxTraverseSize)
    • public DataStream<T> shuffle(boolean adaptable) 
    • public DataStream<T> shuffle(int enabledAdaptableAndMaxTraverseSize)
    • public DataStream<T> rebalance(boolean adaptable)
    • public DataStream<T> rebalance(int enabledAdaptableAndMaxTraverseSize)
    • public DataStream<T> rescale(boolean adaptable)
    • public DataStream<T> rescale(int enabledAdaptableAndMaxTraverseSize)
DataStream.java
    // old placeholders lines
    // ...

    // The newly introduced methods.


	// Keep the original partitionCustom 
    /**
     * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
     * method takes the key selector to get the key to partition on, and a partitioner that accepts
     * the key type.
     *
     * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
     * of fields.
     *
     * @param partitioner The partitioner to assign partitions to keys.
     * @param keySelector The KeySelector with which the DataStream is partitioned.
     * @return The partitioned DataStream.
     * @see KeySelector
     */
    public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        return setConnectionType(
                new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
    }

    // The newly introduced overloads of partitionCustom.
    /**
     * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
     * method takes the key selector to get the key to partition on, and a partitioner that accepts
     * the key type.
     *
     * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
     * of fields.
     *
     * @param partitioner The partitioner to assign partitions to keys.
     * @param keySelector The KeySelector with which the DataStream is partitioned.
     * @param adaptable Whether to enable load-based adaptive partitioning. Note: this parameter
     *     can only be used when the partition logic is not bound to downstream subtasks;
     *     otherwise, it would cause semantic errors in data partitioning.
     * @return The partitioned DataStream.
     * @see KeySelector
     */
    public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner, KeySelector<T, K> keySelector, boolean adaptable) {
        return setConnectionType(
                new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector), adaptable));
    }

    /**
     * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
     * method takes the key selector to get the key to partition on, and a partitioner that accepts
     * the key type.
     *
     * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
     * of fields.
     *
     * @param partitioner The partitioner to assign partitions to keys.
     * @param keySelector The KeySelector with which the DataStream is partitioned.
     * @param enabledAdaptableAndMaxTraverseSize Enables load-based adaptive partitioning and
     *     specifies how many channels to traverse at most when looking for the idlest channel.
     *     Note: this parameter can only be used when the partition logic is not bound to
     *     downstream subtasks; otherwise, it would cause semantic errors in data partitioning.
     * @return The partitioned DataStream.
     * @see KeySelector
     */
    public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner,
            KeySelector<T, K> keySelector,
            int enabledAdaptableAndMaxTraverseSize) {
        return setConnectionType(
                new CustomPartitionerWrapper<>(
                        clean(partitioner),
                        clean(keySelector),
                        true,
                        enabledAdaptableAndMaxTraverseSize));
    }

	// Keep the original shuffle
    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
     * uniformly randomly to the next operation.
     *
     * @return The DataStream with shuffle partitioning set.
     */
    @PublicEvolving
    public DataStream<T> shuffle() {
        return setConnectionType(new ShufflePartitioner<T>());
    }

    // The newly introduced overloads of shuffle.
    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
     * uniformly randomly to the next operation.
     *
     * @param adaptable Whether to enable load-based adaptive partitioning.
     * @return The DataStream with shuffle partitioning set.
     */
    public DataStream<T> shuffle(boolean adaptable) {
        return setConnectionType(new ShufflePartitioner<T>(adaptable));
    }

    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
     * uniformly randomly to the next operation.
     *
     * @param enabledAdaptableAndMaxTraverseSize Enables load-based adaptive partitioning and
     *     specifies how many channels to traverse at most when looking for the idlest channel.
     * @return The DataStream with shuffle partitioning set.
     */
    public DataStream<T> shuffle(int enabledAdaptableAndMaxTraverseSize) {
        return setConnectionType(
                new ShufflePartitioner<T>(true, enabledAdaptableAndMaxTraverseSize));
    }

 	// Keep the original rebalance
    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
     * evenly to instances of the next operation in a round-robin fashion.
     *
     * @return The DataStream with rebalance partitioning set.
     */
    public DataStream<T> rebalance() {
        return setConnectionType(new RebalancePartitioner<T>());
    } 

    // The newly introduced overloads of rebalance.
    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
     * evenly to instances of the next operation in a round-robin fashion.
     *
     * @param adaptable Whether to enable load-based adaptive partitioning.
     * @return The DataStream with rebalance partitioning set.
     */
    public DataStream<T> rebalance(boolean adaptable) {
        return setConnectionType(new RebalancePartitioner<T>(adaptable));
    }

    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
     * evenly to instances of the next operation in a round-robin fashion.
     *
     * @param enabledAdaptableAndMaxTraverseSize Enables load-based adaptive partitioning and
     *     specifies how many channels to traverse at most when looking for the idlest channel.
     * @return The DataStream with rebalance partitioning set.
     */
    public DataStream<T> rebalance(int enabledAdaptableAndMaxTraverseSize) {
        return setConnectionType(
                new RebalancePartitioner<T>(true, enabledAdaptableAndMaxTraverseSize));
    }

 	// Keep the original rescale 
    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
     * evenly to a subset of instances of the next operation in a round-robin fashion.
     *
     * <p>The subset of downstream operations to which the upstream operation sends elements depends
     * on the degree of parallelism of both the upstream and downstream operation. For example, if
     * the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then
     * one upstream operation would distribute elements to two downstream operations while the other
     * upstream operation would distribute to the other two downstream operations. If, on the other
     * hand, the downstream operation has parallelism 2 while the upstream operation has parallelism
     * 4 then two upstream operations will distribute to one downstream operation while the other
     * two upstream operations will distribute to the other downstream operations.
     *
     * <p>In cases where the different parallelisms are not multiples of each other one or several
     * downstream operations will have a differing number of inputs from upstream operations.
     *
     * @return The DataStream with rescale partitioning set.
     */
    @PublicEvolving
    public DataStream<T> rescale() {
        return setConnectionType(new RescalePartitioner<T>());
    }

    // The newly introduced overloads of rescale.
    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
     * evenly to a subset of instances of the next operation in a round-robin fashion.
     *
     * <p>The subset of downstream operations to which the upstream operation sends elements depends
     * on the degree of parallelism of both the upstream and downstream operation. For example, if
     * the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then
     * one upstream operation would distribute elements to two downstream operations while the other
     * upstream operation would distribute to the other two downstream operations. If, on the other
     * hand, the downstream operation has parallelism 2 while the upstream operation has parallelism
     * 4 then two upstream operations will distribute to one downstream operation while the other
     * two upstream operations will distribute to the other downstream operations.
     *
     * <p>In cases where the different parallelisms are not multiples of each other one or several
     * downstream operations will have a differing number of inputs from upstream operations.
     *
     * @param adaptable Whether to enable load-based adaptive partitioning.
     * @return The DataStream with rescale partitioning set.
     */
    public DataStream<T> rescale(boolean adaptable) {
        return setConnectionType(new RescalePartitioner<T>(adaptable));
    }

    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
     * evenly to a subset of instances of the next operation in a round-robin fashion.
     *
     * <p>The subset of downstream operations to which the upstream operation sends elements depends
     * on the degree of parallelism of both the upstream and downstream operation. For example, if
     * the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then
     * one upstream operation would distribute elements to two downstream operations while the other
     * upstream operation would distribute to the other two downstream operations. If, on the other
     * hand, the downstream operation has parallelism 2 while the upstream operation has parallelism
     * 4 then two upstream operations will distribute to one downstream operation while the other
     * two upstream operations will distribute to the other downstream operations.
     *
     * <p>In cases where the different parallelisms are not multiples of each other one or several
     * downstream operations will have a differing number of inputs from upstream operations.
     *
     * @param enabledAdaptableAndMaxTraverseSize Enables load-based adaptive partitioning and
     *     specifies how many channels to traverse at most when looking for the idlest channel.
     * @return The DataStream with rescale partitioning set.
     */
    public DataStream<T> rescale(int enabledAdaptableAndMaxTraverseSize) {
        return setConnectionType(
                new RescalePartitioner<T>(true, enabledAdaptableAndMaxTraverseSize));
    }

Supplemental description of the above changes

Why are both API-level and configuration-level controls needed?

Providing API-level control methods gives users a more precise entry point to control whether to enable this feature.

This can help address certain issues.

For example, if the feature is enabled at the configuration level, all operators that support adaptive dynamic partitioning will have it activated.

However, some of these operators may not actually need adaptive dynamic partitioning, and for certain operators enabling the feature may be unsuitable or even counterproductive.

How is the precedence logic handled between the two levels of configuration?

Consider a Flink JobGraph:    JobVertexA (JVA)  --rebalance-->  JVB  --rescale-->  JVC

The assignment logic is shown in the following table.

Config enabled? | JVA set adaptive by API? | JVB set adaptive by API? | JVA final adaptive? | JVB final adaptive?
false           | not set                  | not set                  | false               | false
false           | not set                  | false                    | false               | false
false           | not set                  | true                     | false               | true
false           | false                    | not set                  | false               | false
false           | false                    | false                    | false               | false
false           | false                    | true                     | false               | true
false           | true                     | not set                  | true                | false
false           | true                     | false                    | true                | false
false           | true                     | true                     | true                | true
true            | not set                  | not set                  | true                | true
true            | not set                  | false                    | true                | false
true            | not set                  | true                     | true                | true
true            | false                    | not set                  | false               | true
true            | false                    | false                    | false               | false
true            | false                    | true                     | false               | true
true            | true                     | not set                  | true                | true
true            | true                     | false                    | true                | false
true            | true                     | true                     | true                | true

Similarly, the maxTraverse value actively specified via the API takes precedence over the value in the global configuration. That is, when the adaptive partition feature is enabled for an operator at the API level without specifying maxTraverse, the corresponding partitioner falls back to the maxTraverse value from the global configuration.
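The precedence rule described above (an operator-level API setting, when present, overrides the global configuration) can be sketched as a tiny resolver. The class and method names below are illustrative only, not actual Flink code:

```java
// Toy sketch of the precedence rule: null means "not set" at the API level,
// in which case the global configuration value applies.
public final class AdaptiveSettingResolver {

    /** apiSetting == null means "not set" via the API for this operator. */
    static boolean resolveEnabled(Boolean apiSetting, boolean globalEnabled) {
        return apiSetting != null ? apiSetting : globalEnabled;
    }

    /** apiMaxTraverse == null means the API enabled the feature without a size. */
    static int resolveMaxTraverse(Integer apiMaxTraverse, int globalMaxTraverse) {
        return apiMaxTraverse != null ? apiMaxTraverse : globalMaxTraverse;
    }

    public static void main(String[] args) {
        // Mirrors rows of the precedence table: "not set" inherits the config,
        // an explicit API value wins over the config.
        System.out.println(resolveEnabled(null, true));   // true  (inherits config)
        System.out.println(resolveEnabled(false, true));  // false (API overrides)
        System.out.println(resolveMaxTraverse(null, 4));  // 4     (global fallback)
    }
}
```

Every row of the precedence table reduces to these two lookups, which is why the table is exhaustive over {not set, false, true} per operator.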

Advantages and disadvantages of the current public interface

  • It provides a flexible and fine-grained configurable scope at the API level, which can effectively handle complex scenarios where different operators have different requirements regarding whether dynamic partitioning is enabled.
  • For SQL jobs, no additional adaptation is required; this feature can be enabled simply through job-level configuration.
  • However, the design itself is relatively complex.


Acknowledgements

Thanks to Mang Zhang for proposing the feature, and to the developers involved in the discussion for their previous work.
