Status

Current state: Under discussion

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The `KafkaStatusBackingStore` in Kafka Connect is responsible for persisting connector and task status updates to a compacted Kafka topic. Currently, when status update writes fail, the behavior is inconsistent:


  1. Blocking Behavior: Some code paths block indefinitely waiting for Kafka to become available, which can cause worker threads to hang during broker outages.
  2. Silent Failures: Other code paths may silently drop status updates without proper logging or metrics, making it difficult to diagnose issues in production.
  3. No Operator Control: Operators have no way to tune retry behavior based on their specific reliability requirements and SLAs.
  4. Lack of Observability: There are no metrics to track status update failures, retries, or success rates, making troubleshooting difficult.


This KIP proposes a comprehensive, configurable retry mechanism with exponential backoff, jitter, and metrics to address these issues.

Public Interfaces

New Configuration Properties

The following new configuration properties will be added to `DistributedConfig`:

| Property | Type | Default | Valid Values | Description |
|---|---|---|---|---|
| `status.storage.retry.max.retries` | int | 5 | [0, MAX_INT] | Maximum number of retry attempts before giving up |
| `status.storage.retry.initial.backoff.ms` | long | 300 | [0, MAX_LONG] | Initial backoff delay in milliseconds |
| `status.storage.retry.max.backoff.ms` | long | 10000 | [0, MAX_LONG] | Maximum backoff delay cap in milliseconds |
| `status.storage.retry.backoff.multiplier` | double | 2.0 | [1.0, MAX_DOUBLE] | Multiplier applied to the backoff after each attempt |
| `status.storage.retry.jitter.factor` | double | 0.25 | [0.0, 1.0] | Jitter factor applied to the backoff to add randomness |


New JMX Metrics

New metrics will be exposed under the `status-backing-store-metrics` group:

| Metric Name | Type | Description |
|---|---|---|
| `status-update-retry-total` | Counter | Total number of retry attempts for status updates |
| `status-update-failure-total` | Counter | Total number of status updates that failed after all retries |
| `status-update-success-total` | Counter | Total number of successful status updates |

Proposed Changes

1. Retry Configuration in DistributedConfig

Add new configuration constants and their definitions to `DistributedConfig.java`:


```java
public static final String STATUS_STORAGE_RETRY_MAX_RETRIES_CONFIG = "status.storage.retry.max.retries";
public static final String STATUS_STORAGE_RETRY_INITIAL_BACKOFF_MS_CONFIG = "status.storage.retry.initial.backoff.ms";
public static final String STATUS_STORAGE_RETRY_MAX_BACKOFF_MS_CONFIG = "status.storage.retry.max.backoff.ms";
public static final String STATUS_STORAGE_RETRY_BACKOFF_MULTIPLIER_CONFIG = "status.storage.retry.backoff.multiplier";
public static final String STATUS_STORAGE_RETRY_JITTER_FACTOR_CONFIG = "status.storage.retry.jitter.factor";
```



Validation: the constraint `initial.backoff.ms <= max.backoff.ms` is checked at startup, and a `ConfigException` is thrown if it is violated.
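As a sketch, the cross-field check could look like the following (the class and method names are hypothetical, and a plain `IllegalArgumentException` stands in for Kafka's `ConfigException` so the snippet is self-contained):

```java
// Hypothetical helper illustrating the proposed startup cross-field validation.
public class BackoffConfigCheck {
    static void validateBackoffBounds(long initialBackoffMs, long maxBackoffMs) {
        if (initialBackoffMs > maxBackoffMs) {
            // The real implementation would throw org.apache.kafka.common.config.ConfigException.
            throw new IllegalArgumentException(
                "status.storage.retry.initial.backoff.ms (" + initialBackoffMs
                + ") must be <= status.storage.retry.max.backoff.ms (" + maxBackoffMs + ")");
        }
    }
}
```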


2. Exponential Backoff with Jitter


The backoff calculation follows the formula:

baseBackoff = initialBackoffMs × (multiplier ^ retryCount)
cappedBackoff = min(baseBackoff, maxBackoffMs)
jitter = cappedBackoff × jitterFactor × random(0, 1)
finalBackoff = cappedBackoff + jitter



Jitter Purpose: the jitter prevents the "thundering herd" problem where multiple workers retry simultaneously after a transient failure, potentially overwhelming the recovering broker.
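A minimal sketch of this calculation (the class and method names are illustrative, not part of the proposed public interface):

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative implementation of the backoff formula above.
public class RetryBackoff {
    public static long computeBackoffMs(int retryCount, long initialBackoffMs,
                                        long maxBackoffMs, double multiplier,
                                        double jitterFactor) {
        double baseBackoff = initialBackoffMs * Math.pow(multiplier, retryCount);
        double cappedBackoff = Math.min(baseBackoff, maxBackoffMs);
        // Random jitter in [0, cappedBackoff * jitterFactor) spreads out concurrent retries.
        double jitter = cappedBackoff * jitterFactor * ThreadLocalRandom.current().nextDouble();
        return (long) (cappedBackoff + jitter);
    }
}
```

With the proposed defaults (300 ms initial, 2.0 multiplier, 10000 ms cap, 5 max retries), the pre-jitter delays for attempts 0 through 4 are 300, 600, 1200, 2400, and 4800 ms; later attempts would be capped at 10000 ms.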


3. Asynchronous Retry Mechanism


Retries will be scheduled asynchronously using a `ScheduledExecutorService`:


```java
private ScheduledExecutorService sendRetryExecutor = Executors.newSingleThreadScheduledExecutor(
    ThreadUtils.createThreadFactory("status-store-retry-" + statusTopic, true));
```



This ensures that:

  • Main worker threads are not blocked during retries
  • Retries are processed in order for consistency
  • The executor uses daemon threads to not prevent JVM shutdown
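The interplay of the scheduler, retry counter, and metrics hooks can be sketched as follows. This is a self-contained simplification: `sendWithRetry` and the boolean-returning `send` callable are stand-ins for the real producer send path, and the backoff here is a plain doubling without jitter:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {
    // Single daemon thread, mirroring the proposed sendRetryExecutor.
    static final ScheduledExecutorService RETRY_EXECUTOR =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "status-store-retry");
            t.setDaemon(true); // daemon thread: does not prevent JVM shutdown
            return t;
        });

    // Attempts send(); on failure, reschedules with a growing delay until maxRetries.
    static CompletableFuture<Boolean> sendWithRetry(Callable<Boolean> send,
                                                    int maxRetries, long initialBackoffMs) {
        CompletableFuture<Boolean> done = new CompletableFuture<>();
        attempt(send, 0, maxRetries, initialBackoffMs, done);
        return done;
    }

    private static void attempt(Callable<Boolean> send, int retryCount, int maxRetries,
                                long initialBackoffMs, CompletableFuture<Boolean> done) {
        RETRY_EXECUTOR.execute(() -> {
            boolean ok;
            try {
                ok = send.call();
            } catch (Exception e) {
                ok = false;
            }
            if (ok) {
                done.complete(true);            // recordSuccess() in the real store
            } else if (retryCount >= maxRetries) {
                done.complete(false);           // recordFailure() after exhaustion
            } else {
                long delayMs = initialBackoffMs << retryCount; // doubling backoff (no jitter here)
                RETRY_EXECUTOR.schedule(        // recordRetry() before rescheduling
                    () -> attempt(send, retryCount + 1, maxRetries, initialBackoffMs, done),
                    delayMs, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated send that fails twice, then succeeds on the third attempt.
        boolean result = sendWithRetry(() -> calls.incrementAndGet() >= 3, 5, 10).get();
        System.out.println(result + " after " + calls.get() + " attempts"); // true after 3 attempts
    }
}
```

Because a single-threaded executor both runs attempts and schedules follow-ups, retries are processed in order while the caller's thread is never blocked.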


4. Metrics Implementation


A new `StatusBackingStoreMetrics` class will be created following the pattern of `ErrorHandlingMetrics`:


```java
public class StatusBackingStoreMetrics implements AutoCloseable {
    private final ConnectMetrics.MetricGroup metricGroup;
    private final Sensor retries;
    private final Sensor failures;
    private final Sensor successes;

    public void recordRetry() { retries.record(); }
    public void recordFailure() { failures.record(); }
    public void recordSuccess() { successes.record(); }
}
```




5. Integration Points


The `KafkaStatusBackingStore` will be modified to:


1. Read retry configuration from `DistributedConfig` in the `configure()` method

2. Use `sendWithRetry()` for connector and task status updates

3. Use `sendTopicStatusWithRetry()` for topic status updates

4. Accept a `setMetrics(ConnectMetrics)` method for optional metrics integration

Compatibility, Deprecation, and Migration Plan


Operators should review their monitoring and alerting to incorporate the new metrics. For environments with frequent transient failures:

# Increase retry tolerance for unstable networks
status.storage.retry.max.retries=10
status.storage.retry.max.backoff.ms=30000


To restore the pre-KIP infinite-retry behavior, set the retry count to its maximum:

# Effectively infinite retries
status.storage.retry.max.retries=2147483647


Test Plan

This KIP will be tested through a combination of unit tests, integration tests, and system tests. The primary focus is on system-level validation to ensure the feature works correctly in real-world scenarios.


Unit Tests


Unit tests will validate component-level correctness:

- Configuration validation (jitter range 0.0-1.0, initial ≤ max backoff constraint)

- Backoff calculation formula with various retry counts

- Metrics sensor registration and recording


Integration Tests


Integration tests using `EmbeddedConnectCluster` will verify:

- Retry configuration is correctly read from worker properties

- Status updates succeed under normal conditions with metrics recorded

- The retry mechanism is triggered when `RetriableException` occurs


System Tests


System tests are the primary validation mechanism for this KIP. They will run against a real Kafka cluster (either Docker-based or Ducktape) and verify end-to-end behavior:


  • Broker Failure Scenario
    1. Start a Connect cluster with default retry configuration
    2. Start a connector and verify status is written successfully
    3. Simulate broker unavailability (stop Kafka broker or network partition)
    4. Verify that retries occur with exponential backoff via log inspection
    5. Verify JMX metrics increment: `status-update-retry-total`
    6. After `max.retries` is exhausted, verify `status-update-failure-total` increments
    7. Restore broker and verify next status update succeeds


  • Transient Failure Recovery
    1. Configure a short broker unavailability window (2-3 seconds)
    2. Verify that retries succeed before exhaustion
    3. Verify `status-update-success-total` increments after retry
    4. Verify status is eventually consistent in the status topic


  • Configuration Validation
    1. Attempt to start a worker with `initial.backoff.ms > max.backoff.ms`
    2. Verify worker fails to start with `ConfigException`
    3. Attempt to configure `jitter.factor` outside [0.0, 1.0] range
    4. Verify configuration is rejected at startup


  • Backward Compatibility 
    1. Run existing Connect system tests without any retry configuration changes
    2. Verify all tests pass, demonstrating the defaults maintain existing behavior
    3. Upgrade a running Connect cluster to the new version
    4. Verify connectors continue operating correctly


  • Metrics Validation
    1. Query JMX endpoint during normal operation
    2. Verify `status-update-success-total` increments with each status change
    3. Induce failures and verify `status-update-retry-total` and `status-update-failure-total` are accurate


How We Know It Works


Success criteria: all status updates eventually succeed when brokers are healthy, with metrics accurately reflecting the number of successes.

Failure criteria: when brokers are unavailable beyond the retry window, failures are logged, metrics increment, and the system does not hang.

Regression check: existing Connect system tests in the Kafka test suite must pass unchanged.


Rejected Alternatives


1. Circuit Breaker Pattern


A circuit breaker could completely stop status updates after repeated failures. This was rejected because:

- Status updates are critical for operator visibility

- Circuit breakers require additional state management

- The exponential backoff already provides pressure relief


2. Per-Connector Configuration


Allowing retry configuration per-connector was considered but rejected because:

- Status updates share a single backing topic

- Significantly increases configuration complexity

- Unlikely to provide meaningful benefit in practice


3. Synchronous Retries with Timeout


Blocking the calling thread for retries with a total timeout was rejected because:

- Would block worker threads during broker issues

- Could cause cascade failures in the worker

Future Work

  1. Dead Letter Queue: Failed status updates could optionally be written to a DLQ for later analysis.
  2. Callback Mechanism: Allow custom handlers to be invoked when status updates fail permanently.
  3. Per-Status-Type Configuration: Different retry settings for connector vs. task vs. topic status updates.


