Status
Current state: Under discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The `KafkaStatusBackingStore` in Kafka Connect is responsible for persisting connector and task status updates to a compacted Kafka topic. Currently, when status update writes fail, the behavior is inconsistent:
- Blocking Behavior: Some code paths block indefinitely waiting for Kafka to become available, which can cause worker threads to hang during broker outages.
- Silent Failures: Other code paths may silently drop status updates without proper logging or metrics, making it difficult to diagnose issues in production.
- No Operator Control: Operators have no way to tune retry behavior based on their specific reliability requirements and SLAs.
- Lack of Observability: There are no metrics to track status update failures, retries, or success rates, making troubleshooting difficult.
This KIP proposes a comprehensive, configurable retry mechanism with exponential backoff, jitter, and metrics to address these issues.
Public Interfaces
New Configuration Properties
The following new configuration properties will be added to `DistributedConfig`:
| Property | Type | Default | Valid Values | Description |
|---|---|---|---|---|
status.storage.retry.max.retries | int | 5 | [0, MAX_INT] | Maximum number of retry attempts before giving up |
status.storage.retry.initial.backoff.ms | long | 300 | [0, MAX_LONG] | Initial backoff delay in milliseconds |
status.storage.retry.max.backoff.ms | long | 10000 | [0, MAX_LONG] | Maximum backoff delay cap in milliseconds |
status.storage.retry.backoff.multiplier | double | 2.0 | [1.0, ∞) | Multiplier applied to backoff after each attempt |
status.storage.retry.jitter.factor | double | 0.25 | [0.0, 1.0] | Jitter factor applied to backoff to add randomness |
New JMX Metrics
New metrics will be exposed under the `status-backing-store-metrics` group:
| Metric Name | Type | Description |
|---|---|---|
status-update-retry-total | Counter | Total number of retry attempts for status updates |
status-update-failure-total | Counter | Total number of status updates that failed after all retries |
status-update-success-total | Counter | Total number of successful status updates |
Proposed Changes
1. Retry Configuration in DistributedConfig
Add new configuration constants and their definitions to `DistributedConfig.java`:
public static final String STATUS_STORAGE_RETRY_MAX_RETRIES_CONFIG = "status.storage.retry.max.retries";
public static final String STATUS_STORAGE_RETRY_INITIAL_BACKOFF_MS_CONFIG = "status.storage.retry.initial.backoff.ms";
public static final String STATUS_STORAGE_RETRY_MAX_BACKOFF_MS_CONFIG = "status.storage.retry.max.backoff.ms";
public static final String STATUS_STORAGE_RETRY_BACKOFF_MULTIPLIER_CONFIG = "status.storage.retry.backoff.multiplier";
public static final String STATUS_STORAGE_RETRY_JITTER_FACTOR_CONFIG = "status.storage.retry.jitter.factor";
Validation: At startup, the configuration will be validated to ensure `initial.backoff.ms <= max.backoff.ms`, throwing a `ConfigException` if the constraint is violated.
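A minimal sketch of this startup check follows. The class and method names are illustrative, and `IllegalArgumentException` stands in for Kafka's `ConfigException` so the sketch compiles without Kafka on the classpath; the real check would live in `DistributedConfig`:

```java
public class RetryConfigValidation {

    // Illustrative version of the proposed startup check. In DistributedConfig
    // the real check would throw org.apache.kafka.common.config.ConfigException.
    static void validateBackoffBounds(long initialBackoffMs, long maxBackoffMs) {
        if (initialBackoffMs > maxBackoffMs) {
            throw new IllegalArgumentException(
                "status.storage.retry.initial.backoff.ms (" + initialBackoffMs
                + ") must not exceed status.storage.retry.max.backoff.ms ("
                + maxBackoffMs + ")");
        }
    }
}
```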
2. Exponential Backoff with Jitter
The backoff calculation follows the formula:
baseBackoff   = initialBackoffMs × (multiplier ^ retryCount)
cappedBackoff = min(baseBackoff, maxBackoffMs)
jitter        = cappedBackoff × jitterFactor × random(0, 1)
finalBackoff  = cappedBackoff + jitter
Jitter Purpose: Jitter prevents the "thundering herd" problem, where multiple workers retry simultaneously after a transient failure and overwhelm the recovering broker.
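The formula above can be sketched directly in Java. The class and method names here are illustrative, not part of the proposed public API:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RetryBackoff {

    // Sketch of the backoff formula above; parameters mirror the proposed
    // status.storage.retry.* configuration properties.
    static long computeBackoffMs(long initialBackoffMs, double multiplier,
                                 long maxBackoffMs, double jitterFactor,
                                 int retryCount) {
        double baseBackoff = initialBackoffMs * Math.pow(multiplier, retryCount);
        double cappedBackoff = Math.min(baseBackoff, (double) maxBackoffMs);
        // random(0, 1) jitter spreads out workers that failed at the same moment
        double jitter = cappedBackoff * jitterFactor
                        * ThreadLocalRandom.current().nextDouble();
        return (long) (cappedBackoff + jitter);
    }
}
```

With the proposed defaults (300 ms initial, multiplier 2.0, 10 000 ms cap, jitter factor 0.25), attempt 0 waits 300-375 ms, and by attempt 6 the base backoff hits the cap, yielding 10 000-12 500 ms.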
3. Asynchronous Retry Mechanism
Retries will be scheduled asynchronously using a `ScheduledExecutorService`:
private ScheduledExecutorService sendRetryExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("status-store-retry-" + statusTopic, true));
This ensures that:
- Main worker threads are not blocked during retries
- Retries are processed in order for consistency
- The executor uses daemon threads to not prevent JVM shutdown
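The properties above can be sketched with plain JDK classes. `ThreadUtils.createThreadFactory` is replaced by a lambda so the sketch compiles without Kafka on the classpath, and the class name is illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatusRetryScheduler {

    // Illustrative stand-in for the executor created in KafkaStatusBackingStore.
    private final ScheduledExecutorService sendRetryExecutor =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread t = new Thread(runnable, "status-store-retry");
            t.setDaemon(true); // daemon thread: never blocks JVM shutdown
            return t;
        });

    // A single worker thread executes retries one at a time, off the
    // main worker threads.
    void scheduleRetry(Runnable resend, long backoffMs) {
        sendRetryExecutor.schedule(resend, backoffMs, TimeUnit.MILLISECONDS);
    }
}
```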
4. Metrics Implementation
A new `StatusBackingStoreMetrics` class will be created following the pattern of `ErrorHandlingMetrics`:
public class StatusBackingStoreMetrics implements AutoCloseable {
private final ConnectMetrics.MetricGroup metricGroup;
private final Sensor retries;
private final Sensor failures;
private final Sensor successes;
public void recordRetry() { retries.record(); }
public void recordFailure() { failures.record(); }
public void recordSuccess() { successes.record(); }
}
5. Integration Points
The `KafkaStatusBackingStore` will be modified to:
1. Read retry configuration from `DistributedConfig` in the `configure()` method
2. Use `sendWithRetry()` for connector and task status updates
3. Use `sendTopicStatusWithRetry()` for topic status updates
4. Accept a `setMetrics(ConnectMetrics)` method for optional metrics integration
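The retry flow these integration points imply can be sketched in a self-contained form. Everything here is illustrative: `StatusLog` stands in for the Kafka-based log that `KafkaStatusBackingStore` writes to, the `CompletableFuture` stands in for metrics and logging on the final outcome, and the backoff is simplified to keep the sketch short:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SendWithRetrySketch {

    // Stand-in for the asynchronous send; the callback receives null on success.
    interface StatusLog { void send(String value, Consumer<Exception> callback); }

    private final ScheduledExecutorService retryExecutor =
        Executors.newSingleThreadScheduledExecutor();
    private final int maxRetries = 5;
    private final long initialBackoffMs = 1L; // tiny backoff keeps the demo fast

    void sendWithRetry(StatusLog log, String value, int retryCount,
                       CompletableFuture<Integer> outcome) {
        log.send(value, error -> {
            if (error == null) {
                outcome.complete(retryCount);          // would record success metric
            } else if (retryCount < maxRetries) {      // would check RetriableException
                long backoffMs = initialBackoffMs << retryCount; // exponential growth
                retryExecutor.schedule(
                    () -> sendWithRetry(log, value, retryCount + 1, outcome),
                    backoffMs, TimeUnit.MILLISECONDS); // would record retry metric
            } else {
                outcome.completeExceptionally(error);  // would record failure metric
            }
        });
    }
}
```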
Compatibility, Deprecation, and Migration Plan
All new configuration properties have defaults, so no configuration changes are required when upgrading. Operators should, however, review their monitoring and alerting to incorporate the new metrics. For environments with frequent transient failures:
# Increase retry tolerance for unstable networks
status.storage.retry.max.retries=10
status.storage.retry.max.backoff.ms=30000
To approximate the pre-KIP behavior of retrying indefinitely, set the maximum retry count to its highest value:
status.storage.retry.max.retries=2147483647
Test Plan
This KIP will be tested through a combination of unit tests, integration tests, and system tests. The primary focus is on system-level validation to ensure the feature works correctly in real-world scenarios.
Unit Tests
Unit tests will validate component-level correctness:
- Configuration validation (jitter range 0.0-1.0, initial ≤ max backoff constraint)
- Backoff calculation formula with various retry counts
- Metrics sensor registration and recording
Integration Tests
Integration tests using `EmbeddedConnectCluster` will verify:
- Retry configuration is correctly read from worker properties
- Status updates succeed under normal conditions with metrics recorded
- The retry mechanism is triggered when `RetriableException` occurs
System Tests
System tests are the primary validation mechanism for this KIP. They will run against a real Kafka cluster (either Docker-based or Ducktape) and verify end-to-end behavior:
- Broker Failure Scenario
- Start a Connect cluster with default retry configuration
- Start a connector and verify status is written successfully
- Simulate broker unavailability (stop Kafka broker or network partition)
- Verify that retries occur with exponential backoff via log inspection
- Verify JMX metrics increment: `status-update-retry-total`
- After `max.retries` is exhausted, verify `status-update-failure-total` increments
- Restore broker and verify next status update succeeds
- Transient Failure Recovery
- Configure a short broker unavailability window (2-3 seconds)
- Verify that retries succeed before exhaustion
- Verify `status-update-success-total` increments after retry
- Verify status is eventually consistent in the status topic
- Configuration Validation
- Attempt to start a worker with `initial.backoff.ms > max.backoff.ms`
- Verify worker fails to start with `ConfigException`
- Attempt to configure `jitter.factor` outside [0.0, 1.0] range
- Verify configuration is rejected at startup
- Backward Compatibility
- Run existing Connect system tests without any retry configuration changes
- Verify all tests pass, demonstrating the defaults maintain existing behavior
- Upgrade a running Connect cluster to the new version
- Verify connectors continue operating correctly
- Metrics Validation
- Query JMX endpoint during normal operation
- Verify `status-update-success-total` increments with each status change
- Induce failures and verify `status-update-retry-total` and `status-update-failure-total` are accurate
How We Know It Works
- Success criteria: All status updates eventually succeed when brokers are healthy, with metrics accurately reflecting the number of successes.
- Failure criteria: When brokers are unavailable beyond the retry window, failures are logged, metrics increment, and the system does not hang.
- Regression check: Existing Connect system tests in the Kafka test suite must pass unchanged.
Rejected Alternatives
1. Circuit Breaker Pattern
A circuit breaker could completely stop status updates after repeated failures. This was rejected because:
- Status updates are critical for operator visibility
- Circuit breakers require additional state management
- The exponential backoff already provides pressure relief
2. Per-Connector Configuration
Allowing retry configuration per-connector was considered but rejected because:
- Status updates share a single backing topic
- Significantly increases configuration complexity
- Unlikely to provide meaningful benefit in practice
3. Synchronous Retries with Timeout
Blocking the calling thread for retries with a total timeout was rejected because:
- Would block worker threads during broker issues
- Could cause cascade failures in the worker
Future work
- Dead Letter Queue: Failed status updates could optionally be written to a DLQ for later analysis.
- Callback Mechanism: Allow custom handlers to be invoked when status updates fail permanently.
- Per-Status-Type Configuration: Different retry settings for connector vs. task vs. topic status updates.