Motivation

We have collected the following requirements from our users which have driven this consumer client re-design project:

  1. Central Coordination - We need to eliminate the complexity in the current consumer's rebalancing operation, which leads to the herd effect and the split-brain effect.
  2. Thin Consumer Client - We need a light consumer client code base with minimal external dependencies (e.g. the Zookeeper client), so that its logic can be easily re-implemented in different languages.

* We think the first two requirements are prerequisites for the rest, so we have proceeded by trying to design a centralized coordinator for consumer rebalancing without Zookeeper; for details, please read here.

  3. Manual partition assignment - Many partitioned stateful data systems, such as search indexers, would like to manually assign partitions to consumers because they need to keep local per-partition state.
  4. Manual offset management - Some applications want to manage offsets themselves at the application-system level rather than using ZK. We need to provide a way to achieve that in the new consumer redesign. This would involve turning off the autocommit.offsets option in the consumer.

* One suggestion is to add one more API to the high-level consumer that allows incremental specification: "topic with any partitions" => "specific number of partitions for the topic" => "specific partitions of the topic" => "specific partitions with specific offsets of the topic". At the last level of settings, we may need to automatically turn off auto-committing offsets.
* One problem with this incremental specification is that a single consumer client may require auto-committing offsets for some topics but not for others. This will make the communication between consumers and the coordinator a little more complicated, but we can always use a signal telling the coordinator "no need to commit offsets".
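The four specification levels above could be modelled roughly as follows. This is only a sketch under stated assumptions: `TopicSubscription`, its `Level` enum, and the factory method names are hypothetical and not part of any existing Kafka API, and disabling auto-commit at the last level simply mirrors the "we may need to" suggestion above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical type: one subscription per topic, at one of four levels of detail.
final class TopicSubscription {
    enum Level { ANY_PARTITIONS, PARTITION_COUNT, SPECIFIC_PARTITIONS, SPECIFIC_OFFSETS }

    final String topic;
    final Level level;
    final int partitionCount;          // -1 unless level is PARTITION_COUNT
    final Set<Integer> partitions;     // empty unless specific partitions were given
    final Map<Integer, Long> offsets;  // empty unless specific offsets were given

    private TopicSubscription(String topic, Level level, int partitionCount,
                              Set<Integer> partitions, Map<Integer, Long> offsets) {
        this.topic = topic;
        this.level = level;
        this.partitionCount = partitionCount;
        this.partitions = Collections.unmodifiableSet(new TreeSet<>(partitions));
        this.offsets = Collections.unmodifiableMap(new TreeMap<>(offsets));
    }

    // Level 1: "topic with any partitions".
    static TopicSubscription anyPartitions(String topic) {
        return new TopicSubscription(topic, Level.ANY_PARTITIONS, -1,
                Collections.<Integer>emptySet(), Collections.<Integer, Long>emptyMap());
    }

    // Level 2: "specific number of partitions for the topic".
    static TopicSubscription partitionCount(String topic, int count) {
        return new TopicSubscription(topic, Level.PARTITION_COUNT, count,
                Collections.<Integer>emptySet(), Collections.<Integer, Long>emptyMap());
    }

    // Level 3: "specific partitions of the topic".
    static TopicSubscription partitions(String topic, Set<Integer> partitions) {
        return new TopicSubscription(topic, Level.SPECIFIC_PARTITIONS, -1,
                partitions, Collections.<Integer, Long>emptyMap());
    }

    // Level 4: "specific partitions with specific offsets of the topic".
    static TopicSubscription partitionsWithOffsets(String topic, Map<Integer, Long> offsets) {
        return new TopicSubscription(topic, Level.SPECIFIC_OFFSETS, -1,
                offsets.keySet(), offsets);
    }

    // Assumption: only the last level turns auto-commit off. A consumer would
    // forward this per-topic flag to the coordinator as the "no need to commit
    // offsets" signal.
    boolean autoCommit() {
        return level != Level.SPECIFIC_OFFSETS;
    }
}
```

Because the flag is derived per topic, one consumer can hold an auto-committing subscription for one topic and a manually managed one for another, which is the mixed case described above.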

  5. Rebalance Triggered User Specified Callback - Some applications maintain transient state that they'd like to flush to some data system right when a rebalancing operation is triggered. This is because the partition assignment can change after rebalancing.

* This function can be called whenever a consumer receives a stop_fetcher command.
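A minimal sketch of such a callback hook follows. The names `RebalanceCallback`, `SketchConsumer`, and `onStopFetcher` are hypothetical and chosen only for illustration; the sketch assumes the consumer knows which partitions it owns at the moment the stop_fetcher command arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface; not an existing Kafka API.
interface RebalanceCallback {
    void beforeRebalance(List<Integer> ownedPartitions);
}

// Hypothetical consumer skeleton showing only where the callback would fire.
final class SketchConsumer {
    private final List<Integer> owned;
    private final RebalanceCallback callback;

    SketchConsumer(List<Integer> initialPartitions, RebalanceCallback callback) {
        this.owned = new ArrayList<>(initialPartitions);
        this.callback = callback;
    }

    // Assumed entry point for the coordinator's stop_fetcher command.
    void onStopFetcher() {
        // Let the application flush transient per-partition state first...
        callback.beforeRebalance(new ArrayList<>(owned));
        // ...then give up ownership, since the assignment may change.
        owned.clear();
    }

    List<Integer> ownedPartitions() {
        return new ArrayList<>(owned);
    }
}
```

The callback runs synchronously inside `onStopFetcher`, so a slow callback delays the rebalance for that consumer.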

  6. Non Blocking Consumer API - This is to facilitate stream join operations across multiple Kafka consumer streams. The problem today is that the streams are essentially blocking, so such stream join operations are not possible. We need to look into supporting non-blocking consumer APIs.

* One way to do this is, whenever the stream finds there is no data to return, to hand some default value to the iterator as a "null"; this would require modifying the BlockingQueue implementation of the chunk/stream on the consumer.
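The default-value idea can be sketched with the standard `java.util.concurrent.BlockingQueue`, whose `poll()` returns `null` immediately on an empty queue whereas `take()` blocks. The wrapper class `NonBlockingStream` and its `emptyMarker` parameter are hypothetical; the real consumer's chunk/stream classes would differ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper: a stream view that never blocks on an empty queue.
final class NonBlockingStream<T> {
    private final BlockingQueue<T> chunks;
    private final T emptyMarker; // user-specified default returned when no data is queued

    NonBlockingStream(BlockingQueue<T> chunks, T emptyMarker) {
        this.chunks = chunks;
        this.emptyMarker = emptyMarker;
    }

    // Returns the next queued item, or the marker if nothing is available right now.
    T next() {
        T item = chunks.poll(); // non-blocking, unlike take()
        return item != null ? item : emptyMarker;
    }

    // Identity comparison, so a real message equal in value to the marker is not confused with it.
    boolean isEmptyMarker(T item) {
        return item == emptyMarker;
    }
}
```

A join across two such streams can call `next()` on each in turn and skip any stream that currently returns the marker, instead of stalling on the slower one.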

  7. RAC Awareness - It might be beneficial to make consumers RAC aware for more efficient consumption.

Considerations

  • Currently Kafka has two types of consumers: the "standard" consumer and the simple consumer. In the simple consumer, the user can specify the broker-partition and offset, but there is no group/re-balance support. So users with requirements 3 and 4 but no requirement for group/re-balance would prefer to use the simple consumer.
  • Requirement 5 is good to have, but users are responsible for any degradation in re-balancing performance caused by a heavy callback function.
  • Requirement 6 would need 1) a user-specified default value for when there is no data to pull, and 2) probably a different chunk size for the "default" chunk. Both of these would be tricky (if not impossible) parameters to tune.