Status
Current state: "Under Discussion"
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka MirrorMaker 2 provides cross-cluster replication for Kafka topics and consumer group offsets. Currently, offset synchronisation is handled by the MirrorCheckpointConnector, which periodically translates consumer group offsets from the source cluster and emits them to the target cluster. Optionally, it can also commit the translated offsets to consumer groups on the target cluster automatically.
The offset translation mechanism relies on two parallel processes: MirrorSourceConnector tasks periodically produce offset-sync records, which MirrorCheckpointConnector tasks then consume to build and maintain an in-memory cache of partition offset mappings.
However, this approach has limitations: offset translation is inherently lossy, especially for lagging consumers. Each partition retains only up to 64 offset-syncs, with the most recent sync always preserved and older syncs spaced exponentially apart. Consequently, recently mirrored records translate accurately, but translation accuracy degrades exponentially as record age increases.
Each MirrorCheckpointConnector task maintains an offset mapping cache for each of its assigned partitions. For example, after mirroring 1000000 records to a partition, the cache entries and the offset translation might look like this:
Source Offset | Actual Target Offset | Closest Cached Mapping | Translated Target Offset (cached target + 1) | Re-read Count |
1000100 | Not mirrored yet | (none: newer than the newest sync) | -1 (not translatable) | 100 (resumes from the newest translated offset) |
1000000 | 900000 | [0] 1000000→900000 | 900000 | 0 (exact match) |
999999 | 899999 | [1] 999900→899900 (gap: 100) | 899901 | 99 |
999890 | 899890 | [2] 999700→899700 (gap: 200) | 899701 | 189 |
999650 | 899650 | [3] 999300→899300 (gap: 400) | 899301 | 349 |
999200 | 899200 | [4] 998500→898500 (gap: 800) | 898501 | 699 |
998400 | 898400 | [5] 996900→896900 (gap: 1600) | 896901 | 1499 |
996800 | 896800 | [6] 993700→893700 (gap: 3200) | 893701 | 3099 |
993600 | 893600 | [7] 987300→887300 (gap: 6400) | 887301 | 6299 |
987200 | 887200 | [8] 974500→874500 (gap: 12800) | 874501 | 12699 |
974400 | 874400 | [9] 948900→848900 (gap: 25600) | 848901 | 25499 |
948800 | 848800 | [10] 897700→797700 (gap: 51200) | 797701 | 51099 |
897600 | 797600 | [11] 795300→695300 (gap: 102400) | 695301 | 102299 |
795200 | 695200 | [12] 590500→490500 (gap: 204800) | 490501 | 204699 |
590400 | 490400 | [13] 180900→80900 (gap: 409600) | 80901 | 409499 |
… (the cache could extend to entry [63], but 14 entries are enough to cover 1000000 offsets)
Let’s say there is a lagging consumer whose current offset is 998400. The record is mirrored to the target cluster at offset 898400. The closest cached mapping for this source offset is 996900→896900, so the translated offset would be 896901 (896900 + 1). During failover, this consumer would reprocess 1499 records (898400 - 896901). The reprocessing window only grows as the offset ages, as the example demonstrates.
Similarly, if a consumer commits a more recent offset such as 1000100, the MirrorCheckpointConnector is unable to translate that offset and returns -1. During failover, the consumer would continue from the most recently translated offset, which corresponds to source offset 1000000, and would reprocess 100 records.
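For illustration, here is a minimal, hypothetical sketch of the cached-mapping lookup shown in the table above (the actual logic lives in MirrorMaker's OffsetSyncStore and differs in detail):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of cached-mapping offset translation for one partition.
public class CachedTranslationSketch {

    // syncs maps source offsets to target offsets for a single partition.
    static long translate(NavigableMap<Long, Long> syncs, long sourceOffset) {
        if (syncs.isEmpty() || sourceOffset > syncs.lastKey()) {
            return -1L; // beyond the newest sync: not mirrored yet
        }
        Map.Entry<Long, Long> closest = syncs.floorEntry(sourceOffset);
        if (closest == null) {
            return -1L; // below the oldest retained sync: no usable mapping
        }
        // An exact match translates precisely; otherwise resume just past the
        // sync point, re-reading everything between the sync and the real position.
        return closest.getValue() + (sourceOffset > closest.getKey() ? 1 : 0);
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> syncs = new TreeMap<>();
        syncs.put(1_000_000L, 900_000L); // entry [0] in the table above
        syncs.put(999_900L, 899_900L);   // entry [1]
        syncs.put(996_900L, 896_900L);   // entry [5]
        System.out.println(translate(syncs, 1_000_100L)); // -1: not mirrored yet
        System.out.println(translate(syncs, 1_000_000L)); // 900000: exact match
        System.out.println(translate(syncs, 998_400L));   // 896901: 1499 records re-read
    }
}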
Moreover, the offset-syncs topic records mappings at a fixed interval defined by offset.lag.max, which defaults to 100. This has several limitations:
- Offset mappings are only recorded every N offsets
- High-throughput topics produce a correspondingly high volume of offset-sync records
- Reducing offset.lag.max increases the topic's throughput, the network overhead, and the load on brokers and Connect tasks
- The configuration applies to all partitions, so there is no way to optimise for individual topic partitions' throughput or activity patterns
This creates a fundamental tradeoff between precision and scalability.
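For reference, the interval can be tuned in the MirrorMaker configuration, for example per replication flow (the value shown is the default):

# mm2.properties: offset-sync records are emitted at most every
# offset.lag.max offsets per partition. Lowering this improves translation
# precision but increases offset-sync traffic and broker/Connect load.
A->B.offset.lag.max = 100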
Furthermore, when the MirrorCheckpointConnector starts, its tasks consume the entire offset-syncs topic, so careful tuning of retention and compaction is needed. This adds to the operational complexity of managing internal topics, which require correct ACLs and credentials, low-latency access across clusters, and ongoing monitoring and troubleshooting.
Proposed Changes
This proposal introduces a new MirrorMaker Connector, MirrorGroupOffsetConnector, that provides more consistent offset translation using record timestamps, without relying on internal topics or in-memory offset caches. Users would be able to run the new Connector just like the MirrorCheckpointConnector, but with simpler configuration.
The new Connector provides more consistent offset translation across all consumer positions in the log. While failover may reprocess some records due to multiple records sharing the same timestamp, the amount of reprocessed data remains consistent and predictable, which is a significant improvement over MirrorCheckpointConnector, where reprocessing grows exponentially as offset mappings age and become sparse.
The key tradeoff here is that MirrorCheckpointConnector achieves perfect translation for log-end offsets and exact offset mappings (though exact matches are rare), whereas the timestamp-based approach may be off by some offsets in all cases. However, this tradeoff becomes favourable when consumers lag behind the log end—even by just tens or hundreds of offsets. At this point, they fall between offset mappings, which have a minimum gap of 100 (the default of offset.lag.max). In these scenarios, the timestamp-based approach delivers equal or better accuracy while maintaining consistency regardless of how far behind the consumer has fallen, eliminating the exponential degradation problem.
If your consumers typically operate with any lag (a common pattern in production), this Connector would provide a more reliable failover experience with bounded, predictable reprocessing instead of the variable and potentially large reprocessing windows of offset sync based translation.
The MirrorGroupOffsetConnector also removes the need to manage, monitor, and troubleshoot the offset-syncs internal topic. Translation works by retrieving timestamps directly from source partitions and querying the target cluster for corresponding offsets, which is a more straightforward process with fewer moving parts and less operational overhead.
When MirrorGroupOffsetConnector starts, its tasks execute the following steps (a code sketch follows the list):
- Retrieve source offsets: List offsets for consumer groups (applying group and topic filters) from the source cluster using Admin.listConsumerGroupOffsets, similar to MirrorCheckpointConnector.
- Extract timestamps: For each offset, get the exact record from the source cluster to retrieve its timestamp. A dedicated source consumer is optimised to fetch minimal data per partition by configuring max.partition.fetch.bytes=1 while being assigned to multiple partitions to fetch in parallel. If no matching record is found (timeout, compaction, or deletion), translation is deferred to the next interval.
- Map to target offsets: Query the target cluster for offsets matching these timestamps using Admin.listOffsets. The API returns the earliest offset when multiple offsets share a timestamp (due to high producer throughput and broker batch processing), preventing data loss by ensuring consumers reprocess records rather than skip them.
- Sync offsets: Commit the translated offsets to target consumer groups via Admin.alterConsumerGroupOffsets.
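The sketch below illustrates one sync pass for a single group using the Admin and Consumer APIs named above. It is hypothetical: names other than the Kafka client APIs are made up, and error handling, group/topic filtering, remote topic renaming, and the backward-movement guard described later are omitted.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of one sync pass for a single consumer group.
public class GroupOffsetSyncSketch {

    static void syncGroup(Admin sourceAdmin, Admin targetAdmin,
                          Consumer<byte[], byte[]> sourceConsumer, String group)
            throws Exception {
        // Step 1: list the group's committed offsets on the source cluster.
        Map<TopicPartition, OffsetAndMetadata> sourceOffsets = sourceAdmin
                .listConsumerGroupOffsets(group)
                .partitionsToOffsetAndMetadata().get();

        Map<TopicPartition, OffsetAndMetadata> translated = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : sourceOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long sourceOffset = entry.getValue().offset();

            // Step 2: read the record at the committed offset to get its timestamp.
            // (This sketch assigns one partition at a time; the connector would
            // assign many partitions and use max.partition.fetch.bytes=1.)
            sourceConsumer.assign(Collections.singleton(tp));
            sourceConsumer.seek(tp, sourceOffset);
            List<ConsumerRecord<byte[], byte[]>> records =
                    sourceConsumer.poll(Duration.ofSeconds(5)).records(tp);
            if (records.isEmpty() || records.get(0).offset() != sourceOffset)
                continue; // timed out, compacted, or deleted: defer to next interval
            long timestamp = records.get(0).timestamp();

            // Step 3: ask the target cluster for the earliest offset with a
            // timestamp at or after this one, so that records sharing a
            // timestamp are reprocessed rather than skipped.
            ListOffsetsResult.ListOffsetsResultInfo result = targetAdmin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.forTimestamp(timestamp)))
                    .partitionResult(tp).get();
            if (result.offset() >= 0)
                translated.put(tp, new OffsetAndMetadata(result.offset()));
        }

        // Step 4: commit the translated offsets to the group on the target cluster.
        if (!translated.isEmpty())
            targetAdmin.alterConsumerGroupOffsets(group, translated).all().get();
    }
}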
Example of the offset translation using timestamp:
Source Offset | Timestamp | Actual Target Offset | Translated Target Offset |
1001 | A | Not mirrored yet | |
1000 | B | 600 | 600 |
990 | E | 590 | 580 |
980 | E | 580 | 580 |
960 | G | 560 | 560 |
900 | I | 500 | 500 |
800 | K | 400 | 400 |
700 | L | 300 | 300 |
600 | O | 200 | 200 |
400 | P | 0 | 0 |
In this example, source offsets between 980 and 990 all have the same timestamp, E. Offset 980 is mirrored to the target offset 580 and 990 is mirrored to the target offset 590. When translating any source offset in this range, the earliest offset with timestamp E is returned, in this case, target offset 580. This means if a consumer fails over while at source offset 990, it would resume from the target offset 580 instead of 590, reprocessing 10 records. The amount of reprocessing depends on how many records share the same timestamp.
If the source consumer offset is more recent than what has been mirrored to the target cluster, translation is unavailable, similar to MirrorCheckpointConnector’s behaviour. For example, if the consumer is at source offset 1001 but mirroring has only reached offset 1000, the connector would log a debug message and skip translation. The consumer group’s target offset remains at its previous value (e.g. 600) until mirroring catches up.
Translated offsets are never committed if they would move the consumer backward. For example, if source offset 1001 has timestamp G, which translates to target offset 560, but the consumer group has already committed offset 600 on the target cluster, the update is skipped. This prevents data loss from unexpected timestamp-based regressions and matches MirrorCheckpointConnector’s behaviour. This protection is also useful if users happen to be running both MirrorCheckpointConnector and MirrorGroupOffsetConnector simultaneously, for instance during a gradual migration between connectors. The backward-movement check prevents either connector from regressing offsets committed by the other.
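Continuing the earlier sketch, the backward-movement guard could slot in just before the final commit step (again illustrative, not the actual implementation):

        // Drop any translated offset that would move the target group's
        // committed position backward.
        Map<TopicPartition, OffsetAndMetadata> committedOnTarget = targetAdmin
                .listConsumerGroupOffsets(group)
                .partitionsToOffsetAndMetadata().get();
        translated.entrySet().removeIf(e -> {
            OffsetAndMetadata current = committedOnTarget.get(e.getKey());
            // Skip the update when the target group already sits at or past it.
            return current != null && current.offset() >= e.getValue().offset();
        });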
Considerations when using MirrorGroupOffsetConnector
There are a few things users need to take into consideration when using the new connector. The MirrorGroupOffsetConnector requires the source cluster to be available for offset translation. In a disaster scenario where the source cluster becomes unavailable, consumers would continue from their last committed offset in the target cluster without translation. This is similar to MirrorCheckpointConnector, which relies on its last cached offset mappings from the offset-syncs topic. RemoteClusterUtils can translate uncommitted offsets manually, but it also depends on reading the last available checkpoint records, making it subject to similar limitations. The configuration sync.group.offsets.interval.seconds determines how far out of sync the consumer offsets are at the target cluster, and therefore how much data would be reprocessed.
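For reference, manual translation with the existing RemoteClusterUtils API looks roughly like this (the bootstrap address and group name are illustrative); it reads checkpoints previously emitted by MirrorCheckpointConnector, hence the shared limitations:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

// Illustrative manual offset translation, run against the target cluster.
public class ManualTranslationSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:9192"); // target cluster B
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(properties, "A", "my-group",
                        Duration.ofSeconds(30));
        translated.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
    }
}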
The MirrorGroupOffsetConnector relies on consistent and monotonically increasing timestamps to accurately translate consumer group offsets between clusters. Timestamp inconsistencies can cause offset translation to fail or produce incorrect results. When the source cluster uses log.message.timestamp.type=CreateTime (the default), producers control record timestamps and may set them to arbitrary or inaccurate values. This can be mitigated by configuring log.message.timestamp.after.max.ms and log.message.timestamp.before.max.ms to ensure timestamps fall within acceptable ranges.
The offset translation mechanism assumes that source and target records have identical timestamps. The target cluster must preserve the original timestamps from the source cluster, so log.message.timestamp.type must be set to CreateTime (the default value) on the target cluster. If message.timestamp.type=LogAppendTime is configured at the topic level on the source cluster, this topic-level configuration will be mirrored to the target cluster along with the topic itself. Since topic-level configurations take precedence over broker-level settings, this can inadvertently override the target cluster's broker-level log.message.timestamp.type setting, causing the target brokers to overwrite the original timestamps when mirroring records and breaking offset translation. These requirements will be documented so that users are aware of them when using the new connector.
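As an illustration, the relevant broker-level settings could look like this (the bound values are examples, not recommendations):

# Target cluster: preserve the timestamps set on the source records.
log.message.timestamp.type=CreateTime
# Source cluster: reject producer timestamps that stray too far from broker time.
log.message.timestamp.after.max.ms=60000
log.message.timestamp.before.max.ms=60000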
Public Interfaces
Example of configuring MirrorGroupOffsetConnector:
{
  "name": "A-to-B-MirrorGroupOffsetConnector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorGroupOffsetConnector",
    "source.cluster.alias": "A",
    "target.cluster.alias": "B",
    "groups": ".*",
    "source.cluster.bootstrap.servers": "localhost:9092",
    "target.cluster.bootstrap.servers": "localhost:9192"
  }
}
Here are other configurations that can also be set for MirrorGroupOffsetConnector in addition to the common MirrorMaker configurations:
- consumer.poll.timeout.ms
- groups.exclude
- refresh.groups.enabled
- refresh.groups.interval.seconds
- sync.group.offsets.interval.seconds
- group.filter.class
- task.assigned.groups
When running MirrorGroupOffsetConnector instead of MirrorCheckpointConnector, the offset-syncs topic is no longer needed. Users can therefore disable it by setting emit.offset-syncs.enabled to false to avoid unnecessary resource consumption.
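For example, using the existing emit.offset-syncs.enabled setting on the MirrorSourceConnector (other settings shown are illustrative):

{
  "name": "A-to-B-MirrorSourceConnector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "A",
    "target.cluster.alias": "B",
    "topics": ".*",
    "emit.offset-syncs.enabled": "false",
    "source.cluster.bootstrap.servers": "localhost:9092",
    "target.cluster.bootstrap.servers": "localhost:9192"
  }
}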
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
There is no impact on existing users, as there won't be any change to the existing MirrorMaker connectors.
- If we are changing behaviour how will we phase out the older behaviour?
No change to existing behaviour. Users can easily switch from MirrorCheckpointConnector to MirrorGroupOffsetConnector.
- If we need special migration tools, describe them here?
No special migration tools are needed. Users can stop MirrorCheckpointConnector and then start MirrorGroupOffsetConnector, or even run them side by side temporarily while migrating.
- When will we remove the existing behaviour?
There is no plan to remove MirrorCheckpointConnector. MirrorGroupOffsetConnector offers an alternative for consumers that tend to lag behind.
Test Plan
There will be unit tests and integration tests added following the existing pattern for MirrorMaker connectors.
Rejected Alternatives
Creating a tool similar to RemoteClusterUtils that translates older offsets more consistently using timestamps could be simpler than running a Connector. However, the mechanism relies on the source cluster being available in order to fetch the timestamps of the committed offsets, so a standalone tool would not help in an unplanned disaster where the source cluster is unavailable. Running it as a Connector, like MirrorCheckpointConnector, continuously syncs the consumer offsets to the target cluster, so in the event of a disaster consumers can fail over to the target cluster using the most recently committed offsets. The configuration sync.group.offsets.interval.seconds determines how far out of sync the consumer offsets are at the target cluster, and therefore how much data would be reprocessed.