Status

Current state: Adopted

Discussion thread: link

Vote thread: link

JIRA: KAFKA-9931

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The configuration for a Kafka Connect distributed worker currently requires several properties that define the names of the three internal Kafka topics that the workers use for storing connector configs, offsets, and statuses:

  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

KIP-154 added the ability for the worker to create these topics if they do not exist when the worker starts up, and to use the following properties to define the replication factor and number of partitions for these new topics:

  • config.storage.replication.factor – defaults to 3, value must be >= 1
  • offset.storage.replication.factor – defaults to 3, value must be >= 1
  • status.storage.replication.factor – defaults to 3, value must be >= 1
  • offset.storage.partitions – defaults to 25, value must be >= 1
  • status.storage.partitions – defaults to 5, value must be >= 1

The topic used for storage of connector configurations must always have exactly one partition, which is why there is no configuration property to set the number of partitions for this topic.

At the time of KIP-154, the AdminClient API required the replication factor and partitions be greater than or equal to one. But more recently, KIP-464 modified the AdminClient APIs to support creating topics by using the broker's default.replication.factor value and/or the broker's num.partitions value. The Kafka Connect distributed worker configuration currently does not support using the broker's default replication factor or number of partitions.

The Kafka Connect distributed worker configuration also does not support defining or passing other topic settings when the Connect worker creates the configuration, offset, and status topics. 

Addressing both of these limitations will allow Kafka administrators to set up sensible defaults for all topics (including the desired replication factor), and allow a Connect distributed worker configuration to use those defaults where appropriate and override the topic-specific settings for the internal topics as necessary.

As an example, consider a user who wants to use the default replication factor of the Kafka cluster but override the minimum ISR for Connect's three internal topics. This could be done with:

# Use the broker's default replication factor for these topics
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
# Override any broker default min ISR to always use 3
config.storage.min.insync.replicas=3
offset.storage.min.insync.replicas=3
status.storage.min.insync.replicas=3


Public Interfaces

The allowed values for the following five existing Connect distributed worker properties will be changed to also allow -1, which signals that the broker's default replication factor or number of partitions should be used for the newly created topics:

  • config.storage.replication.factor
  • offset.storage.replication.factor
  • status.storage.replication.factor
  • offset.storage.partitions
  • status.storage.partitions
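
As a rough illustration of the new value range, the sketch below maps a configured value to an optional setting: -1 becomes "unset" so that the broker's default applies, and any other value must still be >= 1. The class and method names are hypothetical and this is not Connect's actual code; it only mirrors the idea behind the KIP-464 AdminClient change, where leaving a value unset requests the broker default.

```java
import java.util.Optional;

/** Sketch only: class and method names are hypothetical, not Connect's actual code. */
final class BrokerDefaultSetting {

    /** Sentinel value that requests the broker's default. */
    static final int USE_BROKER_DEFAULT = -1;

    /**
     * Converts a configured replication factor or partition count into an Optional.
     * An empty result means "let the broker decide"; otherwise the explicit value is kept.
     */
    static Optional<Integer> resolve(String property, int configured) {
        if (configured == USE_BROKER_DEFAULT) {
            return Optional.empty();
        }
        if (configured < 1) {
            throw new IllegalArgumentException(
                property + " must be -1 (broker default) or >= 1, but was " + configured);
        }
        return Optional.of(configured);
    }

    public static void main(String[] args) {
        System.out.println(resolve("offset.storage.partitions", -1));
        System.out.println(resolve("status.storage.replication.factor", 3));
    }
}
```

Values such as 0 or -2 are rejected in this sketch, since the proposal only adds -1 to the previously allowed range.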


Also, the distributed worker configuration will be changed to recognize additional optional properties that match the following patterns, and to pass them (without the prefix) to the new topic requests created by the Connect distributed worker when it attempts to create the internal topics:

Property pattern: config.storage.<topic-specific-setting>
Type: several
Default: broker value
Description: Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector configurations. Here "<topic-specific-setting>" must be a valid Kafka topic-level configuration for the version of the Kafka broker where the topic should be created; the Connect worker will fail upon startup if the "<topic-specific-setting>" is not known to the broker.
Excluded properties:
  • partitions (always set to 1)
  • cleanup.policy (always set to `compact`)

Property pattern: offset.storage.<topic-specific-setting>
Type: several
Default: broker value
Description: Additional topic-specific settings used when creating the internal Kafka topic where Connect stores source offsets for source connectors. Here "<topic-specific-setting>" must be a valid Kafka topic-level configuration for the version of the Kafka broker where the topic should be created; the Connect worker will fail upon startup if the "<topic-specific-setting>" is not known to the broker.
Excluded properties:
  • cleanup.policy (always set to `compact`)

Property pattern: status.storage.<topic-specific-setting>
Type: several
Default: broker value
Description: Additional topic-specific settings used when creating the internal Kafka topic where Connect stores connector and task statuses. Here "<topic-specific-setting>" must be a valid Kafka topic-level configuration for the version of the Kafka broker where the topic should be created; the Connect worker will fail upon startup if the "<topic-specific-setting>" is not known to the broker.
Excluded properties:
  • cleanup.policy (always set to `compact`)


Note that some topic-specific properties are excluded because the distributed worker always sets specific values. Therefore, if a distributed worker configuration does set any of these excluded properties, the distributed worker will issue a warning that such properties should not be set and will be ignored.

The Connect worker will fail upon startup if any of the topic-settings specified in the above configurations are not known to the Kafka broker.
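
The prefix handling described above can be sketched as follows. This is a minimal illustration, not Connect's actual implementation: the class and method names are hypothetical, the topic name `connect-configs` is only an example value, and the set of suffixes treated as worker-level properties (such as `topic` and `replication.factor`) is an assumption made for the sketch. Checking the remaining settings against the broker is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Sketch only: names are hypothetical and this is not Connect's actual implementation. */
final class InternalTopicSettings {

    /**
     * Returns the topic-level settings found under the given prefix, with the prefix removed.
     *
     * @param workerProps all worker properties
     * @param prefix      for example "config.storage."
     * @param workerLevel suffixes that are worker properties rather than topic settings (assumed)
     * @param excluded    suffixes the worker always sets itself; these are warned about and ignored
     */
    static Map<String, String> topicSettings(Map<String, String> workerProps, String prefix,
                                             Set<String> workerLevel, Set<String> excluded) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : workerProps.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(prefix)) {
                continue; // not related to this internal topic
            }
            String suffix = key.substring(prefix.length());
            if (workerLevel.contains(suffix)) {
                continue; // handled separately as a worker property
            }
            if (excluded.contains(suffix)) {
                System.err.println("WARN: ignoring '" + key
                    + "' because the worker always sets this topic setting itself");
                continue;
            }
            result.put(suffix, entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("config.storage.topic", "connect-configs"); // example topic name
        props.put("config.storage.replication.factor", "-1");
        props.put("config.storage.min.insync.replicas", "3");
        props.put("config.storage.cleanup.policy", "delete"); // excluded: warned about and ignored
        System.out.println(topicSettings(props, "config.storage.",
            Set.of("topic", "replication.factor"), Set.of("partitions", "cleanup.policy")));
    }
}
```

In this sketch only `min.insync.replicas` would be passed along with the new topic request for the configuration topic; the excluded `cleanup.policy` produces a warning rather than a failure.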

This proposal does not otherwise change any other behavior of how or when the Connect worker creates internal topics. This proposal also does not affect Connect standalone behavior.

Proposed Changes

When the Connect distributed worker starts up, it currently attempts to create the configuration, offsets, and/or status topics used by the worker if any of those topics do not exist. With this proposal, the additional properties will be passed to the broker during these new topic requests. Note that, as with KIP-154, these properties will not be used if the corresponding topic already exists.
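
The "create only if missing" behavior can be pictured with the small sketch below, in which an in-memory map stands in for the broker. All names are hypothetical and nothing here talks to a real Kafka cluster; the point is only that settings supplied for a topic that already exists have no effect.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: an in-memory map stands in for the broker; all names are hypothetical. */
final class CreateIfMissing {

    /** Existing topics and the settings each one was created with. */
    private final Map<String, Map<String, String>> topics = new HashMap<>();

    /**
     * Creates the topic with the given settings only if it does not exist yet.
     * Returns true if the topic was created, or false if it already existed,
     * in which case the requested settings are not applied.
     */
    boolean createIfMissing(String topic, Map<String, String> settings) {
        if (topics.containsKey(topic)) {
            return false;
        }
        topics.put(topic, new HashMap<>(settings));
        return true;
    }

    /** Returns the settings the topic was created with, or null if it does not exist. */
    Map<String, String> settingsOf(String topic) {
        return topics.get(topic);
    }

    public static void main(String[] args) {
        CreateIfMissing broker = new CreateIfMissing();
        broker.createIfMissing("connect-status", Map.of("min.insync.replicas", "2"));
        // Second attempt: the topic exists, so the new settings are ignored.
        broker.createIfMissing("connect-status", Map.of("min.insync.replicas", "3"));
        System.out.println(broker.settingsOf("connect-status"));
    }
}
```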

Compatibility, Deprecation, and Migration Plan

These changes are backward compatible, and existing Connect distributed worker configurations will continue to work with no change in behavior, unless those configurations define unknown topic settings using any of the patterns defined above, in which case the Connect worker or MirrorMaker2 process will fail upon startup. All new properties begin with one of three prefixes (`config.storage.`, `offset.storage.`, or `status.storage.`) already used for other worker-level properties, and thus are not expected to clash with any properties used for REST Extensions or config providers.

Rejected Alternatives

  1. When one of the excluded topic-specific settings is used, cause the worker to fail. This would be equivalent to adding a default value and a validator that allows only the one permitted value. This was rejected (in favor of a warning) to follow the existing convention that extra or unknown properties do not cause a failure.