Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-334 introduced the RecordDeserializationException into the Consumer, carrying the offset information of the failing record. That is useful for skipping a poison pill, but since the exception gives no access to the record itself, it still prevents easy implementation of a dead letter queue or simply logging the faulty data.

Public Interfaces

The changes to RecordDeserializationException are a new constructor taking the additional arguments origin, timestamp, timestampType, keyBuffer, valueBuffer and headers, a new DeserializationExceptionOrigin enum, and new accessor methods for the added fields.

The new DeserializationExceptionOrigin field makes it possible, when needed, to tell whether the error originated from deserializing the KEY or the VALUE.

As these are only additions, the change remains compatible with existing consumer code.

RecordDeserializationException.java
public class RecordDeserializationException extends SerializationException {

  // New enum
  public enum DeserializationExceptionOrigin {
     KEY,
     VALUE
  }

  @Deprecated
  public RecordDeserializationException(TopicPartition partition,
                                        long offset,
                                        String message,
                                        Throwable cause);

  // New constructor
  public RecordDeserializationException(DeserializationExceptionOrigin origin,
                                        TopicPartition partition,
                                        long offset,
                                        long timestamp,
                                        TimestampType timestampType,
                                        ByteBuffer keyBuffer,
                                        ByteBuffer valueBuffer,
                                        Headers headers,
                                        String message,
                                        Throwable cause);

  // New methods
  public DeserializationExceptionOrigin origin();
  public TimestampType timestampType();
  public long timestamp();
  public ByteBuffer keyBuffer();
  public ByteBuffer valueBuffer();
  public Headers headers();
}
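
For illustration, here is a minimal sketch (not part of the proposed interface) of how calling code could use the new accessors; the handle method and LOGGER names are illustrative, and either buffer may be null (for instance for a record with a null key or value):

void handle(RecordDeserializationException e) {
    // origin() tells whether the key or the value failed to deserialize
    if (e.origin() == RecordDeserializationException.DeserializationExceptionOrigin.KEY) {
        LOGGER.error("Key deserialization failed at partition={} offset={}",
                e.topicPartition().partition(), e.offset(), e.getCause());
    } else {
        LOGGER.error("Value deserialization failed at partition={} offset={}",
                e.topicPartition().partition(), e.offset(), e.getCause());
    }
    // keyBuffer() / valueBuffer() expose the raw record content
    ByteBuffer rawValue = e.valueBuffer();
    LOGGER.error("Raw value size: {} bytes", rawValue == null ? 0 : rawValue.remaining());
}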

Proposed Changes

We propose to include the record content and metadata in the RecordDeserializationException. The keyBuffer() and valueBuffer() methods give access to the raw content of the record. Applications can still read the offset and skip the record, but they can now also retrieve the offending data for specific processing, such as sending it to a dead letter queue or taking whatever other action makes sense.

Usage example

Here is a basic usage example implementing a DLQ feature:

{

// …

  try (KafkaConsumer<String, SimpleValue> consumer = new KafkaConsumer<>(settings())) {

     // Subscribe to our topic
     LOGGER.info("Subscribing to topic " + KAFKA_TOPIC);
     consumer.subscribe(List.of(KAFKA_TOPIC));
     LOGGER.info("Subscribed !");

     try (KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerSettings())) {
         //noinspection InfiniteLoopStatement
         while (true) {
             try {
                 final var records = consumer.poll(POLL_TIMEOUT);
                 LOGGER.info("poll() returned {} records", records.count());
                 for (var record : records) {
                     LOGGER.info("Fetch record key={} value={}", record.key(), record.value());
                     // Any processing
                     // ...
                 }

             } catch (RecordDeserializationException re) {
                 long offset = re.offset();
                 Throwable t = re.getCause();
                 LOGGER.error("Failed to consume at partition={} offset={}", re.topicPartition().partition(), offset, t);
                 sendDlqRecord(dlqProducer, re);
                 LOGGER.info("Skipping offset={}", offset);
                 consumer.seek(re.topicPartition(), offset + 1);
             } catch (Exception e) {
                 LOGGER.error("Failed to consume", e);
             }
         }
     }

  } finally {
     LOGGER.info("Closing consumer");
  }
}


void sendDlqRecord(KafkaProducer<byte[], byte[]> dlqProducer, RecordDeserializationException re) {
   var dlqRecord = new ProducerRecord<>(DLQ_TOPIC, Utils.toNullableArray(re.keyBuffer()), Utils.toNullableArray(re.valueBuffer()));
   try {
       dlqProducer.send(dlqRecord).get();
       LOGGER.info("Record sent to DLQ");
   } catch (Exception e) {
       LOGGER.error("Failed to send corrupted record to DLQ", e);
   }
}
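
As an optional refinement, the new metadata accessors could be used to enrich the DLQ record. The following variant of sendDlqRecord is only a sketch; it forwards the original timestamp and headers alongside the raw key and value:

void sendDlqRecordWithMetadata(KafkaProducer<byte[], byte[]> dlqProducer, RecordDeserializationException re) {
   var dlqRecord = new ProducerRecord<>(DLQ_TOPIC,
           null,                                   // let the partitioner choose the partition
           re.timestamp(),                         // keep the original record timestamp
           Utils.toNullableArray(re.keyBuffer()),
           Utils.toNullableArray(re.valueBuffer()),
           re.headers());                          // propagate the original headers
   try {
       dlqProducer.send(dlqRecord).get();
       LOGGER.info("Record sent to DLQ with original timestamp and headers");
   } catch (Exception e) {
       LOGGER.error("Failed to send corrupted record to DLQ", e);
   }
}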

Compatibility, Deprecation, and Migration Plan

This change is backward compatible and will have no impact on existing applications.

The previous RecordDeserializationException constructor is deprecated as it is no longer used.

Test Plan

  • Unit tests in CompletedFetchTest; a sketch of the kind of assertion such a test could make is shown below.
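
As an illustration only, here is the kind of assertion such a test could make (an assumed shape, not the actual CompletedFetchTest code):

@Test
public void testExceptionExposesRecordContent() {
    ByteBuffer value = ByteBuffer.wrap("corrupted".getBytes(StandardCharsets.UTF_8));
    RecordDeserializationException e = new RecordDeserializationException(
            RecordDeserializationException.DeserializationExceptionOrigin.VALUE,
            new TopicPartition("topic", 0),
            42L,                                  // offset
            1L,                                   // timestamp
            TimestampType.CREATE_TIME,
            null,                                 // key buffer (record had a null key)
            value,
            new RecordHeaders(),
            "Error deserializing value",
            new SerializationException("boom"));

    assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, e.origin());
    assertEquals(42L, e.offset());
    assertEquals(value, e.valueBuffer());
    assertNull(e.keyBuffer());
}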

Rejected Alternatives

  • Using a raw byte[] consumer and doing the deserialization in user code. This permits a dead letter queue implementation but moves all the complexity of deserialization into the user's code.