Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


  1. Let InvalidRecordException to not inherit from CorruptedException anymore, instead inherit from ApiException directly (which is non-retriable). And also moved it to "org.apache.kafka.common" to become a public class.
  2. On the broker side:
    1. For the above cases which throws InvalidRecordException that indicates fatal errors (i.e. except the case of CRC checksum failures which we would change the error code to CORRUPT_MESSAGE), return the new error code INVALID_RECORD.
    2. When setting the error code, if there are multiple types of errors within a single batch, since for most cases we will throw the exception right away, we would only indicate one error code which would be the first error encountered while validating the batch.
      1. For that error code to set, we will try to encode the list of error_records as relative offsets of the records that are causing the whole batch to be rejected (again, in most cases this would be empty since we throw immediately after the first error).
      2. For that error code to set, optionally try to encode the customized error message (in most cases it would be empty).
  3. On the client side, augment the error handling so that:
    1. If the error_code's corresponding exception is re-triable, follow the current behavior to retry the whole batch as-is (so far the only case would be CorruptedException);
    2. If the error_code's corresponding exception is not re-triable, check if error_records is empty or not:
      1. If it is empty, reject the whole batch and set the exception for all the records' future (e.g. UnsupportedForMessageFormatException or ProducerFencedException).
      2. If it is not empty, only remove those records in the field, and then retry by creating a new batch with those error records removed (for idempotent producers, also reset the sequence number as well as offset). In this way, records in the same batch would not be rejected as a whole, but some records may still succeed while those culprits be rejected (since this KIP cases would include InvalidRecordException and InvalidTimestampException).
  4. Improve the metrics on broker-side for better user visibility for different types of errors that log-validator would expose under  "BrokerTopicStats":

    Code Block
    -- NoKeyCompactedTopicRecordsPerSec: counter of failures by compacted records with no key
    -- InvalidMagicNumberRecordsPerSec: counter of failures by records with invalid magic number
    -- InvalidMessageCrcRecordsPerSec: counter of failures by records with crc corruption
    -- NonIncreasingOffsetRecordsPerSec: counter of failures by records with invalid offset

Compatibility, Deprecation, and Migration Plan