Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Kafka Connect, the KafkaConfigBackingStore class is responsible for persistently storing connector and task configurations in a Kafka topic. Currently, the timeout for synchronous operations that write to the configuration topic and then read the result back is hardcoded via the READ_WRITE_TOTAL_TIMEOUT_MS constant, set to 30,000 milliseconds (30 seconds).
While 30 seconds is sufficient for many environments, this hardcoded limit can be problematic in specific scenarios:
- High-load clusters or clusters with unstable networking: metadata topic writes or consumer offset reads might experience transient delays exceeding 30 seconds.
- Large clusters with many connector tasks: In a large Kafka Connect cluster with thousands of connector tasks, writing each task's configuration and then reading it back takes time. The configuration topic is designed to have a single partition, which further limits parallelism. When a worker starts up and needs to read a compacted configuration topic with a significant history, the `readToEnd` operation might time out.
The need to increase this timeout to avoid a ConnectException during these spikes has been raised (see discussion thread here). This KIP proposes making the timeout a configurable parameter so that operators can tune it to their environment's performance characteristics.
Public Interfaces
Worker Configuration Options
A new worker configuration will be added to the Kafka Connect `DistributedConfig` class:
config.storage.kafka.store.read.write.total.timeout.ms
| Property | Value |
|---|---|
| Type | Long |
| Default | 30000 |
| Importance | Low |
| Description | The timeout in milliseconds for synchronous read and write operations on the Kafka Connect configuration storage topic. It determines how long the worker waits for operations such as publishing connector configurations and reading the current state of the cluster before throwing a timeout error. The value should not be higher than max.poll.interval.ms, to avoid interfering with unhealthy worker detection. |
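As a rough illustration of how the default and the max.poll.interval.ms guidance fit together, the following stdlib-only Java sketch resolves the value from a worker `Properties` object. It is not Kafka's `ConfigDef`-based parsing: the class `TimeoutConfigSketch`, its methods, and the rejection of non-positive values are hypothetical assumptions made for illustration only.

```java
import java.util.Properties;

/**
 * Hypothetical sketch (not Kafka's ConfigDef parsing): resolves the proposed
 * worker setting and checks it against the max.poll.interval.ms guidance.
 */
final class TimeoutConfigSketch {
    static final String READ_WRITE_TIMEOUT_CONFIG =
            "config.storage.kafka.store.read.write.total.timeout.ms";
    // Matches the current hardcoded READ_WRITE_TOTAL_TIMEOUT_MS value.
    static final long READ_WRITE_TIMEOUT_DEFAULT = 30_000L;

    /** Returns the configured timeout, or the 30000 ms default when unset. */
    static long resolveTimeoutMs(Properties workerProps) {
        String raw = workerProps.getProperty(READ_WRITE_TIMEOUT_CONFIG);
        if (raw == null) {
            return READ_WRITE_TIMEOUT_DEFAULT;
        }
        long value = Long.parseLong(raw.trim());
        // Assumption: non-positive values are rejected; the KIP does not specify a validator.
        if (value <= 0) {
            throw new IllegalArgumentException(
                    READ_WRITE_TIMEOUT_CONFIG + " must be positive, got " + value);
        }
        return value;
    }

    /** True if the timeout does not exceed the supplied max.poll.interval.ms value. */
    static boolean withinPollInterval(long timeoutMs, long maxPollIntervalMs) {
        return timeoutMs <= maxPollIntervalMs;
    }
}
```

Keeping the guidance as a separate check, rather than folding it into parsing, reflects that the description above phrases it as advice ("should not be higher") rather than a hard constraint.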
Proposed Changes
The KafkaConfigBackingStore class will be modified to accept the timeout value from the configuration. Currently, the code uses a static final long:
```java
static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
```
The proposed implementation will:
- Add `CONFIG_STORAGE_KAFKA_STORE_READ_WRITE_TIMEOUT_MS_CONFIG` to the `DistributedConfig` class with a default value of 30000.
- Update `KafkaConfigBackingStore` to use the parameter defined in `DistributedConfig`.
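A minimal sketch of the second change, assuming a heavily simplified store: the hardcoded constant becomes an instance field supplied at construction time. `ConfigStoreSketch` and `awaitWrite` are hypothetical names, not part of Kafka's API; in the real class the value would presumably come from the worker configuration rather than a bare constructor argument.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical, heavily simplified stand-in for KafkaConfigBackingStore.
 * It only shows the timeout moving from a static constant to an instance field.
 */
final class ConfigStoreSketch {
    // Before: static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
    private final long readWriteTotalTimeoutMs;

    ConfigStoreSketch(long readWriteTotalTimeoutMs) {
        this.readWriteTotalTimeoutMs = readWriteTotalTimeoutMs;
    }

    long readWriteTotalTimeoutMs() {
        return readWriteTotalTimeoutMs;
    }

    /** Waits for a pending write using the configured timeout instead of the constant. */
    <T> T awaitWrite(CompletableFuture<T> pendingWrite)
            throws InterruptedException, ExecutionException, TimeoutException {
        return pendingWrite.get(readWriteTotalTimeoutMs, TimeUnit.MILLISECONDS);
    }
}
```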
Compatibility, Deprecation, and Migration Plan
This change is fully backward compatible. The default value of 30,000ms matches the current hardcoded value, so existing clusters will see no change in behavior unless they explicitly override the new configuration.
Test Plan
- Unit tests for `DistributedConfig` to verify the new property is correctly defined and parsed.
- Unit tests for `KafkaConfigBackingStore` to ensure it correctly initializes with the configured timeout value.
- Integration tests to verify that operations fail appropriately when the timeout is set to a very low value (e.g., 1ms) and succeed when set to higher values during simulated network latency.
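The timeout behavior these integration tests target can be approximated in plain Java, assuming a simulated write whose acknowledgement is delayed by a fixed latency. This is only a stand-in for a real integration test against a running Connect cluster; `TimeoutBehaviorSketch` and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the "fail at 1 ms, succeed at higher values" test case. */
final class TimeoutBehaviorSketch {
    /** Simulates a config-topic write whose acknowledgement arrives after latencyMs. */
    static CompletableFuture<String> simulatedWrite(long latencyMs) {
        return CompletableFuture.supplyAsync(() -> "written",
                CompletableFuture.delayedExecutor(latencyMs, TimeUnit.MILLISECONDS));
    }

    /** Returns true if the simulated write completes within timeoutMs, false on timeout. */
    static boolean completesWithin(long timeoutMs, long latencyMs)
            throws InterruptedException, ExecutionException {
        try {
            simulatedWrite(latencyMs).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }
}
```

With a 200 ms simulated latency, `completesWithin(1, 200)` returns false and `completesWithin(5000, 200)` returns true, mirroring the two cases in the test plan.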
Rejected Alternatives
- Globally increase the hardcoded value: Rejected because 30 seconds is a reasonable default, and increasing it globally for all users might hide underlying infrastructure issues that operators should be aware of. Configuration allows per-cluster tuning. There is also a danger that a very high READ_WRITE_TOTAL_TIMEOUT_MS value might affect the detection of unhealthy Connect workers.
- Use existing client timeouts: Rejected because `READ_WRITE_TOTAL_TIMEOUT_MS` wraps multiple client calls (admin, producer, consumer) into a single logical "read-to-end" or "write-and-read-back" cycle, which requires its own high-level timeout management.
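To make the single high-level timeout concrete, here is a stdlib-only Java sketch of one total budget shared across sequential steps, such as a producer send followed by a read-to-end. Each step is allowed only the time remaining on the overall deadline, so the whole cycle is bounded by the total timeout rather than by the sum of per-client timeouts. `DeadlineSketch` is hypothetical and is not taken from Kafka's source.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch: several steps share one total deadline. */
final class DeadlineSketch {
    static void runWithinTotalTimeout(long totalTimeoutMs,
                                      List<Supplier<CompletableFuture<?>>> steps)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Monotonic clock, so wall-clock adjustments cannot stretch or shrink the budget.
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        for (Supplier<CompletableFuture<?>> step : steps) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Total timeout of " + totalTimeoutMs + " ms exhausted");
            }
            // Start the step only now, and wait at most for what is left of the budget.
            step.get().get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }
}
```

For example, two steps of about 100 ms each fit within a 5000 ms budget, but exceed a 150 ms budget even though each individual step is shorter than 150 ms.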