...
A: The short answer is that consumer group offsets are managed and committed by the transactional producer, and are stored on the target cluster instead.
However, the consumer still has to live on the source cluster in order to pull the data, but the “source-of-truth” offsets are no longer stored on the source cluster. We propose the following approach to rewind the consumer correctly when a mirroring task restarts or rebalances, given that the “source-of-truth” consumer offsets are stored on the target cluster (the pseudocode is shown below):
- Consumer offsets are stored on the target cluster using a “fake” consumer group, which can be created programmatically as long as we know the name of the consumer group. “Fake” means that no records are actually consumed by the group; it only stores offsets in the __consumer_offsets topic. Nevertheless, the __consumer_offsets topic on the target cluster (managed by the “fake” consumer group) holds the “source of truth” offsets.
- With the “fake” consumer group on the target cluster, MirrorSinkTask doesn't rely on Connect's internal offset tracking or on __consumer_offsets on the source cluster.
- The consumer offsets are only written to the target cluster by the producer involved in the transaction.
- All records are written in a transaction, as if within a single cluster.
- When MirrorSinkTask starts or rebalances, it loads its initial offsets from __consumer_offsets on the target cluster.
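As a rough illustration of the startup step, here is a minimal Python sketch of how a task could compute where the source consumer should resume. It assumes the target cluster's __consumer_offsets can be read as a plain mapping keyed by (group, source topic, partition); the function `load_initial_offsets`, the group name `mm2-fake-group`, and the fallback to offset 0 are hypothetical. In a real task, the committed offsets could be fetched with the Kafka Admin client's `listConsumerGroupOffsets` against the target cluster and applied to the source consumer with `seek`.

```python
from typing import Dict, Iterable, Tuple

# (topic, partition) on the *source* cluster.
TopicPartition = Tuple[str, int]

# Hypothetical stand-in for the target cluster's __consumer_offsets topic,
# keyed by (group_id, source_topic, partition) -> next offset to read.
# This is an illustration, not a Kafka client API.
TargetConsumerOffsets = Dict[Tuple[str, str, int], int]


def load_initial_offsets(
    target_offsets: TargetConsumerOffsets,
    fake_group: str,
    assigned: Iterable[TopicPartition],
    default_offset: int = 0,
) -> Dict[TopicPartition, int]:
    """Return the position the source consumer should seek to per partition.

    Only the "fake" group's offsets on the target cluster are consulted;
    Connect's internal offsets and the source cluster's __consumer_offsets
    are deliberately not read.
    """
    positions: Dict[TopicPartition, int] = {}
    for topic, partition in assigned:
        # Hypothetical fallback: partitions with no committed offset start at
        # `default_offset`; a real task would apply its own reset policy.
        positions[(topic, partition)] = target_offsets.get(
            (fake_group, topic, partition), default_offset
        )
    return positions


# Usage: only partition 0 has a committed offset for the fake group.
target = {("mm2-fake-group", "orders", 0): 42}
print(load_initial_offsets(target, "mm2-fake-group", [("orders", 0), ("orders", 1)]))
# {('orders', 0): 42, ('orders', 1): 0}
```

Note that the keys are source-cluster partitions even though the mapping lives on the target cluster: the fake group records how far into the source data the mirrored output has been committed.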
The outcome of this approach:
- If the transaction succeeds, the __consumer_offsets topic on the target cluster is updated, following the current protocol of the exactly-once framework.
- If the transaction aborts, all data records are dropped and the __consumer_offsets topic on the target cluster is not updated.
- When MirrorSinkTask starts or restarts, it resumes at the last committed offsets, as stored on the target cluster.
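To make these outcomes concrete, the following Python sketch simulates one source partition being mirrored in two transactions, the second of which aborts, followed by a restart. `TargetCluster`, `Transaction`, and `mirror_batch` are hypothetical in-memory stand-ins, not the Kafka producer API; in the real client the corresponding steps are `beginTransaction`, `send`, `sendOffsetsToTransaction`, `commitTransaction`, and `abortTransaction` on `KafkaProducer`.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition) on the source cluster


class TargetCluster:
    """Hypothetical in-memory target cluster: mirrored records plus the
    "fake" group's committed source offsets. Not a Kafka client API."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.offsets: Dict[TopicPartition, int] = {}


class Transaction:
    """Buffers records and offsets; both become visible together or not at all."""

    def __init__(self, cluster: TargetCluster) -> None:
        self.cluster = cluster
        self.pending_records: List[str] = []
        self.pending_offsets: Dict[TopicPartition, int] = {}

    def send(self, record: str) -> None:
        self.pending_records.append(record)

    def send_offsets(self, offsets: Dict[TopicPartition, int]) -> None:
        self.pending_offsets.update(offsets)

    def commit(self) -> None:
        # Success: data and the fake group's offsets are updated in one step.
        self.cluster.records.extend(self.pending_records)
        self.cluster.offsets.update(self.pending_offsets)
        self._clear()

    def abort(self) -> None:
        # Abort: data records are dropped and offsets are left untouched.
        self._clear()

    def _clear(self) -> None:
        self.pending_records = []
        self.pending_offsets = {}


def mirror_batch(cluster: TargetCluster, source: Dict[TopicPartition, List[str]],
                 tp: TopicPartition, start: int, end: int, fail: bool = False) -> int:
    """Copy source[tp][start:end] to the target in one transaction and
    return the offset the task should continue from."""
    txn = Transaction(cluster)
    for record in source[tp][start:end]:
        txn.send(record)
    # As in Kafka, the committed offset is the *next* offset to read.
    txn.send_offsets({tp: end})
    if fail:
        txn.abort()
        # Rewind to the last offset committed on the target cluster.
        return cluster.offsets.get(tp, 0)
    txn.commit()
    return end


source = {("orders", 0): ["a", "b", "c", "d"]}
tp = ("orders", 0)
target = TargetCluster()
pos = mirror_batch(target, source, tp, 0, 2)               # commits a, b
pos = mirror_batch(target, source, tp, pos, 4, fail=True)  # aborts c, d
pos = target.offsets.get(tp, 0)  # "restart": resume from the target's offsets
pos = mirror_batch(target, source, tp, pos, 4)
print(target.records)  # ['a', 'b', 'c', 'd']
print(target.offsets)  # {('orders', 0): 4}
```

Because the offsets only advance when the data commits, the aborted batch is simply re-read from the source after the restart, and each record lands on the target once.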
...