Status 

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-17116

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The core issue that this KIP addresses is the non-idempotency of the initial heartbeat in the new consumer group protocol.
When an AsyncKafkaConsumer sends a join group heartbeat but closes before receiving the response, it may attempt to send a leave group heartbeat without a member ID, leading to a failure with UNKNOWN_MEMBER_ID. This scenario can result in the broker registering a new member for which it will never receive a proper leave request.
Moreover, if the response to the initial heartbeat is lost, the client will retry the request, potentially creating a new member each time. This can lead to the creation of several "ghost" members that will persist until their session timeout elapses. During this period, these ghost members continue to hold onto partitions, causing delays in partition reassignment and inefficiencies in group management.

The main downsides of this current logic are as follows:

  • Stale Partition Assignments: If the member received partition assignments during the first Heartbeat, those partitions will not be re-assigned until the rebalance timeout expires. This occurs because the broker is waiting for the closed consumer to reconcile the partitions.
  • Ghost Members: Even if no partitions were assigned, the member will remain in the group from the broker's perspective but not from the client's point of view. The member will eventually be removed due to not sending Heartbeats, but this only happens when the session timeout expires.

These issues lead to inefficient partition management and delayed rebalancing. By making the initial heartbeat idempotent, this KIP aims to ensure accurate group membership management and timely partition reassignment.

Finally, KIP-848 originally rejected the idea of allowing the client to generate its own member ID due to concerns about additional dependencies and potential issues with correct ID generation. Since then, improvements in libraries like librdkafka have addressed these concerns. Given these changes, we now propose requiring client-side member ID generation, as it simplifies the process and improves reliability.

Public Interfaces

To address the issue of non-idempotency in the initial heartbeat and improve reliability, we propose the following changes to the ConsumerGroupHeartbeat RPC:

In the new version of the ConsumerGroupHeartbeat RPC, the client must generate a UUID as the member ID for the initial heartbeat. This member ID must be included in every subsequent request to ensure consistency. The server will validate that a valid UUID is provided in the member ID field. If the member ID is missing or invalid, the server will reject the request with an InvalidRequestException.
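The server-side check described above could look roughly like the following. This is a minimal sketch, not the broker's implementation: the class MemberIdValidator and the method isValidMemberId are hypothetical names, and it assumes the member ID is the URL-safe base64 string form of a 16-byte UUID.

```java
import java.util.Base64;

/** Hypothetical helper for illustration; not part of Kafka's API. */
final class MemberIdValidator {

    /**
     * Returns true if memberId is a non-empty URL-safe base64 string
     * that decodes to exactly 16 bytes (the size of a UUID).
     */
    static boolean isValidMemberId(String memberId) {
        if (memberId == null || memberId.isEmpty()) {
            return false; // missing member ID
        }
        try {
            return Base64.getUrlDecoder().decode(memberId).length == 16;
        } catch (IllegalArgumentException e) {
            return false; // not valid URL-safe base64
        }
    }
}
```

In this sketch, a request whose member ID fails the check would be the case that is rejected with InvalidRequestException.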

Finally, to accommodate these updates, we propose bumping the ConsumerGroupHeartbeat RPC from version 0 to version 1. This version bump reflects the new requirement for client-generated member IDs. Note that despite these behavioral changes, the existing fields themselves are not modified.

Proposed Changes

To implement the changes proposed in the KIP, the client will need to generate UUIDs. The simple specifications are as follows:

UUID Generation Specifications

  1. UUID Version:

    • We recommend using UUID version 4, which generates UUIDs based on random values. This ensures a high degree of uniqueness and is consistent with Kafka’s Uuid class implementation.

  2. Uniqueness:

    • To ensure the uniqueness of the UUIDs, our implementation should avoid generating any reserved UUIDs, such as all-zero or specific predefined values. This is consistent with Kafka’s approach, where the Uuid class avoids returning certain reserved UUIDs like 00000000-0000-0000-0000-000000000000.

  3. Encoding:

    • We recommend using URL-safe base64 encoding (without padding) for representing UUIDs as strings. This encoding is compact and URL-safe, making it easier to store and transmit UUIDs, and it is consistent with how Kafka's Uuid class renders UUIDs as strings.
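The three specifications above can be sketched with only the Java standard library. This is an illustrative sketch under stated assumptions, not the client's actual code: MemberIdGenerator and newMemberId are hypothetical names, and a Java client could instead rely on Kafka's own Uuid class.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

/** Hypothetical helper for illustration; not part of Kafka's API. */
final class MemberIdGenerator {
    private static final UUID ZERO = new UUID(0L, 0L);

    /** Generates a random (version 4) UUID and encodes it as URL-safe base64 without padding. */
    static String newMemberId() {
        UUID uuid;
        do {
            uuid = UUID.randomUUID(); // version 4, random-based
        } while (uuid.equals(ZERO));  // defensive check against the reserved all-zero UUID

        // Serialize the 128 bits in big-endian order: most significant long first.
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }
}
```

Sixteen bytes encode to a 22-character string. Clients in other languages would need an equivalent source of random UUIDs and the same encoding.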

Member ID Lifecycle

The client must generate a UUID to be used as the member ID, and this ID should remain consistent for the duration of the consumer's session. Here, a "session" is defined as the period from the consumer's first heartbeat until it leaves the group, either through a graceful shutdown, a heartbeat timeout, or the process stopping or dying. The client should reuse the same UUID as the member ID for all heartbeats and rejoin attempts to maintain continuity within the group.

If a conflict arises where the member ID (UUID) generated by the client is detected to be a duplicate within the same group (for example, the same UUID is associated with another active member in the group), the server will handle this by comparing the memberEpoch values of the conflicting members. The member with the lower memberEpoch is considered outdated and will be fenced off by the server. When this occurs, the server responds with a FENCED_MEMBER_EPOCH error to the client, signaling it to rejoin the group with the same member ID while resetting the memberEpoch to zero. This ensures that the client properly resynchronizes and maintains the continuity and consistency of the group membership.
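On the client side, the lifecycle described above amounts to keeping the member ID fixed for the whole session while only the epoch changes. The following is a minimal sketch of that state; MemberSession and its methods are hypothetical names used for illustration and do not reflect the AsyncKafkaConsumer internals.

```java
import java.util.Objects;

/** Hypothetical client-side state holder for illustration; not Kafka's implementation. */
final class MemberSession {
    private final String memberId; // generated once, never changes during the session
    private int memberEpoch = 0;   // 0 means "joining"

    MemberSession(String memberId) {
        this.memberId = Objects.requireNonNull(memberId, "memberId");
    }

    String memberId() {
        return memberId;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    /** Called when a heartbeat response carries a new member epoch. */
    void onHeartbeatResponse(int newEpoch) {
        this.memberEpoch = newEpoch;
    }

    /** Called on FENCED_MEMBER_EPOCH: rejoin with the same member ID and epoch 0. */
    void onFenced() {
        this.memberEpoch = 0;
    }
}
```

Because the member ID is final, every heartbeat, retry, and rejoin built from this state carries the same ID.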

Compatibility, Deprecation, and Migration Plan

The change is backward compatible because version 0 of the ConsumerGroupHeartbeat RPC already supports a member ID provided by the client. If a client using an older version sends a heartbeat, it will still follow the rules outlined in KIP-848#Member ID. Newer clients should generate the member ID on the client side because this approach reduces dependency on the server for ID management and distributes the responsibility for unique ID generation across clients.

Test Plan

The objective of the tests is to ensure that a client using the newer protocol correctly and securely generates a UUID and provides it as the member ID, while remaining backward compatible with older versions of the ConsumerGroupHeartbeat RPC.
Based on this objective, the test scenarios are as follows:

  • UUID Generation
    • Ensure that the client correctly generates a UUID as the member ID during the initial Heartbeat.
    • Verify that the UUID remains consistent across all subsequent heartbeats during the session.
  • Retry Mechanism
    • Ensure the client retries with the same UUID if a heartbeat request fails.
  • Consistency
    • Test that the client maintains the same UUID throughout the entire session and doesn't generate a new UUID midway.
  • Error Handling
    • No member ID is provided by the client: The server should reject the request with an InvalidRequestException and log an error indicating the missing member ID.
    • An invalid UUID is provided: The server should validate the UUID format and reject the request with an InvalidRequestException if the UUID does not conform to the expected format. The client should handle this by generating a correct UUID and retrying the request.
    • A member ID mismatch occurs within a session: If the server detects a mismatch between the provided member ID and the expected member ID for an ongoing session, it should return an UNKNOWN_MEMBER_ID error.
  • Backward Compatibility
    • Ensure that clients using older versions of the RPC (where the server generates the member ID) still function correctly.
  • Mixed-Version
    • Test scenarios where multiple clients, using both the new and old versions of the RPC, are communicating with the server.
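The retry scenario in the list above relies on the join being idempotent once the client supplies the member ID. The toy model below illustrates that property. It is only a sketch: ToyGroup is a hypothetical in-memory stand-in, not the group coordinator, and the epoch value 1 is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of a group, for illustration only. */
final class ToyGroup {
    private final Map<String, Integer> epochsByMemberId = new HashMap<>();

    /**
     * Join heartbeat: registers the member if absent and returns its epoch.
     * A retry with the same member ID finds the existing entry, so no new member is created.
     */
    int join(String memberId) {
        return epochsByMemberId.computeIfAbsent(memberId, id -> 1);
    }

    /** Leave heartbeat: returns true if the member was known and has been removed. */
    boolean leave(String memberId) {
        return epochsByMemberId.remove(memberId) != null;
    }

    int size() {
        return epochsByMemberId.size();
    }
}
```

With a client-generated ID, a lost response followed by a retry leaves one member in the group, and a leave sent before any response arrives can still name the member it is removing.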

Rejected Alternatives

1. Temporary ID for Consumer Identification

Introduce a unique temporary ID generated by the consumer to be used for identification before member ID allocation. Add a new field in ConsumerGroupHeartbeatRequestData to carry this ID. Upon receiving the initial join heartbeat request, the broker generates the member ID and maps the temporary ID to the member ID. This map makes it possible to identify leave heartbeat requests by the temporary ID in the scenario described above. Once the consumer receives the allocated member ID, the temporary ID to member ID entry is removed from the map to prevent memory leaks.

Reason for Rejection

This solution requires adding a new field to the current protocol, which introduces the complexity of backporting. In addition, ensuring the timely and accurate removal of temporary IDs to avoid memory leaks adds another layer of complexity to the implementation.

2. Server-Side UUID Generation at First Heartbeat Without Adding the Member to the Group

In this approach, the server generates a UUID during the first Heartbeat sent by the client, but does not yet add the member to the group. The first Heartbeat would return the new member ID, a member epoch of zero, and a heartbeat interval of zero, prompting the client to immediately send another Heartbeat.

Reason for Rejection

This approach was rejected because if a client leaves the group after receiving the first Heartbeat response and the member ID, the server would respond with an "unknown member ID" error, since the member is not yet officially part of the group. Overall, the client-side generated member ID is a simpler and more reliable solution.



