Status

Current state: Accepted 

Discussion thread: here

JIRA: here 

Vote thread: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka producer supports the following transactional APIs:

  1. initTransactions for transactional producer identity initialization

  2. beginTransaction to start a new transaction

  3. sendOffsetsToTransaction to commit consumer offsets advanced within the current transaction

  4. commitTransaction to commit the ongoing transaction

  5. abortTransaction to abort the ongoing transaction

In addition to the above transactional APIs, the idempotent producer sends records to the broker using the send API, as sketched below.
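As a minimal, illustrative sketch of how these APIs fit together (the broker address, topic name, and transactional.id values below are placeholders, not part of this KIP; error handling is deliberately simplified here and is revisited in the client side code example later in this document):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalApiSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // placeholder broker address
        props.put("transactional.id", "demo-transactional-id");  // enables the transactional APIs
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();      // 1. initialize the transactional producer identity
            producer.beginTransaction();      // 2. start a new transaction
            try {
                // Idempotent send within the transaction
                producer.send(new ProducerRecord<>("output", "key", "value"));
                // 3. sendOffsetsToTransaction(...) would be called here in a consume-process-produce application
                producer.commitTransaction(); // 4. commit the ongoing transaction
            } catch (KafkaException e) {
                producer.abortTransaction();  // 5. abort the ongoing transaction on error (simplified handling)
            }
        }
    }
}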


Apache Kafka supports a variety of client SDKs for different programming languages:

  1. Java: The official Java client library supports the producer, consumer, Streams, and Connect APIs.

  2. librdkafka and derived clients:

    1. C/C++: A C/C++ client library supporting the Producer and Consumer APIs.

    2. Python: A Python client library supporting the Producer and Consumer APIs.

    3. Go: A Go client library supporting the Producer and Consumer APIs.

    4. .NET: A .NET client library supporting the Producer and Consumer APIs.

We require consistent error handling across all client SDKs and APIs. Functionally, the client can handle an error in one of three ways:

  • Retry the operation without bothering the application (retriable)

  • Share with the application that an error has occurred, but the client can recover if the transaction is aborted by the client and under the covers, the epoch is bumped (abortable)

  • Share with the application that an error has occurred, and there is no way to recover except shut down the client (fatal)

In this KIP, we want to accomplish a few goals. Error handling was previously addressed in the accepted KIP-691: Enhance Transactional Producer Exception Handling, and we can perhaps leverage some of that work here.

Proposed Changes

We are proposing to group exceptions into four types:

  1. Retriable: mostly maps onto current retriable errors. Some of these errors may indicate that a metadata update at the producer level is required. Because of that, we introduce two subclasses of the retriable error:

    1. retry only (just send the request again – after some period of backoff)

    2. refresh metadata and retry (request metadata and maybe modify the request before resending)

  2. Abortable: mostly maps onto abortable errors. The error is bubbled up to the application layer, which can choose to roll back any state from the ongoing transaction and abort the transaction. The producer does not need to restart: after aborting, it can be confident that the state is as it was before the transaction started. The application can also close the producer and react as it does for application-recoverable cases, but it does not need to.

  3. Application-Recoverable: maps onto some fatal errors. The error is bubbled up to the application layer, and it may need to do a bit more to roll back and clean up state. The producer cannot simply abort and know the current state of the partitions. Applications can handle this in different ways: Kafka Streams may rebalance a task and/or close the task and restart it, while another application may read from its own checkpoint to continue. In any case, the producer must restart and will be unusable after encountering this error.

  4. Invalid-Configuration: maps onto some fatal errors. The error is bubbled up to the application layer, and the application can decide what to do. The producer does not need to restart, but the application may choose to close it.

Each error code always represents the same class and rarely relies on client state to determine how it should be handled. Additionally, while it is good to have a mapping, it is also useful to have a general strategy, i.e., a typical unknown error (not specified by the client to have a type) should probably be application recoverable.


Note: For Retriable errors, the producer handles retries internally, keeping the failure details hidden from the application. Conversely, other types of exceptions will be surfaced to the application code for handling.
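As a rough sketch of this general strategy (the helper class, enum, and package placement of the new exception types below are illustrative assumptions, not part of this KIP's public interfaces), a classifier could map exceptions onto the proposed groups, defaulting anything unknown to application recoverable; in practice the producer handles the first two groups internally, as noted above:

import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TransactionAbortableException;
// RefreshRetriableException and ApplicationRecoverableException are the new classes proposed
// in this KIP (see Public Interfaces); the package used here is an assumption.
import org.apache.kafka.common.errors.RefreshRetriableException;

public class ErrorGroupingSketch {

    enum ErrorGroup { RETRIABLE, REFRESH_RETRIABLE, ABORTABLE, APPLICATION_RECOVERABLE, INVALID_CONFIGURATION }

    static ErrorGroup classify(Exception e) {
        // RefreshRetriableException extends RetriableException, so it must be checked first.
        if (e instanceof RefreshRetriableException) return ErrorGroup.REFRESH_RETRIABLE;        // refresh metadata, then retry
        if (e instanceof RetriableException) return ErrorGroup.RETRIABLE;                       // retry after backoff
        if (e instanceof TransactionAbortableException) return ErrorGroup.ABORTABLE;            // abort the transaction and continue
        if (e instanceof InvalidConfigurationException) return ErrorGroup.INVALID_CONFIGURATION; // surface to the application to fix configuration
        // Default: anything else (including ApplicationRecoverableException and unknown
        // KafkaExceptions) is treated as application recoverable.
        return ErrorGroup.APPLICATION_RECOVERABLE;
    }
}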

Exception Table 

The tables below list the exceptions with their current and new handling. The exceptions are grouped under headers representing the exception class they will belong to.

Legend:

Yellow → needs modification

✓ → exception handling matches KIP-691

✗ → exception handling doesn't match KIP-691

RetriableException

CorruptRecordException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException, TimeoutException
  Current handling (Producer API): Retriable if the error is retriable, otherwise abortable
  Current handling (Transaction API): Retriable if the error is retriable, otherwise abortable
  New handling (Producer API): Producer Retriable
  New handling (Transaction API): Producer Retriable

ConcurrentTransactionsException
  Current handling (Producer API): Retriable
  Current handling (Transaction API): Retriable
  New handling (Producer API): Producer Retriable (KIP-890 may see this)
  New handling (Transaction API): Producer Retriable

CoordinatorLoadInProgressException
  Current handling (Producer API): Retriable
  Current handling (Transaction API): Retriable
  New handling (Producer API): Producer Retriable
  New handling (Transaction API): Producer Retriable


RefreshRetriableException

UnknownTopicOrPartitionException, NotLeaderOrFollowerException
  Current handling (Producer API): Retriable if the error is retriable, otherwise abortable
  Current handling (Transaction API): Retriable if the error is retriable, otherwise abortable
  New handling (Producer API): Refresh + Retriable
  New handling (Transaction API): Refresh + Retriable
  Comments: Both UnknownTopicOrPartitionException and NotLeaderOrFollowerException extend InvalidMetadataException.
  The current class hierarchy in code is:
    InvalidMetadataException < RetriableException < ApiException
  The new class hierarchy in code will be:
    InvalidMetadataException < RefreshRetriableException < RetriableException < ApiException

NotCoordinatorException, CoordinatorNotAvailableException
  Current handling (Producer API): Retriable
  Current handling (Transaction API): Retriable
  New handling (Producer API): Refresh + Retriable
  New handling (Transaction API): Refresh + Retriable


TransactionAbortableException

TransactionAbortableException (note: added via KIP-890)
  Current handling (Producer API): Producer Abortable
  Current handling (Transaction API): Producer Abortable
  New handling (Producer API): Producer Abortable
  New handling (Transaction API): Producer Abortable


ApplicationRecoverableException

IllegalStateException
  Current handling (Producer API): Abortable
  Current handling (Transaction API): Sometimes fatal, depending on whether the application or the Sender caused the issue. See KAFKA-14831: Illegal state errors should be fatal in transactional producer (closed)
  New handling (Producer API): Application Recoverable (probably not expected)
  New handling (Transaction API): Application Recoverable

Note: We will handle all default exceptions as generic unknown errors, which will be application recoverable.

KafkaException
  Current handling (Producer API): Abortable (the default seems to be abortable)
  Current handling (Transaction API): Fatal in most cases, but abortable when there are partition errors
  New handling (Producer API): Application Recoverable (not expected)
  New handling (Transaction API): Application Recoverable (not expected)

RuntimeException
  Current handling (Producer API): Abortable (the default seems to be abortable)
  Current handling (Transaction API): Fatal; only thrown as this generic type when the correlation ID is wrong. This should be updated as KIP-691 suggests
  New handling (Producer API): Application Recoverable (not expected)
  New handling (Transaction API): Application Recoverable (not expected)

InvalidProducerEpochException
  Current handling (Producer API): Abortable
  Current handling (Transaction API): Fatal
  New handling (Producer API): Application Recoverable
  New handling (Transaction API): Application Recoverable

ProducerFencedException
  Current handling (Producer API): Fatal
  Current handling (Transaction API): Fatal
  New handling (Producer API): Application Recoverable
  New handling (Transaction API): Application Recoverable

InvalidPidMappingException
  Current handling (Producer API): Sometimes retriable (no longer returned) or abortable
  Current handling (Transaction API): Abortable
  New handling (Producer API): Application Recoverable
  New handling (Transaction API): Application Recoverable

FencedInstanceIdException, CommitFailedException, UnknownMemberIdException, IllegalGenerationException
  Current handling (Producer API): N/A
  Current handling (Transaction API): Abortable (TxnOffsetCommit only)
  New handling (Producer API): N/A
  New handling (Transaction API): Application Recoverable
  Comments: CommitFailedException extends KafkaException, which is a default exception handled as Application Recoverable. This class will not be changed to extend ApplicationRecoverableException.

CorrelationIdMismatchException
  New handling (Producer API): Application Recoverable
  New handling (Transaction API): Application Recoverable
  Comments: CorrelationIdMismatchException extends IllegalStateException, which is a default exception handled as Application Recoverable. This class will not be changed to extend ApplicationRecoverableException.

InvalidTxnStateException
  Current handling (Producer API): Abortable
  Current handling (Transaction API): Fatal
  New handling (Producer API): Producer Abortable (KIP-890 relies on this; note this is the only one that differs for the Produce API)
  New handling (Transaction API): Application Recoverable
  Comments: InvalidTxnStateException requires special handling, because the producer API and the transactional API have different handling.


InvalidConfigurationException

AuthenticationException
  Current handling (Producer API): Abortable
  Current handling (Transaction API): Fatal
  New handling (Producer API): Invalid Configuration
  New handling (Transaction API): Invalid Configuration (not expected)

ClusterAuthorizationException, TransactionalIdAuthorizationException, UnsupportedVersionException, UnsupportedForMessageFormatException
  Current handling (Producer API): Fatal, except UnsupportedForMessageFormatException, which is Abortable
  Current handling (Transaction API):
    Cluster/Transaction Auth → abortable on InitProducerId, other errors abortable
    Transaction Auth → fatal, others abortable for AddPartitions, FindCoordinator, EndTxn, AddOffsets
    Transaction Auth, UnsupportedForMessageFormat → fatal for OffsetCommit
  New handling (Producer API): Invalid Configuration
  New handling (Transaction API): Invalid Configuration

InvalidRecordException, InvalidRequiredAcksException, RecordBatchTooLargeException, InvalidTopicException
  Current handling (Producer API): Retriable if the error is retriable, otherwise abortable
  Current handling (Transaction API): Retriable if the error is retriable, otherwise abortable
  New handling (Producer API): Invalid Configuration
  New handling (Transaction API): Invalid Configuration

TopicAuthorizationException, GroupAuthorizationException
  Current handling (Producer API): Abortable
  Current handling (Transaction API): Abortable
  New handling (Producer API): Invalid Configuration
  New handling (Transaction API): Invalid Configuration
  Comments: Both TopicAuthorizationException and GroupAuthorizationException extend AuthorizationException.
  The current class hierarchy in code is:
    TopicAuthorizationException < AuthorizationException < ApiException
  The new class hierarchy in code will be:
    TopicAuthorizationException < AuthorizationException < InvalidConfigurationException < ApiException


Public Interfaces

The following exception classes are already available in the current Kafka code:

// Producer Abortable Transaction
public class TransactionAbortableException extends ApiException {
    ...
}

//Producer Retriable  
public abstract class RetriableException extends ApiException {
   ...
}


// Invalid-Configuration
public class InvalidConfigurationException extends ApiException {
 	...
}


We will add new exception types, as listed below, that extend the existing exceptions.

//Producer Refresh and Retriable
public abstract class RefreshRetriableException extends RetriableException {
	...
}

//Application-Recoverable new
public abstract class ApplicationRecoverableException extends ApiException {
    ...
}


We will re-parent the following existing exception types to maintain the class hierarchy:

// UnknownTopicOrPartitionException and NotLeaderOrFollowerException  extends InvalidMetadataException 
public class InvalidMetadataException extends RefreshRetriableException {
	...
}

// TopicAuthorizationException and GroupAuthorizationException extends AuthorizationException 
public class AuthorizationException extends InvalidConfigurationException {
	...
}


Client side code example

public class TransactionalClientDemo {

    private static final String CONSUMER_GROUP_ID = "my-group-id";
    private static final String OUTPUT_TOPIC = "output";
    private static final String INPUT_TOPIC = "input";
    private static KafkaConsumer<String, String> consumer;
    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {
        initializeApplication();

        boolean isRunning = true;
        // Continuously poll for records
        while (isRunning) {
            try {
                try {
                    // Poll records from Kafka for a timeout of 60 seconds
                    ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));

                    // Process records to generate word count map
                    Map<String, Integer> wordCountMap = new HashMap<>();

                    for (ConsumerRecord<String, String> record : records) {
                        String[] words = record.value().split(" ");
                        for (String word : words) {
                            wordCountMap.merge(word, 1, Integer::sum);
                        }
                    }

                    // Begin transaction
                    producer.beginTransaction();

                    // Produce word count results to output topic
                    wordCountMap.forEach((key, value) ->
                            producer.send(new ProducerRecord<>(OUTPUT_TOPIC, key, value.toString())));

                    // Determine offsets to commit
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
                        long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
                        offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
                    }

                    // Send offsets to transaction for atomic commit
                    producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);

                    // Commit transaction
                    producer.commitTransaction();
                } catch (TransactionAbortableException e) {
                    // Abortable Exception: Handle Kafka exception by aborting transaction. producer.abortTransaction() should not throw abortable exception.
                    producer.abortTransaction();
                    resetToLastCommittedPositions(consumer);
                }
            } catch (InvalidConfigurationException e) {
                // Invalid Configuration: the error is bubbled up to the application layer. The application can decide what to do
                closeAll();
                throw e;
            } catch (KafkaException e) {
                // Application Recoverable: ApplicationRecoverableException (a KafkaException) and any other
                // unexpected KafkaException are treated as application recoverable; the application must restart
                closeAll();
                initializeApplication();
            }
        }

    }
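    // Imports and the helper methods initializeApplication(), closeAll(), and
    // resetToLastCommittedPositions() are omitted from this snippet; see the full example linked below.
}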

Full example can be accessed at: https://github.com/apache/kafka/pull/15913/files 


Note: The producer.abortTransaction() method will not throw an abortable exception to avoid creating a loop.

Compatibility, Deprecation, and Migration Plan

For applications using older code

For existing errors, if the application code already handles the error, it will continue to do so. However, the new TransactionAbortableException introduced in KIP-890 extends KafkaException, which is treated as fatal by applications. Therefore, we expect TransactionAbortableException to also be treated as fatal by older code.


Currently, the transactional producer returns retriable exception types, such as TimeoutException, which poses a risk of duplicates in Kafka. In this KIP, we will update the transactional producer so that all retriable exceptions are translated to TransactionAbortableException in the transactional producer code path. Older clients that use the transactional producer and handle TimeoutException by retrying the produce operation can update their code to handle TransactionAbortableException, as sketched below.
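As a hedged sketch of that migration (processAndSend() and resetToLastCommittedPositions() are hypothetical application helpers, not Kafka APIs), an application that previously caught TimeoutException and retried the produce operation could instead abort and reprocess when it sees TransactionAbortableException:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.TransactionAbortableException;

public class MigrationSketch {

    static void runOneTransaction(KafkaProducer<String, String> producer,
                                  KafkaConsumer<String, String> consumer) {
        try {
            producer.beginTransaction();
            processAndSend(producer);                   // application-specific produce calls
            producer.commitTransaction();
        } catch (TransactionAbortableException e) {     // previously the application caught TimeoutException here
            producer.abortTransaction();                // abort instead of blindly retrying the produce
            resetToLastCommittedPositions(consumer);    // rewind the consumer and reprocess before retrying
        }
    }

    static void processAndSend(KafkaProducer<String, String> producer) { /* application logic */ }

    static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) { /* application logic */ }
}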


For applications using newer code

Applications using newer code, as shown in the client side code example above, have the flexibility to adjust their exception handling logic based on the specific type of exception encountered, enabling them to take appropriate action.

Handling for OutOfOrderSequenceException requires additional considerations that are out of scope for this KIP. OutOfOrderSequenceException and UnknownProducerIdException (which extends OutOfOrderSequenceException) will not be categorised under any of the defined exception groups in this KIP. This will not impact applications using newer code.


Test Plan

We will add additional integration and unit tests to check that the client acts as expected.

Rejected Alternatives

We discussed another approach in which the handling would be encoded into the response as a separate field from the error code, or something similar. Produce responses have space for a code and for a message, so something similar could be done to distinguish the cause/meaning of the error from the action that should be taken. The encoding of the error could be included in the definition of the response spec for other clients' compatibility. We rejected this approach because it requires a larger overhaul of the APIs as well as the clients.

