Motivation

We have collected the following requirements from our users which have driven this consumer client re-design project:

1. Central Coordination - We need to eliminate the complexity in the current consumer's rebalancing operation that leads to the herd effect and the split-brain problem.

2. Thin Consumer Client - We need a light consumer client code base with minimal external dependencies (e.g. a ZooKeeper client), so that the client logic can easily be re-implemented in different languages.

We've received quite a lot of feedback on the consumer side features over the past few months. Some of it concerns improvements to the current consumer design, and some is simply new feature/API requests. I have attempted to write up the requirements that I've heard on this wiki - https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
This would involve some significant changes to the consumer APIs, so we would like to collect feedback on the proposal from our community. Since the list of changes is not small, we would like to understand if some features are preferred over others, and more importantly, if some features are not required at all.

  1. Thin consumer client:
    1. We have a lot of users who have expressed interest in using and writing non-Java clients. Currently, this is pretty straightforward for the SimpleConsumer but not for the high-level consumer. The high-level consumer does some complex failure detection and rebalancing, which is non-trivial to re-implement correctly.
    2. The goal is to have a very thin consumer client, with minimum dependencies to make this easy for the users.
  2. Central co-ordination:
    1. The current version of the high-level consumer suffers from herd and split-brain problems, where multiple consumers in a group run a distributed algorithm to agree on the same partition ownership decision. Because they each see a different view of the ZooKeeper data, they run into conflicts that make the rebalancing attempt fail. And there is no way for a consumer to verify whether a rebalancing operation completed successfully across the entire group.
    2. This can be mitigated by moving the failure detection and rebalancing logic to a centralized, highly available co-ordinator - https://cwiki.apache.org/confluence/display/KAFKA/Central+Consumer+Coordination

* We think the first two requirements are prerequisites for the rest, so we have proceeded by trying to design a centralized coordinator for consumer rebalancing without ZooKeeper; for details, please read here.

  3. Manual partition assignment
    1. There are a number of stateful data systems that would like to manually assign partitions to consumers. The main motive is to enable them to keep some local per-partition state, since the mapping from their consumer to partition never changes. Examples of such systems are databases, search indexers, etc.
    2. A side effect of this requirement is wanting to turn off automatic rebalancing in the high level consumer.
    3. This feature depends on the central co-ordination feature since it cannot be correctly and easily implemented with the current distributed co-ordination model.
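As a sketch of what such an API might look like (all names here are hypothetical and purely illustrative, not part of any released Kafka client), a consumer could accept a fixed partition assignment at construction time, which implicitly disables automatic rebalancing:

```python
# Hypothetical sketch of a manual partition-assignment API; the class and
# attribute names are illustrative, not part of any released Kafka client.

class ManuallyAssignedConsumer:
    """Consumer stub that takes a fixed (topic, partition) assignment."""

    def __init__(self, group_id, assignment):
        self.group_id = group_id
        # The consumer -> partition mapping never changes, so a stateful
        # application (database, search indexer, ...) can keep local
        # per-partition state keyed by these pairs.
        self.assignment = frozenset(assignment)
        # Side effect of manual assignment: automatic rebalancing is off.
        self.auto_rebalance = False

    def partitions(self):
        return self.assignment


consumer = ManuallyAssignedConsumer(
    group_id="search-indexer",
    assignment=[("docs", 0), ("docs", 1)],
)
assert consumer.auto_rebalance is False
assert ("docs", 0) in consumer.partitions()
```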
  4. Manual offset management
    1. Some systems require offset management in a custom database, at specific intervals. Overall, the requirement is to have access to message metadata like the topic, partition, and offset of each message, and to be able to provide per-partition offsets on consumer startup.
    2. This would require designing new consumer APIs that allow providing offsets on startup and that return message metadata with the consumer iterator.
    3. One thing that needs to be thought through is whether the consumer client can be allowed to pick manual offset management for some, but not all, topics. One option is to allow the consumer to pick only one offset management scheme, which could make the API a bit simpler.
    4. This feature depends on the central co-ordination feature since it cannot be correctly and easily implemented with the current distributed co-ordination model.
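A minimal sketch of what this could look like, assuming a hypothetical consumer that is seeded with per-partition start offsets and whose iterator exposes message metadata (none of these names exist in the current client):

```python
# Hypothetical sketch of manual offset management; names are illustrative.
from collections import namedtuple

# Message metadata exposed to the application: topic, partition, offset.
MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["topic", "partition", "offset", "payload"])


class OffsetManagedConsumer:
    """Stub consumer seeded with per-partition start offsets; the
    application commits offsets to its own store (auto-commit is off)."""

    def __init__(self, start_offsets, log):
        self.positions = dict(start_offsets)  # (topic, partition) -> offset
        self.log = log                        # (topic, partition) -> payloads

    def __iter__(self):
        for (topic, partition), offset in sorted(self.positions.items()):
            for i, payload in enumerate(self.log[(topic, partition)][offset:]):
                yield MessageAndMetadata(topic, partition, offset + i, payload)


log = {("events", 0): ["a", "b", "c", "d"]}
consumer = OffsetManagedConsumer(start_offsets={("events", 0): 2}, log=log)
consumed = list(consumer)
# Consumption starts at the offset supplied by the application on startup.
assert consumed[0].offset == 2 and consumed[0].payload == "c"
```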
  5. Invocation of a user-specified callback on rebalance
    1. Some applications maintain transient per-partition state in memory. On a rebalance operation, they would need to "flush" the transient state to some persistent storage.
    2. The requirement is to let the user plug in some sort of callback that the high level consumer invokes when a rebalance operation is triggered.
    3. This requirement has some overlap with the manual partition assignment requirement. If we allow manual partition assignment, such applications might be able to leverage that to flush transient state. But the issue is that these applications do want automatic rebalancing and might not want to use the manual partition assignment feature.
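The callback hook could be sketched roughly as follows (a stub with illustrative names only; the consumer invokes the user callback with the partitions about to be revoked, before the new assignment takes effect):

```python
# Hypothetical sketch of a rebalance callback hook; names are illustrative.

class RebalancingConsumer:
    """Stub that invokes a user-supplied callback before reassigning
    partitions, so the application can flush transient in-memory state."""

    def __init__(self, on_rebalance):
        self.on_rebalance = on_rebalance
        self.assignment = set()

    def rebalance(self, new_assignment):
        # Give the application a chance to persist per-partition state
        # for the partitions it is about to lose.
        revoked = self.assignment - set(new_assignment)
        self.on_rebalance(revoked)
        self.assignment = set(new_assignment)


flushed = []
consumer = RebalancingConsumer(
    on_rebalance=lambda revoked: flushed.extend(sorted(revoked)))
consumer.rebalance({("t", 0), ("t", 1)})
consumer.rebalance({("t", 1)})  # partition ("t", 0) is revoked here
assert flushed == [("t", 0)]
```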
  6. Non-blocking consumer APIs
    1. This requirement comes from stream processing applications that implement high-level stream processing primitives like filter-by, group-by, and join operations on Kafka streams.
    2. To facilitate stream join operations, it is desirable that Kafka provide non-blocking consumer APIs. Today, since the consumer streams are essentially blocking, these sorts of stream join operations are not possible.
    3. This requirement seems to involve some significant redesign of the consumer APIs and the consumer stream logic, so it would be good to give it some more thought.
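To make the idea concrete, here is a minimal sketch (with purely illustrative names; no such API exists in the client today) of a poll-style call that returns immediately instead of blocking, which is what a stream join loop needs:

```python
# Hypothetical non-blocking consumer call; names are illustrative only.
from collections import deque


class NonBlockingStream:
    """Stub stream: poll() returns the next message or None immediately."""

    def __init__(self, messages=()):
        self.buffer = deque(messages)

    def poll(self):
        return self.buffer.popleft() if self.buffer else None


# A join loop can interleave two streams because neither poll() blocks:
left, right = NonBlockingStream(["l1"]), NonBlockingStream([])
msg_l, msg_r = left.poll(), right.poll()
assert msg_l == "l1"
assert msg_r is None  # empty stream: returns None instead of blocking
```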

Comments (Guozhang)

4. Manual offset management - Some applications want to manage offsets themselves at the application level, rather than using ZK. We need to provide a way to achieve that in the new consumer redesign. This would involve turning off the autocommit.offsets option in the consumer.

* One suggestion is to add one more API on the high-level consumer which allows incremental specification of "topic with any partitions" => "specific number of partitions for the topic" => "specific partitions of the topic" => "specific partitions with specific offsets of the topic". At the last, most specific level, we may need to automatically turn off auto-committing of offsets.
* One problem with this incremental specification is that one consumer client may require auto-committing offsets for some topics but not for others. This makes the communication between consumers and the coordinator a little more complicated, but we can always use a signal telling the coordinator "no need to commit offsets".
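The incremental levels described above could be modeled roughly like this (the level names and the auto-commit rule here are illustrative only, not a committed design):

```python
# Hypothetical sketch of the incremental subscription levels; the level
# names and the auto-commit rule are illustrative only.

LEVELS = [
    "topic",                    # "topic with any partitions"
    "partition_count",          # specific number of partitions for the topic
    "specific_partitions",      # specific partitions of the topic
    "partitions_with_offsets",  # specific partitions with specific offsets
]


def auto_commit_allowed(level):
    # Once the application pins both partitions and offsets, it is
    # managing offsets itself, so auto-commit must be switched off.
    return level != "partitions_with_offsets"


assert auto_commit_allowed("topic")
assert not auto_commit_allowed("partitions_with_offsets")
```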

5. Rebalance Triggered User Specified Callback - Some applications maintain transient state that they'd like to flush to some data system right when rebalancing operation is triggered. This is because the partition assignment can change after rebalancing.

* This function can be called whenever a consumer receives a stop_fetcher command.

6. Non Blocking Consumer API - This is to facilitate stream join operations across multiple Kafka consumer streams. The problem is that today the streams are essentially blocking, so such stream join operations are not possible. We need to look into supporting non-blocking consumer APIs.

* One way to do this is, whenever the stream finds there is no data to return, to hand some default value as a "null" to the iterator; this would require modifying the BlockingQueue implementation of the chunk/stream on the consumer.
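A sketch of the "null sentinel" idea, using Python's standard `queue.Queue` as a stand-in for the consumer's BlockingQueue of chunks (illustrative only):

```python
# Sketch of the "null sentinel" idea using Python's standard queue as a
# stand-in for the consumer's BlockingQueue of chunks; illustrative only.
import queue

SENTINEL = object()  # placed in the queue when there is no data to return


def next_chunk(q, timeout=0.01):
    """Return the next data chunk, or None if the sentinel (or nothing
    at all) is seen, instead of blocking indefinitely."""
    try:
        item = q.get(timeout=timeout)
    except queue.Empty:
        return None
    return None if item is SENTINEL else item


q = queue.Queue()
q.put("chunk-1")
q.put(SENTINEL)
assert next_chunk(q) == "chunk-1"
assert next_chunk(q) is None  # sentinel -> iterator yields a "null"
assert next_chunk(q) is None  # empty queue -> timeout, no blocking forever
```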

7. Rack Awareness - It could potentially be more efficient to make consumers rack-aware for consumption.


  • Currently Kafka has two types of consumers: the "standard" consumer and the simple consumer. With the simple consumer, the user can specify the broker, partition, and offset, but there is no group/rebalance support. So users with requirements 3 and 4 but no need for group/rebalance support would prefer to use the simple consumer.
  • Requirement 5 is good to have, though users are responsible for any degradation in rebalancing performance caused by a heavy callback function.
  • Requirement 6 would need 1) a user-specified default value to return when there is no data to pull, and 2) probably a different chunk size for the "default" chunk. Both of these would be tricky parameters to tune (if not impossible).