Status

Current state: DRAFT [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect workers are long-running JVM processes that production teams operate much like Kafka brokers: they run in Kubernetes, sit behind load-balancers and must meet strict SLOs. Today every change to worker.properties still requires a full process restart, even for purely operational tweaks such as:

  • Rotating REST-server TLS certificates — changing keystore/truststore paths or passwords to meet certificate expiry or compliance rotation schedules.

  • Updating SSL/TLS configuration — switching cipher suites, enabling/disabling protocols, or changing client auth requirements.

Why restarts hurt

Restarts cause:

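  1. Connector task pauses and rebalances: every worker departure triggers a group rebalance. With incremental cooperative rebalancing, the departed worker's tasks stay unassigned for up to scheduled.rebalance.delay.ms and are then redistributed across the remaining workers.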

  2. Snapshot restart penalty: Debezium’s initial snapshot is not checkpointed. If a worker restarts while a snapshot is in progress, the connector discards the partial progress and re-scans the entire dataset to ensure data consistency.

  3. Extra producer fencing for EOS connectors — exactly-once source connectors must re-fence on restart, adding latency.

  4. Rolling restart amplification: in a cluster of N workers, rotating a single certificate requires N sequential restarts. Each restart pauses the tasks on that worker. If a restart takes longer than scheduled.rebalance.delay.ms, that worker's tasks are reassigned to the remaining workers and then rebalanced again when it rejoins.

  5. SLO impact — even with graceful shutdown, the aggregate downtime across a rolling restart can breach availability targets.

Precedent: KIP-226

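The broker side solved the equivalent problem in KIP-226 (Dynamic Broker Configuration), where centrally stored overrides (originally in ZooKeeper, now in the KRaft metadata log) plus the in-process Reconfigurable interface allow brokers to apply a safe subset of configs on the fly.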

This KIP brings the same zero-downtime workflow to Connect, but deliberately limits the first phase to SSL/TLS material (keystore and truststore locations and passwords), the same kind of SSL keystore update that brokers already apply dynamically under KIP-226.

Benefits beyond SSL hot-reload

While the initial scope targets SSL material, the infrastructure introduced by this KIP enables future dynamic reconfiguration of any safe worker property. The plumbing — config topic record type, REST API surface, Reconfigurable registration, and the validate → apply pipeline — is intentionally generic. Future KIPs can expand the whitelist to include:

  • REST API thread pool settings (min/max threads, idle timeout)

  • Offset commit intervals

  • Producer/consumer retry and timeout parameters

  • Connector-level overrides (tasks.max, etc.)

  • Custom metrics reporter configuration

By establishing the pattern now with a low-risk surface (SSL material), we validate the architecture before expanding to higher-risk properties.

Public Interfaces

REST API

All endpoints are exposed on the admin listener (admin.listeners) if configured, or on the regular listener otherwise (ref). They follow the same forwarding pattern as existing Connect admin endpoints: non-leader workers forward write requests (PATCH, DELETE) to the current leader via the existing DistributedHerder forwarding mechanism.

Authorization: These endpoints require the same authorization as other admin-level Connect operations. If rest.extension.classes configures an authorization extension, it applies to these endpoints. Implementations SHOULD restrict write access (PATCH, DELETE) to admin-level principals.

Availability: These endpoints are only registered in distributed mode. In standalone mode, the WorkerConfigResource is not registered and the endpoints are not available. Standalone worker support is deferred to future work.

Leader-only writes: PATCH and DELETE operations are enforced to run on the leader worker only. If a non-leader worker receives a write request, it transparently forwards the request to the current leader using the same forwarding mechanism used by connector create/delete operations. This ensures only the leader writes to the config topic, preventing interleaved read-modify-write races on the single worker-config record.

  • GET /admin/dynamic-worker-config

    • Semantics: Returns the current dynamic overrides as a flat JSON map of config key to config value. Only overrides that have been set via PATCH are returned (static-file values are not included). Consistent with GET /connectors/{name}/config, values are returned unredacted.

    • Success: 200 OK

  • POST /admin/dynamic-worker-config/validate

    • Semantics: Body = JSON map of proposed overrides. Performs full validation via Reconfigurable.validateReconfiguration() on the receiving worker and returns the validation result (any errors, plus the keys that would be applied). No state is modified.

    • Success: 200 OK with validation result

    • Error: 400 if the body contains non-whitelisted keys

  • PATCH /admin/dynamic-worker-config

    • Semantics: Body = JSON map of overrides to merge. The leader worker validates the merged override map, then writes it as a single worker-config record to the Connect config topic; all workers consume the record, validate locally, and apply.

    • Success: 200 OK with applied config

    • Error: 400 for validation failure, 500 for topic write failure

  • DELETE /admin/dynamic-worker-config/{name}

    • Semantics: Removes a single override so the static-file value takes effect again. The leader rewrites the worker-config record without that key, or writes a tombstone if it was the last remaining override.

    • Success: 204 No Content

    • Error: 404 if no override exists for that key

Note on PATCH vs PUT: PATCH is used because the operation merges overrides with the existing set. PUT would imply full replacement of all overrides, which would require the caller to always send the complete set. PATCH semantics match the incremental nature of the operation (analogous to the broker's IncrementalAlterConfigs vs legacy AlterConfigs).

Request/Response examples

GET /admin/dynamic-worker-config

{
  "listeners.https.ssl.keystore.location": "/opt/certs/new-keystore.jks",
  "listeners.https.ssl.keystore.password": "keystore-secret"
}

Note: Only dynamic overrides are returned. Static-file values are not included. Consistent with GET /connectors/{name}/config, values (including passwords) are returned unredacted. The config topic and admin endpoint should be secured via ACLs and authorization extensions respectively.

PATCH /admin/dynamic-worker-config

{
  "listeners.https.ssl.keystore.location": "/opt/certs/rotated-keystore.jks",
  "listeners.https.ssl.keystore.password": "new-password"
}

POST /admin/dynamic-worker-config/validate

{
  "listeners.https.ssl.keystore.location": "/opt/certs/rotated-keystore.jks",
  "listeners.https.ssl.keystore.password": "new-password"
}

Response:

{
  "valid": true,
  "configs": [
    "listeners.https.ssl.keystore.location",
    "listeners.https.ssl.keystore.password"
  ],
  "errors": {}
}
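As a usage sketch, a client could build these requests with the JDK's java.net.http client (Java 11+). The base URL (https://localhost:8083, the default Connect REST port) and the DynamicWorkerConfigClient class are assumptions for illustration; headers required by an authorization extension, and the SSLContext needed to trust the worker's certificate, are omitted.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client helper for the dynamic worker config endpoints.
class DynamicWorkerConfigClient {
    // Assumed base URL; any worker works because writes are forwarded to the leader.
    static final String BASE = "https://localhost:8083/admin/dynamic-worker-config";

    // PATCH merges the JSON map in the body into the current overrides.
    static HttpRequest patchRequest(String jsonOverrides) {
        return HttpRequest.newBuilder(URI.create(BASE))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonOverrides))
                .build();
    }

    // POST to /validate is a dry run: nothing is written to the config topic.
    static HttpRequest validateRequest(String jsonOverrides) {
        return HttpRequest.newBuilder(URI.create(BASE + "/validate"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonOverrides))
                .build();
    }

    // Sends the request and returns the body, failing on any non-2xx status.
    static String send(HttpClient client, HttpRequest request) throws Exception {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            throw new IllegalStateException("HTTP " + response.statusCode() + ": " + response.body());
        }
        return response.body();
    }
}
```

Calling validateRequest() first and patchRequest() only after a "valid": true response gives operators a dry run before anything reaches the config topic.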

SPI reuse

Connect will reuse the existing org.apache.kafka.common.Reconfigurable interface shipped in kafka-clients. No new SPI is added.

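import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigException;

public interface Reconfigurable extends Configurable {   // package org.apache.kafka.common
    Set<String> reconfigurableConfigs();                 // keys that may be updated at runtime
    void validateReconfiguration(Map<String, ?> configs) throws ConfigException;  // check without applying
    void reconfigure(Map<String, ?> configs);            // apply the validated values
}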

The following Connect components will implement Reconfigurable in Phase 1:

  • RestServer — handles SSL context reload via SslContextFactory.reload().

The Reconfigurable lifecycle in Connect:

  1. configure() — called during RestServer construction (existing behavior, unchanged).

  2. reconfigurableConfigs() — called at registration time to build the whitelist. Returns the Phase 1 config set.

  3. validateReconfiguration(configs): called by the Herder when the worker-config record is consumed from the config topic. The full merged config map (static + dynamic overrides) is passed. Throws ConfigException on failure.

  4. reconfigure(configs) — called only if validateReconfiguration() succeeded on all registered Reconfigurable instances. Applies the new configuration.

Apply on registration: When a component registers as Reconfigurable via DistributedHerder.registerReconfigurable(), any existing worker config overrides from the config topic are immediately applied (validate + reconfigure). This handles the case where the config topic is read during herder.start() before the REST server registers, ensuring pre-existing overrides take effect without waiting for a future update.
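The registration step can be sketched as follows. To stay self-contained, the sketch uses a hypothetical ReconfigurableStandIn interface with the same three methods as org.apache.kafka.common.Reconfigurable (IllegalArgumentException stands in for ConfigException). ReconfigurableRegistry and RecordingComponent are illustrative names, not the proposed DistributedHerder code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in with the same methods as org.apache.kafka.common.Reconfigurable, so the
// sketch compiles without kafka-clients. IllegalArgumentException plays ConfigException.
interface ReconfigurableStandIn {
    Set<String> reconfigurableConfigs();
    void validateReconfiguration(Map<String, ?> configs);
    void reconfigure(Map<String, ?> configs);
}

// Hypothetical herder-side registry illustrating lifecycle steps 2-4 and apply-on-registration.
class ReconfigurableRegistry {
    private final Map<String, String> staticConfig;
    private final Map<String, String> overrides;   // latest worker-config record, already read
    private final List<ReconfigurableStandIn> components = new ArrayList<>();
    private final Set<String> whitelist = new HashSet<>();

    ReconfigurableRegistry(Map<String, String> staticConfig, Map<String, String> overridesFromTopic) {
        this.staticConfig = Map.copyOf(staticConfig);
        this.overrides = Map.copyOf(overridesFromTopic);
    }

    Set<String> whitelist() {
        return Set.copyOf(whitelist);
    }

    // Static-file values overlaid with the dynamic overrides.
    Map<String, String> merged() {
        Map<String, String> m = new HashMap<>(staticConfig);
        m.putAll(overrides);
        return m;
    }

    // Returns false if pre-existing overrides were rejected; the component then
    // keeps the configuration it was constructed with.
    boolean register(ReconfigurableStandIn component) {
        components.add(component);
        whitelist.addAll(component.reconfigurableConfigs());  // step 2: build the whitelist
        if (overrides.isEmpty()) {
            return true;                                       // nothing to apply yet
        }
        Map<String, String> effective = merged();
        try {
            component.validateReconfiguration(effective);     // step 3
        } catch (IllegalArgumentException e) {
            return false;
        }
        component.reconfigure(effective);                      // step 4
        return true;
    }
}

// Example component: accepts any non-blank keystore location and remembers what it applied.
class RecordingComponent implements ReconfigurableStandIn {
    static final String KEYSTORE = "listeners.https.ssl.keystore.location";
    Map<String, ?> lastApplied;                                // null until reconfigure() runs

    public Set<String> reconfigurableConfigs() {
        return Set.of(KEYSTORE);
    }

    public void validateReconfiguration(Map<String, ?> configs) {
        Object path = configs.get(KEYSTORE);
        if (path == null || path.toString().isBlank()) {
            throw new IllegalArgumentException(KEYSTORE + " must be set");
        }
    }

    public void reconfigure(Map<String, ?> configs) {
        lastApplied = Map.copyOf(configs);
    }
}
```

In this sketch a failed validation at registration leaves the component on its static configuration, mirroring the follower behavior described under failure semantics.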

Additional components can register as Reconfigurable in future KIPs.

Proposed Changes

Storage format in the Config Topic

Worker configuration overrides are stored in the existing Connect config topic alongside connector configs, task configs, target states, etc.

Record format

All worker config overrides are stored in a single record, consistent with how connector configs and task configs are stored (one record per entity containing the full config map):

  • Key: worker-config (fixed key for all worker config overrides)

  • Value: JSON-encoded SchemaAndValue using the WORKER_CONFIGURATION_V0 schema. Contains a map of all current override key-value pairs.

  • Tombstone: the key with a null value; removes all overrides so static-file values take effect.

Schema definition

The version field (always 0 initially) enables future schema evolution. The properties field maps each override key to its value as a string, consistent with how other config topic record schemas store config values.

WORKER_CONFIGURATION_V0 = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "version", "type": "int32"},
            {"field": "properties", "type": "map", "keys": {"type": "string"}, "values": {"type": "string"}}
        ]
    }
}

Design rationale: single record vs per-key records

A single aggregate record was chosen over per-key records because:

  1. Atomic writes — a single record write is atomic at the topic level. No commit marker or buffering logic is needed.

  2. Simple compaction — one key (worker-config) means the latest value always wins. No edge cases with compaction removing records referenced by older commit markers.

  3. Consistent with existing patterns: the config topic already uses distinct record keys for different entities, such as connectors (connector-{name}), tasks (task-{connector}-{task}), and logger levels (logger-cluster-{namespace}). Connector and task records store the full config map for their entity in a single record value, and this KIP follows the same pattern.

  4. Simpler code — no deferred buffering, no commit marker processing, no snapshot-semantics bookkeeping. The consumer applies the config map directly when the record is consumed.

Caveat — DELETE is read-modify-write: DELETE /admin/dynamic-worker-config/{name} cannot simply produce a tombstone for one key. Instead, the leader reads the current override map, removes the specified key, and writes the updated map back as a new worker-config record. If the last override is removed, a tombstone (null value) is written, allowing log compaction to clean up the record. This is acceptable because DELETE operations are leader-only and the override set is small (Phase 1: 6 SSL keys).
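A minimal sketch of the leader-side read-modify-write for DELETE, assuming the current override map has already been read from the config topic. WorkerConfigOverrides is a hypothetical name, and returning null stands in for producing a tombstone for the worker-config key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical leader-side helper for DELETE /admin/dynamic-worker-config/{name}.
class WorkerConfigOverrides {
    // Returns the next value for the worker-config record, or null to mean
    // "write a tombstone" because no overrides remain.
    static Map<String, String> delete(Map<String, String> current, String name) {
        if (!current.containsKey(name)) {
            // Surfaced to the client as 404.
            throw new NoSuchElementException("No override exists for " + name);
        }
        Map<String, String> next = new HashMap<>(current); // read
        next.remove(name);                                  // modify
        return next.isEmpty() ? null : next;                // write: full map or tombstone
    }
}
```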

Sensitive values and config providers

Config provider support: Dynamic worker config overrides support the same ConfigProvider placeholder syntax used by connector configs (e.g., ${file:/opt/secrets:keystore-password}, ${vault:secret/ssl:password}). Placeholders are stored as-is in the config topic — raw, unresolved strings. Each worker resolves placeholders independently using its locally configured config.providers via the existing WorkerConfigTransformer before calling validateReconfiguration() and reconfigure(). This means:

  • Sensitive values such as passwords never need to appear in plaintext in the PATCH request body or the config topic.

  • The GET /admin/dynamic-worker-config endpoint returns the raw (unresolved) placeholder strings, consistent with how GET /connectors/{name}/config returns raw connector configs.

  • Each worker resolves the same placeholder against its own config provider, which may return different values per node if the provider is node-local (e.g., FileConfigProvider reading different local files).
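The per-worker resolution step can be illustrated with a simplified resolver. It is a stand-in for WorkerConfigTransformer built on assumptions: it only understands the ${provider:path:key} form, models providers as plain functions, and leaves placeholders with an unknown provider untouched. What it does show is that resolution produces a new map while the raw map written to the config topic keeps its placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical stand-in for per-worker placeholder resolution.
// Only the ${provider:path:key} form is handled; providers are plain functions.
class PlaceholderResolver {
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^}:]+):([^}]*):([^}:]+)\\}");

    // provider name -> (path, key) -> value, or null if the key is unknown
    private final Map<String, BiFunction<String, String, String>> providers;

    PlaceholderResolver(Map<String, BiFunction<String, String, String>> providers) {
        this.providers = Map.copyOf(providers);
    }

    // Returns a resolved copy; the raw map (as stored in the config topic) is not modified.
    Map<String, String> resolve(Map<String, String> rawOverrides) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> e : rawOverrides.entrySet()) {
            resolved.put(e.getKey(), resolveValue(e.getValue()));
        }
        return resolved;
    }

    private String resolveValue(String raw) {
        Matcher m = PLACEHOLDER.matcher(raw);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            BiFunction<String, String, String> provider = providers.get(m.group(1));
            String value = provider == null ? null : provider.apply(m.group(2), m.group(3));
            // Unknown provider or key: keep the placeholder text as-is.
            m.appendReplacement(out, Matcher.quoteReplacement(value == null ? m.group() : value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Because each worker builds its own resolver from its local config.providers, two workers can legitimately resolve the same raw placeholder to different values.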

Update flow

Write path (leader worker)

Client                     Leader Worker                   Config Topic
  |                            |                               |
  |-- PATCH /admin/dynamic-worker-config -->                   |
  |                            |                               |
  |                    1. Validate whitelist                    |
  |                    2. Resolve config provider placeholders  |
  |                    3. Merge with current overrides map      |
  |                    4. Call validateReconfiguration()        |
  |                       on all local Reconfigurable instances |
  |                            |                               |
  |                    [If validation fails: return 400]        |
  |                            |                               |
  |                    5. Write single worker-config record --->|
  |                       (full map, raw placeholders)          |
  |                    6. readToEnd() (wait for own write)      |
  |                            |                               |
  |                    7. Apply locally                         |
  |                            |                               |
  |<-- 200 OK with applied config                              |
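Steps 1 to 5 on the leader can be sketched as below. All names (LeaderPatchHandler and its validator and writer callbacks) are hypothetical: the validators stand in for validateReconfiguration() on each local Reconfigurable, and the writer stands in for the config topic producer. The sketch merges raw overrides first and resolves placeholders on the merged map, which yields the same effective config as resolving before merging while keeping the raw placeholders for the record that gets written.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the leader's PATCH handling (steps 1-5 of the write path).
class LeaderPatchHandler {
    private final Set<String> whitelist;
    private final UnaryOperator<Map<String, String>> resolver;     // config provider resolution
    private final List<Consumer<Map<String, String>>> validators;  // throw IllegalArgumentException
    private final Consumer<Map<String, String>> writer;            // produces the worker-config record
    private final Map<String, String> staticConfig;
    private Map<String, String> currentOverrides = new HashMap<>(); // raw, unresolved values

    LeaderPatchHandler(Set<String> whitelist,
                       UnaryOperator<Map<String, String>> resolver,
                       List<Consumer<Map<String, String>>> validators,
                       Consumer<Map<String, String>> writer,
                       Map<String, String> staticConfig) {
        this.whitelist = Set.copyOf(whitelist);
        this.resolver = resolver;
        this.validators = List.copyOf(validators);
        this.writer = writer;
        this.staticConfig = Map.copyOf(staticConfig);
    }

    // Returns the HTTP status the REST layer would send back.
    int patch(Map<String, String> request) {
        // 1. Reject any key outside the whitelist.
        for (String key : request.keySet()) {
            if (!whitelist.contains(key)) {
                return 400;
            }
        }
        // 2-3. Merge raw values, then resolve placeholders for validation only.
        Map<String, String> rawMerged = new HashMap<>(currentOverrides);
        rawMerged.putAll(request);
        Map<String, String> effective = new HashMap<>(staticConfig);
        effective.putAll(resolver.apply(rawMerged));
        // 4. Every local Reconfigurable must accept the merged config.
        try {
            for (Consumer<Map<String, String>> validator : validators) {
                validator.accept(effective);
            }
        } catch (IllegalArgumentException e) {
            return 400; // nothing written
        }
        // 5. Write the full raw map as one record (readToEnd() and local apply omitted).
        try {
            writer.accept(Map.copyOf(rawMerged));
        } catch (RuntimeException e) {
            return 500;
        }
        currentOverrides = rawMerged;
        return 200;
    }
}
```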


Consistency model: The REST call is synchronous from the caller’s perspective: it blocks until the worker-config record is written to the topic and the leader’s own consumer has caught up via readToEnd(). On the leader, the config is applied (validated and reconfigured) as part of this readToEnd() processing. Follower workers, however, apply the config asynchronously when their consumers reach the new record. This is the same eventual consistency model used by all Connect config operations. For example, when a connector config is PUT, the REST call returns after the write is confirmed, but tasks restart on followers asynchronously as they consume the new records.

Read path (all workers including leader)

Config Topic              Worker (consumer loop)
    |                            |
    |-- worker-config record --->|
    |                            |  1. Extract override map from record
    |                            |  2. Resolve config provider placeholders
    |                            |     via WorkerConfigTransformer
    |                            |  3. Call validateReconfiguration() on all
    |                            |     registered Reconfigurable instances
    |                            |  4. If valid: call reconfigure() on each
    |                            |  5. If invalid: log error, keep old config
    |                            |  6. Update in-memory effective config
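The consumer-side steps can be sketched as an all-or-nothing apply. Every registered component validates the candidate config before any of them is reconfigured, and a rejected update leaves the previous effective config in place. ReloadableComponent and WorkerConfigRecordHandler are hypothetical names. The sketch receives overrides whose placeholders are already resolved, and it does not roll back if reconfigure() itself throws.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Stand-in for the two Reconfigurable methods used on the read path.
interface ReloadableComponent {
    void validateReconfiguration(Map<String, ?> configs); // throws IllegalArgumentException if invalid
    void reconfigure(Map<String, ?> configs);
}

// Hypothetical consumer-side handler for a worker-config record.
class WorkerConfigRecordHandler {
    private static final Logger LOG = Logger.getLogger(WorkerConfigRecordHandler.class.getName());
    private final Map<String, String> staticConfig;
    private final List<ReloadableComponent> components;
    private Map<String, String> effective;

    WorkerConfigRecordHandler(Map<String, String> staticConfig, List<ReloadableComponent> components) {
        this.staticConfig = Map.copyOf(staticConfig);
        this.components = List.copyOf(components);
        this.effective = this.staticConfig;
    }

    Map<String, String> effective() {
        return effective;
    }

    // Steps 1-2 already done: overrides with placeholders resolved; null means tombstone.
    boolean onRecord(Map<String, String> resolvedOverrides) {
        Map<String, String> candidate = new HashMap<>(staticConfig);
        if (resolvedOverrides != null) {
            candidate.putAll(resolvedOverrides);
        }
        // Step 3: every component validates before any is reconfigured.
        for (ReloadableComponent c : components) {
            try {
                c.validateReconfiguration(candidate);
            } catch (IllegalArgumentException e) {
                // Step 5: log and keep the old config.
                LOG.severe("Ignoring worker-config update: " + e.getMessage());
                return false;
            }
        }
        // Step 4: apply. Not rolled back if a reconfigure() call throws part-way.
        for (ReloadableComponent c : components) {
            c.reconfigure(candidate);
        }
        // Step 6: remember the new effective config.
        effective = Map.copyOf(candidate);
        return true;
    }
}
```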

Failure semantics

  • Validation failure on leader (pre-write): 400 returned to client. Nothing is written to the topic.

  • Config topic write failure: 500 returned to client. The single-record write either succeeds or fails atomically.

  • Validation failure on follower (post-consume): The follower logs an error and ignores the update; its old config remains active. The cluster may be temporarily inconsistent: the leader applied the new config but the follower did not. This is acceptable because, for the whitelisted SSL configs (paths and passwords), validation should produce the same result on every worker whose keystore files and config-provider values match the leader's. A validation failure on a follower but not on the leader therefore indicates a genuine difference in the follower's environment (e.g., the file does not exist at that path on the follower).

  • Leader change between validate and PATCH: The validate response is informational only. PATCH performs its own validation before writing, so there is no TOCTOU risk.

Jetty SSL handling details

SSL material reload

When reconfigure() is called on RestServer with updated SSL properties:

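  1. Build a fresh SslContextFactory.Server from the updated properties using SSLUtils.createServerSideSslContextFactory().

  2. For each HTTPS ServerConnector on the Jetty server, call reload() on the connector's existing SslContextFactory. Jetty's SslContextFactory.reload(Consumer<SslContextFactory>) does not accept a replacement factory; the consumer instead copies the new keystore/truststore settings (location, password, type) from the freshly built factory onto the existing one, after which Jetty re-initializes its SSLContext.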

  3. New TLS handshakes use the new certificates immediately.

  4. Existing connections continue with the old SSL context until they are closed naturally. This is standard Jetty behaviour and is safe.
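Because the reload swaps the SSLContext in place, validateReconfiguration() on RestServer has to reject unusable keystores before any reload happens. One way to do this, assuming JDK APIs only, is to load the keystore and initialize an SSLContext from it. KeystoreValidator is a hypothetical helper, and treating the key password as equal to the store password is a simplifying assumption.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

// Hypothetical pre-reload check using JDK APIs only.
class KeystoreValidator {
    // Returns null if the keystore can back a TLS server context, otherwise an error
    // message that a validateReconfiguration() implementation could wrap in a ConfigException.
    static String validate(String location, String password, String type) {
        Path path = Path.of(location);
        if (!Files.isReadable(path)) {
            return "Keystore not readable: " + location;
        }
        char[] pw = password.toCharArray();
        try (InputStream in = Files.newInputStream(path)) {
            KeyStore ks = KeyStore.getInstance(type);
            ks.load(in, pw);                       // fails on a corrupt file or wrong password
            boolean hasKey = false;
            for (String alias : Collections.list(ks.aliases())) {
                hasKey |= ks.isKeyEntry(alias);
            }
            if (!hasKey) {
                return "Keystore has no private key entry: " + location;
            }
            // Assumes the key password equals the store password.
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(ks, pw);
            SSLContext.getInstance("TLS").init(kmf.getKeyManagers(), null, null);
            return null;
        } catch (IOException | GeneralSecurityException e) {
            return "Invalid keystore " + location + ": " + e.getMessage();
        }
    }
}
```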

What is NOT changed dynamically (Phase 1)

  • Listeners — no listener add/remove. The set of listening ports/addresses remains fixed.

  • HTTP connectors — only HTTPS connectors with SSL context are affected.

  • REST extensions — no re-initialization of ConnectRestExtension instances.

Compatibility, Deprecation, and Migration Plan

Forward compatibility (new records, old workers)

New records in the config topic use the fixed key worker-config. Older workers do not recognize this key and will log:

ERROR Discarding config update record with invalid key: worker-config

at KafkaConfigBackingStore.java:959. This is noisy but functionally safe — the older worker ignores the record and continues operating with its static configuration.

Backward compatibility (removing overrides)

Removing all dynamic overrides (via DELETE for each key) restores pure static-file behaviour. There is no lock-in.

Downgrade path

If a cluster is downgraded after dynamic overrides have been set:

  1. The config topic still contains the worker-config record.

  2. Older workers log an error each time they read the record (see above) but are otherwise unaffected.

  3. Static-file values take effect on all workers.

  4. The dynamic overrides remain in the topic until they are removed and log compaction cleans up the record.

Recommendation: Before downgrading, use DELETE /admin/dynamic-worker-config/{name} for each override. Deleting the last override writes a tombstone for the worker-config key, so the record is removed from the config topic after compaction.

Config topic schema versioning

The WORKER_CONFIGURATION_V0 schema includes a version field. Future schema changes follow the same pattern as the existing TARGET_STATE_V0 → V1 migration in KafkaConfigBackingStore: the consumer code reads the version field and dispatches to the appropriate parser with fallback to the latest known version.
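A sketch of that version dispatch for the worker-config record, assuming the value has already been decoded into a map with version and properties entries. WorkerConfigRecordParser is a hypothetical name, and the fallback rule simply follows the description above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for decoded worker-config record values.
class WorkerConfigRecordParser {
    static final int LATEST_KNOWN_VERSION = 0;

    // Returns the override map, or null for a tombstone.
    static Map<String, String> parse(Map<String, Object> value) {
        if (value == null) {
            return null; // tombstone: all overrides removed
        }
        Object v = value.get("version");
        int version = v instanceof Number ? ((Number) v).intValue() : 0;
        if (version > LATEST_KNOWN_VERSION) {
            version = LATEST_KNOWN_VERSION; // fallback for records written by newer workers
        }
        switch (version) {
            case 0:
                return parseV0(value);
            default:
                throw new IllegalArgumentException("Unsupported worker-config version " + version);
        }
    }

    private static Map<String, String> parseV0(Map<String, Object> value) {
        Map<String, String> result = new HashMap<>();
        Object props = value.get("properties");
        if (props instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) props).entrySet()) {
                result.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
            }
        }
        return result;
    }
}
```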

Test Plan

Unit tests

  • RestServerReconfigurableTest

    • Create a Jetty instance with HTTPS listener.

    • Call validateReconfiguration() with valid new keystore → assert no exception.

    • Call validateReconfiguration() with non-existent keystore path → assert ConfigException.

    • Call reconfigure() with valid new keystore → assert new TLS handshakes succeed with the new certificate.

    • Assert existing connections continue working after reload.

  • KafkaConfigBackingStoreWorkerConfigTest

    • Write a worker-config record to the config topic.

    • Assert UpdateListener.onWorkerConfigUpdate() is called with the correct merged config.

    • Write a new worker-config record with one key removed → assert the static-file value for that key is restored.

    • Write a tombstone for the worker-config key → assert all static-file values are restored.

  • WorkerConfigWhitelistTest

    • Attempt to set a non-whitelisted config via PATCH → assert 400 response.

    • Attempt to set a whitelisted config → assert 200 response.

Integration tests

  • DynamicWorkerConfigIntegrationTest

    • Stand up a 3-worker distributed Connect cluster with HTTPS.

    • Rotate the keystore via PATCH on one worker (forwarded to leader).

    • Assert all 3 workers reload SSL context.

    • Make HTTPS requests to each worker → assert the new certificate is served.

    • Assert that no rebalance occurs during the reload.

  • RollingUpgradeCompatibilityTest

    • Stand up a mixed cluster (old workers + new workers).

    • Set a dynamic override via the new worker's REST API.

    • Assert old workers log an error for the unrecognized record but continue operating normally.

    • Assert new workers apply the override.

Rejected Alternatives

Signal-based reload (kill -HUP)

Not portable in containers. Node-local only — no cluster-wide coordination. No validation before apply; a bad keystore is discovered only at reload failure. No audit trail.

File-watcher on worker.properties

Node-local only — requires external tooling (ConfigMaps, Ansible) to propagate file changes to every worker. No validation or dry-run. Race conditions on non-atomic file writes. Cannot extend to non-file-based configs in future phases.

External sidecar / infrastructure-based rotation (cert-manager)

Automates rolling restarts but does not eliminate them — each restart still causes a group rebalance and task redistribution. No public API exists for an external process to trigger in-process SSL context refresh. Ties Connect to Kubernetes; does not work on bare metal, VMs, or Docker Compose.

Extend the Kafka Admin API (IncrementalAlterConfigs)

Wrong coordination layer — Connect workers coordinate via their own config topic, not the controller's metadata log. Would couple Connect and broker version compatibility for config management. Connect worker configs require worker-side validation (e.g., checking local file existence), which the controller cannot perform.

Future Work

  • Expand whitelist — add thread pool settings (rest.server.max.threads), offset commit intervals, producer/consumer timeouts, and other operational configs that are safe to change dynamically.

  • Standalone worker support — implement a file-watcher variant for standalone mode, where there is no config topic. The file-watcher triggers the same Reconfigurable.validateReconfiguration() → reconfigure() pipeline.

  • Two-phase cluster commit — for potentially disruptive config changes (e.g., listener changes), implement a protocol where all workers validate before any worker applies, ensuring cluster-wide atomicity.
