Discussion thread
Vote thread

JIRA: FLINK-12786


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. When the key distribution follows a power law, performance degrades significantly. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely adopted method to mitigate the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. In the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. In addition, reducing the amount of transferred data can further improve performance.
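
The two phases can be illustrated with a minimal plain-Java sketch. This is not Flink code: the class `TwoPhaseSum` and its methods are hypothetical names, and senders and receivers are simulated with in-memory maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: simulates senders and a receiver with in-memory maps. */
final class TwoPhaseSum {

    /** Phase 1: a sender aggregates its own elements per key into partial sums. */
    static Map<String, Long> localAggregate(List<Map.Entry<String, Long>> elements) {
        Map<String, Long> partial = new HashMap<>();
        for (Map.Entry<String, Long> e : elements) {
            partial.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return partial;
    }

    /** Phase 2: the receiver combines the partial sums from all senders. */
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, value) -> result.merge(key, value, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Key "hot" is skewed, but each sender forwards only one partial result for it.
        List<Map.Entry<String, Long>> sender1 = List.of(
            Map.entry("hot", 1L), Map.entry("hot", 2L), Map.entry("cold", 5L));
        List<Map.Entry<String, Long>> sender2 = List.of(
            Map.entry("hot", 3L), Map.entry("hot", 4L));

        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(localAggregate(sender1));
        partials.add(localAggregate(sender2));

        System.out.println(combine(partials));
    }
}
```

In this sketch the receiver sees two records for the key "hot" (one per sender) instead of four, which is the effect the second phase relies on.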

Note that local aggregation only pays off when the aggregated result can be easily obtained by decomposition and combination. Many common aggregating operations satisfy this condition, e.g., sum, count and topN. A few others, such as cardinality, cannot be easily decomposed and combined, and therefore will not benefit from local aggregation.
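
As a small illustration of decomposition and combination, the sketch below computes an average from partial (sum, count) accumulators. The class `AvgAccumulator` is a hypothetical name used only for this example; it is plain Java, not a Flink interface.

```java
/** Illustrative only: a partial accumulator for computing an average. */
final class AvgAccumulator {
    final long sum;
    final long count;

    AvgAccumulator(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    /** Decomposition: fold one element into a partial result. */
    AvgAccumulator add(long value) {
        return new AvgAccumulator(sum + value, count + 1);
    }

    /** Combination: merge two partial results; the order does not matter. */
    AvgAccumulator merge(AvgAccumulator other) {
        return new AvgAccumulator(sum + other.sum, count + other.count);
    }

    double result() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        AvgAccumulator sender1 = new AvgAccumulator(0, 0).add(1).add(2).add(3);
        AvgAccumulator sender2 = new AvgAccumulator(0, 0).add(10);
        System.out.println(sender1.merge(sender2).result()); // prints 4.0
    }
}
```

Averaging the two local averages (2.0 and 10.0) would give 6.0, which is wrong. The partial result has to carry enough information, here the sum and the count, for the combination to be exact.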

In addition, we introduce a new kind of keyed state named "local keyed state" to support our implementation. Local keyed state mainly benefits the implementation of local aggregation, but it also provides more general capabilities for flexible "local computing". Here, "local computing" covers not only "local aggregation" but also more general processing logic expressed locally with "KeyedProcessFunction", "ProcessWindowFunction" and other stateful APIs. It also supports local aggregation based on the Window API, because the window operator uses local keyed state in this scenario. At the API level, however, local keyed state is used in the same way as generic keyed state; we do not change any keyed state interface.

Public Interfaces

A few APIs to support local aggregation need to be added to the DataStream class, as listed below:

LocalKeyedStream<T, Tuple> localKeyBy(int... fields);
LocalKeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> LocalKeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> LocalKeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);

The following examples show how to use the API:

//local aggregation with existing aggregate APIs

//local aggregation with user defined "AggregateFunction" to calculate avg
DataStream<Tuple2<String, Long>> source = null;
	.aggregate(new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {},
				new WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>, String, TimeWindow>() {})
	.aggregate(new AggregateFunction<Tuple3<String, Long, Long>, Long, Tuple2<String, Long>>() {},
				new WindowFunction<Tuple2<Long, Long>, Tuple2<String, Long>, String, TimeWindow>() {});

//local aggregation based on ProcessFunction
	.process(new ProcessFunction<Tuple2<String,Long>, Tuple2<String, Long>>() {})

//local aggregation based on ProcessWindowFunction
	.process(new ProcessWindowFunction<Tuple2<String,Long>, Tuple2<String, Long>, String, TimeWindow>() {})
	.aggregate()	//.process()


Users can perform local aggregation with local keyed streams. Local keyed streams resemble keyed streams in many respects. They both partition elements according to keys and allow access to states affiliated with keys. But the partitioning in local keyed streams does not require shuffling and is performed locally.

Figure 1. Local aggregation.

The difference in partitioning scheme makes it non-trivial to save and restore local keyed states, especially when the degree of parallelism changes. In keyed streams, keys are assigned to a set of key groups and we can easily redistribute these key groups to tasks when the degree of parallelism changes. But in local keyed streams, as elements are partitioned locally, each task has a full range of key groups. When the degree of parallelism changes, we have to redistribute the key groups of old tasks and construct new key groups for new tasks.

Figure 2. Saving and restoring of local keyed states

Figure 2 illustrates how local keyed states are saved and restored when the degree of parallelism changes. Similar to the operators on keyed streams, after materializing local keyed states in persistent storage, the operators on local keyed streams respond to the checkpoint coordinator with a state handle containing the meta information of their local key groups. Suppose the parallelism is 3 and the max parallelism is 12; the checkpoint then contains 3 × 12 = 36 local key groups.
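
The difference between the two partitioning schemes, and the count of 36 local key groups, can be made concrete with a small sketch. It is a simplification under stated assumptions: `KeyGroupRanges` and its methods are hypothetical names, the key group id is computed from a plain `hashCode` rather than Flink's actual hashing, and the contiguous-slice formula is one common way to divide key groups evenly among tasks.

```java
/** Illustrative only: simplified key-group bookkeeping, not Flink's actual classes. */
final class KeyGroupRanges {

    /** Simplified key-group id (assumption: plain hashCode modulo max parallelism). */
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Keyed stream: each task owns one contiguous slice {start, endInclusive}. */
    static int[] keyedRange(int taskIndex, int parallelism, int maxParallelism) {
        int start = (taskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((taskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Local keyed stream: every task owns the full range {0, maxParallelism - 1}. */
    static int[] localKeyedRange(int maxParallelism) {
        return new int[] {0, maxParallelism - 1};
    }

    /** Number of key-group instances written to a checkpoint by all tasks together. */
    static int localKeyGroupsInCheckpoint(int parallelism, int maxParallelism) {
        return parallelism * maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int maxParallelism = 12;
        for (int task = 0; task < parallelism; task++) {
            int[] keyed = keyedRange(task, parallelism, maxParallelism);
            int[] local = localKeyedRange(maxParallelism);
            System.out.println("task " + task
                + " keyed=[" + keyed[0] + "," + keyed[1] + "]"
                + " local=[" + local[0] + "," + local[1] + "]");
        }
        System.out.println(localKeyGroupsInCheckpoint(parallelism, maxParallelism)); // prints 36
    }
}
```

With parallelism 3 and max parallelism 12, the keyed ranges are [0,3], [4,7] and [8,11], while every local keyed task covers [0,11].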

When the degree of parallelism is changed, the checkpoint coordinator will assign these local key groups to new tasks in a roughly even manner. It will iterate over all state handles and assign state handles to new tasks according to the number of key groups. A state handle will be split if only a portion of its key groups is assigned.

In the illustrated example, the state handle of Task 2 is split into 2 parts, each of which contains the meta information of 6 local key groups. Task 1’ then restores its local keyed states with the state handle of Task 1 and the first part of the state handle of Task 2.
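
The redistribution described above can be sketched as follows. This is a plain-Java illustration under assumptions, not the checkpoint coordinator's actual code: `LocalKeyGroupRedistribution` and `Handle` are hypothetical names, and the remainder is given to the first new tasks so that sizes differ by at most one.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: even redistribution of local key groups with handle splitting. */
final class LocalKeyGroupRedistribution {

    /** A (possibly partial) state handle: key groups [start, end] written by one old task. */
    static final class Handle {
        final int oldTask;
        final int start;
        final int end;

        Handle(int oldTask, int start, int end) {
            this.oldTask = oldTask;
            this.start = start;
            this.end = end;
        }

        int size() {
            return end - start + 1;
        }

        @Override
        public String toString() {
            return "Task" + oldTask + "[" + start + "," + end + "]";
        }
    }

    static List<List<Handle>> redistribute(List<Handle> handles, int newParallelism) {
        int total = 0;
        for (Handle h : handles) {
            total += h.size();
        }
        List<List<Handle>> assignment = new ArrayList<>();
        int handleIndex = 0;
        int nextStart = handles.isEmpty() ? 0 : handles.get(0).start;
        for (int task = 0; task < newParallelism; task++) {
            // Spread the remainder over the first tasks so sizes differ by at most one.
            int quota = total / newParallelism + (task < total % newParallelism ? 1 : 0);
            List<Handle> assigned = new ArrayList<>();
            while (quota > 0) {
                Handle h = handles.get(handleIndex);
                // Take the rest of the current handle, or split it if the quota is smaller.
                int take = Math.min(quota, h.end - nextStart + 1);
                assigned.add(new Handle(h.oldTask, nextStart, nextStart + take - 1));
                quota -= take;
                nextStart += take;
                if (nextStart > h.end) {
                    handleIndex++;
                    if (handleIndex < handles.size()) {
                        nextStart = handles.get(handleIndex).start;
                    }
                }
            }
            assignment.add(assigned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Handle> handles = List.of(
            new Handle(1, 0, 11), new Handle(2, 0, 11), new Handle(3, 0, 11));
        // prints [[Task1[0,11], Task2[0,5]], [Task2[6,11], Task3[0,11]]]
        System.out.println(redistribute(handles, 2));
    }
}
```

Scaling from 3 to 2 tasks reproduces the illustrated example: the first new task receives the whole handle of Task 1 plus the first 6 key groups of Task 2.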

During restore, a task may be assigned multiple local key groups with the same id. These instances should be merged to construct the new key group.

The merging of list states is quite straightforward. We can simply merge two list states of the same key by appending the elements of one list state to the other one.
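
A minimal sketch of this merge, with two instances of the same key group modeled as plain maps from key to list. `ListStateMerge` is a hypothetical helper for illustration, not part of the proposed implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: merges two instances of the same key group holding list states. */
final class ListStateMerge {

    static <K, V> Map<K, List<V>> merge(Map<K, List<V>> first, Map<K, List<V>> second) {
        Map<K, List<V>> merged = new HashMap<>();
        // Copy the first instance so that the inputs stay untouched.
        first.forEach((key, values) -> merged.put(key, new ArrayList<>(values)));
        // Append the elements of the second instance to the list of the same key.
        second.forEach((key, values) ->
            merged.computeIfAbsent(key, k -> new ArrayList<>()).addAll(values));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> fromTask1 = Map.of("a", List.of(1L, 2L));
        Map<String, List<Long>> fromTask2 = Map.of("a", List.of(3L), "b", List.of(4L));
        System.out.println(merge(fromTask1, fromTask2));
    }
}
```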

Reducing states and aggregating states can also be merged with the help of user-defined functions. But currently these user-defined functions are not saved in checkpoints, making it impossible to perform the merge during restore.

An obvious first solution is to save these user-defined functions in checkpoints. To remain backward compatible, this solution requires a different metadata format for the snapshots of local keyed states.

We can also merge lazily, which avoids the need to save user-defined functions. When restoring from checkpoints, we keep the values from different key group instances in a list and merge them when they are accessed. That way, local keyed states and keyed states can share the same metadata format.

Value states and map states can also be merged if users can provide user-defined merge functions in the state descriptors. But the need for the merging of value and map states is not urgent because in most cases users can replace value and map states with reducing and aggregating states if a user-defined merge function can be provided.


In the keyed streams produced by localKeyBy, the partition transformation deploys LocalKeyGroupStreamPartitioner instead of KeyGroupStreamPartitioner to partition stream elements.

Since all operators on local keyed streams are performed locally, these operators must be chained with their inputs. We therefore disable the following transformation APIs: connect, join, intervalJoin and coGroup. As with ForwardPartitioner, we check the parallelism of the upstream and downstream nodes of each LocalKeyGroupStreamPartitioner when generating stream graphs, and throw an exception if they are not equal.

When generating job graphs, the operators will be chained with their inputs if the partitioner is of type LocalKeyGroupStreamPartitioner.

Besides the selector and the serializer for keys, the scope of key partitioning will also be written to the operator’s configuration. The scope tells the operator whether it runs on a keyed stream or a local keyed stream.

The access to local keyed states is very similar to the access to keyed state except that

  • The key group range is full for all tasks, and
  • The access to value and map states is disallowed.

If the merging of reducing and aggregating states is performed lazily, we should use lists to store the values of reducing and aggregating states. When accessing reducing and aggregating states, we iterate over all the elements in the list and merge these elements to produce the result. The extra merging operation only happens at the first access after restoring. In other cases, there only exists one element in the list and no merging is needed.
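
The lazy variant can be sketched in plain Java as follows. `LazyReducingState` is a hypothetical class that stands in for a reducing state of a single key; the real state backends store serialized values, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Illustrative only: a reducing state for one key that merges restored values lazily. */
final class LazyReducingState<T> {
    private final BinaryOperator<T> reduceFunction;
    private final List<T> values = new ArrayList<>();

    LazyReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** Restore path: values from several key-group instances are only appended. */
    void restore(List<T> restoredValues) {
        values.addAll(restoredValues);
    }

    /** Access path: merges only if more than one element is stored. */
    T get() {
        if (values.isEmpty()) {
            return null;
        }
        if (values.size() > 1) {
            T merged = values.get(0);
            for (int i = 1; i < values.size(); i++) {
                merged = reduceFunction.apply(merged, values.get(i));
            }
            values.clear();
            values.add(merged);
        }
        return values.get(0);
    }

    void add(T value) {
        T current = get();
        values.clear();
        values.add(current == null ? value : reduceFunction.apply(current, value));
    }

    int storedElements() {
        return values.size();
    }

    public static void main(String[] args) {
        LazyReducingState<Long> state = new LazyReducingState<>(Long::sum);
        state.restore(List.of(3L, 4L, 5L)); // three instances of the same key after rescaling
        System.out.println(state.get());            // prints 12
        System.out.println(state.storedElements()); // prints 1
    }
}
```

The user-defined reduce function is only needed at access time, when it is available from the state descriptor, so it does not have to be stored in the checkpoint.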

Local keyed states are materialized in much the same way as keyed states, so the two can share the same code.

When recovering from failures, the checkpoint coordinator will distribute the state handles of local keyed states to tasks according to the method described in Section 2.1.

When restoring from local keyed state handles, we iterate over all key-value pairs in the assigned key groups. If the merging functions are saved in the local keyed state handles, we merge the values of the same key with the corresponding merge functions. If the merging is performed lazily, we simply append the values to the lists of their keys.

The overall implementation can be split into the following subtasks (steps):

  • Introduce a KeyScope enum to distinguish whether the key comes from the localKeyBy API or the general keyBy API:
public enum KeyScope {


	private final boolean local;

	KeyScope(boolean local) {
		this.local = local;
	}

	public boolean isLocal() {
		return local;
	}
}
  • Introduce a new stream partitioner named LocalKeyedStreamPartitioner. Its implementation is similar to ForwardPartitioner and forwards the local keyed stream. Note: the code snippet is still based on the old interface.
public class LocalKeyedStreamPartitioner<T, K> extends StreamPartitioner<T>
		implements ConfigurableStreamPartitioner {
	private static final long serialVersionUID = 1L;

	private final int[] returnArray = new int[] {0};

	private final KeySelector<T, K> keySelector;

	private int maxParallelism;

	public LocalKeyedStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
		Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
		this.keySelector = Preconditions.checkNotNull(keySelector);
		this.maxParallelism = maxParallelism;
	}

	// Always select channel 0, i.e. forward each record to the local downstream operator.
	public int[] selectChannels(
			SerializationDelegate<StreamRecord<T>> record,
			int numberOfOutputChannels) {
		return returnArray;
	}
}
  • Add new APIs for DataStream and KeyedStream:
//core internal method in DataStream
private KeyedStream<T, Tuple> localKeyBy(Keys<T> keys) {
        return KeyedStream.localKeyedStream(this,
        clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
}
//core internal method in KeyedStream
static <T, KEY> KeyedStream<T, KEY> localKeyedStream(DataStream<T> dataStream,
		KeySelector<T, KEY> keySelector,
		TypeInformation<KEY> keyType) {
	PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
		new LocalKeyedStreamPartitioner<>(keySelector,

	return new KeyedStream<>(dataStream,
  • Introduce LocalKeyedStateHandle interface and implement LocalKeyedKeyGroupsStateHandle / LocalKeyedIncrementalKeyedStateHandle
//marker interface
public interface LocalKeyedStateHandle extends KeyedStateHandle {}

public class LocalKeyedKeyGroupsStateHandle extends KeyGroupsStateHandle
		implements LocalKeyedStateHandle {

	private static final long serialVersionUID = -2194187691361383511L;

	public LocalKeyedKeyGroupsStateHandle(KeyGroupRangeOffsets groupRangeOffsets,
			StreamStateHandle streamStateHandle) {
		super(groupRangeOffsets, streamStateHandle);
	}

	public LocalKeyedKeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) {
		return new LocalKeyedKeyGroupsStateHandle(

public class LocalKeyedIncrementalKeyedStateHandle extends IncrementalKeyedStateHandle
		implements LocalKeyedStateHandle {

	private static final long serialVersionUID = 4081913652254161489L;

	public LocalKeyedIncrementalKeyedStateHandle(
			UUID backendIdentifier,
			KeyGroupRange keyGroupRange,
			long checkpointId,
			Map<StateHandleID, StreamStateHandle> sharedState,
			Map<StateHandleID, StreamStateHandle> privateState,
			StreamStateHandle metaStateHandle) {
		super(backendIdentifier, keyGroupRange, checkpointId, sharedState, privateState,
  • Support merging of local aggregation states in the RocksDB state backend:
//add a new method named reDistributeLocalKeyedStates to redistribute local keyed states; its signature is:
public static List<KeyedStateHandle>[] reDistributeLocalKeyedStates(OperatorState operatorState, int newParallelism) {

//when a job is rescaled and restored from local keyed snapshots, we may encounter more than one accumulator, so they need to be merged.
//We introduce a mergeAccumulatorIfNeed method to do this:
	private ACC mergeAccumulatorIfNeed(List<ACC> list) {
		if (list == null || list.isEmpty()) {
			return null;
		}
		ACC result = list.get(0);
		for (int i = 1; i < list.size(); i++) {
			ACC other = list.get(i);
			// never pass a null accumulator to the user-defined merge function
			if (result == null) {
				result = other;
			} else if (other != null) {
				result = aggFunction.merge(result, other);
			}
		}
		return result;
	}

//and override the getInternal method:
	public ACC getInternal(byte[] key) {
		try {
			byte[] valueBytes = backend.db.get(columnFamily, key);
			if (valueBytes == null) {
				return null;
			}
			// When restoring from local keyed snapshots, we may encounter more than one accumulator
			// in case of rescaling.
			if (firstGet) {
				firstGet = false;
				List<ACC> list = deserializeList(valueBytes);
				return mergeAccumulatorIfNeed(list);
			}
			return valueSerializer.deserialize(dataInputView);
		} catch (IOException | RocksDBException e) {
			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
		}
	}

Alternative implementation

In section 3, we described our design based on Flink state. The discussion on the mailing list also produced some good suggestions. For example, introducing a stateless operator for local aggregation can provide better performance in some particular scenarios. The stateless operator would buffer the intermediate results and flush them during `StreamOperator::prepareSnapshotPreBarrier()`.
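
The buffering idea can be sketched in plain Java as follows. `BufferingLocalSum` is a hypothetical stand-in for such an operator: its `prepareSnapshotPreBarrier()` method only mimics the role of the Flink hook of the same name, and the downstream is modeled as a simple callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Illustrative only: pre-aggregates in memory and flushes before a checkpoint barrier. */
final class BufferingLocalSum {
    private final Map<String, Long> buffer = new HashMap<>();
    private final BiConsumer<String, Long> downstream;

    BufferingLocalSum(BiConsumer<String, Long> downstream) {
        this.downstream = downstream;
    }

    /** Folds each element into the in-memory partial sum of its key. */
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
    }

    /** Emits all partial sums and clears the buffer, so nothing is left to checkpoint. */
    void prepareSnapshotPreBarrier() {
        buffer.forEach(downstream);
        buffer.clear();
    }

    int bufferedKeys() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferingLocalSum operator =
            new BufferingLocalSum((key, sum) -> System.out.println(key + " -> " + sum));
        operator.processElement("a", 1L);
        operator.processElement("a", 2L);
        operator.processElement("b", 5L);
        operator.prepareSnapshotPreBarrier();
    }
}
```

Because the buffer is empty whenever a barrier passes, the operator holds no state at checkpoint time. The trade-off in this sketch is that results are only emitted at checkpoints (a real operator would likely also flush on a size or time threshold).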

Supporting both kinds of implementation is valuable, because each benefits different scenarios. In our original implementation, we reused the window operator (a stateful operator) and the window-related APIs so that we can reuse existing Flink concepts and flexible user interfaces. We therefore need a unified abstraction for both implementations.

A good way to stay compatible with both is to enhance the WindowOperator so that it also supports the stateless implementation of local aggregation. We can provide a config option in the WindowOperator that lets users choose between the two implementations.

Compatibility, Deprecation, and Migration Plan

This feature is an optimization, so it introduces no compatibility-related issues.

Test Plan

All relevant changes are verified by unit tests. Where possible, we will also write integration tests to verify them.

Rejected Alternatives