...
We argue that the root of the problem is twofold:
...
that transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes.
...
Our proposal is to make transaction coordinators aware of consumer group assignments. Rather than distributing the transactional state by routing every transactional Id to a separate coordinator, we will use the consumer group id to identify a single transaction coordinator which is responsible for managing the state. This gives the transaction coordinator the ability to understand which partitions are being consumed by each member of the group. This can then be used to safely coordinate assignment changes.
Currently, the initTransactions API is used to fence producers that share the same transactional Id and to ensure that previous transactions have been completed; the transaction coordinator enforces this when it handles the resulting InitProducerId request. We propose to base this guarantee on the consumer group instead. We will add a new initTransactions API which accepts the set of assigned partitions and the associated consumer group Id. These will be passed to the transaction coordinator in the InitProducerId call and stored with the other transaction state.
Essentially, the problem we are trying to solve is making the coordinator aware of the dependencies between processes that arise from partition reassignment. When handling the InitProducerId request, the coordinator will use the previous partition assignment of the consumer group to check which transactions need to be completed before it is safe to begin processing. The coordinator will then ensure that only one producer for each assigned partition is allowed to make progress at any time.
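To make the dependency check concrete, here is a minimal Java sketch under stated assumptions: members and partitions are plain strings, the previous generation's assignment is a map from member id to partitions, and a member with an open transaction is conservatively assumed to have included offsets for every partition it owned in that generation. The class and method names (`RebalanceDependencies`, `blockingMembers`, `safeToProceed`) are hypothetical and are not part of Kafka or of this proposal's API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: names are illustrative only, not Kafka APIs.
public class RebalanceDependencies {

    /**
     * Returns the previous owners whose still-open transactions may include offsets
     * for any partition in newAssignment. Assumption: an open transaction may cover
     * every partition its member owned in the previous generation.
     */
    static Set<String> blockingMembers(Map<String, Set<String>> previousAssignment,
                                       Set<String> membersWithOpenTxn,
                                       Set<String> newAssignment) {
        Set<String> blocking = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : previousAssignment.entrySet()) {
            String previousOwner = entry.getKey();
            if (!membersWithOpenTxn.contains(previousOwner)) {
                continue; // nothing pending for this member
            }
            for (String partition : newAssignment) {
                if (entry.getValue().contains(partition)) {
                    blocking.add(previousOwner);
                    break;
                }
            }
        }
        return blocking;
    }

    /** A member may begin processing only when no open transaction touches its new partitions. */
    static boolean safeToProceed(Map<String, Set<String>> previousAssignment,
                                 Set<String> membersWithOpenTxn,
                                 Set<String> newAssignment) {
        return blockingMembers(previousAssignment, membersWithOpenTxn, newAssignment).isEmpty();
    }

    public static void main(String[] args) {
        // Previous generation: A owned input-0 and input-1, B owned input-2.
        Map<String, Set<String>> previous = Map.of(
                "A", Set.of("input-0", "input-1"),
                "B", Set.of("input-2"));
        // A still has an open transaction; B has already committed.
        Set<String> open = Set.of("A");

        // Whoever now receives input-1 must wait for A's transaction to complete.
        System.out.println(blockingMembers(previous, open, Set.of("input-1"))); // [A]
        // input-2 and input-3 were never owned by A, so their new owner may proceed.
        System.out.println(safeToProceed(previous, open, Set.of("input-2", "input-3"))); // true
    }
}
```

Keying the check on the previous assignment rather than on individual transactional Ids is what lets a single coordinator reason about partitions moving between processes.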
...
```java
interface Producer {
    /**
     * Initialize transactional state for the producer with the partitions assigned
     * in the consumer group rebalance. This call ensures that any transactions
     * involving committed offsets from the set of input partitions are completed
     * before this call returns.
     *
     * Unlike the no-arg initTransactions() API, this can be called multiple times
     * on the same instance. Typically it should be called immediately after receiving
     * a new partition assignment from the group coordinator.
     */
    void initTransactions(GroupAssignment groupAssignment);
}

public interface GroupAssignment {
    int generationId();
}
```
...
```java
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); // DEPRECATED

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets); // NEW
```
We could effectively leave the `transactional.id` config unset, because we no longer use it for revoking ongoing transactions. Instead, when the application relies on group membership, we identify the transactional state by the consumer group id. To enable this, we need two protocol changes. First, we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group Id. Second, we need to extend the InitProducerId API to support consumer-group-aware initialization.
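Kafka currently locates a transaction coordinator by hashing the transactional Id onto a partition of the transaction state topic, whose leader acts as coordinator. The Java sketch below assumes, as an illustration only, that the same hash-and-modulo scheme is simply keyed by the consumer group id when one is supplied. `CoordinatorKey`, `coordinatorKey`, and the partition count of 50 used in `main` are hypothetical choices, not confirmed details of the proposal.

```java
// Hypothetical sketch of choosing the key that locates the transaction coordinator.
public class CoordinatorKey {

    /** Absolute value that maps Integer.MIN_VALUE to 0, keeping the modulus non-negative. */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Partition of the transaction state topic whose leader would coordinate this key. */
    static int partitionFor(String key, int stateTopicPartitionCount) {
        return abs(key.hashCode()) % stateTopicPartitionCount;
    }

    /**
     * Assumption: a group-aware producer is located by its consumer group id;
     * otherwise we fall back to the transactional.id, as today.
     */
    static String coordinatorKey(String transactionalId, String consumerGroupId) {
        if (consumerGroupId != null) {
            return consumerGroupId;
        }
        if (transactionalId == null) {
            throw new IllegalArgumentException("either transactional.id or a consumer group id is required");
        }
        return transactionalId;
    }

    public static void main(String[] args) {
        int partitions = 50; // assumed partition count of the transaction state topic
        // Members of one group resolve to the same coordinator, with or without a transactional.id.
        int p1 = partitionFor(coordinatorKey(null, "orders-group"), partitions);
        int p2 = partitionFor(coordinatorKey("app-txn-2", "orders-group"), partitions);
        System.out.println(p1 == p2); // true
    }
}
```

Because every member of the group resolves to the same partition, a single coordinator sees the whole group's transactional state.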
...
```
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId ConsumerGroupGeneration
  TransactionalId => NullableString
  TransactionTimeoutMs => Int32
  ConsumerGroupId => NullableString // NEW
  ConsumerGroupGeneration => Int32 // NEW

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int32
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16
```
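To show how the two new fields sit alongside the existing ones, here is a minimal Java sketch that writes a request body with Kafka's classic (non-flexible) primitive encodings: a nullable string is an int16 length (-1 for null) followed by UTF-8 bytes, and integers are big-endian. It assumes TransactionTimeoutMs is an Int32, as in the existing InitProducerId schema, and that the new fields follow it in the order listed. The request header and the compact strings and tagged fields of newer flexible versions are omitted; `InitProducerIdRequestSketch` and its record are hypothetical, not Kafka's generated message classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the proposed InitProducerId request body (no request header).
public class InitProducerIdRequestSketch {

    record Request(String transactionalId, int transactionTimeoutMs,
                   String consumerGroupId, int consumerGroupGeneration) {}

    /** Bytes needed for a classic nullable string: int16 length plus UTF-8 payload. */
    static int nullableStringSize(String s) {
        return 2 + (s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length);
    }

    static void writeNullableString(ByteBuffer buf, String s) {
        if (s == null) {
            buf.putShort((short) -1); // length -1 marks null
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    /** Writes fields in schema order; ByteBuffer is big-endian by default. */
    static ByteBuffer encode(Request r) {
        int size = nullableStringSize(r.transactionalId()) + 4
                + nullableStringSize(r.consumerGroupId()) + 4;
        ByteBuffer buf = ByteBuffer.allocate(size);
        writeNullableString(buf, r.transactionalId());
        buf.putInt(r.transactionTimeoutMs());
        writeNullableString(buf, r.consumerGroupId());
        buf.putInt(r.consumerGroupGeneration());
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // transactional.id left unset; the consumer group id identifies the state instead.
        ByteBuffer encoded = encode(new Request(null, 60_000, "orders-group", 7));
        System.out.println(encoded.remaining()); // 2 + 4 + (2 + 12) + 4 = 24
    }
}
```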
...