KIP-345: Introduce static membership protocol to reduce consumer rebalances

...

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  GroupInstanceId     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime        => int32
  ErrorCode           => int16
  GenerationId        => int32
  ProtocolName        => String
  LeaderId            => String
  MemberId            => String
  Members             => []JoinGroupResponseMember
                           MemberId         => String
                           GroupInstanceId  => String // new
                           Metadata         => bytes

SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new
  Assignments         => []SyncGroupRequestAssignment
                           MemberId         => String
                           Assignment       => bytes

SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime        => int32
  ErrorCode           => int16
  Assignment          => bytes

HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new

HeartbeatResponse => ThrottleTime ErrorCode
  ThrottleTime        => int32
  ErrorCode           => int16

OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new
  Topics              => []OffsetCommitRequestTopic
                           Name         => String
                           Partitions   => []OffsetCommitRequestPartition
                                             PartitionIndex        => int32
                                             CommittedOffset       => int64
                                             CommittedLeaderEpoch  => int32
                                             CommitTimestamp       => int64
                                             CommittedMetadata     => String

OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime        => int32
  Topics              => []OffsetCommitResponseTopic
                           Name         => String
                           Partitions   => []OffsetCommitResponsePartition
                                             PartitionIndex        => int32
                                             ErrorCode             => int16

LeaveGroupRequest => GroupId MemberIdentityList
  GroupId             => String
  MemberId            => String // removed
  MemberIdentityList  => []MemberIdentity // new
                           MemberId         => String
                           GroupInstanceId  => String
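
To make the new LeaveGroupRequest shape concrete, here is a minimal sketch of how a request carrying several member identities might be modelled. `MemberIdentity` and `LeaveGroupRequestSketch` are hypothetical stand-ins written for illustration, not Kafka's generated request classes, and treating a null `GroupInstanceId` as "dynamic member" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for one entry of MemberIdentityList; not a Kafka class.
final class MemberIdentity {
    final String memberId;
    final String groupInstanceId; // assumption: null for a dynamic member

    MemberIdentity(String memberId, String groupInstanceId) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }

    boolean isStatic() {
        return groupInstanceId != null;
    }
}

// Hypothetical model of the new request: a GroupId plus a list of identities,
// replacing the single top-level MemberId of the previous version.
final class LeaveGroupRequestSketch {
    final String groupId;
    final List<MemberIdentity> members;

    LeaveGroupRequestSketch(String groupId, List<MemberIdentity> members) {
        this.groupId = groupId;
        this.members = Collections.unmodifiableList(new ArrayList<>(members));
    }

    // Renders the fields in the same order as the schema above.
    String describe() {
        StringBuilder sb = new StringBuilder(
            "LeaveGroupRequest(GroupId=" + groupId + ", MemberIdentityList=[");
        for (int i = 0; i < members.size(); i++) {
            MemberIdentity m = members.get(i);
            if (i > 0) {
                sb.append(", ");
            }
            sb.append("{MemberId=").append(m.memberId)
              .append(", GroupInstanceId=").append(m.groupInstanceId).append("}");
        }
        return sb.append("])").toString();
    }
}
```

Under these assumptions, one request can name both static and dynamic members, which the previous single-`MemberId` layout could not express.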

To give better visibility into static members, we will also bump the DescribeGroup request/response protocol so that the response includes `group.instance.id` for each member:

Code Block
DescribeGroupResponse => ThrottleTime Groups
  ThrottleTime           => int32
  Groups                 => []DescribeGroups
                              ErrorCode        => int16
                              GroupId          => String
                              GroupState       => String
                              ProtocolType     => String
                              ProtocolData     => String
                              Members          => []DescribedGroupMember
                                                    MemberId         => String
                                                    GroupInstanceId  => String // new
                                                    ClientId         => String
                                                    ClientHost       => String
                                                    MemberMetadata   => bytes
                                                    MemberAssignment => bytes

Accordingly, we will bump the versions of the JoinGroup, SyncGroup, Heartbeat, OffsetCommit, LeaveGroup, and DescribeGroup requests and responses by 1.

...

Code Block
language: java
title: ConsumerCoordinator.java
Map<String, ByteBuffer> allSubscriptions -> List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions;
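
The type change above can be read as follows: a map from member id to serialized metadata has no slot for `group.instance.id`, whereas a list of member entries can carry it alongside the metadata. The sketch below illustrates that difference. `JoinedMemberSketch` is a hypothetical stand-in for `JoinGroupResponseData.JoinGroupResponseMember`, and `SubscriptionViews.toLegacyMap` is an illustrative helper, not part of the proposal.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JoinGroupResponseData.JoinGroupResponseMember.
final class JoinedMemberSketch {
    final String memberId;
    final String groupInstanceId; // assumption: null when the member is dynamic
    final ByteBuffer metadata;

    JoinedMemberSketch(String memberId, String groupInstanceId, ByteBuffer metadata) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
        this.metadata = metadata;
    }
}

final class SubscriptionViews {
    // Rebuilds the old shape (member id -> metadata) from the new list.
    // The group.instance.id of each member is lost in this view, which is
    // the information the list-based shape preserves.
    static Map<String, ByteBuffer> toLegacyMap(List<JoinedMemberSketch> members) {
        Map<String, ByteBuffer> byMemberId = new LinkedHashMap<>();
        for (JoinedMemberSketch m : members) {
            byMemberId.put(m.memberId, m.metadata);
        }
        return byMemberId;
    }
}
```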

We will also add a new public method to the `Subscription` class in `PartitionAssignor` that returns the member's `group.instance.id`:

Code Block
languagejava
titlePartitionAssignor.java
class Subscription {
	...
	public Optional<String> groupInstanceId();
}
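
As one plausible use of the new accessor, an assignor could order members by `group.instance.id` so that a static member keeps the same position even if its member id changes. The sketch below is an illustration under that assumption, not the assignor logic proposed here. `SubscriptionSketch` is a hypothetical stand-in for `Subscription`, and `StableOrdering` is an invented helper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for PartitionAssignor.Subscription with the new accessor.
final class SubscriptionSketch {
    private final List<String> topics;
    private final Optional<String> groupInstanceId;

    SubscriptionSketch(List<String> topics, Optional<String> groupInstanceId) {
        this.topics = topics;
        this.groupInstanceId = groupInstanceId;
    }

    public List<String> topics() {
        return topics;
    }

    public Optional<String> groupInstanceId() {
        return groupInstanceId;
    }
}

final class StableOrdering {
    // Returns member ids with static members first, sorted by group.instance.id,
    // followed by dynamic members sorted by member id.
    static List<String> order(Map<String, SubscriptionSketch> subscriptions) {
        TreeMap<String, String> staticByInstanceId = new TreeMap<>();
        List<String> dynamicMemberIds = new ArrayList<>();
        for (Map.Entry<String, SubscriptionSketch> e : subscriptions.entrySet()) {
            Optional<String> instanceId = e.getValue().groupInstanceId();
            if (instanceId.isPresent()) {
                staticByInstanceId.put(instanceId.get(), e.getKey());
            } else {
                dynamicMemberIds.add(e.getKey());
            }
        }
        Collections.sort(dynamicMemberIds);
        List<String> ordered = new ArrayList<>(staticByInstanceId.values());
        ordered.addAll(dynamicMemberIds);
        return ordered;
    }
}
```

Returning `Optional<String>` lets callers handle members without a `group.instance.id` explicitly, as the `isPresent()` branch does here.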

We are also introducing a new error type; its handling is explained in the following section.

...