...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
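
As a minimal sketch of this check (GroupMetadata, Assignment and ServerSideAssignor are hypothetical stand-ins, not the actual group coordinator classes):

Code Block
import java.util.List;
import java.util.Map;

// Illustrative sketch only: these types are hypothetical stand-ins, not the real
// group coordinator classes.
record GroupMetadata(int groupEpoch) { }
record Assignment(int assignmentEpoch, Map<String, List<Integer>> memberToPartitions) { }

interface ServerSideAssignor {
    Map<String, List<Integer>> assign(GroupMetadata metadata);
}

final class ShareGroupTargetAssignment {
    private final ServerSideAssignor assignor;
    private Assignment target = new Assignment(0, Map.of());

    ShareGroupTargetAssignment(ServerSideAssignor assignor) {
        this.assignor = assignor;
    }

    // Whenever the group epoch is ahead of the assignment epoch, recompute the target
    // assignment from the latest group metadata. For a share group the result is not
    // persisted; the assignment epoch simply becomes the group epoch used for the computation.
    void maybeRecompute(GroupMetadata metadata) {
        if (metadata.groupEpoch() > target.assignmentEpoch()) {
            target = new Assignment(metadata.groupEpoch(), assignor.assign(metadata));
        }
    }
}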

...

In order to retain similarity with KafkaConsumer and make it easy for applications to move between the two interfaces, KafkaShareConsumer follows the same threading rules as KafkaConsumer. It is not thread-safe and only one thread at a time may call the methods of KafkaShareConsumer. Unsynchronized access will result in ConcurrentModificationException. The only exception to this rule is KafkaShareConsumer.wakeup(), which may be called from any thread.
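
For example, a sketch of the usual shutdown pattern, mirroring KafkaConsumer usage; the constructor and configuration keys shown here follow the existing consumer and the KIP's description of KafkaShareConsumer:

Code Block
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ShareConsumerShutdownExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-share-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
        consumer.subscribe(List.of("my-topic"));

        // wakeup() is the one method that may safely be called from another thread; here a
        // shutdown hook uses it to break out of a blocking poll().
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            while (true) {
                // Every other method must be called from this single thread.
                consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                        System.out.println(record.key() + " -> " + record.value()));
            }
        } catch (WakeupException e) {
            // Expected when wakeup() interrupts the poll during shutdown.
        } finally {
            consumer.close();
        }
    }
}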

...

There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.


Managing durable share-partition state

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share-Partition Start Offset (SPSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged and the frequency with which it is logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt, and this requires just one update to the durable share-partition state. Indeed, if there’s only a single consumer for a share-partition, only the SPSO needs to be logged as it moves along the topic. That’s of course an unlikely situation given these are share groups, but it illustrates the point.
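
A hypothetical in-memory shape for this recoverable state (the field names follow the record schemas shown later in this section):

Code Block
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory shape of the recoverable state listed above; the field names follow
// the ShareSnapshot/ShareUpdate schemas shown later in this section.
final class DurableSharePartitionState {
    long startOffset;                                         // the Share-Partition Start Offset (SPSO)
    final List<StateBatch> stateBatches = new ArrayList<>();  // state of the in-flight records

    // A contiguous range of records sharing the same delivery state and (approximate)
    // delivery count. The Acquired state is deliberately never persisted.
    record StateBatch(long baseOffset, long lastOffset, byte deliveryState, short deliveryCount) { }
}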

Share-group state persister interface

The component which manages durable share-partition state is called the share-group state persister. Eventually, this could be a pluggable component which implements a new interface org.apache.kafka.server.group.share.Persister . For now, it is not pluggable, and Kafka includes a built-in default implementation org.apache.kafka.server.group.share.DefaultStatePersister . This section describes the behavior of this built-in persister.
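
Purely as an illustration of the kind of operations such a persister needs to expose (this is not the actual interface defined by the KIP, only a sketch mirroring the Initialize/Read/Write/DeleteShareGroupState RPCs described below):

Code Block
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.Uuid;

// Purely illustrative: the real org.apache.kafka.server.group.share.Persister interface defines
// its own request/response types and signatures. This sketch only mirrors the four share-group
// state RPCs described later in this section.
interface IllustrativePersister {
    // Minimal placeholder for the durable state payload.
    record PartitionState(long startOffset) { }

    CompletableFuture<Void> initializeState(String groupId, Uuid topicId, int partition, int stateEpoch);
    CompletableFuture<PartitionState> readState(String groupId, Uuid topicId, int partition);
    CompletableFuture<Void> writeState(String groupId, Uuid topicId, int partition, PartitionState state);
    CompletableFuture<Void> deleteState(String groupId, Uuid topicId, int partition);
}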

When records are acknowledged using the ShareFetch or ShareAcknowledge RPCs, durable state will need to be written by the persister. The share-partition leader serving these requests will wait for the persister to complete writing the durable state before responding.

The share coordinator and the share-group state topic

The share coordinator is responsible for persistence of share-group state on a new internal topic called "__share_group_state". The responsibility for being a share coordinator is distributed across the brokers in a cluster, in a model similar to the group coordinator and the transaction coordinator. Indeed, the share coordinator is built using the coordinator runtime, like the new group coordinator. The share-group state topic is highly partitioned (50 by default), but it is not compacted. It uses the "delete" cleanup policy with unlimited retention ("retention.ms=-1"), and the share coordinator manages the space used by pruning records periodically as described later. Share coordination is assigned in the same way as group coordination: the broker which leads a partition of the share-group state topic is the share coordinator for the share-partitions whose records belong on that partition.
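
As a purely illustrative summary of that configuration (the class is hypothetical; the brokers create and configure this topic themselves):

Code Block
import java.util.Map;

// Illustrative only: the configuration of the internal __share_group_state topic as described
// above. The brokers create and manage this topic themselves; users do not create it.
final class ShareGroupStateTopicConfig {
    static final int DEFAULT_PARTITIONS = 50;

    static final Map<String, String> CONFIGS = Map.of(
            "cleanup.policy", "delete",  // not compacted
            "retention.ms", "-1"         // unlimited retention; the share coordinator prunes records itself
    );
}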

The existing group coordinator is responsible for the management and assignment of share groups. While it would also be possible to incorporate the responsibility for persisting share-group state into the existing group coordinator and write this information onto the consumer offsets topic, that topic is already a known bottleneck for some users and adding to the problem does not seem prudent.

The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState , ReadShareGroupState , WriteShareGroupState  and DeleteShareGroupState . The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type FindCoordinatorRequest.CoordinatorType.SHARE  (2) and a key consisting of "group:topicId:partition".
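
A minimal sketch of how that key could be constructed (the helper class here is hypothetical):

Code Block
import org.apache.kafka.common.Uuid;

// Sketch of building the FindCoordinator key described above; the helper class is hypothetical.
final class ShareCoordinatorKey {
    // FindCoordinatorRequest.CoordinatorType.SHARE
    static final byte COORDINATOR_TYPE_SHARE = 2;

    // The key has the form "group:topicId:partition".
    static String of(String groupId, Uuid topicId, int partition) {
        return groupId + ":" + topicId + ":" + partition;
    }
}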

When a share coordinator becomes leader of a partition of the share-group state topic, it scans the partition to replay the records it finds and build its in-memory state. Once complete, it is ready to serve the share-group state RPCs.

The records on the share-group state topic are keyed by (group, topicId, partition). This means that the records for each share-partition are on the same share-group state topic partition. It also means that a share-group with a lot of consumers and many share-partitions has its state spread across many share coordinators. This is intentional to improve scalability.
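
For illustration only, a hypothetical mapping from coordinator key to state-topic partition; the actual scheme is an implementation detail of the share coordinator, but the property that matters is that the same key always maps to the same partition:

Code Block
// Hypothetical sketch: the exact mapping from a (group, topicId, partition) key to a partition
// of the share-group state topic is an implementation detail of the share coordinator. The point
// is that the same key always maps to the same topic partition, while different share-partitions
// of a group can map to different partitions, and so to different share coordinators.
final class ShareStateTopicPartitioner {
    static int partitionFor(String coordinatorKey, int numStateTopicPartitions) {
        return Math.floorMod(coordinatorKey.hashCode(), numStateTopicPartitions);
    }
}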

Share-group state records

The share coordinator writes two types of record: ShareSnapshot and ShareUpdate. The ShareSnapshot record contains a complete snapshot of the durable state for a share-partition. Because repeatedly re-serializing and writing the entire state would introduce a performance bottleneck, the ShareUpdate record contains a partial update to the state. So the current durable state of a share-partition consists of the latest ShareSnapshot and zero or more ShareUpdate records following it.

The records also include a snapshot epoch. Each time a ShareSnapshot is written for a share-partition, the snapshot epoch for that share-partition is increased by 1. Each time a ShareUpdate is written for a share-partition, it uses the snapshot epoch of the latest ShareSnapshot.

The share coordinator will prefer to write a snapshot over an update (for example, when the SPSO moves and there are no in-flight records, the snapshot will be small and there’s no need to write an update instead). The share coordinator will take a snapshot periodically, frequently enough to minimise the number of ShareUpdate records to replay but rarely enough to minimise the performance cost of taking snapshots.
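
A sketch of that decision, under the assumption of a simple hypothetical threshold-based policy:

Code Block
// Sketch of the snapshot-versus-update choice described above; the threshold and names are
// hypothetical, and the real share coordinator applies its own policy.
final class SnapshotPolicy {
    private static final int MAX_UPDATES_BETWEEN_SNAPSHOTS = 500;  // hypothetical threshold

    static boolean shouldWriteSnapshot(boolean anyInFlightRecords, int updatesSinceLastSnapshot) {
        // Prefer a snapshot when it would be small anyway (no in-flight records), and also take
        // one periodically so that replay never has too many ShareUpdate records to apply.
        return !anyInFlightRecords || updatesSinceLastSnapshot >= MAX_UPDATES_BETWEEN_SNAPSHOTS;
    }
}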

The records also include a state epoch. This is used to ensure that all of the components involved

...

are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.
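
A sketch of the fencing check, assuming illustrative names (the real RPCs reject such writes with an error code rather than an exception):

Code Block
// Sketch of the state-epoch fencing described above; the real RPCs reject the write with an
// error code rather than throwing, and the names here are illustrative.
final class StateEpochFencing {
    static void validateWrite(int currentStateEpoch, int requestStateEpoch) {
        // A write carrying an older state epoch refers to a stale version of the state and
        // must be fenced.
        if (requestStateEpoch < currentStateEpoch) {
            throw new IllegalStateException("Request state epoch " + requestStateEpoch
                    + " is older than current state epoch " + currentStateEpoch);
        }
    }
}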

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

Type: ShareSnapshot

Key

Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32

Value

Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64
States[]:
  BaseOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

Type: ShareUpdate

Key

Code Block
Version: 1
GroupId: string
TopicId: uuid
Partition: int32

Value

Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  BaseOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareSnapshot and any subsequent ShareUpdate records whose snapshot epoch matches. Until replay is complete for a share-partition, the RPCs for mutating the share-group state of that share-partition return the COORDINATOR_LOAD_IN_PROGRESS error code.
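
A sketch of that replay pass, using simplified stand-ins for the record types:

Code Block
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the replay pass described above. The key and record types are simplified stand-ins
// for the ShareSnapshot/ShareUpdate records defined earlier.
final class ShareStateReplay {
    record Key(String groupId, String topicId, int partition) { }
    interface StateRecord { int snapshotEpoch(); }
    record Snapshot(int snapshotEpoch) implements StateRecord { }
    record Update(int snapshotEpoch) implements StateRecord { }

    // For each share-partition, keep the latest ShareSnapshot plus only those subsequent
    // ShareUpdates whose snapshot epoch matches it.
    static Map<Key, List<StateRecord>> replay(List<Map.Entry<Key, StateRecord>> partitionRecords) {
        Map<Key, List<StateRecord>> state = new HashMap<>();
        for (Map.Entry<Key, StateRecord> entry : partitionRecords) {
            Key key = entry.getKey();
            StateRecord rec = entry.getValue();
            if (rec instanceof Snapshot) {
                // A newer snapshot supersedes everything replayed so far for this share-partition.
                List<StateRecord> fresh = new ArrayList<>();
                fresh.add(rec);
                state.put(key, fresh);
            } else if (rec instanceof Update update) {
                List<StateRecord> existing = state.get(key);
                if (existing != null && existing.get(0).snapshotEpoch() == update.snapshotEpoch()) {
                    existing.add(update);
                }
            }
        }
        return state;
    }
}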

Managing the space of the share-group state topic

The share-group state data is not very amenable to log compaction. As a result, the share coordinator uses unlimited log retention and prunes the log records itself using ReplicaManager.deleteRecords. The share coordinator can delete all records before the earliest of the latest ShareSnapshots of the active share-partitions. Each periodic snapshot supersedes the previous latest ShareSnapshot for its share-partition, moving this point forwards. For idle share-partitions, the share coordinator will periodically write a new ShareSnapshot so

...

the older records can be pruned.
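
A sketch of how the prune point could be computed under these rules (the names are illustrative):

Code Block
import java.util.Collection;

// Sketch of how the prune point could be computed: everything before the earliest "latest
// ShareSnapshot" across the active share-partitions on this topic partition is safe to delete
// with ReplicaManager.deleteRecords. The class and method names are illustrative.
final class ShareStatePruner {
    static long safeDeleteBeforeOffset(Collection<Long> latestSnapshotOffsetPerSharePartition) {
        // Any share-partition that has not snapshotted recently holds back pruning; writing
        // periodic ShareSnapshots for idle share-partitions moves this offset forwards.
        return latestSnapshotOffsetPerSharePartition.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);
    }
}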

Using the share coordinator

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator with the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code, indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader, which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1, indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.
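
A sketch of that initialization flow, with a hypothetical client interface standing in for the RPCs:

Code Block
// Sketch of the initialization flow described above; ShareCoordinatorClient and its methods are
// hypothetical stand-ins for the FindCoordinator and ReadShareGroupState RPCs.
final class SharePartitionInitializer {
    interface ShareCoordinatorClient {
        // Returns the durable state, or null if the share coordinator answered
        // UNKNOWN_TOPIC_OR_PARTITION (the share-partition is not part of the share group).
        ReadStateResult readShareGroupState(String coordinatorKey);
    }
    record ReadStateResult(long startOffset) { }

    static long initialStartOffset(ShareCoordinatorClient coordinator,
                                   String groupId, String topicId, int partition,
                                   long offsetFromAutoOffsetReset) {
        String key = groupId + ":" + topicId + ":" + partition;  // key_type: SHARE
        ReadStateResult state = coordinator.readShareGroupState(key);
        if (state == null) {
            throw new IllegalStateException("Share-partition is not part of share group " + groupId);
        }
        // An SPSO of -1 means the initial position comes from group.share.auto.offset.reset.
        return state.startOffset() == -1 ? offsetFromAutoOffsetReset : state.startOffset();
    }
}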

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupOffsetsState RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState). Since there were no share sessions, there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state change (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-group state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

When a share partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a final ShareSnapshot record with the special snapshot epoch value of -1. This acts as a deletion marker so that the recovery processing of the share-group state topic sees that the share-partition has been removed. The deletion marker does not need to be retained indefinitely; its only purpose is to make sure that a share coordinator reading records written for a share-partition before it was removed notices that those records apply to a defunct share-partition.

Examples with share-group state

...

The new org.apache.kafka.clients.consumer.AcknowledgementCommitCallback can be implemented by the user to execute when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().
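
A sketch of an implementation, assuming the onComplete(Map<TopicIdPartition, Set<Long>>, Exception) signature described in this KIP; check the interface definition for the exact form:

Code Block
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.common.TopicIdPartition;

// Sketch of a callback implementation. The onComplete signature shown here is assumed from the
// KIP's description (acknowledged offsets per topic-partition plus an optional exception).
public class LoggingAckCallback implements AcknowledgementCommitCallback {
    @Override
    public void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception) {
        // Runs on the application thread; do not call back into KafkaShareConsumer here, with
        // the exception of KafkaShareConsumer.wakeup().
        if (exception != null) {
            System.err.println("Acknowledgement failed: " + exception.getMessage());
        } else {
            offsets.forEach((tp, acked) ->
                    System.out.println("Acknowledged " + acked.size() + " offsets for " + tp));
        }
    }
}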

...

Test Plan

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing, both to understand the performance of share groups and to understand the impact of this new feature on brokers.

...

  1. An application using KafkaConsumer with a consumer group could be switched to a share group with very different semantics with just a configuration change. There is almost no chance that the application would work correctly.
  2. Libraries such as Kafka Connect which embed Kafka consumers will not work correctly with share groups without code changes beyond changing the configuration. As a result, there is a risk of breaking connectors due to misconfiguration using the group.type  configuration property.
  3. More than half of the KafkaConsumer  methods do not make sense for share groups, introducing a lot of unnecessary cruft.

...