Apache Kafka 4.1.0 is shipped with a preview release of KIP-932: Queues for Kafka. This feature is not yet recommended for use on production clusters, but it is ready for evaluation and testing.

This KIP introduces the concept of a share group as a way of enabling cooperative consumption using Kafka topics. It does not add the concept of a "queue" to Kafka per se, but rather introduces share groups to accommodate queuing use-cases using Kafka topics. With a share group, the number of consumers can exceed the number of partitions, and the delivery of each message is individually tracked, which makes handling of unprocessable messages much simpler. You can think of a share group as roughly equivalent to a "durable shared subscription" in existing systems.

Limitations

  • It is not possible to enable share groups until all brokers are using Apache Kafka 4.1.0 or later.
  • The early access support for KIP-932 in Apache Kafka 4.0 is not compatible with this preview code. If you enabled share groups on Apache Kafka 4.0, the cluster cannot be upgraded to Apache Kafka 4.1. You cannot use the Apache Kafka 4.0 share consumer with an Apache Kafka 4.1 cluster and, conversely, you cannot use the Apache Kafka 4.1 share consumer with an Apache Kafka 4.0 cluster. This support is a replacement for the early access code.
  • The partition assignor is still being enhanced and optimized. The preview assignor can compute assignments which are not ideally balanced.
  • The --reset-offsets feature of kafka-share-groups.sh is not implemented.

How to test it

The simplest way is to create a new cluster consisting of a single broker, as illustrated in the Apache Kafka quickstart documentation.

You must enable share groups in the cluster before you can use them. This can be achieved using the following command:

$ bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1

Similarly, if you want to disable share groups again, use the following command:

$ bin/kafka-features.sh --bootstrap-server localhost:9092 downgrade --feature share.version=0

The rest of this tutorial assumes that your broker is listening for unsecured connections on localhost:9092, which is the default if you use the supplied config/server.properties configuration file.

First, create a topic called quickstart-events to use.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events

By default, a new share group will start consuming data produced after the group was created, so it's easiest to start the consumer before the producer.

You can use the kafka-console-share-consumer.sh tool to create a consumer in a share group (called "console-share-consumer" by default):

$ bin/kafka-console-share-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events

Then you can use the kafka-console-producer.sh tool to produce records to quickstart-events, which will then be consumed by the consumer.

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic quickstart-events

If you start multiple instances of kafka-console-share-consumer.sh, they will all be in the same share group and the produced records will be distributed among them. You might have to produce quite a few records before you see them being distributed because of the batching inherent to how Kafka works, but if you run a long-running workload, you will find that the distribution is even.

You can use the kafka-groups.sh and kafka-share-groups.sh tools to take a look at what's going on. For example:

# List all groups on the cluster
$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list

# List the share groups
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

# Describe the share group "console-share-consumer"
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group console-share-consumer

# Describe the members of share group "console-share-consumer"
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group console-share-consumer --members

# Describe the state of share group "console-share-consumer"
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group console-share-consumer --state

You can also write your own applications using the KafkaShareConsumer class.
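As a starting point, here is a minimal sketch of such an application. It assumes the kafka-clients 4.1 library is on the classpath, the broker from this tutorial is listening on localhost:9092, and share groups have been enabled. The class name ShareConsumerSketch, the helper shareConsumerProps and the group name "my-share-group" are hypothetical names chosen for illustration. The sketch relies on implicit acknowledgement, where records returned by one poll are treated as successfully processed when the application commits or polls again; treat it as an illustration to adapt, not a reference implementation.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ShareConsumerSketch {

    // Builds the client configuration. The group id passed in names the share group to join.
    static Properties shareConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "my-share-group" is a hypothetical group name, not one defined by Kafka.
        Properties props = shareConsumerProps("localhost:9092", "my-share-group");

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("quickstart-events"));

            // Runs until the process is stopped (for example with Ctrl-C).
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // Implicit acknowledgement: committing marks the records from the
                // last poll as successfully processed.
                consumer.commitSync();
            }
        }
    }
}
```

Running several copies of this program with the same group name should behave like running several instances of kafka-console-share-consumer.sh: the copies join one share group and the records are distributed among them.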
