Apache Kafka 4.0.0 is shipped with an early access release of KIP-932: Queues for Kafka. This feature is still under active development.
It must not be enabled on production clusters. We advise users to try it out in clusters specifically created for the purpose of testing.
This KIP introduces the concept of a share group as a way of enabling cooperative consumption using Kafka topics. It does not add the concept of a "queue" to Kafka per se, but rather introduces share groups to accommodate queuing use-cases using Kafka topics. You can think of a share group as roughly equivalent to a "durable shared subscription" in existing systems.
Limitations
- It is not possible to upgrade clusters on which this early access feature has been enabled and used. We are still actively developing it and want to keep the flexibility to make non-backward compatible changes.
- The RPCs in the Kafka protocol to support share groups are unstable in Apache Kafka 4.0 and subject to change. This means they need to be explicitly enabled in the broker configuration. It also means that share groups in the Apache Kafka 4.0 client work only with 4.0 brokers. Optimization work underway is changing the protocol in an incompatible way, and we plan to stabilize the RPCs in Apache Kafka 4.1.
- The format of the records stored to support share groups is not yet finalized in Apache Kafka 4.0 and subject to change. This means that if you attempt to upgrade a cluster from Apache Kafka 4.0 after you have used share groups, you may find that brokers will not behave correctly, or even fail to start. This is why you should only use it in a sandbox environment you do not intend to upgrade. We plan to finalize the record formats in Apache Kafka 4.1 and support upgrading from this point on.
- The storage of acknowledgement information on the new internal share-group state topic is introduced with this early access feature. We expect efficiency, reliability and broker start-up time to improve in the next release.
- The partition assignor is extremely simple and assigns all available partitions to all subscribed group members. A more efficient scheme is described in the KIP and will be introduced in the next release.
- The kafka-share-groups.sh tool is able to list and describe share groups, but not able to reset or delete them. The starting offset information printed by this tool may be incorrect in some situations.
- The share.isolation.level configuration is not supported. This means that share groups currently always have read_uncommitted behavior.
- The testing in multi-broker configurations has been quite limited so far. Consumption using share groups while brokers are being rolled has known problems which are already fixed in the code being prepared for the next release.
How to test it
The simplest way is to create a new cluster consisting of a single broker, as illustrated in the Apache Kafka quickstart documentation. That refers to a broker configuration file called config/server.properties. To enable KIP-932, you need to add the following lines before you start the broker:
# DO NOT ENABLE THESE SETTINGS ON PRODUCTION CLUSTERS
# Only use in a sandbox environment you do not intend to upgrade
unstable.api.versions.enable=true
group.coordinator.rebalance.protocols=classic,consumer,share
When you start the broker with the new configuration, you will see log lines indicating that share groups are enabled for non-production use.
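The configuration step above can also be scripted. The following is a minimal sketch, not part of Kafka: the function name enable_share_groups and the idea of writing a separate sandbox copy are assumptions made for illustration. It copies an existing broker configuration file and appends the two settings, so the supplied file stays untouched.

```shell
#!/bin/sh
# Sketch: create a sandbox copy of a broker config with KIP-932 enabled.
# enable_share_groups is a hypothetical helper, not a Kafka tool.
enable_share_groups() {
    src="$1"   # existing broker config, e.g. config/server.properties
    dst="$2"   # sandbox copy to create (the name is your choice)

    # Copy first so the original configuration file is never modified.
    cp "$src" "$dst" || return 1

    # Append the early access settings exactly as listed in this document.
    cat >> "$dst" <<'EOF'

# DO NOT ENABLE THESE SETTINGS ON PRODUCTION CLUSTERS
# Only use in a sandbox environment you do not intend to upgrade
unstable.api.versions.enable=true
group.coordinator.rebalance.protocols=classic,consumer,share
EOF
}
```

For example, enable_share_groups config/server.properties config/server-share.properties writes the sandbox copy (the second file name is hypothetical); you would then start the broker as described in the quickstart, pointing it at that copy.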
The rest of this tutorial assumes that your broker is listening for unsecured connections on localhost:9092, which is the default if you use the supplied config/server.properties configuration file.
First, create a topic called quickstart-events to use.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events
By default, a new share group will start consuming data produced after the group was created, so it's easiest to start the consumer before the producer.
You can use the kafka-console-share-consumer.sh tool to create a consumer in a share group (called "console-share-consumer" by default):
$ bin/kafka-console-share-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events
Then you can use the kafka-console-producer.sh tool to produce records to quickstart-events, which will then be consumed by the consumer.
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic quickstart-events
If you start multiple instances of kafka-console-share-consumer.sh, they will all be in the same share group and the produced records will be distributed among them. You might have to produce quite a few records before you see them being distributed because of the batching inherent to how Kafka works, but if you run a long-running workload, you will find that the distribution is even.
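To watch the distribution without opening several terminals, the consumers can be started in the background with their output captured in files. This is only a sketch: the function name start_share_consumers and the share-consumer-N.log file names are hypothetical, and the command line is the one shown earlier in this document.

```shell
#!/bin/sh
# Sketch: start several console share consumers in the background.
# start_share_consumers and the log file names are hypothetical.
start_share_consumers() {
    kafka_bin="$1"   # directory containing the Kafka scripts, e.g. bin
    count="$2"       # number of consumers to start

    i=1
    while [ "$i" -le "$count" ]; do
        # Each consumer joins the default share group and writes the
        # records it receives (its standard output) to its own file.
        "$kafka_bin/kafka-console-share-consumer.sh" \
            --bootstrap-server localhost:9092 \
            --topic quickstart-events > "share-consumer-$i.log" &
        i=$((i + 1))
    done
}
```

After running start_share_consumers bin 3 and producing records, comparing the sizes of the share-consumer-*.log files gives a rough picture of how the records were shared out. The consumers keep running until you terminate the background processes.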
You can use the kafka-groups.sh and kafka-share-groups.sh tools to take a look at what's going on. For example:
# List all groups on the cluster
$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list

# List the share groups
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

# Describe the share group "console-share-consumer"
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group console-share-consumer

# Describe the members of share group "console-share-consumer"
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group console-share-consumer --members

# Describe the state of share group "console-share-consumer"
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group console-share-consumer --state
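The list and describe commands above can be combined to inspect every share group in one pass. The sketch below assumes that the --list output contains one group name per line, which this document does not state, so check the output on your cluster first. The function name describe_all_share_groups is hypothetical.

```shell
#!/bin/sh
# Sketch: describe the members of every share group on the cluster.
# describe_all_share_groups is a hypothetical helper, not a Kafka tool.
# ASSUMPTION: "--list" prints one share group name per line.
describe_all_share_groups() {
    kafka_bin="$1"   # directory containing the Kafka scripts, e.g. bin
    bootstrap="$2"   # bootstrap server, e.g. localhost:9092

    "$kafka_bin/kafka-share-groups.sh" --bootstrap-server "$bootstrap" --list |
    while IFS= read -r group; do
        # Skip blank lines in the listing.
        [ -n "$group" ] || continue
        echo "== $group =="
        # Redirect stdin so the tool cannot consume the rest of the list.
        "$kafka_bin/kafka-share-groups.sh" --bootstrap-server "$bootstrap" \
            --describe --group "$group" --members < /dev/null
    done
}
```

With the single-broker setup used in this tutorial, it would be called as describe_all_share_groups bin localhost:9092.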
You can also write your own applications using the KafkaShareConsumer class. The javadoc and KIP-932 are the best sources of information at this point.
How to report issues
Feel free to file a bug in the Apache Kafka project for any issues you find. We appreciate your help in testing this feature. Happy testing!