Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-17116
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The core issue that this KIP addresses is the non-idempotency of the initial heartbeat in the new consumer group protocol. Today, a joining member sends its first heartbeat without a member ID, and the server generates one and returns it in the response.
When an AsyncKafkaConsumer sends a join group heartbeat but closes before receiving the response, it may attempt to send a leave group heartbeat without a member ID, leading to a failure with UNKNOWN_MEMBER_ID. This scenario can result in the broker registering a new member for which it will never receive a proper leave request.
Moreover, if the response to the initial heartbeat is lost, the client will retry the request, potentially creating a new member each time. This can lead to the creation of several "ghost" members that will persist until their session timeout elapses. During this period, these ghost members continue to hold onto partitions, causing delays in partition reassignment and inefficiencies in group management.
The main downsides of the current logic are as follows:
- Stale Partition Assignments: If the member received partition assignments during the first Heartbeat, those partitions will not be re-assigned until the rebalance timeout expires. This occurs because the broker is waiting for the closed consumer to reconcile the partitions.
- Ghost Members: Even if no partitions were assigned, the member will remain in the group from the broker's perspective but not from the client's point of view. The member will eventually be removed due to not sending Heartbeats, but this only happens when the session timeout expires.
These issues lead to inefficient partition management and delayed rebalances, which affect overall system performance and reliability. By making the initial heartbeat idempotent, this KIP aims to ensure accurate group membership management and timely partition rebalancing.
Finally, KIP-848 initially rejected the idea of allowing the client to generate its own member ID due to concerns about additional dependencies and potential issues with correct ID generation. Since then, improvements in libraries like librdkafka have addressed these concerns. Given these changes, we now propose requiring client-side member ID generation, as it simplifies the process and improves reliability.
Public Interfaces
To address the issue of non-idempotency in the initial heartbeat and improve reliability, we propose the following changes to the ConsumerGroupHeartbeat RPC:
In the new version of the ConsumerGroupHeartbeat RPC, the client must generate the member ID itself and send it in the initial heartbeat. This member ID must be included in every subsequent request to ensure consistency. We strongly recommend using a UUID as the member ID, but the format is ultimately the client's choice. The server validates that a member ID is provided: if the member ID is null or empty and the request is made over RPC version >= 2, the server will reject the request with an InvalidRequestException.
Lastly, to accommodate these updates, we also propose bumping the ConsumerGroupHeartbeat RPC from version 1 to version 2. This version bump reflects the new requirement for client-generated member IDs. Note that despite these behavioral changes, the existing fields themselves are not modified.
Proposed Changes
To implement the changes proposed in the KIP, the client might need to generate UUIDs. The following specifications serve as guidelines for users who choose to use UUIDs as the member ID:
UUID Generation Specifications
UUID Version:
We recommend using UUID version 4, which generates UUIDs based on random values. This ensures a high degree of uniqueness and is consistent with Kafka’s Uuid class implementation.
Uniqueness:
To ensure the uniqueness of the UUIDs, our implementation should avoid generating any reserved UUIDs, such as all-zero or specific predefined values. This is consistent with Kafka’s approach, where the Uuid class avoids returning certain reserved UUIDs like 00000000-0000-0000-0000-000000000000.
Encoding:
We recommend representing UUIDs as strings using URL-safe base64 encoding. This encoding is compact and URL-safe, which makes UUIDs easier to store and transmit, and it is consistent with Kafka’s approach.
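The three guidelines above can be sketched in Java as follows. The class `MemberIdGenerator` and its methods are hypothetical names chosen for this example. The sketch relies only on the JDK (`java.util.UUID` and `java.util.Base64`) and assumes unpadded URL-safe base64 as the string form.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper illustrating the guidelines above; not Kafka's Uuid class.
final class MemberIdGenerator {
    // The all-zero UUID is treated as reserved, following the text above.
    private static final UUID ZERO_UUID = new UUID(0L, 0L);

    private MemberIdGenerator() { }

    // Encodes the 16 bytes of a UUID as unpadded URL-safe base64 (22 characters).
    static String encode(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    // Decodes a string produced by encode() back into a UUID.
    static UUID decode(String memberId) {
        ByteBuffer buffer = ByteBuffer.wrap(Base64.getUrlDecoder().decode(memberId));
        long mostSignificantBits = buffer.getLong();
        long leastSignificantBits = buffer.getLong();
        return new UUID(mostSignificantBits, leastSignificantBits);
    }

    // Generates a random (version 4) UUID and skips the reserved value.
    // UUID.randomUUID() sets the version bits, so the loop is purely defensive.
    static String newMemberId() {
        UUID uuid;
        do {
            uuid = UUID.randomUUID();
        } while (uuid.equals(ZERO_UUID));
        return encode(uuid);
    }
}
```

A client would call `MemberIdGenerator.newMemberId()` once and reuse the returned string. A client library that defines additional reserved values would extend the check in the loop accordingly.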
Member ID Lifecycle
The consumer instance must generate a member ID when it starts, and this ID should remain consistent for the entire lifetime of the process. The member ID acts as an incarnation ID of the process and should not be reset or changed, even if the consumer leaves and rejoins the group. It must remain the same until the process is completely stopped or terminated.
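One way to picture this lifecycle is a small state holder in which the member ID is an immutable field while the member epoch changes as the consumer joins, leaves, and rejoins. The class `MembershipState` below is a hypothetical model, not Kafka's client code. The epoch values 0 (joining) and -1 (leaving) follow the KIP-848 convention and are included only to show that the epoch resets while the member ID does not.

```java
import java.util.UUID;

// Hypothetical model of a consumer's membership state; not Kafka's client code.
final class MembershipState {
    // Generated once per consumer instance and never reassigned.
    private final String memberId = UUID.randomUUID().toString();

    // 0 means "joining", following the KIP-848 convention.
    private int memberEpoch = 0;

    String memberId() {
        return memberId;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    // The broker acknowledged the member and assigned it an epoch.
    void onJoined(int epochFromBroker) {
        memberEpoch = epochFromBroker;
    }

    // The consumer leaves the group: the epoch changes, the member ID does not.
    void onLeave() {
        memberEpoch = -1;
    }

    // The same consumer instance rejoins: the epoch resets, the member ID is reused.
    void onRejoin() {
        memberEpoch = 0;
    }
}
```

Because the field is `final`, a new member ID can only come from a new consumer instance, which matches the "incarnation ID" role described above.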
Duplicate Member ID
Because clients generate the member ID themselves, duplicate member IDs might be a concern. However, as mentioned in the Motivation section, improvements in third-party libraries like librdkafka have made UUID generation easier. We believe it is not difficult for the client to generate a unique ID and, moreover, the ID only needs to be unique within the consumer group. Therefore, we believe the risk of member ID collision within a group is negligible.
Compatibility, Deprecation, and Migration Plan
The change is backward compatible because version 0 of the ConsumerGroupHeartbeat RPC already supports a member ID provided by the client. If a client using an older version sends a heartbeat, it will still follow the rules outlined in KIP-848#Member ID. Newer clients should generate the member ID on the client side because this approach reduces the dependency on the server for ID management and improves scalability by distributing the responsibility for unique ID generation.
Test Plan
The objective of the tests is to ensure that a client using the newer protocol correctly and securely generates a member ID.
At the same time, the tests should ensure backward compatibility with older versions of the ConsumerGroupHeartbeat RPC.
To meet this objective, the following test scenarios should be covered:
- Retry Mechanism
  - Ensure the client retries with the same member ID if a heartbeat request fails.
- Consistency
  - Test that the client maintains the same member ID throughout the entire session and doesn't generate a new member ID midway.
- Error Handling
  - No member ID is provided by the client: The server should reject the request with an InvalidRequestException and log an error indicating the missing member ID.
- Backward Compatibility
  - Ensure that clients using older versions of the RPC (where the server generates the member ID) still function correctly.
- Mixed-Version
  - Test scenarios where multiple clients, using both the new and old versions of the RPC, are communicating with the server.
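The retry scenario can be illustrated with a small in-memory model. The sketch below uses a hypothetical `FakeGroupCoordinator`, not a real Kafka test fixture. It contrasts a join that carries a client-generated member ID, which is idempotent under retries, with the assumed older behavior in which each join without a member ID creates a new member.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the group coordinator, for illustration only.
final class FakeGroupCoordinator {
    // Member ID -> member epoch.
    private final Map<String, Integer> memberEpochs = new HashMap<>();

    // New behavior: the member is keyed by the client-generated ID,
    // so a retried join does not create a second member.
    String joinWithClientId(String memberId) {
        memberEpochs.putIfAbsent(memberId, 1);
        return memberId;
    }

    // Assumed older behavior: no member ID in the request, so every
    // (re)sent join registers a brand-new member.
    String joinWithServerGeneratedId() {
        String memberId = UUID.randomUUID().toString();
        memberEpochs.put(memberId, 1);
        return memberId;
    }

    // Returns true if the member was known and has been removed.
    boolean leave(String memberId) {
        return memberEpochs.remove(memberId) != null;
    }

    int size() {
        return memberEpochs.size();
    }
}
```

In a test, sending the same join three times through `joinWithClientId` should leave exactly one member in the group, and a following `leave` with that ID should empty it. Three calls to `joinWithServerGeneratedId` leave three members, two of which are the "ghost" members described in the Motivation section.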
Rejected Alternatives
1. Temporary ID for Consumer Identification
Introduce a unique temporary ID generated by the consumer to be used for identification before member ID allocation. Add a new field in ConsumerHeartbeatRequestData to attach this ID. Upon receiving the initial join heartbeat request, the broker generates the member ID and maps the temporary ID to the member ID. This map helps identify leave heartbeat requests by the temporary ID in the described scenario. Once the consumer receives the allocated member ID, the temporary ID : member ID entry is removed from the map to prevent memory leaks.
Reason for Rejection
This solution requires adding a new field to the current protocol, which introduces the complexity of backporting. In addition, ensuring the timely and accurate removal of temporary IDs to avoid memory leaks adds another layer of complexity to the implementation.
2. Server-side UUID Generation at First Heartbeat without Adding the Member to the Group
In this approach, the server generates a UUID during the first Heartbeat sent by the client, but does not yet add the member to the group. The first Heartbeat would return the new member ID, a member epoch of zero, and a heartbeat interval of zero, prompting the client to immediately send another Heartbeat.
Reason for Rejection
This approach was rejected because, if a client leaves the group after receiving the first Heartbeat response and the member ID, the server would respond with an "unknown member ID" error since the member is not yet officially part of the group. Overall, the client-side generated member ID is a simpler and more reliable solution.