Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes, users may want to increase Kafka partitions, eg:

  1. Increase in the number of consumers: If the number of consumers increases, existing partitions may not be able to meet their needs. In this case, adding partitions can more efficiently distribute consumer loads.
  2. Increase in data traffic: If data traffic increases, existing partitions may not be able to handle all the data. In this case, adding partitions can more efficiently handle data flow.
  3. Insufficient storage capacity: If storage capacity is insufficient to meet data retention periods, adding partitions can increase storage capacity.


When Kafka partitions are increased, users want to be able to dynamically discover new partitions without having to restart the Flink job. Although Flink already provides the ability of dynamic partition discovery, there are still some problems:

  1. Dynamic partition discovery is disabled by default, and users have to specify the interval of discovery in order to turn it on.
  2. The strategy used for new partitions is same as the initial offset strategy. According to the semantics, if the startup strategy is latest, the consumed data should include all data from the moment of startup, which also includes all messages from new created partitions. However, the latest strategy currently maybe used for new partitions, leading to the loss of some data (thinking a new partition is created and might be discovered by Kafka source several minutes later, and the message produced into the partition within the gap might be dropped if we use for example "latest" as the initial offset strategy).if the data from all new partitions is not read, it does not meet the user's expectations. Other ploblems see final Section: User specifies OffsetsInitializer for new partition .
  3. Current JavaDoc does not display the whole behavior of OffsetsInitializers. There will be a different offset strategy when the partition does not meet the condition.(See final Section: User specifies OffsetsInitializer for new partition .).


Therefore, this Flip has there main objectives:

  1. Enable partition discovery by default.
  2. Provide a EARLIEST strategy for later discovered partitions.

  3. Organize the code logic of the current built-in OffsetsInitializer, then modify the JavaDoc to let users know.

 

Public Interfaces

JavaDoc of SpecifiedOffsetsInitializer

add this: "Use Specified offset for specified partitions while use commit offset or Earliest for unspecified partitions. Specified partition offset should be less than the latest offset, otherwise it will start from the earliest. "


JavaDoc of TimestampOffsetsInitializer

add this: "Initialize the offsets based on a timestamp. If the message meeting the requirement of the timestamp have not been produced to Kafka yet, just use the latest offset"


Kafka table source (Table / SQL API)

"scan.topic-partition-discovery.interval" will be set to 5 minutes by default, aligned with the default value of "metadata.max.age.ms" in Kafka consumer.

If users want to disable dynamic partition discovery,  they can set the discovery interval to 0. Can't use negative values here as the type of this option is Duration.

Kafka source (DataStream API)

"partition.discovery.interval.ms"  will be set to 5 minutes by default, with discovery interval set to 5 minutes.

Both negative and zero will be interpreted as disabling the feature.


KafkaSourceEnumerator

Add variable attributes unassignedInitialPartitons and initialDiscoveryFinished to KafkaSourceEnumerator, which will be explained in the Proposed Changes section.

KafkaSourceEnumerator
public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions,
    		 Set<TopicPartition> unassignedInitialPartitons,
             boolean initialDiscoveryFinished             ) {

}

    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness) {
        this(
                subscriber,
                startingOffsetInitializer,
                stoppingOffsetInitializer,
                properties,
                context,
                boundedness,
                Collections.emptySet(),
                Collections.emptySet(),
                true
            );
    }

 



KafkaSourceEnumState

Depends on the proposed changes.

public class KafkaSourceEnumState {
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    private final boolean initialDiscoveryFinished;
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus{
    ASSIGNED,
    UNASSIGNED_INITIAL
}

 



Proposed Changes

Main Idea

Currently, the offset of newly discovered partitions is initialized by the user-specified OffsetsInitializer, which could lead to potential data loss (thinking a new partition is created and might be discovered by Kafka source several minutes later, and the message produced into the partition within the gap might be dropped if we use for example "latest" as the initial offset strategy).

As a consequence, we also propose changing the behavior to start consuming from the earliest offset for newly discovered partitions.

However, because the snapshot state currently only contains assignedPartitions (the assigned partitions) , it cannot distinguish between unassigned partitions as first-discovered or new, and therefore cannot use different offset strategies.

The main idea of this FLIP is that: the snapshot state contains new variables that distinguishes first-discovered or new partitions, so that the partitions can be distinguished during a restart.


Note: The current design only applies to cases where all existing partitions can be discovered at once. If all old partitions cannot be discovered at once, the subsequent old partitions discovered will be treated as new partitions, leading to message duplication. Therefore, this point needs to be particularly noted.


Add unassignedInitialPartitons and initialDiscoveryFinished to snapshot state

Add the unassignedInitialPartitons(collection) and initialDiscoveryFinished(boolean) to the snapshot state. unassignedInitialPartitons represents the collection of first-discovered partitions that have not yet been assigned. firstDiscoveryDone represents whether the first-discovery has been done.

The reason for these two attributes is explained below:

Why do we need the unassignedInitialPartitons collection? Because we cannot guarantee that all first-discovery partitions will be assigned before checkpointing. After a restart, all partitions will be re-discovered, but only whether the partition has been assigned can be determined now, not whether the unassigned partition is a first-discovered partition or a new partition. Therefore, unassignedInitialPartitons is needed to represent the first-discovered partitions that have not been assigned, which should be an empty set during normal operation and will not take up much storage.

Why do we need initialDiscoveryFinished? Only relying on the unassignedInitialPartitons attribute cannot distinguish between the following two cases (which often occur in pattern mode):

  • The first partition discovery is so slow, before which the checkpoint is executed and then job is restarted . At this time, the restored unassignedInitialPartitons is an empty set, which means non-discovery. The next discovery will be treated as first discovery.
  • The first time the partition is discovered is empty, and new partitions can only be found after multiple partition discoveries. If a restart occurs between this period, the restored unassignedInitialPartitons is also an empty set, which means empty-discovery.The next discovery will be treated as new discovery.

Now that the partitions must be greater than 0 when creating topics, when the second case will occur? There are three ways to specify partitions in Kafka: by topic, by partition, and by matching the topic with a regular expression. Currently, if the initial partition number is 0, an error will occur for the first two methods. However, when using a regular expression to match topics, it is allowed to have 0 matched topics.


The following is a modification of KafkaSourceEnumState:

public class KafkaSourceEnumState {
    private final Set<TopicPartition> assignedPartitions;
	  private final Set<TopicPartition> unassignedInitialPartitons;
    private final boolean initialDiscoveryFinished;
}



 Merge assignedPartitions and unassignedInitialPartitons collections

Following the previous plan, the snapshot state needs to save both assignedPartitions and unassignedInitialPartitons collections. Item of both collections are TopicPartition, but with different status. Therefore, we can use one collection to pull all them in.

Adding a status would make it easier to extend the process of partitions discovery through additional rules in the future.

Every Kafka partition in the Kafka snapshot state will have a "type" attribute indicating whether it is assigned or initial:

  • assigned: the partition has already been assigned to the reader , which can be restored as assignedPartition.
  • unassignedInitial: unassigned part of the first-discovered partition, which can be restored as unAssignedInitialPartitions.



The following is a modification of KafkaSourceEnumState:

public class KafkaSourceEnumState {
    private final Set<KafkaPartitionWithAssignStatus> assignedAndInitialPartitions;
    private final boolean initialDiscoveryFinished;
}

class KafkaPartitionWithAssignStatus {
    private final TopicPartition topicPartition;
    private int assignStatus;
}

enum KafkaPartitionSplitAssignStatus{
    ASSIGNED,
    UNASSIGNED_INITIAL
}




Overhead of Partition Discovery in Kafka Source

Partition discovery is performed on the KafkaSourceEnumerator, which asynchronously fetches topic metadata from the Kafka cluster and checks if there are any new topics and partitions. This should not cause performance issues on the Flink side.

On the Kafka broker side, partition discovery sends a MetadataRequest to the Kafka broker to fetch topic information. Considering that the Kafka broker has its metadata cache and the default request frequency is relatively low (once every 30 seconds), this is not a heavy operation, and the broker's performance will not be significantly affected. It would also be great to get some inputs from Kafka experts.



Compatibility, Deprecation, and Migration Plan

Changing KafkaSourceEnumState also requires changing the corresponding serialization and deserialization methods. Compatibility issues should be taken into consideration during deserializing. Thus,  pay attentions to KafkaSourceEnumStateSerializer.



Test Plan

This feature will be guarded by unit tests and integration tests in the Kafka connector's source code.


Rejected Alternatives

Initially the default discovery interval is set to 30 seconds

Initially the default discovery interval is set to 30 seconds. This interval is relatively too short comparing with the default value of its equivalent option "metadata.max.age.ms" in Kafka consumer. We adjust it to 5 minutes to align with the default of "metadata.max.age.ms". 



Ensure that the first partition obtained is assigned before saving the snapshot

If we can guarantee that the first-discovered partition is assigned before checkpointing, all partitions discovered after restarting will be new partitions.

The implementation is to change the KafkaSourceEnumerator's snapshotState method to a blocking one, which resumes only after the first-discovered partition has been successfully obtained and assigned to KafkaSourceReader.


Advantage: No need to change the snapshot state variable values.

Disadvantage: The partition discovery, partition assignment, and checkpoint operation are coupled together. if the partition is not assigned before checkpointing, the SourceCoordinator's event-loop thread will be blocked, but partition assignment also requires the event-loop thread to execute, which will cause thread self-locking. Therefore, the existing thread model needs to be changed, which is more complicated.


Alternative to the initialDiscoveryFinished variable

As mentioned before, the reason for introducing firstDiscoveryDone is that, after restarting, if the unAssignedInitialPartitions in the snapshot state is empty, it cannot be determined whether it’s because of non-discovery or empty-discovery.

Then, if we change the first discovery method to a synchronous method, we can ensure that it has been done before checkpointing. Because when the event-loop thread starts, it first adds a discovery event to the blocking queue. When it turns to execute the checkpoint event, the partition has already been discovered successfully.

However, this method is only to reduce one boolean variable in the snapshot state, but the event-loop thread will be blocked for this purpose, which is not worth it. Partition discovery can be a heavily time-consuming operation, especially when pattern matching a large number of topics. In this case, the SourceCoordinator cannot process other event operations during the waiting period, such as Reader registration.


User specifies OffsetsInitializer for new partition

Plan

The Add the OffsetsInitializer for the newly discovered partition. Its default value is EARLIEST. If the user needs to, they can adopt different strategies.

KafkaSourceBuilder
 public class KafkaSourceBuilder<OUT> {
  public KafkaSourceBuilder<OUT> setNewDiscoveryOffsets(
            OffsetsInitializer newDiscoveryOffsetsInitializer) {
        this.newDiscoveryOffsetsInitializer = newDiscoveryOffsetsInitializer;
        return this;
    }
}


Reject Reason

SpecifiedOffsetsInitializer

First, let's take a look at what happens in the current SpecifiedOffsetsInitializer. When use SpecifiedOffsetsInitializer, the following strategy is adopted for unspecified partitions(See SpecifiedOffsetsInitializer#getPartitionOffsets):

  1. Use committed offset first.However, new partitions don’t have committed offsets.
  2. If there is no committed offset, use Kafka's OffsetResetStrategy. Currently, Kafka's OffsetResetStrategy is set to Earliest. (See KafkaDynamicSource#createKafkaSource -> OffsetsInitializer#offsets)

That is, for specified partitions, use Specified offset, and for unspecified partitions, use Earliest.

For continuous streaming jobs, unspecified partitions are more common.

The problem of specified partitions is that the latest offset of a new partition is generally 0 or a small value when discovery, which is very unpredictable and unreasonable. If the specified offset is greater than that value, Kafka Client will consume according to the OffsetResetStrategy, that is EARLIEST (See org.apache.kafka.clients.consumer.internals.Fetcher#handleOffsetOutOfRange).

image.png

For example, if the Specified offset of topic1-1 is set to 10, when new partition-1 is discovered with offsets range of [0, 5], all messages in the range of [0, 5] will be consumed. When new partition-1 is discovered with offsets range of [0, 15], then consumption will begin from offset 10. If partition-2 has no specified offset, when new partition-2 is discovered with offsets range of [0, 15], all messages in the range of [0, 15] will be consumed.


Using "Earliest" ensures that the same behavior of consuming offsets for new partitions is maintained.

The specified offset strategy only affects existing partitions and can start from a certain point in middle of offset range . It should not be used for new partitions.



TimestampOffsetsInitializer 

Next, let's take a look at what happens in the current TimestampOffsetsInitializer. If the timestamp is too late, it will fail to hit the new partition’s message. In this case, the offset will be set to the latest offset (see TimestampOffsetsInitializer#getPartitionOffsets).

It would be difficult to estimate the time if the new partition uses timestamps. If we set the new partition as a timestamp during startup:

  • For the new partitions before this timestamp, they will be immediately adopted using the latest offset once discovered, instead of waiting until the timestamp. It also depends on how many messages are written between partition creation and discovery, which is very unpredictable and unreasonable.
  • For the new partitions created after this timestamp, it will be equivalent to the earliest.(For continuous streaming jobs, this is more common.)

For example, if the timestamp is set to tomorrow at 7:00, partitions created before 7:00 tomorrow will be consumed from the latest offset of discovered moment, while partitions created after 7:00 tomorrow will be consumed from the earliest offset.

The timestamp strategy only affects existing partitions and can start from a certain point in middle of time range. It should not be used for new partitions.


Conclusion

All these problems can be reproducible in the current version. The reason why they haven't been exposed is probably because users usually set the existing specified offset or timestamp, so it appears as earliest in production.

After analyzing the above,  using EARLIEST is the most reasonable option.

All these problems can be reproducible in the current version. The reason why they haven't been exposed is probably because users usually set the existing specified offset or timestamp, so it appears as earliest in production.

All these problems can be reproducible in the current version. The reason why they haven't been exposed is probably because users usually set the existing specified offset or timestamp, so it appears as earliest in production.

All these problems can be reproducible in the current version. The reason why they haven't been exposed is probably because users usually set the existing specified offset or timestamp, so it appears as earliest in production.





  • No labels