...
When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and last offset contains any gaps, as might occur, for example, as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker can send unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps, and it reports these using the ShareAcknowledge API. This means that the presence of unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but this will be resolved by the consumer acknowledgements.
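The in-flight accounting for an unmaterialized batch can be illustrated with a small, Kafka-free Java model. This is a minimal sketch under stated assumptions: the classes `UnmaterializedBatch` and `UnmaterializedBatchDemo` and the method `applyGapOffsets` are hypothetical, and the broker's real data structures are not described here. It shows the count starting at the full offset range and shrinking once the consumer reports gap offsets.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of an unmaterialized record batch; not the broker's real classes.
// The broker knows only the base and last offsets, so it initially counts every
// offset in [baseOffset, lastOffset] as an in-flight record.
class UnmaterializedBatch {
    private final long baseOffset;
    private final long lastOffset; // inclusive
    private final Set<Long> knownGaps = new TreeSet<>();

    UnmaterializedBatch(long baseOffset, long lastOffset) {
        if (lastOffset < baseOffset) {
            throw new IllegalArgumentException("lastOffset must be >= baseOffset");
        }
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }

    // Offsets currently counted as in-flight records (may overcount until gaps are reported).
    long inFlightCount() {
        return (lastOffset - baseOffset + 1) - knownGaps.size();
    }

    // Applies gap offsets reported in a consumer acknowledgement, e.g. offsets removed by compaction.
    void applyGapOffsets(List<Long> gapOffsets) {
        for (long offset : gapOffsets) {
            if (offset < baseOffset || offset > lastOffset) {
                throw new IllegalArgumentException("gap offset " + offset + " is outside the batch");
            }
            knownGaps.add(offset); // a Set, so reporting the same gap twice does not double-count
        }
    }
}

class UnmaterializedBatchDemo {
    public static void main(String[] args) {
        UnmaterializedBatch batch = new UnmaterializedBatch(100, 109);
        System.out.println("Assumed in-flight records: " + batch.inFlightCount());
        // The consumer decompresses the batch and finds no records at offsets 103 and 104.
        batch.applyGapOffsets(List.of(103L, 104L));
        System.out.println("In-flight records after gaps: " + batch.inFlightCount());
    }
}
```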
Client programming interface
...
A new interface KafkaShareConsumer is introduced for consuming from share groups. It looks very similar to KafkaConsumer, trimmed down to the methods that apply to share groups. Introducing a new KafkaShareConsumer interface has two advantages compared with overloading KafkaConsumer to work with share groups:
...
.
...
...
To join a share group, the client application instantiates a KafkaShareConsumer using the configuration parameter group.id to give the ID of the share group. Then, it uses KafkaShareConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.
...
- The batch contains no records, in which case the application just polls again. The call to KafkaShareConsumer.commitAsync() does nothing because the batch was empty.
- All of the records in the batch are processed successfully. The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all records in the batch as successfully processed.
- One of the records encounters an exception. The call to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaShareConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.
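The three cases above can be checked with a simulation that needs no broker. The sketch below is hypothetical (`StopOnErrorSimulation` is not part of any Kafka API): it replays the stop-on-error loop over a list of offsets and records which acknowledgement each offset would receive and which offsets would be left Acquired.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical simulation of the "stop processing the batch on error" loop.
// It does not use Kafka; it only tracks the acknowledgement each offset would get.
class StopOnErrorSimulation {
    enum Outcome { ACCEPT, REJECT }

    final Map<Long, Outcome> acknowledgements = new LinkedHashMap<>();
    final List<Long> stillAcquired = new ArrayList<>();

    // processOk returns false when processing the record at that offset would throw.
    static StopOnErrorSimulation run(List<Long> batchOffsets, LongPredicate processOk) {
        StopOnErrorSimulation sim = new StopOnErrorSimulation();
        boolean stopped = false;
        for (long offset : batchOffsets) {
            if (stopped) {
                // Never acknowledged: stays Acquired in the same delivery attempt.
                sim.stillAcquired.add(offset);
            } else if (processOk.test(offset)) {
                sim.acknowledgements.put(offset, Outcome.ACCEPT);
            } else {
                sim.acknowledgements.put(offset, Outcome.REJECT);
                stopped = true; // equivalent of the `break` in the application loop
            }
        }
        return sim;
    }

    public static void main(String[] args) {
        StopOnErrorSimulation sim = run(List.of(10L, 11L, 12L, 13L, 14L), offset -> offset != 12L);
        System.out.println("Acknowledged: " + sim.acknowledgements);
        System.out.println("Still acquired: " + sim.stillAcquired);
    }
}
```

Passing an empty list reproduces the first case, and a predicate that always returns true reproduces the second.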
Client programming interface option 2 - previous option included temporarily for comparison purposes
The KafkaConsumer interface is enhanced to support share groups.
To join a share group, the client application instantiates a KafkaConsumer using the configuration parameters group.type="share" and group.id to give the ID of the share group. Then, it uses KafkaConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.
Each call to KafkaConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, using either explicit or implicit acknowledgement.
If the application calls the new KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) must be issued in the order in which the records appear in the ConsumerRecords object, which will be in order of increasing offset for each share-partition. In this case:
- The application calls KafkaConsumer.commitSync() or KafkaConsumer.commitAsync(), which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls KafkaConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls KafkaConsumer.close(), which attempts to commit any pending acknowledgements and releases any remaining acquired records.
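The ordering rule for explicit acknowledgement can be expressed as a small piece of client-side bookkeeping. The sketch assumes a hypothetical `ExplicitAckTracker` class that identifies share-partitions by strings such as "foo-0"; it rejects an acknowledgement whose offset is not greater than the previous one for the same share-partition, and it reports which delivered records would remain acquired after a commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side bookkeeping for explicit acknowledgement; not KafkaConsumer internals.
// Share-partitions are identified by plain strings such as "foo-0" for simplicity.
class ExplicitAckTracker {
    private final Map<String, List<Long>> delivered = new HashMap<>();
    private final Map<String, List<Long>> acknowledged = new HashMap<>();

    void recordDelivered(String partition, List<Long> offsets) {
        delivered.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(offsets);
    }

    // Acknowledgements must follow the order of the ConsumerRecords object, which means
    // strictly increasing offsets within each share-partition. Skipping a record is allowed.
    void acknowledge(String partition, long offset) {
        if (!delivered.getOrDefault(partition, List.of()).contains(offset)) {
            throw new IllegalStateException("offset " + offset + " was not delivered on " + partition);
        }
        List<Long> acks = acknowledged.computeIfAbsent(partition, p -> new ArrayList<>());
        if (!acks.isEmpty() && offset <= acks.get(acks.size() - 1)) {
            throw new IllegalStateException("acknowledgement for offset " + offset
                    + " is out of order on " + partition);
        }
        acks.add(offset);
    }

    // Delivered but unacknowledged offsets; after a commit these remain acquired.
    List<Long> unacknowledged(String partition) {
        List<Long> result = new ArrayList<>(delivered.getOrDefault(partition, List.of()));
        result.removeAll(acknowledged.getOrDefault(partition, List.of()));
        return result;
    }
}
```

Skipping a record is permitted in this sketch because the text only requires that the calls which are made follow the order of the ConsumerRecords object.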
If the application does not call KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:
- The application calls KafkaConsumer.commitSync() or KafkaConsumer.commitAsync(), which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.
- The application calls KafkaConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.
- The application calls KafkaConsumer.close(), which releases any acquired records without acknowledgement.
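At commit time, the difference between implicit and explicit acknowledgement comes down to a single decision. `CommitResolver` and its nested `AckType` (a stand-in for AcknowledgeType) are hypothetical, and the sketch covers only the commit path, not poll() or close().

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of what a commit sends for one batch. AckType stands in for AcknowledgeType.
class CommitResolver {
    enum AckType { ACCEPT, RELEASE, REJECT }

    // Implicit mode (no acknowledge() calls): every delivered record is accepted.
    // Explicit mode: only the acknowledged records are committed; the rest stay acquired.
    static Map<Long, AckType> acknowledgementsToCommit(List<Long> deliveredOffsets,
                                                       Map<Long, AckType> explicitAcks) {
        if (!explicitAcks.isEmpty()) {
            return new LinkedHashMap<>(explicitAcks);
        }
        Map<Long, AckType> implicitAcks = new LinkedHashMap<>();
        for (long offset : deliveredOffsets) {
            implicitAcks.put(offset, AckType.ACCEPT);
        }
        return implicitAcks;
    }
}
```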
The KafkaConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically.
Example - Acknowledging a batch of records (implicit acknowledgement)
In this example, a consumer using share group "myshare" subscribes to topic "foo". It processes all of the records in the batch and then calls KafkaConsumer.commitSync(), which implicitly marks all of the records in the batch as successfully consumed and commits the acknowledgement synchronously with Kafka. Asynchronous commit would also be acceptable. Note that apart from the configuration group.type="share", this code would work with a consumer group. This is the only example for which that is true.
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record); // Application-defined processing
    }
    consumer.commitSync(); // Commit the acknowledgement of all the records in the batch
}
```
Behind the scenes, the KafkaConsumer fetches records from the share-partition leader. The leader selects the records in Available state and will return complete record batches (https://kafka.apache.org/documentation/#recordbatch) if possible. It moves the records into Acquired state, increments the delivery count, starts the acquisition lock timeout, and returns them to the KafkaConsumer. Then the KafkaConsumer keeps a map of the state of the records it has fetched and returns a batch to the application.
When the application calls KafkaConsumer.commitSync(), the KafkaConsumer updates the state map by marking all of the records in the batch as Acknowledged, and it then commits the acknowledgements by sending the new state information to the share-partition leader. For each share-partition, the share-partition leader updates the record states atomically.
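The leader-side steps described above (select Available records, move them to Acquired, increment the delivery count, start the acquisition lock, then apply a commit atomically) can be sketched as follows. `InFlightRecords` is a hypothetical single-threaded model: it ignores record-batch boundaries and lock expiry, and it represents atomicity simply as validating every offset before changing any of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a share-partition leader's in-flight records.
// The acquisition lock is represented only by a deadline; expiry handling is omitted.
class InFlightRecords {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    static final class RecordState {
        State state = State.AVAILABLE;
        int deliveryCount = 0;
        long lockDeadlineMs = -1;
    }

    final TreeMap<Long, RecordState> records = new TreeMap<>();

    // Adds records [fromOffset, toOffset] read from the log in Available state.
    void append(long fromOffset, long toOffset) {
        for (long offset = fromOffset; offset <= toOffset; offset++) {
            records.put(offset, new RecordState());
        }
    }

    // Acquires up to maxRecords Available records, lowest offsets first, for one consumer.
    List<Long> acquire(int maxRecords, long nowMs, long lockDurationMs) {
        List<Long> acquired = new ArrayList<>();
        for (Map.Entry<Long, RecordState> entry : records.entrySet()) {
            if (acquired.size() == maxRecords) {
                break;
            }
            RecordState record = entry.getValue();
            if (record.state == State.AVAILABLE) {
                record.state = State.ACQUIRED;
                record.deliveryCount++;
                record.lockDeadlineMs = nowMs + lockDurationMs; // start the acquisition lock
                acquired.add(entry.getKey());
            }
        }
        return acquired;
    }

    // Applies a commit all-or-nothing: validate every offset first, then update them all.
    void acknowledge(List<Long> offsets) {
        for (long offset : offsets) {
            RecordState record = records.get(offset);
            if (record == null || record.state != State.ACQUIRED) {
                throw new IllegalStateException("offset " + offset + " is not Acquired");
            }
        }
        for (long offset : offsets) {
            records.get(offset).state = State.ACKNOWLEDGED;
        }
    }
}
```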
Example - Per-record acknowledgement (explicit acknowledgement)
In this example, the application uses the result of processing the records to acknowledge or reject the records in the batch.
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record); // Application-defined processing
            consumer.acknowledge(record, AcknowledgeType.ACCEPT); // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT); // Mark the record as unprocessable
        }
    }
    consumer.commitAsync(); // Commit the acknowledgements of all the records in the batch
}
```
In this example, each record processed is separately acknowledged using a call to the new KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method. The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they’re not eligible for further delivery attempts. For a permanent error such as a deserialization error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.
The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) simply update the state map in the KafkaConsumer. It is only once KafkaConsumer.commitAsync() is called that the acknowledgements are committed by sending the new state information to the share-partition leader.
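The effect of each AcknowledgeType, and the fact that acknowledge() only updates local state until a commit, can be sketched like this. `AcknowledgementBuffer` is hypothetical, and the state name `ARCHIVED` for a rejected record is an assumption; the text only says that rejected records are not eligible for further delivery attempts.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: acknowledge() only records intent locally; commit() drains the buffer
// and reports the state each record would move to on the share-partition leader.
class AcknowledgementBuffer {
    enum AckType { ACCEPT, RELEASE, REJECT }
    // ARCHIVED is an assumed name for "rejected, not eligible for further delivery".
    enum RecordState { ACQUIRED, AVAILABLE, ACKNOWLEDGED, ARCHIVED }

    private final Map<Long, AckType> pending = new LinkedHashMap<>();

    void acknowledge(long offset, AckType type) {
        pending.put(offset, type); // local only; nothing is sent yet
    }

    int pendingCount() {
        return pending.size();
    }

    Map<Long, RecordState> commit() {
        Map<Long, RecordState> result = new LinkedHashMap<>();
        for (Map.Entry<Long, AckType> entry : pending.entrySet()) {
            result.put(entry.getKey(), nextState(entry.getValue()));
        }
        pending.clear();
        return result;
    }

    static RecordState nextState(AckType type) {
        switch (type) {
            case ACCEPT:  return RecordState.ACKNOWLEDGED;
            case RELEASE: return RecordState.AVAILABLE; // eligible for another delivery attempt
            case REJECT:  return RecordState.ARCHIVED;  // not eligible for further delivery
            default: throw new IllegalArgumentException(String.valueOf(type));
        }
    }
}
```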
Example - Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)
In this example, the application stops processing the batch when it encounters an exception.
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Returns a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record); // Application-defined processing
            consumer.acknowledge(record, AcknowledgeType.ACCEPT); // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT); // Mark this record as unprocessable
            break; // Stop processing; later records in the batch stay Acquired
        }
    }
    consumer.commitAsync(); // Commit the acknowledgements of the acknowledged records only
}
```
There are the following cases in this example:
- The batch contains no records, in which case the application just polls again. The call to KafkaConsumer.commitAsync() does nothing because the batch was empty.
- All of the records in the batch are processed successfully. The calls to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all records in the batch as successfully processed.
- One of the records encounters an exception. The call to KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.
Access control
Share group access control is performed on the GROUP resource type, just the same as consumer groups, with the same rules for the actions checked. A share group is just a new kind of group.
- Operations which read information about a share group need permission to perform the DESCRIBE action on the named group resource
- Operations which change information about a share group (such as consuming a record) need permission to perform the READ action on the named group resource
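The two access-control rules amount to a mapping from the kind of operation to the action required on the GROUP resource. The enum below is a hypothetical illustration: its constant names are not Kafka API names, and actions are plain strings rather than Kafka's ACL classes.

```java
// Hypothetical illustration of the rule: share groups use the GROUP resource type;
// reading information needs DESCRIBE, changing it needs READ.
enum ShareGroupOperation {
    DESCRIBE_SHARE_GROUP(false),
    LIST_SHARE_GROUP_OFFSETS(false),
    CONSUME_RECORD(true),
    ALTER_SHARE_GROUP_OFFSETS(true);

    private final boolean changesGroupInformation;

    ShareGroupOperation(boolean changesGroupInformation) {
        this.changesGroupInformation = changesGroupInformation;
    }

    // Share groups reuse the existing GROUP resource type.
    String resourceType() {
        return "GROUP";
    }

    // Reads need DESCRIBE; changes (such as consuming a record) need READ.
    String requiredAction() {
        return changesGroupInformation ? "READ" : "DESCRIBE";
    }
}
```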
Managing durable share-partition state
The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:
- The Share-Partition Start Offset (SPSO)
- The state of the in-flight records
- The delivery counts of records whose delivery failed
The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt.
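The effect of not persisting the Acquired state can be sketched with a model of what the leader writes and what a new leader recovers. This is a minimal sketch under an assumption the text does not spell out: that a record which was only ever Acquired has no persisted state, so it recovers as Available and any in-memory delivery-count increment is lost, which is one way delivery counts end up approximate. `DurableShareState` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of durable share-partition state. Acquiring a record writes nothing,
// so only Available (after a failed delivery) and Acknowledged states are persisted.
class DurableShareState {
    enum State { AVAILABLE, ACKNOWLEDGED }

    long spso;                                                       // Share-Partition Start Offset
    final Map<Long, State> persistedStates = new TreeMap<>();
    final Map<Long, Integer> failedDeliveryCounts = new TreeMap<>();

    DurableShareState(long spso) {
        this.spso = spso;
    }

    // A failed delivery (release or lock timeout) is written with its delivery count.
    void recordFailedDelivery(long offset, int deliveryCount) {
        persistedStates.put(offset, State.AVAILABLE);
        failedDeliveryCounts.put(offset, deliveryCount);
    }

    void recordAcknowledged(long offset) {
        persistedStates.put(offset, State.ACKNOWLEDGED);
        failedDeliveryCounts.remove(offset);
    }

    // On recovery, an offset at or above the SPSO without persisted state is treated as Available.
    State recoveredState(long offset) {
        if (offset < spso) {
            throw new IllegalArgumentException("offset " + offset + " is below the SPSO");
        }
        return persistedStates.getOrDefault(offset, State.AVAILABLE);
    }

    int recoveredDeliveryCount(long offset) {
        return failedDeliveryCounts.getOrDefault(offset, 0);
    }
}
```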
Examples
Operation | State changes | Cumulative state |
---|---|---|
Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 |
In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards | | |
Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) |
Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 |
With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records | | |
Fetch records 110-119 | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) |
Release 110 | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) |
Acknowledge 119 | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged |
Fetch records 110, 120 | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
Lock timeout elapsed 111, 112 | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
Acknowledge 113-118 | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
Fetch records 111, 112 | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
Acknowledge 110 | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
Acknowledge 111, 112 | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) |
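The SPSO movement in the table follows one rule: the SPSO advances past every acknowledged record at the start of the in-flight range. The hypothetical `SpsoTracker` below applies that rule; it models a lock timeout as a release and omits delivery counts.

```java
import java.util.TreeMap;

// Hypothetical sketch of SPSO/SPEO bookkeeping for one share-partition.
class SpsoTracker {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    long spso; // Share-Partition Start Offset
    long speo; // Share-Partition End Offset (exclusive end of in-flight records)
    final TreeMap<Long, State> inFlight = new TreeMap<>();

    SpsoTracker(long startOffset) {
        spso = startOffset;
        speo = startOffset;
    }

    // Fetching acquires a record; fetching past the SPEO extends it.
    void fetch(long offset) {
        inFlight.put(offset, State.ACQUIRED);
        speo = Math.max(speo, offset + 1);
    }

    // Used for both an explicit release and an elapsed acquisition lock.
    void release(long offset) {
        inFlight.put(offset, State.AVAILABLE);
    }

    void acknowledge(long offset) {
        inFlight.put(offset, State.ACKNOWLEDGED);
        // Advance the SPSO over the acknowledged prefix and drop it from the in-flight map.
        while (spso < speo && inFlight.get(spso) == State.ACKNOWLEDGED) {
            inFlight.remove(spso);
            spso++;
        }
    }
}
```

Replaying the operations in the table, from fetching records 100-109 to acknowledging 111 and 112, ends with SPSO=120 and SPEO=121 in this model, matching the last row.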
Further details to follow as the design progresses.
Public Interfaces
This KIP introduces extensive additions to the public interfaces.
Client API changes
KafkaShareConsumer - only applies to Client Programming interface option 1
This KIP introduces a new interface for consuming records from a share group.
Method signature | Description |
---|---|
void acknowledge(ConsumerRecord record) | Acknowledge successful delivery of a record returned on the last poll(Duration). The acknowledgement is committed on the next commitSync() or commitAsync() call. |
void acknowledge(ConsumerRecord record, AcknowledgeType type) | Acknowledge delivery of a record returned on the last poll(Duration), indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync() or commitAsync() call. |
void close() | Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. |
void close(Duration timeout) | Tries to close the consumer cleanly within the specified timeout. |
void commitAsync() | Commits the acknowledgements for the records returned. |
void commitSync() | Commits the acknowledgements for the records returned. |
void commitSync(Duration timeout) | Commits the acknowledgements for the records returned. |
Map<MetricName, ? extends Metric> metrics() | Get the metrics kept by the consumer. |
List<PartitionInfo> partitionsFor(String topic) | Get metadata about the partitions for a given topic. |
List<PartitionInfo> partitionsFor(String topic, Duration timeout) | Get metadata about the partitions for a given topic. |
ConsumerRecords<K,V> poll(Duration timeout) | Fetch data for the topics or partitions specified using the subscribe API. |
void subscribe(Collection<String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions. |
Set<String> subscription() | Get the current subscription. |
void unsubscribe() | Unsubscribe from topics currently subscribed with subscribe(Collection). |
void wakeup() | Wakeup the consumer. |
AcknowledgeType
The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.
Enum constant | Description |
---|---|
ACCEPT (0) | The record was consumed successfully |
RELEASE (1) | The record was not consumed successfully. Release it for another delivery attempt. |
REJECT (2) | The record was not consumed successfully. Reject it and do not release it for another delivery attempt. |
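A Java enum matching the table might carry the numeric values explicitly. The field name `id` and the `forId` lookup below are assumptions for illustration, not the published API.

```java
// Sketch of an enum matching the table; the `id` field and forId() are hypothetical.
enum AcknowledgeType {
    ACCEPT((byte) 0),  // consumed successfully
    RELEASE((byte) 1), // not consumed successfully; release for another delivery attempt
    REJECT((byte) 2);  // not consumed successfully; do not release for another delivery attempt

    public final byte id;

    AcknowledgeType(byte id) {
        this.id = id;
    }

    public static AcknowledgeType forId(byte id) {
        for (AcknowledgeType type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown AcknowledgeType id: " + id);
    }
}
```

Storing the value in a field, rather than relying on ordinal(), keeps the numeric value stable if constants are ever reordered.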
AdminClient
Add the following methods to the AdminClient interface.
Method signature | Description |
---|---|
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition,OffsetAndMetadata> offsets, AlterShareGroupOffsetsOptions options) | Alter offset information for a share group. |
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) | Delete offset information for a set of partitions in a share group. |
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) | Delete share groups from the cluster. |
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) | Describe some share groups in the cluster. |
ListShareGroupOffsetsResult listShareGroupOffsets(String groupId, ListShareGroupOffsetsOptions options) | List the share group offsets available in the cluster. |
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) | List the share groups available in the cluster. |
The equivalence between the consumer group and share group interfaces is clear. There are some differences:
- Altering the offsets for a share group resets the Share-Partition Start Offset (SPSO) for topic-partitions in the share group (share-partitions)
- The members of a share group are not assigned partitions
- A share group has only three states: EMPTY, STABLE and DEAD
KafkaConsumer - only applies to Client Programming interface option 2
This KIP introduces two new methods on KafkaConsumer which apply only to share groups:
- void acknowledge(ConsumerRecord record)
- void acknowledge(ConsumerRecord record, AcknowledgeType type)
Many of the existing KafkaConsumer methods do not apply to share groups and will result in an exception.
...
Only applies to share groups, otherwise throws a new InvalidGroupTypeException
...
Only applies to share groups, otherwise throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
Yes
...
Yes
...
Yes
...
Yes
...
No, throws a new InvalidGroupTypeException
...
Yes
...
Yes
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
Yes
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
Yes
...
Yes
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
Yes
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
No, throws a new InvalidGroupTypeException
...
Yes
...
Yes
...
Yes
Command-line tools
A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:
Option | Description |
---|---|
--all-topics | Consider all topics assigned to a group in the `reset-offsets` process. |
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. |
--delete | Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2 |
--delete-offsets | Delete offsets of share group. Supports one share group at a time, and multiple topics. |
--describe | Describe share group and list offset lag (number of records not yet processed) related to given group. |
--dry-run | Only show results without executing changes on share groups. Supported operations: reset-offsets. |
--execute | Execute operation. Supported operations: reset-offsets. |
--group <String: share group> | The share group we wish to act on. |
--list | List all share groups. |
--members | Describe members of the group. This option may be used with the '--describe' option only. |
--offsets | Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of the '--describe' option and may be used with the '--describe' option only. |
--reset-offsets | Reset offsets of share group. Supports one share group at a time, and instances must be inactive. |
--to-datetime <String: datetime> | Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'. |
--to-earliest | Reset offsets to earliest offset. |
--to-latest | Reset offsets to latest offset. |
--topic <String: topic> | The topic whose share group information should be deleted or topic which should be included in the reset offset process. |
--version | Display Kafka version. |
...
Configuration | Description | Values |
---|---|---|
group.type | Type of the group: "consumer" or "share". Only applies for Client Programming Interface option 2. | Default "consumer" |
record.lock.duration.ms | Record acquisition lock duration in milliseconds. | null, which uses the cluster configuration share.record.lock.duration.ms; minimum 1000; maximum limited by the cluster configuration share.record.lock.duration.max.ms |
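The Values column for record.lock.duration.ms implies a simple resolution rule. The sketch below is hypothetical: `LockDurationConfig` is not a Kafka class, and rejecting an out-of-range value with an exception is an assumption, since the table only states the bounds.

```java
// Hypothetical resolution of the effective acquisition lock duration.
final class LockDurationConfig {
    static final long MIN_LOCK_DURATION_MS = 1000;

    // consumerValueMs is null when record.lock.duration.ms is not set, in which case the
    // cluster configuration share.record.lock.duration.ms applies. The upper bound is the
    // cluster configuration share.record.lock.duration.max.ms.
    static long effectiveLockDurationMs(Long consumerValueMs, long clusterDefaultMs, long clusterMaxMs) {
        long value = (consumerValueMs == null) ? clusterDefaultMs : consumerValueMs;
        if (value < MIN_LOCK_DURATION_MS || value > clusterMaxMs) {
            throw new IllegalArgumentException("record.lock.duration.ms must be between "
                    + MIN_LOCK_DURATION_MS + " and " + clusterMaxMs + ", got " + value);
        }
        return value;
    }
}
```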
...
The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.
Request schema
```json
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge." },
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records." },
          { "name": "AcknowledgeType", "type": "string", "versions": "0+", "default": "accept",
            "about": "The type of acknowledgement, such as accept or release." }
        ]}
      ]}
    ]}
  ]
}
```
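A client building a ShareAcknowledge request has to turn per-record acknowledgements into AcknowledgementBatches entries. The schema does not prescribe how; the sketch below assumes one plausible strategy in which contiguous offsets with the same acknowledgement type are coalesced into one batch, and known gap offsets are folded into the surrounding batch and listed in GapOffsets. `AckBatchBuilder` is hypothetical, and acknowledgement types are strings to match the schema's "accept" default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical builder for the AcknowledgementBatches array of a ShareAcknowledge request.
class AckBatchBuilder {
    static final class Batch {
        long startOffset;
        long lastOffset;                                 // inclusive
        final List<Long> gapOffsets = new ArrayList<>();
        String acknowledgeType;                          // null until a real record joins the batch

        @Override
        public String toString() {
            return "{StartOffset=" + startOffset + ", LastOffset=" + lastOffset
                    + ", GapOffsets=" + gapOffsets + ", AcknowledgeType=" + acknowledgeType + "}";
        }
    }

    // acks maps each acknowledged record offset to its type; gaps are offsets known not to hold records.
    static List<Batch> build(Map<Long, String> acks, Set<Long> gaps) {
        TreeSet<Long> offsets = new TreeSet<>(acks.keySet());
        offsets.addAll(gaps);
        List<Batch> batches = new ArrayList<>();
        Batch current = null;
        for (long offset : offsets) {
            boolean isGap = gaps.contains(offset);
            String type = isGap ? null : acks.get(offset);
            boolean contiguous = current != null && offset == current.lastOffset + 1;
            // A gap fits any batch; a record fits if the batch has no type yet or the same type.
            boolean compatible = isGap || current == null || current.acknowledgeType == null
                    || current.acknowledgeType.equals(type);
            if (contiguous && compatible) {
                current.lastOffset = offset;
            } else {
                current = new Batch();
                current.startOffset = offset;
                current.lastOffset = offset;
                batches.add(current);
            }
            if (isGap) {
                current.gapOffsets.add(offset);
            } else if (current.acknowledgeType == null) {
                current.acknowledgeType = type;
            }
        }
        for (Batch batch : batches) {
            if (batch.acknowledgeType == null) {
                batch.acknowledgeType = "accept"; // assumption: gap-only batch uses the schema default
            }
        }
        return batches;
    }
}
```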
Response schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
} |
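The response reports errors at two levels: a top-level ErrorCode and an ErrorCode per partition, where 0 means no error. The Java sketch below shows one possible reading of how a client might decide which partitions' acknowledgements failed. It assumes that a non-zero top-level error means none of the acknowledgements in the request took effect; the schema does not spell out that rule, and the classes AckPartitionResult and AckResponses are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical flattened view of one PartitionData entry in ShareAcknowledgeResponse.
final class AckPartitionResult {
    final UUID topicId;        // Responses[].TopicId
    final int partitionIndex;  // Partitions[].PartitionIndex
    final short errorCode;     // Partitions[].ErrorCode, 0 if there was no error

    AckPartitionResult(UUID topicId, int partitionIndex, short errorCode) {
        this.topicId = topicId;
        this.partitionIndex = partitionIndex;
        this.errorCode = errorCode;
    }
}

final class AckResponses {
    static final short NONE = 0;

    private AckResponses() {}

    // Assumption: a non-zero top-level ErrorCode fails every partition in the request;
    // otherwise each partition's own ErrorCode decides.
    static List<AckPartitionResult> failedPartitions(short topLevelErrorCode,
                                                     List<AckPartitionResult> partitions) {
        List<AckPartitionResult> failed = new ArrayList<>();
        for (AckPartitionResult p : partitions) {
            if (topLevelErrorCode != NONE || p.errorCode != NONE) {
                failed.add(p);
            }
        }
        return failed;
    }
}
```

A client following this reading would retry or surface errors only for the returned partitions, and treat the acknowledgements for all other partitions as applied.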
Metrics
Further details to follow as the design progresses.
Future Work
There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.
This KIP introduces delivery counts and a maximum number of delivery attempts. An obvious future extension is the ability to copy records that failed to be delivered onto a dead-letter queue. This would of course give a way to handle poison messages without them permanently blocking processing.
A “browsing” consumer, which does not modify the share group state or take acquisition locks, could be supported. It would need a lesser permission (DESCRIBE) on the group than a proper consumer (READ). This is a little more complicated because it needs a position independent of the SPSO so that it can traverse the queue.
The focus in this KIP is on sharing rather than ordering. The concept can be extended to give key-based ordering so that partial ordering and fine-grained sharing can be achieved at the same time.
Finally, this KIP does not include support for acknowledging delivery using transactions for exactly-once semantics. Conceptually, this is quite straightforward but would require changes to the API.
Compatibility, Deprecation, and Migration Plan
The changes in this KIP add to the capabilities of Kafka rather than changing existing behavior.
Test Plan
Detail to follow.
Rejected Alternatives
Share group consumers use KafkaConsumer
In this option, the regular KafkaConsumer was used by consumers to consume records from a share group, with a configuration parameter group.type to choose between a share group and a consumer group. While this means that existing Kafka consumers can trivially make use of share groups, there are some obvious downsides:
- An application using KafkaConsumer with a consumer group could be switched to a share group with very different semantics with just a configuration change. There is almost no chance that the application would work correctly.
- Libraries such as Kafka Connect, which embed Kafka consumers, will not work correctly with share groups without code changes beyond changing the configuration. As a result, there is a risk of breaking connectors through misconfiguration of the group.type configuration property.
- More than half of the KafkaConsumer methods do not make sense for share groups, introducing a lot of unnecessary cruft.
As a result, the KIP now proposes an entirely different class, KafkaShareConsumer, which gives a very similar interface to KafkaConsumer but eliminates the downsides listed above.