...

  • Each message has a valid magic number consistent with its outer record batch; if not, throw InvalidRecordException.
  • Messages for compacted topics must have keys; if not, throw InvalidRecordException.
  • When magic value >= 1, messages must have monotonically increasing (relative) offsets starting from 0; if not, throw InvalidRecordException.
  • When magic value >= 1, validate that the timestamp type is CREATE_TIME and that the message's timestamp is within the range of the configured DiffMaxMs; if not, throw InvalidTimestampException.
  • When magic value <= 1, validate the message CRC (for magic > 1 there is no record-level CRC); if validation fails, throw InvalidRecordException.
  • When magic value <= 1, check whether the record is compressed and hence holds nested record content (for magic > 1 a record is never compressed); if not, throw InvalidTimestampException.
  • When magic value >= 2, check that the record batch has a valid offset range, count and sequence number, and that its records are not control records; if any check fails, throw InvalidRecordException.
  • For a transactional / idempotent record batch, also validate the following:
    • The configured magic number should be >= 2; if not, throw UnsupportedForMessageFormatException.
    • The producer epoch should be larger than or equal to the book-kept epoch; if not, throw ProducerFencedException.
    • If the producer epoch is larger than the book-kept epoch, check that the sequence is 0; if not, throw OutOfOrderSequenceException or UnknownProducerIdException depending on the epoch.
    • If the producer epoch is equal to the book-kept epoch, check that the sequence is continuous; if not, throw OutOfOrderSequenceException or UnknownProducerIdException depending on the sequence number.
    • NOTE that OutOfOrderSequenceException can only be thrown from the callback, while UnknownProducerIdException can also be thrown directly from calls such as send() / commitTxn() etc.
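
The epoch and sequence checks in the sub-list above can be sketched roughly as follows. This is a simplified illustration, not the broker's actual code: the class SequenceCheckSketch, its Result enum and the check() method are hypothetical names, the choice between OutOfOrderSequenceException and UnknownProducerIdException is collapsed into a single outcome, and sequence-number wraparound is ignored.

```java
// Hypothetical, simplified model of the epoch / sequence checks described above.
// None of these names are taken from the Kafka code base.
final class SequenceCheckSketch {

    enum Result { OK, PRODUCER_FENCED, OUT_OF_ORDER_SEQUENCE }

    /**
     * @param bookKeptEpoch producer epoch currently recorded for this producer
     * @param lastSequence  last sequence number recorded for this producer
     * @param batchEpoch    producer epoch carried by the incoming batch
     * @param firstSequence first sequence number of the incoming batch
     */
    static Result check(int bookKeptEpoch, int lastSequence, int batchEpoch, int firstSequence) {
        if (batchEpoch < bookKeptEpoch) {
            // An older epoch means the producer has been fenced.
            return Result.PRODUCER_FENCED;
        }
        if (batchEpoch > bookKeptEpoch) {
            // A newer epoch must restart its sequence at 0.
            return firstSequence == 0 ? Result.OK : Result.OUT_OF_ORDER_SEQUENCE;
        }
        // Same epoch: the sequence must continue right after the last one
        // (assumption: wraparound at the maximum sequence value is not modelled).
        return firstSequence == lastSequence + 1 ? Result.OK : Result.OUT_OF_ORDER_SEQUENCE;
    }
}
```

For example, under these assumptions check(1, 5, 1, 6) is accepted, while check(1, 5, 1, 8) is rejected because the sequence has a gap.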

Any of the above exceptions causes the whole batch (and therefore the whole partition data) to be rejected with the corresponding error code. The one special case is InvalidRecordException: it inherits from CorruptRecordException and hence results in CORRUPT_MESSAGE (2), which is a retriable error, even though many of the InvalidRecordException cases above are not retriable at all. All other error codes correspond to an ApiException and are hence thrown to the user's callbacks / Future object directly.
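
To make the inheritance issue concrete, here is a minimal sketch with a stand-in exception hierarchy. The classes below are hypothetical and only mirror the relationships described in this paragraph; they are not the Kafka exception classes, and wouldRetry() is an assumed helper that stands in for whatever decides to retry.

```java
// Hypothetical stand-in hierarchy mirroring the relationships described above.
final class RetriableSketch {

    static class ApiError extends RuntimeException {
        ApiError(String message) { super(message); }
    }

    static class RetriableError extends ApiError {
        RetriableError(String message) { super(message); }
    }

    // Corruption (e.g. a CRC mismatch in transit) may go away on a retry.
    static class CorruptRecordError extends RetriableError {
        CorruptRecordError(String message) { super(message); }
    }

    // Inherits "retriable" from its parent even when a retry cannot help.
    static class InvalidRecordError extends CorruptRecordError {
        InvalidRecordError(String message) { super(message); }
    }

    static class InvalidTimestampError extends ApiError {
        InvalidTimestampError(String message) { super(message); }
    }

    // Assumed retry decision: anything that is a RetriableError is retried.
    static boolean wouldRetry(ApiError error) {
        return error instanceof RetriableError;
    }
}
```

With this hierarchy, wouldRetry(new InvalidRecordError("record for compacted topic has no key")) is true, although resending the same keyless record can never succeed.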


However, many of the errors above are actually triggered by a single record, not at the record-batch level. Nevertheless, when the whole batch is rejected, every record's Future / callback throws the same exception, which is very confusing (think: a send() call for record B fails because another record A is corrupted, yet record B's callback receives the same exception indicating a corruption error). So we'd like to 1) introduce more information in the returned error message of the ProduceResponse to improve such cases; and 2) introduce a separate error code, distinct from the retriable CORRUPT_MESSAGE, which indicates fatal errors from an invalid record.
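
As an illustration of goal 1), the sketch below shows how per-record error information could let a client give record A and record B different messages. It is only a sketch under assumptions: RecordError, its fields and callbackMessages() are hypothetical names, and the actual response format is not specified in this section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: mapping per-record errors onto per-record callback messages.
final class PerRecordErrorSketch {

    /** Hypothetical per-record error entry: position within the batch plus a message. */
    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    static final String OTHER_RECORD_FAILED =
        "batch rejected because another record in the batch was invalid";

    /** Returns one message per record in the batch, in batch order. */
    static List<String> callbackMessages(int batchSize, List<RecordError> errors) {
        Map<Integer, String> byIndex = new HashMap<>();
        for (RecordError error : errors) {
            byIndex.put(error.batchIndex, error.message);
        }
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            // The offending record gets its specific message; the others learn
            // that they failed only because they shared a batch with it.
            messages.add(byIndex.getOrDefault(i, OTHER_RECORD_FAILED));
        }
        return messages;
    }
}
```

Here a batch of two records with a single error reported at index 0 yields the specific message for record A and the generic "another record was invalid" message for record B.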

...