Status
Current state: Adopted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP aims to extend the current Consumer Group Reset Offset tooling (implemented by KIP-122[1]) to Kafka Streams applications.
Currently, `kafka-streams-application-reset` only supports returning to the earliest offset on input topics, whereas the `kafka-consumer-groups` `reset-offsets` tool supports more options, detailed in KIP-122.
This KIP proposes including the current `reset-offsets` scenarios in `kafka-streams-application-reset` to give more control over input-topic offsets.
Public Interfaces
`kafka-streams-application-reset` currently supports the following features[2]:
1. for any specified input topic, it resets all offsets to zero
2. for any specified intermediate topic, it seeks to the end for all partitions
3. for all internal topics:
3.1. resets all offsets to zero
3.2. deletes the topic
This KIP proposes adding these options:
Scenarios
At most one scenario may be defined for `input-topics` for the execution to proceed. If no scenario is specified, `--to-earliest` is used by default.
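The rule above (one scenario at most, falling back to earliest) can be sketched with a mutually exclusive option group. This is an illustration in Python's `argparse`, not the tool's actual option parser. The flag names are the ones this KIP lists, while `build_parser` and `selected_scenario` are hypothetical helper names.

```python
import argparse

# Scenario flags as listed in this KIP, in attribute form.
SCENARIOS = ["to_datetime", "by_duration", "to_earliest", "to_latest",
             "to_offset", "shift_by", "from_file"]


def build_parser():
    """Build a parser that rejects more than one scenario flag."""
    parser = argparse.ArgumentParser(prog="reset-sketch")
    parser.add_argument("--application-id", required=True)
    parser.add_argument("--input-topics", default="")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--to-datetime")
    group.add_argument("--by-duration")
    group.add_argument("--to-earliest", action="store_true")
    group.add_argument("--to-latest", action="store_true")
    group.add_argument("--to-offset", type=int)
    group.add_argument("--shift-by", type=int)
    group.add_argument("--from-file")
    return parser


def selected_scenario(args):
    """Return (scenario, value); default to earliest when none was given."""
    for name in SCENARIOS:
        value = getattr(args, name)
        if value is not None and value is not False:
            return name, value
    return "to_earliest", True
```

Passing two scenario flags makes `argparse` exit with a usage error, which mirrors the requirement that the execution does not proceed with more than one scenario.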
| # | Scenario | Arguments | Description | Example |
|---|---|---|---|---|
| 1 | Reset to Datetime | `--to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm`, `--to-datetime YYYY-MM-DDTHH:mm:SS.sssZ`, `--to-datetime YYYY-MM-DDTHH:mm:SS.sss` | The datetime must be specified in ISO 8601 format. This option translates the datetime to epoch milliseconds, finds the offsets by timestamp, and resets to those offsets. If the timezone is not specified, UTC is used. | Reset to the first offset since 01 January 2017, 00:00:00 hrs: `--application-id app1 --input-topics foo --to-datetime 2017-01-01T00:00:00.000Z` |
| 2 | Reset by Duration | `--by-duration PnDTnHnMnS` | The duration must be specified in ISO 8601 format. This option subtracts the duration from the current timestamp on the server, finds the offsets using the resulting timestamp, and resets to those offsets. The duration does not account for daylight saving effects. | Reset to the first offset since one week ago (from the current timestamp): `--application-id app1 --input-topics foo,bar --by-duration P7D` |
| 3 | Reset to Earliest (DEFAULT) | `--to-earliest` | This option resets offsets to the earliest using Kafka Consumer's `#seekToBeginning`. This scenario is used by default if no scenario is specified. | Reset to the earliest offset available: `--application-id app1 --input-topics foo,bar --to-earliest` |
| 4 | Reset to Latest | `--to-latest` | This option resets offsets to the latest using Kafka Consumer's `#seekToEnd`. | Reset to the latest offset available: `--application-id app1 --input-topics foo,bar --to-latest` |
| 5 | Reset to Offset | `--to-offset` | This option resets offsets to a specific value. | Reset to offset 1 in all partitions: `--application-id app1 --input-topics foo,bar --to-offset 1` |
| 6 | Shift Offset by 'n' | `--shift-by n` | This option adds `n` to the current offset and resets to the result. `n` can be positive or negative: the offset moves backward if `n` is negative and forward if it is positive. If current offset + n is higher than the latest offset, the new offset is set to latest. If current offset + n is lower than the earliest offset, the new offset is set to earliest. | Reset to the current offset plus 5 positions: `--application-id app1 --input-topics foo,bar --shift-by 5` |
| 7 | Reset from file | `--from-file PATH_TO_FILE` | This option takes a Reset Plan CSV file with the offsets to reset by topic/partition. It does not require a scope, because topics and partitions are defined in the file. The topics included in the file are validated: they cannot also be among the `Intermediate` and `Internal` topics managed by the tool. | Reset using a file with a reset plan: `--application-id app1 --from-file reset-plan.csv` |
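The arithmetic behind the datetime, duration, and shift scenarios can be sketched as follows. This is a minimal Python illustration under stated assumptions, not the tool's implementation. All function names are hypothetical. The duration pattern only covers the `PnDTnHnMnS` shape shown in the table, with whole-number components. The lookup of offsets by timestamp on the broker is out of scope.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)
# Assumption: only days, hours, minutes and whole seconds are accepted.
DURATION = re.compile(r"^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$")


def datetime_to_epoch_ms(text):
    """Translate an ISO 8601 datetime to epoch milliseconds (UTC if no zone)."""
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"  # older fromisoformat() rejects "Z"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // ONE_MS  # integer math avoids float rounding


def duration_to_ms(text):
    """Translate a PnDTnHnMnS duration to milliseconds."""
    match = DURATION.match(text)
    if match is None or not any(match.groups()):
        raise ValueError("unsupported duration: " + text)
    days, hours, minutes, seconds = (int(g or 0) for g in match.groups())
    return timedelta(days=days, hours=hours,
                     minutes=minutes, seconds=seconds) // ONE_MS


def timestamp_for_duration(now_ms, text):
    """Subtract the duration from the current timestamp."""
    return now_ms - duration_to_ms(text)


def shift_offset(current, n, earliest, latest):
    """Shift by n, clamped to the [earliest, latest] range."""
    return max(earliest, min(latest, current + n))
```

For example, `shift_offset(10, 5, 0, 12)` yields `12` because the shifted value exceeds the latest offset, and `shift_offset(10, -20, 3, 50)` yields `3` because it falls below the earliest.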
Execution Options
| # | Execution option | Arguments | Description | Example |
|---|---|---|---|---|
| 1 | Execute | (no execution arguments) | This execution option runs the reset offset process based on scenario and scope. | Prints and executes the reset of all topics and partitions to earliest: `--application-id app1 --input-topics foo,bar --to-earliest` |
| 2 | Dry-Run | `--dry-run` | This execution option only prints the result of the scenario by scope. The output will look like this: INPUT TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET INTERMEDIATE TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET INTERNAL TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET | Prints the result of resetting all topics and partitions to earliest: `--application-id app1 --input-topics foo,bar --to-earliest --dry-run` |
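A dry run amounts to computing the planned offsets and printing them without committing anything. The sketch below shows one way the report rows could be built. It rests on two assumptions the KIP does not spell out: that NEW-LAG is the log end offset minus the new offset, and that each group (INPUT, INTERMEDIATE, INTERNAL) is printed as a label followed by a header row. The function names are hypothetical.

```python
HEADER = ("TOPIC", "PARTITION", "NEW-OFFSET", "NEW-LAG", "LOG-END-OFFSET")


def dry_run_rows(new_offsets, log_end_offsets):
    """Build report rows from {(topic, partition): offset} mappings."""
    rows = []
    for topic, partition in sorted(new_offsets):
        new_offset = new_offsets[(topic, partition)]
        log_end = log_end_offsets[(topic, partition)]
        # Assumption: lag is the distance from the new offset to the log end.
        rows.append((topic, partition, new_offset, log_end - new_offset, log_end))
    return rows


def render(label, rows):
    """Render one group (assumed layout: label, header row, data rows)."""
    lines = [label, " ".join(HEADER)]
    lines += [" ".join(str(col) for col in row) for row in rows]
    return "\n".join(lines)
```

Calling `render("INPUT", dry_run_rows(planned, log_ends))` produces the text for one group; nothing is written back to the consumer group.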
Proposed Changes
1. Implement the reset-offsets scenarios described above in `kafka-streams-application-reset` using `KafkaConsumer`.
2. Use the `kafka-consumer-groups` `reset-offsets` format to define input topics (e.g. `topic2:0,1,2`) in `kafka-streams-application-reset`.
3. Keep execution parameters uniform between both tools: the reset executes by default, and a `dry-run` parameter only shows the results. This involves changing the execution options of the current `ConsumerGroupCommand`.
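The topic format from item 2 (`topic2:0,1,2`) can be parsed as sketched below. This is a Python illustration with a hypothetical function name. It handles a single topic specification and assumes that a bare topic name means all partitions. How several such specifications would be separated on the command line is left open here, since partitions already use commas.

```python
def parse_topic_spec(spec):
    """Parse 'topic' or 'topic:0,1,2' into (topic, partitions or None)."""
    topic, sep, parts = spec.partition(":")
    if not topic:
        raise ValueError("missing topic name in: " + repr(spec))
    if not sep:
        return topic, None  # assumption: no partition list means all partitions
    if not parts:
        raise ValueError("missing partition list in: " + repr(spec))
    # int() raises ValueError for non-numeric partition ids.
    return topic, [int(p) for p in parts.split(",")]
```

With this sketch, `parse_topic_spec("topic2:0,1,2")` gives `("topic2", [0, 1, 2])` and `parse_topic_spec("foo")` gives `("foo", None)`.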
Compatibility, Deprecation, and Migration Plan
What impact (if any) will there be on existing users?
- Proposed Change 3 breaks compatibility for tools that rely on the `--execute` parameter to run the reset operation.
- Other changes should have no impact, since the defaults keep code that uses the class `StreamsResetter.java` working as before.
If we are changing behavior how will we phase out the older behavior?
- The current option `--execute` will be marked as "going-to-be-deprecated" and superseded by the `dry-run` parameter until it is removed.
When will we remove the existing behavior?
- Next major release.
Rejected Alternatives
Current `kafka-streams-application-reset` as mentioned in the Motivation supports this functionality partially, but it will be deprecated.
We will keep both tools, `kafka-streams-application-reset` and `reset-offsets`, reusing `reset-offsets` in the background when `kafka-streams-application-reset` is executed.
This KIP considered removing the ZooKeeper dependency, but this has been handled by KIP-198.
[1] KIP-122: Add Reset Consumer Group Offsets tooling
[2] https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/