Note that this proposal is incomplete, and tries to explore the UX of the feature before establishing the technical requirements and limitations. As we discover what the technical limitations are, some of the UX may need to change, and some semantics of the feature may need to be softened.

To Co-Authors: None of the below contents is final or necessarily correct. Please feel free to edit this document directly and summarize your changes on the mailing list afterwards. 

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Since before the implementation of MirrorMaker 1, there has been a desire to replicate data between Kafka clusters. This can be done for many different reasons, including but not limited to: disaster recovery, latency optimization, load shaping, security, and data protection. Currently the open source tooling for Kafka replication consists of MirrorMaker 1 and MirrorMaker 2, which both fall short in many modern use-cases:

  1. They run in external processes which may experience outages when both Kafka clusters are otherwise healthy and accepting clients
  2. They represent an additional operational burden beyond just running Kafka
  3. Replication does not preserve offsets of individual records
  4. Replication does not preserve exactly-once-semantics for records & consumer offsets

Goals

  • Replicate topics and other associated resources between intentionally separate Kafka clusters
  • Offsets for replicated records should be the same as in the origin Kafka
  • Preserve exactly-once-semantics for replicated records

Non-Goals

  • Implement multi-leader partitions that accept writes on multiple brokers.
  • Perform distributed consensus to elect partition leaders across multiple clusters
  • Provide transparent failover of single clients between Kafka clusters

Public Interfaces

Operators will use the AdminClient (directly or via a command-line utility) to create and manage Replication Links between pairs of clusters. These clusters will then open connections between individual brokers to replicate data from the Source Topic Leader to the Follower Topic Leader.

Operators will configure Replication Links to provide the availability and consistency guarantees required for their usage. Application Developers can use Replication Links to implement multi-cluster applications, or migrate applications between clusters while maintaining data semantics.

User Interface

Admin Client - Namespace

A Namespace is an abstract group of a single type of string-identified resource in a single cluster, one of:

  • Topic
  • Consumer Group
  • ACL Principal

The AdminClient has CRUD operations for Namespaces. Namespaces have the following properties:

  • A string prefix that all resources within the namespace share, fixed during creation
  • The resource type of those resources, fixed during creation
  • Reserved: boolean flag default false, dynamically reconfigurable

Using prefixes to identify namespaces has the following properties:

  • It is possible to map resources between namespaces by replacing one prefix with another. (e.g. (A*, B*) namespace mapping will pair (A.topic, B.topic) topics)
  • It is possible to test whether two namespaces have overlap (e.g. A* and B* share no overlap. AB* and A* have an overlap)
  • It is possible to find all resources within a namespace, and test if that set of resources is empty.
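
The prefix properties listed above can be illustrated with a minimal Python sketch. The function names (`map_resource`, `namespaces_overlap`, `resources_in`) are hypothetical and not part of any proposed AdminClient API; a namespace is represented only by its prefix string.

```python
from typing import Iterable, List


def map_resource(name: str, src_prefix: str, dst_prefix: str) -> str:
    """Map a resource name between namespaces by swapping the prefix."""
    if not name.startswith(src_prefix):
        raise ValueError(f"{name!r} is not inside namespace {src_prefix!r}")
    return dst_prefix + name[len(src_prefix):]


def namespaces_overlap(prefix_a: str, prefix_b: str) -> bool:
    """Two prefix namespaces overlap iff one prefix is a prefix of the other."""
    return prefix_a.startswith(prefix_b) or prefix_b.startswith(prefix_a)


def resources_in(prefix: str, names: Iterable[str]) -> List[str]:
    """Return every resource name that falls inside the namespace."""
    return [n for n in names if n.startswith(prefix)]
```

For example, `map_resource("A.topic", "A", "B")` yields `"B.topic"`, and a namespace is empty when `resources_in` returns an empty list.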

One cluster can have many namespaces, some overlapping. For an analogy, a Namespace is like a directory in a filesystem. Multiple filesystems (clusters) can have the same directory (namespace) "/dir/", and some filesystems can rename "/dir/" to "/otherDir/". This can be extended to support first-class directories in Kafka at a later date.

Topic namespaces additionally have some state which is read-only by users:

  • A provenance history for each of the topic-partitions, indicating the source cluster used for ranges of offsets

Namespaces with reservations are used when fulfilling write-requests to resources within that namespace, to prevent conflicting writes ahead-of-time. The provenance history is used for managing unclean link recovery.

The top-level namespace with the prefix "" does not include resources starting with underscore, to exclude all internal topics.

A Replication Link is a pair of clusters that were/are/will be involved in cross-cluster replication. The AdminClient has CRUD operations for Replication Links.

A single cluster can participate in multiple replication links simultaneously. A pair of clusters is permitted to have any number of links between them.

The replication link has a configuration, which includes:

  • The local cluster ID, fixed during creation
  • The remote cluster ID, fixed during creation
  • The mode of the link (synchronous or asynchronous), dynamically reconfigurable
  • The direction of the link/role of this cluster (leader, follower)
  • The bootstrap.servers for the opposite cluster, dynamically reconfigurable
  • Credentials which include permission to manage replication links on the opposite cluster, dynamically reconfigurable

A replication link also includes a set of attached namespace mappings. These are each configured with:

  • The local cluster namespace, fixed during creation
  • The remote cluster namespace, fixed during creation

Creating a replication link causes the cluster to contact the other cluster via the bootstrap servers and replicate the link configuration. The configuration on the other cluster is mirrored with local IDs swapped with remote IDs.
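
As a rough illustration of the mirroring step, the sketch below swaps the local and remote cluster IDs. The `LinkConfig` type and `mirrored` function are hypothetical. Flipping the leader/follower role is an assumption that follows from the role being described relative to "this cluster". The bootstrap servers and credentials are left out because the proposal does not say how the opposite cluster derives them.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class LinkConfig:
    local_cluster_id: str
    remote_cluster_id: str
    mode: str  # "asynchronous" or "synchronous"
    role: str  # role of the local cluster: "leader" or "follower"


def mirrored(cfg: LinkConfig) -> LinkConfig:
    """Return the configuration as the opposite cluster would store it."""
    flipped_role = "follower" if cfg.role == "leader" else "leader"  # assumption
    return replace(
        cfg,
        local_cluster_id=cfg.remote_cluster_id,
        remote_cluster_id=cfg.local_cluster_id,
        role=flipped_role,
    )
```

Mirroring twice returns the original configuration, which is a convenient sanity check for any real implementation.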

Namespace configurations are stored and configurable on both clusters, with clusters performing lightweight consensus or CRDTs for some operations and reconciliation for others. This is to allow operators to intentionally diverge their configurations on each cluster during a network partition.

Namespaces can be attached or detached from a replication link dynamically. A namespace can only be the follower for one replication link at a time, but can be the leader for multiple replication links simultaneously. Marking a namespace as a follower for any link sets the reserved flag if it is not already set.

Deleting a replication link detaches all namespaces, and reserved namespaces stay reserved. These namespaces can later be added to another link, marked non-reserved, or deleted. Namespaces cannot be deleted while attached to a replication link.
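
The attachment rules above can be modelled as a small in-memory state machine. This is only a sketch of the stated invariants; the `Cluster` and `Namespace` classes and their method names are hypothetical and do not describe the eventual AdminClient API.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class Namespace:
    prefix: str
    reserved: bool = False
    follower_of: Optional[str] = None  # at most one link at a time
    leader_of: Set[str] = field(default_factory=set)  # any number of links


class Cluster:
    def __init__(self) -> None:
        self.namespaces: Dict[str, Namespace] = {}

    def create_namespace(self, prefix: str) -> Namespace:
        ns = Namespace(prefix)
        self.namespaces[prefix] = ns
        return ns

    def attach(self, prefix: str, link_id: str, role: str) -> None:
        ns = self.namespaces[prefix]
        if role == "follower":
            if ns.follower_of is not None:
                raise ValueError("namespace already follows another link")
            ns.follower_of = link_id
            ns.reserved = True  # following a link implies a reservation
        elif role == "leader":
            ns.leader_of.add(link_id)
        else:
            raise ValueError(f"unknown role {role!r}")

    def delete_link(self, link_id: str) -> None:
        # Detach every namespace; reserved flags are deliberately left as-is.
        for ns in self.namespaces.values():
            if ns.follower_of == link_id:
                ns.follower_of = None
            ns.leader_of.discard(link_id)

    def delete_namespace(self, prefix: str) -> None:
        ns = self.namespaces[prefix]
        if ns.follower_of is not None or ns.leader_of:
            raise ValueError("namespace is still attached to a link")
        del self.namespaces[prefix]
```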

Attaching or detaching a replication link updates the topic provenance history for those topics, recording the offset at which a topic changed state.

Consumers and Producers

Existing Consumers & Producers can access replicated topics without an upgrade or reconfiguration.

Metrics

Each partition being replicated will expose replication lag on the follower side of the replication link.
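
The proposal does not yet define how lag is measured. One plausible definition, assuming the follower knows the source partition's log end offset, is the offset difference, which is meaningful here because offsets are preserved across the link. The function name below is hypothetical.

```python
def replication_lag(source_end_offset: int, follower_end_offset: int) -> int:
    """Number of records the follower partition still has to replicate.

    Clamped at zero so that a follower that is temporarily ahead
    (for example, before a truncation) does not report negative lag.
    """
    return max(0, source_end_offset - follower_end_offset)
```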

Data Semantics

Relation to Intra-Cluster replication

Cross-Cluster Replication is similar to Intra-Cluster replication, as both cross-cluster topics and intra-cluster replicas:

  • Have the same configuration as their source
  • Have the same offsets for records
  • Have the same number of partitions
  • Are included in the ISR (synchronous mode)

They are different in the following ways, as cross-cluster replicas:

  • Are subject to the target cluster's ACL environment
  • Are not eligible for fetches from source cluster consumers
  • Have a separate topic-id
  • Are not included in the ISR (asynchronous mode)

Replication Mode

  • Asynchronous mode allows replication of a record to proceed after the source cluster has ack'd the record to the producer.
    • Maintains existing latency of source producers, but provides only single-partition consistency guarantees
  • Synchronous mode requires the source cluster to delay acks for a producer until after the record has been replicated to all ISRs for all attached synchronous replication links.
    • Also applies to consumer group offsets submitted by the consumer or transactional producer 
    • Increases latency of source producers, in exchange for multi-topic & consumer-offset consistency guarantees

New replication links are always in asynchronous mode. Once the replication link has caught up completely, it can be changed to synchronous mode. Once the synchronous mode metadata has converged across both clusters, all new transactions started on the source cluster will be exactly-once delivered to the target cluster. Links in synchronous mode can be manually downgraded to asynchronous, immediately losing exactly-once guarantees.
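
The mode rules above can be sketched as follows. The `Link` type, its fields, and both function names are hypothetical. The sketch assumes the source leader tracks, for each link, the highest offset held by every in-sync replica on the remote cluster. Local ISR acknowledgement is omitted because it is unchanged from existing behavior.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Link:
    mode: str = "asynchronous"  # new links always start asynchronous
    caught_up: bool = False     # True once the follower has fully caught up
    # Highest offset held by each remote in-sync replica (assumed bookkeeping).
    isr_replicated_through: List[int] = field(default_factory=list)


def set_mode(link: Link, mode: str) -> None:
    """Operator-initiated mode change; upgrades require a caught-up link."""
    if mode not in ("asynchronous", "synchronous"):
        raise ValueError(f"unknown mode {mode!r}")
    if mode == "synchronous" and not link.caught_up:
        raise ValueError("link must catch up before becoming synchronous")
    link.mode = mode


def can_ack(record_offset: int, links: List[Link]) -> bool:
    """A record may be acked once every synchronous link has replicated it
    to all of its remote in-sync replicas; asynchronous links never block."""
    return all(
        replicated >= record_offset
        for link in links
        if link.mode == "synchronous"
        for replicated in link.isr_replicated_through
    )
```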

In general, automatic processes happening in the background will only increase consistency and reduce availability. Steps which decrease consistency are always initiated via AdminClient, and are at the discretion of the operator.

Network Partition Behavior

We must support network partition tolerance, which requires choosing between consistency and availability. Availability in the table below means that the specified client can operate, at the expense of consistency with another client. Consistency means that the specified client will be unavailable, in order to be consistent with another client.

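Numbers in brackets refer to the notes below the table.

Client                                   | Asynchronous Mode                                   | Synchronous Mode
                                         | Disconnected    | Catching Up     | Following       | Syncing          | In-Sync/Out-Of-Sync
-----------------------------------------|-----------------|-----------------|-----------------|------------------|--------------------
Source Consumers                         | Available [1,2] | Available [1]   | Available [1]   | Available [1]    | Available [1]
Source Non-Transactional Producers       | Available [1,2] | Available [1]   | Available [1]   | Available [1]    | Available [1]
Source Transactional Producers           | Available [2]   | Available [3]   | Available [4]   | Consistent [5]   | Consistent [5]
Target Consumers in Non-Replicated Group | Available [2]   | Available [6]   | Available [6]   | Consistent [7]   | Consistent [8]
Target Consumers in Replicated Group     | Available [2]   | Consistent [9]  | Consistent [9]  | Consistent [7,9] | Consistent [9]
Target Producers                         | Available [2]   | Consistent [10] | Consistent [10] | Consistent [10]  | Consistent [10]
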
  1. Source clients not involved in transactions will always prioritize availability over consistency
  2. When the link is permanently disconnected, clients on each cluster are not required to be consistent and can show whatever the state was when the link was disconnected.
  3. Transactional clients are available during link catch-up, to allow creating links on transactional topics without downtime.
  4. Transactional clients remain available while an asynchronous link is following, because the source cluster is allowed to ack transactional produces while async replication is offline or behind.
  5. Transactional clients are consistent when synchronous links are out-of-sync, because the destination topic is readable by target consumers. Transactional produces to a topic with an out-of-sync synchronous replication link should time out or fail.
  6. Consumers of an asynchronous replicated topic will see partial states while the link is catching up
  7. While syncing a synchronous replicated topic, consumers will be unable to read the partial topic. This allows source transactional produces to proceed while the link is starting (3)
  8. Consumers of a synchronous replicated topic should always see the same contents as the source topic
  9. Consumers within a replicated consumer group will be unable to commit offsets, as the offsets are read-only while the link is active.
  10. Producers targeting the replicated topic will fail because the topic is not writable until it is disconnected. Allowing a parallel-write to the replicated topic would be inconsistent.

Unclean Leader Election

If an unclean leader election takes place on the source cluster, the follower topic will be truncated to correspond with the upstream topic.

Unclean Link Recovery

After a link is disconnected, the history of the two topics is allowed to diverge arbitrarily, as each leader will accept writes which are uncorrelated. After this has happened, it would be desirable to re-establish the link, truncating whichever side is now the follower. This rewrites history on one cluster to coincide with the data on the other, and is only performed manually.

The cluster will save a topic provenance history whenever a namespace is removed from a link, to record the last offset received from that link before disconnection. This will be used to determine what offset to truncate to when reattaching to the replication link.
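
One way to picture the provenance history is as an ordered list of offset ranges per topic-partition, each tagged with the cluster the records came from. The sketch below is an assumption about the shape of that data; `ProvenanceEntry`, `record_detach`, and `truncation_offset` are hypothetical names, and ranges use an exclusive end offset.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class ProvenanceEntry:
    start_offset: int           # first offset in the range (inclusive)
    end_offset: Optional[int]   # exclusive; None while the range is open
    source_cluster: str         # cluster that produced this range


def record_detach(history: List[ProvenanceEntry],
                  local_cluster_id: str,
                  log_end_offset: int) -> List[ProvenanceEntry]:
    """Close the open range at detach time and start a locally-sourced one."""
    closed = replace(history[-1], end_offset=log_end_offset)
    local = ProvenanceEntry(log_end_offset, None, local_cluster_id)
    return history[:-1] + [closed, local]


def truncation_offset(history: List[ProvenanceEntry],
                      link_cluster_id: str) -> int:
    """Offset to truncate to when re-following link_cluster_id: the end of
    the most recent range that was received from that cluster."""
    for entry in reversed(history):
        if entry.source_cluster == link_cluster_id and entry.end_offset is not None:
            return entry.end_offset
    raise LookupError(f"no closed range from {link_cluster_id!r}")
```

For instance, a partition that followed cluster A up to offset 100 and then accepted local writes would be truncated back to offset 100 before following A again.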

Consumer Offsets & Metadata Replication

Consumer offsets are in a per-cluster __consumer_offsets topic. This topic participates in transactions via sendOffsetsToTransaction, so must be replicated synchronously in order to preserve transactional semantics.

Because the __consumer_offsets topic must remain writable in both clusters concurrently, it is not viable to directly replicate from one cluster's topic to the other. On the destination cluster, a separate topic must be used, such as __consumer_offsets_<source_cluster_id>.

Because the __consumer_offsets topic is a privileged internal topic containing many different consumer groups, clusters participating in cross-cluster replication need to choose how to manage replicating consumer offsets:

  1. Trivially replicate the whole __consumer_offsets topic to the target cluster (completely trust target cluster with enforcement)
  2. Redact the __consumer_offsets topic during replication to just the consumer groups as-configured (only replicates new offset commits, some cpu overhead to do redaction)
  3. Per-key encrypt the __consumer_offsets topic during replication, and selectively pass encryption keys when consumer groups added (cpu and/or storage overhead, requires cryptographic investigation)
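
Option 2 (redaction) can be sketched as a filter over offset commits. This is a simplification: real __consumer_offsets records are binary-keyed and also carry group metadata, and the `OffsetCommit` type and function names here are hypothetical. The sketch assumes the configured consumer groups are identified by namespace prefixes, and uses the example destination topic name given above.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class OffsetCommit:
    group: str
    topic: str
    partition: int
    offset: int


def target_offsets_topic(source_cluster_id: str) -> str:
    """Per-source offsets topic on the destination cluster (example naming)."""
    return f"__consumer_offsets_{source_cluster_id}"


def redact(commits: Iterable[OffsetCommit],
           group_prefixes: Iterable[str]) -> List[OffsetCommit]:
    """Keep only commits whose group falls inside a replicated namespace."""
    prefixes = tuple(group_prefixes)
    return [c for c in commits if c.group.startswith(prefixes)]
```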

It would also be advantageous to have access to the metadata, but with similar security measures in place. This would allow each cluster to keep a "last known state" of the other cluster during network partitions.

Remote and Local Logs

  • Remote log replication should be controlled by the second tier's provider.
  • Remote logs can be referenced if not replicated by the second tier's provider so that replicated Kafka topics reference the source remote log storage.

Networking

  • The network path between Kafka clusters is assumed to have less uptime, lower bandwidth, and higher latency than the intra-cluster network, and to have stricter routing constraints.
  • Network connections for replication links can proceed either source→target or target→source to allow one cluster to be behind a NAT (can we support NAT punching as a first-class feature, to allow both clusters to be behind NAT?)
  • Allow for simple traffic separation of cross-cluster connections and client/intra-cluster connections (do we need to connect to something other than the other cluster's advertised listeners?)

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Configuration, especially client configuration

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

User Stories

Developers

Single-Cluster Applications

Applications with all consumers and all producers on one cluster will retain existing delivery guarantees for non-transactional and transactional producers.

Applications will not be affected by asynchronous replication links, but will experience higher latency with synchronous replication. (Create an informational Namespace which is not allowed to be part of a synchronous link?)

Multi-Cluster Applications

Applications may have producers (and consumers) in one cluster, and a replication link to a different cluster containing only consumers.

Creating a new application with clients in multiple clusters will involve having the operator of the cluster provision replicated namespaces, inside which replicated resources may be created.

This application depends on the presence of the replication link, and would be disrupted if the provisioned namespace were to be unattached or the link deleted.

If an application needs producers in multiple clusters, it can do so with per-cluster leader topics. This is similar to the traditional Active-Active architecture in MirrorMaker 2.

Multi-Cluster Non-Transactional Applications

For consumers on asynchronous follower topics, and for synchronous links that are "out-of-sync", the end of the follower partition may be far behind the source partition. Different topics/partitions may be behind by significantly different amounts.

For consumers on synchronous follower topics that are "in-sync", the end of the follower partition should be close behind the source partition, and may be slightly ahead of or behind other follower partitions.

Multi-Cluster Transactional Applications

An application may span multiple clusters, and use transactional producers and read_committed consumers.

Producers on asynchronous topics, or on synchronous topics in the "syncing" state, can commit without the transaction being started on the remote cluster.

Producers on synchronous topics in the "in-sync" state cannot commit until the data has been prepared on the remote cluster. After a commit, the data may be visible first on the source cluster.

Producers on synchronous topics in the "out-of-sync" state will be unable to receive acks from the source brokers, and will be unable to commit transactions.

Operators

  1. Upgrade both clusters to a version supporting Cross Cluster Replication
  2. Obtain credentials to manage namespaces and replication links on both clusters
  3. Pick a cluster not behind a NAT to create a replication link with the bootstrap servers & credentials for the other cluster
  4. Confirm clusters are able to reach one another and perform initial handshake.

Adding an empty namespace "local-" to a replication link, and creating a topic "local-clicks" within it afterwards

  1. Choose the destination prefix in the follower cluster, e.g. "remote-"
  2. The link can be synchronous or asynchronous without interrupting availability
  3. On the leader cluster, create a namespace "local-"
  4. On the follower cluster, create a namespace "remote-"
  5. On either cluster, attach these two namespaces, configured with "local-" as the leader or "remote-" as the follower
  6. Observe that the link remains in the "Following" or "In-Sync" states
  7. Create the topic "local-clicks" on the leader cluster
  8. Observe that the link remains in the "Following" or "In-Sync" states

Adding a single existing topic (local-clicks) to a replication link (only recommended for asynchronous links)

  1. Choose the name of the topic on the destination cluster, e.g. "remote-clicks"
  2. On the leader cluster, create a namespace "local-clicks"
  3. On the follower cluster, create a namespace "remote-clicks"
  4. On either cluster, attach these two namespaces, configured with "local-clicks" as the leader or "remote-clicks" as the follower
  5. Observe that the follower namespace "remote-clicks" is marked reserved
  6. Observe that the link goes to the "Catching Up" or "Out-Of-Sync" state
  7. Wait for the link to return to "Following" or "In-Sync"

Remove a follower topic from a replication link and make it writable

  1. Detach the namespace from the link.
  2. Observe that the link remains in the "Following" or "In-Sync" states
  3. Observe that the follower topics are still not writable, because the namespace is marked "reserved"
  4. Reconfigure the namespace to clear the "reserved" flag
  5. Observe that the topic becomes writable

Reverse the replication direction of a namespace with full-consistency

  1. Create a replication link in the opposite direction, if one does not already exist.
  2. Verify that the link is in the "Following" or "In-Sync" state
  3. Mark the leader namespace as "reserved" (and wait for metadata to propagate, should namespaces have states too?)
  4. Observe that the leader namespace resources stop being writable.
  5. Detach the namespaces from the original replication link
  6. Attach the namespaces to the opposite replication link
  7. Clear the reserved flag for the new leader namespace
  8. Observe that the leader namespace is now writable. (All open transactions should be aborted when the reservation is removed, how?)

Disaster Recovery (multi-zone Asynchronous)

  1. I administer multiple Kafka clusters in different availability zones
  2. I have a performance-sensitive application that reads in all zones but writes to only one zone at a time. For example, an application that runs consumers in zones A and B to keep caches warm but disables producers in zone B while zone A is running.
  3. I set up an asynchronous Cross-Cluster replication link for my topics and consumer groups from cluster A to cluster B. While the link is being created, applications in zone A are performant, and zone B can warm its caches with historical data as it is replicated.
  4. I do the same with cluster A and cluster C (and others that may exist)
  5. When zone A goes offline, I want the application in zone B to start writing. I manually disconnect the A→B cross-cluster link, and trigger the application in zone B to begin writing.
  6. What happens to cluster C? Can we connect B→C quickly? What happens if C is ahead of B and truncating C breaks the cluster C consumers?
  7. When zone A recovers, I see that the history has diverged between zone A and B. I manually delete the topics in zone A and re-create the replication link in the opposite direction.

Unclean Link Recovery

  1. Examine the last shared offset, and the current end offset for the partitions in both namespaces
  2. Determine which namespace should be the leader after link recovery, and which should be the follower
  3. Move all clients to the Leader cluster
  4. Mark the follower namespace as reserved
  5. Set the replication link in the desired direction to Asynchronous mode if not already.
  6. Add the namespace pair to the link
  7. Wait for the link to truncate, and catch up
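
Steps 1 and 2 amount to comparing how much data each choice of leader would discard. The sketch below assumes that the "last shared offset" is the first offset at which the two logs may differ, so everything at or beyond it on the follower side is truncated. The `PartitionOffsets` type and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class PartitionOffsets:
    last_shared_offset: int  # first offset at which the logs may differ
    end_offset_a: int        # current log end offset on cluster A
    end_offset_b: int        # current log end offset on cluster B


def records_truncated(p: PartitionOffsets, leader: str) -> int:
    """Records the follower side loses if `leader` ("A" or "B") wins."""
    if leader == "A":
        return p.end_offset_b - p.last_shared_offset
    if leader == "B":
        return p.end_offset_a - p.last_shared_offset
    raise ValueError(f"unknown leader {leader!r}")


def total_truncated(partitions: Iterable[PartitionOffsets], leader: str) -> int:
    """Total records discarded across all partitions for a leader choice."""
    return sum(records_truncated(p, leader) for p in partitions)
```

An operator could compute `total_truncated` for both candidate leaders and weigh the result against other factors, such as which cluster currently hosts the clients.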

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • Both the source and target clusters should have a version which includes the Cross-Cluster Replication feature
  • Clusters which support the Cross-Cluster Replication feature should negotiate on the mutually-supported replication semantics
  • If one of the clusters is downgraded to a version which does not support Cross-Cluster Replication, the partner cluster's link should fall out-of-sync.
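
The negotiation mechanism is not specified yet. One possible shape, loosely modelled on how Kafka clients and brokers settle on a common API version, is for each cluster to advertise a supported version range and for the link to use the highest version both sides support. The function and the version numbering below are hypothetical.

```python
from typing import Optional, Tuple

VersionRange = Tuple[int, int]  # (min_supported, max_supported), inclusive


def negotiate_version(local: VersionRange, remote: VersionRange) -> Optional[int]:
    """Highest replication-protocol version supported by both clusters,
    or None when the ranges do not intersect (e.g. after a downgrade)."""
    lowest = max(local[0], remote[0])
    highest = min(local[1], remote[1])
    return highest if highest >= lowest else None
```

A `None` result corresponds to the case above where the partner cluster's link should fall out-of-sync.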

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

Propose improvements to MirrorMaker 2 or a new MirrorMaker 3

MirrorMaker's approach of using public clients to perform replication limits the guarantees that replication provides. In order to strengthen these guarantees, we would need to add capabilities to the public clients, or rely on internal interfaces, neither of which is desirable.

Establish mechanisms for improving "stretched clusters" that have a heterogeneous network between nodes, aka "rack awareness"

The use-case for a stretched cluster is different from cross-cluster replication, in that a stretched cluster shares ACLs, topic-ids, principals, secrets, etc. Cross-Cluster Replication is intended to be used across data protection domains, which currently require the use of distinct clusters.

Propose a layer above Kafka to provide virtual/transparent replication

This is currently possible to implement with the Kafka public APIs, but doesn't actually replicate the data. This makes it unsuitable for disaster recovery, latency optimization, and load-shaping use-cases where connectivity to the source topic/replicas may be lost.

