Comment: Minor changes for review comments

...

The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records decreases, as naturally happens when the locks time out. This limit is controlled by the broker configuration property group.share.partition.max.record.locks. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
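The acquisition limit and lock timeout can be illustrated with a small Python model of a single share-partition. This is a hypothetical sketch, not broker code: `SharePartitionSketch`, `acquire` and the in-memory lock map are invented for illustration, `max_record_locks` stands in for group.share.partition.max.record.locks, `lock_duration_ms` stands in for group.share.record.lock.duration.ms, and the log is assumed to always have more records available. It shows fetches yielding nothing once the limit is reached, and timed-out locks making records available again.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SharePartitionSketch:
    """Hypothetical model of one share-partition's acquisition limit (not Kafka code)."""
    max_record_locks: int  # stands in for group.share.partition.max.record.locks
    lock_duration_ms: int  # stands in for group.share.record.lock.duration.ms
    next_offset: int = 0  # next never-delivered offset (log assumed unbounded)
    locks: Dict[int, int] = field(default_factory=dict)  # offset -> lock expiry (ms)
    released: List[int] = field(default_factory=list)  # offsets whose lock timed out

    def _expire_locks(self, now_ms: int) -> None:
        # Locks that have timed out are released so the records can be redelivered.
        for offset, expiry in list(self.locks.items()):
            if expiry <= now_ms:
                del self.locks[offset]
                self.released.append(offset)

    def acquire(self, max_records: int, now_ms: int) -> List[int]:
        """Acquire up to max_records offsets; returns [] while the limit is reached."""
        self._expire_locks(now_ms)
        budget = max(0, min(max_records, self.max_record_locks - len(self.locks)))
        self.released.sort()
        acquired: List[int] = []
        while len(acquired) < budget:
            if self.released:
                offset = self.released.pop(0)  # redeliver released records first
            else:
                offset = self.next_offset
                self.next_offset += 1
            self.locks[offset] = now_ms + self.lock_duration_ms
            acquired.append(offset)
        return acquired


sp = SharePartitionSketch(max_record_locks=3, lock_duration_ms=1000)
print(sp.acquire(5, now_ms=0))     # [0, 1, 2]
print(sp.acquire(5, now_ms=500))   # [] : limit reached, locks still held
print(sp.acquire(5, now_ms=1000))  # [0, 1, 2] : locks timed out and were released
```

The model deliberately ignores acknowledgements so that the only way the lock count falls is the timeout, which is the failure case the paragraph describes.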

...

When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members. However, the assignments are not persisted and will be recalculated by the new group coordinator. It will bump the group epoch as a result.
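A rough Python sketch of this fail-over behaviour, under clearly stated assumptions: the `(kind, group_id, value)` record tuples are a hypothetical simplification of what is stored in the __consumer_offsets partition, and `replay` / `on_failover` are invented names. It shows group existence, epoch and membership surviving the replay, while the assignment, which is not persisted, starts empty and the epoch is bumped.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Tuple, Union

# Hypothetical record shape: (kind, group_id, value). Real records differ.
Record = Tuple[str, str, Union[int, str]]


@dataclass
class ShareGroupState:
    epoch: int = 0
    members: List[str] = field(default_factory=list)
    # Assignments are never written to the log, so they are lost on fail-over.
    assignment: Dict[str, List[int]] = field(default_factory=dict)


def replay(records: Iterable[Record]) -> Dict[str, ShareGroupState]:
    """Rebuild share groups and their members by replaying persisted records."""
    groups: Dict[str, ShareGroupState] = {}
    for kind, group_id, value in records:
        group = groups.setdefault(group_id, ShareGroupState())
        if kind == "epoch":
            group.epoch = int(value)
        elif kind == "member":
            group.members.append(str(value))
    return groups


def on_failover(records: Iterable[Record]) -> Dict[str, ShareGroupState]:
    """What a newly elected coordinator does with the replayed state (simplified)."""
    groups = replay(records)
    for group in groups.values():
        # No persisted assignment: bump the epoch so that a fresh assignment
        # is computed by the assignor (not shown here).
        group.epoch += 1
    return groups
```

For example, replaying an epoch record of 4 and two member records yields a group with the same two members, an empty assignment and epoch 5.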

The SimpleAssignor

This KIP introduces org.apache.kafka.coordinator.group.share.SimpleAssignor. It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records.
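The balancing goal can be sketched in Python. This is not the SimpleAssignor's actual algorithm: `balanced_share_assignment` is a hypothetical round-robin strategy chosen only to illustrate balancing the number of consumers per subscribed partition when every consumer is treated as equally capable. Because share groups allow several consumers on one partition, surplus consumers are spread over the partitions instead of being left idle.

```python
from typing import Dict, List


def balanced_share_assignment(members: List[str], partitions: List[int]) -> Dict[str, List[int]]:
    """Illustrative only: give each partition a near-equal number of consumers."""
    assignment: Dict[str, List[int]] = {m: [] for m in members}
    if not members or not partitions:
        return assignment
    ordered = sorted(members)  # deterministic order for repeatable results
    if len(ordered) >= len(partitions):
        # More consumers than partitions: each consumer gets one partition, and
        # each partition is shared by floor(M/P) or ceil(M/P) consumers.
        for i, member in enumerate(ordered):
            assignment[member].append(partitions[i % len(partitions)])
    else:
        # Fewer consumers than partitions: each partition gets exactly one consumer.
        for i, partition in enumerate(partitions):
            assignment[ordered[i % len(ordered)]].append(partition)
    return assignment
```

With three consumers and two partitions, partition 0 is shared by two consumers and partition 1 has one; with two consumers and five partitions, every partition has exactly one consumer.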

...

The SPEO is not necessarily at the end of the topic-partition; it advances freely as records are fetched beyond this point. The segment of the topic-partition between the SPSO and the SPEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SPSO and the SPEO. The upper bound is controlled by the broker configuration group.share.partition.max.record.locks. Unlike existing queuing systems, there is no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.
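The sliding window can be modelled in a few lines of Python. This is a hypothetical sketch: `InFlightWindowSketch` is an invented name, `max_in_flight` stands in for group.share.partition.max.record.locks, the SPEO is treated as an exclusive bound (an assumption made for simpler arithmetic), and only acknowledgements that complete a record are modelled (releases and lock timeouts are left out). It shows the SPEO being capped at SPSO plus the limit, and the SPSO sliding forward only over a contiguous run of completed records.

```python
from typing import Dict


class InFlightWindowSketch:
    """Hypothetical SPSO/SPEO window for one share-partition (not broker code)."""

    def __init__(self, max_in_flight: int) -> None:
        self.max_in_flight = max_in_flight  # stands in for group.share.partition.max.record.locks
        self.spso = 0  # share-partition start offset (inclusive)
        self.speo = 0  # share-partition end offset (treated as exclusive here)
        self.state: Dict[int, str] = {}  # offset -> "acquired" or "acknowledged"

    def fetch(self, max_records: int, log_end_offset: int) -> range:
        """Advance the SPEO, but never more than max_in_flight past the SPSO."""
        upper = min(self.spso + self.max_in_flight, log_end_offset, self.speo + max_records)
        upper = max(self.speo, upper)
        fetched = range(self.speo, upper)
        for offset in fetched:
            self.state[offset] = "acquired"
        self.speo = upper
        return fetched

    def acknowledge(self, offset: int) -> None:
        """Mark a record as done and slide the SPSO over the completed prefix."""
        self.state[offset] = "acknowledged"
        while self.state.get(self.spso) == "acknowledged":
            del self.state[self.spso]
            self.spso += 1


w = InFlightWindowSketch(max_in_flight=4)
print(list(w.fetch(10, log_end_offset=100)))  # [0, 1, 2, 3]
print(list(w.fetch(10, log_end_offset=100)))  # [] : window is full
w.acknowledge(1)  # SPSO stays at 0 because offset 0 is still in flight
w.acknowledge(0)  # SPSO slides to 2
print(list(w.fetch(10, log_end_offset=100)))  # [4, 5]
```

Note that the log end offset (100 here) plays no part in the cap: the window stops growing because of the in-flight limit, not because the log has run out, which is the "no maximum queue depth" point above.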

...

Configuration | Description | Values
group.share.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. This is an internal configuration.
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. (Existing configuration) | "share" will be added to the default value of this configuration once this feature is complete.
group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10
group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.min.record.lock.duration.ms | Share-group record acquisition lock minimum duration in milliseconds. | Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)
group.share.max.record.lock.duration.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)
group.share.partition.max.record.locks | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms | The timeout to detect client failures when using the share group protocol. | Default 45000 (45 seconds)
group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds)
group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds)
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds)
group.share.max.groups | The maximum number of share groups. | Default 10, minimum 1, maximum 100
group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200, minimum 10, maximum 1000
group.share.assignors | The server-side assignors as a list of full class names. The list must contain only a single entry, which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the listed assignors. | A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
group.share.state.topic.num.partitions | The number of partitions for the share-group state topic (should not change after deployment). | Default 50
group.share.state.topic.replication.factor | The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement. | Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
group.share.state.topic.segment.bytes | The log segment size for the share-group state topic. | Default 104857600 (100 MiB)
group.share.state.topic.min.isr | Overridden min.insync.replicas for the share-group state topic. | Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.threads | The number of threads used by the share coordinator. | Default 1, minimum 1
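The ranges in the broker configuration table can be checked mechanically. The following Python sketch is a hypothetical helper, not part of Kafka: `SHARE_BROKER_BOUNDS` and `effective_broker_config` are invented names, and the (default, minimum, maximum) triples are copied from the rows above that state all three values. It applies overrides to the defaults and rejects any value outside the documented range.

```python
from typing import Dict, Mapping, Tuple

# (default, minimum, maximum), copied from the broker configuration table.
SHARE_BROKER_BOUNDS: Dict[str, Tuple[int, int, int]] = {
    "group.share.delivery.count.limit": (5, 2, 10),
    "group.share.record.lock.duration.ms": (30000, 1000, 60000),
    "group.share.min.record.lock.duration.ms": (15000, 1000, 30000),
    "group.share.max.record.lock.duration.ms": (60000, 30000, 3600000),
    "group.share.partition.max.record.locks": (200, 100, 10000),
    "group.share.max.groups": (10, 1, 100),
    "group.share.max.size": (200, 10, 1000),
}


def effective_broker_config(overrides: Mapping[str, int]) -> Dict[str, int]:
    """Apply overrides on top of the defaults, rejecting out-of-range values."""
    config = {name: default for name, (default, _, _) in SHARE_BROKER_BOUNDS.items()}
    for name, value in overrides.items():
        if name not in SHARE_BROKER_BOUNDS:
            raise KeyError(f"no documented bounds for {name}")
        _, low, high = SHARE_BROKER_BOUNDS[name]
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside [{low}, {high}]")
        config[name] = value
    return config
```

For instance, overriding group.share.delivery.count.limit to 3 is accepted, while 1 is rejected because the documented minimum is 2.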

...

Configuration | Description | Values
group.share.isolation.level | Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all records, even transactional records which have been aborted. Non-transactional records will be returned unconditionally in either mode. | Valid values "read_committed" and "read_uncommitted" (default)
group.share.auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server: "earliest" automatically resets the offset to the earliest offset; "latest" automatically resets the offset to the latest offset. | Valid values "latest" (default) and "earliest"
group.share.record.lock.duration.ms | Record acquisition lock duration in milliseconds. | If not specified, uses the broker configuration group.share.record.lock.duration.ms. If specified, minimum limited by the broker configuration group.share.min.record.lock.duration.ms, maximum limited by the broker configuration group.share.max.record.lock.duration.ms.
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | If not specified, uses the broker configuration group.share.heartbeat.interval.ms. If specified, minimum limited by the broker configuration group.share.min.heartbeat.interval.ms, maximum limited by the broker configuration group.share.max.heartbeat.interval.ms.
group.share.session.timeout.ms | The timeout to detect client failures when using the share group protocol. | If not specified, uses the broker configuration group.share.session.timeout.ms. If specified, minimum limited by the broker configuration group.share.min.session.timeout.ms, maximum limited by the broker configuration group.share.max.session.timeout.ms.
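The fallback rule shared by the last three group configurations can be sketched in Python. `resolve_group_config` and `BROKER_DEFAULTS` are hypothetical names, and the broker values are the defaults from the broker configuration table. The table only says a group value is "limited by" the broker minimum and maximum; this sketch assumes out-of-range values are rejected rather than silently clamped.

```python
from typing import Mapping, Optional

# Broker defaults taken from the broker configuration table.
BROKER_DEFAULTS = {
    "group.share.record.lock.duration.ms": 30000,
    "group.share.min.record.lock.duration.ms": 15000,
    "group.share.max.record.lock.duration.ms": 60000,
    "group.share.session.timeout.ms": 45000,
    "group.share.min.session.timeout.ms": 45000,
    "group.share.max.session.timeout.ms": 60000,
    "group.share.heartbeat.interval.ms": 5000,
    "group.share.min.heartbeat.interval.ms": 5000,
    "group.share.max.heartbeat.interval.ms": 15000,
}

PREFIX = "group.share."


def resolve_group_config(name: str, group_value: Optional[int], broker: Mapping[str, int]) -> int:
    """Return the effective value of a group setting, falling back to the broker value."""
    if not name.startswith(PREFIX):
        raise KeyError(name)
    if group_value is None:
        return broker[name]  # not specified on the group: use the broker setting
    suffix = name[len(PREFIX):]  # e.g. "session.timeout.ms"
    low = broker[f"{PREFIX}min.{suffix}"]
    high = broker[f"{PREFIX}max.{suffix}"]
    if not low <= group_value <= high:
        # Assumption: reject instead of clamping to [low, high].
        raise ValueError(f"{name}={group_value} is outside [{low}, {high}]")
    return group_value
```

With the defaults above, a group that sets group.share.session.timeout.ms to 50000 gets 50000, while a heartbeat interval of 1000 is refused because it is below group.share.min.heartbeat.interval.ms.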

...

Metric Name | Type | Group | Tags | Description | JMX Bean
group-count | Gauge | group-coordinator-metrics | protocol: share | The total number of share groups managed by the group coordinator. | kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share
rebalance (rebalance-rate and rebalance-count) | Meter | group-coordinator-metrics | protocol: share | The rate and total count of share group rebalances. | kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share; kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share
num-partitions | Gauge | group-coordinator-metrics | protocol: share | The number of share partitions managed by the group coordinator. | kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share
group-count | Gauge | group-coordinator-metrics | protocol: share, state: {empty,stable,dead} | The number of share groups in each state. | kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty,stable,dead}
share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) | Meter | group-coordinator-metrics | protocol: share | The rate and total number of offsets acknowledged for share groups. | kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share; kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-count,protocol=share
record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) | Meter | group-coordinator-metrics | protocol: share, ack-type: {accept,release,reject} | The number of records acknowledged per acknowledgement type. | kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject}; kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject}
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | group-coordinator-metrics | protocol: share | The time taken to load the share partitions. | kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share; kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | share-coordinator-metrics | | The time taken in milliseconds to load the share-group state from the share-group state partitions loaded in the last 30 seconds. | kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg; kafka.server:type=share-coordinator-metrics,name=partition-load-time-max
thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg) | Meter | share-coordinator-metrics | | The fraction of time the share coordinator thread is idle. | kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min; kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg
write (write-rate and write-total) | Meter | share-coordinator-metrics | | The rate (per second) and total number of share-group state write calls. | kafka.server:type=share-coordinator-metrics,name=write-rate; kafka.server:type=share-coordinator-metrics,name=write-total
write-latency (write-latency-avg and write-latency-max) | Meter | share-coordinator-metrics | | The time taken for a share-group state write call, including the time to write to the share-group state topic. | kafka.server:type=share-coordinator-metrics,name=write-latency-avg; kafka.server:type=share-coordinator-metrics,name=write-latency-max
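The JMX bean names in the metrics table follow a regular `kafka.server:type=<group>,name=<metric>,<tag>=<value>` pattern, so a monitoring script can generate them rather than hard-code each one. The Python helpers below are hypothetical (`share_metric_bean` and `record_ack_beans` are invented names); they only build the strings shown in the table and do not connect to JMX, which would require a separate JMX client.

```python
from typing import Dict, List, Optional


def share_metric_bean(metric_group: str, name: str, tags: Optional[Dict[str, str]] = None) -> str:
    """Build a JMX object name in the kafka.server:type=...,name=...,tag=... form."""
    parts = [f"type={metric_group}", f"name={name}"]
    # Tags are appended in insertion order, matching the order used in the table.
    parts += [f"{key}={value}" for key, value in (tags or {}).items()]
    return "kafka.server:" + ",".join(parts)


def record_ack_beans(suffix: str) -> List[str]:
    """Expand the record-acknowledgement meter (rate or count) over each ack-type."""
    return [
        share_metric_bean(
            "group-coordinator-metrics",
            f"record-acknowledgement-{suffix}",
            {"protocol": "share", "ack-type": ack_type},
        )
        for ack_type in ("accept", "release", "reject")
    ]
```

For example, `share_metric_bean("share-coordinator-metrics", "write-rate")` yields kafka.server:type=share-coordinator-metrics,name=write-rate, and `record_ack_beans("rate")` yields the three per-ack-type beans for the rate meter.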

...