Status

Current state: "Under Discussion"

Discussion thread: here 

JIRA: KAFKA-15259

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In transactions, a poison-pill record, or a record sent to a topic with issues, moves the producer to an error state, which leads to dropping the whole batch. This prevents users from doing advanced error handling, such as ignoring selected errors. More specifically, a custom production exception handler in Kafka Streams cannot skip the error by dropping the bad record and committing the transaction with the remaining records of the batch.

Proposed Changes

We aim to let users ignore `send(ProducerRecord)` errors by adding an input parameter to the `send` method, so that users can prevent a poison-pill record from moving the producer to the error state.
The following list categorizes the types of errors that cause a transaction to fail. Categories marked "(covered)" are handled by the new `send` API; categories marked "(not covered)" are not.

  • producer-side errors
    • recoverable, such as `RecordTooLargeException` (covered)
    • irrecoverable, such as `ProducerFencedException` (not covered)
  • broker-side errors (not covered)
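The list above can be read as a small decision table. The sketch below encodes which combinations of error category and send option would still fail the transaction under this proposal. `ErrorCategory` and `ErrorPolicy` are hypothetical types that do not exist in the Kafka client, and the local `TxnSendOption` enum is only a stand-in for the one proposed here.

```java
// Hypothetical model of the error categories listed above; none of these
// types exist in the Kafka client library.
enum ErrorCategory { PRODUCER_RECOVERABLE, PRODUCER_IRRECOVERABLE, BROKER_SIDE }

// Stand-in for the proposed TxnSendOption enum.
enum TxnSendOption { NONE, IGNORE_SEND_ERRORS }

final class ErrorPolicy {
    private ErrorPolicy() {}

    /** Returns true if an error of the given category still fails the transaction. */
    static boolean failsTransaction(ErrorCategory category, TxnSendOption option) {
        // Only producer-side recoverable errors (e.g. RecordTooLargeException)
        // are covered by IGNORE_SEND_ERRORS.
        if (category == ErrorCategory.PRODUCER_RECOVERABLE) {
            return option != TxnSendOption.IGNORE_SEND_ERRORS;
        }
        // Irrecoverable producer-side errors and broker-side errors are not covered.
        return true;
    }
}
```

Only one cell of the table changes compared with today's behaviour: a recoverable producer-side error combined with `IGNORE_SEND_ERRORS`.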

Currently, producer-side recoverable errors (the category this KIP targets) prevent a record from being added to a batch and also move the producer to an `error` state, which causes the transaction to fail. This KIP makes it possible to commit a transaction successfully despite such errors: the problematic records are still not added to the batch, but the transition to the `error` state does not happen. In other words, with the new `send` API, the transaction does not fail because of a single poison-pill record that is not even in the batch. The transaction can, of course, still fail due to other types of errors (for example, broker-side errors).
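The difference between the two modes can be made concrete with a minimal in-memory model. It assumes a hypothetical `SimulatedTxn` class that is not part of Kafka: a size limit (`maxRecordBytes`) stands in for the producer-side check that raises `RecordTooLargeException`, and a boolean flag stands in for the transaction's error state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory model of the behaviour described above; it is NOT
// the KafkaProducer implementation. maxRecordBytes stands in for the
// producer-side check that raises RecordTooLargeException.
final class SimulatedTxn {
    enum TxnSendOption { NONE, IGNORE_SEND_ERRORS }

    private final int maxRecordBytes;
    private final List<byte[]> batch = new ArrayList<>();
    private boolean inErrorState = false;

    SimulatedTxn(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    /** Returns false if the record was rejected by the producer-side check. */
    boolean send(byte[] value, TxnSendOption option) {
        if (value.length > maxRecordBytes) {
            // The poison-pill record never reaches the batch in either mode.
            if (option == TxnSendOption.NONE) {
                inErrorState = true; // current behaviour: the transaction is doomed
            }
            return false;
        }
        batch.add(value);
        return true;
    }

    /** Returns the committed records, or throws if the transaction is in the error state. */
    List<byte[]> commit() {
        if (inErrorState) {
            throw new IllegalStateException("transaction is in the error state and must be aborted");
        }
        return Collections.unmodifiableList(new ArrayList<>(batch));
    }
}
```

With `NONE`, one oversized record makes `commit()` throw; with `IGNORE_SEND_ERRORS`, the oversized record is simply absent from the committed batch.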

Public Interfaces

If the user (1) is performing a transaction and (2) passes `TxnSendOption.IGNORE_SEND_ERRORS` to the `send` method, any poison-pill record is excluded from the batch, and the transaction can still be committed successfully. Note that if the user passes `IGNORE_SEND_ERRORS` outside of a transaction, the overloaded `send` method throws an `IllegalStateException`.
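The rule about using the option outside a transaction can be sketched as a guard that runs before a record is processed. `TxnSendGuard` and its `transactionInProgress` flag are hypothetical and only illustrate the precondition; they are not part of `KafkaProducer`.

```java
// Hypothetical sketch of the precondition described above; the class and
// its field are illustrative and are not part of KafkaProducer.
final class TxnSendGuard {
    enum TxnSendOption { NONE, IGNORE_SEND_ERRORS }

    private boolean transactionInProgress = false;

    void beginTransaction() { transactionInProgress = true; }

    void commitTransaction() { transactionInProgress = false; }

    /** Throws IllegalStateException if IGNORE_SEND_ERRORS is used outside a transaction. */
    void checkOption(TxnSendOption option) {
        // NONE keeps the existing behaviour and is always allowed.
        if (option == TxnSendOption.IGNORE_SEND_ERRORS && !transactionInProgress) {
            throw new IllegalStateException(
                "IGNORE_SEND_ERRORS can only be used inside a transaction");
        }
    }
}
```

Failing fast here, rather than silently falling back to `NONE`, makes a misconfigured non-transactional caller visible immediately.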


KafkaProducer
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback, TxnSendOption option) {}


     public enum TxnSendOption {
        /**
         * Default behaviour: a recoverable {@link KafkaProducer#send(ProducerRecord)} error, such as a
         * {@link RecordTooLargeException}, moves the transaction to the error state, so the commit fails.
         */
        NONE,
        /**
         * Records that cause recoverable send errors are excluded from the batch, and the transaction
         * can still be committed successfully. Use only within a transaction; otherwise
         * {@link KafkaProducer#send(ProducerRecord, Callback, TxnSendOption)} throws an {@link IllegalStateException}.
         */
        IGNORE_SEND_ERRORS
    }


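The method is declared in the `Producer` interface as well. Giving it a `default` implementation preserves backward compatibility: existing `Producer` implementations keep compiling without having to implement the new overload.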

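Producer
     /**
      * See {@link KafkaProducer#send(ProducerRecord, Callback, TxnSendOption)}
      */
     default Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback, TxnSendOption option) {
         // Assumed fallback (not specified by this KIP): NONE keeps today's behaviour,
         // while implementations that cannot ignore send errors reject IGNORE_SEND_ERRORS.
         if (option == TxnSendOption.NONE)
             return send(record, callback);
         throw new UnsupportedOperationException("IGNORE_SEND_ERRORS is not supported by " + getClass().getName());
     }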
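Why a `default` method keeps older implementations working can be shown with simplified, hypothetical interfaces (`SimpleProducer`, `LegacyProducer`, `SendOption`) that are not Kafka's `Producer` types. The fallback policy in the default body (delegate for `NONE`, reject anything else) is an assumption made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types illustrating why a default method keeps
// existing implementations working; these are not Kafka's Producer types.
enum SendOption { NONE, IGNORE_SEND_ERRORS }

interface SimpleProducer {
    String send(String record);

    // Overload added later: existing implementers inherit this body and
    // therefore still compile. The fallback policy here is an assumption.
    default String send(String record, SendOption option) {
        if (option == SendOption.NONE) {
            return send(record);
        }
        throw new UnsupportedOperationException(
            getClass().getSimpleName() + " does not support " + option);
    }
}

// An implementation written before the overload existed.
final class LegacyProducer implements SimpleProducer {
    final List<String> sent = new ArrayList<>();

    @Override
    public String send(String record) {
        sent.add(record);
        return "ack:" + record;
    }
}
```

`LegacyProducer` never mentions `SendOption`, yet callers can use the new overload on it; only the opt-in value is refused, so it cannot silently lose its intended semantics.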


Compatibility, Deprecation, and Migration Plan

Since the default behaviour is preserved (the existing `send` overloads are unchanged and the new option is opt-in), the change has no impact on existing users.

Test Plan

Unit tests for `KafkaProducer` showing that the new option works with different `send()` errors and exceptions, such as `RecordTooLargeException`.

Rejected Alternatives

  • Add a producer custom exception handler interface: see KIP-1038 and the discussions.
  • Identify poison-pill records on the application side: this is inefficient and sometimes not even feasible. For example, to identify records that are too large, the application must know the producer configuration as well as the serialization method, which is not always possible. Moreover, checking every single record's size before sending it adds overhead, since the producer performs the same check anyway.
  • Add error clearing to `flush()`: `flush()` is not a transaction-specific method, and the next `send` would not complete `AddPartition` successfully.
  • Add error clearing to `commitTxn()`: the next `send` would not complete `AddPartition` successfully.
  • Have `send()` throw an `ApiException`: this may break backward compatibility.
  • Make the explicit `flush()` before calling `commitTransaction(commitOptions)` unnecessary: this is not safe, and the next `send` would not complete `AddPartition` successfully.
  • Require users who want to drop poison-pill records to use at-least-once semantics (ALOS) instead of exactly-once semantics (EOS): ALOS cannot guarantee the absence of duplicates.
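To illustrate why the application-side alternative was rejected, here is a hypothetical pre-check (`AppSidePreCheck` is not a Kafka class). It has to duplicate the producer's serializer and a size limit mirroring producer settings such as `max.request.size`, serializes each value once just to measure it, and still only approximates the producer's own check.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical application-side pre-check, shown only to illustrate why this
// alternative was rejected. It duplicates the producer's serializer and size
// limit, and it ignores the key, headers, and the per-record framing the
// producer adds, so it can only approximate the producer's own check.
final class AppSidePreCheck<V> {
    private final Function<V, byte[]> serializer; // must match the producer's value serializer
    private final int maxBytes;                   // must mirror producer settings such as max.request.size

    AppSidePreCheck(Function<V, byte[]> serializer, int maxBytes) {
        this.serializer = serializer;
        this.maxBytes = maxBytes;
    }

    /** Convenience factory for String values serialized as UTF-8. */
    static AppSidePreCheck<String> forUtf8Strings(int maxBytes) {
        return new AppSidePreCheck<String>(s -> s.getBytes(StandardCharsets.UTF_8), maxBytes);
    }

    /** Serializes the value once just to measure it; the producer serializes it again on send. */
    boolean probablyTooLarge(V value) {
        return serializer.apply(value).length > maxBytes;
    }
}
```

Every record pays for an extra serialization, and the check silently drifts out of date whenever the producer's serializer or configuration changes.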

