Status
Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/h4x382z0089odj201pq2j086483q6sfr
JIRA: KAFKA-18793
Authors: Daniel Urban, Gergely Harmadas, David Simon, Viktor Somogyi-Vass
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka is widely adopted for building distributed event streaming platforms. As Kafka clusters grow in size and usage, a common challenge arises: different teams or workloads often share the same physical Kafka cluster, which can cause operational and organizational friction. Several solutions exist today, but each has drawbacks.
Organizationally, the first step is often for teams that adopt Kafka to run their own installations. In this scenario they are completely in control of the resources (CPU, network, disk utilization), but from the organization's point of view this is the most expensive form of adoption, as much capacity is left unused: the smallest unit is often a 3-node cluster to satisfy the most common replication factor requirements, yet for smaller use-cases even 3 brokers are overprovisioned. With larger installations the overprovisioning may be less apparent.
A more economical solution is for the organization to manage Kafka clusters together in a virtual environment. This is more efficient, as the hardware is usually utilized better; however, the hardware is often managed by a different (often central) team while the teams remain in control of the clusters themselves. Updating hardware can be problematic, as multiple teams may have to be involved, and growing the clusters is harder for the same reason.
The next organizational step is a central Kafka installation governed by a central team that provides Kafka as a service to other departments. This, however, takes control away from individual teams and puts an administrative burden on the central team. In this case, with the correct setting of ACLs and CreateTopicPolicies, a good-enough solution can often be provided where resource management becomes a bit easier. Virtual clusters aim to help by defining logical clusters in which administrators can be promoted, making a Kafka installation self-service.
To summarize, Kafka lacks native support for logical separation of tenants (e.g., applications, teams) within a shared cluster. This leads to several challenges:
- Operational Complexity: Administrators manually implement workarounds like naming conventions for topics, quotas, and ACLs.
- Resource Waste: Dedicated brokers waste hardware resources and are more costly.
- Management Overhead: Multi-tenant management often involves custom tools or external systems.
Goals
- Introduce Virtual Clusters as a logical abstraction within a physical Kafka cluster.
- Enable resource and metadata isolation between Virtual Clusters.
- Simplify multi-tenant management and improve operational efficiency.
- Allow administrators to define and manage VCs without significant overhead.
- Ensure backward compatibility and minimal disruption for existing deployments.
- Provide a transparent view of the cluster for the clients.
Non-Goals
- Introducing hard physical isolation (e.g., dedicated hardware).
- Replacing or deprecating existing multi-tenant mechanisms like quotas and ACLs.
High Level Design
After considering a few approaches, listed in the Rejected Alternatives section, below is what we think is the least obtrusive approach to supporting virtual clusters in Kafka.
At the very highest level, we would like to create a new type of resource called “virtual clusters” and connect other resources, such as topics, groups and users, to them. This would allow the brokers to handle requests transparently for the clients. In addition, by generalizing the Cluster ACL resource, we could form the necessary authorization layer in a backward compatible way. Virtual clusters are a logical abstraction that extends topics, quotas and ACLs, and therefore they won't be physical entities. A virtual cluster is defined by the topics, users, quotas and ACLs assigned to it. Any broker can host any topic belonging to any virtual cluster.
Linking with Topics
We would form a directional, one-level, tree-like structure in which we link topics to virtual clusters, stored in the __cluster_metadata topic. This solution is more advantageous than other approaches because links can be created and modified much more easily than, for instance, the directory structure originally proposed in KIP-37, allowing more elasticity in managing namespaces, and no changes are required in the storage layer. Moreover, this aligns with the proposal for topic renames in KIP-516: Topic Identifiers.
Furthermore, links can be created with an alternative name for the topic, allowing topic renaming within virtual clusters, but also linking topics between virtual clusters to achieve more complex use-cases, like sharing topics between virtual clusters to provide readable views, or implementing aggregation use-cases.
Creating Topics in Virtual Clusters
Topic names are currently heavily embedded into the protocol; therefore it's challenging to rely only on topic IDs. We wanted to make our implementation as independent from physical topic names as possible while also respecting the current state of the protocol.
There are two major ways to create a topic in a virtual cluster compatible Kafka:
- Create a topic with a user that isn’t part of a virtual cluster: everything stays the same as it is currently.
- Create a topic with a user that is already assigned to a virtual cluster: topics created within a virtual cluster would have generated names, where one part continues to be the user-given name while another part is unique. This would be implemented by prefixing the UUID of the topic to the topic name, forming names of the UUID.topicname form, for instance e58ed763-928c-4155-bee9-fdbaaadc15f3.mytopic (a short sketch follows the requirements list below). While the UUID takes up a few characters and leaves fewer for users to spend, this is needed to avoid conflicting names between virtual clusters. Links would also have names, with the user-given topic name being the default, so topics can be identified in a user-friendly way. This somewhat compensates for the loss of available characters in topic names; moreover, named links accommodate use-cases where users share topics between virtual clusters, as they can opt for a different name if the names conflict across virtual clusters.
The UUID-based mechanism would fulfill all requirements:
- It makes a valid topic name, as the string representation of a UUID is a valid topic name. We considered base64 encoding as it's shorter, but it may contain characters that are invalid in topic names.
- The string representation of the UUID is 36 characters, so the overall topic name will still likely fit within the 249-character maximum.
- The generated topic name still has a human-readable part, so it's easier to identify its original purpose.
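As an illustration, below is a minimal Java sketch of deriving the physical name; the helper class and its validation are hypothetical and not part of the proposed interfaces.

import java.util.UUID;

public class VirtualClusterTopicNames {
    // Kafka's maximum topic name length.
    private static final int MAX_TOPIC_NAME_LENGTH = 249;
    // A dashed UUID string is 36 characters, plus 1 for the "." separator.
    private static final int PREFIX_LENGTH = 37;

    // Derives the physical topic name for a topic created inside a virtual
    // cluster by prefixing the topic's UUID to the user-given name.
    public static String physicalName(UUID topicId, String userGivenName) {
        String name = topicId + "." + userGivenName;
        if (name.length() > MAX_TOPIC_NAME_LENGTH) {
            throw new IllegalArgumentException("User-given name may be at most "
                + (MAX_TOPIC_NAME_LENGTH - PREFIX_LENGTH) + " characters");
        }
        return name;
    }
}

// physicalName(UUID.fromString("e58ed763-928c-4155-bee9-fdbaaadc15f3"), "mytopic")
// returns "e58ed763-928c-4155-bee9-fdbaaadc15f3.mytopic"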
Deleting Topics
Deleting topics in virtual clusters would simply mean deleting the links. If all links are deleted, then the topic doesn’t belong to any virtual cluster, so administrators would simply need to delete the topic to get rid of it forever. Deleting the physical topic wouldn’t be allowed as long as there are links pointing to it. This is necessary to prevent dangling links.
Users and Clients
One of our assumptions is that clients and users usually belong to the same virtual cluster and would rarely use topics in others. But when they do, virtual cluster admins can link topics across virtual clusters. Therefore it makes sense to bind users and client-ids to virtual clusters and create one-to-many connections, where one virtual cluster can have many users/client-ids associated with it, but a user/client-id can only be associated with one virtual cluster. Storing this information in the brokers also opens up the possibility for users/client-ids to act transparently within the virtual cluster. For instance, they wouldn't need to prefix the topics they address with the name of the VC; instead they would continue using the same topic name. This is advantageous for backward compatibility, as clients can simply be added to virtual clusters without having to make any protocol or configuration changes.
Consumer Groups
In a similar fashion to users, consumer groups can be linked to virtual clusters as well, so groups in different virtual clusters may have the same local name. Storing the offsets of virtual groups would work the same as in the classical case: they would continue using __consumer_offsets. Furthermore, __consumer_offsets should stay a global internal topic to preserve backward compatibility. When consumers are moved into virtual clusters, their group ID will be prefixed with the name of the virtual cluster, and that prefixed ID will be the reference used in the __consumer_offsets topic. Since clients don't use this topic directly but through various APIs (admin APIs, offset commit, list offsets, etc.), they aren't exposed to it directly, and therefore virtual clusters don't need to expose it towards clients.
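As a rough illustration only, since this KIP doesn't fix the exact prefixing scheme (the separator and helper below are assumptions), the broker-side translation could look like this:

public class VirtualClusterGroupIds {
    // Assumed separator between the virtual cluster name and the local group ID.
    private static final String SEPARATOR = ".";

    // Translates the client-visible group ID into the ID referenced in __consumer_offsets.
    public static String physicalGroupId(String virtualCluster, String localGroupId) {
        return virtualCluster + SEPARATOR + localGroupId;
    }
}

// Groups both named "payments" in VCs "emea" and "apac" map to distinct
// physical IDs "emea.payments" and "apac.payments", so local names can collide safely.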
Transactions and Idempotency
Transactions should behave similarly to consumer groups. Transactions are more cross-cutting, since there are markers both in user topics and in the __consumer_offsets topic, but logically it still makes sense to handle them like consumer groups. This means that transactions will be linked to virtual clusters (the same one as the user creating them) and the transactional ID will be translated to a unique ID on the broker side.
ACLs
The Authorization Model
We would like to extend the current authorization model of Kafka to accommodate the needs of virtual clusters. Kafka already provides a CLUSTER ACL resource, but currently it is hardcoded to the “kafka-cluster” value and it's not possible to specify a different one. By generalizing and elevating it to a proper resource, we can enable administrators to create clusters and properly manage ACLs within them. Since many inter-broker protocols already depend on this resource, we would leave the pre-defined “kafka-cluster” as it is for internal protocols. Furthermore, for compatibility reasons, any user or client who has privileges on this resource would have global access to the whole Kafka cluster, so everything stays the same on upgrades.
By creating CRUD-type permissions on cluster resources, we can form the basics of virtual cluster management. VC admins can be created with ACLs, and a global admin can be created by setting the CRUD ACLs on the “kafka-cluster” resource.
The figure above represents the ACL structure of the cluster (not of any physical entity). The topmost ACL resource is “kafka-cluster”, which defines cluster-wide ACLs that apply everywhere. It contains all ACLs that existed before the upgrade; in addition, virtual cluster ACLs can be defined that apply only to the resources, users and clients linked to that virtual cluster.
A cluster-wide admin is defined by being able to grant permissions to other admins across the whole cluster. This can be achieved by granting CREATE_ACLS, DELETE_ACLS and DESCRIBE_ACLS on the “kafka-cluster” resource. Lower-level administrators in virtual clusters wouldn't need to perform operations on the whole cluster, just in their assigned virtual clusters; therefore they would be granted CREATE_ACLS, DELETE_ACLS and DESCRIBE_ACLS on their respective virtual clusters.
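To make the delegation concrete, below is a minimal Java sketch using the existing Admin#createAcls API. It only works once the CLUSTER resource name is generalized as this KIP proposes; today the broker rejects any name other than “kafka-cluster”. The principal and virtual cluster names are made up for the example, and per the operation mapping below, ALTER covers CREATE_ACLS/DELETE_ACLS while DESCRIBE covers DESCRIBE_ACLS.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantVcAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // The CLUSTER resource, with its name generalized from the hardcoded
            // "kafka-cluster" to a virtual cluster name (the change this KIP proposes).
            ResourcePattern vc = new ResourcePattern(ResourceType.CLUSTER, "my-virtual-cluster", PatternType.LITERAL);
            // ALTER allows CREATE_ACLS/DELETE_ACLS, DESCRIBE allows DESCRIBE_ACLS.
            AccessControlEntry alter = new AccessControlEntry("User:vc-admin", "*", AclOperation.ALTER, AclPermissionType.ALLOW);
            AccessControlEntry describe = new AccessControlEntry("User:vc-admin", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
            admin.createAcls(List.of(new AclBinding(vc, alter), new AclBinding(vc, describe))).all().get();
        }
    }
}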
Operations
We have to reinterpret some of the actions that can be performed on a CLUSTER resource, since the cluster itself can now be specified as well. Generally, protocols that are meant for inter-broker communication wouldn't be allowed on anything other than the “kafka-cluster” resource, since allowing them per virtual cluster would compromise the behavior of the whole cluster.
Alter
On the cluster resource, the following protocols can be used with this action:
- Exposed protocols that are allowed for virtual clusters:
- WRITE_TXN_MARKERS (27)
- CREATE_ACLS (30)
- DELETE_ACLS (31)
- ALTER_REPLICA_LOG_DIRS (34)
- ALTER_USER_SCRAM_CREDENTIALS (51)
- Exposed new protocols that are allowed for virtual clusters:
- ALTER_VIRTUAL_CLUSTER: Users who are allowed for ALTER_VIRTUAL_CLUSTER on CLUSTER resources, will be able to modify resource connections in a virtual cluster. This means that they can add and remove topic links, associate users and clients with the CLUSTER resource and also disassociate them as described in the protocol section.
- Exposed protocols that are not allowed for virtual clusters:
- ALTER_PARTITION_REASSIGNMENTS (45): a user will be able to initiate partition reassignments on the physical cluster. We wouldn’t allow this to be applied on a virtual cluster level because this operation is tightly coupled with the physical layout of a cluster and it doesn’t make sense to be controlled from a virtual cluster level.
- Internal protocols that are not allowed for virtual clusters:
- UPDATE_FEATURES (57)
- UNREGISTER_BROKER (64)
AlterConfigs
- Exposed protocols that are allowed for virtual clusters:
- ALTER_CONFIGS (33)
- INCREMENTAL_ALTER_CONFIGS (44)
- ALTER_CLIENT_QUOTAS (49)
ClusterAction
- Exposed protocols that are allowed for virtual clusters:
- ALTER_CONFIGS (33)
- INCREMENTAL_ALTER_CONFIGS (44)
- ALTER_CLIENT_QUOTAS (49)
- ALTER_PARTITION (56)
- WRITE_TXN_MARKERS (27)
- READ_SHARE_GROUP_STATE (84)
- WRITE_SHARE_GROUP_STATE (85)
- DELETE_SHARE_GROUP_STATE (86)
- READ_SHARE_GROUP_STATE_SUMMARY (87)
- Internal protocols that are not allowed for virtual clusters:
- FETCH (1)
- LEADER_AND_ISR (4)
- STOP_REPLICA (5)
- UPDATE_METADATA (6)
- CONTROLLED_SHUTDOWN (7)
- OFFSET_FOR_LEADER_EPOCH (23)
- ELECT_PREFERRED_LEADERS (43)
- VOTE (52)
- BEGIN_QUORUM_EPOCH (53)
- END_QUORUM_EPOCH (54)
- ENVELOPE (58)
- FETCH_SNAPSHOT (59)
- BROKER_REGISTRATION (62)
- BROKER_HEARTBEAT (63)
- ALLOCATE_PRODUCER_IDS (67)
- CONTROLLER_REGISTRATION (70)
- INITIALIZE_SHARE_GROUP_STATE (83)
- Exposed protocols that aren't allowed for virtual clusters:
- ASSIGN_REPLICAS_TO_DIRS (73)
Create
- METADATA (3)
- CREATE_TOPICS (19)
New APIs:
- CREATE_VIRTUAL_CLUSTER: Users who are allowed for CREATE_VIRTUAL_CLUSTER on CLUSTER resources, will be able to create virtual clusters as described in the protocol section.
Describe
- LIST_GROUPS (16)
- DESCRIBE_ACLS (29)
- DESCRIBE_LOG_DIRS (35)
- LIST_PARTITION_REASSIGNMENTS (46)
- DESCRIBE_USER_SCRAM_CREDENTIALS (50)
- DESCRIBE_QUORUM (55)
- DESCRIBE_VIRTUAL_CLUSTER: Users who are allowed for DESCRIBE_VIRTUAL_CLUSTER on CLUSTER resources, will be able to describe a virtual cluster with its user, client and topic associations as described in the protocol section.
List
- Exposed new protocols that are allowed for virtual clusters:
- LIST_VIRTUAL_CLUSTERS: Users who are allowed for LIST_VIRTUAL_CLUSTERS on CLUSTER resources, will be able to retrieve a list of virtual clusters in the Kafka cluster.
Delete
- Exposed new protocols that are allowed for virtual clusters:
- DELETE_VIRTUAL_CLUSTER: Users who are allowed for DELETE_VIRTUAL_CLUSTER on CLUSTER resources, will be able to delete a virtual cluster. A virtual cluster can only be deleted when it’s empty and no users, clients and topics are associated with it.
Read
No protocols can be associated with the “Read” action on virtual clusters.
Write
No protocols can be associated with the “Write” action on virtual clusters.
Administrators
By properly setting a virtual cluster's *_VIRTUAL_CLUSTER ACLs, one can configure users to have an administrator role for that virtual cluster. A super admin responsible for managing the physical cluster is a user who has the appropriate *_CLUSTER ACLs on the “kafka-cluster” cluster resource. This admin can grant further permissions to lower-level administrators.
Quotas
We would like to introduce a new family of quotas called “virtual-clusters”. These are similar in behavior to other types of quotas and are applied from top to bottom in decreasing priority. Quotas outside “virtual-clusters” would be global and applied in every cluster. We wouldn't allow setting quotas for a “kafka-cluster” virtual cluster, as that is a reserved name and doing so would introduce ambiguity: it would have the same effect as using the non-virtual-clusters quota.
There are two exceptions to the above. When we define a virtual cluster, we would like to have an upper limit on it, so that no single virtual cluster and its resources can monopolize the whole cluster's resources. The first config below puts a limit on a specific virtual cluster and all quotas defined in that virtual cluster, while the second gives a default limit for virtual clusters when the first isn't specified.
/config/virtual-clusters/<virtual-cluster>
/config/virtual-clusters/<default>
Besides the above, the other configurations decrease in priority from top to bottom (an illustrative quota command follows the list):
/config/virtual-clusters/<virtual-cluster>/users/<user>/clients/<client-id>
/config/virtual-clusters/<virtual-cluster>/users/<user>/clients/<default>
/config/virtual-clusters/<virtual-cluster>/users/<user>
/config/virtual-clusters/<virtual-cluster>/users/<default>/clients/<client-id>
/config/virtual-clusters/<virtual-cluster>/users/<default>/clients/<default>
/config/virtual-clusters/<virtual-cluster>/users/<default>
/config/virtual-clusters/<virtual-cluster>/clients/<client-id>
/config/virtual-clusters/<virtual-cluster>/clients/<default>
/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>
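For illustration, setting such a quota from the command line could look like the following; the virtual-clusters entity type for kafka-configs.sh is a hypothetical extension, as this KIP doesn't finalize quota tooling.

# Hypothetical: cap all produce traffic of a virtual cluster at 10 MB/s
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type virtual-clusters --entity-name my-virtual-cluster \
  --add-config 'producer_byte_rate=10485760'

# Hypothetical: set a default cap for virtual clusters without an explicit quota
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type virtual-clusters --entity-default \
  --add-config 'producer_byte_rate=10485760'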
Kafka Connect
Kafka Connect is a fairly independent framework from core Kafka and lacks some multi-tenancy capabilities of its own. On the operational level, from Kafka's point of view, Connect has the connect-offsets, connect-configs and connect-status internal topics, and we see no reason against using them in virtual clusters. Running multiple Connect clusters would still be possible with virtual clusters. Putting Connect clusters into virtual clusters may even make configuration simpler, as virtual clusters hide the internal topics from the rest of the Kafka cluster.
Mirror Maker
Mirror Maker is very similar to Connect, as it builds on it. Similarly to Connect, one should be able to configure MM2 both on an existing Connect cluster that uses VCs and in dedicated mode, where MM2 runs its own Connect cluster. We wouldn't like to add exceptions for MM2-internal topics, so users should be able to run multiple dedicated MM2 clusters on Kafka as well.
Kafka Streams
Streams applications could be placed inside virtual clusters. This would mean that topics created by Streams are created within a virtual cluster from the start. From this perspective, Streams fits virtual clusters well.
Migrating to new Virtual Clusters
When users upgrade to a virtual cluster compatible Kafka version, everything stays the same as it was before: no virtual cluster resources are created, and the only cluster ACL resource is “kafka-cluster” (this is also true for a fresh installation).
Users should first create an administrator account that acts as a super admin and manages virtual clusters. Then, with the appropriate command line tools, they can create virtual clusters and set the ACLs as they see fit. After this, they link their existing topics to the newly created virtual clusters and assign users to them. Virtual clusters can co-exist with pre-upgrade topics and other resources, so the migration doesn't cause operational downtime.
Migration from Prefixes
One common use-case is when users use prefixes to simulate tenants. A typical example would be a topic called “emea.eu.sales.SalesStream” where “emea.eu.sales” prefix could be a user space for a company’s sales department. Then a prefix ACL could be enforced with a CreateTopicPolicy to control what kind of topics users of the sales department can create.
In this setup, clients always address the prefixed name of the topic, whereas this proposal suggests that clients use just the topic's name without any organizational prefixes, so that virtual clusters stay transparent. This contradicts the previous usage pattern, but it can be mitigated easily by the following steps (a command line sketch follows the list):
- The administrator creates a virtual cluster.
- They link the topic to it both with its prefixed name (emea.eu.sales.SalesStream) and with its relative name (SalesStream).
- At this point, since the clients aren't in the virtual cluster yet, they continue addressing the real topic.
- The administrator assigns the clients to the virtual cluster; they keep using the same prefixed name, but now through the virtual cluster.
- When the clients are ready, they can be restarted with the relative topic name. From this point onwards, they stop using the prefixed name.
- The link with the prefixed name can be dropped.
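With the virtual-clusters.sh tool proposed under Tooling Changes below, this walkthrough could look like the following sketch; the cluster, topic and user names are made up for the example.

# create the virtual cluster for the sales department
virtual-clusters.sh create --bootstrap-server localhost:9092 --virtual-cluster sales
# link the topic under its prefixed name...
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster sales --add --topic emea.eu.sales.SalesStream --link emea.eu.sales.SalesStream
# ...and under its relative name
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster sales --add --topic emea.eu.sales.SalesStream --link SalesStream
# move a client's user into the virtual cluster
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster sales --add --user sales-app
# once all clients address SalesStream, drop the prefixed link
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster sales --remove --topic emea.eu.sales.SalesStream --link emea.eu.sales.SalesStream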
Technical Details
Protocol Changes
CREATE_VIRTUAL_CLUSTER
Users who are allowed for CREATE_VIRTUAL_CLUSTER on CLUSTER resources, will be able to create virtual clusters.
{ "apiKey": 91, "type": "request", "listeners": ["broker", "controller"], "name": "CreateVirtualClustersRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "VirtualClusters", "type": "[]CreatableVirtualCluster", "versions": "0+", "about": "The virtual clusters to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." } ]}, { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "How long to wait in milliseconds before timing out the request." }, { "name": "validateOnly", "type": "bool", "versions": "1+", "default": "false", "ignorable": false, "about": "If true, check that the virtual clusters can be created as specified, but don't create anything." } ] }
{ "apiKey": 91, "type": "response", "name": "CreateVirtualClustersResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "VirtualClusters", "type": "[]CreatableVirtualClustersResult", "versions": "0+", "about": "Results for each topic we tried to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true, "about": "The error message, or null if there was no error." } ]} ] }
ALTER_VIRTUAL_CLUSTER
Users who are allowed for ALTER_VIRTUAL_CLUSTER on CLUSTER resources, will be able to modify resource connections in a virtual cluster. This means that they can add and remove topic links, associate users and clients with the CLUSTER resource and also disassociate them.
{ "apiKey": 92, "type": "request", "listeners": ["broker", "controller"], "name": "AlterVirtualClustersRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "VirtualCluster", "type": "[]VirtualClusterData", "versions": "0+", "about": "The virtual clusters to Alter.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." }, { "name": "Resources", "type": "[]VirtualClusterResource", "versions": "0+", "about": "The updates for each resource.", "fields": [ { "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true, "about": "The resource type." }, { "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true, "about": "The resource name." }, { "name": "ResourceOperation", "type": "int8", "versions": "0+", "mapKey": true, "about": "The operation on the resource." } ]} ]}, { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "How long to wait in milliseconds before timing out the request." }, { "name": "validateOnly", "type": "bool", "versions": "1+", "default": "false", "ignorable": false, "about": "If true, check that the virtual clusters can be created as specified, but don't create anything." } ] }
{ "apiKey": 92, "type": "response", "name": "AlterVirtualClustersResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "VirtualCluster", "type": "[]VirtualClusterData", "versions": "0+", "about": "The virtual clusters to Alter.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." }, { "name": "Resources", "type": "[]VirtualClusterResource", "versions": "0+", "about": "The updates for each resource.", "fields": [ { "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true, "about": "The resource type." }, { "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true, "about": "The resource name." }, { "name": "ResourceOperation", "type": "int8", "versions": "0+", "mapKey": true, "about": "The operation on the resource." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true, "about": "The error message, or null if there was no error." } ]} ]} ] }
DELETE_VIRTUAL_CLUSTER
Users who are allowed for DELETE_VIRTUAL_CLUSTER on CLUSTER resources, will be able to delete a virtual cluster. A virtual cluster can only be deleted when it’s empty and no users, clients and topics are associated with it.
{ "apiKey": 93, "type": "request", "listeners": ["broker", "controller"], "name": "DeleteVirtualClustersRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "VirtualClusters", "type": "[]DeletableVirtualCluster", "versions": "0+", "about": "The virtual clusters to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." } ]}, { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "How long to wait in milliseconds before timing out the request." }, { "name": "validateOnly", "type": "bool", "versions": "1+", "default": "false", "ignorable": false, "about": "If true, check that the virtual clusters can be deleted as specified, but don't delete anything." } ] }
{ "apiKey": 93, "type": "response", "name": "DeleteVirtualClustersResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]DeletableVirtualClusterResult", "versions": "0+", "about": "The results for each virtual cluster we tried to delete.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The deletion error, or 0 if the deletion succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The error message, or null if there was no error." } ]} ] }
LIST_VIRTUAL_CLUSTERS
Users who are allowed for LIST_VIRTUAL_CLUSTERS on CLUSTER resources, will be able to retrieve a list of virtual clusters in the Kafka cluster.
{ "apiKey": 94, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ListVirtualClustersRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ ] }
{ "apiKey": 94, "type": "response", "name": "ListVirtualClustersResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "VirtualClusters", "type": "[]ListedVirtualCluster", "versions": "0+", "about": "Each virtual cluster in the response.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." } ]} ] }
DESCRIBE_VIRTUAL_CLUSTER
Users who are allowed for DESCRIBE_VIRTUAL_CLUSTER on CLUSTER resources, will be able to describe a virtual cluster. The listing would include the topics, users, clients, groups and transactions assigned to that virtual cluster.
{ "apiKey": 95, "type": "request", "listeners": ["broker", "controller"], "name": "DescribeVirtualClustersRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "VirtualClusters", "type": "[]VirtualClusterData", "versions": "0+", "about": "The virtual clusters to describe.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." } ]} ] }
{ "apiKey": 95, "type": "response", "name": "DescribeVirtualClustersResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "VirtualClusters", "type": "[]VirtualClusterResults", "versions": "0+", "about": "The virtual clusters to describe.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the virtual cluster." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The topic error, or 0 if there was no error." }, { "name": "TopicLinks", "type": "[]TopicLink", "versions": "0+", "about": "The topic links for the virtual cluster.", "fields": [ { "name": "LinkName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic link." }, { "name": "PhysicalName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name on the physical cluster." } ]}, { "name": "Users", "type": "[]string", "versions": "0+", "about": "The users of the virtual cluster." }, { "name": "ClientIds", "type": "[]string", "versions": "0+", "about": "The client ids of the virtual cluster." }, { "name": "TransactionalIds", "type": "[]string", "versions": "0+", "entityType": "transactionalId", "about": "The transactional ids associated with the virtual cluster." }, { "name": "ConsumerGroups", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The consumer group ids associated with the virtual cluster." } ]} ] }
Interface Changes
We’ll list the anticipated interface changes below.
Admin APIs
Create Virtual Clusters
@InterfaceStability.Evolving
public class CreateVirtualClustersResult {
    private final Map<String, KafkaFuture<CreateVirtualClustersResponseData.CreatableVirtualClustersResult>> futures;

    public CreateVirtualClustersResult(Map<String, KafkaFuture<CreateVirtualClustersResponseData.CreatableVirtualClustersResult>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<Void>> values() {
        return futures.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> null)));
    }

    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}
@InterfaceStability.Evolving
public class NewVirtualCluster {
    private final String name;

    public NewVirtualCluster(String name) {
        this.name = name;
    }

    public String name() {
        return name;
    }
}
@InterfaceStability.Evolving
public class CreateVirtualClustersOptions extends AbstractOptions<CreateVirtualClustersOptions> {
    private boolean validateOnly = false;
    private boolean retryOnQuotaViolation = true;

    public CreateVirtualClustersOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public CreateVirtualClustersOptions validateOnly(boolean validateOnly) {
        this.validateOnly = validateOnly;
        return this;
    }

    public boolean shouldValidateOnly() {
        return validateOnly;
    }

    public CreateVirtualClustersOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}
CreateVirtualClustersResult createVirtualClusters(Collection<NewVirtualCluster> virtualClusters, CreateVirtualClustersOptions options);
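For illustration, a hypothetical usage sketch of this method, assuming it lands on the Admin interface as proposed; admin is an existing org.apache.kafka.clients.admin.Admin client and the cluster names are made up:

// Create two virtual clusters and wait for both to succeed.
CreateVirtualClustersResult result = admin.createVirtualClusters(
    List.of(new NewVirtualCluster("emea"), new NewVirtualCluster("apac")),
    new CreateVirtualClustersOptions());
result.all().get(); // throws ExecutionException if any creation failed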
Alter Virtual Clusters
@InterfaceStability.Evolving
public class AlterVirtualClustersResult {
    private final Map<String, KafkaFuture<AlterVirtualClustersResponseData.VirtualClusterData>> futures;

    public AlterVirtualClustersResult(Map<String, KafkaFuture<AlterVirtualClustersResponseData.VirtualClusterData>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<Void>> values() {
        return futures.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> null)));
    }

    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}
@InterfaceStability.Evolving
public class AlterVirtualClustersOptions extends AbstractOptions<AlterVirtualClustersOptions> {
    private boolean validateOnly = false;
    private boolean retryOnQuotaViolation = true;

    public AlterVirtualClustersOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public AlterVirtualClustersOptions validateOnly(boolean validateOnly) {
        this.validateOnly = validateOnly;
        return this;
    }

    public boolean shouldValidateOnly() {
        return validateOnly;
    }

    public AlterVirtualClustersOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}
@InterfaceStability.Evolving
public class VirtualClusterAlteration {
    public enum ResourceChangeType {
        ADD, REMOVE
    }

    public enum ResourceType {
        USER, TOPIC, GROUP, TRANSACTIONAL_ID
    }

    private String virtualClusterName;
    private List<VirtualClusterEntityChange> changes;

    public VirtualClusterAlteration(String virtualClusterName, List<VirtualClusterEntityChange> changes) {
        this.virtualClusterName = virtualClusterName;
        this.changes = changes;
    }

    public String virtualClusterName() {
        return virtualClusterName;
    }

    public List<VirtualClusterEntityChange> changes() {
        return changes;
    }

    public static class VirtualClusterEntityChange {
        private ResourceType resourceType;
        private String entityName;
        private ResourceChangeType changeType;

        public VirtualClusterEntityChange(ResourceType resourceType, String entityName, ResourceChangeType changeType) {
            this.resourceType = resourceType;
            this.entityName = entityName;
            this.changeType = changeType;
        }

        public ResourceType entityType() {
            return resourceType;
        }

        public String entityName() {
            return entityName;
        }

        public ResourceChangeType changeType() {
            return changeType;
        }
    }
}
AlterVirtualClustersResult alterVirtualClusters(Collection<VirtualClusterAlteration> alterations, AlterVirtualClustersOptions options);
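As a usage sketch under the same assumptions as above: linking a topic and assigning a user to a virtual cluster. Note that VirtualClusterEntityChange carries a single entity name, so here the topic is linked under its own name; the entity names are made up.

VirtualClusterAlteration alteration = new VirtualClusterAlteration("emea", List.of(
    // add a topic link for the physical topic "orders"
    new VirtualClusterAlteration.VirtualClusterEntityChange(
        VirtualClusterAlteration.ResourceType.TOPIC, "orders",
        VirtualClusterAlteration.ResourceChangeType.ADD),
    // assign the user "jane-doe" to the virtual cluster
    new VirtualClusterAlteration.VirtualClusterEntityChange(
        VirtualClusterAlteration.ResourceType.USER, "jane-doe",
        VirtualClusterAlteration.ResourceChangeType.ADD)));
admin.alterVirtualClusters(List.of(alteration), new AlterVirtualClustersOptions()).all().get();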
Delete Virtual Clusters
@InterfaceStability.Evolving
public class DeleteVirtualClustersOptions extends AbstractOptions<DeleteVirtualClustersOptions> {
    private boolean validateOnly = false;
    private boolean retryOnQuotaViolation = true;

    public DeleteVirtualClustersOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public DeleteVirtualClustersOptions validateOnly(boolean validateOnly) {
        this.validateOnly = validateOnly;
        return this;
    }

    public boolean shouldValidateOnly() {
        return validateOnly;
    }

    public DeleteVirtualClustersOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}
@InterfaceStability.Evolving
public class DeleteVirtualClustersResult {
    private final Map<String, KafkaFuture<DeleteVirtualClustersResponseData.DeletableVirtualClusterResult>> futures;

    public DeleteVirtualClustersResult(Map<String, KafkaFuture<DeleteVirtualClustersResponseData.DeletableVirtualClusterResult>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<Void>> values() {
        return futures.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> null)));
    }

    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}
DeleteVirtualClustersResult deleteVirtualClusters(Collection<String> virtualClusterNames, DeleteVirtualClustersOptions options);
List Virtual Clusters
@InterfaceStability.Evolving
public class ListVirtualClustersResult {
    private final KafkaFuture<List<String>> future;

    public ListVirtualClustersResult(KafkaFuture<List<String>> future) {
        this.future = future;
    }

    public KafkaFuture<List<String>> listing() {
        return future;
    }
}
ListVirtualClustersResult listVirtualClusters();
Describe Virtual Clusters
@InterfaceStability.Evolving
public class DescribeVirtualClustersOptions extends AbstractOptions<DescribeVirtualClustersOptions> {
}
@InterfaceStability.Evolving
public class DescribeVirtualClustersResult {
    private Map<String, KafkaFuture<VirtualClusterDescription>> futures;

    public DescribeVirtualClustersResult(Map<String, KafkaFuture<VirtualClusterDescription>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<VirtualClusterDescription>> values() {
        return futures;
    }

    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}
@InterfaceStability.Evolving
public class VirtualClusterDescription {
    public static class TopicLink {
        private String linkName;
        private String topicName;

        public TopicLink(String linkName, String topicName) {
            this.linkName = linkName;
            this.topicName = topicName;
        }

        public String linkName() {
            return linkName;
        }

        public String topicName() {
            return topicName;
        }
    }

    private String name;
    private List<TopicLink> topicLinks;
    private List<String> groupLinks;
    private List<String> userLinks;
    private List<String> clientLinks;
    private List<String> transactionalIdLinks;

    public VirtualClusterDescription(String name, List<TopicLink> topicLinks, List<String> groupLinks,
                                     List<String> userLinks, List<String> clientLinks, List<String> transactionalIdLinks) {
        this.name = name;
        this.topicLinks = topicLinks;
        this.groupLinks = groupLinks;
        this.userLinks = userLinks;
        this.clientLinks = clientLinks;
        this.transactionalIdLinks = transactionalIdLinks;
    }

    public String name() { return name; }
    public List<TopicLink> topicLinks() { return topicLinks; }
    public List<String> groupLinks() { return groupLinks; }
    public List<String> userLinks() { return userLinks; }
    public List<String> clientLinks() { return clientLinks; }
    public List<String> transactionalIdLinks() { return transactionalIdLinks; }
}
DescribeVirtualClustersResult describeVirtualClusters(Collection<String> virtualClusterIds, DescribeVirtualClustersOptions options);
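Finally, a sketch for listing and describing, under the same assumptions as the previous snippets:

// List all virtual clusters, then print the topic links of one of them.
List<String> names = admin.listVirtualClusters().listing().get();
VirtualClusterDescription emea = admin
    .describeVirtualClusters(List.of("emea"), new DescribeVirtualClustersOptions())
    .values().get("emea").get();
emea.topicLinks().forEach(link ->
    System.out.println(link.linkName() + " -> " + link.topicName()));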
Config Changes
At this point there may be no need for new configs. It would be possible to add static configuration for setting up virtual clusters; however, due to the potentially large number of topics and users, such configuration could get quite extensive. Therefore users should rely on the command line tools or the Admin API when migrating to virtual clusters.
Tooling Changes
Overall we would like to introduce a single new command line tool, virtual-clusters.sh, to handle all of these interface changes.
The following subcommands would be available: create, alter, list, describe, delete. Each of these implements the previously described functionality. Each command is designed to carry out a single action for simplicity. For instance, deleting multiple virtual clusters with a single execution wouldn't be available, as the Admin API provides better access for more complex actions.
Subcommands
create
With the below command, an administrator would create a virtual cluster.
virtual-clusters.sh create --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster
alter
With the alter command below, one can create a resource link in the virtual cluster or remove one. A single execution of the command performs only one addition or removal of a selected resource.
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster [--add | --remove] [--user PRINCIPAL | --client CLIENT_ID | [--topic TOPIC --link LINK] | --group GROUP | --transactional-id TRANSACTIONAL_ID]
delete
virtual-clusters.sh delete --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster
list
virtual-clusters.sh list --bootstrap-server localhost:9092
describe
virtual-clusters.sh describe --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster
Examples
# create a new virtual cluster
virtual-clusters.sh create --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster
# create a topic link in it
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster --add --topic my-topic --link test-topic
# assign a user to the virtual cluster
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster --add --user jane-doe
# list the virtual clusters
virtual-clusters.sh list --bootstrap-server localhost:9092
# describe the virtual cluster
virtual-clusters.sh describe --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster
# delete the virtual cluster
virtual-clusters.sh delete --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster
ACL Migration
Users that weren't freshly created before being added to a virtual cluster probably have some ACLs applied to them. With the current proposal we can add and remove ACLs, but the community would benefit from a more user-friendly tool that migrates a user's ACLs into the virtual cluster.
A full migration could look like this:
a. Create a VC
b. Create topic links inside the VC
c. Move a user and its resources to the VC:
- Assign its resources (groups, delegation tokens, transactional IDs) to the VC
- Create ACLs in the VC for these resources
- Assign the user to the VC
d. Remove the old ACLs
This is a complex migration process, and while it would be complex to do in one command, we can help cluster operators automate steps b., c. and d. the following way:
- When a user is assigned to a virtual cluster, the administrator can specify the --copy-acls-from <source> option if ACLs need to be migrated. This copies all ACLs from the source virtual cluster that target the user in the command. If the kafka-cluster source is specified, the migration utility looks for ACLs not belonging to any virtual cluster.
- In the same command, or in a separate one, the administrator can specify the --remove-old-acls-from <source> option to remove from the source any ACLs that have already been migrated. Again, if the kafka-cluster source is specified, the migration utility looks for ACLs not belonging to any virtual cluster.
During migration, topic references will be translated to the appropriate link references by querying the links on the broker side.
# assign a user to the virtual cluster and also copy their ACLs that exist in the global space (kafka-cluster)
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster --add --user jane-doe --copy-acls-from kafka-cluster
# assign a user to the virtual cluster and also copy their ACLs that exist in another VC (my-old-virtual-cluster)
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster --add --user jane-doe --copy-acls-from my-old-virtual-cluster
# remove any ACLs for the user that are in the specified VC (if the user is already added, it won't be re-added, but if it isn't added yet, the command fails)
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster --add --user jane-doe --remove-old-acls-from kafka-cluster
# assign a user to the virtual cluster, copy their ACLs from another VC (my-old-virtual-cluster), then remove the old ones in the same command - this should be used only if downtime isn't an issue
virtual-clusters.sh alter --bootstrap-server localhost:9092 --virtual-cluster my-virtual-cluster --add --user jane-doe --copy-acls-from my-old-virtual-cluster --remove-old-acls-from my-old-virtual-cluster
Extending kafka-acls.sh
Administrators will need to set virtual-cluster-specific ACLs for existing users as well, besides managing the virtual clusters themselves. We plan to add a --virtual-cluster option to the kafka-acls.sh command to enhance it with VC capabilities. The existing options would work well with this extra option. We would also like to introduce a new --virtual-cluster-resource-pattern-type parameter to control the pattern matching of the specified virtual cluster. It applies to VCs only and provides a more robust, generic specification of ACLs. It defaults to literal if not specified explicitly.
Examples:
# Adding virtual-cluster specific ACLs
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic --virtual-cluster com.mycompany.eu --virtual-cluster-resource-pattern-type literal
# Adding VC specific ACLs with resource patterns
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed --virtual-cluster com.mycompany.eu. --virtual-cluster-resource-pattern-type prefixed
# Removing ACLs that are specific to virtual clusters
bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic --virtual-cluster com.mycompany.eu
# List ACLs of a virtual cluster
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --virtual-cluster com.mycompany.eu
Metadata Layer
Although this isn't a user-facing API, we feel it is important to discuss, as it affects the core of Kafka. We plan to store the data of virtual clusters in the metadata layer, similarly to topics and configurations, for instance.
The first step is a VirtualClusterImage that holds all properties of the virtual cluster itself, along with all of its links/associations. With this we can define the delta classes that represent changes in virtual clusters.
Finally, a VirtualClustersImage will be put into the MetadataImage to make it part of the metadata.
/**
 * Represents the image of all virtual clusters.
 */
public class VirtualClustersImage {
    private Map<String, VirtualClusterImage> virtualClusterImages;
}

/**
 * Represents the image of a single virtual cluster.
 */
public class VirtualClusterImage {
    public static class TopicLink {
        private String topicName;
        private String linkName;
    }

    private String name;
    private List<TopicLink> topics;
    private List<String> users;
    private List<String> clients;
    private List<String> groups;
    private List<String> transactionalIds;
}

/**
 * Represents the delta of virtual clusters on a change in virtual clusters.
 */
public class VirtualClustersDelta {
    private final VirtualClustersImage image;
    private final List<VirtualClusterDelta> clusterChanges;
}

/**
 * Represents the changes of a virtual cluster.
 */
public class VirtualClusterDelta {
    private final VirtualClusterImage image;
    private final List<VirtualClusterImage.TopicLink> topicLinkChanges;
    private final List<String> userChanges;
    private final List<String> clientChanges;
    private final List<String> groupChanges;
    private final List<String> transactionalIdChanges;
}
These images will be used by the VirtualClustersMetadataPublisher to update the metadata when required, similarly to other existing features.
Deprecation, Backward Compatibility
Configuration
There are no new configurations and no planned deprecations.
Behavior
A Kafka cluster without any virtual clusters should behave the same as it did before the upgrade to a virtual cluster compatible version. After the upgrade, users will have the chance to set up their ACLs properly and add virtual clusters. This however won’t alter the behavior of the cluster as it will continue to operate as it did before. Users can migrate their clients to virtual clusters as described in a previous section. No behavior will be deprecated.
Interfaces
We add new admin methods without changing the behavior of existing ones. No existing interfaces will be deprecated.
Protocols
We add new protocols without changing the behavior of old ones. No existing protocols will be deprecated.
Testing
We mainly plan to test this in the conventional ways: unit tests to assert correctness at the most primitive level; integration tests to assert the correctness of the Admin API, the protocols and the rest of the server-side request handling; and ducktape tests to assert the correctness of the feature as a whole, starting from the command line tools, which drive the Admin API, the protocols and the server-side request handling. In the ducktape tests a whole life cycle would be played through:
- Create a virtual cluster through command line
- Create links in the virtual cluster
- Assign users, clients and other resources to the cluster
- Run a consumer and producer to assert the happy path
- Remove links and resources from the virtual cluster
- Delete the virtual cluster
Documentation Plan
The Kafka multi-tenancy section will be amended with the contents of this KIP upon completion.
Rejected Alternatives
Hierarchical Clusters
While it would allow more complex use cases and we could implement a cluster of clusters, the operational complexity would be much higher. With hierarchical clusters we would need to introduce a separator to address resources in the non-leaf nodes of the cluster tree. There are multiple reasons against this:
- The introduction of this notation itself could break backward compatibility as users would need to change their clients to use the fully qualified resource names when addressing resources in virtual clusters.
- It would also complicate the ACL structure, as we would need to introduce this hierarchy in ACLs and create a much more complicated model to handle the relationships in the hierarchy. In our opinion this isn't worth the price, as we would risk breaking backward compatibility or introducing weird edge cases.
- Besides this, in an average organization there aren’t hundreds of teams who use Kafka, so a flat structure would likely satisfy most needs. Users can continue using “.” as a separator, so they could have “us.amer.ecommerce” as their virtual cluster.
KIP-37 Style Namespaces
KIP-37 proposed a similar idea; however, since it never got through the discussion phase, it never expanded on the details. We find that this solution has the drawbacks of hierarchical clusters, and additionally it hardcodes namespaces into the log directory structure. This isn't beneficial, as moving or renaming topics would become hard: one would need to replicate the data, which becomes more costly the more data there is.
Avoid Binding Users to Virtual Clusters
If we don’t bind users to virtual clusters, then the clients used by those users can address multiple VCs and thus resources like topics and groups could become ambiguous if the same resource is present in multiple places. This could be resolved by introducing a cluster separator and clients would need to reference topics as “cluster:topicname” for instance. This would most likely result in backward incompatibility as one would need to change clients in order to use virtual clusters. We could say that those resources which are unique in a VC could be referenced with relative addresses (where we omit the cluster part of the name), but it would break when a resource with the same name is created elsewhere. Besides this, with the current proposal if a client wants to consume or produce messages to topics in other virtual clusters, an administrator can link those topics to the client’s VC and restrict the access with ACLs only to that client. This provides safe access for the client and a well-defined way to share topics across virtual clusters.
Namespaces instead of Virtual Clusters
Apache Pulsar has a 3-level structure to address topics. The topmost layer is called a “tenant”, which defines configuration for a set of Pulsar clusters, as it can span multiple clusters; it is a higher-level concept than the virtual clusters presented here. Within every tenant there are namespaces, which are the administrative units for quotas and configuration. These are mostly equivalent to our virtual clusters. We could have used the “namespace” name here, but virtual clusters is a more descriptive name in our opinion.
While we currently don't want to add the “tenant” level unit of organization, it may be a future development possibility.