Current state: Draft
Discussion thread: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Consumer semantics is very useful for distributed data processing in Kafka, however the granularity of parallelism doesn’t sometimes satisfy the business need. To avoid peak traffic affecting production, Kafka users would normally do the capacity planning beforehand to allow 5X ~ 10X future traffic increase. This aims to avoid hitting the scalability bottleneck at the best effort, but still by chance that eventually the traffic goes beyond the original planning (which may be a good thing ). One solution people have considered is to do online partition expanding. The proposal was not continuing to evolve due to its complexity and operation overhead. A second painful option is to switch input topic on the fly. To do that, job owner needs to feed a new topic with more number of partitions, setup a mirrored job and make sure the metrics are aligned for the A/B jobs, eventually make the switch. As of today, this whole process is manual, cumbersome and error-prone for most companies that have critical SLAs.
In the infra cost perspective, pre-define an impractical high number of partitions will definitely increase the network traffic as more metadata and replication will be needed. Besides extra money paid, the operation overhead increases while maintaining broker cluster in good shape with more topic partitions beyond necessity. It's been a known pain point for Kafka streaming processing scalability which is of great value to be resolved, as the number of processing tasks match the number of input partitions. Therefore, Kafka Streams jobs' overhead increases with the number of partition increase at the same time.
Also in reality for use cases such as Kafka Streams, online partition expansion is impossible for stateful operations, as the number of partition for changelog/repartition topics are fixed. This means we couldn't change the input partitions gracefully once the job is up and running. To be able to scale up, we have to cleanup any internal topics to continue processing, which means extra downtime and data loss. By scaling on the client side directly, the operation becomes much easier.
Today most consumer based processing model honors the partition level ordering. However, ETL operations such as join, aggregation and so on are per-key level, so the relative order across different keys does not require to be maintained, except for user customized operations. Many organizations are paying more system guarantee than what they actually need.
The proposal here, is to decouple the consumption threads and physical partitions count, by making consumers capable of collaborating on individual topic partition. There are a couple of benefits compared with existing model:
- Data consume and produce scales are no longer coupled. This means we could save money by configuring a reasonable topic with decent amount of partitions to save cost.
- Better avoid partition level hotkeys. When a specific key is processing really slow, the decoupled key based consumption could bypass it and make progress on other keys.
No operation overhead for scaling out in extreme cases. Users just need to add more consumer/stream capacity to unblock even there are a few partitions available.
We want to clarify beforehand that this KIP would be a starting point of a transformational change on the consumer consumption semantics. It's not possible to have all the design details rolling out in one shot. Instead, the focus is to define a clear roadmap of what things need to be done, and illustrate the long term plan while getting some concrete tasks in starting steps. There will also be follow-up KIPs and design docs, so stay tuned.
As stated above, the scaling cap for consumer based application is the number of input partitions. In an extreme scenario when there is one single input partition with two consumers, one consumer must remain idle. If the single box consumer could not keep up the speed of processing, there is no other solution but lagging. It would be ideal we could co-process data within one partition by two consumers when the partition level order is not required, such that we could add as many consumer instances as we want.
So this cooperative consumption model applies with following limitations:
- The bottleneck is on the application processing, not data consumption throughput which could be caused by other reasons such as network saturation of broker.
- The processing semantic does not require partition level order, otherwise only one consumer could work on the input sequentially without parallelism.
For pt 2, there are also different requirements for stateless and stateful operations, such as whether key level ordering is required or not. Naturally speaking, with a requirement of key level ordering, the broker needs to allocate the same message key to the same consumer within one generation as a must. For stateless operation which doesn't care about the ordering at all, broker could just do a round robin assignment of records to fetch requests.
We would start from supporting a new offset commit semantics as this unblocks the potential to process data regardless of partition level ordering. The stateful operation support is of more value in nature, so concurrently we could add supports on key based filtering with fetch calls. The optimized stateless and transactional supports have to be built on top of the first two steps.
|Individual commit||Create a generic offset commit model beyond current partition → single offset mapping.||No|
|Key filtering based fetch||Add capability to FetchRequest with specific hashed key range or specific keys||No|
|Rebalance support for concurrent assignment||Add assignor support to allow splitting single topic partition with different hash range||Key based filtering|
|Transactional support||Incorporate individual commit into the transaction processing model||Key based filtering|
|Support cooperative fetch on broker||Round robin assign data to cooperative consumer fetches when there is no key-range specified||Rebalance support|
High Level Design
*** Note that we will use term IC to refer to the individual commit for followup discussion.
The logic behind individual commit is very straightforward. For sequential commit each topic partition will only point to one offset number, while under individual commit it is possible to have "gaps" in between. The "stable offset" is a borrowed concept from transaction semantic which is just for illustration purpose, whose purpose is to mark the position where all the messages before it are already committed. The `IC` blocks refer to the individual commits that are marking records already processed.
In the individual commit mode, the offset metadata shall grow much quicker and harder to predict. The log needs to be highly compacted to avoid disk waste, which is different requirement from the existing consumer offset topic. Thus, we propose to add another internal topic called `__individual_commit_offsets` which stores the individual commits specifically, and call the current __consumed_offsets topic the primary offset topic. Furthermore, this isolation should make the IC feature rollout more controllable by avoiding messing up stable offset in primary and achieve at-least-once when we need to delete the corrupted IC offset topic. The individual commit offset topic shall be required to co-locate with the __consumed_offsets topic, which means it has to share the configuration of number of partitions, replication factor and min replicas as primary offset topic.
The offset commit and offset fetch workflow will be changed under individual commit mode.
The IC offset will be appended into corresponding offset log as normal, however the broker shall maintain an in-memory state to keep track of the individual commits that are written but not yet cleaned up. To eventually compact this topic, each IC record will go through the coordinator memory to check whether there could be records deleted. In the illustration scenario,
- If there is an IC [48-49] then we no longer need to maintain IC ranges [45-47] and [50-50]. We shall append 2 null records to delete [45-47] and [50-50] and augment the original IC into [45-50]
- If there is an IC [43-44], we are good to move stable offset forward. The operation is like:
- Push stable offset to 47 in the primary offset topic
- Append a record to remove IC [45-47] in the IC offset topic
There is also a math we need to do which is how frequent we should commit. The default max request size for Kafka is 1MB, which means we could not include more than 1MB / 16b ≈ 60,000 ranges within a request. This means if we are in auto commit mode, the consumer has to breakdown commits if it already accumulates a large number of ranges. It's also undesirable to let stable offset make no progress for a long time. For the sake of smooth progress, we define a new config on the max ranges being collected before triggering a commit call.
Also we need to take care of the memory bound for maintaining the individual offsets. If there are too many of them, we eventually couldn't store all in the heap. One solution is to block progress of the active group when the number of IC offsets are too large, which usually indicates a dead member within the group. The coordinator will callout rebalance in this extreme case to make sure progress could be made again.
During a consumer startup in sequential commit mode, it will attempt to fetch the stored offsets before resuming work. This would be the same step for IC mode, only the consumer will be aware of both the individual offsets and stable offsets now. The individual offsets serve as a reference when user calls #poll(), such that if there is any record that's already committed, it will be filtered and the polled records will only contain committed ones. When doing the offset commit, consumer will do a "gap amending" too.
Imagine a scenario when a consumer fetches based on below setup:
Offset: stable: 40, IC: [43-45], [48-49]
Fetched range: [40-50]
Consumer will commit only the processed offsets:
[41-42], [46-47], [50-50]
which matches the offset commit semantic on always committing offsets that are processed within current batch.
In order to cleanup unnecessary IC offsets, the OffsetCommitResponse shall also contain the information about the latest stable offset, so that consumers could safely delete any IC offsets that fall behind stable offset.
Key Based Filtering
In order to allow sharing between the consumers, broker has to be able to distinguish data records by keys. For a generic purpose, the easy way to assign records by keys is to get a hashing range and allow range split when having multiple consumers talking to the same partition. This would be our starting point to support data sharing between consumers. Suppose we do a key hashing to a space of [0, Long.MAX] while two consumers are assigned to the same partition, they will each get key range [0, Long.MAX/2 - 1] and [Long.MAX/2, Long.MAX]. This hash range assignment decision will be made during rebalance phase.
With the key based hashing, brokers will load topic partitions data into the main memory and do the partitioning work based on the key hashed range. When a FetchRequest comes with specific key range, broker will only reply the ranges that meet the request need. This workflow could be implemented as a generic server side filtering, and KIP-283 is already a good example where we also attempt to load data into memory for down-conversion.
There is obvious performance penalty compared with zero-copy mechanism for existing consumer fetching, however the value of unblocking new use cases motivates us to figure out more optimizations in the long run. In the near term, we could consider cache the filter results of one batch of data so that when other fetch requests fall into this range, we could serve in-memory result instead of doing a repetitive fetch. For example, if a fetch request coming in to ask for range [40-50], but only [40, 44] are satisfying its hash range. The result of [45, 49] will be saved as message keys, available for next fetch request instead of evicting immediately. We also recommend a config to control how much memory we would like to spend on this feature, which is called max.bytes.key.range.hashing. When the cache is sufficient with space and the Fetch request is hanging for new data, producer results could write directly into the cache and do the filtering in order to further save the disk read time.
Also user could opt to choose either load the message key or the entire message into the main memory depending on the understanding of business needs since the variation of message value could be huge. This option would be introduced as a topic level config. Either way, once a cached result is being sent out, it will be evicted immediately to leave room for the current or future key range filter results.
With key based filtering + IC offset support, user is already capable of doing standalone mode of partition data sharing. By leveraging cloud operation technology such as Kubernetes, user could generate a set of pod configurations that defines which consumer subscribing to which topic partition range, and do easy hot swap when certain nodes go down. We shall discuss the consumer API changes to support this feature in the public interface section.
Thinking in even longer term, this work could be more generic for broker side filtering. Client could specify: 1. key ranges, 2. blacklist keys, 3. whitelist keys, 4. max/min record size, etc to further reduce the network cost with a penalty on throughput and latency.
Rebalance support for concurrent assignment
To satisfy different administration requirements to turn off this feature at any time, we are placing several configurations on:
- Broker level, to determine whether to allow IC request,
- Topic level, to determine whether this topic is allowed to be cooperatively processed
- Consumer level, to determine if this consumer opts to share processing with someone else
To determine the hash range for every consumer generation, in the group assignment phase the leader will evaluate the situation and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criteria is comparing the relative size of topic partitions vs number of consumers that are opting into the key sharing mode. For example if number of consumers m > number of partitions n, we would do the assignment based off partitions instead of consumers. If 5 consumers subscribing to 1 topic with 3 partitions, the final assignment would be:
N = Long.MAX
M1: tp1[0, N/2 - 1],
M2: tp2[0, N/2 - 1],
M4: tp1[N/2, N]
M5: tp2[N/2, N]
The new assignment comes from the fact that partitions are playing a reverse mapping to consumers. So in partition perspective, our assignment looks like a round robin assignment based off partitions:
tp1: M1, M4
tp2: M2, M5
The assigned ranges could then be used by the consumer to make their fetch more fruitful. We don't plan to introduce a separate assignor strategy, but instead we would like to rollout this strategy as a plugin to the existing assignment assignors. As of today the ConsumerPartitionAssignor interface looks like below:
Take round robin assignor as an example while merging the restriction requirements, an abstract assignor that could take partition split into consideration looks like below:
Similarly for other assignment strategy such as range assignor, we always attempt to do a `reverse-assign` when consumers out-number any topic's total partitions.
This means for 5 consumers subscribing to 2 topics with 2 and 3 partitions each, the final assignment will look like (in partition perspective):
t1p0: c1, c2, c3
t1p1: c4, c5
t2p0: c1, c2
t2p1: c3, c4
This step unblocks the potential to allow a dynamic consumer group scaling beyond partition level capping.
Supporting Kafka Streams Rebalance
Kafka Streams have built a customized partition assignment semantics. For stateful DSL operations, there is a subtle requirement to make sure that the change in key distribution under the same partition will not encounter `key not found` error on the stream instance state. An example is like two stream instances A, B subscribing to one topic partition and do a per key aggregation. Record key m belongs to instance A for the first generation, however gets reassigned to instance B in the second generation. How does instance B know the previous states of record m? The answer is to replay the changelog topics. So in order to make Streams work with IC semantics, we could not split the stream changelogs because that would make it hard to maintain the key mapping. If multiple stream threads subscribing to the same partition, the changelog shall not be split. When the stream thread realizes a key range changes during rebalance, the restore consumer shall apply the new standalone mode key range assignment where the key based filtering rule could also be applied during restoration.
One more detail on stream is about task definition. The task will still be associated with partition number, but internally we mark it as a "Shared" task and put the key range into it. During rebalance, we will try to group shared tasks associating based on their partition and assign them onto the same stream instance, and the state store for each topic partition could allow concurrent access by shared tasks.
For example, if we have 2 partitions tp-0, tp-1 and 2 stream instances, with each stream instance configuring with 2 threads. In total, we have 4 consumers subscribing to them, and create four shared tasks:
task-0-[0, N/2 - 1],
task-1-[0, N/2 - 1],
task-1-[N/2 , N],
Normally for non-shared tasks, we would create two local state stores state-store-0, state-store-1. During rebalance, we will target to make the following assignments:
M1: task-0-[0, N/2 - 1], task-0-[N/2, N]
M2: task-1-[0, N/2 - 1], task-1-[N/2, N]
The benefit of arranging such assignment is to reduce state store creation. If unfortunately we have an assignment such like:
M1: task-0-[0, N/2 - 1], task-0-[N/2, N]
M2: task-1-[0, N/2 - 1], task-1-[N/2, N]
We need to create 2 state stores on each machine, which in total is 4.
Another issue will be on the interactive query, as a specific record key may be assigned to any shared task. So StreamsMetadata needs
Look into the future: Transactional Support
Eventually IC semantic has to be compatible with transaction support. Despite the ongoing discussion of any change to the transaction semantics, we are providing a rough plan on integrating with current transaction model in this KIP as well. The current transaction model uses transaction markers to specify whether all the records before it are ready to be revealed in downstream. So inherently, this is a design that obeys partition level ordering. To avoid over-complicating the transaction semantic, we will introduce a new delayed queue or purgatory for transactional IC semantic, where the commit shall be blocked until it could hit the stable offset. Take Kafka Streams as an example, if we have a chunk of data [40-49] with stable offset at 39, with two stream threads A and B are turning on EOS at the same time in the sharing mode:
- Stream thread A did a fetch to get [40, 44]
- Stream thread B did a fetch to get [45, 49]
- Stream thread B finished processing and issue a transactional commit of [45, 49]
- As the offset range [40, 44] is held by someone at the moment, transactional commit request will be put in a purgatory to wait for stable offset advance
- Stream thread A finished processing and issue a transactional commit of [40, 44]
- We first advance the stable offset and reply stream thread A with successful commit
- Then we search purgatory to reply stream thread B and allow it to proceed
This high level workflow is blocking in nature, due to the nature of Kafka transaction model. Another approach is to add the current transaction model capability to track key level ordering which is also very tricky. The design is still open to discussion and may change as the Kafka transaction model evolves.
Look into the future: Cooperative Fetch
Take a look back at the stateless operations like filter or map, there is no necessity to honor the consumer → key mappings during the processing. From KIP-283, we already know it's very costly and inefficient to copy data around. Based on client's need, broker could do a random assignment when receiving fetch requests without key level granularity. It will keep track of who has been fetched to which position. This means we don't need to load any data into the main memory and the total of IC offsets will be significantly dropped.
For example if we have a chunk of data [40-49], where offsets 41, 43, 45, 47, 49 belong to key A and 40, 42, 44, 46, 48 belong to key B, consumers owning key A will commit 5 IC offsets, same as key B owner. If we don't maintain that mapping, for consumer fetching we will first return data range [40-44] and advance the in-memory marker, and then reply [45, 49] even potentially this consumer is trying to fetch starting from offset 40, with the hope that someone else will take care of [40-44] eventually.
Cooperative fetch is more like an optimization upon our stateless use case scenario, instead of a new feature. To make it right, we have to encode more metadata within the consumer fetch request such as consumer generation and consumer id, wisely refusing advancing the marker unless necessary when 1. the consumer is zombie, 2. the same consumer is retrying the fetch 3. session timeout. And there should also be a max wait time for commit gaps so that we don't lose the chance to process the data when consumer whoever did the fetch encounters hard failures already. It could be set the same as consumer configured session timeout.
The design is still open to discussion as the trade-off between improvements over complexity is still not clear.
The offset commit protocol will be changed to allow IC semantic. The committed offset will include a list of offset ranges indicating the acknowledged messages, and in the response a LastStableOffset will be added to let consumer purge local IC offsets.
When the offset range is not empty, broker will only handle offset ranges and ignore the plain offset field. Optionally broker will check if the original offset field is set to -1 to make sure there is no data corruption in the request. Trivially, transaction offset commit will also include the offset range.
The offset fetch request will also be augmented to incorporate the key hash range. Using a list instead of one hash range allows future extensibility.
We shall also put offset ranges as part of OffsetFetchResponse:
To leverage the key based filtering in a standalone mode, we also define a new consumer API in standalone mode:
To recognize whether a consumer is allowed to subscribe as key-share, we shall put a new boolean field in Subscription to indicate that:
To leverage the key based filtering in a group subscription mode, we will also add callbacks the ConsumerRebalanceListener to populate key range assignments if necessary:
For better visibility, we shall include the key range information in StreamsMetadata:
For better visibility, we shall include the range offsets in the ListConsumerGroupOffsetsResult:
We are going to define a series of exceptions which are thrown as 1. access control was hit, 2. inconsistency
Determine whether the group coordinator allows individual commit.
The maximum memory allowed to hold partitioned records in-memory for next batch serving. Note that this is not the memory limit for handling one batch of data range check.
Default: 209715200 bytes
The maximum memory allowed for each replica fetcher to do general broker side filtering, including the key hash processing.
Default: 104857600 bytes
The segment size for the individual commit topic.
Default: 10485760 bytes
Determine whether the topic is allowed to perform key range based fetch. The reason to set it false could be more like a user's call such that any subscribing application must obey the partition level ordering.
Determine whether when caching last fetch request result, we should also keep the message value inside main memory for optimization.
The segment size for the individual commit topic.
Default: 10485760 bytes
Determine whether this consumer will participate in a shared consumption mode with other consumers.
Commit the progress if accumulated offset ranges are beyond this number.
Pulsar has officially supported key share feature in 2.4, which suggests multiple consumers could share the same partition data.
The blockers for us to implement a similar feature are:
- Our consumer model is pull based, which incurs random read if we let consumers ask for specific keyed records. Sequential read is the key performance sugar for Kafka, as otherwise we could not bypass memory copy of the data.
- Our broker doesn’t know anything about data distribution, as all the metadata is encoded and looks opaque to them. In reality, we could not afford letting consumers fetch with raw keys.
- Consumer coordinator is at a centralized location, however we need to distribute keys in different partitions. For Pulsar their offset data is co-located with actual partitions. The burden for broadcasting the state change in Kafka would be pretty hard and very error-prone.
Compatibility, Deprecation, and Migration Plan
As transactional support and cooperative fetch need more design discussion, we will delay putting on the upgrade path now.
To upgrade an existing consumer use case to allow individual commit + key based filtering + rebalance support, the user needs to:
- Upgrade broker to latest, by default individual commit is turned on
- Allow the input topic to be shared by turning on the range fetch config
- Upgrade consumer client to allow range sharing
- Rolling bounce the group, and the consumer group should be able to share consumption of the topic
Failure Recovery and Test Plan
We discuss the failure scenario alongside with the test plan to better justify the value.
- Individual Commit failure. The failure could result from:
- Broker restriction. If this is the case, the client will immediately fail with restriction exception.
- IC log not found or corrupted. As the _consumed_offsets topic need to co-partition with IC log, we need to load the replica from ISR to the current coordinator, and force a leader election to nominate the new log as the leader.
- Commit too old: the IC offset is smaller than the current LSO. In this case, reply with the current LSO and client will adjust its position to catch up with latest.
- In at least once, commit in duplicate is not a serious flag. Brokers should not response a separate exception but will do the warning log for user to debug later.
- Range Fetch failure. The failure could result from:
- Topic restriction. The client will immediately crash with restriction exception.
- Other common fetch request errors like topic not found, shall be handled in the same way
- Rebalance support failure. The failure could result from:
- Consumer restriction: Consumer will reject an assignment with range fetch if it is not allowed to.
- Make sure the no assignment overlap is detected in system tests.
- Other common rebalance failure.
There are a couple of alternatives to this proposal here.
- KIP-253 proposed physical partition expansion, which is a fairly complex implementation and could be hard to reason about correctness.
- Some discussion around making Kafka Streams as a multi-threading model where consumers are completely decoupled from processing thread. This means we have to conquer the concurrent processing challenge and there could be more inherent work to redesign state store semantics too.