Current state: Accepted
Discussion thread: here
JIRA: here
Vote thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The Kafka producer supports the following transactional APIs:
initTransactions to initialize the transactional producer identity
beginTransaction to start a new transaction
sendOffsetsToTransaction to commit consumer offsets advanced within the current transaction
commitTransaction to commit the ongoing transaction
abortTransaction to abort the ongoing transaction
In addition to the transactional APIs above, the idempotent producer sends records to the broker using the send API.
Apache Kafka supports a variety of client SDKs for different programming languages:
Java: The official Java client library supports the producer, consumer, Streams, and Connect APIs.
librdkafka and derived clients:
C/C++: A C/C++ client library supporting the Producer and Consumer APIs.
Python: A Python client library supporting the Producer and Consumer APIs.
Go: A Go client library supporting the Producer and Consumer APIs.
.NET: A .NET client library supporting the Producer and Consumer APIs.
We require consistent error handling across all client SDKs and APIs. Functionally, the client can handle an error in one of three ways:
Retry the operation without involving the application (retriable)
Report the error to the application; the client can recover if it aborts the transaction, and the epoch is bumped under the covers (abortable)
Report the error to the application; there is no way to recover except to shut down the client (fatal)
In this KIP, we want to accomplish a few goals:
Simplify error handling, make it easy for other clients to implement protocols
Give clarity in error messages
Make the expected errors clear: KAFKA-14439 (Specify returned errors for various APIs and versions), KAFKA-7787 (Add error specifications to KAFKA-7609)
Error handling was previously addressed in the accepted KIP-691: Enhance Transactional Producer Exception Handling, and we can perhaps leverage some of that work here.
Proposed Changes
We are proposing to group exceptions into four types:
Retriable: mostly maps onto current retriable errors. Some of these errors may indicate that a metadata update at the producer level is required. Because of that, we introduce two subclasses of the retriable error:
retry only (just send the request again – after some period of backoff)
refresh metadata and retry (request metadata and maybe modify the request before resending)
Abortable: mostly maps onto abortable errors. The error is bubbled to the application layer, and it can choose to roll back any state from the ongoing transaction and abort the transaction. The producer does not need to restart, as after aborting, it can be confident that the state was as it was before the transaction started. The application can also close the producer and react as it does for application-recoverable cases, but it doesn’t need to.
Application-Recoverable: maps onto some fatal errors. The error is bubbled to the application layer, and it may need to do a bit more to roll back and clean up state. The producer cannot simply abort and know the current state of the partitions. Applications can handle this in different ways: Kafka Streams may rebalance a task and/or close the task and restart it. Another application may read from its own checkpoint to continue. In any case, the producer must restart and will be unusable after encountering this error.
Invalid-Configuration: maps to some fatal errors. The error is bubbled up to the application layer. The application can decide what to do. The producer doesn’t need to restart, but the application may choose to close it.
Each error code should always represent the same class and should rarely rely on client state to determine how it is handled. Additionally, while it is good to have a mapping, it is also useful to have a general strategy: for example, a typical unknown error (one the client has not assigned a type) should probably be application recoverable.
Note: For Retriable errors, the producer handles retries internally, keeping the failure details hidden from the application. Conversely, other types of exceptions will be surfaced to the application code for handling.
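The grouping above can be pictured as a lookup from error name to handling category, with a default for errors that have no mapping. The following is a minimal sketch, not the client's implementation: the `ErrorCategory` enum, the `ErrorClassifier` class, and its methods are hypothetical names introduced for illustration, and only a handful of the errors from the exception table are listed.

```java
import java.util.Map;

// Hypothetical categories mirroring the groups proposed in this KIP.
enum ErrorCategory {
    RETRIABLE,               // retry after backoff, hidden from the application
    REFRESH_RETRIABLE,       // refresh metadata, then retry
    ABORTABLE,               // application aborts the transaction and continues
    APPLICATION_RECOVERABLE, // producer must be restarted by the application
    INVALID_CONFIGURATION    // application decides; producer need not restart
}

// Hypothetical helper (an assumption for illustration): each error name maps
// to exactly one category, independent of client state.
final class ErrorClassifier {
    private static final Map<String, ErrorCategory> CATEGORY_BY_ERROR = Map.of(
        "CoordinatorLoadInProgressException", ErrorCategory.RETRIABLE,
        "NotCoordinatorException", ErrorCategory.REFRESH_RETRIABLE,
        "TransactionAbortableException", ErrorCategory.ABORTABLE,
        "ProducerFencedException", ErrorCategory.APPLICATION_RECOVERABLE,
        "TopicAuthorizationException", ErrorCategory.INVALID_CONFIGURATION);

    private ErrorClassifier() { }

    // Unmapped errors fall back to application recoverable, the general
    // strategy suggested for unknown errors.
    static ErrorCategory classify(String errorName) {
        return CATEGORY_BY_ERROR.getOrDefault(errorName, ErrorCategory.APPLICATION_RECOVERABLE);
    }

    // Only the two retriable categories are handled inside the producer;
    // every other category is surfaced to application code.
    static boolean surfacedToApplication(ErrorCategory category) {
        return category != ErrorCategory.RETRIABLE
            && category != ErrorCategory.REFRESH_RETRIABLE;
    }
}
```

Keeping the default in one place means a client that meets an error it does not recognize still has a defined, conservative reaction.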
Exception Table
The table below lists the exceptions with their current and new handling. All exceptions are grouped under a header that gives the exception name:
Yellow → Needs modification
Check → Exception handling matches what is described in KIP-691
Cross → Exception handling doesn't match what is described in KIP-691
Exception/Error Names | Current handling (Producer API) | Current handling (Transaction API) | New handling (Producer API) | New handling (Transaction API) | Comments |
CorruptRecordException NotEnoughReplicasAfterAppendException NotEnoughReplicasException TimeoutException | Retriable if the error is retriable. Otherwise abortable | Retriable if the error is retriable otherwise abortable | Producer Retriable | Producer Retriable | |
ConcurrentTransactionsException | Retriable | Retriable | Producer Retriable (KIP-890 may see this) | Producer Retriable | |
CoordinatorLoadInProgressException | Retriable | Retriable | Producer Retriable | Producer Retriable |
Exception/Error Names | Current handling (Producer API) | Current handling (Transaction API) | New handling (Producer API) | New handling (Transaction API) | Comments |
UnknownTopicOrPartitionException NotLeaderOrFollowerException | Retriable if the error is retriable. Otherwise abortable | Retriable if the error is retriable otherwise abortable | Refresh + Retriable | Refresh + Retriable | Both UnknownTopicOrPartitionException and NotLeaderOrFollowerException extend InvalidMetadataException. Current class hierarchy in code is: InvalidMetadataException < RetriableException < ApiException |
NotCoordinatorException CoordinatorNotAvailableException | Retriable | Retriable | Refresh + Retriable | Refresh + Retriable |
Exception/Error Names | Current handling (Producer API) | Current handling (Transaction API) | New handling (Producer API) | New handling (Transaction API) | Comments |
TransactionAbortableException | Producer Abortable | Producer Abortable | Producer Abortable | Producer Abortable |
Exception/Error Names | Current handling (Producer API) | Current handling (Transaction API) | New handling (Producer API) | New handling (Transaction API) | Comments |
IllegalStateException | Abortable | Sometimes Fatal depending on whether application or Sender caused issue. See KAFKA-14831: Illegal state errors should be fatal in transactional producer | Application Recoverable (probably not expected) | Application Recoverable | We will handle all default exceptions as generic unknown errors, which will be application recoverable |
KafkaException | Abortable (default seems to be abortable) | Fatal in most cases, but abortable when there are partition errors | Application Recoverable (not expected) | Application Recoverable (not expected) | |
RuntimeException | Abortable (default seems to be abortable) | Fatal – only thrown as this generic type when correlation ID is wrong. This should be updated as KIP-691 suggests | Application Recoverable (not expected) | Application Recoverable (not expected) | |
InvalidProducerEpochException | Abortable | Fatal | Application Recoverable | Application Recoverable | |
ProducerFencedException | Fatal | Fatal | Application Recoverable | Application Recoverable | |
InvalidPidMappingException | Sometimes retriable (no longer returned) or abortable | Abortable | Application Recoverable | Application Recoverable | |
FencedInstanceIdException CommitFailedException UnknownMemberIdException IllegalGenerationException | N/A | Abortable (TxnOffsetCommit Only) | N/A | Application Recoverable | CommitFailedException extends KafkaException, which is the default exception handled as Application Recoverable. This class will not be extended by ApplicationRecoverableException |
CorrelationIdMismatchException | Application Recoverable | Application Recoverable | CorrelationIdMismatchException extends IllegalStateException, which is default exception handled as Application Recoverable. This class will not be extended by ApplicationRecoverableException | ||
InvalidTxnStateException | Abortable | Fatal | Producer Abortable (KIP-890 relies on this – note this is the only one that differs with Produce API) | Application Recoverable | InvalidTxnStateException requires special handling because the Producer API and the Transaction API handle it differently |
Exception/Error Names | Current handling (Producer API) | Current handling (Transaction API) | Expected handling (Producer API) | Expected handling (Transaction API) | Comments |
AuthenticationException | Abortable | Fatal | Invalid Configuration | Invalid Configuration (not expected) | |
ClusterAuthorizationException TransactionalIdAuthorizationException UnsupportedVersionException UnsupportedForMessageFormatException | Fatal, except UnsupportedForMessageFormatException which is Abortable | Cluster/Transaction Auth → abortable on InitProducerId, other errors abortable TransactionAuth → fatal, others abortable for AddPartitions, Find Coordinator, EndTxn, AddOffsets Transaction Auth, UnsupportedForMessageFormat → fatal for OffsetCommit | Invalid Configuration | Invalid Configuration | |
InvalidRecordException InvalidRequiredAcksException RecordBatchTooLargeException InvalidTopicException | Retriable if the error is retriable. Otherwise abortable | Retriable if the error is retriable otherwise abortable | Invalid Configuration | Invalid Configuration | |
TopicAuthorizationException GroupAuthorizationException | Abortable | Abortable | Invalid Configuration | Invalid Configuration | Both TopicAuthorizationException and GroupAuthorizationException extend AuthorizationException. Current class hierarchy in code is: TopicAuthorizationException < AuthorizationException < ApiException |
Public Interfaces
The following exception classes are available in the current Kafka code:

    // Producer Abortable Transaction
    public class TransactionAbortableException extends ApiException { ... }

    // Producer Retriable
    public abstract class RetriableException extends ApiException { ... }

    // Invalid-Configuration
    public class InvalidConfigurationException extends ApiException { ... }

We will add the new exception types listed below, which extend the existing exceptions.

    // Producer Refresh and Retriable
    public abstract class RefreshRetriableException extends RetriableException { ... }

    // Application-Recoverable (new)
    public abstract class ApplicationRecoverableException extends ApiException { ... }

We will change the parent class of the following existing exception types to maintain the class hierarchy:

    // UnknownTopicOrPartitionException and NotLeaderOrFollowerException extend InvalidMetadataException
    public class InvalidMetadataException extends RefreshRetriableException { ... }

    // TopicAuthorizationException and GroupAuthorizationException extend AuthorizationException
    public class AuthorizationException extends InvalidConfigurationException { ... }
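One consequence of the hierarchy above is that application code can branch on the group type rather than on each concrete exception. The sketch below illustrates this with stand-in classes declared locally; they are not the real `org.apache.kafka` classes (for brevity `ApiException` extends `RuntimeException` directly), and `HierarchyDemo.categoryOf` is a hypothetical helper.

```java
// Stand-in declarations mirroring the hierarchy described above.
// Assumption: these are simplified local copies, not the Kafka classes.
class ApiException extends RuntimeException {
    ApiException(String message) { super(message); }
}

abstract class RetriableException extends ApiException {
    RetriableException(String message) { super(message); }
}

abstract class RefreshRetriableException extends RetriableException {
    RefreshRetriableException(String message) { super(message); }
}

class InvalidMetadataException extends RefreshRetriableException {
    InvalidMetadataException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends InvalidMetadataException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

class InvalidConfigurationException extends ApiException {
    InvalidConfigurationException(String message) { super(message); }
}

class AuthorizationException extends InvalidConfigurationException {
    AuthorizationException(String message) { super(message); }
}

class TopicAuthorizationException extends AuthorizationException {
    TopicAuthorizationException(String message) { super(message); }
}

final class HierarchyDemo {
    private HierarchyDemo() { }

    // Hypothetical helper: names the group an exception belongs to.
    // The more specific RefreshRetriableException is checked before its parent.
    static String categoryOf(RuntimeException e) {
        if (e instanceof RefreshRetriableException) return "refresh-retriable";
        if (e instanceof RetriableException) return "retriable";
        if (e instanceof InvalidConfigurationException) return "invalid-configuration";
        return "application-recoverable"; // default for unclassified errors
    }
}
```

Because `InvalidMetadataException` would sit under `RefreshRetriableException`, existing code that catches `RetriableException` keeps catching `UnknownTopicOrPartitionException` after the change.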
Client side code example
    import static java.time.Duration.ofSeconds;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.InvalidConfigurationException;
    import org.apache.kafka.common.errors.TransactionAbortableException;

    public class TransactionalClientDemo {
        private static final String CONSUMER_GROUP_ID = "my-group-id";
        private static final String OUTPUT_TOPIC = "output";
        private static final String INPUT_TOPIC = "input";
        private static KafkaConsumer<String, String> consumer;
        private static KafkaProducer<String, String> producer;

        public static void main(String[] args) {
            initializeApplication();
            boolean isRunning = true;
            // Continuously poll for records
            while (isRunning) {
                try {
                    try {
                        // Poll records from Kafka with a timeout of 60 seconds
                        ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));

                        // Process records to generate the word count map
                        Map<String, Integer> wordCountMap = new HashMap<>();
                        for (ConsumerRecord<String, String> record : records) {
                            String[] words = record.value().split(" ");
                            for (String word : words) {
                                wordCountMap.merge(word, 1, Integer::sum);
                            }
                        }

                        // Begin transaction
                        producer.beginTransaction();

                        // Produce word count results to the output topic
                        wordCountMap.forEach((key, value) ->
                            producer.send(new ProducerRecord<>(OUTPUT_TOPIC, key, value.toString())));

                        // Determine offsets to commit
                        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                        for (TopicPartition partition : records.partitions()) {
                            List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
                            long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
                            offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
                        }

                        // Send offsets to the transaction for atomic commit
                        producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);

                        // Commit transaction
                        producer.commitTransaction();
                    } catch (TransactionAbortableException e) {
                        // Abortable exception: abort the transaction and rewind the consumer.
                        // producer.abortTransaction() should not throw an abortable exception.
                        producer.abortTransaction();
                        resetToLastCommittedPositions(consumer);
                    }
                } catch (InvalidConfigurationException e) {
                    // Fatal error: bubbled up to the application layer, which decides what to do
                    closeAll();
                    throw e;
                } catch (KafkaException e) {
                    // Application recoverable: the application must restart.
                    // ApplicationRecoverableException is a KafkaException subtype, so it is caught
                    // here; Java rejects a multi-catch that lists both a class and its subclass.
                    closeAll();
                    initializeApplication();
                }
            }
        }

        // initializeApplication(), closeAll(), and resetToLastCommittedPositions()
        // are omitted here; they are part of the full example.
    }
Full example can be accessed at:
Note: The producer.abortTransaction() method will not throw an abortable exception, to avoid creating a loop.
Compatibility, Deprecation, and Migration Plan
For applications using older code
For existing errors, if the application code already handles the error, it will continue to do so. However, the new TransactionAbortableException introduced in KIP-890 extends KafkaException, which is treated as fatal by applications. Therefore, we expect TransactionAbortableException to also be treated as fatal by older code.
Currently, the transactional producer returns retriable exception types, such as TimeoutException, which poses a risk of duplicates in Kafka. In this KIP, we will update the transactional producer so that all retriable exceptions are translated to TransactionAbortableException in the transactional producer code path. Older clients that use the transactional producer and handle TimeoutException by retrying the produce operation can be updated to handle TransactionAbortableException instead.
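The translation described above can be illustrated with a small sketch. It is not the producer's implementation: the nested exception classes are local stand-ins for the Kafka types of the same name, and `surfaceToApplication` is a hypothetical helper that shows only the mapping step, leaving out the producer's internal retries.

```java
final class TransactionalErrorTranslation {
    private TransactionalErrorTranslation() { }

    // Stand-ins for the Kafka exception types named in this section
    // (assumption: simplified local copies, not the real classes).
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }

    static class TransactionAbortableException extends RuntimeException {
        TransactionAbortableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: decides which exception the application sees.
    // On the transactional path a retriable error is wrapped so the
    // application aborts the transaction instead of resending the record.
    static RuntimeException surfaceToApplication(RuntimeException error, boolean transactional) {
        if (transactional && error instanceof RetriableException) {
            return new TransactionAbortableException(
                "Retriable error inside a transaction; abort and retry the transaction", error);
        }
        return error; // non-transactional path, or a non-retriable error: unchanged
    }
}
```

Keeping the original error as the cause lets application logs still show that, for example, a timeout triggered the abort.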
For applications using newer code
Applications using newer code, as shown in the Client side code example section, have the flexibility to adjust their exception handling logic based on the specific type of exception encountered, enabling them to take appropriate actions.
Handling for OutOfOrderSequenceException requires additional considerations that are out of scope for this KIP. OutOfOrderSequenceException and UnknownProducerIdException (which extends OutOfOrderSequenceException) will not be categorised under any of the exception groups defined in this KIP. This will not impact applications using newer code.
Test Plan
We will add integration and unit tests to check that the client behaves as expected for each exception group.
Rejected Alternatives
We discussed another approach where we encode the handling into the response as a separate field from the error code or something similar. Produce responses have space for a code and for a message, so something similar could be done to distinguish the cause/meaning of the error from the action that should be taken. The encoding of the error could be included in the definition of the response spec for other client compatibility. We rejected this approach as it requires a larger overhaul on the APIs as well as the clients.