Current state: Under discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Exactly once semantics (EOS) provides transactional message processing guarantees. Producers can write to multiple partitions atomically so that either all writes succeed or all writes fail. This can be used in the context of stream processing frameworks, such as Kafka Streams, to ensure exactly once processing between topics.
In Kafka EOS, we use the concept of a "transactional Id" in order to preserve exactly once processing guarantees across process failures and restarts. Essentially this allows us to guarantee that for a given transactional Id, there can only be one producer instance that is active and permitted to make progress at any time. Zombie producers are fenced by an epoch which is associated with each transactional Id. We can also guarantee that upon initialization, any transactions which were still in progress are completed before we begin processing. This is the point of the initTransactions()
API.
The problem we are trying to solve in this proposal is a semantic mismatch between consumers in a group and transactional producers. In a consumer group, ownership of partitions can transfer between group members through the rebalance protocol. For transactional producers, assignments are assumed to be static. Every transactional id must map to a consistent set of input partitions. To preserve the static partition mapping in a consumer group where assignments are frequently changing, the simplest solution is to create a separate producer for every input partition. This is what Kafka Streams does today.
This architecture does not scale well as the number of input partitions increases. Every producer come with separate memory buffers, a separate thread, separate network connections. This limits the performance of the producer since we cannot effectively use the output of multiple tasks to improve batching. It also causes unneeded load on brokers since there are more concurrent transactions and more redundant metadata management.
The root of the problem is that transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes. Let's take a look at a sample exactly-once use case, which is quoted from KIP-98:
public class KafkaTransactionsExample { public static void main(String args[]) { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig); KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); producer.initTransactions(); while(true) { ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT); if (!records.isEmpty()) { producer.beginTransaction(); List<ProducerRecord<String, String>> outputRecords = processRecords(records); for (ProducerRecord<String, String> outputRecord : outputRecords) { producer.send(outputRecord); } sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets()); producer.endTransaction(); } } } } |
As one could see, the first thing when a producer starts up is to register its identity through initTransactions
API. Transaction coordinator leverages this step in order to fence producers using the same transactional.id and to ensure that previous transactions must complete. In the above template, we call consumer.poll() to get data, and internally for the very first time we start doing so, consumer needs to know the input topic offset. This is done by a FetchOffset call to group coordinator. With transactional processing, there could be offsets that are "pending", I.E they are part of some ongoing transactions. Upon receiving FetchOffset request, broker will export offset position to the "latest stable offset" (LSO), which is the largest offset that has already been committed when consumer isolation.level is `read_committed`. Since we rely on unique transactional.id to revoke stale transaction, we believe any pending transaction will be aborted when producer calls initTransaction again. During normal use case such as Kafka Streams, we will also explicitly close producer to send out a EndTransaction request to make sure we start from clean state.
This approach is no longer safe when we allow topic partitions to move around transactional producers, since transactional coordinator doesn't know about partition assignment and producer won't call initTransaction again during its life cycle. Omitting pending offsets and proceed could introduce duplicate processing. The proposed solution is to reject FetchOffset request by sending out a new exception called PendingTransactionException to new client when there is pending transactional offset commits, so that old transaction will eventually expire due to transaction timeout. After expiration, transaction coordinator will take care of writing abort transaction markers and bump the producer epoch. For old consumers, we will choose to send a COORDINATOR_LOAD_IN_PROGRESS exception to let it retry, too. When client receives PendingTransactionException or COORDINATOR_LOAD_IN_PROGRESS, it will back-off and retry getting input offset until all the pending transaction offsets are cleared. This is a trade-off between availability and correctness. The worst case for availability loss is just waiting for transaction timeout when the last generation producer wasn’t shut down gracefully, which should be rare.
Below is the new approach we discussed:
Note that the current default transaction.timeout is set to one minute, which is too long for Kafka Streams EOS use cases. Considering the default commit interval was set to only 100 milliseconds, we would doom to hit session timeout if we don't actively commit offsets during that tight window. So we suggest to shrink the transaction timeout to be the same default value as session timeout (10 seconds), to reduce the potential performance loss for offset fetch delay when some instances accidentally crash.
Public Interfaces
The main addition of this KIP is a new variant of the current initTransactions
API which gives us access to the consumer group states, such as member state and generation.id.
interface Producer { /** * This API shall be called for consumer group aware transactional producers. */ void initTransactions(Consumer<byte[], byte[]> consumer); // NEW /** * No longer need to pass in the consumer group id in a case where we already get access to the consumer state. */ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) throws ProducerFencedException, IllegalGenerationException; // NEW } |
We shall set `transaction.timout.ms` default to 10000 ms (10 seconds).
We will add a new error code for consumer to wait for pending transaction clearance. In order to be able to return corresponding exceptions for old/new clients, we shall also bump the OffsetFetch protocol version.
PENDING_TRANSACTION(85, "There are pending transactions for the offset topic that need to be cleared", PendingTransactionException::new), |
A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage group coordinator to fence out of sync client.
A new generation.id field shall be added to the `TxnOffsetCommitRequest` request:
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId TransactionalId => String GroupId => String ProducerId => int64 ProducerEpoch => int16 Offsets => Map<TopicPartition, CommittedOffset> GenerationId => int32 // NEW |
If the generation.id is not matching group generation, the client will be fenced immediately. An edge case is defined as:
1. Client A tries to commit offsets for topic partition P1, but haven't got the chance to do txn offset commit before a long GC. 2. Client A gets out of sync and becomes a zombie due to session timeout, group rebalanced. 3. Another client B was assigned with P1. 4. Client B doesn't see pending offsets because A hasn't committed anything, so it will proceed with potentially `pending` input data 5. Client A was back online, and continue trying to do txn commit. Here if we have generation.id, we will catch it! |
And here is a recommended new transactional API usage example:
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); KafkaProducer producer = new KafkaProducer(); // Will access consumer internal state. Only called once in the app's life cycle. // Note that this is a blocking call until consumer successfully joins the group. producer.initTransactions(consumer); while (true) { // Read some records from the consumer and collect the offsets to commit ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed); // Do some processing and build the records we want to produce List<ProducerRecord> processed = process(consumed); // Write the records and commit offsets under a single transaction producer.beginTransaction(); for (ProducerRecord record : processed) producer.send(record); producer.sendOffsetsToTransaction(consumedOffsets); producer.commitTransaction(); } |
Some key observations are:
Compatibility, Deprecation, and Migration Plan
It’s extremely hard to preserve two types of stream clients within the same application due to the difficulty of state machine reasoning and fencing. It would be the same ideology for the design of upgrade path: one should never allow task producer and thread producer under the same application group.
Following the above principle, Kafka Streams uses version probing to solve the upgrade problem. Step by step guides are:
The reason for doing two rolling bounces is because the old transactional producer doesn’t have access to consumer generation, so group coordinator doesn’t have an effective way to fence old zombies. By doing first rolling bounce, the task producer will also opt in accessing the consumer state and send TxnOffsetCommitRequest with generation. With this foundational change, it is much safer to execute step 3.