Status
Current state: DRAFT
Discussion thread: [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Connect workers are long-running JVM processes that production teams operate much like Kafka brokers: they run in Kubernetes, sit behind load-balancers and must meet strict SLOs. Today every change to worker.properties still requires a full process restart, even for purely operational tweaks such as:
Rotating REST-server TLS certificates — changing keystore/truststore paths or passwords to meet certificate expiry or compliance rotation schedules.
Updating SSL/TLS configuration — switching cipher suites, enabling/disabling protocols, or changing client auth requirements.
Why restarts hurt
Restarts cause:
Connector task pauses and rebalances — every worker departure triggers a group rebalance, redistributing all tasks across remaining workers.
Snapshot restart penalty — Debezium’s snapshotting is an atomic, non-checkpointed process; a worker restart forces the connector to discard all progress and re-scan the entire dataset to ensure data consistency.
Extra producer fencing for EOS connectors — exactly-once source connectors must re-fence on restart, adding latency.
Rolling restart amplification: in a cluster of N workers, rotating a single certificate requires N sequential restarts. Each restart pauses tasks on that worker; if any restart exceeds scheduled.rebalance.delay.ms, a full rebalance is triggered, redistributing tasks across the remaining workers.
SLO impact: even with graceful shutdown, the aggregate downtime across a rolling restart can breach availability targets.
Precedent: KIP-226
The broker project solved the equivalent pain in KIP-226, where KRaft-backed overrides plus an in-process Reconfigurable SPI allow brokers to apply a safe subset of configs on the fly.
This KIP brings the same zero-downtime workflow to Connect, but deliberately limits the first phase to SSL/TLS material properties that are proven safe to reload.
Benefits beyond SSL hot-reload
While the initial scope targets SSL material, the infrastructure introduced by this KIP enables future dynamic reconfiguration of any safe worker property. The plumbing — config topic record type, REST API surface, Reconfigurable registration, and the validate → apply pipeline — is intentionally generic. Future KIPs can expand the whitelist to include:
REST API thread pool settings (min/max threads, idle timeout)
Offset commit intervals
Producer/consumer retry and timeout parameters
Connector-level overrides (tasks.max, etc.)
Custom metrics reporter configuration
By establishing the pattern now with a low-risk surface (SSL material), we validate the architecture before expanding to higher-risk properties.
Public Interfaces
REST API
All endpoints are exposed on the admin listener (admin.listeners) if configured, or on the regular listener otherwise. They follow the same forwarding pattern as existing Connect admin endpoints: non-leader workers forward write requests (PATCH, DELETE) to the current leader via the existing DistributedHerder forwarding mechanism.
Authorization: These endpoints require the same authorization as other admin-level Connect operations. If rest.extension.classes configures an authorization extension, it applies to these endpoints. Implementations SHOULD restrict write access (PATCH, DELETE) to admin-level principals.
Availability: These endpoints are only registered in distributed mode. In standalone mode, the WorkerConfigResource is not registered and the endpoints are not available. Standalone worker support is deferred to future work.
Leader-only writes: PATCH and DELETE operations are enforced to run on the leader worker only. If a non-leader worker receives a write request, it transparently forwards the request to the current leader using the same forwarding mechanism used by connector create/delete operations. This ensures only the leader writes to the config topic, preventing interleaved read-modify-write races on the single worker-config record.
| Verb | Path | Semantics | Success | Error |
|---|---|---|---|---|
| GET | /admin/dynamic-worker-config | Returns the current dynamic overrides as a flat JSON map of config key to config value. Only overrides that have been set via PATCH are returned (static-file values are not included). Consistent with GET /connectors/{name}/config, values are returned unredacted. | 200 OK | none |
| POST | /admin/dynamic-worker-config/validate | Body = JSON map of proposed overrides. Performs full validation via validateReconfiguration() on the registered Reconfigurable instances. | 200 OK with validation result | 400 if non-whitelisted keys |
| PATCH | /admin/dynamic-worker-config | Body = JSON map of overrides to merge. The leader worker validates, then appends the merged worker-config record to the config topic. | 200 OK with applied config | 400 for validation failure, 500 for topic write failure |
| DELETE | /admin/dynamic-worker-config/{name} | Removes a single override so the static-file value takes effect again. | 204 No Content | 404 if no override exists for that key |
Note on PATCH vs PUT: PATCH is used because the operation merges overrides with the existing set. PUT would imply full replacement of all overrides, which would require the caller to always send the complete set. PATCH semantics match the incremental nature of the operation (analogous to the broker's IncrementalAlterConfigs vs legacy AlterConfigs).
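The difference between the two verbs can be sketched with plain maps. The class and method names below are hypothetical and not part of the proposal; the sketch only illustrates why merge semantics let a caller send a single key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the KIP) contrasting PATCH-style merge
// with PUT-style replacement of the override map.
final class OverrideSemantics {
    private OverrideSemantics() {}

    // PATCH: keys in the request are added or overwritten; other overrides are kept.
    static Map<String, String> patch(Map<String, String> current, Map<String, String> request) {
        Map<String, String> merged = new LinkedHashMap<>(current);
        merged.putAll(request);
        return merged;
    }

    // PUT (not offered by this KIP): the request becomes the complete override set,
    // so any override the caller forgets to resend is dropped.
    static Map<String, String> put(Map<String, String> current, Map<String, String> request) {
        return new LinkedHashMap<>(request);
    }
}
```

With PATCH, rotating only the keystore password leaves an existing keystore location override in place; with PUT the caller would have to resend both.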
Request/Response examples
GET /admin/dynamic-worker-config
{
"listeners.https.ssl.keystore.location": "/opt/certs/new-keystore.jks",
"listeners.https.ssl.keystore.password": "keystore-secret"
}
Note: Only dynamic overrides are returned. Static-file values are not included. Consistent with GET /connectors/{name}/config, values (including passwords) are returned unredacted. The config topic and admin endpoint should be secured via ACLs and authorization extensions respectively.
PATCH /admin/dynamic-worker-config
{
"listeners.https.ssl.keystore.location": "/opt/certs/rotated-keystore.jks",
"listeners.https.ssl.keystore.password": "new-password"
}
POST /admin/dynamic-worker-config/validate
{
"listeners.https.ssl.keystore.location": "/opt/certs/rotated-keystore.jks",
"listeners.https.ssl.keystore.password": "new-password"
}
Response:
{
"valid": true,
"configs": [
"listeners.https.ssl.keystore.location",
"listeners.https.ssl.keystore.password"
],
"errors": {}
}
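For callers scripting these requests from Java, a minimal sketch using the JDK's java.net.http client could look like the following. It uses the /admin/dynamic-worker-config paths from the endpoint table; the class name and the base URL in the usage note are assumptions, and authentication and TLS trust setup are left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical request builders for the endpoints in the table above.
final class DynamicWorkerConfigRequests {
    private DynamicWorkerConfigRequests() {}

    // Builds a PATCH that merges the given JSON map into the current overrides.
    static HttpRequest patch(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/admin/dynamic-worker-config"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Builds a POST to the validate endpoint with the same JSON body.
    static HttpRequest validate(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/admin/dynamic-worker-config/validate"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

A request built this way can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), for example against a hypothetical https://connect.example.com:8083.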
SPI reuse
Connect will reuse the existing org.apache.kafka.common.config.Reconfigurable interface shipped in kafka-clients. No new SPI is added.
public interface Reconfigurable extends Configurable {
Set<String> reconfigurableConfigs();
void validateReconfiguration(Map<String, ?> configs) throws ConfigException;
void reconfigure(Map<String, ?> configs);
}
The following Connect components will implement Reconfigurable in Phase 1:
RestServer: handles SSL context reload via SslContextFactory.reload().
The Reconfigurable lifecycle in Connect:
configure(): called during RestServer construction (existing behavior, unchanged).
reconfigurableConfigs(): called at registration time to build the whitelist. Returns the Phase 1 config set.
validateReconfiguration(configs): called by the Herder when a worker-config record is consumed from the config topic. The full merged config map (static + dynamic overrides) is passed. Throws ConfigException on failure.
reconfigure(configs): called only if validateReconfiguration() succeeded on all registered Reconfigurable instances. Applies the new configuration.
Apply on registration: When a component registers as Reconfigurable via DistributedHerder.registerReconfigurable(), any existing worker config overrides from the config topic are immediately applied (validate + reconfigure). This handles the case where the config topic is read during herder.start() before the REST server registers, ensuring pre-existing overrides take effect without waiting for a future update.
Additional components can register as Reconfigurable in future KIPs.
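The lifecycle and the apply-on-registration rule can be sketched without kafka-clients on the classpath. Everything below is an illustrative assumption: ReconfigurableSketch is a local stand-in for org.apache.kafka.common.config.Reconfigurable, IllegalArgumentException stands in for ConfigException, and ReconfigurableRegistry is a hypothetical holder for logic the KIP places in DistributedHerder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for Kafka's Reconfigurable (only the two methods used here).
interface ReconfigurableSketch {
    void validateReconfiguration(Map<String, ?> configs);
    void reconfigure(Map<String, ?> configs);
}

// Hypothetical registry: validate on every component first, apply only if all accept.
final class ReconfigurableRegistry {
    private final Map<String, String> staticConfig;
    private final List<ReconfigurableSketch> registered = new ArrayList<>();
    private Map<String, String> overrides = new HashMap<>();

    ReconfigurableRegistry(Map<String, String> staticConfig) {
        this.staticConfig = new HashMap<>(staticConfig);
    }

    // Apply on registration: overrides consumed before this component registered
    // are validated and applied to it immediately. A validation failure
    // propagates to the caller in this sketch.
    void register(ReconfigurableSketch component) {
        registered.add(component);
        if (!overrides.isEmpty()) {
            Map<String, String> merged = merge(overrides);
            component.validateReconfiguration(merged);
            component.reconfigure(merged);
        }
    }

    // Returns true if the update was applied. If any component rejects it,
    // nothing is reconfigured and the previous overrides stay active.
    boolean onWorkerConfigUpdate(Map<String, String> newOverrides) {
        Map<String, String> merged = merge(newOverrides);
        try {
            for (ReconfigurableSketch c : registered) {
                c.validateReconfiguration(merged);
            }
        } catch (IllegalArgumentException e) {
            return false;
        }
        for (ReconfigurableSketch c : registered) {
            c.reconfigure(merged);
        }
        overrides = new HashMap<>(newOverrides);
        return true;
    }

    // Static-file values overlaid with the dynamic overrides.
    private Map<String, String> merge(Map<String, String> dynamic) {
        Map<String, String> merged = new HashMap<>(staticConfig);
        merged.putAll(dynamic);
        return merged;
    }
}
```

Separating the validation loop from the apply loop is what keeps a single rejecting component from leaving the worker half-reconfigured.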
Proposed Changes
Storage format in the Config Topic
Worker configuration overrides are stored in the existing Connect config topic alongside connector configs, task configs, target states, etc.
Record format
All worker config overrides are stored in a single record, consistent with how connector configs and task configs are stored (one record per entity containing the full config map):
Field | Value |
|---|---|
Key | worker-config |
Value | JSON-encoded WORKER_CONFIGURATION_V0 struct |
Tombstone | Key with null value; removes all overrides so static-file values take effect |
Schema definition
The version field (always 0 initially) enables future schema evolution. The properties field maps each config key to its value as a string, consistent with all other config topic record schemas.
WORKER_CONFIGURATION_V0 = {
"schema": {
"type": "struct",
"fields": [
{"field": "version", "type": "int32"},
{"field": "properties", "type": "map", "keys": "string", "values": "string"}
]
}
}
Design rationale: single record vs per-key records
A single aggregate record was chosen over per-key records because:
Atomic writes: a single record write is atomic at the topic level. No commit marker or buffering logic is needed.
Simple compaction: one key (worker-config) means the latest value always wins. No edge cases with compaction removing records referenced by older commit markers.
Consistent with existing patterns: the config topic already uses distinct record keys for different entities, namely connectors (connector-{name}), tasks (task-{connector}-{task}), and logger levels (logger-cluster-{namespace}). Those existing records store the full config map per entity in a single record value, and this KIP follows the same pattern.
Simpler code: no deferred buffering, no commit marker processing, no snapshot-semantics bookkeeping. The consumer applies the config map directly when the record is consumed.
Caveat — DELETE is read-modify-write: DELETE /admin/dynamic-worker-config/{name} cannot simply produce a tombstone for one key. Instead, the leader reads the current override map, removes the specified key, and writes the updated map back as a new worker-config record. If the last override is removed, a tombstone (null value) is written, allowing log compaction to clean up the record. This is acceptable because DELETE operations are leader-only and the override set is small (Phase 1: 6 SSL keys).
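The read-modify-write described in the caveat can be sketched as a pure function over the override map. The class and method names are hypothetical; an empty Optional stands for "write a tombstone", and the exception stands for the 404 case.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical helper illustrating the leader-side DELETE logic.
final class OverrideDelete {
    private OverrideDelete() {}

    // Returns the map to write back as the new worker-config record value, or
    // Optional.empty() when the last override was removed and a tombstone
    // (null value) should be written instead.
    static Optional<Map<String, String>> remove(Map<String, String> current, String name) {
        if (!current.containsKey(name)) {
            // Would map to the 404 response in the REST layer.
            throw new NoSuchElementException("No override exists for " + name);
        }
        Map<String, String> updated = new LinkedHashMap<>(current);
        updated.remove(name);
        return updated.isEmpty() ? Optional.empty() : Optional.of(updated);
    }
}
```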
Sensitive values and config providers
Config provider support: Dynamic worker config overrides support the same ConfigProvider placeholder syntax used by connector configs (e.g., ${file:/opt/secrets:keystore-password}, ${vault:secret/ssl:password}). Placeholders are stored as-is in the config topic — raw, unresolved strings. Each worker resolves placeholders independently using its locally configured config.providers via the existing WorkerConfigTransformer before calling validateReconfiguration() and reconfigure(). This means:
Sensitive values such as passwords never need to appear in plaintext in the PATCH request body or the config topic.
The GET /admin/dynamic-worker-config endpoint returns the raw (unresolved) placeholder strings, consistent with how GET /connectors/{name}/config returns raw connector configs.
Each worker resolves the same placeholder against its own config provider, which may return different values per node if the provider is node-local (e.g., FileConfigProvider reading different local files).
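To illustrate why the same stored placeholder can resolve differently on each worker, here is a simplified resolver. It is not Kafka's WorkerConfigTransformer: the regex only handles the three-part ${provider:path:key} form, and each provider is modelled as a hypothetical function from (path, key) to a non-null value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified placeholder resolution, for illustration only.
final class PlaceholderSketch {
    private PlaceholderSketch() {}

    // Matches ${provider:path:key}; all three parts are required in this sketch.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^}:]+):([^}]*):([^}:]+)\\}");

    static Map<String, String> resolve(Map<String, String> raw,
                                       Map<String, BiFunction<String, String, String>> providers) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : raw.entrySet()) {
            Matcher m = PLACEHOLDER.matcher(entry.getValue());
            StringBuilder out = new StringBuilder();
            while (m.find()) {
                BiFunction<String, String, String> provider = providers.get(m.group(1));
                // Unknown providers leave the placeholder text untouched.
                String value = (provider == null)
                        ? m.group()
                        : provider.apply(m.group(2), m.group(3));
                m.appendReplacement(out, Matcher.quoteReplacement(value));
            }
            m.appendTail(out);
            resolved.put(entry.getKey(), out.toString());
        }
        return resolved;
    }
}
```

Two workers given the same raw map but different provider functions end up with different resolved values, while the config topic only ever holds the raw string.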
Update flow
Write path (leader worker)
Client                                         Leader Worker                                Config Topic
  |                                            |                                            |
  |-- PATCH /admin/dynamic-worker-config ----->|                                            |
  |                                            | 1. Validate whitelist                      |
  |                                            | 2. Resolve config provider placeholders    |
  |                                            | 3. Merge with current overrides map        |
  |                                            | 4. Call validateReconfiguration()          |
  |                                            |    on all local Reconfigurable instances   |
  |                                            |                                            |
  |                                            | [If validation fails: return 400]          |
  |                                            |                                            |
  |                                            | 5. Write single worker-config record ----->|
  |                                            |    (full map, raw placeholders)            |
  |                                            | 6. readToEnd() (wait for own write)        |
  |                                            |                                            |
  |                                            | 7. Apply locally                           |
  |                                            |                                            |
  |<-- 200 OK with applied config -------------|                                            |
Consistency model: The REST call is synchronous from the caller's perspective: it blocks until the worker-config record is written to the topic and the leader's own consumer has caught up via readToEnd(). On the leader, the config is applied (validated and reconfigured) as part of this readToEnd() processing. Follower workers, however, apply the config asynchronously when their consumers reach the new record. This is the same eventual consistency model used by all Connect config operations: for example, when a connector config is PUT, the REST call returns after the write is confirmed, but tasks restart on followers asynchronously as they consume the new records.
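Step 1 of the write path (the whitelist check) can be sketched as follows. The Result type mirrors the valid/configs/errors fields of the validate response shown earlier; the class name and the error message text are assumptions, and the whitelist itself would come from reconfigurableConfigs() on the registered components.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical whitelist check run by the leader before anything is written.
final class WhitelistCheck {
    private WhitelistCheck() {}

    // Mirrors the validate response fields: valid, configs, errors.
    static final class Result {
        final boolean valid;
        final List<String> configs;
        final Map<String, String> errors;

        Result(boolean valid, List<String> configs, Map<String, String> errors) {
            this.valid = valid;
            this.configs = configs;
            this.errors = errors;
        }
    }

    static Result check(Map<String, String> proposed, Set<String> whitelist) {
        List<String> configs = new ArrayList<>(proposed.keySet());
        Map<String, String> errors = new LinkedHashMap<>();
        for (String key : configs) {
            if (!whitelist.contains(key)) {
                // Message text is illustrative, not specified by the KIP.
                errors.put(key, "not dynamically reconfigurable");
            }
        }
        return new Result(errors.isEmpty(), configs, errors);
    }
}
```

A non-empty errors map corresponds to the 400 response; only a valid result proceeds to placeholder resolution and validateReconfiguration().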
Read path (all workers including leader)
Config Topic Worker (consumer loop)
| |
|-- worker-config record --->|
| | 1. Extract override map from record
| | 2. Resolve config provider placeholders
| | via WorkerConfigTransformer
| | 3. Call validateReconfiguration() on all
| | registered Reconfigurable instances
| | 4. If valid: call reconfigure() on each
| | 5. If invalid: log error, keep old config
| | 6. Update in-memory effective config
Failure semantics
Failure | Behavior |
|---|---|
Validation failure on leader (pre-write) | 400 returned to client. Nothing written to topic. |
Config topic write failure | 500 returned to client. The single-record write either succeeds or fails atomically. |
Validation failure on follower (post-consume) | Follower logs an error and ignores the update. Follower's old config remains active. The cluster may be temporarily inconsistent — the leader's config was applied but the follower's was not. This is acceptable because validation should produce the same result on all workers for the whitelisted SSL configs (paths and passwords). A validation failure on a follower but not the leader indicates a genuine difference in the follower's environment (e.g., file does not exist at that path on the follower). |
Leader change between validate and PATCH | The validate response is informational only. PATCH performs its own validation before writing. No TOCTOU risk. |
Jetty SSL handling details
SSL material reload
When reconfigure() is called on RestServer with updated SSL properties:
Build a fresh SslContextFactory.Server from the updated properties using SSLUtils.createServerSideSslContextFactory().
For each HTTPS ServerConnector on the Jetty server, call SslContextFactory.reload(newFactory).
New TLS handshakes use the new certificates immediately.
Existing connections continue with the old SSL context until they are closed naturally. This is standard Jetty behaviour and is safe.
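Before a reload like this runs, validateReconfiguration() has to reject unusable key material, such as a keystore path that does not exist on the worker. The following is a minimal sketch of such a pre-check using only the JDK. It is not Connect or Jetty code: the class name is hypothetical and IllegalArgumentException stands in for ConfigException.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical pre-check that a keystore can be opened before the SSL context is swapped.
final class KeystoreCheck {
    private KeystoreCheck() {}

    // Throws IllegalArgumentException if the file is missing, unreadable,
    // or cannot be loaded with the given type and password.
    static KeyStore load(String location, String password, String type) {
        Path path = Path.of(location);
        if (!Files.isReadable(path)) {
            throw new IllegalArgumentException("Keystore not readable: " + location);
        }
        try (InputStream in = Files.newInputStream(path)) {
            KeyStore keyStore = KeyStore.getInstance(type);
            keyStore.load(in, password.toCharArray());
            return keyStore;
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalArgumentException("Keystore could not be loaded: " + location, e);
        }
    }
}
```

Because the check runs on each worker against its local filesystem, it can pass on the leader and fail on a follower, which is the follower-side case described under failure semantics.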
What is NOT changed dynamically (Phase 1)
Listeners — no listener add/remove. The set of listening ports/addresses remains fixed.
HTTP connectors — only HTTPS connectors with SSL context are affected.
REST extensions: no re-initialization of ConnectRestExtension instances.
Compatibility, Deprecation, and Migration Plan
Forward compatibility (new records, old workers)
New records in the config topic use the key worker-config. Older workers that do not recognize this key will log:
ERROR Discarding config update record with invalid key: worker-config
at KafkaConfigBackingStore.java:959. This is noisy but functionally safe: the older worker ignores the record and continues operating with its static configuration.
Backward compatibility (removing overrides)
Removing all dynamic overrides (via DELETE for each key) restores pure static-file behaviour. There is no lock-in.
Downgrade path
If a cluster is downgraded after dynamic overrides have been set:
The config topic still contains the worker-config record.
Older workers log an error for the record (see above) but are otherwise unaffected.
Static-file values take effect on all workers.
The dynamic overrides remain in the topic until manually removed or compacted away via tombstones.
Recommendation: Before downgrading, use DELETE /admin/dynamic-worker-config/{name} for each override to write tombstones, ensuring a clean config topic after compaction.
Config topic schema versioning
The WORKER_CONFIGURATION_V0 schema includes a version field. Future schema changes follow the same pattern as the existing TARGET_STATE_V0 → V1 migration in KafkaConfigBackingStore: the consumer code reads the version field and dispatches to the appropriate parser with fallback to the latest known version.
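A version dispatch of this kind might look like the sketch below. It assumes the record value has already been deserialized into a Map with the version and properties fields from WORKER_CONFIGURATION_V0; the class and method names are hypothetical and this is not the KafkaConfigBackingStore implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser that dispatches on the schema version field.
final class WorkerConfigRecordParser {
    private WorkerConfigRecordParser() {}

    static Map<String, String> parse(Map<String, Object> value) {
        if (value == null) {
            return Map.of(); // tombstone: no overrides remain
        }
        Object v = value.get("version");
        int version = (v instanceof Number) ? ((Number) v).intValue() : 0;
        switch (version) {
            case 0:
                return parseV0(value);
            default:
                // Unknown version: fall back to the latest known parser.
                return parseV0(value);
        }
    }

    // V0 layout: {"version": 0, "properties": {key: value, ...}}
    private static Map<String, String> parseV0(Map<String, Object> value) {
        Map<String, String> overrides = new LinkedHashMap<>();
        Object props = value.get("properties");
        if (props instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) props).entrySet()) {
                overrides.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
            }
        }
        return overrides;
    }
}
```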
Test Plan
Unit tests
RestServerReconfigurableTest:
Create a Jetty instance with HTTPS listener.
Call validateReconfiguration() with valid new keystore → assert no exception.
Call validateReconfiguration() with non-existent keystore path → assert ConfigException.
Call reconfigure() with valid new keystore → assert new TLS handshakes succeed with the new certificate.
Assert existing connections continue working after reload.
KafkaConfigBackingStoreWorkerConfigTest:
Write worker-config-* records + commit marker to config topic.
Assert UpdateListener.onWorkerConfigUpdate() is called with the correct merged config.
Write a tombstone for one key + commit marker → assert the static-file value is restored.
Write worker-config-* records without a commit marker → assert no update callback is fired (records are buffered).
WorkerConfigWhitelistTest:
Attempt to set a non-whitelisted config via PATCH → assert 400 response.
Attempt to set a whitelisted config → assert 200 response.
Integration tests
DynamicWorkerConfigIntegrationTest:
Stand up a 3-worker distributed Connect cluster with HTTPS.
Rotate the keystore via PATCH on one worker (forwarded to leader).
Assert all 3 workers reload SSL context.
Make HTTPS requests to each worker → assert the new certificate is served.
Assert that no rebalance occurs during the reload.
RollingUpgradeCompatibilityTest:
Stand up a mixed cluster (old workers + new workers).
Set a dynamic override via the new worker's REST API.
Assert old workers log an error but continue operating normally.
Assert new workers apply the override.
Rejected Alternatives
Alternative | Reason for rejection |
|---|---|
Signal-based reload (kill -HUP) | Not portable in containers. Node-local only, with no cluster-wide coordination. No validation before apply; a bad keystore is discovered only at reload failure. No audit trail. |
File-watcher on worker.properties | Node-local only; requires external tooling (ConfigMaps, Ansible) to propagate file changes to every worker. No validation or dry-run. Race conditions on non-atomic file writes. Cannot extend to non-file-based configs in future phases. |
External sidecar / infrastructure-based rotation (cert-manager) | Automates rolling restarts but does not eliminate them: each restart still causes a group rebalance and task redistribution. No public API exists for an external process to trigger in-process SSL context refresh. Ties Connect to Kubernetes; does not work on bare metal, VMs, or Docker Compose. |
Extend the Kafka Admin API (IncrementalAlterConfigs) | Wrong coordination layer: Connect workers coordinate via their own config topic, not the controller's metadata log. Would couple Connect and broker version compatibility for config management. Connect worker configs require worker-side validation (e.g., checking local file existence), which the controller cannot perform. |
Future Work
Expand whitelist: add thread pool settings (rest.server.max.threads), offset commit intervals, producer/consumer timeouts, and other operational configs that are safe to change dynamically.
Standalone worker support: implement a file-watcher variant for standalone mode, where there is no config topic. The file-watcher triggers the same Reconfigurable.validateReconfiguration() → reconfigure() pipeline.
Two-phase cluster commit: for potentially disruptive config changes (e.g., listener changes), implement a protocol where all workers validate before any worker applies, ensuring cluster-wide atomicity.