Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-334 introduced into the Consumer the RecordDeserializationException with offsets information. That is useful to skip a poison pill but as you do not have access to the Record, it still prevents easy implementation of dead letter queue or simply logging the faulty data.

Public Interfaces

The changes in the RecordDeserializationException are a new protected constructor with added argument timestampType, timestamp, keyBuffer,valueBuffer and Headers.  New accessor methods for the added fields and extra key/value() methods with byte array allocation.

KeyDeserializationException and ValueDeserializationException child methods have similar public constructors, allowing to differentiate if needed the origin of the error.

As it’s only addition, it would still be compatible with existing consumer code.

RecordDeserializationException.java
public class RecordDeserializationException extends SerializationException {
  @Deprecated
  public RecordDeserializationException(TopicPartition partition,
                                        long offset,
                                        String message,
                                        Throwable cause);

  // New constructor
  protected RecordDeserializationException(TopicPartition partition,
                                          long offset,
                                          long timestamp,
                                          TimestampType timestampType,
                                          ByteBuffer keyBuffer,
                                          ByteBuffer valueBuffer,
                                          Headers headers,
                                          String message,
                                          Throwable cause);

  // New methods
  public TimestampType timestampType();
  public long timestamp();
  public ByteBuffer keyBuffer();
  public ByteBuffer valueBuffer();
  public byte[] key();
  public byte[] value();
  public Headers headers();
}
KeyDeserializationException.java
public class KeyDeserializationException extends RecordDeserializationException {
   public KeyDeserializationException(TopicPartition partition,
                                      long offset,
                                      long timestamp,
                                      TimestampType timestampType,
                                      ByteBuffer keyBuffer,
                                      ByteBuffer valueBuffer,
                                      Headers headers,
                                      String message,
                                      Throwable cause);
}
ValueDeserializationException.java
public class ValueDeserializationException extends RecordDeserializationException {
   public ValueDeserializationException(TopicPartition partition,
                                      long offset,
                                      long timestamp,
                                      TimestampType timestampType,
                                      ByteBuffer keyBuffer,
                                      ByteBuffer valueBuffer,
                                      Headers headers,
                                      String message,
                                      Throwable cause);
}

Proposed Changes

We propose to include record content and metadata to the RecordDeserializationException, to create 2 childs classes KeyDeserializationException and ValueDeserializationException to differentiate key and value deserialization error. The key() and value() methods will lazily create the needed byte array to access content of the record. This would still allow us to access offsets and then skip the record but also to fetch the concerned data for specific processing by sending it to a dead letter queue or whatever action that makes sense.

Usage example

Here is an example of basic usage to implement a DLQ feature

{

// …

  try (KafkaConsumer<String, SimpleValue> consumer = new KafkaConsumer<>(settings())) {

     // Subscribe to our topic
     LOGGER.info("Subscribing to topic " + KAFKA_TOPIC);
     consumer.subscribe(List.of(KAFKA_TOPIC));
     LOGGER.info("Subscribed !");

     try (KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerSettings())) {
         //noinspection InfiniteLoopStatement
         while (true) {
             try {
                 final var records = consumer.poll(POLL_TIMEOUT);
                 LOGGER.info("poll() returned {} records", records.count());
                 for (var record : records) {
                     LOGGER.info("Fetch record key={} value={}", record.key(), record.value());
                     // Any processing
                     // ...
                 }

             } catch (RecordDeserializationException re) {
                 long offset = re.offset();
                 Throwable t = re.getCause();
                 LOGGER.error("Failed to consume at partition={} offset={}", re.topicPartition().partition(), offset, t);
                 sendDlqRecord(dlqProducer, re());
                 LOGGER.info("Skipping offset={}", offset);
                 consumer.seek(re.topicPartition(), offset + 1);
             } catch (Exception e) {
                 LOGGER.error("Failed to consume", e);
             }
         }
     }

  } finally {
     LOGGER.info("Closing consumer");
  }
}


void sendDlqRecord(KafkaProducer<byte[], byte[]> dlqProducer, RecordDeserializationException re) {
   var dlqRecord = new ProducerRecord<>(DLQ_TOPIC, re.key(), re.value());

   try {
       dlqProducer.send(dlqRecord).get();
       LOGGER.info("Record sent to DLQ");
   } catch (Exception e) {
       LOGGER.error("Failed to send corrupted record to DLQ", e);
   }
}

Compatibility, Deprecation, and Migration Plan

This change is backward compatible and will have no impact on existing application.

Test Plan

  • Unit test CompletedFetchTest

Rejected Alternatives

  • Using a raw byte array consumer and doing the deserialization in the user code. This permits dead letter queue implementation but is moving all the complexity of deserialization to user’s code.
  • No labels