- Status
- Motivation
- Why Share Kafka Connect Internal Topics Across Clusters?
- Why this will not affect Kafka Connect Startup
- Benchmarking with a topic shared across 20 clusters, each having 5 Sink jobs
- Public Interfaces
- Proposed Changes
- Compatibility, Deprecation, and Migration Plan
- Test Plan
- Rejected Alternatives
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Setting up a Kafka Connect cluster requires provisioning three internal topics:
- Offset Storage Topic – Tracks offsets of source connectors.
- Status Storage Topic – Maintains the state of connectors and tasks.
- Config Storage Topic – Stores connector configurations.
This design choice simplifies migration, since the management topics can be replicated seamlessly across regions. However, it introduces operational overhead:
- Every new cluster requires three new topics, so the topic count grows steadily with the number of clusters.
- Cross-team dependencies slow down provisioning, delaying deployments.
- Compacted topics trigger frequent disk cleanup operations, adding maintenance complexity.
While each cluster only requires three topics, their cumulative impact grows significantly as more Connect clusters are deployed.
Because these topics carry very light traffic and are compacted, Kafka Connect clusters can share internal topics across multiple deployments instead of provisioning dedicated topics for every cluster. This brings immediate benefits:
- Drastically Reduces Topic Proliferation – Eliminates unnecessary topic creation.
- Faster Kafka Connect Cluster Deployment – No waiting for new topic provisioning.
- Lower Operational Load – Reduces disk-intensive cleanup operations.
- Broker resource utilization is expected to decrease by approximately 20%, primarily due to reduced partition count and metadata overhead. This optimization can enable further cluster downscaling, contributing directly to lower infrastructure costs (e.g., fewer brokers, reduced EBS storage footprint, and lower I/O throughput).
- Administrative overhead and monitoring complexity are projected to drop by roughly 30%, due to:
- Fewer topics to configure, monitor, and apply retention/compaction policies to.
- Reduced rebalancing operations during cluster scale-in or scale-out events.
- Minimized data movement and replication workload in failure scenarios (e.g., EBS volume replacement), as fewer partitions are involved.
- Simplified Management – Less overhead in monitoring and maintaining internal topics.
How this will help different organisations:
- Large Enterprises with Multiple Teams Using Kafka Connect
- Scenario: In large organisations, multiple teams manage different Kafka Connect clusters for various data pipelines.
- Benefit: Instead of waiting for new internal topics to be provisioned each time a new cluster is deployed, teams can immediately start using pre-existing shared topics, reducing lead time and improving efficiency.
- Cloud-Native & Kubernetes-Based Deployments
- Scenario: Many organisations deploy Kafka Connect in containerised environments (e.g., Kubernetes), where clusters are frequently scaled up/down or recreated dynamically.
- Benefit: Since internal topics are already available, new clusters can spin up instantly, without waiting for topic provisioning or Kafka ACL approvals.
Why Share Kafka Connect Internal Topics Across Clusters?
1. Operational Efficiency & Reduced Overhead
- Each Kafka Connect cluster currently requires three internal topics (status, config, offset).
- Creating and managing these topics for every cluster increases operational burden, especially when managed by external teams.
- Sharing topics reduces manual setup, cleanup, and maintenance efforts.
- Streamlining topic creation accelerates cluster provisioning and scaling.
2. Simplified Cluster Management & Deployment
- In environments with frequent scaling (e.g., Kubernetes, cloud auto-scaling), requiring new topics for each cluster adds friction.
- Shared topics enable dynamic scaling without waiting for other teams to provision topics.
- Connect clusters can be provisioned with minimal configuration changes.
3. Resource Optimization & Cost Reduction
- Kafka topics consume storage, partitions, and replication resources.
- For every new cluster, dedicated topics require:
- Additional storage allocation
- Partitioning and replication overhead
- Compaction and retention tuning
- Sharing topics reduces Kafka’s resource utilization, leading to cost savings.
4. Improved Topic Governance & Consistency
- Managing individual topics per Connect cluster leads to fragmented policies for:
- Retention settings
- Compaction configurations
- Replication factors
- Access controls (ACLs)
- Centralized topic management ensures uniform policies and a single source of truth.
- Reduces the risk of misconfigurations.
5. Reduced Metadata Load on Kafka Brokers
- Kafka brokers maintain metadata for every topic, impacting controller performance.
- Reducing the number of small, frequently accessed topics leads to:
- Lower metadata overhead
- Faster topic discovery
- Reduced workload on Kafka controllers
6. Streamlined Monitoring & Security Controls
- Instead of monitoring and securing numerous individual topics, SRE teams can:
- Implement unified monitoring for fewer topics.
- Apply standardized ACLs & RBAC policies.
- Simplify compliance auditing (fewer entities to track).
Why this will not affect Kafka Connect Startup
- The growth of the shared topic is bounded by O(n), where n is the number of clusters.
- Kafka already shares the "__consumer_offsets" topic across all consumer groups on a single Kafka cluster, and manages it efficiently even as the number of consumer groups scales linearly. This motivates sharing the Connect internal topics in the same way, without adding any appreciable delay to Connect startup, at least up to a reasonable limit.
Benchmarking with a topic shared across 20 clusters, each having 5 Sink jobs
- The Config Topic and the Status Topic each hold roughly 7-8k records.
- Config Topic → 32 MB (segment size)
- Status Topic → 3 MB (segment size)
- In non-topic-sharing mode these sizes remain in the KBs. Even so:
- The Connect framework took only:
- 400-500 ms to read the entire Status Topic.
- around 500 ms to read the entire Config Topic (slightly more than the Status Topic, as config records have larger payloads).
- One key thing to note is that this read only happens at startup; once it completes, there is no effect on the Connect REST APIs.
Public Interfaces
New properties
- A single worker-level configuration property will be added for distributed mode:
| Name | Type | Default | Importance | Description |
| connect.management.topic.sharing.support | String | disabled | HIGH | Whether to enable sharing of the Kafka Connect management topics across clusters, using the cluster group ID to distinguish each cluster's records along the Connect journey. To enable management topic sharing on a new cluster, set this property to "enabled". |
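For illustration, a minimal worker configuration sketch with the new property enabled (the topic names and group.id below are placeholders, and everything except the last line is a standard distributed-mode worker setting):

```properties
# Standard distributed-mode worker settings (values are illustrative)
group.id=connect-cluster-B
config.storage.topic=shared-connect-configs
offset.storage.topic=shared-connect-offsets
status.storage.topic=shared-connect-statuses

# New in this KIP: opt in to sharing the internal topics across clusters
connect.management.topic.sharing.support=enabled
```

Note that all clusters sharing the topics would point their storage-topic settings at the same shared topics.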
- A new constructor parameter "clusterGroupId" is added to the following classes:
- KafkaConfigBackingStore
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, String clusterGroupId) {
this(converter, config, configTransformer, adminSupplier, clientIdBase, clusterGroupId, Time.SYSTEM);
}
KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, String clusterGroupId, Time time) {
...
this.clusterGroupId = Optional.ofNullable(clusterGroupId);
}
- KafkaStatusBackingStore.java
public KafkaStatusBackingStore(Time time, Converter converter, String clusterGroupId) {
this(time, converter, null, "connect-distributed-", clusterGroupId);
}
public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase, String clusterGroupId) {
this.time = time;
this.converter = converter;
this.tasks = new Table<>();
this.connectors = new HashMap<>();
this.topics = new ConcurrentHashMap<>();
this.topicAdminSupplier = topicAdminSupplier;
this.clientId = Objects.requireNonNull(clientIdBase) + "statuses";
this.clusterGroupId = Optional.ofNullable(clusterGroupId);
}
- OffsetStorageReaderImpl.java
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter keyConverter, Converter valueConverter, String clusterGroupId) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.closed = new AtomicBoolean(false);
this.offsetReadFutures = new HashSet<>();
this.clusterGroupId = Optional.ofNullable(clusterGroupId);
}
- OffsetStorageWriter.java
public OffsetStorageWriter(OffsetBackingStore backingStore, String namespace, String clusterGroupId, Converter keyConverter, Converter valueConverter) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.clusterGroupId = Optional.ofNullable(clusterGroupId);
}
- The following helper methods are added to "KafkaTopicBasedBackingStore.java":
private Optional<List<String>> parseKey(ConsumerRecord<String, byte[]> record) {
    String key = record.key();
    if (key == null || key.isEmpty()) {
        log.error("Empty or null key provided in record: {}", record);
        return Optional.empty();
    }
    // Split on the first separator only, so original keys that themselves
    // contain the separator character remain intact.
    List<String> parts = Arrays.asList(key.split("\\" + CLUSTER_GROUP_SEPARATOR, 2));
    if (parts.size() != 2) {
        log.error("Invalid key format: '{}'. Key should be in the format 'clusterGroupId.actualKey'", key);
        return Optional.empty();
    }
    return Optional.of(parts);
}
Optional<String> clusterGroupIdFromRecord(ConsumerRecord<String, byte[]> record) {
return parseKey(record).map(parts -> parts.get(0));
}
Optional<String> keyFromRecord(ConsumerRecord<String, byte[]> record) {
return parseKey(record).map(parts -> parts.get(1));
}
KafkaConnectTopicMigrator
We have created a new transformer to handle all types of migrations, so consumers need not worry about the key format. The migration works just as it does today, with the only addition being a very lightweight transformer that handles the cluster transformation logic. The configs are as below:
| Property Name | Purpose | Default |
| source.cluster.names | List of cluster names to be migrated; empty when migrating an old client or migrating a particular job. | Empty list |
| destination.cluster.name | Destination cluster name to which the migration is performed. | No default value, as this is required in all cases. |
| topic.type | Type of the topic being migrated. | No default value, as this is always needed. |
| migrate.job | Controls whether only a particular job is to be migrated. | FALSE |
| job.name | Name of the job when a particular job is being migrated. | Empty string |
| old.client | Controls the migration of an old client to a new client. | TRUE, as we assume that migration will primarily be needed from an old client to a new client. |
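As a rough illustration, a hypothetical transform configuration for migrating a single job's status records to a new cluster could look as follows. The property names come from the table above, but the "migrator" alias and the fully-qualified class name are assumptions, as the KIP does not specify them:

```properties
# Hypothetical: attach the migrator as a transform on the mirroring pipeline.
# The alias "migrator" and the class path below are illustrative only.
transforms=migrator
transforms.migrator.type=KafkaConnectTopicMigrator
transforms.migrator.destination.cluster.name=cluster-B
transforms.migrator.topic.type=status
transforms.migrator.migrate.job=true
transforms.migrator.job.name=my-sink-job
transforms.migrator.old.client=true
```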
Proposed Changes
As part of this KIP, we propose adding the "ClusterGroupId" as a prefix to the existing keys. For example:
If the current key for the status of connector "A" on cluster "B" is:
status-connector-A
then with this change it will be modified to the following:
B.status-connector-A
While processing, "parseKey" will split this on the "." separator, which results in the following:
- Cluster Group Id from the record → B
- Original Key → status-connector-A
The cluster group ID derived from the record ("B") is validated against the actual cluster group ID of the cluster; if they match, the original logic continues, otherwise the record is skipped by this cluster.
Why "." is chosen:
The intention was simply to avoid "-", which is already used in the existing keys, so that all the existing logic can be preserved with minimal changes.
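To make the scheme concrete, here is a minimal standalone sketch of the prefixing and parsing logic described above (the class and method names are illustrative, not the actual KIP code):

```java
import java.util.Optional;

// Illustrative sketch of the proposed key scheme: the cluster group ID
// is prefixed to the existing key, separated by ".".
class ClusterKeyDemo {
    static final String SEPARATOR = ".";

    // "B" + "status-connector-A" -> "B.status-connector-A"
    static String prefixKey(String clusterGroupId, String originalKey) {
        return clusterGroupId + SEPARATOR + originalKey;
    }

    // Split on the FIRST "." only, yielding [clusterGroupId, originalKey];
    // keys without a separator (old-client records) yield Optional.empty().
    static Optional<String[]> parseKey(String key) {
        if (key == null || key.isEmpty()) {
            return Optional.empty();
        }
        String[] parts = key.split("\\" + SEPARATOR, 2);
        return parts.length == 2 ? Optional.of(parts) : Optional.empty();
    }

    public static void main(String[] args) {
        String key = prefixKey("B", "status-connector-A");
        String[] parts = parseKey(key).orElseThrow(IllegalStateException::new);
        System.out.println(parts[0]); // cluster group ID -> B
        System.out.println(parts[1]); // original key     -> status-connector-A
    }
}
```

A record whose parsed cluster group ID does not match the local cluster's ID would simply be skipped, which is what isolates each cluster's state within the shared topic.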
Compatibility, Deprecation, and Migration Plan
Compatibility
- The changes are backward compatible: anyone can upgrade smoothly to a client that includes these changes, without any issues. The behaviour depends on whether the cluster group ID can be extracted from the key; if it cannot, the record is handled by the normal (pre-existing) flow.
- A cluster using the older client can still share the management topics with the cluster upgraded to the newer client.
| Kafka Client A | Kafka Client B | Can Share Topic? |
| Old | Old | No |
| Old | New | Yes |
| New | Old | Yes |
| New | New | Yes |
Deprecation
- No interfaces or APIs are being deprecated as part of this change.
Migration
- Old clients migrating to a cluster operating with the new client:
- If the destination cluster is not sharing its topics with any older clients, simply mirror all the Connect topics to the destination cluster's topics.
- If the destination cluster already hosts other clusters using older clients, "KafkaConnectTopicMigrator" can be used to perform the migration.
- New cluster migrating to another new cluster:
- "KafkaConnectTopicMigrator" can be used as a transformer along with a Kafka-to-Kafka mirror.
Test Plan
- Unit and integration tests will be added for both forward and backward compatibility.
Rejected Alternatives
Another way to pass cluster information was to put the cluster information in the header. But there were two challenges with this approach:
- It breaks backward compatibility, as an older client has no way to automatically work with newer clients that have the topic-sharing feature turned on.
- This required a lot more code changes and hence a larger testing and review cycle.