Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

Checkpointing is useful in the context of a failover: thanks to checkpoints, consumers can minimize duplicate message processing after switching to the downstream cluster. Within a single replication flow, this is all that checkpointing can achieve.

However, in a bidirectional replication, the same problem occurs on failback: any progress the consumer made on the downstream topics is lost when it switches back to the upstream cluster. Depending on how far the consumer lagged behind the replication before the failover, this can mean a large amount of reprocessing.

For example, assume two clusters (A and B) and a consumer group C consuming from topic T. C is initially connected to cluster A:

  1. T has 1000 messages in its single partition.
  2. C manages to consume the first 500 messages from T in cluster A, commits its offsets.
  3. Replication copies all messages into the A.T topic in cluster B, and checkpointing creates a checkpoint for group C close to the 500th message.
  4. Now a failover happens (e.g. cluster A is under maintenance), C connects to cluster B, and starts consuming from topic A.T based on the latest checkpoint.
  5. C consumes the remaining 500 messages from A.T and commits its offsets in cluster B.
  6. Cluster A becomes available again (e.g. maintenance is done).
  7. C fails back to cluster A.
  8. C starts consuming from topic T. Its committed offset in cluster A is still 500, so C consumes the second 500 messages again.

This KIP aims to resolve this issue in bidirectional replication by allowing MirrorCheckpointConnector to create "reverse" checkpoints, which translate offsets from downstream topics back to upstream topics. This minimizes reprocessing with the same tuning options that regular checkpointing offers, instead of leaving an unbounded reprocessing window determined by how far the consumer group lagged behind the replication.

Public Interfaces

New configurations for MirrorCheckpointConnector:

  • reverse.checkpointing.enabled (false) - Enables the new reverse checkpointing feature. Reverse checkpointing requires an active flow in the opposite direction.
  • reverse.checkpointing.topic.filter.class (null) - Optional topic filter specifying which topics should be reverse-checkpointed. When not configured, all replica topics originating from the target cluster are reverse-checkpointed.
  • reverse.checkpointing.topic.filter. - Configuration prefix to pass overrides to the reverse checkpointing topic filter.
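The following is a minimal Python sketch of how a connector might read these properties from a flat property map. The helper name resolve_reverse_checkpointing is hypothetical. So are the filter class com.example.MyTopicFilter and its "topics" property in the example. The sketch also assumes that properties under the prefix reach the filter with the prefix stripped. One detail is worth noting: reverse.checkpointing.topic.filter.class itself starts with the override prefix, so it must be excluded from the forwarded overrides.

```python
from typing import Dict, Optional

# Property names proposed by this KIP.
ENABLED = "reverse.checkpointing.enabled"
FILTER_CLASS = "reverse.checkpointing.topic.filter.class"
FILTER_PREFIX = "reverse.checkpointing.topic.filter."


def resolve_reverse_checkpointing(props: Dict[str, str]) -> dict:
    """Hypothetical helper: extract the reverse checkpointing settings from flat
    MirrorCheckpointConnector properties.

    Assumption: properties under FILTER_PREFIX are handed to the filter with the
    prefix stripped, the way prefixed Kafka configs are commonly forwarded.
    """
    # Disabled unless explicitly switched on (default: false).
    enabled = props.get(ENABLED, "false").strip().lower() == "true"
    # None means "no filter": fall back to ReplicationPolicy.topicSource.
    filter_class: Optional[str] = props.get(FILTER_CLASS)
    # FILTER_CLASS itself starts with FILTER_PREFIX, so exclude it explicitly.
    filter_overrides = {
        key[len(FILTER_PREFIX):]: value
        for key, value in props.items()
        if key.startswith(FILTER_PREFIX) and key != FILTER_CLASS
    }
    return {"enabled": enabled, "filter_class": filter_class,
            "filter_overrides": filter_overrides}


# Example with a hypothetical filter class and a hypothetical filter property.
example = resolve_reverse_checkpointing({
    "reverse.checkpointing.enabled": "true",
    "reverse.checkpointing.topic.filter.class": "com.example.MyTopicFilter",
    "reverse.checkpointing.topic.filter.topics": "T1,T2",
})
print(example)
```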

Proposed Changes

Currently, MirrorCheckpointConnector uses the offset-syncs of the same flow to generate checkpoints for downstream topics. This mechanism only allows translating offsets from upstream to downstream, and only applies to topics being replicated in the flow.

In a bidirectional replication, a reverse checkpoint can be created from the offset-syncs of the opposite flow. Each upstream:downstream offset pair recorded by the opposite flow can be read in reverse, which maps a downstream offset back to its upstream offset.
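The reverse mapping for a single partition can be sketched in Python as follows. This illustrates the idea under assumptions and is not the proposed OffsetSyncStore change; the function name is hypothetical. The rounding rule is a conservative choice in the spirit of the existing forward translation: resume one past the synced upstream offset, unless the committed offset matches a sync exactly. It may cause some reprocessing but never skips a message.

```python
import bisect
from typing import List, NamedTuple, Optional


class OffsetSync(NamedTuple):
    """One offset sync emitted by the opposite flow (A -> B) for one partition:
    offset `upstream` of T in cluster A corresponds to offset `downstream` of A.T in B."""
    upstream: int
    downstream: int


def translate_reverse(syncs: List[OffsetSync], downstream_offset: int) -> Optional[int]:
    """Translate a committed offset of A.T (in B) back to an offset of T (in A).

    `syncs` must be sorted. Offset syncs are monotonic in both offsets, so sorting
    by downstream offset also sorts by upstream offset.
    """
    downstream_offsets = [s.downstream for s in syncs]
    # Latest sync at or before the committed offset.
    idx = bisect.bisect_right(downstream_offsets, downstream_offset) - 1
    if idx < 0:
        return None  # no sync old enough: skip rather than guess
    sync = syncs[idx]
    # Committed offset == synced record: that record has not been consumed yet.
    # Otherwise at least that record was consumed, so resume right after its
    # upstream counterpart (possible reprocessing, but no skipped messages).
    return sync.upstream if downstream_offset == sync.downstream else sync.upstream + 1


syncs = [OffsetSync(100, 0), OffsetSync(600, 500), OffsetSync(1090, 990)]
print(translate_reverse(syncs, 500))  # 600
print(translate_reverse(syncs, 700))  # 601
```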



Changes to implement:

  1. Update OffsetSyncStore to track reverse offset mappings (via a subclass or a flag).
  2. Update MirrorCheckpointTask to instantiate an OffsetSyncStore for reverse checkpointing, consuming from the offset-syncs topic of the opposite flow (e.g. if the offset-syncs topic of the current flow is located in the source cluster, the offset-syncs topic of the opposite flow is located in the target cluster).
  3. Update MirrorCheckpointTask to optionally instantiate the configured TopicFilter class.
  4. Update MirrorCheckpointTask to generate reverse checkpoints. For example, in the B -> A flow, detect reverse-checkpointable topics, transform each topic name back to its upstream name (with DefaultReplicationPolicy, strip the leading "A."), and generate a reverse checkpoint.
  5. Detecting reverse-checkpointable topics:
    1. If the reverse checkpointing topic filter was configured, use the topic filter to decide.
    2. If the reverse checkpointing topic filter was not configured, use the ReplicationPolicy.topicSource method and include the topic if it was replicated from the current target cluster (e.g. in the B -> A flow, topicSource("A.T") = "A", which matches the target alias).
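Steps 4 and 5 can be sketched in Python for the B -> A flow. The two default_* functions are simplified stand-ins for DefaultReplicationPolicy.topicSource and DefaultReplicationPolicy.upstreamTopic with the default "." separator. reverse_checkpoint_topic is a hypothetical helper. A real implementation would delegate to the configured ReplicationPolicy and TopicFilter instead.

```python
from typing import Callable, Optional

SEPARATOR = "."  # default separator of DefaultReplicationPolicy


def default_topic_source(topic: str) -> Optional[str]:
    """Simplified stand-in for DefaultReplicationPolicy.topicSource: "A.T" -> "A"."""
    parts = topic.split(SEPARATOR, 1)
    return parts[0] if len(parts) == 2 else None


def default_upstream_topic(topic: str) -> Optional[str]:
    """Simplified stand-in for DefaultReplicationPolicy.upstreamTopic: "A.T" -> "T"."""
    source = default_topic_source(topic)
    return None if source is None else topic[len(source) + len(SEPARATOR):]


def reverse_checkpoint_topic(
    topic: str,
    target_alias: str,
    topic_filter: Optional[Callable[[str], bool]] = None,
    topic_source: Callable[[str], Optional[str]] = default_topic_source,
    upstream_topic: Callable[[str], Optional[str]] = default_upstream_topic,
) -> Optional[str]:
    """For a topic on the source cluster of the current flow (B in B -> A), return the
    upstream topic name in the target cluster to reverse-checkpoint, or None to skip."""
    if topic_filter is not None:
        # Step 5.1: the configured filter decides.
        include = topic_filter(topic)
    else:
        # Step 5.2: only replicas that originate from the current target cluster.
        include = topic_source(topic) == target_alias
    # Step 4: map the replica name back to the upstream name.
    return upstream_topic(topic) if include else None


print(reverse_checkpoint_topic("A.T", "A"))  # T
print(reverse_checkpoint_topic("T", "A"))    # None (not a replica of A)
# Identity-style naming: the filter names the topics and the name is kept as-is.
print(reverse_checkpoint_topic("T", "A", topic_filter=lambda t: t == "T",
                               upstream_topic=lambda t: t))  # T
```

With identity-style naming, the name "T" alone does not reveal which cluster the topic originated from. That is why the filter path is required in that case.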

With the above changes, all current features and guarantees of MirrorCheckpointConnector become available for reverse checkpoints: consumer group offset syncing, monotonic checkpoints, and offset translation based on historical offset syncs.

Based on the points above, this feature has two hard requirements:

  1. An active flow exists in the opposite direction, i.e. this feature only works in a bidirectional replication.
  2. The ReplicationPolicy must be able to correctly transform a replica topic name into the upstream topic name (i.e. it has a correct upstreamTopic implementation). This is true for both DefaultReplicationPolicy and IdentityReplicationPolicy.

Depending on the ReplicationPolicy implementation, the user has to configure the following:

  1. If the ReplicationPolicy can correctly identify the source cluster of a topic (e.g. DefaultReplicationPolicy), the reverse checkpointing topic filter can be omitted, and all replica topics originating from the target will be reverse-checkpointed.
  2. If the ReplicationPolicy cannot correctly identify the source cluster of a topic (e.g. IdentityReplicationPolicy), the reverse checkpointing topic filter must be configured, and users need to explicitly specify which topics should be reverse-checkpointed.

Compatibility, Deprecation, and Migration Plan

  • Since the feature is enabled by a flag that defaults to false, this change is fully backward compatible and requires no migration.

Test Plan

Integration tests on a bidirectional replication, performing the failover and failback described in the Motivation section. The expectation is that reprocessing after failback is minimal.
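The expectation can be quantified with a toy Python model of the Motivation scenario. The model is hypothetical. It assumes that A.T in cluster B has the same offsets as T in cluster A, and that the A -> B flow records an offset sync every 100 records; in a real deployment, the sync granularity depends on the MirrorMaker configuration. Under these assumptions, failback without reverse checkpointing reprocesses the 500 messages consumed in cluster B. With reverse checkpointing, reprocessing is bounded by the sync interval.

```python
import bisect
from typing import List, Optional, Tuple


def translate(pairs: List[Tuple[int, int]], offset: int) -> Optional[int]:
    """Map `offset` through sorted (from_offset, to_offset) sync pairs, conservatively."""
    keys = [src for src, _ in pairs]
    idx = bisect.bisect_right(keys, offset) - 1
    if idx < 0:
        return None
    src, dst = pairs[idx]
    return dst if offset == src else dst + 1


def reprocessed_after_failback(total: int = 1000, consumed_on_a: int = 500,
                               reverse_checkpointing: bool = True,
                               sync_every: int = 100) -> int:
    """Count the messages C processes twice in the Motivation scenario.

    Hypothetical model: A.T in B has the same offsets as T in A, and the A -> B
    flow emits an offset sync every `sync_every` records.
    """
    # (offset of T in A, offset of A.T in B), as recorded by the A -> B flow.
    syncs = [(o, o) for o in range(0, total, sync_every)]

    committed_a = consumed_on_a  # step 2: C commits its offset in A
    committed_b = total          # steps 4-5: after failover, C finishes A.T in B

    if reverse_checkpointing:
        # Reverse checkpoint: read the opposite flow's syncs downstream -> upstream.
        reverse_pairs = [(down, up) for up, down in syncs]
        translated = translate(reverse_pairs, committed_b)
        # Assumption: the synced group offset in A only ever moves forward.
        if translated is not None and translated > committed_a:
            committed_a = translated

    # Steps 7-8: C fails back and resumes T from committed_a.
    return total - committed_a


print(reprocessed_after_failback(reverse_checkpointing=False))  # 500
print(reprocessed_after_failback())                             # 99
```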

Rejected Alternatives

  1. Instead of adding the "reverse.checkpointing.topic.filter.class" configuration, rely only on the ReplicationPolicy to identify which topics should be reverse-checkpointed. While this is a simpler solution (fewer configs and changes), it does not allow the feature to work with some ReplicationPolicy implementations, such as IdentityReplicationPolicy.