Status

Current stateAccepted

Discussion thread: here

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-932: Queues for Kafka introduced share groups to Apache Kafka. This KIP introduces additional group configurations to give users more control over share groups. It also formalizes the manner in which group configurations with limits specified by broker configurations are validated and enforced.

Proposed Changes

Some of the configuration for share groups in KIP-932 is specified at the broker level without an equivalent group configuration.

First, it enables group configuration for the maximum number of delivery attempts, so error tolerance can be configured for each group. For records which are currently in-flight when the limit is reduced, a final delivery attempt which exceeds the reduced limit may be made. The delivery count limit is not bounds checked at the start of the delivery attempt because fetching is not concerned with archiving records. It's a boundary case that only occurs when the limit decreases and messages are already in-flight. When a delivery attempt completes, we check whether the number of deliveries has reached or exceeded the limit, and archive the records accordingly. If the limit was decreased, it's possible that the limit has been exceeded in which case a final delivery attempt is permitted. If the limit was increased, it's possible that a record was nearing its final attempt but now it has more delivery attempts available. The throttling that occurs as the final attempts are made might have been applied previously, but can be relaxed to match the new limit.

Second, it enables group configuration for the record lock limit per share-partition, so the balance between resource consumption and concurrency can be configured for each group. The defaults for the equivalent broker configurations were chosen to be suitable for most situations, but group overrides give administrators more control if they desire. In a similar way, if the limit is decreased, the number of record locks will gradually decrease to fit within the new limit as record delivery completes.

Finally, KIP-1222: Acquisition lock timeout renewal in share consumer explicit mode introduced renewal of acquisition locks. This enables processing of records to be extended indefinitely by repeatedly renewing the acquisition locks. A new group configuration is added to control whether this behavior is permitted for the group. If the value of the configuration is set to false, a renew acknowledgement will be rejected with error code INVALID_RECORD_STATE  and an error message which explains that renewing acquisition locks is not enabled for the group.

Validation and enforcement of group configurations

Group configurations have been introduced in several KIPs (848, 932, 1071) in which broker configurations specify the permitted limits of the group configurations. For example, the group configuration consumer.session.timeout.ms  is not allowed to be less than the broker configuration group.consumer.min.session.timeout.ms  or greater than group.consumer.max.session.timeout.ms . When the group configuration is updated, it is simple to validate it against the broker configuration limits. However, if a broker is restarted and the broker configuration has been changed, the group configurations might now fall outside the permitted limits. Prior to this KIP, in some cases, the broker will not start because the group configuration validations fail. In others, the group configurations are only checked when they are being updated. This KIP also resolves these inconsistencies.

  • When a group configuration is updated, the value is validated against the broker configuration limits. If the validation fails, the request is rejected. (No change)
  • When a broker configuration is updated, the existing group configurations are not validated. This ensures that the administrator is able to tighten or relax limits easily.
  • When a broker starts, if a group configuration lies outside the broker configuration limits, the broker is permitted to start.
  • When a group configuration is evaluated, the effective value of a group configuration is capped by the broker limits.

Public Interfaces

Configuration

Broker configuration

New configurations are added to specify the upper and lower bounds of the associated group configurations.

ConfigurationDescriptionValues

group.share.delivery.count.limit 

(Existing configuration)Default 5, minimum 2, maximum 10 (no change)

group.share.max.delivery.count.limit 

The maximum value of a group configuration for the maximum number of delivery attempts for a record delivered to a share group.

Default 10, minimum 5, maximum 25

group.share.min.delivery.count.limit 

The minimum value of a group configuration for the maximum number of delivery attempts for a record delivered to a share group.Default 2, minimum 2, maximum 5

group.share.partition.max.record.locks 

(Existing configuration)Default 2000, minimum 100, maximum 10000 (no change)

group.share.max.partition.max.record.locks 

The maximum value of a group configuration for the share-group record lock limit per share-partition.Default 4000, minimum 2000, maximum 10000
group.share.min.partition.max.record.locks The minimum value of a group configuration for the share-group record lock limit per share-partition.Default 100, minimum 100, maximum 2000

Group configuration

ConfigurationDescriptionValues
share.delivery.count.limit The maximum number of delivery attempts for a record delivered to the share group.

If not specified, uses the broker configuration group.share.delivery.count.limit .

If specified, minimum limited by the broker configuration group.share.min.delivery.count.limit , maximum limited by the broker configuration group.share.max.delivery.count.limit .

share.partition.max.record.locks Record lock limit per share-partition.

If not specified, uses the broker configuration group.share.partition.max.record.locks .

If specified, minimum limited by the broker configuration group.share.min.partitions.max.record.locks , maximum limited by the broker configuration group.share.max.partitions.max.record.locks .

share.renew.acknowledge.enable Whether the renew acknowledge type is enabled for the share group.

Valid values "true" (default) and "false"

Compatibility, Deprecation, and Migration Plan

The changes in this KIP are limited to adding a few new configurations to control existing behavior. There should be no compatibility concerns.

Test Plan

The feature will be thoroughly tested with unit and integration tests.

Rejected Alternatives

None so far.

  • No labels