Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Typo in inactive session duration

...

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets  partition. This means a share group will remain in existence across the fail-over. However, the members of the groups and their assignments are not persisted. This means that existing members will have to rejoin the share group following a group coordinator failover.

The SimpleShareAssignor

This KIP introduces org.apache.kafka.coordinator.group.assignor.SimpleShareAssignor . It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records.

  • The assignor hashes the member IDs of the members and maps the partitions assigned to the members based on the hash. This gives approximately even balance.

  • If any partitions which were not assigned any members, members are assigned members round-robin until each partition has at least one member assigned to it.

When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If however, any partitions were not assigned any members by hashing, adding more members changes the distribution and will revoke some partitions previously assigned. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require the members to confirm they have revoked partitions before they can be assigned to other members. For a brief period while the rebalancing is occurring, some partitions may have more members consuming than the assignment requires, but this situation soon resolves as the members receive their updated assignments and stop fetching from the revoked partitions.

Here an example of how the assignments might change as members join the share group and the number of partitions changes.

State

Members subscribed to T1

Topic:partitions

Assignment change

Assignments

M1 subscribed to topic T1 with 1 partition

M1 (hash 1)

T1:0

Assign T1:0 to M1

M1 → T1:0

M2 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2)

T1:0

Assign T1:0 to M2

M1 → T1:0

M2 → T1:0

Add 3 partitions to T1

M1 (hash 1), M2 (hash 2)

T1:0, T1:1, T1:2, T1:3

Assign T1:1 and T1:3 to M2

Revoke T1:0 from M2

Assign T1:2 to M1

M1 → T1:0, T1:2

M2 → T1:1, T1:3

M3 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3)

T1:0, T1:1, T1:2, T1:3

Assign T1:2 to M3

Revoke T1:2 from M1

M1 → T1:0

M2 → T1:1, T1:3

M3 → T1:2

M4 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4)

T1:0, T1:1, T1:2, T1:3

Assign T1:3 to M4

Revoke T1:3 from M2

M1 → T1:0

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5, M6, M7, M8 join the group and subscribe to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8)

T1:0, T1:1, T1:2, T1:3

Assign T1:0 to M5

Assign T1:1 to M6

Assign T1:2 to M7

Assign T1:3 to M8

M1 → T1:0

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5 → T1:0

M6 → T1:1

M7 → T1:2

M8 → T1:3

M1 subscribes to T2 with 2 partitions

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8)

T1:0, T1:1, T1:2, T1:3

T2:0, T2:1

Assign T2:0 and T2:1 to M1

M1 → T1:0, T2:0, T2:1

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5 → T1:0

M6 → T1:1

M7 → T1:2

M8 → T1:3

All members leave except M2

M2 (hash 2)

T1:0, T1:1, T1:2, T1:3

Assign T1:0, T1:2, T1:3 to M2

M2 → T1:0, T1:1, T1:2, T1:3

SimpleShareAssignor does not make assignments based on rack IDs. SimpleShareAssignor does not make assignments based on lag or consumer throughput.

Record delivery and acknowledgement

...

Share sessions use memory on the share-partition leader. Each broker that is a share-partition leader has a cache of share sessions. Because a share session is an integral part of how share groups work, as opposed to a performance optimisation in the manner of fetch sessions, the limit is calculated based on share group configurations group.share.max.groups  and group.share.max.size . Sessions are evicted if they have been inactive for more than 120,000 milliseconds (2 minutes). The key of the cache is the pair (GroupId, MemberId).

...