Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-19764

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-848 introduced a batching mechanism in the group coordinator, effective in Apache Kafka 4.0. When requests are handled, the group coordinator may want to write records to a __consumer_offsets partition. These records are accumulated in a batch and the batch is flushed in three cases:

  • When the batch is about to exceed max.message.bytes for __consumer_offsets
  • When the batch is older than the duration given by the new group.coordinator.append.linger.ms config option, which defaults to 5 milliseconds
  • Before and after transactional writes, since a batch can only be part of a single transaction
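The three flush conditions can be sketched as a single decision function. This is a simplified model, not the coordinator runtime's code. The function name and parameters are hypothetical, byte counts stand in for encoded batch sizes, and the linger check is evaluated on append here, whereas the runtime drives it from a timer.

```python
from typing import Optional


def flush_reason(batch_bytes: int, batch_age_ms: int, batch_transactional: bool,
                 next_bytes: int, next_transactional: bool,
                 max_message_bytes: int, linger_ms: int) -> Optional[str]:
    """Return why the open batch must be flushed before the next append, or None.

    Hypothetical helper for illustration only; not a Kafka API.
    """
    if batch_bytes == 0:
        return None  # nothing buffered, nothing to flush
    # A batch can only be part of a single transaction: flush before a
    # transactional write and after a transactional batch.
    if batch_transactional or next_transactional:
        return "transaction"
    # Appending the next write would exceed max.message.bytes for __consumer_offsets.
    if batch_bytes + next_bytes > max_message_bytes:
        return "size"
    # The batch has been open for the configured (fixed) linger time.
    if batch_age_ms >= linger_ms:
        return "linger"
    return None
```

For example, a 990-byte batch facing a 50-byte write with a 1000-byte limit yields "size", while a 100-byte batch that is 5 ms old with a 5 ms linger yields "linger".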

Once the batch is flushed and replicated successfully, the request is considered complete and the response is sent to the client.

Under high-volume non-transactional workloads, batching helps keep request latencies down. When request rates are low, however, the default linger time imposes a minimum request latency of 5 milliseconds. For workloads with low latency requirements, this is a regression compared to the pre-KIP-848 group coordinator, and users are affected as soon as they upgrade to Apache Kafka 4.0.

The same latency floor also applies to the share coordinator, since it shares the coordinator runtime with the group coordinator.

Proposed Changes

A new adaptive group.coordinator.append.linger.ms mode is proposed. When in adaptive mode,

  • batch linger times will tend to 0 ms to reduce request latencies under low workloads.
  • batch linger times will rise under high workloads to keep request latencies under control.

This saves users from having to tune group.coordinator.append.linger.ms manually.

The new adaptive linger time can be specified by setting the existing group.coordinator.append.linger.ms config option to -1. The default group.coordinator.append.linger.ms is also changed from 5 ms to -1, so that users upgrading from Apache Kafka <4.0 do not see a regression in latency. 

The same change is applied to the share coordinator, under share.coordinator.append.linger.ms.

The implementation of adaptive linger time is subject to change but will always meet the two criteria above. An initial implementation is described below.

Initial Implementation

The coordinator runtime, which is used by both the group coordinator and the share coordinator, maintains a queue of operations. Incoming requests are inserted at the back of the queue. When a new batch is created, a timer is started for append.linger.ms. Once the timer has elapsed, a flush operation is inserted at the front of the queue.
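A toy model of the fixed-linger behaviour may help contrast it with the adaptive mode. It assumes a single-threaded queue that is drained on each millisecond tick; the class and method names are hypothetical and do not correspond to the coordinator runtime's actual classes.

```python
from collections import deque


class FixedLingerRuntime:
    """Toy model of fixed append.linger.ms (hypothetical names)."""

    def __init__(self, linger_ms):
        self.linger_ms = linger_ms
        self.queue = deque()         # pending ops: ("write", record) or ("flush", None)
        self.batch = []              # records in the open batch
        self.batch_started_ms = None # when the linger timer was armed
        self.flushed = []            # (flush_time_ms, records) per flushed batch

    def submit(self, record):
        self.queue.append(("write", record))  # requests join the back of the queue

    def tick(self, now_ms):
        # Timer: once linger.ms has elapsed, put a flush at the FRONT of the queue.
        if self.batch_started_ms is not None and now_ms - self.batch_started_ms >= self.linger_ms:
            self.queue.appendleft(("flush", None))
            self.batch_started_ms = None
        while self.queue:
            kind, record = self.queue.popleft()
            if kind == "write":
                if not self.batch:
                    self.batch_started_ms = now_ms  # a new batch arms the timer
                self.batch.append(record)
            else:
                self.flushed.append((now_ms, self.batch))
                self.batch = []
```

A lone write submitted at t=0 is only flushed at t=5, which is the 5 ms latency floor described in the Motivation, even though nothing else is waiting in the queue.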

To implement an adaptive linger time, the flush operation can be inserted at the back of the queue when a batch is started, instead of starting a timer. Under low workloads, the queue will be empty and the flush operation will run immediately. Under high workloads, the queue will contain other requests ahead of the flush operation, and the records from these requests will naturally be collected into the batch before the flush operation is reached.
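The adaptive variant changes only what happens when a batch starts: instead of arming a timer, a flush operation is appended to the back of the queue. A minimal sketch under the same toy-model assumptions (hypothetical names, a single-threaded queue, no size or transaction handling):

```python
from collections import deque


class AdaptiveLingerRuntime:
    """Toy model of adaptive append.linger.ms (hypothetical names)."""

    def __init__(self):
        self.queue = deque()  # pending ops: ("write", record) or ("flush", None)
        self.batch = []
        self.flushed = []     # one list of records per flushed batch

    def submit(self, record):
        self.queue.append(("write", record))

    def run(self):
        while self.queue:
            kind, record = self.queue.popleft()
            if kind == "write":
                if not self.batch:
                    # No timer: enqueue the flush behind whatever is already waiting.
                    self.queue.append(("flush", None))
                self.batch.append(record)
            else:
                self.flushed.append(self.batch)
                self.batch = []
```

With one write at a time, each write is flushed on its own straight away; with three writes already queued, all three land in a single batch because they sit ahead of the flush operation.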

When batches are flushed because they would exceed max.message.bytes, or because of transactional writes, the flush operations enqueued for them are left orphaned in the queue. To avoid triggering extra flushes from these orphaned operations, batches can be numbered, and a flush operation will only apply if the current batch's number is unchanged. When starting a transactional batch, no flush operation will be enqueued, since the batch will be flushed immediately at the end of request processing.
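Batch numbering can be sketched by tagging each flush operation with the number of the batch it was enqueued for and incrementing the number on every flush. In this hypothetical model, max_records stands in for max.message.bytes, and use_numbering exists only to contrast the behaviour with and without the check.

```python
from collections import deque


class NumberedAdaptiveRuntime:
    """Toy adaptive-linger model with numbered batches (hypothetical names)."""

    def __init__(self, max_records, use_numbering=True):
        self.max_records = max_records  # stand-in for max.message.bytes
        self.use_numbering = use_numbering
        self.queue = deque()
        self.batch = []
        self.batch_number = 0
        self.flushed = []

    def _flush(self):
        if self.batch:
            self.flushed.append(self.batch)
        self.batch = []
        # Any flush op still queued for the old batch now carries a stale number.
        self.batch_number += 1

    def submit(self, record):
        self.queue.append(("write", record))

    def submit_transactional(self, records):
        self.queue.append(("txn", records))

    def step(self):
        kind, arg = self.queue.popleft()
        if kind == "write":
            if len(self.batch) >= self.max_records:
                self._flush()  # size-triggered flush orphans the queued flush op
            if not self.batch:
                self.queue.append(("flush", self.batch_number))
            self.batch.append(arg)
        elif kind == "txn":
            self._flush()           # flush before the transactional write
            self.batch.extend(arg)  # no flush op is enqueued for this batch...
            self._flush()           # ...because it is flushed at the end of the request
        elif not self.use_numbering or arg == self.batch_number:
            self._flush()
        # Otherwise: an orphaned flush for an already-flushed batch, so ignore it.

    def run(self):
        while self.queue:
            self.step()
```

For example, with max_records=2, submitting a, b and c, processing one step, then submitting d and draining the queue yields [["a", "b"], ["c", "d"]] with numbering, but [["a", "b"], ["c"], ["d"]] without it, because the orphaned flush for the first batch splits c and d.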

In testing, this implementation was found to reduce offset commit latencies to <1 ms under low workloads and perform no worse than a 5 ms linger time under high workloads.

Public Interfaces

Broker Configurations

The group.coordinator.append.linger.ms and share.coordinator.append.linger.ms config options are updated to allow -1 as a value. -1 indicates that the linger time should be adaptive and close to 0 ms when under low workloads. The defaults are also changed from 5 ms to -1.
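The accepted values can be summarised with a small validation sketch, for example for a broker started with group.coordinator.append.linger.ms=-1. The helper below is hypothetical, not the broker's actual config validator, and it assumes fixed values keep an existing lower bound of 0.

```python
from dataclasses import dataclass
from typing import Optional

ADAPTIVE = -1  # sentinel accepted by group/share.coordinator.append.linger.ms


@dataclass(frozen=True)
class LingerPolicy:
    adaptive: bool
    fixed_ms: Optional[int]  # None when adaptive


def parse_append_linger_ms(value: int) -> LingerPolicy:
    """Interpret an append.linger.ms value as described in this KIP (hypothetical helper)."""
    if value == ADAPTIVE:
        return LingerPolicy(adaptive=True, fixed_ms=None)
    if value >= 0:  # assumed lower bound for fixed linger times
        return LingerPolicy(adaptive=False, fixed_ms=value)
    raise ValueError(f"append.linger.ms must be -1 (adaptive) or >= 0, got {value}")
```

Using -1 as an in-band sentinel keeps a single config option per coordinator, which matches the rejected alternative below of adding a separate adaptive switch.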

Name | Type | Previous Default | Default | Doc
group.coordinator.append.linger.ms | int | 5 ms | -1 | The duration in milliseconds that the group coordinator will wait for writes to accumulate before flushing them to disk. -1 indicates an adaptive linger time, based on the workload.
share.coordinator.append.linger.ms | int | 5 ms | -1 | The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk. -1 indicates an adaptive linger time, based on the workload.

Broker Metrics

Two new metrics are proposed.

Batch linger time sensor

Measures the linger time of batches now that the linger time may be dynamic. Even when append.linger.ms is set to a fixed value, the linger time can vary due to queuing delays in the coordinator runtime.

  • kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-max
  • kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-p50
  • kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-p99
  • kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-p999
  • kafka.server:type=share-coordinator-metrics,name=batch-linger-time-ms-max
  • kafka.server:type=share-coordinator-metrics,name=batch-linger-time-ms-p50
  • kafka.server:type=share-coordinator-metrics,name=batch-linger-time-ms-p99
  • kafka.server:type=share-coordinator-metrics,name=batch-linger-time-ms-p999
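To make the measured quantity concrete, the sketch below computes the reported statistics from (creation time, flush time) pairs. It uses exact nearest-rank percentiles for simplicity; Kafka's metrics library approximates percentiles with histograms, so real values may differ. The function names are hypothetical.

```python
import math
from typing import Dict, List, Tuple


def nearest_rank(sorted_values: List[int], q: float) -> int:
    """Nearest-rank percentile of a sorted, non-empty list (0 < q <= 1)."""
    rank = max(1, math.ceil(q * len(sorted_values)))
    return sorted_values[rank - 1]


def linger_time_summary(batches: List[Tuple[int, int]]) -> Dict[str, int]:
    """Summarise batch linger times from (created_ms, flushed_ms) pairs.

    Linger time runs from batch creation to flush, so it includes any queuing
    delay in the coordinator runtime, not just the configured linger.
    """
    lingers = sorted(flushed - created for created, flushed in batches)
    return {
        "batch-linger-time-ms-max": lingers[-1],
        "batch-linger-time-ms-p50": nearest_rank(lingers, 0.50),
        "batch-linger-time-ms-p99": nearest_rank(lingers, 0.99),
        "batch-linger-time-ms-p999": nearest_rank(lingers, 0.999),
    }
```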

Batch flush rate sensor

Measures batch flushes per second.

  • kafka.server:type=group-coordinator-metrics,name=batch-flush-rate
  • kafka.server:type=share-coordinator-metrics,name=batch-flush-rate
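A flush rate is a count of flushes over a time window, normalised to seconds. The sliding-window helper below is a hypothetical simplification of Kafka's sampled rate statistic, and the 30 second default window is an arbitrary choice for the example.

```python
from collections import deque


class FlushRate:
    """Flushes per second over a sliding time window (hypothetical helper)."""

    def __init__(self, window_ms: int = 30_000):
        self.window_ms = window_ms
        self.flush_times = deque()  # timestamps of recorded flushes, oldest first

    def record_flush(self, now_ms: int) -> None:
        self.flush_times.append(now_ms)

    def rate(self, now_ms: int) -> float:
        # Drop flushes that fell out of the window, then normalise to per-second.
        while self.flush_times and self.flush_times[0] <= now_ms - self.window_ms:
            self.flush_times.popleft()
        return len(self.flush_times) * 1000.0 / self.window_ms
```

Under an adaptive linger time and low load, each write request tends to get its own flush, so one would expect this rate to track the write request rate; under high load it should fall below it as batches grow.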

Compatibility, Deprecation, and Migration Plan

Users who have explicitly configured group.coordinator.append.linger.ms or share.coordinator.append.linger.ms will be unaffected, since only the defaults change.

Users upgrading from Apache Kafka <4.0 with low latency requirements will no longer see a regression in request latency.

Test Plan

Unit tests will be added for the adaptive linger setting.

Since the default append.linger.ms is changing, the existing system tests will exercise the adaptive linger time and should be sufficient to check for regressions.

Rejected Alternatives

Separate config option for adaptive linger time

This would have to default to off to avoid breaking users who have configured an explicit append.linger.ms. Users would then not benefit from the adaptive linger config by default.
