
...

  • Implement multi-leader partitions that accept writes on multiple brokers.
  • Perform distributed consensus to elect partition leaders across multiple clusters.
  • Provide transparent failover of individual clients between Kafka clusters.

Public Interfaces


Operators will use the AdminClient (directly or via a command-line utility) to create and manage Replication Links between pairs of clusters. These clusters will then open connections between individual brokers to replicate data from the Source Topic Leader to the Follower Topic Leader.

Operators will configure Replication Links to provide the availability and consistency guarantees required for their usage. Application Developers can use Replication Links to implement multi-cluster applications, or migrate applications between clusters while maintaining data semantics.

User Interface

Admin Client - Namespace

A Namespace is an abstract group of a single type of string-identified resource in a single cluster, one of:

  • Topic
  • Consumer Group
  • ACL Principal

The AdminClient has CRUD operations for Namespaces. Namespaces have the following properties:

  • A string prefix that all resources within the namespace share, fixed during creation
  • The resource type of those resources, fixed during creation
  • Reserved: boolean flag, default false, dynamically reconfigurable
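
As a sketch only, namespace CRUD on the AdminClient could look like the following. None of these classes exist in Apache Kafka today; ResourceType is the existing org.apache.kafka.common.resource.ResourceType, and every other name is an illustrative placeholder, not a proposed final API.

    // Hypothetical sketch of namespace CRUD on the AdminClient; illustrative names only.
    import org.apache.kafka.common.resource.ResourceType;

    public interface NamespaceAdmin {
        // Prefix and resource type are fixed during creation (see properties above).
        void createNamespace(String prefix, ResourceType type);
        // Reserved is the only dynamically reconfigurable property.
        void alterNamespace(String prefix, boolean reserved);
        Namespace describeNamespace(String prefix);
        void deleteNamespace(String prefix);

        // Hypothetical result type mirroring the properties listed above.
        record Namespace(String prefix, ResourceType type, boolean reserved) {}
    }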

...

One cluster can have many namespaces. These namespaces can correspond to namespaces in other clusters either trivially (with the same prefix) or via a mapping (to a different prefix). For an analogy, a Namespace is like a directory in a filesystem. Multiple filesystems (clusters) can have the same directory (namespace) "/dir/", and some filesystems can rename "/dir/" to "/otherDir/". This can be extended to support first-class directories in Kafka at a later date.

Topic namespaces additionally have some state which is read-only by users:

  • A provenance history for each of the topic-partitions, indicating the source cluster used for ranges of offsets

Namespaces with reservations are used when fulfilling write-requests to resources within that namespace, to prevent conflicting writes ahead-of-time. They are also used to track the provenance of data: i.e. where did the data originate for some segment of the log?

Namespaces cannot be deleted while attached to a replication link. They are intended to outlive a replication link, and preserve some history about the topics between replication link lifetimes. They can be deleted after being detached from all links, but the corresponding link provenance will be lost.

The provenance history is used for managing unclean link recovery.
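
As a minimal sketch, assuming names this document does not define, a single provenance entry could carry the source cluster and the offset range it supplied:

    // Hypothetical shape of one provenance history entry for a topic-partition.
    // Data in [startOffset, endOffset) originated on sourceClusterId.
    public record ProvenanceEntry(String sourceClusterId, long startOffset, long endOffset) {}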

The top-level namespace with the prefix "" does not include resources starting with an underscore, which excludes all internal topics.

Admin Client - Replication Link

A Replication Link is a pair of clusters that were/are/will be involved in cross-cluster replication. The AdminClient has CRUD operations for Replication Links.

A single cluster can participate in multiple replication links simultaneously, and a pair of clusters is permitted to have any number of links between them, in both directions.

The replication link has a configuration, which includes:

  • The local cluster ID, fixed during creation
  • The remote cluster ID, fixed during creation
  • The mode of the link (synchronous or asynchronous), dynamically reconfigurable
  • The bootstrap.servers for the opposite cluster, dynamically reconfigurable
  • Credentials which include permission to manage replication links on the opposite cluster, dynamically reconfigurable

A replication link also includes a set of attached namespace mappings, each pairing a local namespace with a remote namespace and specifying the direction in which replication will occur for that namespace. These mappings are each configured with:

  • The local cluster namespace, fixed during creation
  • The remote cluster namespace, fixed during creation

Creating a replication link causes the cluster to contact the other cluster via the bootstrap servers and replicate the link configuration. The configuration on the other cluster is mirrored with local IDs swapped with remote IDs.
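
For example, if cluster A creates a synchronous link to cluster B, the mirrored configuration could look like this (the property names and values here are illustrative only, not defined by this document):

    # Stored on cluster A (creator)        # Mirrored on cluster B
    local.cluster.id  = A                  local.cluster.id  = B
    remote.cluster.id = B                  remote.cluster.id = A
    mode              = synchronous        mode              = synchronous
    bootstrap.servers = b1.cluster-b:9092  bootstrap.servers = b1.cluster-a:9092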

Namespace configurations are stored and configurable on both clusters, with clusters performing lightweight consensus or CRDTs for some operations and reconciliation for others. This is to allow operators to intentionally diverge their configurations on each cluster during a network partition.

Namespaces can be attached or detached from a replication link dynamically. A namespace can only be the follower for one replication link at a time, but can be the leader for multiple replication links simultaneously. Marking a namespace as a follower for any link sets the reserved flag if it is not already set.

Deleting a replication link detaches all namespaces, and reserved namespaces stay reserved. These namespaces can later be added to another link, marked non-reserved, or deleted. Namespaces cannot be deleted while attached to a replication link.

Attaching or detaching a replication link updates the topic provenance history for those topics, recording the offset at which a topic changed state.

Consumers and Producers

Existing Consumers & Producers can access replicated topics without an upgrade or reconfiguration.

Metrics

Each partition being replicated will expose replication lag on the follower side of the replication link.
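
The metric name and exact semantics are not pinned down here; as a sketch, the follower side could compute per-partition lag in records as:

    // Illustrative only: lag as observed by the follower side of the link.
    // sourceLogEndOffset is learned from the leader cluster's fetch responses;
    // followerLogEndOffset is the local end offset of the replicated partition.
    long replicationLagRecords = sourceLogEndOffset - followerLogEndOffset;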

Data Semantics

Relation to Intra-Cluster replication

Cross-Cluster Replication is similar to Intra-Cluster replication, as both cross-cluster topics and intra-cluster replicas:

  • Have the same configuration as their source
  • Have the same offsets for records
  • Have the same number of partitions
  • Are included in the ISR (synchronous mode)

They are different in the following ways, as cross-cluster replicas:

  • Are subject to the target cluster's ACL environment
  • Are not eligible for fetches from source cluster consumers
  • Have a separate topic-id
  • Are not included in the ISR (asynchronous mode)

...

  1. Source clients not involved in transactions will always prioritize availability over consistency.
  2. When the link is permanently disconnected, clients on each cluster are not required to be consistent, and may observe whatever state existed when the link was disconnected.
  3. Transactional clients are available during link catch-up, to allow creating links on transactional topics without downtime.
  4. Transactional clients are available when asynchronous links are catching up, because the source cluster is allowed to ack transactional produces while async replication is offline or behind.
  5. Transactional clients are consistent when synchronous links are out-of-sync, because the destination topic is readable by target consumers. Transactional produces to a topic with an out-of-sync synchronous replication link should time out or fail.
  6. Consumers of an asynchronously replicated topic will see partial states while the link is catching up.
  7. While syncing a synchronously replicated topic, consumers will be unable to read the partial topic. This allows source transactional produces to proceed while the link is starting (see 3).
  8. Consumers of a synchronously replicated topic should always see the same contents as the source topic.
  9. Consumers within a replicated consumer group will be unable to commit offsets, as the offsets are read-only while the link is active.
  10. Producers targeting the replicated topic will fail, because the topic is not writable until it is disconnected. Allowing a parallel write to the replicated topic would be inconsistent.
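
The transactional guarantees above are observed by consumers using read_committed isolation. This uses the existing consumer API with no new configuration; only the topic name below is illustrative:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "replicated-group");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(List.of("remote-clicks"));
        // Only committed transactional records are returned under read_committed.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    }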

...

Unclean Leader Election

If an unclean leader election takes place on the source cluster, the follower topic will be truncated to correspond with the upstream topic.

Unclean Link Recovery

After a link is disconnected, the history of the two topics is allowed to diverge arbitrarily, as each leader will accept writes which are uncorrelated. After this has happened, it would be desirable to re-establish the link, truncating whichever side is now the follower. This rewrites history on one cluster to coincide with the data on the other, and is only performed manually.

The cluster will save a topic provenance history whenever a namespace is removed from a link, to record the last offset received from that link before disconnection. This will be used to determine what offset to truncate to when reattaching to the network link.
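
Reusing the hypothetical ProvenanceEntry sketch from above, truncation during unclean link recovery could be driven by the saved history roughly as follows (helper and method names are illustrative only):

    // Hypothetical sketch: find the offset to truncate the new follower to, i.e.
    // the last offset it received from the peer cluster before disconnection.
    static long truncationOffset(List<ProvenanceEntry> history, String peerClusterId) {
        for (int i = history.size() - 1; i >= 0; i--) {
            ProvenanceEntry entry = history.get(i);
            if (entry.sourceClusterId().equals(peerClusterId)) {
                // Writes accepted after this offset diverged from the peer and are discarded.
                return entry.endOffset();
            }
        }
        return 0L; // no shared history: re-replicate the partition from the beginning
    }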


Consumer Offsets & Metadata Replication

Consumer offsets are in a per-cluster __consumer_offsets topic. This topic participates in transactions via sendOffsetsToTransaction, so must be replicated synchronously in order to preserve transactional semantics.
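
This is the standard consume-transform-produce pattern: the group's offsets are written to __consumer_offsets atomically with the produced records, which is why the offsets topic must share the data topic's replication guarantees. Assuming an already-configured transactional producer and consumer (topic name illustrative):

    // Existing Kafka transactional API; no new interfaces involved.
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("local-clicks", key, value));
    // Commits the consumer group's offsets as part of the same transaction,
    // writing them to __consumer_offsets.
    producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
    producer.commitTransaction();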

Because the __consumer_offsets topic must remain writable in both clusters concurrently, it is not viable to directly replicate from one cluster's topic to the other. On the destination cluster, a separate topic must be used, such as __consumer_offsets_<source_cluster_id>.

Because the __consumer_offsets topic is a privileged internal topic containing many different consumer groups, clusters participating in cross-cluster replication need to choose how to manage replicating consumer offsets:

  1. Trivially replicate the whole __consumer_offsets topic to the target cluster (completely trust target cluster with enforcement)
  2. Redact the __consumer_offsets topic during replication to just the consumer groups as-configured (only replicates new offset commits, some cpu overhead to do redaction)
  3. Per-key encrypt the __consumer_offsets topic during replication, and selectively pass encryption keys when consumer groups added (cpu and/or storage overhead, requires cryptographic investigation)
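
As a sketch of option 2 only: the broker-side filter would need to decode each record key from __consumer_offsets (a versioned internal binary schema) and match the group against the attached consumer-group namespaces. The decoder below is hypothetical:

    // Hypothetical sketch of option 2: forward only offset commits whose group
    // falls inside a replicated consumer-group namespace.
    static boolean shouldReplicate(byte[] offsetsTopicKey, Set<String> replicatedPrefixes) {
        String groupId = parseGroupId(offsetsTopicKey); // hypothetical key decoder
        return replicatedPrefixes.stream().anyMatch(groupId::startsWith);
    }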

It would also be advantageous to have access to the other cluster's metadata, with similar security measures in place. This would allow each cluster to keep a "last known state" of the other cluster during network partitions.

Remote and Local Logs

  • Remote log replication should be controlled by the second tier's provider.
  • Remote logs can be referenced if not replicated by the second tier's provider so that replicated Kafka topics reference the source remote log storage.

Networking

  • The network path between Kafka clusters is assumed to have less uptime, lower bandwidth, and higher latency than the intra-cluster network, and to have stricter routing constraints.
  • Network connections for replication links can proceed either source→target or target→source to allow one cluster to be behind a NAT (can we support NAT punching as a first-class feature, to allow both clusters to be behind NAT?)
  • Allow for simple traffic separation of cross-cluster connections and client/intra-cluster connections (do we need to connect to something other than the other cluster's advertised listeners?)

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

User Stories

Developers

Single-Cluster Applications

Applications with all consumers and all producers on one cluster will retain existing delivery guarantees for non-transactional and transactional producers.

Applications will not be affected by asynchronous replication links, but will experience higher latency with synchronous replication. (Create an informational Namespace which is not allowed to be part of a synchronous link?)

Multi-Cluster Applications

Applications may have producers (and consumers) in one cluster, and a replication link to a different cluster containing only consumers.

Creating a new application with clients in multiple clusters will involve having the operator of the cluster provision replicated namespaces, inside which replicated resources may be created.

This application depends on the presence of the replication link, and would be disrupted if the provisioned namespace were detached or the link deleted.

If an application needs producers in multiple clusters, it can use per-cluster leader topics. This is similar to the traditional Active-Active architecture in MirrorMaker 2.

Multi-Cluster Non-Transactional Applications

For consumers on asynchronous follower topics, and for synchronous links that are "out-of-sync", the end of the follower partition may be far behind the source partition. Different topics/partitions may be behind by significantly different amounts.

For consumers on synchronous follower topics that are "in-sync", the end of the follower partition should be close behind the source partition, and may be slightly ahead of or behind other follower partitions.

Multi-Cluster Transactional Applications

An application may span multiple clusters, and use transactional producers and read_committed consumers.

Producers on asynchronous topics, or on synchronous topics in the "syncing" state, can commit without the transaction being started on the remote cluster.

Producers on synchronous topics in the "in-sync" state cannot commit until the data has been prepared on the remote cluster. After a commit, the data may become visible on the source cluster first.

Producers on synchronous topics in the "out-of-sync" state will be unable to receive acks from the source brokers, and will be unable to commit transactions.

Operators

  1. Upgrade both clusters to a version supporting Cross Cluster Replication
  2. Obtain credentials to manage namespaces and replication links on both clusters
  3. Pick a cluster not behind a NAT, and create a replication link there with the bootstrap servers and credentials for the other cluster (see the sketch after this list)
  4. Confirm clusters are able to reach one another and perform initial handshake.
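
With hypothetical admin API names (nothing below exists in Apache Kafka today), step 3 could look like:

    // Hypothetical sketch of step 3, run against the cluster that is not behind a NAT.
    ReplicationLinkConfig config = new ReplicationLinkConfig()
            .remoteBootstrapServers("b1.other-cluster:9092") // reachable peer cluster
            .mode(LinkMode.ASYNCHRONOUS)                     // dynamically reconfigurable
            .credentials(remoteLinkCredentials);             // must permit link management
    admin.createReplicationLink("link-to-other-cluster", config);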

Adding an empty namespace "local-" to a replication link, and creating a topic "local-clicks" within it afterwards

  1. Choose the destination prefix in the follower cluster, e.g. "remote-"
  2. The link can be synchronous or asynchronous without interrupting availability
  3. On the leader cluster, create a namespace "local-"
  4. On the follower cluster, create a namespace "remote-"
  5. On either cluster, attach these two namespaces, configured with "local-" as the leader and "remote-" as the follower (see the sketch after this list)
  6. Observe that the link remains in the "Following" or "In-Sync" states
  7. Create the topic "local-clicks" on the leader cluster
  8. Observe that the link remains in the "Following" or "In-Sync" states
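
Using the same hypothetical admin API as above, step 5 could be expressed as:

    // Hypothetical sketch of step 5: attach the namespace pair to the link,
    // with "local-" leading and "remote-" following.
    admin.attachNamespaces("link-to-other-cluster",
            new NamespaceMapping("local-", "remote-", Direction.LEADER_TO_FOLLOWER));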

Adding a single existing topic (local-clicks) to a replication link (only recommended for asynchronous links)

  1. Choose the name of the topic on the destination cluster, e.g. "remote-clicks"
  2. On the leader cluster, create a namespace "local-clicks"
  3. On the follower cluster, create a namespace "remote-clicks"
  4. On either cluster, attach these two namespaces, configured with "local-clicks" as the leader and "remote-clicks" as the follower
  5. Observe that the follower namespace "remote-clicks" is marked reserved
  6. Observe that the link goes to the "Catching Up" or "Out-Of-Sync" state
  7. Wait for the link to return to "Following" or "In-Sync"

Remove a follower topic from a replication link and make it writable

  1. Detach the namespace from the link.
  2. Observe that the link remains in the "Following" or "In-Sync" states
  3. Observe that the follower topics are still not writable, because the namespace is marked "reserved"
  4. Reconfigure the namespace to clear the "reserved" flag
  5. Observe that the topic becomes writable

Reverse the replication direction of a namespace with full-consistency

  1. Create a replication link in the opposite direction, if one does not already exist.
  2. Verify that link is in "Following" or "In-Sync" state
  3. Mark the leader namespace as "reserved" (and wait for metadata to propagate, should namespaces have states too?)
  4. Observe that the leader namespace resources stop being writable.
  5. Detach the namespaces from the original replication link
  6. Attach the namespaces to the opposite replication link
  7. Clear the reserved flag for the new leader namespace
  8. Observe that the leader namespace is now writable. (All open transactions should be aborted when the reservation is removed, how?)

...

Disaster Recovery (multi-zone Asynchronous)

...