Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Kafka transactions protocol was improved by KIP-447: Producer scalability for exactly once semantics, and Kafka Streams has already adopted eos-v2 through KIP-732: Deprecate eos-alpha and replace eos-beta with eos-v2. We should now drop support in the Kafka Producer for the legacy transactions protocol (which does not use the consumer group generation id and member id).
The present API tells developers to avoid the legacy protocol by marking void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, String consumerGroupId) as deprecated. However, developers can still easily call the non-deprecated method void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) and pass it a ConsumerGroupMetadata instance created by hand with a non-deprecated constructor.
This creates confusion when migrating legacy applications: to avoid the deprecated API, developers might simply wrap a string consumer group id in a ConsumerGroupMetadata object and pass it to void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata), but such an object still lacks the necessary generation id and member id.
Users should not create instances of ConsumerGroupMetadata, and this class should not have any public constructor.
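To make the pitfall concrete, here is a minimal, self-contained sketch. GroupMetadataStandIn is a hypothetical stand-in for the current ConsumerGroupMetadata class, not the Kafka class itself. It assumes that the group-id-only constructor fills the remaining fields with "unknown" placeholders (-1 and an empty string), and the helper method names are invented for illustration.

```java
import java.util.Optional;

public class HandMadeMetadataPitfall {

    // Hypothetical stand-in for the current ConsumerGroupMetadata class.
    public static final class GroupMetadataStandIn {
        // Assumed placeholder values for an unknown generation id and member id.
        static final int UNKNOWN_GENERATION_ID = -1;
        static final String UNKNOWN_MEMBER_ID = "";

        final String groupId;
        final int generationId;
        final String memberId;
        final Optional<String> groupInstanceId;

        GroupMetadataStandIn(String groupId, int generationId, String memberId,
                             Optional<String> groupInstanceId) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }

        // Models the convenience constructor that takes only a group id.
        GroupMetadataStandIn(String groupId) {
            this(groupId, UNKNOWN_GENERATION_ID, UNKNOWN_MEMBER_ID, Optional.empty());
        }

        // True only when both fields required by the improved protocol carry real values.
        public boolean hasGenerationAndMemberId() {
            return generationId != UNKNOWN_GENERATION_ID && !memberId.equals(UNKNOWN_MEMBER_ID);
        }
    }

    // What a migrating application might write to silence the deprecation warning.
    public static GroupMetadataStandIn wrappedByHand(String groupId) {
        return new GroupMetadataStandIn(groupId);
    }

    // What a consumer that has joined the group would hand out (values are made up).
    public static GroupMetadataStandIn fromConsumer(String groupId) {
        return new GroupMetadataStandIn(groupId, 5, "member-1", Optional.empty());
    }

    public static void main(String[] args) {
        System.out.println(wrappedByHand("orders").hasGenerationAndMemberId()); // false
        System.out.println(fromConsumer("orders").hasGenerationAndMemberId());  // true
    }
}
```

In a real application, the instance would come from the consumer's groupMetadata() method rather than from a constructor call.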
Public Interfaces
- org/apache/kafka/clients/consumer/ConsumerGroupMetadata
Proposed Changes
The ConsumerGroupMetadata class should become a sealed interface. We should provide a new permitted class that implements the interface and give that class Java's default (package-private) visibility.
public interface ConsumerGroupMetadata {
    String groupId();
    int memberEpoch();
    String memberId();
    Optional<String> groupInstanceId();
}

class DefaultConsumerGroupMetadata implements ConsumerGroupMetadata {
    private final String groupId;
    private final int memberEpoch;
    private final String memberId;
    private final Optional<String> groupInstanceId;

    DefaultConsumerGroupMetadata(String groupId,
                                 int memberEpoch,
                                 String memberId,
                                 Optional<String> groupInstanceId) {
        this.groupId = Objects.requireNonNull(groupId, "group.id can't be null");
        this.memberEpoch = memberEpoch;
        this.memberId = Objects.requireNonNull(memberId, "member.id can't be null");
        this.groupInstanceId = Objects.requireNonNull(groupInstanceId, "group.instance.id can't be null");
    }

    @Override
    public String groupId() {
        return groupId;
    }

    @Override
    public int memberEpoch() {
        return memberEpoch;
    }

    @Override
    public String memberId() {
        return memberId;
    }

    @Override
    public Optional<String> groupInstanceId() {
        return groupInstanceId;
    }

    @Override
    public String toString() {
        return String.format("GroupMetadata(groupId = %s, memberEpoch= %d, memberId = %s, groupInstanceId = %s)",
            groupId,
            memberEpoch,
            memberId,
            groupInstanceId.orElse(""));
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        final DefaultConsumerGroupMetadata that = (DefaultConsumerGroupMetadata) o;
        return memberEpoch == that.memberEpoch &&
            Objects.equals(groupId, that.groupId) &&
            Objects.equals(memberId, that.memberId) &&
            Objects.equals(groupInstanceId, that.groupInstanceId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupId, memberEpoch, memberId, groupInstanceId);
    }
}
Compatibility, Deprecation, and Migration Plan
- This change affects all users who call any constructor of the ConsumerGroupMetadata class.
- For Kafka 4.2.0, we should keep the ConsumerGroupMetadata class as-is for backward compatibility, but deprecate all constructors and update the JavaDocs to indicate that this class will become an interface in the future.
- For Kafka 5.0, we make the actual code change from class to interface.
- Once Kafka clients stop supporting Java 11, we can mark the ConsumerGroupMetadata interface as sealed, with only the default implementation permitted.
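The final step of the plan could look roughly like the following sketch, which assumes Java 17 or later. The outer class SealedMetadataSketch and the sample() helper are hypothetical and exist only to keep the example in a single file. The implementation is written as a record for brevity, which is a substitution for the hand-written class in Proposed Changes, not part of the proposal.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

public class SealedMetadataSketch {

    // Sealed: only the types listed after `permits` may implement the interface.
    public sealed interface ConsumerGroupMetadata permits DefaultConsumerGroupMetadata {
        String groupId();
        int memberEpoch();
        String memberId();
        Optional<String> groupInstanceId();
    }

    // Package-private record; records are implicitly final, as a sealed hierarchy requires.
    record DefaultConsumerGroupMetadata(String groupId,
                                        int memberEpoch,
                                        String memberId,
                                        Optional<String> groupInstanceId)
            implements ConsumerGroupMetadata {
        // Compact constructor: same null checks as the class in Proposed Changes.
        DefaultConsumerGroupMetadata {
            Objects.requireNonNull(groupId, "group.id can't be null");
            Objects.requireNonNull(memberId, "member.id can't be null");
            Objects.requireNonNull(groupInstanceId, "group.instance.id can't be null");
        }
    }

    // Hypothetical helper standing in for the consumer handing out an instance.
    public static ConsumerGroupMetadata sample() {
        return new DefaultConsumerGroupMetadata("orders", 5, "member-1", Optional.empty());
    }

    public static void main(String[] args) {
        // Reflection confirms the interface is sealed and lists its permitted implementations.
        System.out.println(ConsumerGroupMetadata.class.isSealed());
        System.out.println(Arrays.toString(ConsumerGroupMetadata.class.getPermittedSubclasses()));
        System.out.println(sample());
    }
}
```

With this shape, any attempt by user code to declare its own class implementing ConsumerGroupMetadata is rejected by the compiler, which is the guarantee the sealed step is meant to add on top of the interface change.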
Test Plan
Automated tests that run as part of CI will provide the necessary coverage.
Rejected Alternatives
We could make all constructors in ConsumerGroupMetadata private and provide a new static factory method that obtains the group metadata instance from the Kafka Consumer.
This approach is rejected because internal Kafka Consumer components (ConsumerCoordinator and AsyncKafkaConsumer) are coupled to the ConsumerGroupMetadata class, and decoupling them would require a large code refactoring.
We can highlight two major hotspots:
1) ConsumerCoordinator invokes the onAssignment method of ConsumerPartitionAssignor, which is a public interface. This means that even if we tried to use an alternative internal implementation of ConsumerGroupMetadata in ConsumerCoordinator, it would fail at that point.
2) AsyncKafkaConsumer implements the public org.apache.kafka.clients.consumer.Consumer interface, so to implement the ConsumerGroupMetadata groupMetadata() method it has to be able to instantiate a ConsumerGroupMetadata object. Unfortunately, AsyncKafkaConsumer lives in org.apache.kafka.clients.consumer.internals, so package-private scope will not work.