Status

Current state: Accepted

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

One of MM2's main features is syncing consumer group offsets between clusters. This feature relies on two separate connectors:

  1. MirrorSourceConnector: after mirroring a record, this connector stores the translation of the record's offset on the target cluster in the offset-syncs internal topic. Since 3.5, this translation is done in the commit() method. Per the commit() API contract, commit() stores offsets in the source system up to the offsets that have been returned by poll(), and its implementation should block until the commit is complete. See KAFKA-14610 for more details.
  2. MirrorCheckpointConnector: this connector reads the translated offsets from the offset-syncs topic and uses them to update the checkpoints and __consumer_offsets topics. Writing checkpoints is controlled by emit.checkpoints.enabled, and writing to __consumer_offsets is controlled by sync.group.offsets.enabled.

Some customers run MM2 only to replicate data between clusters and do not need to migrate consumer group offsets, so they run only MirrorSourceConnector. For these use cases, part 1 of this feature (blocking until commit) is still an issue, because it adds cost to the progress of their replication. The only workaround to reduce this latency cost is to increase offset.lag.max to INT_MAX, so that offsets are effectively never queued for an offset sync (see `PartitionState::update` for details on how MM2 decides whether to emit an offset sync). However, this does not eliminate the cost of creating the internal topic.
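To illustrate why raising offset.lag.max reduces offset-sync traffic without removing it, here is a minimal, hypothetical sketch. It is not MM2's PartitionState; it keeps only two conditions (the first sync for a partition and a lag threshold standing in for offset.lag.max), whereas the actual `PartitionState::update` may check additional conditions. The class and method names are illustrative only.

```java
/**
 * Hypothetical, simplified model of offset-sync throttling.
 * NOT the real MirrorMaker 2 PartitionState: only the "first sync" and
 * "lag above threshold" conditions are modelled here.
 */
public class OffsetSyncThrottleSketch {

    static final class SimplifiedPartitionState {
        private final long maxOffsetLag;              // stands in for offset.lag.max
        private long lastSyncDownstreamOffset = -1L;  // -1 means no sync emitted yet

        SimplifiedPartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        /** Returns true if a sync would be queued for this downstream offset. */
        boolean update(long downstreamOffset) {
            boolean firstSync = lastSyncDownstreamOffset == -1L;
            boolean tooStale = downstreamOffset - lastSyncDownstreamOffset > maxOffsetLag;
            if (firstSync || tooStale) {
                lastSyncDownstreamOffset = downstreamOffset;
                return true;
            }
            return false;
        }
    }

    /** Counts how many syncs would be queued while mirroring `records` consecutive records. */
    static int countSyncs(long maxOffsetLag, int records) {
        SimplifiedPartitionState state = new SimplifiedPartitionState(maxOffsetLag);
        int syncs = 0;
        for (long offset = 0; offset < records; offset++) {
            if (state.update(offset)) {
                syncs++;
            }
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println("offset.lag.max=100:     " + countSyncs(100, 1_000));
        System.out.println("offset.lag.max=INT_MAX: " + countSyncs(Integer.MAX_VALUE, 1_000));
    }
}
```

For 1,000 consecutive records, this sketch queues 10 syncs with a threshold of 100 and a single sync with Integer.MAX_VALUE. Even in the second case, the sync machinery and the internal topic are still in place, which is the cost this KIP targets.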

This KIP proposes adding a new config to MirrorSourceConnector that controls whether offset translation between clusters is enabled for replicated topics.

Public Interfaces

Property | Default | Description
emit.offset-syncs.enabled | true | Whether to store the new offsets of replicated records in the offset-syncs topic. MirrorCheckpointConnector will fail to start if emit.checkpoints.enabled and/or sync.group.offsets.enabled are enabled while emit.offset-syncs.enabled is disabled.
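As an illustration, a replication-only deployment could submit a MirrorSourceConnector config along these lines to a Kafka Connect cluster. This is a sketch: the cluster aliases, bootstrap hosts and topic pattern are placeholders, and emit.offset-syncs.enabled is the property proposed by this KIP, so it only has an effect on versions that implement it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical replication-only MirrorSourceConnector config.
 * Aliases, hosts and the topic pattern are placeholders.
 */
public class ReplicationOnlyConfigSketch {

    static Map<String, String> replicationOnlyConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "org.apache.kafka.connect.mirror.MirrorSourceConnector");
        config.put("source.cluster.alias", "primary");                         // placeholder alias
        config.put("target.cluster.alias", "backup");                          // placeholder alias
        config.put("source.cluster.bootstrap.servers", "primary-kafka:9092");  // placeholder host
        config.put("target.cluster.bootstrap.servers", "backup-kafka:9092");   // placeholder host
        config.put("topics", "orders.*");                                      // placeholder topic regex
        // Property proposed by this KIP: do not publish offset syncs or create the offset-syncs topic.
        config.put("emit.offset-syncs.enabled", "false");
        return config;
    }

    public static void main(String[] args) {
        // Print the config as key=value lines, e.g. to paste into a properties file.
        replicationOnlyConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

No MirrorCheckpointConnector is configured alongside it, which is the use case this property is meant for.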

Proposed Changes

  • Similar to emit.checkpoints.enabled and emit.heartbeats.enabled, this KIP will add emit.offset-syncs.enabled. Its default value will be true to preserve the current behavior.

  • The offset-syncs topic will not be created if emit.offset-syncs.enabled is set to false.

  • The logic that publishes offset syncs in commit() will not be executed if emit.offset-syncs.enabled is set to false.

  • MirrorCheckpointConnector validation will fail at startup if emit.offset-syncs.enabled is disabled while emit.checkpoints.enabled and/or sync.group.offsets.enabled are enabled.
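The source-side gating and the checkpoint-side validation described in the list above can be sketched as follows. This is a hypothetical illustration, not the MM2 implementation: the class and method names are invented, all three flags are assumed to be readable from the same config map, the defaults (emit.checkpoints.enabled=true, sync.group.offsets.enabled=false) are assumed to match MM2's, and a real connector would more likely report the problem through Connect's config validation than by throwing IllegalArgumentException.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the proposed checks. Names are illustrative only.
 */
public class OffsetSyncsGateSketch {

    /** Reads a boolean flag, falling back to the given default when the key is absent. */
    static boolean flag(Map<String, String> config, String key, boolean defaultValue) {
        String value = config.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value.trim());
    }

    /** Source side: create the offset-syncs topic and publish syncs in commit() only when enabled. */
    static boolean shouldEmitOffsetSyncs(Map<String, String> config) {
        return flag(config, "emit.offset-syncs.enabled", true);
    }

    /** Checkpoint side: reject configs that depend on offset syncs that will never be written. */
    static void validateCheckpointConfig(Map<String, String> config) {
        boolean offsetSyncs = flag(config, "emit.offset-syncs.enabled", true);
        boolean checkpoints = flag(config, "emit.checkpoints.enabled", true);        // assumed default
        boolean syncGroupOffsets = flag(config, "sync.group.offsets.enabled", false); // assumed default
        if (!offsetSyncs && (checkpoints || syncGroupOffsets)) {
            throw new IllegalArgumentException(
                "emit.offset-syncs.enabled=false cannot be combined with "
                    + "emit.checkpoints.enabled=true or sync.group.offsets.enabled=true");
        }
    }

    public static void main(String[] args) {
        // Replication only: offset syncs and checkpoints both disabled, so validation passes.
        Map<String, String> replicationOnly = Map.of(
            "emit.offset-syncs.enabled", "false",
            "emit.checkpoints.enabled", "false");
        validateCheckpointConfig(replicationOnly);
        System.out.println("emit offset syncs: " + shouldEmitOffsetSyncs(replicationOnly));

        // Checkpoints left at their default (enabled) while offset syncs are disabled: rejected.
        try {
            validateCheckpointConfig(Map.of("emit.offset-syncs.enabled", "false"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the check on the checkpoint side means a misconfigured MirrorCheckpointConnector fails fast at startup instead of running without any offset syncs to translate.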

Compatibility, Deprecation, and Migration Plan

emit.offset-syncs.enabled defaults to true, so existing deployments, and any deployment that relies on offset syncs, do not need to change anything. Only deployments that wish to fully disable this feature need to set emit.offset-syncs.enabled to false.

Rejected Alternatives

  • Reuse emit.checkpoints.enabled and sync.group.offsets.enabled, and assume that if both are disabled then this feature should be fully disabled. I didn't want to rely on MirrorCheckpointConnector configs to configure MirrorSourceConnector. This approach could also impact customers who enabled MirrorSourceConnector with the intention of running MirrorCheckpointConnector later.
