Motivation
Flink is a distributed processing engine. If some nodes are overloaded, the subtasks running on them may slow down, which in turn leads to backpressure and consumption lag.
Flink provides various data shuffle strategies; common ones include Forward, Rebalance, Rescale, Shuffle, Hash (KeyBy), and custom partitioning (partitionCustom).
| Partitioner | Data bound to subtasks |
|---|---|
| Rebalance | no |
| Rescale | not bound to subtasks within each group |
| Shuffle | no |
| customPartition | potentially; depends on the concrete implementation |
| Forward | yes |
| Hash | yes |
| Global | yes |
| etc. | ... |
For the scenarios where data is not (or only potentially) bound to specific subtasks (the four partition strategies above: Rebalance, Rescale, Shuffle, and customPartition), dynamically routing records to downstream subtasks according to their processing load would achieve a peak-shaving and valley-filling effect and help sustain the throughput of Flink jobs.
Public Interfaces
Introduce two options at the configuration level:

taskmanager.network.adaptive-partitioner.enabled
- default value: false
- type: boolean
- description: Whether to use the adaptive partition strategy.

taskmanager.network.adaptive-partitioner.max-traverse-size
- default value: 4
- type: int
- description: How many channels to traverse at most when looking for an idle channel.
- Why is this parameter needed? Traversing all sub-partitions on every partitioning decision is costly; bounding the traversal size makes this trade-off configurable. The effects are shown in the benchmark results below [FLINK-31655].


In this way, for both SQL jobs and JAR jobs, enabling this feature together with the two options above applies the adaptive partition capability to the target partitioners.
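For illustration, the two options could be set in flink-conf.yaml as follows (a sketch only; the option keys are the ones proposed in this FLIP and do not exist in released Flink versions):

```yaml
# Enable load-based adaptive partitioning for the eligible partitioners (proposed option).
taskmanager.network.adaptive-partitioner.enabled: true
# Bound the number of channels inspected per partitioning decision (proposed option).
taskmanager.network.adaptive-partitioner.max-traverse-size: 4
```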
Proposed Changes
Introduce the AdaptiveLoadBasedRecordWriter
Introduce an AdaptiveLoadBasedRecordWriter that partitions adaptively based on the load of sub-partitions, backing the adaptive variants of RescalePartitioner and RebalancePartitioner.
```java
public final class AdaptiveLoadBasedRecordWriter<T extends IOReadableWritable>
        extends RecordWriter<T> {

    // Core attributes:
    // Maximum number of channels to traverse when selecting the idlest channel.
    private final int maxTraverseSize;
    // Stores the current channel index.
    private int currentChannel;
    // The number of available sub-partitions to send records to.
    private final int numberOfSubpartitions;

    // Placeholders for other auxiliary methods and attributes.

    void traverseToTheIdlestChannel() {
        // Main logic overview:
        // 1. Compute the number of queued bytes per sub-partition for the next
        //    #maxTraverseSize channels. All partitions can be viewed as an abstract
        //    circular queue, and each calculation covers a window of #maxTraverseSize
        //    channels traversed from the current position.
        // 2. Pick the channel with the smallest number of queued bytes within those
        //    #maxTraverseSize channels as the target channel.
        // 3. Assign currentChannel to the target channel (from step 2).
    }

    @Override
    public void emit(T record) throws IOException {
        // Placeholders for checks and preparation.
        // ...
        // Main calling logic.
        traverseToTheIdlestChannel();
        // Placeholders for sending the record.
        // ...
    }
}
```
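The selection step above can be sketched as a small standalone routine (a simplified illustration, not the actual Flink implementation; the class and method names are hypothetical, and the per-channel backlog is passed in as a plain array):

```java
// Simplified sketch of the bounded "idlest channel" search described above.
// Hypothetical names; queuedBytes[i] stands for the backlog of sub-partition i.
public final class AdaptiveChannelSelector {

    /**
     * Returns the index of the channel with the smallest backlog among the next
     * maxTraverseSize channels (wrapping around), starting after the current channel.
     */
    public static int selectIdlestChannel(long[] queuedBytes, int current, int maxTraverseSize) {
        int numChannels = queuedBytes.length;
        int window = Math.min(maxTraverseSize, numChannels);
        int best = (current + 1) % numChannels;
        for (int step = 2; step <= window; step++) {
            int candidate = (current + step) % numChannels;
            if (queuedBytes[candidate] < queuedBytes[best]) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] backlog = {100, 40, 5, 70};
        // Starting from channel 0 with maxTraverseSize = 2, channels 1 and 2 are
        // inspected; channel 2 has the smallest backlog in that window.
        System.out.println(selectIdlestChannel(backlog, 0, 2)); // prints 2
    }
}
```

With maxTraverseSize equal to the channel count this degrades to a full scan; smaller values trade selection quality for lower per-record cost, which is exactly the trade-off the configuration option exposes.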
When and where is the adaptive partition strategy used for the target partitioners?
Adjust the record writer creation logic in StreamTask#createRecordWriter: when StreamPartitioner#isEnabledAdaptivePartitionTrait returns true, initialize a new AdaptiveLoadBasedRecordWriter as follows:
```java
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
        NonChainedOutput streamOutput,
        int outputIndex,
        Environment environment,
        String taskNameWithSubtask,
        long bufferTimeout) {
    // The old lines (placeholders).
    // ...
    // The newly adjusted lines.
    Configuration conf = environment.getJobConfiguration();
    final boolean adaptable =
            conf.get(NettyShuffleEnvironmentOptions.ADAPTIVE_PARTITIONER_ENABLE)
                    && bufferWriter.getNumberOfSubpartitions() > 1;
    final int maxTraverseSize =
            conf.get(NettyShuffleEnvironmentOptions.ADAPTIVE_PARTITIONER_MAX_TRAVERSE_SIZE);
    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
            new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
                    .setChannelSelector(outputPartitioner)
                    .setTimeout(bufferTimeout)
                    .setTaskName(taskNameWithSubtask)
                    .setAdaptable(adaptable)
                    .setMaxTraverseSize(maxTraverseSize)
                    .build(bufferWriter);
    output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return output;
}
```
Benchmark results [link]:

| Benchmark | adaptivePartitioner | adaptivePartitionerMaxTraverseSize | sourceType | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|---|
| InputOnSkewDownstreamProcessAvailabilityBenchmark.mapRebalanceMapSink | true | 2 | F27_UNBOUNDED | thrpt | 30 | 2.031 | ± 0.285 | ops/ms |
| InputOnSkewDownstreamProcessAvailabilityBenchmark.mapRebalanceMapSink | false | 2 | F27_UNBOUNDED | thrpt | 30 | 0.658 | ± 0.017 | ops/ms |
Other candidate designs and comparisons
Candidate design 1 (now promoted to be the main design above)
Public Interfaces
Introduce the following options at the configuration level:

taskmanager.network.adaptive-partitioner.enabled
- default value: false
- type: boolean
- description: Whether to use the adaptive partition strategy.

taskmanager.network.adaptive-partitioner.max-traverse-size
- default value: 4
- type: int
- description: How many channels to traverse at most when looking for an idle channel.
In this way, for both SQL jobs and JAR jobs, enabling this feature together with the two options above applies the adaptive partition capability to the target partitioners.
Candidate design 2
Public Interfaces
Introduce the following option at the configuration level:

taskmanager.network.adaptive-partitioner.max-traverse-size
- default value: 4
- type: int
- description: How many channels to traverse at most when looking for an idle channel.
Introduce the following new methods on DataStream:
- public <K> DataStream<T> adaptivePartitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector)
- public DataStream<T> adaptiveShuffle()
- public DataStream<T> adaptiveRebalance()
- public DataStream<T> adaptiveRescale()
In this way:
- In SQL jobs, the SQL hint syntax can be used to enable the Adaptive Partitioning feature for the corresponding partitioner of the target job. In this case, the SQL planner will replace the partitioning method within the target scope with the aforementioned API.
- In JAR jobs, developers need to actively call the above method to manually enable the Adaptive Partitioning feature.
- All partitioners that use adaptive partitioning share the same configuration value of taskmanager.network.adaptive-partitioner.max-traverse-size.
Comparisons
| Design | JAR jobs: compatibility to enable the feature | JAR jobs: fine-grained operator partitioning | SQL jobs: compatibility to enable the feature | SQL jobs: fine-grained operator partitioning | Implementation complexity |
|---|---|---|---|---|---|
| The rejected (old-main) design | High: use the config options, or use the related new methods | High: use API & config options depending on the case; supports operator-level configuration of the traverse size and of whether the feature is enabled | Middle | Low: use the config option to control the traverse size for all operators; use the config option to enable the feature for all operators | Middle: introduce the API-related logic & parameter passing; adapt the common part in the runtime; no need to adapt the SQL planner |
| (New) main design (the original Candidate design 1) | Middle | Middle: use the config option to control the traverse size for all operators | Middle | Low: use the config option to control the traverse size for all operators; use the config option to enable the feature for all operators | Low: introduce the API-related logic & parameter passing; adapt the common part in the runtime; no need to adapt the SQL planner |
| Candidate design 2 | Low: need to call the new related methods (i.e., change the API source code in jobs) | Middle: use the config option to control the traverse size for all operators | Low: need to use SQL hint syntax (i.e., change the SQL source code in jobs) | Low: use the config option to control the traverse size for all operators; use the config option to enable the feature for all operators | High: introduce the API-related logic & parameter passing; adapt the common part in the runtime; adapt the SQL planner for SQL hint syntax processing |
Compatibility, Deprecation, and Migration Plan
Test Plan
Add the corresponding test cases related to the change.
Rejected Alternatives
Public Interfaces
Introduce two options at the configuration level:

taskmanager.network.adaptive-partitioner.enabled
- default value: false
- type: boolean
- description: Whether to use the adaptive partition strategy.

taskmanager.network.adaptive-partitioner.max-traverse-size
- default value: 4
- type: int
- description: How many channels to traverse at most when looking for an idle channel. Why is this parameter needed? Traversing all sub-partitions on every partitioning decision is costly; bounding the traversal size makes this trade-off configurable. The effects are shown in the table in [FLINK-31655].
Introduce the following new methods on DataStream at the API level, keeping the original partitionCustom, shuffle, rebalance, and rescale methods:
```java
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector, boolean adaptable)
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector, int enabledAdaptableAndMaxTraverseSize)
public DataStream<T> shuffle(boolean adaptable)
public DataStream<T> shuffle(int enabledAdaptableAndMaxTraverseSize)
public DataStream<T> rebalance(boolean adaptable)
public DataStream<T> rebalance(int enabledAdaptableAndMaxTraverseSize)
public DataStream<T> rescale(boolean adaptable)
public DataStream<T> rescale(int enabledAdaptableAndMaxTraverseSize)
```
```java
// Old placeholder lines.
// ...
// The newly introduced methods.

// Keep the original partitionCustom.
/**
 * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
 * method takes the key selector to get the key to partition on, and a partitioner that accepts
 * the key type.
 *
 * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
 * of fields.
 *
 * @param partitioner The partitioner to assign partitions to keys.
 * @param keySelector The KeySelector with which the DataStream is partitioned.
 * @return The partitioned DataStream.
 * @see KeySelector
 */
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
    return setConnectionType(
            new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
}

// The newly introduced overloads for partitionCustom.
/**
 * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
 * method takes the key selector to get the key to partition on, and a partitioner that accepts
 * the key type.
 *
 * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
 * of fields.
 *
 * @param partitioner The partitioner to assign partitions to keys.
 * @param keySelector The KeySelector with which the DataStream is partitioned.
 * @param adaptable Whether to enable the load-based adaptive partitioning. Note: this parameter
 *     can only be used when the partition logic is not bound to downstream subtasks; otherwise,
 *     it would cause semantic errors in data partitioning.
 * @return The partitioned DataStream.
 * @see KeySelector
 */
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector, boolean adaptable) {
    return setConnectionType(
            new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector), adaptable));
}

/**
 * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
 * method takes the key selector to get the key to partition on, and a partitioner that accepts
 * the key type.
 *
 * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
 * of fields.
 *
 * @param partitioner The partitioner to assign partitions to keys.
 * @param keySelector The KeySelector with which the DataStream is partitioned.
 * @param enabledAdaptableAndMaxTraverseSize Enables the load-based adaptive partitioning and
 *     specifies how many channels to traverse at most when looking for the idlest channel.
 *     Note: this parameter can only be used when the partition logic is not bound to downstream
 *     subtasks; otherwise, it would cause semantic errors in data partitioning.
 * @return The partitioned DataStream.
 * @see KeySelector
 */
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner,
        KeySelector<T, K> keySelector,
        int enabledAdaptableAndMaxTraverseSize) {
    return setConnectionType(
            new CustomPartitionerWrapper<>(
                    clean(partitioner),
                    clean(keySelector),
                    true,
                    enabledAdaptableAndMaxTraverseSize));
}

// Keep the original shuffle.
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
 * uniformly randomly to the next operation.
 *
 * @return The DataStream with shuffle partitioning set.
 */
@PublicEvolving
public DataStream<T> shuffle() {
    return setConnectionType(new ShufflePartitioner<T>());
}

// The newly introduced overloads for shuffle.
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
 * uniformly randomly to the next operation.
 *
 * @param adaptable Whether to enable the load-based adaptive partitioning.
 * @return The DataStream with shuffle partitioning set.
 */
public DataStream<T> shuffle(boolean adaptable) {
    return setConnectionType(new ShufflePartitioner<T>(adaptable));
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
 * uniformly randomly to the next operation.
 *
 * @param enabledAdaptableAndMaxTraverseSize Enables the load-based adaptive partitioning and
 *     specifies how many channels to traverse at most when looking for the idlest channel.
 * @return The DataStream with shuffle partitioning set.
 */
public DataStream<T> shuffle(int enabledAdaptableAndMaxTraverseSize) {
    return setConnectionType(
            new ShufflePartitioner<T>(true, enabledAdaptableAndMaxTraverseSize));
}

// Keep the original rebalance.
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to instances of the next operation in a round-robin fashion.
 *
 * @return The DataStream with rebalance partitioning set.
 */
public DataStream<T> rebalance() {
    return setConnectionType(new RebalancePartitioner<T>());
}

// The newly introduced overloads for rebalance.
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to instances of the next operation in a round-robin fashion.
 *
 * @param adaptable Whether to enable the load-based adaptive partitioning.
 * @return The DataStream with rebalance partitioning set.
 */
public DataStream<T> rebalance(boolean adaptable) {
    return setConnectionType(new RebalancePartitioner<T>(adaptable));
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to instances of the next operation in a round-robin fashion.
 *
 * @param enabledAdaptableAndMaxTraverseSize Enables the load-based adaptive partitioning and
 *     specifies how many channels to traverse at most when looking for the idlest channel.
 * @return The DataStream with rebalance partitioning set.
 */
public DataStream<T> rebalance(int enabledAdaptableAndMaxTraverseSize) {
    return setConnectionType(
            new RebalancePartitioner<T>(true, enabledAdaptableAndMaxTraverseSize));
}

// Keep the original rescale.
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to a subset of instances of the next operation in a round-robin fashion.
 *
 * <p>The subset of downstream operations to which the upstream operation sends elements depends
 * on the degree of parallelism of both the upstream and downstream operation. For example, if
 * the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then
 * one upstream operation would distribute elements to two downstream operations while the other
 * upstream operation would distribute to the other two downstream operations. If, on the other
 * hand, the downstream operation has parallelism 2 while the upstream operation has parallelism
 * 4 then two upstream operations will distribute to one downstream operation while the other
 * two upstream operations will distribute to the other downstream operations.
 *
 * <p>In cases where the different parallelisms are not multiples of each other one or several
 * downstream operations will have a differing number of inputs from upstream operations.
 *
 * @return The DataStream with rescale partitioning set.
 */
@PublicEvolving
public DataStream<T> rescale() {
    return setConnectionType(new RescalePartitioner<T>());
}

// The newly introduced overloads for rescale.
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to a subset of instances of the next operation in a round-robin fashion.
 *
 * <p>See {@link #rescale()} for how the subset of downstream operations is determined by the
 * parallelism of the upstream and downstream operations.
 *
 * @param adaptable Whether to enable the load-based adaptive partitioning.
 * @return The DataStream with rescale partitioning set.
 */
public DataStream<T> rescale(boolean adaptable) {
    return setConnectionType(new RescalePartitioner<T>(adaptable));
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to a subset of instances of the next operation in a round-robin fashion.
 *
 * <p>See {@link #rescale()} for how the subset of downstream operations is determined by the
 * parallelism of the upstream and downstream operations.
 *
 * @param enabledAdaptableAndMaxTraverseSize Enables the load-based adaptive partitioning and
 *     specifies how many channels to traverse at most when looking for the idlest channel.
 * @return The DataStream with rescale partitioning set.
 */
public DataStream<T> rescale(int enabledAdaptableAndMaxTraverseSize) {
    return setConnectionType(
            new RescalePartitioner<T>(true, enabledAdaptableAndMaxTraverseSize));
}
```
Supplemental description of the above changes
Why are both API-level and configuration-level controls needed?
Providing API-level control gives users a more precise entry point for deciding whether to enable this feature, which helps address certain issues. For example, if the feature is enabled at the configuration level, it is activated for all operators that support adaptive dynamic partitioning. However, some of those operators may not actually need it, and enabling the feature for certain operators may be unsuitable or even counterproductive.
How is the precedence handled between the two levels of configuration?
Consider a Flink JobGraph: JobVertexA (JVA) --rebalance--> JVB --rescale--> JVC.
The resulting assignment logic is shown in the following table.
| Config enabled? | JVA set adaptive by API? | JVB set adaptive by API? | JVA final adaptive? | JVB final adaptive? |
|---|---|---|---|---|
| false | not set (default) | not set (default) | false | false |
| false | not set | false | false | false |
| false | not set | true | false | true |
| false | false | not set | false | false |
| false | false | false | false | false |
| false | false | true | false | true |
| false | true | not set | true | false |
| false | true | false | true | false |
| false | true | true | true | true |
| true | not set | not set | true | true |
| true | not set | false | true | false |
| true | not set | true | true | true |
| true | false | not set | false | true |
| true | false | false | false | false |
| true | false | true | false | true |
| true | true | not set | true | true |
| true | true | false | true | false |
| true | true | true | true | true |
Similarly, a maxTraverse value explicitly specified via the API takes precedence over the value in the global configuration. That is, when the adaptive partition feature is enabled for an operator at the API level without explicitly specifying maxTraverse, the corresponding partitioner falls back to the maxTraverse value from the global configuration.
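The precedence rules above can be sketched as a small standalone helper (an illustration only; the names are hypothetical, and the API-level setting is modeled as a nullable Boolean/Integer, where null means "not set"):

```java
// Sketch of the precedence rules described above (hypothetical helper class).
public final class AdaptivePartitionPrecedence {

    /** The API-level setting wins; when it is null ("not set"), the config value applies. */
    public static boolean resolveEnabled(Boolean apiSetting, boolean configEnabled) {
        return apiSetting != null ? apiSetting : configEnabled;
    }

    /** An API-level maxTraverse (if set) overrides the global configuration value. */
    public static int resolveMaxTraverse(Integer apiMaxTraverse, int configMaxTraverse) {
        return apiMaxTraverse != null ? apiMaxTraverse : configMaxTraverse;
    }

    public static void main(String[] args) {
        // Config disabled, but JVA enabled via API: JVA ends up adaptive.
        System.out.println(resolveEnabled(true, false));  // prints true
        // Config enabled, JVB not set via API: JVB inherits the config value.
        System.out.println(resolveEnabled(null, true));   // prints true
        // API did not specify maxTraverse: fall back to the configured default of 4.
        System.out.println(resolveMaxTraverse(null, 4));  // prints 4
    }
}
```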
Advantages and disadvantages of the current public interface
- It provides a flexible and fine-grained configurable scope at the API level, which can effectively handle complex scenarios where different operators have different requirements for whether dynamic partitioning is enabled.
- For SQL jobs, no additional adaptation is required; the feature can be enabled simply through job-level configuration.
- The design is complex.
Acknowledgements
Thanks to Mang Zhang for proposing this feature, and to the developers involved in the discussion for their previous work.