Authors: Luke Chen, Federico Valeri, Omnia Ibrahim, PoAn Yang, Kuan-Po Tseng, Jiunn-Yang Huang
Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-20186
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka deployments often require replicating data across geographically distributed clusters for disaster recovery (DR), regulatory compliance, data locality, cluster migrations or active-active architectures. While MirrorMaker 2 (MM2) provides cross-cluster replication capabilities, it presents significant operational challenges.
- Operational Burden: MM2 runs as standalone Connect workers external to Kafka brokers, requiring separate deployment, monitoring, and lifecycle management. Operators must provision additional hosts, manage Connect-specific configurations, and coordinate MM2 upgrades independently from Kafka broker upgrades.
- Compression Cost: If source cluster records are compressed, MM2 will decompress and compress them again when producing to the destination cluster. These redundant operations decrease the mirroring throughput and increase latency.
- Lossy Offset Translation: The offset translation process in MM2 is inherently lossy. When translating an offset from the source cluster to the target cluster, MM2 cannot guarantee returning the exact same record, because it is not possible to maintain a complete in-memory mapping of source to target offsets for all mirrored records. When an exact translation is unavailable, MM2 guarantees that the record at the translated offset is always earlier than the actual record, ensuring consuming applications never skip data at the cost of potential reprocessing. This conservative approach can lead to significant duplicate processing during failover scenarios, particularly for high-throughput topics where offset translation granularity is coarse.
- External Offset Management: Advanced streaming platforms like Apache Flink and Apache Spark, Kafka Connect source connectors, and transactional applications often store consumed offsets externally rather than in Kafka's __consumer_offsets topic. When a failover happens, these applications face additional complexity because source and destination offsets don't match. Applications must query MM2's offset-sync internal topic to translate offsets, adding operational complexity and potential failure points. This offset translation dependency complicates DR procedures and increases the risk of incorrect offset mapping leading to data loss or duplicate processing.
Cluster Mirroring addresses these operational challenges by integrating cross-cluster replication directly into Kafka brokers, providing a simpler and more robust solution for cross-cluster replication. Producers write to the source cluster and receive acknowledgments based on the source cluster's replication requirements (e.g. acks=all ensures replication to all in-sync replicas within the source cluster). Data is then asynchronously replicated to destination clusters with no impact on producer latency or throughput. While Cluster Mirroring is optimized for geo-replication, DR and migration use cases where a single source cluster replicates to one or more destination clusters, it also provides a foundation for more complex topologies.
- Integrated Architecture: Replication logic runs within broker processes, eliminating external dependencies and reducing the operational footprint.
- Simplified Configuration: Creating a cluster mirror requires a single command-line invocation or Admin API call with bootstrap servers and security credentials.
- Metadata Synchronization: Topic configurations, consumer group offsets, and ACLs are periodically synchronized from source to destination cluster without additional configuration.
- Unified Monitoring: Mirroring metrics are exposed through standard Kafka broker JMX metrics alongside existing replication metrics. Operators use familiar tools and dashboards to monitor cross-cluster replication.
- Faster Failover: The failover operation is simplified because metadata synchronization is continuous and automatic. Consumer applications can resume processing immediately after switching clusters without any offset translation.
- Delta Failback: Destination leader acts as a follower with regards to source leader, so it will always fetch from the local log end offset to catch up with the leader, making it possible to mirror only the delta when failing back (reverse mirroring).
- Version Compatibility: For migration or DR use cases where failback is not required, this proposal supports source brokers as old as version 2.1.0, leveraging the client/broker forward compatibility introduced in v4.0.
Cross-datacenter network latency makes synchronous replication impractical for some deployments. Requiring synchronous acknowledgment from a geographically distant cluster would introduce significant latency (typically 50-200ms for inter-region replication), making it unsuitable for latency-sensitive applications. Asynchronous replication should provide the right balance for DR use cases where availability and performance of the primary cluster must not be compromised by cross-datacenter latency. Applications requiring zero data loss across cluster failures can wait for the follow-up KIP that will extend this design to support synchronous mirroring, or handle the lag using application-level caching. In the event of a catastrophic failure of the source cluster, recently produced records that have not yet been replicated to the destination cluster will be lost. The amount of data loss depends on replication lag at the time of failure. Organizations must handle a non-zero RPO determined by the replication lag between source and destination clusters. Typical replication lag ranges from seconds to minutes depending on network bandwidth, throughput, and geographic distance.
Stretched clusters are not suitable for DR scenarios because they provide no protection against software failures, configuration errors, and accidental topic deletions. Vendors that recommend stretched cluster deployments typically position them for high availability (HA) rather than DR, and notably, most do not offer stretched clusters as a managed service option, further underscoring the operational challenges and limited DR effectiveness of this architecture.
Proposed Changes
Cluster Mirroring introduces a coordinator-based architecture integrated into Kafka brokers for managing cross-cluster replication. The design consists of three primary components that work together to provide automatic metadata synchronization and data replication. The following diagram illustrates how these components are wired together. For the sake of clarity, some internal APIs are excluded from the overview diagram.
The mirror name is stored as a topic-level internal configuration called mirror.name that has the same validation rules as topic names, and propagates through Kafka's metadata log as configuration change records. When topics are added to a mirror, the controller generates configuration metadata records that are replicated to all brokers through the standard metadata update mechanism. Brokers monitor these configuration changes to detect when partitions they lead belong to a mirror, triggering the creation of mirror fetchers and enforcement of read-only semantics. This design ensures that mirror associations are visible, auditable, and manageable through standard Kafka tools while maintaining strict control over how mirroring relationships are established and modified.
Main Components
MirrorCoordinator
The MirrorCoordinator (MC) manages Cluster Mirroring state using a partitioned coordinator pattern similar to the group and transaction coordinators.
We use a composite key (mirror name, topic id, and partition number) to distribute coordination work across the partitions of __mirror_state, the internal compacted topic used to store mirror metadata. Each mirror partition independently hashes to a coordinator, spreading the load across all brokers in the cluster. This means a mirror with hundreds of partitions will have its state management distributed evenly across the cluster rather than concentrated on a single broker.
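As a rough illustration of the assignment (the class and method names are hypothetical; only the key composition and the hash(mirrorName, topicId, partition) % numPartitions formula come from this proposal):

import org.apache.kafka.common.Uuid;
import java.util.Objects;

// Illustrative sketch: mapping a mirror partition to its __mirror_state
// coordinator partition, following the same composite-key idea as the
// group and transaction coordinators.
public record MirrorStateKey(String mirrorName, Uuid topicId, int partition) {
    public int coordinatorPartition(int numStatePartitions) {
        // hash(mirrorName, topicId, partition) % numStatePartitions
        return Math.floorMod(Objects.hash(mirrorName, topicId, partition), numStatePartitions);
    }
}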
Responsibilities:
- State Management: Mirror partition states and control records are stored in the __mirror_state internal topic. The coordinator loads this metadata on startup and on partition leadership changes.
- Partition Assignment: Mirror partitions are distributed evenly to coordinators across all brokers, allowing for horizontal scaling. The number of coordinator partitions is configurable via mirror.topic.num.partitions.
- Leader Election: When a broker becomes the leader for a __mirror_state partition, it loads the mirror metadata for all mirrors assigned to that partition and begins coordinating those mirrors. On resignation, it clears its in-memory state to avoid stale metadata.
- Metadata Refresh Scheduling: The coordinator schedules periodic metadata refresh operations by invoking a metadata manager every 30 seconds by default. This ensures that topology changes, configuration updates, and offset commits in the source cluster are continuously propagated to the destination cluster. The refresh interval is configurable via mirror.metadata.refresh.interval.ms.
- State Transitions: The coordinator manages asynchronous state transitions for mirror partitions. Each partition is an independent replication unit with its own state. When the coordinator is the leader for a mirror partition, it writes the state updates directly to __mirror_state topic, otherwise it reads and writes the state via new internal RPCs, enabling distributed coordination across the cluster.
State descriptions:
- UNKNOWN: The partition has no cached state (broker just became leader, state not loaded yet). Not an explicit API-driven state, just the absence of state.
- PREPARING: The coordinator for this partition detects via onMetadataUpdate that it leads a mirror partition. It fetches last mirrored epochs from the source cluster and truncates logs to align the local log with the source. Valid from: UNKNOWN, MIRRORING, STOPPED.
- MIRRORING: All ISR set members have completed truncation. A mirror fetcher thread is started to continuously replicate records from the source cluster. Valid from: PREPARING, PAUSED.
- PAUSING: Triggered by the pause operation. The system removes fetchers for the affected partitions. Valid from: MIRRORING only.
- PAUSED: Fetchers have been removed. The partition stays read-only with no active fetchers and no metadata sync (configs, consumer groups, ACLs). On resume, transitions directly to MIRRORING. Valid from: PAUSING only.
- STOPPING: Triggered by the remove topics from mirror operation or topic deletion on the source. The system stores the last mirrored epoch to __mirror_state topic and bumps the leader epoch. Valid from: MIRRORING, PAUSED.
- STOPPED: The last mirrored epoch has been persisted. The topic becomes writable on the destination cluster. The mirror fetcher is removed and the read-only flag is cleared. Valid from: STOPPING only.
- FAILED: An error occurred. Valid from: any state. An operator who wants to restart a failed mirror partition can remove the topic from the mirror and add it back again. The STOPPING handler already removes fetchers and truncates to the LSO, both of which are safe operations on a failed partition (fetchers are likely already gone, and truncation is a best-effort cleanup). More sophisticated recovery strategies can be added later with a follow-up KIP.
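The transitions above can be summarized in a compact state machine. The following is a minimal sketch (the enum and method names are hypothetical; only the states and their valid-from rules come from this proposal):

import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Sketch of the mirror partition state machine described above.
public enum MirrorPartitionState {
    UNKNOWN, PREPARING, MIRRORING, PAUSING, PAUSED, STOPPING, STOPPED, FAILED;

    // For each target state, the set of states it may be entered from.
    private static final Map<MirrorPartitionState, Set<MirrorPartitionState>> VALID_FROM = Map.of(
        PREPARING, EnumSet.of(UNKNOWN, MIRRORING, STOPPED),
        MIRRORING, EnumSet.of(PREPARING, PAUSED),
        PAUSING,   EnumSet.of(MIRRORING),
        PAUSED,    EnumSet.of(PAUSING),
        STOPPING,  EnumSet.of(MIRRORING, PAUSED),
        STOPPED,   EnumSet.of(STOPPING),
        FAILED,    EnumSet.allOf(MirrorPartitionState.class) // valid from any state
    );

    public boolean canTransitionFrom(MirrorPartitionState from) {
        return VALID_FROM.getOrDefault(this, EnumSet.noneOf(MirrorPartitionState.class)).contains(from);
    }
}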
Example scenarios:
- Starting a mirror (UNKNOWN -> PREPARING -> MIRRORING): The addTopicsToMirror command sets the mirror.name config via the controller. The metadata update propagates to brokers. The broker leading the partition looks up the partition state via the coordinator (which may require a ReadMirrorStates RPC to the remote coordinator responsible for that mirror partition) and transitions to PREPARING if the current state allows it. After truncation completes, it moves to MIRRORING and starts the mirror fetcher to fetch data from the source cluster.
- Failing over to destination (MIRRORING -> STOPPING -> STOPPED): The removeTopicsFromMirror command appends the ".removed" suffix to the mirror.name config. The partition leader detects the stop request, transitions to STOPPING, truncates to the LSO, persists the last mirrored epoch, then moves to STOPPED. The topic becomes writable once it reaches the STOPPED state.
- Restarting a stopped mirror (STOPPED -> PREPARING -> MIRRORING): The mirror.name config is set again. The controller is notified, sees the partition in STOPPED state and transitions to PREPARING, re-truncating and resuming replication.
MirrorMetadataManager
The MirrorMetadataManager (MMM) implements periodic metadata synchronization between source and destination clusters. It maintains persistent network connections to all source clusters. During periodic metadata refresh, the broker validates that the source cluster ID has not changed. If a mismatch is detected, metadata sync for that mirror is halted and an error is logged. This prevents silent data corruption in case of misconfiguration or unintended source cluster replacement.
Responsibilities:
- Connection Management: The manager maintains a connection pool with one blocking sender per source cluster. These connections are created lazily when the first topic for a mirror is added. Each sender uses the security credentials and network settings from the mirror configuration, allowing different mirrors to use different authentication mechanisms.
- Topic Metadata Synchronization: Every refresh cycle, the manager fetches topic metadata from source clusters using standard MetadataRequest calls. For each topic in the mirror configuration:
- Topic Creation: If a topic exists in the source but not the destination, the manager sends a CreateTopics request to the controller with identical partition count and configurations.
- Partition Expansion: If the source topic has more partitions than the destination, the manager sends a CreatePartitions request to scale up the destination topic to match.
- Configuration Sync: Topic configurations are compared between source and destination. Any differences trigger an IncrementalAlterConfigs request to align destination configs with the source.
- Topic Deletion: When a topic is deleted on the source cluster, the mirror partitions on the destination cluster move to the STOPPED state. This prevents accidental deletions from affecting the destination cluster. If the deletion was intentional, the operator must manually remove the topic from the mirror.
- Consumer Group Offset Synchronization: The manager synchronizes classic and share consumer group offsets to enable seamless failover with no offset translation (see the sketch after this list):
- Lists all consumer groups using ListGroups request.
- Fetches committed offsets for each group using OffsetFetch request or DescribeShareGroupOffsets request.
- Commits those offsets to the destination cluster's group coordinator using the internal OffsetCommit or AlterShareGroupOffsets request.
- ACL Synchronization: Access control lists are mirrored from source to destination to maintain consistent security policies:
- Fetches all ACLs from the source using DescribeAcls request.
- Compares with the destination cluster's current ACLs from the metadata image.
- Creates missing ACLs using CreateAcls request.
- Deletes ACLs that exist in destination but not in source using DeleteAcls request.
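To make the offset synchronization flow concrete, here is a functional approximation of one sync cycle using the public Admin API. The actual implementation runs inside the broker using the internal RPCs listed above, so this is a sketch of the behavior, not the real code path; the class name and the include pattern are illustrative:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.regex.Pattern;

public class GroupOffsetSyncSketch {
    // Compiled from mirror.groups.include, e.g. "app-.*"
    private final Pattern groupsInclude = Pattern.compile("app-.*");

    public void syncOnce(Admin source, Admin destination) throws Exception {
        for (ConsumerGroupListing group : source.listConsumerGroups().all().get()) {
            if (!groupsInclude.matcher(group.groupId()).matches()) continue;
            Map<TopicPartition, OffsetAndMetadata> offsets =
                source.listConsumerGroupOffsets(group.groupId())
                      .partitionsToOffsetAndMetadata().get();
            // Offsets are committed verbatim: mirrored partitions keep the
            // same offsets as the source, so no translation is required.
            destination.alterConsumerGroupOffsets(group.groupId(), offsets).all().get();
        }
    }
}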
Cluster Mirroring allows users to modify configurations in the destination cluster, though these changes are periodically overridden by the topic configuration synchronization cycle. This design choice was made because, while dynamic configuration changes could be blocked, static configuration changes via properties files cannot be prevented, making override inevitable. However, this approach presents challenges in environments with external governing systems like the Strimzi operator, where the continuous reconciliation process conflicts with the refresh cycle, potentially degrading performance. More critically, temporary configuration mismatches such as reduced retention periods or altered partition counts could lead to data loss or missing partitions until the next synchronization cycle detects and corrects the discrepancy. Mixing mirroring with external cluster management solutions therefore requires careful operational awareness.
Metadata synchronization operates at the mirror level rather than the partition level, so it uses a separate coordinator assignment based on the mirror name alone. Only the broker assigned as the metadata coordinator for a given mirror performs synchronization, and it applies changes only to the mirror partitions it manages. This avoids both redundant synchronization across brokers and unnecessary updates to partitions managed by other coordinators.
Each mirror can define its own filtering rules independently, loaded from the manager at each refresh cycle:
- The mirror.topic.properties.exclude config controls which topic configuration properties are excluded from synchronization using regex patterns.
- follower.replication.throttled.replicas, leader.replication.throttled.replicas, message.timestamp.difference.max.ms, log.message.timestamp.before.max.ms, log.message.timestamp.after.max.ms, message.timestamp.type, unclean.leader.election.enable, min.insync.replicas, mirror.name (default)
- .*throttled.* (exclude all throttle-related properties)
- min.insync.replicas,unclean.leader.election.enable (exclude only these two properties)
- The mirror.groups.include config controls which consumer group offsets are synced using regex patterns.
- .* (all groups by default)
- app-.* (only sync groups starting with app-)
- app-.*,service-.* (sync groups starting with app- or service-)
- The mirror.acl.include config controls which ACLs are synced using semicolon-separated rules with the format resourceType;resourceName;operation;permissionType;principal. Use * as a wildcard for any field. The resourceName and principal fields support regex. Trailing wildcard fields can be omitted:
- * (all ACLs by default)
- TOPIC;orders.* (all ACLs for topics matching orders.*)
- *;*;*;*;User:alice (all ACLs for principal User:alice)
- *;*;*;*;User:app-.* (all ACLs for principals matching User:app-.*)
- TOPIC;*;READ;ALLOW (all topic READ/ALLOW ACLs)
- GROUP;consumer-.*;READ;ALLOW;User:bob (READ/ALLOW ACLs on groups matching consumer-.* for User:bob)
- TOPIC;orders.*,*;*;*;*;User:alice (sync all topic ACLs for orders.* topics and all ACLs for User:alice)
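For example, a mirror's filtering configuration might combine the three settings as follows (values are illustrative; placing them in the mirror-config properties file is an assumption based on the failback example later in this document):

mirror.topic.properties.exclude=.*throttled.*,min.insync.replicas
mirror.groups.include=app-.*,service-.*
mirror.acl.include=TOPIC;orders.*;*;*;User:app-.*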
MirrorFetcherThread
The MirrorFetcherManager (MFM) extends AbstractFetcherManager to handle fetcher thread lifecycle for mirror partitions. It uses a three-dimensional key (fetcher ID, source broker endpoint, mirror name) to organize threads, ensuring that:
- Partitions from different mirrors use separate threads for authentication isolation.
- Partitions from the same mirror are distributed across multiple threads for load balancing.
- Leader changes in the source partition trigger thread reassignment or recreation to the new source broker.
The MirrorFetcherThread (MFT) is a specialized implementation of AbstractFetcherThread that handles cross-cluster data replication with consumer Fetch requests and different epoch semantics than standard intra-cluster replication, while keeping the same log consistency validations. The destination cluster's replica is not registered as a follower in the source cluster. Using a follower Fetch request would cause the source broker to attempt updating follower replica status for a replica it doesn't know about. A consumer Fetch request avoids this issue, as it carries no such side effects on the source broker's replica state. In other words, destination partition leaders operate in a dual role: they act as followers when fetching committed data from the source cluster leader up to the last stable offset (LSO), while simultaneously serving as leaders for their local replicas in the destination cluster.
A mirror topic is created with the same topic ID as in the source cluster. This serves two purposes: it satisfies fetch request validation on the source broker, and it enables identity verification during failback where the destination cluster can confirm it is working with the exact same topic by comparing topic IDs. To maintain data consistency, destination partitions are marked as read-only and reject produce requests from clients with ReadOnlyTopicException.
Mirror Leader Epoch
In KAFKA-18723, we identified a race condition where a late-arriving fetch response could contain corrupted or inconsistent records. The fix ensures that only record batches whose partition leader epoch is less than or equal to the leader epoch in the Fetch request are appended. The destination cluster stores batches using the leader epoch from the source cluster. For the mirroring leader in the destination cluster, this works naturally: the leader epoch in the Fetch request is set to the latest source cluster leader epoch, so the existing validation applies without issue. For followers in the destination cluster, however, the situation is different. During mirroring, the local leader epoch diverges from the batch leader epoch. The fetched batch may carry a leader epoch of X while the local leader epoch is Y, where X > Y or X < Y. In either case, the fix no longer applies correctly. To address this, we introduce the MirrorLeaderEpoch field in the Fetch request and response.
The CurrentLeaderEpoch in the Fetch response serves two purposes:
- The leader will verify it to make sure the fetch request is up-to-date
- The fetch response receiver will use it to validate the records in the fetch response.
For followers in the destination cluster, the CurrentLeaderEpoch can only serve for the first purpose. For the second purpose, because of the leader epoch inconsistency in the batches and the local metadata, the validation will not work. Therefore, the MirrorLeaderEpoch in the Fetch request will be set to the latest leader epoch in the leader's log, and the CurrentLeaderEpoch will still be set to the local current leader epoch. This way, when receiving the Fetch response, the follower's validation can work as expected.
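A simplified view of the follower-side check (the class and method names are illustrative; only the field semantics and the less-than-or-equal rule from the KAFKA-18723 fix come from this proposal):

public final class MirrorEpochValidation {
    static void validateMirroredBatch(int batchLeaderEpoch, int mirrorLeaderEpoch) {
        // Same rule as the KAFKA-18723 fix, but applied against the source
        // cluster's latest epoch (MirrorLeaderEpoch) instead of the local,
        // possibly diverged, CurrentLeaderEpoch.
        if (batchLeaderEpoch > mirrorLeaderEpoch) {
            throw new IllegalStateException("Rejecting batch: epoch " + batchLeaderEpoch
                + " is newer than mirror leader epoch " + mirrorLeaderEpoch);
        }
    }
}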
Log Convergence
There are two main concepts to keep in mind when dealing with log convergence across Kafka clusters:
- Last Mirrored Epoch (LME): The greatest leader epoch of a given partition that a destination cluster recognizes from the source cluster and stores in the __mirror_state internal topic. It represents the synchronization point between source and destination. After the log is truncated to LME, the destination cluster does not contain any record with a leader epoch beyond the LME. Only the source cluster owns leader epochs exceeding the LME. This ensures the source leader epoch remains the source of truth, even when epoch histories diverge across clusters during asynchronous mirroring.
- Leader Epoch Bump (LEB): The leader epoch in the destination cluster remains unchanged during replication. That means it is possible that the fetched batch carries a leader epoch of X that does not match with the local leader epoch. To ensure the leader epoch remains monotonically increasing, it is incremented when a partition becomes writable after failover. New records produced on the cluster will then carry an epoch higher than anything in the existing log.
A two-phase truncation protocol is applied by the mirror fetchers:
- LME truncation: Synchronizes leader epoch history between clusters. Truncates the log at the start offset of the first non-mirrored epoch and waits until all ISR members complete truncation. After this phase, epoch histories match, but the log may still diverge within the last epoch.
- Replication protocol truncation: Triggered by leader epoch comparison during fetch. Handles offset-level divergence within remaining epochs. Truncates to the exact offset where the source's epoch ends. After this phase, the logs are fully converged and normal replication proceeds.
A) Cluster B mirrors from source cluster A for the first time. A has no LME knowledge for this partition, so it returns epoch -1. B truncates everything and replicates from scratch.
B) Mirroring stops on B. B stores LME=1 and bumps to epoch 2. Meanwhile A also bumps to epoch 2 and gets a new record. Then A starts mirroring from B. A gets LME=1 from B, truncates the new record, and starts replicating from there.
Unclean Leader Election
Cluster Mirroring relies on leader epoch alignment between source and destination to guarantee log consistency. This mechanism assumes that the source cluster's log is an authoritative, append-only sequence of records for each leader epoch. That assumption holds as long as leader elections on the source are clean, meaning each new leader was a fully caught-up ISR member and no committed records were lost during the transition. When unclean leader election (ULE) occurs on the source cluster, this assumption breaks. An out-of-sync replica becomes leader and the source log silently loses committed records from the previous epoch. The source cluster's log now has a gap or a divergent suffix that was never replicated to the destination.
In this example we see how LME truncation and the subsequent replication protocol resolve an ULE that happens before mirroring starts.
1) Cluster B is mirroring from cluster A, and the leader node in A has a failure.
2) Unclean leader election is triggered in cluster A. The new leader only contains a record at offset 0. Then 3 more records are appended.
3) Before cluster B detects the leadership change, a failover to B occurs. When A fails back, LME log truncation removes the records beyond epoch 1.
The problem is that Cluster Mirroring cannot detect log divergence caused by ULE that happens after LME truncation. A non-ISR replica that missed truncation may still hold records with leader epochs beyond the LME. If that replica becomes leader through unclean election, the replication protocol cannot detect the divergence.
1) Cluster B is mirroring from A, and ULE is triggered 3 times in cluster A. After the first ULE, the new leader appends 3 records with epoch 2.
2) After the second ULE, the new leader is the ex-leader from before step 1. Now, after a failover to B, B bumps its leader epoch to 2 and appends a record at offset 3. Then A starts mirroring from B.
3) After the third ULE, the leader is the ex-leader from step 1. Now the log divergence cannot be resolved by the replication protocol.
A new dynamic topic-level configuration mirror.support.unclean.leader.election (boolean, default false) is introduced to support ULE. When enabled, LME log truncation waits for all replicas (not just ISR members) to join the ISR and complete the truncation. This ensures that every replica has been truncated past the LME, so even subsequent unclean leader elections cannot introduce undetectable divergence. Because this is a dynamic configuration, it can be enabled at any point during the mirroring lifecycle. If it is enabled before mirroring starts, all records are guaranteed to be consistent between the two clusters, even if an unclean leader election occurs. If it is enabled while mirroring is already running, only records produced after the next LME log truncation are guaranteed to be consistent. If some replicas cannot catch up with the leader during LME log truncation due to slow network, disk issues, or other failures, the mirror partition moves to the FAILED state; users must manually resolve the underlying issue or reassign the partition replicas to healthy brokers, and then restart mirroring.
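Since this is a dynamic topic-level configuration, it can presumably be toggled with the standard config tool, for example (topic name and ports are placeholders):

bin/kafka-configs.sh --bootstrap-server :9094 --alter \
  --entity-type topics --entity-name orders \
  --add-config mirror.support.unclean.leader.election=true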
Existing Features Integration
Batch Compression
Cluster Mirroring preserves the compression format of record batches from the source cluster without recompression. When mirroring data, compressed record batches are copied directly from the source to the destination cluster, maintaining the original compression type (gzip, snappy, lz4, zstd, or none) and the exact byte-level representation of the data. This approach avoids unnecessary CPU overhead from decompression and recompression during replication, ensures bit-for-bit data integrity, and prevents potential issues with different compression implementations producing different outputs for the same data.
Topic Compaction
Cluster Mirroring fully supports log compacted topics, preserving both compacted records and offset gaps from the source cluster. When a topic uses cleanup.policy=compact, Kafka removes obsolete records with duplicate keys, creating gaps in the offset sequence. For example, if a source partition contains offsets 0-100 and compaction removes records at offsets 30-40 and 60-70, the remaining records will have gaps: offsets 0-29, 41-59, and 71-100 are missing. The mirror leader replicates these compacted log segments exactly as they exist in the source cluster, maintaining the same offset assignments and gaps. After failover, when the mirror topic becomes writable, log compaction continues normally in the destination cluster according to the topic's compaction policy, and any new records produced locally will fill in after the highest mirrored offset.
When a destination cluster lags behind the source on a compacted topic, tombstone records may not yet be replicated at the time of failover. For example, if the source has a tombstone at offset 100 that deletes a key originally written at offset 3, but the destination has only replicated up to offset 50, that tombstone is never applied on the destination. After failover, offset 3 remains as a stale entry that will never be cleaned up. The problem is compounded on failback: truncating the former source to match the destination also discards the tombstone, so the orphaned key persists in both clusters permanently. This is an inherent limitation of asynchronous replication. Compaction correctness depends on the full sequence of tombstones being present, and any that fall beyond the replication watermark at failover time are lost. The same problem exists with MirrorMaker 2. The follow-up KIP for synchronous mirroring may address this by ensuring zero lag at switchover, guaranteeing all tombstones are replicated before failover occurs.
Topic Retention
Cluster Mirroring handles topic retention policies by periodically synchronizing the topic configurations from the source cluster, ensuring that the topic retention policies are consistent. When the source cluster applies retention policies, older log segments are deleted and the log start offset advances. For example, if a topic originally contained offsets 0-100 and retention deletes offsets 0-99, the source cluster's log start offset becomes 100. When the mirror leader fetches from the source, it discovers the new log start offset and updates its local log start offset to match, creating the same offset gap. If a mirror follower attempts to fetch from an offset below the source cluster's log start offset (e.g. fetching offset 50 when log start offset is 100), the source broker returns an OffsetOutOfRangeException. The mirror leader handles this by truncating its local log to the source's current log start offset and resuming fetching from that point. This ensures the destination cluster mirrors the current retention state of the source cluster without attempting to replicate already-deleted data.
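Sketched in code, the mirror leader's reaction to this error might look like the following (class, field, and method names are illustrative; only the truncate-and-resume behavior comes from this proposal):

// Illustrative handling of an out-of-range fetch on the mirror leader:
// align the local log start offset with the source and resume from there.
public class RetentionGapSketch {
    private long localLogStartOffset = 0L;

    public long handleOffsetOutOfRange(long sourceLogStartOffset) {
        // Records below this offset were deleted by retention on the source,
        // so the local log is truncated to match rather than replicated.
        localLogStartOffset = sourceLogStartOffset;
        return sourceLogStartOffset; // next fetch offset
    }
}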
Consumer Groups
Cluster Mirroring synchronizes consumer group offsets from the source cluster to the destination cluster, enabling consumers to resume consumption from their last committed offset after failover. The MirrorMetadataManager periodically fetches consumer group committed offsets from the source cluster and replicates them to the destination cluster's group coordinator. This ensures that consumer groups maintain their consumption progress across both clusters. During offset synchronization, the committed offset in the destination cluster may temporarily exceed the current log end offset (LEO) of the mirror topic. For example, if a consumer commits offset 100 in the source cluster but the destination cluster has only mirrored up to offset 80 (LEO = 80), the MirrorMetadataManager still commits offset 100 to the destination cluster. This is acceptable because the mirror leader continues fetching data and the LEO will eventually advance to include offset 100. However, if a failover occurs before the mirrored data catches up, consumers attempting to resume from offset 100 will receive an OffsetOutOfRangeException. To handle this scenario gracefully, consumers should configure auto.offset.reset=latest when consuming from mirror topics. This ensures that if a committed offset is beyond the current LEO after failover, the consumer automatically resets to the latest available offset rather than failing or resetting to the earliest offset.
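For example, a failover-tolerant consumer might be configured as follows (hostnames, group id, and topic are placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;
import java.util.Properties;

public class FailoverConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "destination:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app-orders"); // same group id as on the source cluster
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // If the synced committed offset is beyond the destination LEO after
        // failover, reset to latest instead of failing.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            // poll loop omitted
        }
    }
}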
Security Control
Cluster Mirroring supports comprehensive security controls through both authorization and authentication mechanisms. This ensures that only authorized principals can establish and manage cluster mirrors. When configuring a mirror, operators specify ACLs that should be synchronized from the source cluster, and these ACLs are periodically replicated to the destination cluster to maintain consistent access control policies across both environments.
When connecting to the source cluster, Cluster Mirroring requires only the bootstrap server address and appropriate credentials, no other sensitive cluster information is exposed or required. The destination cluster's mirror configuration supports all standard Kafka authentication mechanisms including TLS/SSL for encrypted transport and SASL for client authentication. Each mirror can be configured with its own security settings, allowing different mirrors to connect to source clusters with varying security requirements. This enables secure cross-cluster replication even when source and destination clusters use different authentication protocols or when connecting across security boundaries such as on-premises to cloud environments. All credentials are stored as mirror configuration records in the destination cluster metadata log, and used exclusively for establishing authenticated connections to the source cluster.
Source cluster permissions (mirror principal):
| RPC | Component | ACL Operation | ACL Resource | Purpose |
| Fetch | MFT | Read | Topic | Data replication |
| Metadata | MMM | Describe | Topic | Topic discovery and leader tracking |
| DescribeConfigs | MMM | DescribeConfigs | Topic | Topic configuration sync |
| ListGroups | MMM | Describe | Group | Consumer group offset sync |
| OffsetFetch | MMM | Describe | Group | Consumer group offset sync |
| DescribeAcls | MMM | Describe | Cluster | ACL synchronization |
| DescribeMirrors | MC | Read | Cluster | Log truncation when preparing |
| ApiVersions | MMM | - | - | Feature negotiation |
| ListOffsets | MFT | Describe | Topic | Offset bounds discovery |
| OffsetsForLeaderEpoch | MFT | Describe | Topic | Leader epoch validation for truncation |
Destination cluster permissions:
| RPC | Component | ACL Operation | ACL Resource | Purpose |
| CreateMirror | Controller | Create | ClusterMirror | New cluster mirror creation |
| AddTopicsToMirror | Controller | Alter | ClusterMirror | Mirror topics creation |
| AddTopicsToMirror | Controller | Read | Topic | Mirror topics creation |
| RemoveTopicsFromMirror | Controller | Alter | ClusterMirror | Mirror topics removal (failover) |
| RemoveTopicsFromMirror | Controller | Read | Topic | Mirror topics removal (failover) |
| PauseMirrorTopics | Controller | Alter | ClusterMirror | Mirror topics pause |
| PauseMirrorTopics | Controller | Read | Topic | Mirror topics pause |
| ResumeMirrorTopics | Controller | Alter | ClusterMirror | Mirror topics resume |
| ResumeMirrorTopics | Controller | Read | Topic | Mirror topics resume |
| DeleteMirror | Controller | Alter | ClusterMirror | Delete a cluster mirror |
| ListMirrors | Broker | Describe | ClusterMirror | Mirror topic listing |
| DescribeMirrors | Broker | Describe | ClusterMirror | Mirror topic describe (state, lag) |
| DescribeConfigs | Broker | DescribeConfigs | ClusterMirror | Mirror configuration describe |
| WriteMirrorStates | MC | ClusterAction | Cluster | Mirror partition state write |
| ReadMirrorStates | MC | ClusterAction | Cluster | Mirror partition state read |
| BumpLeaderEpochs | MC | ClusterAction | Cluster | Leader epoch bump when stopping |
| FindCoordinator | Broker | ClusterAction | Cluster | Mirror coordinator location |
| CreateTopics | MMM | Create | Topic | Topic creation |
| CreatePartitions | MMM | Alter | Topic | Partitions scaling |
| IncrementalAlterConfigs | MMM | AlterConfigs | ClusterMirror | Mirror configuration update |
| OffsetCommit | MMM | Read | Topic | Source CG offsets commit |
| OffsetCommit | MMM | Read | Group | Source CG offsets commit |
| CreateAcls | MMM | Alter | Cluster | Source ACLs creation |
| DeleteAcls | MMM | Alter | Cluster | Source ACLs removal |
An operator can grant ClusterMirror:*:CREATE,ALTER,DESCRIBE for full mirror management, or scope it to specific mirrors like ClusterMirror:prod-dr:DESCRIBE for read-only monitoring of a single mirror, without granting any broker-level privileges.
Idempotent Producer
The approach is to proactively expire the stale producer state on failover. The key insight is that, during mirroring, the destination partition is read-only: no local producers exist, so all ProducerStateManager (PSM) entries originate from mirrored data. When mirroring stops, all PSM entries are stale and can be safely expired. Records from the source are stored as-is on the destination, with no PID modification, which otherwise would require a checksum recalculation. A MIRROR_PID_RESET control record (type 7) is written to each destination partition's log during the STOPPING state transition, after the fetcher has been removed and truncation to LSO is completed, but before the partition becomes writable.
When a failover happens, the relevant mirror partition state transitions are:
- STOPPING: Remove fetchers, truncate to LSO, persist LME, write MIRROR_PID_RESET control record.
- STOPPED: Partition is writable (terminal state, no actions).
The key follows the standard control record format (version=0, type=7). The value uses the following schema:
{
"type": "data",
"name": "MirrorPidResetRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Version", "type": "int16", "versions": "0",
"about": "The version of the mirror PID reset record."},
{ "name": "SourceClusterId", "type": "string", "versions": "0",
"about": "The source cluster UUID for verification."}
]
}
The SourceClusterId field records which source cluster the mirrored data came from, enabling future validation (e.g. detecting unexpected source cluster changes) and data provenance tracing from the log itself.
When the control batch is encountered during append or during log recovery, all producer entries are removed from the PSM. This ensures both leaders and followers handle the control batch barrier consistently. Given that the partition is read-only during mirroring, all PSM entries originate from mirrored data. Expiring all entries is safe: no local producer state exists to preserve. Control batches are filtered out by the consumer fetcher via isControlBatch checks, just like transaction markers (commit/abort). The log dump tool is enhanced to deserialize MIRROR_PID_RESET records.
The control record approach works correctly with all practical mirroring topologies:
- Active-passive (A to B): B mirrors from A, stores records as-is. On failover, the MIRROR_PID_RESET record expires all PSM entries. Local producers get fresh PIDs from the coordinator with no collision risk.
- Failback (A to B, then B to A): After failover, B becomes writable. Later, A starts mirroring from B and truncates its log to the LSO. A then stores B's records as-is. B's MIRROR_PID_RESET record is included in the fetched data and appended to A's log, triggering PSM expiration on A. This is consistent with the general rule: when the MIRROR_PID_RESET record is encountered during append or during log recovery, all producer entries are removed from the PSM. A will write its own MIRROR_PID_RESET record when it eventually stops mirroring from B, producing a clean slate before A becomes writable again.
- Fan-out (A to B, A to C): B and C mirror independently from A, each with its own PSM per partition. On failover, each writes its own MIRROR_PID_RESET record independently.
- Fan-in (A to C, B to C, different topics): Each topic's partitions have independent PSMs. The MIRROR_PID_RESET record is written per partition during the STOPPING transition of each mirror.
- Chain (A to B to C): B mirrors from A, stores records as-is. C mirrors from B, stores records as-is. On failover at any point in the chain, the MIRROR_PID_RESET record expires all PSM entries on the stopping node. Longer chains work inductively by the same principle.
Exactly-Once Semantics
Cluster Mirroring ensures transactional consistency when stopping by truncating to the last stable offset. Note that this doesn't mean it supports exactly-once semantics (EOS) across clusters, which would require synchronous communication.
During the mirror stopping transition, the MirrorCoordinator performs a log truncation operation that resets each mirror partition to its LSO. This offset represents the point in the log where all transactions have been decided (committed or aborted), essentially the highest offset where data is known to be consistent from a transactional perspective. Any records beyond this point may belong to incomplete transactions and should not persist after mirroring stops. This approach prevents a critical consistency issue: the destination cluster could retain partial transaction data that would never be completed since mirroring has stopped. This would leave the topic in an inconsistent state where read_committed consumers may be blocked due to incomplete transaction data. Additionally, the transaction coordinator would not be able to roll back these hanging transactions because there would be no __transaction_state metadata in the destination cluster. Note that the actual replication lag at stop time may be greater than what is reported by the metrics.
Kafka consumers with isolation.level=read_committed determine transaction visibility using only the LSO, which is computed from COMMIT/ABORT control markers in the log. Consumers never interact with the transaction coordinator or validate producer IDs. This separation between log-level markers (replicated) and coordinator state (not replicated) is why transactional consumers work correctly on mirror topics without mirroring coordinator state. The LSO truncation during failover ensures all remaining transactions have mirrored markers, maintaining this guarantee.
Consider this source cluster log:
| Offset | Type | IsTxn | PID | Content |
| 0 | DATA_RECORD | true | 4001 | key=A, value=1 |
| 1 | DATA_RECORD | true | 4001 | key=B, value=2 |
| 2 | DATA_RECORD | true | 4002 | key=X, value=9 |
| 3 | CONTROL_MARKER | true | 4001 | COMMIT marker for PID 4001 |
| 4 | CONTROL_MARKER | true | 4002 | ABORT marker for PID 4002 |
| 5 | DATA_RECORD | false | none | key=Z, value=10 |
If replication reaches offset 4 and the source cluster fails, the destination cluster contains data records for transaction 4002 (offset 2) without the abort marker (offset 4). This creates a hanging transaction that can never be committed or aborted on the destination cluster. Note that this approach causes data loss for any in-flight transactions, and for completed transactions whose markers have not yet been mirrored, whenever there is replication lag at failover; it may also result in already-processed records being lost if consumers on the destination cluster read uncommitted data.
Bandwidth Control
Cluster Mirroring adopts a dual-sided throttling mechanism that extends Kafka's existing bandwidth control capabilities to work across cluster boundaries.
- Destination Cluster Throttling: To avoid conflicts with intra-cluster replication controls, mirror-specific throttling configurations operate independently from standard replication throttling. The system provides two configuration levels: a broker-level rate limit (mirror.replication.throttled.rate) that sets the overall bandwidth ceiling for mirror replication traffic, and a topic-level replica list (mirror.replication.throttled.replicas) that specifies which partition-broker combinations should be throttled using the standard partition-index:broker-id notation. Operators can dynamically adjust throttling rates at runtime without restarting brokers, first setting a cluster-wide default rate, then fine-tuning specific topic partitions as mirroring progresses. This allows gradual bandwidth allocation as mirror relationships are established.
- Source Cluster Throttling: The source cluster side requires a different approach because mirror fetch requests operate as consumer traffic rather than replication traffic. This design is intentional since the mirroring must fetch only up to the LSO to maintain transactional consistency, which is a consumer-level guarantee not available through the replication protocol. Consequently, standard leader replication throttling mechanisms cannot apply to mirror traffic. Instead, the source cluster leverages Kafka's client quota system. Each mirror fetcher thread presents itself with a deterministic client identifier that encodes the broker ID, fetcher thread number, and mirror name. Operators can apply per-client byte rate quotas to these identifiers, effectively throttling the outbound mirror traffic from the source cluster. This approach integrates seamlessly with Kafka's existing quota enforcement infrastructure.
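For illustration, both sides could be configured with the standard config tool. The mirror.replication.throttled.rate name is the one proposed above; the client id below is an assumed example of the deterministic identifier (broker ID, fetcher thread number, mirror name) described for source-side quotas:

# destination: cap mirror replication traffic on broker 1 at 50 MB/s
bin/kafka-configs.sh --bootstrap-server :9094 --alter \
  --entity-type brokers --entity-name 1 \
  --add-config mirror.replication.throttled.rate=52428800

# source: apply a consumer byte-rate quota to the mirror fetcher's client id
bin/kafka-configs.sh --bootstrap-server :9091 --alter \
  --entity-type clients --entity-name broker-1-mirror-fetcher-0-my-mirror \
  --add-config consumer_byte_rate=52428800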
In a follow-up KIP we will add source-side throttling that allows source cluster leaders to limit bandwidth served to all mirror fetchers, similar to how leader.replication.throttled.rate controls intra-cluster replication. This provides independent control over mirror catch-up traffic without impacting local replication or consumer workloads. Combined with destination-side throttling, operators gain complete bidirectional bandwidth control for mirror traffic.
Tiered Storage
Mirror topics in the destination cluster currently only replicate data from local storage on the source broker. Integrating with tiered storage would allow mirroring to handle data that has been offloaded to remote storage (e.g., S3, HDFS), enabling full replication of topics with long retention periods without requiring all data to reside in local broker storage. A detailed design of the metadata synchronization protocol, API schema, and state management will be provided in a follow-up KIP.
Share Group
Cluster Mirroring supports both traditional consumer groups and share consumer groups (Kafka Queue functionality) to ensure seamless failover for all consumer types. While the data mirroring mechanism remains identical, the offset synchronization strategy differs based on the group type. Share consumer groups use a different offset management model based on Share-Partition Start Offset (SPSO) and Share-Partition End Offset (SPEO) rather than traditional committed offsets. First we retrieve the current SPSO for each share group using the DescribeShareGroupOffsets API from the source cluster, and then we update the SPSO in the destination cluster using the AlterShareGroupOffsets API, which also initializes the group state in both the group coordinator and share coordinator. This means the API can initialize a share group in the destination cluster even if it doesn't exist yet, eliminating the need for pre-creation or complex state management.
Kafka enforces that consumer group and share group names must be unique within a single cluster. This creates a potential conflict scenario during mirroring. When such conflicts occur, the offset commit operation will fail with GroupIdNotFoundException. Users must resolve these conflicts manually by either deleting the conflicting group in the destination cluster before mirroring begins, or excluding the conflicting groups from offset synchronization. These conflicts affect only offset synchronization and do not impact data mirroring itself. The topic data continues to replicate normally, and only the automatic offset synchronization for the conflicting groups is blocked.
Active-Active Writes
Active-active topology is not initially supported in Cluster Mirroring, though it could potentially be achieved through topic prefixing and removing the reliance on topic ID for mirroring. This is a candidate for a future improvement KIP. Instead, bidirectional mirroring is supported, but only when mirroring different topics between clusters, allowing records produced to either cluster to be consumed from both. Unlike MirrorMaker 2, Cluster Mirroring does not need special cycle detection or prevention logic because the read-only enforcement inherently blocks the conditions that would create infinite replication loops.
Synchronous Mirroring
Currently, mirroring is asynchronous. The source cluster acknowledges the producer without waiting for the destination to replicate the data. Sync mirroring would guarantee that records are replicated to the destination cluster before the source acknowledges the produce request, providing stronger durability guarantees at the cost of higher latency. This would be useful for workloads where zero data loss across clusters is a strict requirement.
Future extensions to synchronous mirroring could enable preservation of transactional semantics across clusters. Streaming platforms using exactly-once mode (Apache Kafka Streams, Apache Flink, Apache Spark) rely on the source cluster's transactional protocol and coordination. During failover or migration scenarios, transactional metadata for pending transactions does not transfer to the destination cluster, potentially breaking exactly-once guarantees. Supporting transactional cross-cluster replication would require coordinating transactional metadata and ensuring transaction state consistency across clusters, something MM2's Connect-based architecture cannot support.
Diskless Topics
At the time of writing, the Diskless Topics design is still under discussion (KIP-1500 and other sub-KIPs), so there will be future KIPs to support this feature. Diskless topics store data exclusively in tiered storage, with no local log segments on brokers. Supporting mirroring for diskless topics requires adapting the fetch and replication mechanisms to work without local storage, which introduces changes to how mirror offsets are tracked and how truncation is handled during failover.
Command Workflows
Failover Process
Failover is initiated by calling the RemoveTopicsFromMirror API, which appends a ".removed" suffix to the mirror.name internal config. This transitions the mirror topics from read-only to writable state after the stopping process completes gracefully. When producers reconnect to the destination cluster after failover, they obtain new producer IDs which are separate from previously mirrored IDs, so they begin writing with fresh sequence numbers starting from 0. Consumers can reconnect to the destination cluster using the same group ID, resuming from the last synchronized offsets, minimizing data re-processing or gaps. The transition is transparent from the consumer's perspective and offset management continues normally through the destination's group coordinator.
# 9091 (source) -----> 9094 (destination)
# in case of disaster, the operator can failover by running the following command
bin/kafka-mirrors.sh --bootstrap-server :9094 --remove --topic .* --mirror my-mirror
# 9091 (source) --x--> 9094 (destination)
# now all mirror topics are detached from the source cluster and accept writes (the two clusters are allowed to diverge)
Failback Process
Failback enables mirroring to be reversed after a failover, allowing the original source cluster to become the destination and vice versa. This is critical for scenarios where you want to fail back to the original cluster after recovering from an outage or planned maintenance. When failback is initiated on the old source cluster, it needs to determine where to truncate its log before starting to fetch from the new source cluster. If the new API is supported, the broker sends a LastMirroredEpochs request to the new source cluster asking for the LME, and then truncates its local log to the last offset of the returned epoch. If the new API is not supported, the broker truncates to zero and starts mirroring from scratch.
Before transitioning a mirror partition from PREPARING to MIRRORING, the MirrorCoordinator must ensure that all in-sync replicas in the destination cluster have truncated their logs to the correct offset. If fewer than min ISR replicas are available, the transition is skipped and retried on the following fetch. This coordination step validates that every ISR member has completed truncation before the partition is allowed to begin actively fetching from the source cluster. Without it, the mirror leader could start appending new data from the source while local followers still hold divergent log segments, causing inconsistencies within the destination cluster. After truncation, reverse mirroring begins normally. Note that the log truncation on reverse mirroring may cause data loss if there are records that did not get mirrored to the old destination cluster.
# when the source cluster is back, the operator can failback by creating a mirror with the same name
echo "bootstrap.servers=localhost:9094" > /tmp/my-mirror.properties
bin/kafka-mirrors.sh --bootstrap-server :9091 --create --mirror my-mirror --mirror-config /tmp/my-mirror.properties
bin/kafka-mirrors.sh --bootstrap-server :9091 --add --topic .* --mirror my-mirror
# 9091 (destination) <----- 9094 (source)
Create Mirror
- The user sends CreateMirror requests to any broker with the mirror name and mirror related properties (bootstrap servers, security settings, etc.).
- The broker forwards the request to the active controller.
- The controller saves the properties into the metadata log as ConfigRecord entries with type MIRROR.
- If this is the first mirror being created, the controller also auto creates the __mirror_state internal topic.
- All brokers receive the metadata update and the MirrorMetadataManager registers the new mirror configuration.
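For reference, the same workflow driven through the CLI shown earlier in this document (values are placeholders, security settings elided):

echo "bootstrap.servers=source:9091" > /tmp/my-mirror.properties
bin/kafka-mirrors.sh --bootstrap-server :9094 --create --mirror my-mirror --mirror-config /tmp/my-mirror.properties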
Add Topics to Mirror
- User sends AddTopicsToMirror request with topics and mirror name.
- The broker forwards the request to the active controller.
- The controller validates that each topic exists and is not already in a mirror. It then sets the topic config mirror.name=<mirrorName> for each topic, generating a ConfigRecord per topic into the metadata log.
- Response is sent back to clients with per topic results.
- When the MirrorMetadataManager in the partition leader node gets notified about the topic config update, it detects that mirror.name is not empty and has no .removed or .paused suffix. It then queries the current mirror partition state from the coordinator. The coordinator could be located on a different broker node, so a ReadMirrorStates inter broker RPC may be needed.
- Based on the current mirror partition state, the state machine transitions the partition. In most cases, from UNKNOWN to PREPARING.
- During PREPARING, the mirror fetcher performs Last Mirrored Epoch (LME) truncation. The LME is the greatest leader epoch that the source cluster recognizes from the destination. If the source has no LME knowledge (first time mirroring), it returns -1 and the destination truncates everything and replicates from scratch. Otherwise, the destination truncates at the start offset of the first epoch beyond the LME. It then waits until all ISR members (or all replicas if mirror.support.unclean.leader.election=true) complete the truncation.
- Once all ISR members have completed truncation, the state transitions from PREPARING to MIRRORING. A MirrorFetcherThread is created and starts sending consumer Fetch requests (not follower requests) to the source cluster to replicate data. The Fetch protocol handles any offset level divergence by truncating to the exact offset where the source epoch ends. The fetched batch retains its original leader epoch from the source.
- The partition state is persisted to the __mirror_state topic on each state change via local append or WriteMirrorStates (when coordinator is remote) as MirrorPartitionStateKey/MirrorPartitionStateValue records, distributed by hash(mirrorName, topicId, partition) % numPartitions.
- The MirrorMetadataManager also periodically synchronizes topic configs, consumer group offsets, and ACLs from the source cluster.
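For illustration, the PREPARING truncation decision can be summarized as follows. The EpochCache interface and its method are hypothetical stand-ins for the destination partition's leader-epoch cache; only the decision rule itself comes from the steps above.

/** Hypothetical view of the destination partition's leader-epoch cache. */
interface EpochCache {
    /** Start offset of the first epoch strictly greater than the given epoch,
     *  or the log end offset if no such epoch exists. */
    long startOffsetOfFirstEpochAfter(int epoch);
}

final class LmeTruncation {
    /** Returns the offset the destination should truncate to during PREPARING. */
    static long truncationOffset(int lastMirroredEpoch, EpochCache cache) {
        if (lastMirroredEpoch == -1) {
            // The source has no LME knowledge (first-time mirroring):
            // truncate everything and replicate from scratch.
            return 0L;
        }
        // Keep data up to the LME; cut at the start of the first epoch beyond it.
        return cache.startOffsetOfFirstEpochAfter(lastMirroredEpoch);
    }
}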
Remove Topics from Mirror
- User sends RemoveTopicsFromMirror request with topics and mirror name.
- The controller validates each topic belongs to the specified mirror and is in MIRRORING state. It then updates the topic config by appending the .removed suffix, e.g. mirror.name=my-mirror.removed, generating a ConfigRecord.
- When the MirrorMetadataManager gets notified, it detects the .removed suffix on mirror.name. It queries the current mirror partition state from the coordinator, and transitions the mirror partition to STOPPING.
- During STOPPING:
- The MirrorFetcherManager removes all fetcher threads for the affected partitions, stopping replication.
- Bump the leader epoch for the partitions to ensure monotonically increasing epochs for new records.
- The log is truncated to LSO for transactional consistency.
- The LME is recorded as LastMirrorEpochsKey/LastMirrorEpochsValue records in the __mirror_state topic for potential future failback.
- A MIRROR_PID_RESET control record is written to the partition log, which expires all ProducerStateManager entries so that new producers get fresh PIDs with no collision risk.
- The state transitions from STOPPING to STOPPED. The read only flag is cleared and the topic becomes writable. New producers can start producing with fresh PIDs starting at sequence 0 and a higher leader epoch.
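For illustration, the ordering of the STOPPING steps on the partition leader can be sketched as follows. Every interface and method below is a hypothetical stand-in for broker internals; only the step order comes from the flow above.

import org.apache.kafka.common.TopicPartition;

public class StoppingFlowSketch {
    /** Hypothetical handles into broker internals. */
    interface Internals {
        void removeFetcher(TopicPartition tp);
        void bumpLeaderEpoch(TopicPartition tp);
        long lastStableOffset(TopicPartition tp);
        void truncateTo(TopicPartition tp, long offset);
        void recordLastMirroredEpochs(TopicPartition tp);
        void appendPidResetControlRecord(TopicPartition tp);
        void markStopped(TopicPartition tp);
    }

    static void stopMirroring(Internals broker, TopicPartition tp) {
        broker.removeFetcher(tp);                            // stop replication
        broker.bumpLeaderEpoch(tp);                          // keep epochs monotonically increasing
        broker.truncateTo(tp, broker.lastStableOffset(tp));  // truncate to LSO for transactional consistency
        broker.recordLastMirroredEpochs(tp);                 // persist LME for potential failback
        broker.appendPidResetControlRecord(tp);              // MIRROR_PID_RESET: expire producer state
        broker.markStopped(tp);                              // STOPPED: topic becomes writable
    }
}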
Pause Topics
- User sends PauseMirrorTopics request with topics and mirror name.
- The controller validates each topic belongs to the specified mirror and is currently in MIRRORING state. It appends the .paused suffix to the mirror name config, e.g. mirror.name=my-mirror.paused, generating a ConfigRecord.
- When the MirrorMetadataManager in the partition leader node gets notified, it detects the .paused suffix. It transitions the state to PAUSING.
- During PAUSING, the MirrorFetcherManager removes the fetcher threads for the affected partitions. No more data is replicated.
- The state transitions from PAUSING to PAUSED. The partition remains read only. Metadata synchronization (configs, groups, ACLs) is also halted for the paused topics.
- The partition state change is persisted to the __mirror_state topic.
Resume Topics
- User sends ResumeMirrorTopics request with topics and mirror name.
- The controller validates the topic is currently paused (has .paused suffix). It removes the .paused suffix, restoring the original mirror name, e.g. mirror.name=cluster1, generating a ConfigRecord.
- When the MirrorMetadataManager in the partition leader node gets notified, it detects that mirror.name no longer has the .paused suffix.
- The state transitions directly from PAUSED to MIRRORING. No log truncation is needed because the partition is already at the correct offset from before the pause.
- New MirrorFetcherThread instances are created and resume replication from the current log end offset.
- Metadata synchronization (configs, groups, ACLs) also resumes.
Delete Mirror
- The user sends a DeleteMirror request with the mirror name.
- The controller validates that the mirror is empty (no topics assigned) or all its partitions are in STOPPED state.
- If valid, the controller tombstones the mirror configuration in the cluster metadata log, removing all ConfigRecord entries for the mirror.
- The mirror state records in __mirror_state internal topic are also tombstoned.
- Any remaining coordinator state is shut down, source cluster connections are closed, and the mirror name becomes available for reuse.
- After deletion, failback using this mirror configuration is no longer possible.
List Mirrors
- The user sends ListMirrorsRequest to any broker (no parameters required).
- The broker handler gets all configured mirrors from the MirrorMetadataManager, which reads from the in-memory metadata cache.
- For each authorized mirror, the broker returns: mirror name, source cluster ID, source bootstrap servers, and topic count.
- No metadata records are written. This is a read only operation against the local metadata cache.
Describe Mirrors
- The user sends DescribeMirrorsRequest with optional mirror names (empty means all mirrors).
- The broker handler queries two sources:
- The ReplicaManager, which provides source offset, destination offset, and lag for each partition.
- The MirrorCoordinator which provides the current partition state from the metadata manager cache.
- The request is forwarded to each broker, and each broker only reports partitions for which it has lag information or is the partition leader. This avoids duplicate reporting across brokers.
- For each partition, the response includes: mirror name, topic name, partition ID, source offset, destination offset, lag, current state, and LME.
- No metadata records are written. This is a read only operation.
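For illustration, a simple monitoring check can be layered on the describe path, since it exposes per-partition lag and state. The result shapes below (MirrorDescription, PartitionDetail and their accessors) are assumptions for illustration only; the KIP defines the wire-level fields, not these classes.

import java.util.List;
import java.util.Map;

public class MirrorLagCheck {
    static final long LAG_ALERT_THRESHOLD = 10_000L;

    // Assumed shapes mirroring the DescribeMirrorsResponse fields.
    interface PartitionDetail { int partitionIndex(); long lag(); String state(); }
    interface MirrorDescription { Map<String, List<PartitionDetail>> topics(); }

    static void check(Map<String, MirrorDescription> mirrors) {
        mirrors.forEach((mirror, description) ->
            description.topics().forEach((topic, partitions) ->
                partitions.forEach(p -> {
                    // Lag is source offset minus destination offset; -1 means unknown.
                    if (p.lag() > LAG_ALERT_THRESHOLD) {
                        System.out.printf("ALERT %s %s-%d lag=%d state=%s%n",
                                mirror, topic, p.partitionIndex(), p.lag(), p.state());
                    }
                })));
    }
}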
Public Interfaces
Command-Line
A new kafka-dump-log.sh flag allows decoding cluster mirroring metadata for debugging purposes:
$ bin/kafka-dump-log.sh --mirror-state-decoder --files /tmp/server*/data/__mirror_state-1/00000000000000000000.log
Dumping /home/fvaleri/Documents/kafka/build/test/server4/data/__mirror_state-0/00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1771239071191 size: 98 magic: 2 compresscodec: none crc: 3314855113 isvalid: true
| offset: 0 CreateTime: 1771239071191 keySize: 13 valueSize: 17 sequence: -1 headerKeys: [] key: {"type":"2","data":{"mirrorName":"my-mirror"}} payload: {"version":"0","data":{"topicName":"my-topic","partition":0,"state":0}}
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 98 CreateTime: 1771239071219 size: 98 magic: 2 compresscodec: none crc: 3968746657 isvalid: true
| offset: 1 CreateTime: 1771239071219 keySize: 13 valueSize: 17 sequence: -1 headerKeys: [] key: {"type":"2","data":{"mirrorName":"my-mirror"}} payload: {"version":"0","data":{"topicName":"my-topic","partition":0,"state":1}}
A new command-line tool kafka-mirrors.sh provides administrative operations for managing cluster mirrors:
$ bin/kafka-mirrors.sh --help
This tool helps to create cluster mirrors and add topics to them.
Option                                  Description
------                                  -----------
--add                                   Add topic(s) to an existing cluster
                                          mirror (supports regex).
--alter                                 Alter the configuration of an existing
                                          cluster mirror.
--bootstrap-server <String: server to   REQUIRED: The destination Kafka server
  connect to>                             to connect to.
--command-config <String: command       Property file containing configs to be
  config property file>                   passed to Admin Client.
--create                                Create a new cluster mirror from a
                                          source cluster.
--delete                                Delete an existing cluster mirror.
--describe                              Describe a cluster mirror including
                                          partition lag and state.
--help                                  Print usage information.
--list                                  List all cluster mirrors.
--mirror <String: mirror>               The name of the cluster mirror.
--mirror-config <String: mirror config  Property file containing source
  property file>                          cluster configs for mirroring.
--pause                                 Pause mirroring for topic(s) matching
                                          the pattern (supports regex).
--remove                                Remove topic(s) from an existing
                                          cluster mirror (supports regex).
--replication-factor <Short:            The replication factor to use for the
  replication-factor>                     mirror topic. If not specified, uses
                                          the destination cluster's default.
--resume                                Resume mirroring for previously paused
                                          topic(s) matching the pattern
                                          (supports regex).
--topic <String: topic>                 Topic name or regex pattern to match
                                          topics (e.g., 'my-topic' or 'test-.*').
--version                               Display Kafka version.
Create a new cluster mirror in the destination cluster (forbidden suffixes: .removed, .paused):
$ echo "bootstrap.servers=localhost:9092" >/tmp/mirror.properties $ bin/kafka-mirror.sh --bootstrap-server :9094 --create --mirror my-mirror --mirror-config /tmp/mirror.properties Created mirror my-mirror
Add a topic or set of topics to an existing cluster mirror (start mirroring; the --topic flag accepts a regex):
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --add --topic my-topic --mirror my-mirror
Added 1 topic(s) to mirror my-mirror: [my-topic]
Remove a specific topic or set of topics from a mirror (failover; topics become writable):
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --remove --topic my-topic --mirror my-mirror
Removed 1 topic(s) from mirror my-mirror: [my-topic]
Delete a mirror including its configuration (the mirror must be empty or include only stopped partitions):
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --delete --mirror my-mirror
Deleted mirror my-mirror
Pause mirroring for a specific topic or set of topics (topics remain read-only):
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --pause --topic my-topic --mirror my-mirror
Paused mirroring for 1 topic(s) in mirror my-mirror: [my-topic]
Resume mirroring for a specific topic or set of topics:
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --resume --topic my-topic --mirror my-mirror
Resumed mirroring for 1 topic(s) in mirror my-mirror: [my-topic]
List configured mirrors with additional information:
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --list
MIRROR      TOPICS  CLUSTER-ID              BOOTSTRAP-SERVER
my-mirror   2       lBq12jYZRp-9wF3M9MPopg  localhost:9091,localhost:9092
new-mirror  1       lBq12jYZRp-9wF3M9MPopg  localhost:9091,localhost:9092
Describe configured mirrors to check their lag compared to their source topics (use the --mirror flag to filter by mirror name):
$ bin/kafka-mirrors.sh --bootstrap-server :9094 --describe
MIRROR      TOPIC  PARTITION  SOURCE-OFFSET  DESTINATION-OFFSET  LAG  STATE
my-mirror   bar    0          2324           2324                0    MIRRORING
my-mirror   foo    0          69             66                  3    MIRRORING
my-mirror   foo    1          94             84                  10   MIRRORING
my-mirror   foo    2          94             90                  4    MIRRORING
new-mirror  baz    0          189            189                 0    MIRRORING
new-mirror  baz    1          859            859                 0    MIRRORING
Alter the mirror configuration (any valid configuration change triggers a reconnection):
$ bin/kafka-configs.sh --bootstrap-server :9094 --entity-type mirrors --entity-name my-mirror \
--alter --add-config bootstrap.servers=localhost:9092
Completed updating config for mirror my-mirror.
Throttling on the destination cluster:
$ bin/kafka-configs.sh --bootstrap-server :9094 --entity-type brokers --entity-name 4 \
  --alter --add-config mirror.replication.throttled.rate=100000000
Completed updating config for broker 4.
$ bin/kafka-configs.sh --bootstrap-server :9094 --entity-type topics --entity-name my-topic \
  --alter --add-config mirror.replication.throttled.replicas=[0:4]
Completed updating config for topic my-topic.
Throttling on the source cluster:
$ bin/kafka-configs.sh --bootstrap-server :9091 --alter --add-config 'consumer_byte_rate=1024' \
  --entity-type clients --entity-name broker-4-fetcher-0-mirror-my-mirror
Completed updating config for client broker-4-fetcher-0-mirror-my-mirror.
Grant mirror admin full access to a specific mirror:
$ bin/kafka-acls.sh --bootstrap-server :9094 --add \
--cluster-mirror my-mirror \
--operation Create --operation Alter --operation Describe --operation Delete \
--operation AlterConfigs --operation DescribeConfigs \
--allow-principal User:mirror-admin
Adding ACLs for resource `ResourcePattern(resourceType=CLUSTER_MIRROR, name=my-mirror, patternType=LITERAL)`:
(principal=User:mirror-admin, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=ALTER, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=DELETE, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=ALTER_CONFIGS, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=DESCRIBE_CONFIGS, permissionType=ALLOW)
Grant read-only monitoring access to all mirrors:
$ bin/kafka-acls.sh --bootstrap-server :9094 --add \
--cluster-mirror '*' \
--operation Describe --operation DescribeConfigs \
--allow-principal User:monitor
Adding ACLs for resource `ResourcePattern(resourceType=CLUSTER_MIRROR, name=*, patternType=LITERAL)`:
(principal=User:monitor, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:monitor, host=*, operation=DESCRIBE_CONFIGS, permissionType=ALLOW)
List ACLs for a specific mirror:
$ bin/kafka-acls.sh --bootstrap-server :9094 --list --cluster-mirror my-mirror
Current ACLs for resource `ResourcePattern(resourceType=CLUSTER_MIRROR, name=my-mirror, patternType=LITERAL)`:
(principal=User:mirror-admin, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=ALTER, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=DELETE, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=ALTER_CONFIGS, permissionType=ALLOW)
(principal=User:mirror-admin, host=*, operation=DESCRIBE_CONFIGS, permissionType=ALLOW)
Admin Client
New methods are added to the Admin interface for programmatic cluster mirror management, along with their supporting classes:
/**
* Create a new cluster mirror.
*
* @param mirrorName The name of the cluster mirror
* @param configs Configuration for the cluster mirror, including bootstrap servers and security settings
* @param options Options for the create mirror operation
* @return The CreateMirrorResult
*/
CreateMirrorResult createMirror(String mirrorName, Map<String, String> configs, CreateMirrorOptions options);
/**
* Add topics to an existing cluster mirror for cross-cluster replication.
*
* When topics are added to a mirror, they become read-only on the destination cluster and start
* replicating data from the source cluster. This operation marks the specified topics with the
* mirror name, preventing local writes and enabling the MirrorFetcherThread to begin replication.
*
* @param mirrorName The mirror name to add the topics to
* @param topics Set of topic names to add to mirroring
* @param options Options for the add topics to mirror operation
* @return The AddTopicsToMirrorResult containing futures for each topic addition
*/
AddTopicsToMirrorResult addTopicsToMirror(String mirrorName, Set<String> topics, AddTopicsToMirrorOptions options);
/**
* Remove topics from cluster mirror, making them writable on the destination cluster.
*
* This operation is typically used during failover scenarios when the destination cluster needs to
* be promoted from passive (read-only mirror) to active (accepting writes). Removing topics from
* the mirror clears the mirrorName field from partition metadata, which allows producers to write
* to these partitions.
*
* @param mirrorName The mirror name to remove the topics from
* @param topics Set of topic names to remove from mirroring
* @param options Options for the remove topics from mirror operation
* @return The RemoveTopicsFromMirrorResult containing futures for each topic removal
*/
RemoveTopicsFromMirrorResult removeTopicsFromMirror(String mirrorName, Set<String> topics, RemoveTopicsFromMirrorOptions options);
/**
* Pause mirroring for the specified topics.
*
* Paused topics remain read-only on the destination cluster but stop fetching new data from the
* source cluster. The mirror fetcher threads are removed for these partitions, preserving the
* current replicated state. Mirroring can be resumed later with {@link #resumeMirrorTopics}.
*
* @param mirrorName The mirror name to pause the topics for
* @param topics Set of topic names to pause mirroring for
* @param options Options for the pause mirror topics operation
* @return The PauseMirrorTopicsResult containing futures for each topic
*/
PauseMirrorTopicsResult pauseMirrorTopics(String mirrorName, Set<String> topics, PauseMirrorTopicsOptions options);
/**
* Resume mirroring for previously paused topics.
*
* Resumed topics restart fetching data from the source cluster, picking up from where they
* left off. New mirror fetcher threads are created and the partitions transition back to the
* MIRRORING state.
*
* @param mirrorName The mirror name to resume the topics for
* @param topics Set of topic names to resume mirroring for
* @param options Options for the resume mirror topics operation
* @return The ResumeMirrorTopicsResult containing futures for each topic
*/
ResumeMirrorTopicsResult resumeMirrorTopics(String mirrorName, Set<String> topics, ResumeMirrorTopicsOptions options);
/**
* Delete a cluster mirror including its configuration.
*
* The mirror must be empty (no topics) or all its topics must have been removed (in STOPPED
* state). After deletion, all mirror metadata are tombstoned and failback is no longer possible.
*
* @param mirrorName The name of the cluster mirror to delete
* @param options Options for the delete mirror operation
* @return The DeleteMirrorResult
*/
DeleteMirrorResult deleteMirror(String mirrorName, DeleteMirrorOptions options);
/**
* List the cluster mirrors available in the cluster.
*
* @param options The options to use when listing the mirrors.
* @return The ListMirrorsResult.
*/
ListMirrorsResult listMirrors(ListMirrorsOptions options);
/**
* Describe cluster mirrors.
*
* This operation retrieves detailed information about cluster mirrors including:
* - Topics being mirrored
* - Partition-level lag information (source offset vs destination offset)
* - Mirroring state for each partition (INITIALIZING, PREPARING, MIRRORING, etc.)
*
* @param mirrorNames The names of the mirrors to describe
* @param options The options to use when describing mirrors
* @return The DescribeMirrorsResult
*/
DescribeMirrorsResult describeMirrors(Collection<String> mirrorNames, DescribeMirrorsOptions options);
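For illustration, the following sketch chains the proposed methods into a basic lifecycle against the destination cluster: create the mirror, attach a topic, detach it on failover, and finally delete the mirror. The options classes are assumed to have no-arg constructors and the result classes an all() future accessor, following the usual Admin conventions; neither detail is specified above.

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.*; // Admin plus the proposed mirror option classes

public class MirrorFailoverExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094"); // destination

        try (Admin admin = Admin.create(props)) {
            // Create the mirror, pointing the destination at the source cluster.
            admin.createMirror("my-mirror",
                    Map.of("bootstrap.servers", "localhost:9092"),
                    new CreateMirrorOptions()).all().get();

            // Attach a topic: it becomes read-only here and starts replicating.
            admin.addTopicsToMirror("my-mirror", Set.of("my-topic"),
                    new AddTopicsToMirrorOptions()).all().get();

            // ... later, on failover: detach the topic so local producers can write.
            admin.removeTopicsFromMirror("my-mirror", Set.of("my-topic"),
                    new RemoveTopicsFromMirrorOptions()).all().get();

            // Once all partitions are STOPPED, the mirror itself can be deleted.
            admin.deleteMirror("my-mirror", new DeleteMirrorOptions()).all().get();
        }
    }
}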
Protocol Changes
This section describes all protocol level changes and new RPCs.
EntityType
A new entity type is added for the message generator to provide schema-level type validation for mirror name fields:
public enum EntityType {
// ... existing types ...
@JsonProperty("mirrorName")
MIRROR_NAME(FieldType.StringFieldType.INSTANCE);
}
ResourceType
A new resource type is added to the ResourceType enum to enable per-mirror authorization:
public enum ResourceType {
// ... existing types ...
CLUSTER_MIRROR((byte) 8);
}
CoordinatorType
The FindCoordinatorRequest object is extended to support a new coordinator type:
public enum CoordinatorType {
// ... existing types ...
MIRROR((byte) 3);
}
CreateTopic
The CreateTopic API is extended to add information required for mirror topic creation.
{ "name": "MirrorInfo", "type": "MirrorInfo", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
"about": "Mirror information for creating a mirror topic from a source cluster.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "8+",
"about": "The topic ID from the source cluster." }
]}
The topic ID field ensures mirror topics retain the same topic ID as the source cluster topic. This allows fetch requests to pass validation on the source broker, and enables the system to verify that a topic being mirrored to a same-named topic in the destination cluster is indeed the same logical topic, not a name collision.
In normal topic creation, the MirrorInfo field will be null. When receiving the CreateTopic request, the controller will check the new field. If it is not set, the topic ID will be generated as a random UUID as usual. Otherwise, the controller will do the following validation (sketched after this list):
- This topic ID is not used by other topics in the current cluster
- The replicas for the partition assignment are all active and none is fenced or in controlled shutdown. This is to make sure that when a topic gets deleted and re-created with the same topic ID, the stale offline log dir won’t be treated as the active log dir after it becomes online (KAFKA-16234).
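For illustration, these checks can be sketched as follows; ClusterView and its accessors are hypothetical stand-ins for controller state, not part of the proposal.

import java.util.List;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;

public class MirrorTopicCreationChecks {
    /** Hypothetical view of the controller's cluster state. */
    interface ClusterView {
        boolean containsTopicId(Uuid topicId);
        boolean isActive(int brokerId);
        boolean isFenced(int brokerId);
        boolean inControlledShutdown(int brokerId);
    }

    static void validate(Uuid sourceTopicId, List<Integer> replicas, ClusterView cluster) {
        // Rule 1: the source topic ID must not already be in use locally.
        if (cluster.containsTopicId(sourceTopicId)) {
            throw new InvalidRequestException("Topic ID already in use: " + sourceTopicId);
        }
        // Rule 2: every assigned replica must be alive, unfenced, and not in
        // controlled shutdown, so a stale offline log dir cannot later be
        // mistaken for the active one (KAFKA-16234).
        for (int brokerId : replicas) {
            if (!cluster.isActive(brokerId) || cluster.isFenced(brokerId)
                    || cluster.inControlledShutdown(brokerId)) {
                throw new InvalidRequestException("Replica not eligible: " + brokerId);
            }
        }
    }
}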
Fetch
The Fetch API is extended to add the MirrorLeaderEpoch field used by the destination cluster internal replication.
FetchRequest
// newly added field in the FetchPartition type
{ "name": "MirrorLeaderEpoch", "type": "int32", "versions": "19+", "default": "-1", "taggedVersions": "19+", "tag": 2, "ignorable": true,
"about": "The latest known mirror leader epoch." }
FetchResponse
// newly added field in the PartitionData type
{ "name": "MirrorLeaderEpoch", "type": "int32", "versions": "19+", "default": "-1", "taggedVersions": "19+", "tag": 3, "ignorable": true,
"about": "The latest known mirror leader epoch." },
CreateMirror
Allows users to create a mirror and supply its configuration. When the broker receives the request, it validates that the mirror name is not already in use, contains only permitted characters, and does not end with a ".removed" or ".paused" suffix. Once validated, the request is forwarded to the controller, which persists the configuration in the metadata log.
CreateMirrorRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "CreateMirrorRequest",
"latestVersionUnstable": true,
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name."},
{ "name": "Config", "type": "[]MirrorConfig", "versions": "0+",
"about": "The cluster mirror configurations.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"about": "The configuration key name." },
{ "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The value to set for the configuration key."}
]}
]
}
CreateMirrorResponse
{
"apiKey": TBD,
"type": "response",
"name": "CreateMirrorResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The error message, or null if there was no error." }
]
}
AddTopicsToMirror
Adds topics to a specified mirror. The broker validates that all target topic partitions are in either UNKNOWN or STOPPED state; otherwise, the request is rejected with an INVALID_REQUEST error. Once validated, the request is forwarded to the controller, which sets the mirror.name topic config to the specified mirror name.
AddTopicsToMirrorRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "AddTopicsToMirrorRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "about": "The data for the topics.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." }
]}
]
}
AddTopicsToMirrorResponse
{
"apiKey": TBD,
"type": "response",
"name": "AddTopicsToMirrorResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0",
"about": "The results for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]
}
RemoveTopicsFromMirror
Allows users to detach topics from their associated mirror. The broker validates that all target topic partitions are in either PREPARING or MIRRORING state. Once validated, the request is forwarded to the controller, which appends the ".removed" suffix to the mirror.name topic config to mark the topics as no longer mirrored.
RemoveTopicsFromMirrorRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "RemoveTopicsFromMirrorRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "about": "The data for the topics.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." }
]}
]
}
RemoveTopicsFromMirrorResponse
{
"apiKey": TBD,
"type": "response",
"name": "RemoveTopicsFromMirrorResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0",
"about": "The results for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]
}
PauseMirrorTopics
Pauses data replication and metadata sync for the specified mirror topics, keeping them read-only on the destination cluster.
PauseMirrorTopicsRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "PauseMirrorTopicsRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+",
"about": "The mirror name to pause the topics for." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "about": "The data for the topics.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." }
]}
]
}
PauseMirrorTopicsResponse
{
"apiKey": TBD,
"type": "response",
"name": "PauseMirrorTopicsResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0",
"about": "The results for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]
}
ResumeMirrorTopics
Resumes data replication and metadata sync for previously paused mirror topics from where they left off.
ResumeMirrorTopicsRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "ResumeMirrorTopicsRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "about": "The data for the topics.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." }
]}
]
}
ResumeMirrorTopicsResponse
{
"apiKey": 103,
"type": "response",
"name": "ResumeMirrorTopicsResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0",
"about": "The results for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]
}
DeleteMirror
Permanently deletes a cluster mirror, including its configuration. The mirror must be empty (no topics) or all its partitions must be in STOPPED state. After deletion, all metadata are tombstoned, making failback impossible. This is an irreversible operation.
DeleteMirrorRequest
{
"apiKey": 104,
"type": "request",
"listeners": ["broker", "controller"],
"name": "DeleteMirrorRequest",
"latestVersionUnstable": true,
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name to delete."}
]
}
DeleteMirrorResponse
{
"apiKey": 104,
"type": "response",
"name": "DeleteMirrorResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The error message, or null if there was no error." }
]
}
ListMirrors
Returns the current mirror names and their associated topic counts in the cluster. It also includes the source cluster ID and bootstrap servers.
ListMirrorsRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker"],
"name": "ListMirrorsRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": []
}
ListMirrorsResponse
{
"apiKey": TBD,
"type": "response",
"name": "ListMirrorsResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Mirrors", "type": "[]ListedMirror", "versions": "0+",
"about": "Each mirror in the response.", "fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "SourceBootstrap", "type": "string", "versions": "0+",
"about": "The source cluster bootstrap servers." },
{ "name": "SourceClusterId", "type": "string", "versions": "0+", "default": "",
"about": "The source cluster ID, or empty if not yet resolved." },
{ "name": "TopicCount", "type": "int32", "versions": "0+", "default": "0",
"about": "The number of topics configured for this mirror. 0 indicates an empty mirror with no topics." }
]}
]
}
DescribeMirrors
Returns the current mirroring status, state, and configuration for the specified mirror topics on the destination cluster. The response also includes the last mirrored epoch (LME) for each partition.
DescribeMirrorsRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker"],
"name": "DescribeMirrorsRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorNames", "type": "[]string", "versions": "0+", "entityType": "mirrorName",
"about": "The names of the mirrors to describe. Null or empty array means all mirrors." },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", "default": "false",
"about": "Whether to include authorized operations." }
]
}
DescribeMirrorsResponse
{
"apiKey": TBD,
"type": "response",
"name": "DescribeMirrorsResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Mirrors", "type": "[]DescribedMirror", "versions": "0+",
"about": "Each described mirror.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicPartitions", "versions": "0+",
"about": "Each topic in the mirror.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionDetail", "versions": "0+",
"about": "Each partition detail.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "SourceOffset", "type": "int64", "versions": "0+", "default": "-1",
"about": "The high watermark offset from the source cluster leader, or -1 if not yet available." },
{ "name": "DestinationOffset", "type": "int64", "versions": "0+", "default": "-1",
"about": "The log end offset on the destination cluster, or -1 if not yet available." },
{ "name": "Lag", "type": "int64", "versions": "0+", "default": "-1",
"about": "The lag (source offset - destination offset), or -1 if not yet available." },
{ "name": "State", "type": "string", "versions": "0+",
"about": "The partition state." },
{ "name": "", "type": "int32", "versions": "0+", "default": "-1",
"about": "The last mirror leader epoch, or -1 if not available." }
]}
]}
]}
]
}
ReadMirrorStates
Reads the current mirror partition states from the internal __mirror_state topic on the destination cluster.
ReadMirrorStatesRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "ReadMirrorStatesRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The cluster mirror name." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0",
"about": "The data for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0",
"about": "The data for the partitions.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0",
"about": "The partition index." }
]}
]}
]
}
ReadMirrorStatesResponse
{
"apiKey": TBD,
"type": "response",
"name": "ReadMirrorStatesResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0",
"about": "The read results for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0",
"about": "The results for the partitions.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0",
"about": "The partition index." },
{ "name": "LastMirrorEpoch", "type": "int32", "versions": "0", default": "-1",
"about": "The last mirror leader epoch, or -1 if not available." },
{ "name": "State", "type": "int8", "versions": "0+",
"about": "The mirror partition state." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]}
]
}
WriteMirrorStates
Persists mirror partition state transitions to the internal __mirror_state topic on the destination cluster.
WriteMirrorStatesRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "WriteMirrorStatesRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0+", "entityType": "mirrorName",
"about": "The mirror name." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0",
"about": "The data for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0",
"about": "The data for the partitions.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0",
"about": "The partition index." },
{ "name": "", "type": "int32", "versions": "0", "default": "-1",
"about": "The last mirror leader epoch, or -1 if not available." },
{ "name": "State", "type": "int8", "versions": "0+",
"about": "The mirror partition state." }
]}
]},
{ "name": "RemovedTopics", "type": "[]string", "versions": "0+", "about": "The topic names to be removed." }
]
}
WriteMirrorStatesResponse
{
"apiKey": TBD,
"type": "response",
"name": "WriteMirrorStatesResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0",
"about": "The write results for the topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0",
"about": "The results for the partitions.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]}
]
}
BumpLeaderEpochs
Allows destination cluster partition leaders to bump the leader epoch on the destination controller so that it is at least as high as the source cluster's leader epoch.
BumpLeaderEpochsRequest
{
"apiKey": TBD,
"type": "request",
"listeners": ["broker", "controller"],
"name": "BumpLeaderEpochsRequest",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]TopicState", "versions": "0+", "about": "The topic and partitions state.",
"fields": [
{"name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]LeaderEpochState", "versions": "0+", "about": "The partition leader epochs.",
"fields": [
{"name": "partitionIndex", "type": "int32", "versions": "0+", "about": "The partition index."},
{"name": "minLeaderEpoch", "type": "int32", "versions": "0+", "default": -1, "about": "The minimum leader epoch that the destination cluster should bump to."}
]}
]}
]
}
BumpLeaderEpochsResponse
{
"apiKey": TBD,
"type": "response",
"name": "BumpLeaderEpochsResponse",
// Version 0 is the initial version.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "Topics", "type": "[]TopicPartitions", "versions": "0+",
"about": "Each topic in the mirror.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionDetail", "versions": "0+",
"about": "Each partition state.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0",
"about": "The error code, or 0 if there was no error." }
]}
]}
]
}
Metadata Records
LastMirrorEpochs
The greatest leader epoch of a given partition that a destination cluster recognizes from the source cluster.
{
"apiKey": 1,
"type": "coordinator-key",
"name": "sKey",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0",
"about": "The cluster mirror name."}
]
}
{
"apiKey": 1,
"type": "coordinator-value",
"name": "LastMirrorEpochsValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]Topic", "versions": "0+",
"about": "The mirror topics for which we want to store the last mirrored epochs.", "fields": [
{ "name": "Name", "type": "string", "versions": "0",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]Partition", "versions": "0+",
"about": "Each partition to record the last mirrored epochs.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "", "type": "int32", "versions": "0+",
"about": "The last mirror leader epoch for this partition." }
]}
]}
]
}
MirrorPartitionState
The MirrorPartitionState record represents the lifecycle state of a mirrored partition.
{
"apiKey": 2,
"type": "coordinator-key",
"name": "MirrorPartitionStateKey",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "MirrorName", "type": "string", "versions": "0",
"about": "The cluster mirror name."}
]
}
{
"apiKey": 2,
"type": "coordinator-value",
"name": "MirrorPartitionStateValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "TopicName", "type": "string", "versions": "0",
"about": "The topic name."},
{ "name": "Partition", "type": "int32", "versions": "0",
"about": "The partition index."},
{ "name": "State", "type": "int8", "versions": "0+",
"about": "The mirror partition state." }
]
}
Configuration
A new configuration resource type is added for cluster mirrors, which is stored in the cluster metadata internal log. Both DescribeConfigs and IncrementalAlterConfigs will be bumped to new versions so that clients can check ApiVersionsResponse to determine whether the broker supports mirror configuration management, rather than having to send a request and interpret the error.
public enum Type {
// ... existing types ...
MIRROR((byte) 64, "mirror"); // New type
}
Mirror Configuration
Set via CreateMirror or IncrementalAlterConfigs. Stored in cluster metadata records.
| Key | Description | Default |
|---|---|---|
| bootstrap.servers | A list of host/port pairs to use for establishing the initial connection to the source cluster. | |
| mirror.topic.properties.exclude | A comma-separated list of topic config property names to exclude from synchronization. Properties in this list will not be replicated from the source cluster. The mirror.name property is always excluded regardless of this setting. | follower.replication.throttled.replicas, leader.replication.throttled.replicas, message.timestamp.difference.max.ms, log.message.timestamp.before.max.ms, log.message.timestamp.after.max.ms, message.timestamp.type, unclean.leader.election.enable, min.insync.replicas, mirror.name |
| mirror.groups.include | A comma-separated list of regex patterns for consumer group IDs to include in offset synchronization. Only consumer groups whose IDs match at least one of the patterns will have their offsets replicated from the source cluster. | .* |
| mirror.acl.include | A comma-separated list of ACL include rules. Each rule uses semicolon-separated fields: resourceType;resourceName;operation;permissionType;principal. Use '*' as wildcard for any field. The resourceName field supports regex patterns. Trailing wildcard fields can be omitted. See AclRule javadoc for examples. | * |
| security.protocol | Protocol for source cluster communication (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL). | |
| sasl.* | SASL configuration properties. | |
| ssl.* | SSL configuration properties. | |
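For illustration, a secured mirror configuration might combine these keys as follows. The values are placeholders, and the two include rules are examples of the documented formats rather than recommended settings.

import java.util.Map;

public class MirrorConfigExample {
    // Illustrative configuration for a SASL_SSL source cluster.
    static final Map<String, String> MIRROR_CONFIGS = Map.of(
        "bootstrap.servers", "source-1:9093,source-2:9093",
        "security.protocol", "SASL_SSL",
        "sasl.mechanism", "SCRAM-SHA-512",
        "sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"mirror\" password=\"change-me\";",
        // Only replicate committed offsets for production consumer groups.
        "mirror.groups.include", "prod-.*",
        // One ACL rule: ALLOW ACLs on any topic, any operation, any principal.
        "mirror.acl.include", "TOPIC;.*;*;ALLOW;*"
    );
}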
Topic Configuration
Set by topic creation or alter. Stored in topic config.
| Key | Description | Default |
|---|---|---|
| mirror.name | Identifies the mirror that manages this topic. Topics with this configuration set are read-only and can only be modified through mirror management APIs. This property is filtered out from DescribeConfigs responses to avoid exposing internal state to users. | |
| mirror.replication.throttled.replicas | A list of replicas for which log replication should be throttled on the mirror follower node. The list should describe a set of replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle all replicas for this topic. | |
| mirror.support.unclean.leader.election | When enabled, LME log truncation waits for all replicas (not just ISR members) to join the ISR and complete the truncation. | false |
Broker Configuration
Set via broker config. Stored in server.properties or dynamic broker config.
| Key | Description | Default |
|---|---|---|
| mirror.topic.num.partitions | Number of partitions for the __mirror_state internal topic. | 50 |
| mirror.topic.replication.factor | Replication factor for the __mirror_state internal topic. | 3 |
| mirror.num.replica.fetchers | Number of fetcher threads per mirrored source broker. | 1 |
| mirror.metadata.refresh.interval.ms | The interval in milliseconds at which the coordinator refreshes metadata from source clusters. This controls how frequently the coordinator polls source clusters to detect new topics and metadata changes. | 30000 |
| mirror.replication.throttled.rate | A long representing the upper bound (bytes/sec) on replication traffic for the mirror follower nodes enumerated in the property "mirror.replication.throttled.replicas" (for each topic). This property can only be set dynamically. It is suggested that the limit be kept above 1MB/s for accurate behaviour. | MAX_LONG |
| request.timeout.ms | Maximum amount of time in milliseconds the client will wait for the response of a request. | 30000 |
| socket.* | Socket connection configurations. | |
| replica.* | Fetcher threads configurations. | |
Metrics
A core set of metrics will be provided with the initial implementation.
| Name | Type | Group | Tags | Description | JMX Bean |
|---|---|---|---|---|---|
| MaxLag | MirrorFetcherManager | kafka.server.mirror | clientId=MirrorReplica | Max lag in messages between destination leader and source leader replicas. | kafka.server.mirror:type=MirrorFetcherManager,name=MaxLag,clientId=MirrorReplica |
| MinFetchRate | MirrorFetcherManager | kafka.server.mirror | clientId=MirrorReplica | The min fetch rate between destination leader and source leader replicas. | kafka.server.mirror:type=MirrorFetcherManager,name=MinFetchRate,clientId=MirrorReplica |
| ConsumerLag | FetcherLagMetrics | kafka.server | clientId=MirrorFetcherThread-{sourceBroker.id}-{fetcherId}-{mirrorName},topic=([-.\w]+),partition=([0-9]+) | Lag in messages per remote leader replica. | kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=MirrorFetcherThread-{sourceBroker.id}-{fetcherId}-{mirrorName},topic=([-.\w]+),partition=([0-9]+) |
| DeadThreadCount | MirrorFetcherManager | kafka.server.mirror | clientId=MirrorReplica | Number of dead mirror fetcher threads. | kafka.server.mirror:type=MirrorFetcherManager,name=DeadThreadCount,clientId=MirrorReplica |
| FailedPartitionsCount | MirrorFetcherManager | kafka.server.mirror | clientId=MirrorReplica | Total count of partitions that failed for any reason, such as authentication, authorization, or network failures with the source. | kafka.server.mirror:type=MirrorFetcherManager,name=FailedPartitionsCount,clientId=MirrorReplica |
| BytesPerSec | FetcherStats | kafka.server | clientId=MirrorFetcherThread-{sourceBroker.id}-{fetcherId}-{mirrorName},brokerHost={host},brokerPort={port} | Extends kafka.server.FetcherStats to report mirror fetcher threads. | kafka.server:type=FetcherStats,name=BytesPerSec,clientId=MirrorFetcherThread-{sourceBroker.id}-{fetcherId}-{mirrorName},brokerHost={host},brokerPort={port},mirror-name={mirrorName} |
| RequestsPerSec | FetcherStats | kafka.server | clientId=MirrorFetcherThread-{sourceBroker.id}-{fetcherId}-{mirrorName},brokerHost={host},brokerPort={port} | Extends kafka.server.FetcherStats to report mirror fetcher threads. | kafka.server:type=FetcherStats,name=RequestsPerSec,clientId=MirrorFetcherThread-{sourceBroker.id}-{fetcherId}-{mirrorName},brokerHost={host},brokerPort={port},mirror-name={mirrorName} |
| LocalTimeMs, MessageConversionsTimeMs, RemoteTimeMs, RequestBytes, RequestQueueTimeMs, ResponseQueueTimeMs, ResponseSendTimeMs, TemporaryMemoryBytes, TotalTimeMs | RequestMetrics | kafka.network | request=[mirror_requests] | Extends kafka.network:type=RequestMetrics to list cluster mirror requests. | kafka.network:type=RequestMetrics,name=*,request=* |
| ErrorsPerSec | RequestMetrics | kafka.network | request=[mirror_requests],error=* | Extends kafka.network:type=RequestMetrics to list cluster mirror requests. | kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=*,error=* |
| RequestsPerSec | RequestMetrics | kafka.network | request=[mirror_requests],version=* | Extends kafka.network:type=RequestMetrics to list cluster mirror requests. | kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*,version=* |
| connection-close-rate, connection-close-total, connection-count, connection-creation-rate, connection-creation-total, failed-authentication-rate, failed-authentication-total, failed-reauthentication-rate, failed-reauthentication-total, incoming-byte-rate, incoming-byte-total, network-io-rate, network-io-total, outgoing-byte-rate, outgoing-byte-total, reauthentication-latency-avg, reauthentication-latency-max, request-rate, request-size-avg, request-size-max, request-total, response-rate, response-total, select-rate, select-total, successful-authentication-no-reauth-total, successful-authentication-rate, successful-authentication-total, successful-reauthentication-rate, successful-reauthentication-total | mirror-broker-{sourceBroker.id}-fetcher-{fetcherId}-mirror-{mirrorName}-metrics | kafka.server | broker-id={sourceBroker.id},fetcher-id={fetcherId} | Fetcher request metrics for cluster mirroring. | kafka.server:type=mirror-broker-{sourceBroker.id}-fetcher-{fetcherId}-mirror-{mirrorName}-metrics,broker-id={sourceBroker.id},fetcher-id={fetcherId} |
| MetadataRefreshError | MirrorMetadataManager | kafka.server.mirror | | Number of topic metadata refresh sync errors. | kafka.server.mirror:type=MirrorMetadataManager,name=MetadataRefreshError |
| TopicConfigMetadataSyncError | MirrorMetadataManager | kafka.server.mirror | | Number of topic configuration sync errors. | kafka.server.mirror:type=MirrorMetadataManager,name=TopicConfigMetadataSyncError |
| ConsumerGroupOffsetSyncError | MirrorMetadataManager | kafka.server.mirror | | Number of consumer group offset sync errors. | kafka.server.mirror:type=MirrorMetadataManager,name=ConsumerGroupOffsetSyncError |
| AclSyncError | MirrorMetadataManager | kafka.server.mirror | | Number of ACL sync errors. | kafka.server.mirror:type=MirrorMetadataManager,name=AclSyncError |
| ByteRate | MirrorReplication | kafka.server | | Bandwidth quota metric. Indicates the throttled mirror replication rate of the broker in bytes/sec. | kafka.server:type=MirrorReplication,name=ByteRate |
| FailedPartitionState | MirrorMetadataManager | kafka.server.mirror | | Number of partitions in failed state. | kafka.server.mirror:type=MirrorMetadataManager,name=FailedPartitionState |
| StoppedPartitionState | MirrorMetadataManager | kafka.server.mirror | | Number of partitions in stopped state. | kafka.server.mirror:type=MirrorMetadataManager,name=StoppedPartitionState |
| StoppingPartitionState | MirrorMetadataManager | kafka.server.mirror | | Number of partitions in stopping state. | kafka.server.mirror:type=MirrorMetadataManager,name=StoppingPartitionState |
| MirroringPartitionState | MirrorMetadataManager | kafka.server.mirror | | Number of partitions in mirroring state. | kafka.server.mirror:type=MirrorMetadataManager,name=MirroringPartitionState |
| PreparingPartitionState | MirrorMetadataManager | kafka.server.mirror | | Number of partitions in preparing state. | kafka.server.mirror:type=MirrorMetadataManager,name=PreparingPartitionState |
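For illustration, the sketch below polls the MaxLag gauge over a remote JMX connection. It assumes the broker exposes JMX on port 9999 (a deployment choice, not part of this KIP); the "Value" attribute is the standard attribute name for Kafka's Yammer gauges.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MirrorMaxLagProbe {
    public static void main(String[] args) throws Exception {
        // Connect to the broker's JMX endpoint (port is an assumption).
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Bean name taken from the metrics table above.
            ObjectName maxLag = new ObjectName(
                    "kafka.server.mirror:type=MirrorFetcherManager,name=MaxLag,clientId=MirrorReplica");
            System.out.println("MaxLag = " + mbs.getAttribute(maxLag, "Value"));
        }
    }
}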
Errors
List of protocol-level errors returned in API responses:
| Code | Name | Message | Used By |
|---|---|---|---|
| 3 | UNKNOWN_TOPIC_OR_PARTITION | The topic does not exist on the target cluster | RemoveTopicsFromMirror, PauseMirrorTopics, ResumeMirrorTopics |
| 15 | COORDINATOR_NOT_AVAILABLE | The mirror coordinator is not active | WriteMirrorStates, ReadMirrorStates |
| 31 | CLUSTER_AUTHORIZATION_FAILED | The client is not authorized to perform the mirror operation | WriteMirrorStates, ReadMirrorStates |
| 35 | UNSUPPORTED_VERSION | Cluster mirroring is disabled (mirror.version=0) | CreateMirror, AddTopicsToMirror, RemoveTopicsFromMirror, PauseMirrorTopics, ResumeMirrorTopics, ListMirrors, DescribeMirrors, DeleteMirror |
| TBD | MIRROR_AUTHORIZATION_FAILED | Mirror authorization failed | CreateMirror, AddTopicsToMirror, RemoveTopicsFromMirror, PauseMirrorTopics, ResumeMirrorTopics, DeleteMirror |
| TBD | READ_ONLY_TOPIC | The topic is read-only because it is a mirror topic on the target cluster | Produce |
| TBD | INVALID_MIRROR_NAME | The mirror name does not meet the naming rules | CreateMirror |
| TBD | UNKNOWN_MIRROR | The topic is not assigned to any mirror | RemoveTopicsFromMirror, PauseMirrorTopics, ResumeMirrorTopics |
| TBD | TOPIC_ALREADY_IN_MIRROR | The topic is already assigned to a mirror | AddTopicsToMirror |
| TBD | TOPIC_NOT_IN_MIRROR | The topic does not belong to the specified mirror | RemoveTopicsFromMirror, PauseMirrorTopics, ResumeMirrorTopics |
| TBD | MIRROR_TOPIC_ALREADY_PAUSED | The mirror topic is already paused | PauseMirrorTopics |
| TBD | MIRROR_TOPIC_NOT_PAUSED | The mirror topic is not paused | ResumeMirrorTopics |
| TBD | MIRROR_TOPIC_BEING_REMOVED | The mirror topic is being removed | ResumeMirrorTopics |
| TBD | MIRROR_NOT_EMPTY | The mirror still has active or non-removed topics | DeleteMirror |
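As a concrete illustration, a client producing to a mirror topic on the destination cluster before failover is rejected with the proposed READ_ONLY_TOPIC error. A minimal sketch, where payments and the bootstrap address are hypothetical:

```bash
# Attempt to produce to a mirror topic on the destination cluster.
# "payments" and "dest-cluster:9092" are placeholders.
echo 'test-record' | bin/kafka-console-producer.sh \
  --bootstrap-server dest-cluster:9092 \
  --topic payments
# Expected: the Produce request fails with the proposed READ_ONLY_TOPIC
# error, since mirror topics only become writable after an explicit failover.
```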
Compatibility, Deprecation, and Migration Plan
Cluster Mirroring will be introduced through a phased rollout across multiple Kafka releases to ensure stability and gather community feedback.
Release Phases
Early access
Cluster Mirroring is introduced as an early access feature, disabled by default to prevent accidental production usage. To enable it, all cluster nodes (controllers and brokers) must explicitly enable unstable API versions (unstable.api.versions.enable=true) and unstable feature versions (unstable.feature.versions.enable=true) in their configuration files. After starting the cluster with the minimum required metadata version, operators can dynamically enable the mirror version feature to activate Cluster Mirroring (bin/kafka-features.sh --bootstrap-server :9092 upgrade --feature mirror.version=1). This stage is intended for testing and evaluation in non-production environments only, as the new APIs and metadata record formats may change in subsequent releases without backward compatibility guarantees.
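Putting these steps together, a minimal enablement sketch (the property names and the upgrade command are taken verbatim from this section; using describe to verify the finalized level is standard kafka-features.sh usage):

```bash
# In server.properties on every controller and broker (early access only):
#   unstable.api.versions.enable=true
#   unstable.feature.versions.enable=true

# Once the cluster is running, dynamically enable Cluster Mirroring:
bin/kafka-features.sh --bootstrap-server :9092 upgrade --feature mirror.version=1

# Verify the finalized feature level:
bin/kafka-features.sh --bootstrap-server :9092 describe
```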
Preview
In a future release, Cluster Mirroring will transition to preview status with frozen protocol and metadata schemas. The feature will still require explicit enablement via dynamic feature upgrades but will no longer require the unstable API and feature configuration. The feature remains disabled by default to ensure operators consciously opt in, but the upgrade path from early access clusters will be officially supported with compatibility guarantees. This stage is suitable for pre-production testing and pilot deployments where API stability is required but production-grade maturity is not yet needed.
General availability
When Cluster Mirroring reaches general availability, the feature will be enabled by default when clusters reach the corresponding production metadata version. All new APIs will become stable production APIs with all unstable markers removed from their definitions. No special configuration flags or explicit feature enablement will be required beyond setting an appropriate metadata version, and the feature will be fully supported for mission-critical production workloads under Kafka's standard compatibility guarantees. Clusters using Cluster Mirroring in preview can upgrade seamlessly to GA releases without migration steps. Downgrade is also supported, but requires manual cleanup of the internal topic.
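A sketch of the downgrade path, assuming mirror.version can be lowered with the standard kafka-features.sh downgrade subcommand (an assumption of this sketch):

```bash
# Lower the mirror feature back to level 0 (disabled).
bin/kafka-features.sh --bootstrap-server :9092 downgrade --feature mirror.version=0

# Manual cleanup of the Cluster Mirroring internal topic is then required;
# its name is defined elsewhere in this KIP and is not repeated here.
```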
Migration From MirrorMaker 2
Cluster Mirroring is not compatible with MirrorMaker 2. This is a critical consideration for users planning to migrate from MirrorMaker 2 to Cluster Mirroring.
MM2 and Cluster Mirroring use different internal topic structures and naming conventions for storing metadata and offsets. The two systems track and store consumer offsets differently, making it impossible to seamlessly transition between them.
Follow this process to switch from MirrorMaker 2 to Cluster Mirroring:
- Stop MM2 replication
- Delete mirror topics on the destination cluster, including MM2 internal topics
- Start fresh with Cluster Mirroring
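A minimal migration sketch, assuming a topic payments was mirrored by MM2 from a cluster aliased source, and assuming illustrative kafka-mirrors.sh options (the topic names, cluster alias, and all kafka-mirrors.sh flags below are placeholders, not normative):

```bash
# 1. Stop the MM2 Connect workers (deployment-specific).

# 2. On the destination cluster, delete the prefixed mirror topics and
#    MM2's internal topics; names below are illustrative.
bin/kafka-topics.sh --bootstrap-server dest-cluster:9092 --delete --topic 'source.payments'
bin/kafka-topics.sh --bootstrap-server dest-cluster:9092 --delete --topic 'source.checkpoints.internal'
bin/kafka-topics.sh --bootstrap-server dest-cluster:9092 --delete --topic 'heartbeats'

# 3. Start fresh with Cluster Mirroring (flags are placeholders):
bin/kafka-mirrors.sh --bootstrap-server dest-cluster:9092 \
  --create --mirror dr-mirror --config-file source-cluster.properties
```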
Compatibility Matrix
Note that some features require support from the source cluster.
| Feature | Source Cluster Requirement | Destination Cluster Requirement | Notes |
|---|---|---|---|
| Core mirroring and failover | 2.1 | 4.x | Kafka 4 is compatible with client versions as old as 2.1 (inclusive). |
| Failback (reverse mirroring) | 4.x | 4.x | Requires last-mirrored-offset tracking on both sides; otherwise failback truncates to zero, effectively mirroring from scratch. |
| Share Groups | 4.x | 4.y | If the source doesn't support share groups, mirroring continues but share group offsets won't be synchronized. |
Performance Considerations
MirrorFetcherThread uses the same fetch protocol optimizations as ReplicaFetcherThread:
- Fetch Sessions: Incremental fetch sessions (KIP-227) reduce fetch request size by sending only changed partition metadata. This optimization is critical for cross-cluster replication where WAN latency is higher than LAN latency.
- Pipelining: Multiple fetch requests can be in-flight simultaneously, improving throughput over high-latency connections. The number of in-flight requests is controlled by standard replica fetcher settings.
- Compression: Record batches are transferred in their original compressed format, minimizing network bandwidth. The destination cluster decompresses and recompresses based on its own compression settings only if the compression codec differs.
- Zero-Copy Transfer: Within the destination cluster, replication from read-only leaders to followers uses zero-copy transfers where supported by the operating system.
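To make the "standard replica fetcher settings" above concrete, a hedged sketch using the existing dynamic broker configuration mechanism; whether the mirror fetcher pool honors exactly these properties is an assumption of this sketch:

```bash
# num.replica.fetchers is dynamically updatable cluster-wide; raising it
# increases fetch parallelism across partitions.
bin/kafka-configs.sh --bootstrap-server dest-cluster:9092 \
  --entity-type brokers --entity-default \
  --alter --add-config num.replica.fetchers=4

# Static server.properties knobs that shape fetch sizing (restart required):
#   replica.fetch.max.bytes=1048576
#   replica.fetch.response.max.bytes=10485760
```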
Cluster Mirroring introduces additional replication threads and network I/O on brokers configured as read-only leaders for mirror partitions. The performance impact on existing intra-cluster replication is minimized through resource isolation:
- Separate Thread Pools: Cross-cluster fetcher threads run in a dedicated thread pool, which is independent from the intra-cluster fetcher thread pool. This separation ensures that cross-cluster replication latency does not impact local replica synchronization.
- Network I/O Overhead: Read-only leaders perform additional network I/O to fetch from source clusters. This overhead is proportional to the number of mirror partitions and the replication throughput. Brokers with many mirror partitions may experience increased CPU usage for network processing and data serialization.
- Memory Footprint: Each mirror fetcher thread maintains its own fetch session state, partition state map, and response buffers. With default configuration, memory overhead is comparable to standard replica fetchers. The metadata manager maintains connection pools and metadata caches, adding minimal memory overhead.
- Bandwidth Consumption: Cross-cluster traffic between source and destination clusters consumes WAN bandwidth. For large-scale deployments, operators should provision adequate inter-datacenter connectivity or configure throttling.
- State Management: Mirror partition state management is evenly distributed to available brokers to avoid any hot spot, especially during rolling update or restart events.
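For the destination-side throttling mentioned above, a minimal sketch that applies the mirror.replication.throttled.rate property (named elsewhere in this KIP) as a dynamic broker config, by analogy with the existing leader/follower replication throttles; the entity type and cluster-default scope are assumptions:

```bash
# Cap mirror replication traffic at ~50 MB/s on every destination broker.
# Applying this as a cluster-wide dynamic config is an assumption of this
# sketch, modeled on leader/follower.replication.throttled.rate.
bin/kafka-configs.sh --bootstrap-server dest-cluster:9092 \
  --entity-type brokers --entity-default \
  --alter --add-config mirror.replication.throttled.rate=52428800
```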
Test Plan
Unit Tests
Unit tests will cover individual component behavior:
- MirrorCoordinator: State loading, partition assignment, metadata persistence.
- MirrorMetadataManager: Topic creation, config sync, offset commit, ACL sync.
- MirrorFetcherThread: Epoch tracking, fetch processing, leader change handling.
- MirrorCommand: Command-line parsing, Admin API invocation, error handling.
- Protocol Serialization: Extended and new APIs serialization and deserialization.
Integration Tests
Integration tests will validate end-to-end functionality across multiple brokers:
- CLI Workflow: Create mirror with kafka-mirrors.sh, add topics, verify replication
- Basic Replication: Create mirror via API, replicate topic, verify data consistency
- Metadata Sync: Modify topic config in source, verify automatic sync to destination
- Partition Expansion: Add partitions to source topic, verify destination expands
- Consumer Groups: Commit offsets in source, verify replication to destination
- ACL Replication: Create ACL in source, verify creation in destination
- Leader Changes: Trigger leader election in source, verify fetcher reconnects
- Broker Failures: Stop destination broker, verify replication continues after recovery
System Tests
System tests will validate behavior under realistic production conditions:
- Performance Benchmark: Measure replication throughput and latency across WAN.
- Scalability Test: Replicate 1000 topics with 100,000 partitions across clusters.
- Failover Test: Simulate source cluster failure, measure consumer recovery time.
- Long-Running Stability: Run continuous replication for 7 days, verify no memory leaks or performance degradation.
- Security Validation: Test all authentication mechanisms (SASL PLAIN, SCRAM, Kerberos, mTLS) via kafka-mirrors.sh config files.
Rejected Alternatives
Keep Using MirrorMaker 2
This KIP introduces native cluster mirroring to address the limitations of MirrorMaker 2 described in the motivation section. The following tables provide a detailed comparison across deployment, features, and performance characteristics.
Use Case Guidance:
- MirrorMaker 2: Best for active-active topologies, multi-region writes, complex routing scenarios
- Cluster Mirroring: Best for disaster recovery, failover, migration, simplified operations with exact offset preservation
Deployment Comparison
| Aspect | MirrorMaker 2 | Cluster Mirroring |
|---|---|---|
| Architecture | External Connect workers (separate JVM) | Integrated into Kafka brokers using native replication protocol |
| Operational Complexity | Manage Connect cluster lifecycle independently | Unified with broker operations |
| Monitoring | Separate Connect metrics and dashboards | Standard Kafka JMX metrics |
Feature Support Comparison
| Feature | MirrorMaker 2 | Cluster Mirroring |
|---|---|---|
| Offset Translation | Lossy, requires remapping, causes reprocessing overhead | None needed, offsets preserved exactly |
| Metadata Sync | Requires separate connector configuration (MirrorSourceConnector, MirrorCheckpointConnector) | Automatic (topics, configs, consumer groups, ACLs) |
| Transactional Topics | Markers copied as regular records, incomplete transactions possible during replication | Markers mirrored, LSO truncation ensures consistency before failover |
| Topic Write Protection | Not supported (mirror topics always writable) | Read-only enforcement during mirroring, writable only after explicit failover |
| Tiered Storage | Fetches from broker (which reads from remote storage) | Not initially supported (future work) |
| Active-Active | Supported via topic prefixing and cycle detection | Not supported (read-only enforcement prevents cycles) |
| Share Groups | Not supported | Supported |
| Failback | Full re-mirror from offset 0 | Delta sync using DescribeMirror |
| Topic Name Preservation | No, destination topics prefixed with source cluster alias (e.g., source.topic-name) | Yes, same topic name as source |
| Topic ID Preservation | No, destination gets new topic ID | Yes, same topic ID as source |
Performance & Isolation Comparison
| Characteristic | MirrorMaker 2 | Cluster Mirroring |
|---|---|---|
| Compression Overhead | Decompress + recompress records | Preserve source compression (zero overhead) |
| Failover Time | Offset translation + consumer group sync + bootstrap reconfiguration | Consumer group sync + bootstrap reconfiguration only |
| Bandwidth Control (Source) | None (unless client quotas are manually configured) | Client quotas (current), dedicated mirror throttling (future) |
| Bandwidth Control (Destination) | None (unless client quotas are manually configured) | Replica-level throttling (mirror.replication.throttled.rate) |
| Resource Isolation (Source) | Shares network bandwidth and disk I/O with all consumers | Shares network bandwidth and disk I/O with all consumers (same as MM2 today); replica-level isolation (future) |
| Resource Isolation (Destination) | Separate JVM heap (memory isolated from brokers), but still consumes broker resources like any other client | Shares network and disk I/O; dedicated thread pool within the broker JVM |
| Malformed Batch Handling | Crashes the Connect task; all partitions in the task restart | Fails the individual partition (FAILED state); others continue |
| Blast Radius (Failure) | All partitions in the task affected | Single partition affected |
| Catch-Up Surge Protection | Connect worker heap may OOM, affecting all tasks | Throttling on the destination replica plus source quotas prevent memory spikes |
| Unclean Leader Election | No protection against log divergence | Explicitly unsupported, with warning logs |