Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Motivation

Although the current SplitEnumerator is responsible for assigning splits, it lacks visibility into the actual runtime status or distribution of these splits.

  1. Enumerator Assignment: The SplitEnumerator proactively assigns splits to readers.

  2. State Recovery During Restart: When a SourceOperator restarts (e.g., during failover), it restores splits from state and reassigns them to readers.

  3. Split Completion: When a bounded split finishes reading, it does not notify the SplitEnumerator. As a result, the enumerator may remain unaware that the split has been completed.
  4. Restart with a Changed Subscription: During restart, if the source options remove a topic or table, the splits that were already assigned cannot be removed.


However, under the existing design, the SplitEnumerator only assigns splits to readers, after which it ceases to monitor their progress. For instance, the SplitEnumerator does not track whether an assigned split has been completed by the reader or redistributed (e.g., after restoring state with a different parallelism level). Most connectors (such as Kafka) do not maintain dynamic records of split assignments, leading to scenarios where splits are statically distributed based solely on topic/partition hashing.


This approach risks imbalanced split assignments, where a single reader may become overloaded due to an excessive number of splits. Processing delays then accumulate, creating critical bottlenecks for operations with strict timeliness requirements (e.g., real-time analytics or low-latency pipelines).


As demonstrated in FLINK-31762, these issues manifest in two common scenarios:

Case 1: Kafka connector consuming two topics

Let us assume that there are two topics, and each topic has four partitions. We have set the parallelism to eight to consume these two topics. However, the current partition assignment method may lead to some subtasks being assigned two partitions while others are left with none.

Case 2: Kafka connector restored with both a different topic partition count and a different Flink parallelism

The second scenario occurs when users scale up Kafka's parallelism. The typical workflow is: pause the Flink job, increase the parallelism, and restart the job with state recovery.

For example:

  • Suppose a Flink source with parallelism 3 consumes a topic with 3 partitions.

  • When the Flink parallelism is changed to 6, the start index for newly discovered partitions changes because the number of readers has changed.

Assume tp.topic().hashCode() * 31 = 2.

Initially, there are 3 parallel readers for 3 partitions. Thus, partition 1 will be assigned to task (2 + 1) % 3 = 0.

subtask               | 0 | 1 | 2
partitions of topic 1 | 1 | 2 | 3

After changing to 6 parallelism readers:

subtask               | 0 | 1 | 2 | 3 | 4 | 5
partitions of topic 1 | 1 | 2 | 3 |   |   |
Then the newly discovered partition 4 will be assigned to (2 + 4) % 6 = 0, partition 5 to (2 + 5) % 6 = 1, and partition 6 to (2 + 6) % 6 = 2:

subtask               | 0   | 1   | 2   | 3 | 4 | 5
partitions of topic 1 | 1,4 | 2,5 | 3,6 |   |   |

As a result, the splits are unevenly assigned: subtasks 0, 1, and 2 each own two partitions while subtasks 3, 4, and 5 own none.
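The arithmetic above can be sketched as follows. This is a simplified model: in the real connector the start index is derived from the topic's hash code, while here it is passed in directly to match the assumed value of 2.

```java
public class HashOwnerDemo {

    // Simplified model of the hash-based owner computation: the reader that
    // owns a partition is (startIndex + partition) % numReaders.
    static int owner(int startIndex, int partition, int numReaders) {
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        // With 3 readers, partitions 1..3 land on readers 0, 1, 2: balanced.
        for (int p = 1; p <= 3; p++) {
            System.out.println("3 readers, partition " + p + " -> reader " + owner(2, p, 3));
        }
        // With 6 readers, restored partitions 1..3 keep owners 0, 1, 2, and the
        // newly discovered partitions 4..6 also map to readers 0, 1, 2, leaving
        // readers 3..5 idle while readers 0..2 each hold two partitions.
        for (int p = 4; p <= 6; p++) {
            System.out.println("6 readers, partition " + p + " -> reader " + owner(2, p, 6));
        }
    }
}
```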


This FLIP introduces two key enhancements:

  1. A new source contract that enables the enumerator to have a complete global view of splits.
  2. An implementation in the Kafka connector to address split imbalance during recovery.


Proposed Change

New Source Base Contract

Extending FLIP-27, this FLIP modifies the ReaderRegistrationEvent contract:

If a Source implements SupportSplitReassignmentOnRecovery, the SourceOperator does not assign the splits from state to the reader. Instead, it reports the splits via a ReaderRegistrationEvent sent to the SplitEnumerator. If a Source does not implement SupportSplitReassignmentOnRecovery, the SourceOperator assigns the splits from state to the reader, and the splits are not included in the ReaderRegistrationEvent.
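The branch described above can be sketched with a self-contained model. All types and method names here are simplified stand-ins for illustration, not the actual Flink classes:

```java
import java.util.Collections;
import java.util.List;

public class RegistrationDemo {

    // Marker interface modeled after SupportSplitReassignmentOnRecovery.
    interface SupportsReassignment {}

    static class PlainSource {}

    static class ReassigningSource extends PlainSource implements SupportsReassignment {}

    /**
     * Models the SourceOperator decision on restart: if the source opts in,
     * the restored splits are shipped to the enumerator inside the
     * registration event and nothing is assigned locally; otherwise the
     * splits are assigned locally and the event carries no splits.
     */
    static List<String> splitsToReport(PlainSource source, List<String> restoredSplits) {
        if (source instanceof SupportsReassignment) {
            return restoredSplits;        // the enumerator decides the assignment
        }
        return Collections.emptyList();   // assigned locally; report nothing
    }
}
```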


After the SourceCoordinator receives the ReaderRegistrationEvent, it calls addReader on the SplitEnumerator. At this point, the enumerator can access the reported splits through the ReaderInfo obtained from SplitEnumeratorContext#registeredReaders.


The contract is simple, but the implementation is not. Below, I take the Kafka connector as an example to show how to implement it, and explain why.

 Kafka connector

Behaviour Changes

Two built-in partition assignment strategies are provided via the new configuration option scan.split-assigner: hash and round-robin.

  1. Hash tries to evenly distribute the partitions of each individual topic. If there are multiple topics, there is no guarantee that the partitions across all the topics are evenly distributed. Round-robin tries to evenly distribute the partitions of all the topics, so the number of partitions consumed by each reader is balanced, but for a specific topic, its partitions are not guaranteed to be evenly distributed.
  2. Regardless of the partition assignment algorithm, once a partition is assigned to a reader task, it will always be assigned to the same reader task after failover or restart until the parallelism changes. When the parallelism changes, a completely new assignment is generated globally.
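The round-robin distribution described in item 1 can be sketched as follows. This is a standalone model with invented names, not the connector's actual assigner:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinDemo {

    // Round-robin sketch: splits from all topics are dealt out one by one, so
    // per-reader counts differ by at most one, even though a single topic's
    // partitions may end up concentrated on a few readers.
    static Map<Integer, List<String>> assign(List<String> splits, int numReaders) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            assignment.computeIfAbsent(i % numReaders, r -> new ArrayList<>())
                    .add(splits.get(i));
        }
        return assignment;
    }
}
```

With the Case 1 setup (two topics with four partitions each, parallelism 8), this hands each of the eight readers exactly one split instead of leaving some readers idle.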


KafkaSource needs to implement SupportSplitReassignmentOnRecovery.

To properly manage split lifecycle and recovery, the Kafka SplitEnumerator follows a structured approach:

  1. Persist the Pending Splits in State.

  2. AddReader: The idea is that if the parallelism has not changed, the same split assignment is reused. Only when the parallelism changes are the splits redistributed.

    1. Take the splits from the ReaderInfo of SplitEnumeratorContext.
    2. Remove the unmatched splits. This covers the case where the source options removed a topic or table before the restart.
    3. If the split's owner matches the owner in globalSplitAssignment: reassign the split to the same reader. Put it into pendingPartitionSplitAssignment first, then assign it.
    4. If the split's owner differs from the one in globalSplitAssignment: ignore it.
  3. AddSplitBack: Add the split into pendingPartitionSplitAssignment under its previous reader.
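The AddReader steps above can be sketched as a standalone model. All names are invented for illustration; the real enumerator works with KafkaPartitionSplit and SplitEnumeratorContext:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AddReaderDemo {

    // globalAssignment is the enumerator's source of truth, reported holds the
    // splits the restarting reader carried in its state, and subscribed is the
    // current set of splits derived from the source options.
    static List<String> splitsToReassign(
            int readerId,
            List<String> reported,
            Map<String, Integer> globalAssignment,
            Set<String> subscribed) {
        List<String> result = new ArrayList<>();
        for (String split : reported) {
            if (!subscribed.contains(split)) {
                continue; // step 2: the topic/table was removed from the options
            }
            Integer owner = globalAssignment.get(split);
            if (owner != null && owner == readerId) {
                result.add(split); // step 3: same owner, give the split back
            }
            // step 4: a differing owner means the split was reassigned; ignore it
        }
        return result;
    }
}
```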



Next, I will walk through the whole contract of the Kafka connector to show why it is designed this way.

The Whole Contract of Kafka connector 

1. Reader Registration on Global Restart

When a reader starts (SourceOperator#open), it sends a ReaderRegistrationEvent to the SplitEnumerator.

If the Source supports split reassignment on recovery, the event includes the splits previously assigned to this reader (restored from checkpointed state). In this case, these splits are not immediately assigned back to the reader; only the SplitEnumerator has the authority to reassign them during recovery.


2. Split Reassignment In Enumerator

Upon receiving split reports from readers:

  • The SourceCoordinator adds the reader to the SplitEnumerator (SplitEnumerator#addReader).
  • The enumerator gets the splits from ReaderInfo, then determines how to redistribute them (e.g., balancing load, filtering invalid ones).


Underlying Problem:

However, some cases can cause split loss. For example, consider the following sequence:

1. Reader A reports splits 1 and 2 upon restart.

2. The enumerator receives the report but has not yet assigned splits 1 and 2.

3. A checkpoint or savepoint completes. Reader A now has empty state.

4. The whole job is restarted from the latest checkpoint, and Reader A reports empty splits. Splits 1 and 2 are lost.

Solution:

To prevent this, the enumerator should be responsible for preserving unassigned splits in state for recovery. This ensures that no split is silently dropped before being durably recorded. 
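The fix can be sketched with a minimal standalone model. The names are invented; the real enumerator persists KafkaPartitionSplit instances in its checkpointed state:

```java
import java.util.HashSet;
import java.util.Set;

public class PendingSplitsDemo {

    // Splits reported on registration go into a pending set before any
    // assignment happens, and every snapshot includes that set, so a
    // checkpoint taken between the report and the reassignment still
    // carries the splits.
    private final Set<String> pendingSplits = new HashSet<>();

    void onReaderRegistration(Set<String> reportedSplits) {
        pendingSplits.addAll(reportedSplits); // tracked before any assignment
    }

    void onSplitAssigned(String split) {
        pendingSplits.remove(split); // leaves the pending set once assigned
    }

    Set<String> snapshotState() {
        return new HashSet<>(pendingSplits); // persisted with the checkpoint
    }
}
```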


3. Single Task failover if the first checkpoint has not finished after global restart.

If Reader A restarts before the first checkpoint completes after a global restart, it will re-report its splits from the savepoint.
Moreover, splits (from SplitAssignmentTracker) that were reassigned to Reader A but not yet persisted will also be added back to the enumerator.


Underlying Problem:

Another specific edge case occurs when only one reader restarts:

  1. Initial state: Reader 1 reports splits (1, 2).

     subtask               | 1   | 2
     partitions of topic 1 | 1,2 |

  2. Enumerator action: assigns split 1 to Reader 1 and split 2 to Reader 2.

     subtask               | 1 | 2
     partitions of topic 1 | 1 | 2

  3. Failure scenario: Reader 1 fails before checkpointing. Split 1 is added back to the enumerator.

     subtask               | 1   | 2
     partitions of topic 1 | 1,1 | 2

  4. Recovery issue: upon recovery, Reader 1 re-reports splits (1, 2). Split 1 is added to Reader 1 twice and split 2 is added to Reader 2 again.

     subtask               | 1   | 2
     partitions of topic 1 | 1,1 | 2,2


Solution 

To solve this problem, introduce a globalSplitAssignment to track assignments after restart.

When the split enumerator receives a split, this split must already exist in globalSplitAssignment.

  • If the split's owner matches the owner in globalSplitAssignment: reassign the split to the same reader.
  • If the split's owner differs from the owner in globalSplitAssignment: the split was reassigned to another reader; ignore it.


Taking the problem above as an example:

1. Initial state: Reader 1 reports splits (1, 2).

2. Enumerator action: assigns split 1 to Reader 1 and split 2 to Reader 2.

3. Failure scenario: Reader 1 fails before checkpointing. Split 1 is added back to the enumerator.

Split 1 is then put into pendingSplitAssignment, waiting for Reader 1 to come back.

4. Recovery: upon recovery, Reader 1 re-reports splits (1, 2).

  • Split 1: its owner matches the one in globalSplitAssignment → reassign it to Reader 1 (pendingSplitAssignment removes the duplicate).
  • Split 2: its owner in globalSplitAssignment is Reader 2, which differs → ignored.

5. Finally, the problem is solved.

4. Single Task failover after the first checkpoint has finished.

In this situation, these splits must not be reassigned to other tasks, as this would reintroduce imbalance.

Underlying Problem

Take the Round-Robin strategy as an example:

  1. Initial state: 2 parallelism, 2 splits.
  2. Enumerator action: Split 1 → Reader 1, and the first checkpoint completes. Then Split 2 → Reader 2.
  3. Failure scenario: after Split 2 is assigned to Reader 2, but before the second checkpoint completes, Reader 2 restarts. Split 2 is added back to the enumerator, and the round-robin strategy assigns it to Reader 1.
  4. Recovery issue: Reader 2 restarts with no splits to report. Reader 1 now has 2 splits, Reader 2 has 0 → imbalanced distribution.
Solution:

Use the same strategy as in "Single Task failover if the first checkpoint has not finished after global restart". Taking the problem above as an example:

1. Initial state: 2 parallelism, 2 splits.

2. Enumerator action: Split 1 → Reader 1, and the first checkpoint completes. Then Split 2 → Reader 2.

3. Failure scenario: after Split 2 is assigned to Reader 2, but before the second checkpoint completes, Reader 2 restarts. Split 2 is added back to the enumerator.

Split 2 is then put into pendingSplitAssignment, waiting for Reader 2 to come back.

4. Recovery: Reader 2 restarts with no splits to report. Split 2 is then assigned back to Reader 2 from pendingSplitAssignment.


5. The SourceCoordinator receiving the first checkpoint request.

What if the job is checkpointed before the SourceCoordinator has received all registrations? Will splits be lost?
Fortunately, the current contract between the reader and the enumerator ensures that this scenario cannot occur. This guarantee significantly reduces overall complexity.


The SourceCoordinator always receives split reports before any subsequent checkpoint request. This ordering is guaranteed by two mechanisms:

  • The task status update (e.g., RUNNING) and ReaderRegistrationEvent share the same RPC channel. On the JobManager side, the split report is processed before the status update.

  • The Checkpoint Coordinator only triggers a checkpoint after all tasks have transitioned to RUNNING state (see DefaultCheckpointPlanCalculator#calculateCheckpointPlan).

This ensures that the enumerator has visibility into all restored splits before the first checkpoint is initiated.

Public Interfaces

Source Base

ReaderRegistrationEvent

Add assigned-split tracking to ReaderRegistrationEvent.

/**
 * The SourceOperator should always send the ReaderRegistrationEvent with the
 * `reportedSplitsOnRegistration` list. But it will not add the splits to readers if {@link
 * org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery} is implemented.
 */
public class ReaderRegistrationEvent implements OperatorEvent {

    private static final long serialVersionUID = 1L;

    private final int subtaskId;
    private final String location;
    private final ArrayList<byte[]> splits;

    public ReaderRegistrationEvent(int subtaskId, String location) {
        this.subtaskId = subtaskId;
        this.location = location;
        this.splits = new ArrayList<>();
    }

    ReaderRegistrationEvent(int subtaskId, String location, ArrayList<byte[]> splits) {
        this.subtaskId = subtaskId;
        this.location = location;
        this.splits = splits;
    }

    public static <SplitT extends SourceSplit>
            ReaderRegistrationEvent createReaderRegistrationEvent(
                    int subtaskId,
                    String location,
                    List<SplitT> splits,
                    SimpleVersionedSerializer<SplitT> splitSerializer)
                    throws IOException {
        ArrayList<byte[]> result = new ArrayList<>();
        for (SplitT split : splits) {
            result.add(splitSerializer.serialize(split));
        }
        return new ReaderRegistrationEvent(subtaskId, location, result);
    }

    public <SplitT extends SourceSplit> List<SplitT> splits(
            SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
        if (splits.isEmpty()) {
            return Collections.emptyList();
        }
        List<SplitT> result = new ArrayList<>(splits.size());
        for (byte[] serializedSplit : splits) {
            result.add(splitSerializer.deserialize(splitSerializer.getVersion(), serializedSplit));
        }
        return result;
    }

    public int subtaskId() {
        return subtaskId;
    }

    public String location() {
        return location;
    }

    @Override
    public String toString() {
        return String.format(
                "ReaderRegistrationEvent[subtaskId = %d, location = %s]", subtaskId, location);
    }
}


SupportSplitReassignmentOnRecovery

Add a SupportSplitReassignmentOnRecovery interface for sources that need to report their splits to the enumerator and receive a reassignment.

/**
 * A decorative interface of {@link Source}. Implementing this interface indicates that the
 * source operator needs to report its restored splits to the enumerator and receive a
 * reassignment.
 */
public interface SupportSplitReassignmentOnRecovery {
}


ReaderInfo

Add reportedSplitsOnRegistration to ReaderInfo. Note that a non-empty value is only provided when the source implements SupportSplitReassignmentOnRecovery.

/**
 * A container class hosting the information of a {@link SourceReader}.
 *
 * <p> The SourceOperator should always send the ReaderRegistrationEvent with the `reportedSplitsOnRegistration` list. But it will not add the splits to readers if `SupportSplitReassignmentOnRecovery` is implemented.
 */
@Public
public final class ReaderInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int subtaskId;
    private final String location;
    private final List<SourceSplit> reportedSplitsOnRegistration;

    public ReaderInfo(int subtaskId, String location) {
        this(subtaskId, location, Collections.emptyList());
    }

    ReaderInfo(int subtaskId, String location, List<SourceSplit> splits) {
        this.subtaskId = subtaskId;
        this.location = location;
        this.reportedSplitsOnRegistration = splits;
    }

    @SuppressWarnings("unchecked")
    public static <SplitT extends SourceSplit> ReaderInfo createReaderInfo(
            int subtaskId, String location, List<SplitT> splits) {
        return new ReaderInfo(subtaskId, location, (List<SourceSplit>) splits);
    }

    @SuppressWarnings("unchecked")
    public <SplitT extends SourceSplit> List<SplitT> getReportedSplitsOnRegistration() {
        return (List<SplitT>) reportedSplitsOnRegistration;
    }

    /**
     * @return the ID of the subtask that runs the source reader.
     */
    public int getSubtaskId() {
        return subtaskId;
    }

    /**
     * @return the location of the subtask that runs this source reader.
     */
    public String getLocation() {
        return location;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ReaderInfo)) {
            return false;
        }
        ReaderInfo that = (ReaderInfo) o;
        return subtaskId == that.subtaskId
                && Objects.equals(location, that.location)
                && Objects.equals(reportedSplitsOnRegistration, that.reportedSplitsOnRegistration);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtaskId, location, reportedSplitsOnRegistration);
    }
}
  


Kafka connector

Add new config option: scan.split-assigner

Add scan.split-assigner with two choices: hash and round-robin.

    public static final ConfigOption<SplitAssignerType> SCAN_SPLIT_ASSIGNER =
            ConfigOptions.key("scan.split-assigner")
                    .enumType(SplitAssignerType.class)
                    .defaultValue(SplitAssignerType.HASH)
                    .withDescription(Description.builder()
                            .text("The strategy for assigning splits to source readers. Valid options are:")
                            .list(
                                    text("'hash' - Distributes splits by topic hash, ensuring splits from the same topic are evenly assigned across readers. "
                                            + "However, overall distribution may be imbalanced when topics have different partition counts."),
                                    text("'round-robin' - Splits are assigned in a round-robin fashion, ensuring that all splits are evenly distributed across readers regardless of topic.\n"
                                            + "However, splits from the same topic may not be evenly distributed— they can end up scattered across different readers.\n"
                                            + "If data throughput varies significantly across topics, this can lead to runtime load imbalance, even if the number of splits is balanced.")
                            )
                            .build());


public enum SplitAssignerType {
    HASH("hash"),

    ROUND_ROBIN("round-robin");


    private final String value;

    SplitAssignerType(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

}



KafkaSourceEnumState

Add a pendingSplits field to KafkaSourceEnumState to persistently track splits that have been discovered but not yet assigned. This ensures they are not lost during failures.

/** The state of the Kafka source enumerator. */
@Internal
public class KafkaSourceEnumState {
    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
    private final Set<TopicPartitionAndAssignmentStatus> partitions;

    /**
     * This flag is marked as true once the initial partitions have been discovered after the
     * enumerator starts.
     */
    private final boolean initialDiscoveryFinished;

    /** The splits that have been fetched from Kafka but not yet assigned to readers. */
    private final Set<KafkaPartitionSplit> pendingSplits;

    /**
     * The discovered and initialized partition splits that are waiting for their owner reader to
     * become ready.
     */
    private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment;

    /** The owner reader of each split; the source of truth for assignment after a restart. */
    private final Map<KafkaPartitionSplit, Integer> globalPartitionSplitAssignment;

    public KafkaSourceEnumState(
            Set<TopicPartitionAndAssignmentStatus> partitions,
            boolean initialDiscoveryFinished,
            Set<KafkaPartitionSplit> pendingSplits) {
        this.partitions = partitions;
        this.initialDiscoveryFinished = initialDiscoveryFinished;
        this.pendingSplits = pendingSplits;
        // The assignment maps are rebuilt at runtime from reader reports and
        // start empty on restore.
        this.pendingPartitionSplitAssignment = new HashMap<>();
        this.globalPartitionSplitAssignment = new HashMap<>();
    }
}


Compatibility, Deprecation, and Migration Plan

None.


Rejected Alternatives

Enumerator doesn't reassign splits across readers.

The proposed Registration Process is as follows:

  1. Reader Registration with Deferred Assignment
    When a reader starts (SourceOperator#open), it sends a ReaderRegistrationEvent to the SplitEnumerator, including its previously assigned splits (restored from state). However, these splits are not yet assigned to the reader. The SourceOperator is placed in a PENDING state.

  2. Prevent State Pollution During Registration
    While in the PENDING state, SourceOperator#snapshotState will not update the operator state. This prevents empty or outdated reader state (e.g., with removed splits) from polluting the checkpoint.

  3. Enumerator Performs Split Cleanup and Acknowledges
    Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes any splits that are no longer valid (e.g., due to removed topics or tables) and returns the list of remaining valid split IDs to the reader via a ReaderRegistrationACKEvent.
    For backward compatibility, the default behavior is to return all split IDs (i.e., no filtering).

  4. Finalize Registration and Resume Normal Operation
    When the SourceOperator receives the ReaderRegistrationACKEvent, it assigns the confirmed splits to the reader and transitions its state to REGISTERED. From this point onward, SourceOperator#snapshotState can safely update the operator state.

Limitations: reassignment is not supported for already assigned splits (the enumerator can only assign to the least-assigned reader).

Advantage: reduces the resource cost of uploading complete split info for large datasets.


Change the scope of AddSplitBack

Add splits back to the split enumerator. This will only happen in three situations:
  1. When a SourceReader fails and there are splits assigned to it after the last successful checkpoint.
  2. When a SourceReader fails and there are splits assigned to it before the last successful checkpoint.
  3. On a global restart, when there are splits from the stored reader savepoint or checkpoint.


The problem we are trying to solve here is giving the splits back to the SplitEnumerator. There are only two types of splits to give back:
1) Splits whose assignment has been checkpointed. In this case, we rely on addReader() + SplitEnumeratorContext to give the splits back; this provides more information associated with those splits.
2) Splits whose assignment has not been checkpointed. In this case, we use addSplitsBack(); there is no reader info to give because the previous assignment never took effect to begin with.

From the SplitEnumerator implementation perspective, the contract is straightforward.
1. The SplitEnumerator is the source of truth for assignment.
2. When the enumerator receives the addSplitsBack() call, it always adds these splits back to the unassigned/pending splits.
3. When the enumerator receives the addReader() call, that means the reader has no current assignment and has returned its previous assignment based on reader-side info. The SplitEnumerator checks the SplitEnumeratorContext to retrieve the returned splits from that reader (i.e., the previous assignment) and handles them according to its own source-of-truth knowledge of assignments: ignore a returned split if it has been assigned to a different reader; otherwise put it back into the unassigned/pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This works regardless of whether it is a global failover, a partial failover, a restart, etc. There is no need for the SplitEnumerator to distinguish which failover scenario it is.

