Status 

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-17116

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, if an AsyncKafkaConsumer closes after sending a join group heartbeat but before receiving the response, it may attempt to send a leave group heartbeat without a member ID. This request fails with UNKNOWN_MEMBER_ID. Consequently, the broker ends up with a newly registered member for which it will never receive a proper leave request.

More importantly, this scenario highlights that the initial heartbeat is not idempotent. If a member is created on the first request but the response is lost for some reason, the client will retry the heartbeat request, leading to the creation of a new member. If this happens multiple times, several “ghost” members could be created, which will only expire when their session timeout elapses. During this period, these members will continue to hold partitions until they expire.

The main downsides of this current logic are as follows:

  • Stale Partition Assignments: If the member received partition assignments during the first Heartbeat, those partitions will not be re-assigned until the rebalance timeout expires. This occurs because the broker is waiting for the closed consumer to reconcile the partitions.
  • Ghost Members: Even if no partitions were assigned, the member will remain in the group from the broker's perspective but not from the client's point of view. The member will eventually be removed due to not sending Heartbeats, but this only happens when the session timeout expires.

These issues lead to inefficient partition management and potential delays in rebalancing, affecting the overall system performance and reliability. Therefore, it is crucial to address this behavior to ensure timely and accurate group membership management and partition rebalancing.

Public Interfaces

None.

Proposed Changes

To address the issue mentioned in the Motivation section, we propose bumping the version of the ConsumerGroupHeartbeat RPC.
In this newer version, the server will require the member ID in all requests. The client will generate a UUID as the member ID during the initial heartbeat and continue using it throughout the entire session. The server will verify that a valid UUID is attached in the member ID field.

Essentially, the server will no longer generate the member ID; instead, it must be provided by the client in every request.
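
As a minimal sketch of the client-side behavior described above, the following assumes a hypothetical MemberSession helper (not part of the Kafka client API) and uses java.util.UUID as a stand-in for whatever UUID type the client actually uses. It illustrates that the member ID is generated once, reused on retries, and already available if the consumer closes before the first response arrives. The epoch values 0 (join) and -1 (leave) follow the KIP-848 conventions.

```java
import java.util.UUID;

// Hypothetical helper for illustration only; not the actual AsyncKafkaConsumer code.
final class MemberSession {
    // KIP-848 epoch conventions: 0 = joining, -1 = leaving.
    static final int JOIN_EPOCH = 0;
    static final int LEAVE_EPOCH = -1;

    // Generated once, before the first heartbeat, and never regenerated in this session.
    private final String memberId = UUID.randomUUID().toString();
    private int memberEpoch = JOIN_EPOCH;

    String memberId() { return memberId; }

    // Called when a heartbeat response arrives with the epoch assigned by the server.
    void onHeartbeatResponse(int epochFromServer) { memberEpoch = epochFromServer; }

    // Fields of the next heartbeat as {memberId, memberEpoch}; a retry returns the same member ID.
    String[] nextHeartbeat() {
        return new String[] { memberId, Integer.toString(memberEpoch) };
    }

    // Fields of a leave heartbeat; valid even if no response was ever received.
    String[] leaveHeartbeat() {
        return new String[] { memberId, Integer.toString(LEAVE_EPOCH) };
    }
}
```

Because the ID exists before any request is sent, a leave heartbeat issued during close always carries the same member ID as the join heartbeat, which is the property the Motivation section found missing.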

Compatibility, Deprecation, and Migration Plan

If a client uses an older version of the ConsumerGroupHeartbeat RPC, the join flow will still follow the rules described in KIP-848#Member ID.

Test Plan

The objective of the tests is to ensure that a client using the newer protocol correctly and securely generates a UUID and provides it as the member ID.
At the same time, the tests should ensure backward compatibility with older versions of the ConsumerGroupHeartbeat RPC.
Based on this objective, the test scenarios are as follows:

  • UUID Generation
    • Ensure that the client correctly generates a UUID as the member ID during the initial Heartbeat.
    • Verify that the UUID remains consistent across all subsequent heartbeats during the session.
  • Retry Mechanism
    • Ensure the client retries with the same UUID if a heartbeat request fails.
  • Consistency
    • Test that the client maintains the same UUID throughout the entire session and doesn't generate a new UUID midway.
  • Error Handling
    • No member ID is provided by client.
    • An invalid UUID is provided.
    • A member ID mismatch occurs within a session.
  • Backward Compatibility
    • Ensure that clients using older versions of the RPC (where the server generates the member ID) still function correctly.
  • Mixed-Version
    • Test scenarios where multiple clients, using both the new and old versions of the RPC, are communicating with the server.
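
The retry and error-handling scenarios above could be exercised against a small stand-in for the server-side checks. The sketch below is an assumption-laden illustration, not the group coordinator's implementation: GroupSketch, isValidMemberId, and heartbeat are hypothetical names, java.util.UUID stands in for the real UUID type, and the exception stands in for whatever error code the server would actually return. It shows that a missing or malformed member ID is rejected and that a retried join with the same ID does not create a second member.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the server-side member registry; for illustration only.
final class GroupSketch {
    // Maps member ID to member epoch.
    private final Map<String, Integer> members = new HashMap<>();

    // Accepts only canonical UUID strings; null, empty, and malformed values are rejected.
    static boolean isValidMemberId(String memberId) {
        if (memberId == null || memberId.isEmpty()) {
            return false;
        }
        try {
            // UUID.fromString tolerates shortened groups, so require a canonical round trip.
            return UUID.fromString(memberId).toString().equalsIgnoreCase(memberId);
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    // Handles a join heartbeat idempotently: the same ID always maps to the same member.
    int heartbeat(String memberId) {
        if (!isValidMemberId(memberId)) {
            throw new IllegalArgumentException("member ID must be a valid UUID");
        }
        members.putIfAbsent(memberId, 1);
        return members.get(memberId);
    }

    int memberCount() { return members.size(); }
}
```

With a client-supplied ID, a lost response followed by a retry leaves memberCount() at one, whereas server-generated IDs would have produced one member per attempt.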

Rejected Alternatives

1. Temporary ID for Consumer Identification

Introduce a unique temporary ID, generated by the consumer, to be used for identification before the member ID is allocated. Add a new field in ConsumerGroupHeartbeatRequestData to carry this ID. Upon receiving the initial join heartbeat request, the broker generates the member ID and maps the temporary ID to the member ID. This map allows the broker to identify leave heartbeat requests by temporary ID in the scenario described above. Once the consumer receives the allocated member ID, the (temporary ID, member ID) entry is removed from the map to prevent memory leaks.

Reason for Rejection

This solution requires adding a new field to the current protocol, which introduces backporting complexity. In addition, ensuring the timely and accurate removal of temporary IDs to avoid memory leaks adds another layer of complexity to the implementation.

2. Server-side Generation of the UUID at the First Heartbeat without Adding the Member to the Group

Use the first Heartbeat sent by the client to generate the UUID without adding the member to the group yet. The response to the first Heartbeat would carry the new member ID, zero as the member epoch, and zero as the heartbeat interval, prompting the client to immediately send another Heartbeat.

Reason for Rejection

This solution also introduces backporting complexity. When the first Heartbeat is used to obtain a UUID from the server, compatibility issues must be addressed because the old server still adds the member to the group when handling the first Heartbeat.
Making the first Heartbeat idempotent (i.e., generating the member ID only) represents a behavioral change: clients can no longer assume that the first Heartbeat will both "create a member ID" and "add the member to the group." Last but not least, this would require updating the content of KIP-848.



