Status

Current state: Accepted

Discussion thread: here

Voting thread: here

Vote result: Accepted with 3 binding +1 votes (Chris Egerton, Yash Mayya, Andrew Schofield), 4 non-binding +1 votes (Hector Geraldino, Andrei D, Andrei Rudkouski, Mario Fiore Vitale)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect requires several internal topics (config, offsets, and status) for storing connector metadata and state. By default, these topics are automatically created by the worker if they are missing.

However, in some scenarios this behavior can be problematic:

  • Internal topics may already contain information about a large number of connectors.

  • If one or more of the original topics are accidentally removed or renamed, Kafka Connect silently creates new, empty topics in their place.

  • This can leave the cluster in an unexpected state. For example, if the config topic exists but the offsets topic was removed, source connectors, whose offsets are stored there, may start from scratch.

We propose a safer mode in which Kafka Connect fails fast instead of silently creating new topics. This lets operators troubleshoot misconfigurations and prevents accidental data loss or resets.

Proposed changes

A new worker configuration will be introduced:

internal.topics.automatic.creation.enable = [true | false]
  • true (default): the current behavior. Kafka Connect automatically creates internal topics if they are missing.
  • false: Kafka Connect does not create internal topics. If one or more required topics are missing, worker startup fails with an explicit error message.

The changes:

  1. Add a new worker-level configuration `internal.topics.automatic.creation.enable`.
  2. Modify the internal topic management logic in `KafkaConfigBackingStore`, `KafkaOffsetBackingStore`, and `KafkaStatusBackingStore` to respect the configuration.

  3. When the config is set to false and a required topic is missing:

    • The worker should log an error message specifying the missing topic(s).

    • The worker should fail to start, instead of creating new topics.


Example exception message:

Topic 'non-existent-offset' specified via the 'offset.storage.topic' property is missing. 
The config 'internal.topics.automatic.creation.enable' is set to 'false', so automatic creation of internal topics is disabled. 
Either enable automatic creation or create the topics manually before starting the worker.
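
The fail-fast check described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from the linked pull request: the class `InternalTopicCheck` and its methods are hypothetical, a plain `Set<String>` stands in for the topic listing a worker would obtain from the broker, and `IllegalStateException` stands in for whatever exception type the worker actually raises. Only the configuration name and the message wording are taken from this proposal. For brevity the sketch stops at the first missing topic rather than collecting all of them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Connect; it only illustrates the proposed behavior.
class InternalTopicCheck {

    // Name of the new worker configuration proposed in this document.
    static final String AUTO_CREATE_CONFIG = "internal.topics.automatic.creation.enable";

    // Builds the error text for one missing topic, following the example message above.
    static String missingTopicMessage(String property, String topic) {
        return "Topic '" + topic + "' specified via the '" + property + "' property is missing. "
            + "The config '" + AUTO_CREATE_CONFIG + "' is set to 'false', "
            + "so automatic creation of internal topics is disabled. "
            + "Either enable automatic creation or create the topics manually before starting the worker.";
    }

    // Fails fast when automatic creation is disabled and a required topic does not exist.
    // existingTopics is an assumption: it stands in for the topics known to the broker.
    static void verify(Map<String, String> topicsByProperty, Set<String> existingTopics, boolean autoCreate) {
        if (autoCreate) {
            return; // current behavior: missing topics are created elsewhere, nothing to reject
        }
        for (Map.Entry<String, String> entry : topicsByProperty.entrySet()) {
            if (!existingTopics.contains(entry.getValue())) {
                throw new IllegalStateException(missingTopicMessage(entry.getKey(), entry.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> topics = new LinkedHashMap<>();
        topics.put("config.storage.topic", "connect-configs");
        topics.put("offset.storage.topic", "non-existent-offset");
        topics.put("status.storage.topic", "connect-status");
        Set<String> existing = Set.of("connect-configs", "connect-status");
        try {
            verify(topics, existing, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With automatic creation enabled, `verify` returns without checking anything, which mirrors the default behavior staying unchanged.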


Implementation: https://github.com/apache/kafka/pull/20384

Internal Topics Management Script

A new script, bin/connect-internal-topics.sh, will be added to create the internal topics for a Kafka Connect cluster. It is intended to be run before any workers in the cluster are started, so that all workers can then be configured with automatic internal topic creation disabled.

Arguments

  • The positional sub-command argument is required and must be one of the following values:

    • create - creates the internal topics for the Kafka Connect cluster. If the topics already exist, a log message will indicate that they are already present.
  • --worker-config <worker-properties-file>

    • The script will read this worker properties file and use the configs related to internal topics:

      • offset.storage.topic, offset.storage.partitions, offset.storage.replication.factor

      • config.storage.topic, config.storage.replication.factor (no partitions setting, because the config topic always has a single partition)

      • status.storage.topic, status.storage.partitions, status.storage.replication.factor

      • additionally, overrides for internal topic properties will be read (e.g., config.storage.<topic-specific-property>, offset.storage.<topic-specific-property>, status.storage.<topic-specific-property>)

      • top-level properties from the config file will be used for the admin client when connecting to the broker (such as SSL/SASL settings, retries, bootstrap servers, etc.)

    • Default values for internal topic settings will match those used when running Kafka Connect in distributed mode.

    • Topics will be created regardless of the value of internal.topics.automatic.creation.enable.
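
The way the script could derive topic settings from the worker properties file can be sketched as follows. This is an illustration only, under several assumptions: the class `InternalTopicPlan`, its nested `TopicSpec`, and the method names are hypothetical, and the defaults used here (25 offset partitions, 5 status partitions, replication factor 3) are taken to be the distributed-mode defaults and should be checked against the Kafka version in use. The sketch only computes what would be created; the actual topic creation through the admin client is left out.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical sketch; names are illustrative and not taken from the implementation.
class InternalTopicPlan {

    // Plain holder for what is needed to create one internal topic.
    static final class TopicSpec {
        final String name;
        final int partitions;
        final short replicationFactor;
        final Map<String, String> overrides;

        TopicSpec(String name, int partitions, short replicationFactor, Map<String, String> overrides) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.overrides = overrides;
        }

        @Override
        public String toString() {
            return name + " partitions=" + partitions
                + " replicationFactor=" + replicationFactor + " overrides=" + overrides;
        }
    }

    // Reads "<prefix>topic", "<prefix>partitions", "<prefix>replication.factor" and any
    // other "<prefix>..." keys, which are treated as topic-specific overrides.
    static TopicSpec spec(Properties worker, String prefix, int defaultPartitions) {
        String name = worker.getProperty(prefix + "topic");
        if (name == null) {
            throw new IllegalArgumentException("Missing required property '" + prefix + "topic'");
        }
        // The config topic always has exactly one partition, so no partitions setting is read for it.
        int partitions = prefix.equals("config.storage.")
            ? 1
            : Integer.parseInt(worker.getProperty(prefix + "partitions", String.valueOf(defaultPartitions)));
        // Assumed default replication factor of 3.
        short replicationFactor = Short.parseShort(worker.getProperty(prefix + "replication.factor", "3"));

        Map<String, String> overrides = new TreeMap<>();
        for (String key : worker.stringPropertyNames()) {
            if (!key.startsWith(prefix)) {
                continue;
            }
            String suffix = key.substring(prefix.length());
            if (suffix.equals("topic") || suffix.equals("partitions") || suffix.equals("replication.factor")) {
                continue;
            }
            overrides.put(suffix, worker.getProperty(key));
        }
        return new TopicSpec(name, partitions, replicationFactor, overrides);
    }

    // Builds the specs for the three internal topics; internal.topics.automatic.creation.enable is ignored.
    static List<TopicSpec> plan(Properties worker) {
        List<TopicSpec> specs = new ArrayList<>();
        specs.add(spec(worker, "offset.storage.", 25)); // assumed default
        specs.add(spec(worker, "config.storage.", 1));
        specs.add(spec(worker, "status.storage.", 5));  // assumed default
        return specs;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the file passed via --worker-config.
        Properties worker = new Properties();
        worker.load(new StringReader(
            "bootstrap.servers=localhost:9092\n"
            + "offset.storage.topic=connect-offsets\n"
            + "config.storage.topic=connect-configs\n"
            + "status.storage.topic=connect-status\n"
            + "status.storage.min.insync.replicas=2\n"
            + "internal.topics.automatic.creation.enable=false\n"));
        for (TopicSpec s : plan(worker)) {
            System.out.println(s);
        }
    }
}
```

The remaining top-level properties (for example bootstrap.servers and any SSL/SASL settings) would be handed to the admin client unchanged, as described above.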

Compatibility, Deprecation, and Migration Plan

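There are no backward compatibility concerns: the new configuration defaults to true, which preserves the current behavior of automatically creating missing internal topics.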

Rejected Alternatives

  1. Always fail if topics are missing — breaks existing deployments and backward compatibility.

  2. Make this configurable per-topic (config/offsets/status) — adds unnecessary complexity.
