Status
Current state: Adopted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-334 added the RecordDeserializationException to the Consumer, carrying offset information. That is enough to skip a poison pill, but because the exception does not give access to the record itself, it is still hard to implement a dead letter queue or even to log the faulty data.
Public Interfaces
RecordDeserializationException gains a new constructor with the additional arguments origin, timestamp, timestampType, keyBuffer, valueBuffer and headers, a new enum DeserializationExceptionOrigin, and accessor methods for the added fields.
The new DeserializationExceptionOrigin enum makes it possible to tell, when needed, whether the error originated from the KEY or the VALUE.
Because these changes are purely additive, existing consumer code remains compatible.
public class RecordDeserializationException extends SerializationException {

    // New enum
    public enum DeserializationExceptionOrigin {
        KEY,
        VALUE
    }

    @Deprecated
    public RecordDeserializationException(TopicPartition partition,
                                          long offset,
                                          String message,
                                          Throwable cause);

    // New constructor
    public RecordDeserializationException(DeserializationExceptionOrigin origin,
                                          TopicPartition partition,
                                          long offset,
                                          long timestamp,
                                          TimestampType timestampType,
                                          ByteBuffer keyBuffer,
                                          ByteBuffer valueBuffer,
                                          Headers headers,
                                          String message,
                                          Throwable cause);

    // New methods
    public DeserializationExceptionOrigin origin();
    public TimestampType timestampType();
    public long timestamp();
    public ByteBuffer keyBuffer();
    public ByteBuffer valueBuffer();
    public Headers headers();
}
Proposed Changes
We propose to include the record content and metadata in the RecordDeserializationException. The keyBuffer() and valueBuffer() methods give access to the raw content of the record. The offset remains available, so the record can still be skipped, and the affected data can now also be retrieved for specific processing, such as sending it to a dead letter queue or any other action that makes sense.
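Since keyBuffer() and valueBuffer() return a ByteBuffer that may be null, logging the faulty data needs a little care. The following is a minimal sketch using only the JDK; the class BufferDump and its helpers toNullableArray and toHex are hypothetical names chosen for illustration and are not part of this KIP. It copies the remaining bytes without moving the buffer's position, so the same buffer can still be forwarded elsewhere afterwards.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the KIP): renders a record buffer for logging.
public class BufferDump {

    /** Copies the remaining bytes of a possibly-null buffer without moving its position. */
    static byte[] toNullableArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        byte[] bytes = new byte[buffer.remaining()];
        // duplicate() shares the content but has its own position, so the caller's buffer is untouched
        buffer.duplicate().get(bytes);
        return bytes;
    }

    /** Hex-encodes at most maxBytes bytes; appends "..." when the payload was truncated. */
    static String toHex(ByteBuffer buffer, int maxBytes) {
        byte[] bytes = toNullableArray(buffer);
        if (bytes == null) {
            return "null";
        }
        int n = Math.min(bytes.length, maxBytes);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(String.format("%02x", bytes[i] & 0xff));
        }
        if (bytes.length > n) {
            sb.append("...");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Stand-in for the buffer an application would obtain from valueBuffer()
        ByteBuffer value = ByteBuffer.wrap("bad".getBytes(StandardCharsets.UTF_8));
        System.out.println(toHex(value, 16)); // prints 626164
        System.out.println(toHex(null, 16));  // prints null
    }
}
```

In a consumer loop, a call such as toHex(re.valueBuffer(), 64) could be added to the error log line, keeping the dump bounded for large payloads.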
Usage example
Here is a basic usage example that implements a DLQ feature:
{
    // …
    try (KafkaConsumer<String, SimpleValue> consumer = new KafkaConsumer<>(settings())) {
        // Subscribe to our topic
        LOGGER.info("Subscribing to topic " + KAFKA_TOPIC);
        consumer.subscribe(List.of(KAFKA_TOPIC));
        LOGGER.info("Subscribed !");
        try (KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerSettings())) {
            //noinspection InfiniteLoopStatement
            while (true) {
                try {
                    final var records = consumer.poll(POLL_TIMEOUT);
                    LOGGER.info("poll() returned {} records", records.count());
                    for (var record : records) {
                        LOGGER.info("Fetch record key={} value={}", record.key(), record.value());
                        // Any processing
                        // ...
                    }
                } catch (RecordDeserializationException re) {
                    long offset = re.offset();
                    Throwable t = re.getCause();
                    LOGGER.error("Failed to consume at partition={} offset={}", re.topicPartition().partition(), offset, t);
                    sendDlqRecord(dlqProducer, re);
                    LOGGER.info("Skipping offset={}", offset);
                    consumer.seek(re.topicPartition(), offset + 1);
                } catch (Exception e) {
                    LOGGER.error("Failed to consume", e);
                }
            }
        }
    } finally {
        LOGGER.info("Closing consumer");
    }
}

void sendDlqRecord(KafkaProducer<byte[], byte[]> dlqProducer, RecordDeserializationException re) {
    var dlqRecord = new ProducerRecord<>(DLQ_TOPIC,
            Utils.toNullableArray(re.keyBuffer()),
            Utils.toNullableArray(re.valueBuffer()));
    try {
        dlqProducer.send(dlqRecord).get();
        LOGGER.info("Record sent to DLQ");
    } catch (Exception e) {
        LOGGER.error("Failed to send corrupted record to DLQ", e);
    }
}
Compatibility, Deprecation, and Migration Plan
This change is backward compatible and will have no impact on existing applications.
The previous RecordDeserializationException constructor is deprecated because it is no longer used.
Test Plan
- Unit test CompletedFetchTest
Rejected Alternatives
- Using a raw byte array consumer and doing the deserialization in user code. This permits a dead letter queue implementation but moves all the complexity of deserialization into user code.
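To make the cost of this rejected alternative concrete, here is a rough JDK-only sketch of what application code would have to do once the consumer hands back raw byte arrays. The list of byte arrays stands in for the values a byte array consumer would return; the names ManualDeserialization, Outcome and deserializeAll are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of the rejected alternative: the application deserializes by hand.
public class ManualDeserialization {

    /** Decoded values plus the raw payloads that could not be decoded. */
    static final class Outcome<T> {
        final List<T> values = new ArrayList<>();
        final List<byte[]> deadLetters = new ArrayList<>();
    }

    /** Applies the deserializer to every payload, setting failures aside for a dead letter queue. */
    static <T> Outcome<T> deserializeAll(List<byte[]> rawValues, Function<byte[], T> deserializer) {
        Outcome<T> outcome = new Outcome<>();
        for (byte[] raw : rawValues) {
            try {
                outcome.values.add(deserializer.apply(raw));
            } catch (RuntimeException e) {
                // The raw bytes are still at hand, so they can be routed to a DLQ
                outcome.deadLetters.add(raw);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        // Stand-in for record values fetched with a byte array deserializer
        List<byte[]> raw = List.of(
                "42".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8));
        Outcome<Integer> outcome = deserializeAll(
                raw, bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(outcome.values + " / " + outcome.deadLetters.size()); // prints [42] / 1
    }
}
```

Every application would need its own version of this loop, for keys as well as values, which is the complexity the proposal avoids by exposing the buffers on the exception.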