Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

While analyzing KAFKA-19430, we found that Kafka Streams cannot handle a CorruptRecordException. The reason is that the exception is never exposed to the client; a generic KafkaException is thrown instead. This KIP proposes exposing CorruptRecordException directly, so that the client can decide how to handle this specific error.

Public Interfaces

The proposal is to expose CorruptRecordException when the broker's fetch response already reports this error, which signifies disk corruption. CorruptRecordException will also be extended to carry the TopicPartition and offset.

Note that this offset is the offset the client was trying to read from (the fetch offset), not the offset of the corrupt record.
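The following is a minimal sketch of the intended client-side mapping, assuming a simplified fetch path. When the fetch response reports a corrupt-message error for a partition, the client builds a CorruptRecordException that carries the partition and the offset it requested. For any other error, it keeps the generic KafkaException.

The FetchError enum, the toException method and the stand-in types are all hypothetical. They exist only so the example compiles without kafka-clients and are not the consumer's actual internals.

```java
public class FetchErrorMappingSketch {

    // Hypothetical stand-ins so the sketch compiles without kafka-clients.
    record TopicPartition(String topic, int partition) { }

    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    // Simplified: the real class extends RetriableException, which is itself a KafkaException.
    static class CorruptRecordException extends KafkaException {
        private final TopicPartition topicPartition;
        private final long corruptOffset;

        CorruptRecordException(String message, TopicPartition topicPartition, long corruptOffset) {
            super(message);
            this.topicPartition = topicPartition;
            this.corruptOffset = corruptOffset;
        }

        TopicPartition topicPartition() { return topicPartition; }
        long corruptOffset() { return corruptOffset; }
    }

    // Hypothetical, simplified view of the per-partition error in a fetch response.
    enum FetchError { NONE, CORRUPT_MESSAGE, OTHER }

    // Maps a partition-level fetch error to the exception the client would see.
    // fetchOffset is the position the client asked for, not the location of the corruption.
    static RuntimeException toException(FetchError error, TopicPartition tp, long fetchOffset) {
        return switch (error) {
            case CORRUPT_MESSAGE -> new CorruptRecordException(
                    "Encountered corrupt message when fetching offset " + fetchOffset + " for " + tp,
                    tp, fetchOffset);
            case OTHER -> new KafkaException("Unexpected error when fetching " + tp);
            case NONE -> null;
        };
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("orders", 3);
        RuntimeException e = toException(FetchError.CORRUPT_MESSAGE, tp, 42L);
        if (e instanceof CorruptRecordException cre) {
            System.out.println(cre.topicPartition() + " @ offset " + cre.corruptOffset());
        }
    }
}
```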

CorruptRecordException

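package org.apache.kafka.common.errors;

import org.apache.kafka.common.TopicPartition;

/**
 * This exception indicates a record has failed its internal CRC check; this generally indicates network or disk corruption.
 */
public class CorruptRecordException extends RetriableException {

    private static final long serialVersionUID = 1L;

    // Populated only by the four-argument constructor. The other constructors leave
    // topicPartition null and corruptOffset at -1, since 0 is a valid offset.
    private final TopicPartition topicPartition;
    private final long corruptOffset;

    public CorruptRecordException() {
        super("This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.");
        this.topicPartition = null;
        this.corruptOffset = -1L;
    }

    public CorruptRecordException(String message) {
        super(message);
        this.topicPartition = null;
        this.corruptOffset = -1L;
    }

    public CorruptRecordException(Throwable cause) {
        super(cause);
        this.topicPartition = null;
        this.corruptOffset = -1L;
    }

    public CorruptRecordException(String message, Throwable cause) {
        super(message, cause);
        this.topicPartition = null;
        this.corruptOffset = -1L;
    }

    public CorruptRecordException(String message, Throwable cause, TopicPartition topicPartition, long corruptOffset) {
        super(message, cause);
        this.topicPartition = topicPartition;
        this.corruptOffset = corruptOffset;
    }

    /**
     * @return the partition being fetched when the corruption was reported, or null if unknown
     */
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /**
     * @return the offset the client was trying to read from (not necessarily the offset of
     *         the corrupt record), or -1 if unknown
     */
    public long corruptOffset() {
        return corruptOffset;
    }
}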
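Only the new four-argument constructor populates the partition and offset, so code reading these fields should not assume they are set. The offset value alone cannot be relied on to detect an exception built through the older constructors.

The sketch below checks topicPartition() for null before trusting the offset. It uses a trimmed, hypothetical stand-in for the proposed class and a hypothetical describe helper.

```java
public class CorruptRecordContextSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition ("topic-partition" format).
    record TopicPartition(String topic, int partition) {
        @Override
        public String toString() { return topic + "-" + partition; }
    }

    // Trimmed, hypothetical stand-in for the proposed class: only two constructors are kept.
    static class CorruptRecordException extends RuntimeException {
        private TopicPartition topicPartition; // stays null with the message-only constructor
        private long corruptOffset;

        CorruptRecordException(String message) { super(message); }

        CorruptRecordException(String message, Throwable cause, TopicPartition tp, long offset) {
            super(message, cause);
            this.topicPartition = tp;
            this.corruptOffset = offset;
        }

        TopicPartition topicPartition() { return topicPartition; }
        long corruptOffset() { return corruptOffset; }
    }

    // Hypothetical helper: builds a log line, using the offset only when partition context exists.
    static String describe(CorruptRecordException e) {
        if (e.topicPartition() == null) {
            return "corrupt record (no partition context): " + e.getMessage();
        }
        return "corrupt data reported while fetching offset " + e.corruptOffset()
                + " of " + e.topicPartition();
    }

    public static void main(String[] args) {
        System.out.println(describe(new CorruptRecordException("legacy")));
        System.out.println(describe(new CorruptRecordException(
                "from fetch response", null, new TopicPartition("orders", 3), 42L)));
    }
}
```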

Proposed Changes

  • Expose CorruptRecordException to the client when the broker's fetch response reports it, which indicates disk corruption.

  • Extend CorruptRecordException with the topic partition and the offset being fetched when the error was reported.

Compatibility, Deprecation, and Migration Plan

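  • No deprecation is planned.

  • CorruptRecordException gains a new constructor and the topicPartition() and corruptOffset() accessors; the existing constructors are kept.

  • CorruptRecordException will be thrown to clients only in the fetch-response case described above, where a generic KafkaException is thrown today. Because CorruptRecordException is a subclass of KafkaException, existing code that catches KafkaException continues to work.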
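The sketch below illustrates the compatibility argument under simplified assumptions. The stand-in classes mirror the real hierarchy, in which CorruptRecordException extends RetriableException, which ultimately extends KafkaException. A Runnable stands in for a client call such as a consumer poll.

Existing handlers that catch KafkaException still catch the new exception, while updated handlers can catch it first. One consequence to keep in mind: a handler that catches RetriableException specifically would now also match this error.

```java
public class CatchOrderingSketch {

    // Simplified stand-ins for the real hierarchy:
    // CorruptRecordException -> RetriableException -> ApiException -> KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class RetriableException extends KafkaException {
        RetriableException(String message) { super(message); }
    }

    static class CorruptRecordException extends RetriableException {
        CorruptRecordException(String message) { super(message); }
    }

    // Existing client code: catches only the generic type and keeps working unchanged.
    // `poll` is a hypothetical stand-in for a client call such as poll().
    static String legacyHandler(Runnable poll) {
        try {
            poll.run();
            return "ok";
        } catch (KafkaException e) {
            return "generic: " + e.getMessage();
        }
    }

    // Updated client code: handles corruption specifically, before the generic fallback.
    static String updatedHandler(Runnable poll) {
        try {
            poll.run();
            return "ok";
        } catch (CorruptRecordException e) {
            return "corrupt: " + e.getMessage();
        } catch (KafkaException e) {
            return "generic: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Runnable failingPoll = () -> { throw new CorruptRecordException("bad batch"); };
        System.out.println(legacyHandler(failingPoll));
        System.out.println(updatedHandler(failingPoll));
    }
}
```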

Test Plan

  • Unit tests will verify that CorruptRecordException, with the correct topic partition and offset, is surfaced to the client when the fetch response reports corruption.