Status

Current state: Under discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when using `kafka-get-offsets.sh` to get the offset by time, we have these choices:

--time <String: <timestamp> /              timestamp of the offsets before that. 
  -1 or latest /                           [Note: No offset is returned, if the
  -2 or earliest /                         timestamp greater than recently
  -3 or max-timestamp /                    committed record timestamp is
  -4 or earliest-local /                   given.] (default: latest)
  -5 or latest-tiered      

With the "latest" option, the tool always returns the high watermark, because requests are always sent with the default isolation level, IsolationLevel.READ_UNCOMMITTED.
To get the last stable offset (LSO), which is needed for transaction support, the command must be able to send IsolationLevel.READ_COMMITTED.


Public Interfaces


  • Modification of GetOffsetShell.java to support a new flag that selects the isolation level: "--time -1 --isolation committed" and "--time -1 --isolation uncommitted"
  • Modification of OffsetSpec.java to extend OffsetSpec with LSOSpec
  • Modification of Admin.java to support sending the IsolationLevel.READ_COMMITTED option.
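
As a rough illustration of the flag handling listed above, the following sketch maps the proposed "--isolation" value to an offset spec. It is not the actual GetOffsetShell code: OffsetSpec, LatestSpec and LSOSpec are simplified stand-ins declared locally, parseLatest is a hypothetical helper name, and treating a missing flag as "uncommitted" and rejecting other --time values are assumptions made only to keep the example small.

```java
import java.util.Locale;

// Hypothetical stand-ins for the OffsetSpec hierarchy; only the shape matters here.
class OffsetSpec {}
class LatestSpec extends OffsetSpec {}
class LSOSpec extends OffsetSpec {}   // the new spec proposed in this KIP

final class ParseOffsetSpecSketch {

    // Maps "--time -1|latest" plus an optional "--isolation" value to a spec.
    // Assumption: a missing flag (null) keeps today's behaviour (high watermark).
    static OffsetSpec parseLatest(String time, String isolation) {
        boolean latest = time.equals("-1") || time.equalsIgnoreCase("latest");
        if (!latest) {
            throw new IllegalArgumentException("this sketch only handles --time -1 / latest");
        }
        if (isolation == null) {
            return new LatestSpec();
        }
        switch (isolation.toLowerCase(Locale.ROOT)) {
            case "committed":
                return new LSOSpec();      // ask for the last stable offset
            case "uncommitted":
                return new LatestSpec();   // ask for the high watermark
            default:
                throw new IllegalArgumentException("unknown isolation: " + isolation);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseLatest("-1", "committed").getClass().getSimpleName());
        System.out.println(parseLatest("latest", null).getClass().getSimpleName());
    }
}
```

Returning a spec rather than an isolation level keeps the isolation level itself out of the shell, which matches the reasoning given under Rejected Alternatives.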

Proposed Changes

  1. GetOffsetShell.java
    1. Modify the shell help description to include the isolation level flag description
    2. Modify parseOffsetSpec to parse LSO option
  2. OffsetSpec.java
    1. Extend the class to support LSO by creating an LSOSpec class
  3. Admin.java
    • The convenience overload of listOffsets currently creates a ListOffsetsOptions instance with the no-argument constructor, which sets the isolation level to IsolationLevel.READ_UNCOMMITTED. To support READ_COMMITTED, this overload can instead use the ListOffsetsOptions constructor that takes an isolation level, choosing the level based on the OffsetSpec values that are also passed to listOffsets.
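
The Admin.java change described above can be sketched as follows. This is a self-contained model rather than the real client code: IsolationLevel, the spec classes and ListOffsetsOptions are reduced local stand-ins, partitions are represented by plain strings instead of TopicPartition, and optionsFor is a hypothetical helper. Because one request carries a single isolation level, the sketch assumes that any LSOSpec in the map switches the whole request to READ_COMMITTED; the proposal does not specify this policy.

```java
import java.util.Map;

// Hypothetical stand-ins for Kafka client types, reduced to what the sketch needs.
enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }
class OffsetSpec {}
class LatestSpec extends OffsetSpec {}
class LSOSpec extends OffsetSpec {}

final class ListOffsetsOptions {
    private final IsolationLevel isolationLevel;

    // The no-argument constructor defaults to READ_UNCOMMITTED.
    ListOffsetsOptions() { this(IsolationLevel.READ_UNCOMMITTED); }

    ListOffsetsOptions(IsolationLevel isolationLevel) { this.isolationLevel = isolationLevel; }

    IsolationLevel isolationLevel() { return isolationLevel; }
}

final class ListOffsetsDelegationSketch {

    // Derives the options from the requested specs instead of always
    // calling the no-argument constructor.
    // Assumption: any LSOSpec makes the whole request READ_COMMITTED.
    static ListOffsetsOptions optionsFor(Map<String, ? extends OffsetSpec> specs) {
        boolean wantsLso = specs.values().stream().anyMatch(s -> s instanceof LSOSpec);
        return wantsLso
                ? new ListOffsetsOptions(IsolationLevel.READ_COMMITTED)
                : new ListOffsetsOptions();
    }

    public static void main(String[] args) {
        System.out.println(optionsFor(Map.of("topic-0", new LSOSpec())).isolationLevel());
        System.out.println(optionsFor(Map.of("topic-0", new LatestSpec())).isolationLevel());
    }
}
```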

Compatibility, Deprecation, and Migration Plan

  • Compatibility issues would arise if the option is used on a broker that doesn't support it. An error would be thrown in that case.

Test Plan

  1. Extend the tests in GetOffsetShellTest.java
  2. Extend the tests in GetOffsetShellParsingTest.java

Rejected Alternatives

  • We could determine in GetOffsetShell.java whether IsolationLevel.READ_COMMITTED or IsolationLevel.READ_UNCOMMITTED is needed (based on the user's input) and pass it as an argument, but that would not be good practice because it would expose isolationLevel in GetOffsetShell.java.

