KIP-447: Producer scalability for exactly once semantics

Status

Current state: Draft in progress

Discussion thread

JIRA

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Exactly once semantics (EOS) provides transactional message processing guarantees. Producers can write to multiple partitions atomically so that either all writes succeed or all writes fail. This can be used in the context of stream processing frameworks, such as Kafka Streams, to ensure exactly once processing between topics.

In Kafka EOS, we use the concept of a "transactional Id" in order to preserve exactly once processing guarantees across process failures and restarts. Essentially this allows us to guarantee that for a given transactional Id, there can only be one producer instance that is active and permitted to make progress at any time. Zombie producers are fenced by an epoch which is associated with each transactional Id. We can also guarantee that upon initialization, any transactions which were still in progress are completed before we begin processing. This is the point of the initTransactions() API.
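
The epoch-based fencing described above can be illustrated with a toy model. This is only a sketch and not the broker implementation: the class `EpochFencingSketch` and its methods are hypothetical names, and the model keeps nothing but the latest epoch per transactional Id.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of epoch-based fencing; hypothetical, not the broker implementation. */
class EpochFencingSketch {
    // Latest epoch handed out for each transactional Id.
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Models initTransactions(): bumps the epoch and returns it to the caller. */
    int init(String transactionalId) {
        return currentEpoch.merge(transactionalId, 1, Integer::sum);
    }

    /** A write is accepted only if it carries the latest epoch for its transactional Id. */
    boolean accepts(String transactionalId, int epoch) {
        Integer latest = currentEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }
}
```

Once a second instance initializes with the same transactional Id, writes carrying the earlier epoch are no longer accepted, which is what makes the earlier instance a fenced zombie.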

The problem we are trying to solve in this proposal is a semantic mismatch between consumers in a group and transactional producers. In a consumer group, ownership of partitions can transfer between group members through the rebalance protocol. For transactional producers, assignments are assumed to be static. Every transactional id must map to a consistent set of input partitions. To preserve the static partition mapping in a consumer group where assignments are frequently changing, the simplest solution is to create a separate producer for every input partition. This is what Kafka Streams does today.

This architecture does not scale well as the number of input partitions increases. Every producer comes with separate memory buffers, a separate thread, and separate network connections. This limits the performance of the producer since we cannot effectively use the output of multiple tasks to improve batching. It also causes unneeded load on brokers since there are more concurrent transactions and more redundant metadata management.

Proposed Changes

We argue that the root of the problem is twofold:

  1. Unlike consumer group state which is centralized on a single group coordinator, transaction state is distributed across multiple coordinators. There is no central point for transaction state coordination.
  2. Transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes.

Our proposal is to make transaction coordinators aware of consumer group assignments. Rather than distributing the transactional state by routing every transactional Id to a separate coordinator, we will use the consumer group id to identify a single transaction coordinator which is responsible for managing the state. This gives the coordinator the ability to understand which partitions are being consumed by each member of the group. This can then be used to safely coordinate assignment changes.
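
To make the routing idea concrete, here is a minimal sketch. It assumes the coordinator is found by hashing a key onto a partition of the transaction log, modulo the partition count. Whether the new lookup would use exactly this function is an assumption, and `CoordinatorRoutingSketch` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: assumes hash-modulo routing of a coordinator key to a transaction log partition. */
class CoordinatorRoutingSketch {
    static int partitionFor(String coordinatorKey, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (coordinatorKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Number of distinct log partitions that the given keys are routed to. */
    static int distinctPartitions(List<String> keys, int numPartitions) {
        Set<Integer> partitions = new HashSet<>();
        for (String key : keys) {
            partitions.add(partitionFor(key, numPartitions));
        }
        return partitions.size();
    }
}
```

When every producer in the application routes by its own transactional Id, the keys may land on many different partitions. When all of them route by the shared consumer group id, they necessarily land on the same one.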

We use the initTransactions API currently in order to fence producers using the same transactional Id and to ensure that previous transactions have been completed. We will add a new initTransactions API which accepts the set of assigned partitions and the associated consumer group Id. This will be passed to the transaction coordinator in the InitProducerId call, and will be stored with the other transaction state.

Essentially the problem we are trying to solve is making the coordinator aware of the dependencies between processes that come as a result of partition reassignment. When handling the InitProducerId request, the coordinator will use the previous partition assignment of the consumer group to check which transactions need to be completed before it is safe to begin processing. The coordinator will then ensure that only one producer for each assigned partition is allowed to make progress at any time.

Public Interfaces

The main addition of this KIP is a new variant of the current initTransactions API which provides the set of partitions that were assigned in the latest rebalance.

interface Producer {
  /**
   * Initialize transactional state for the producer with the partitions assigned
   * in the consumer group rebalance. This call ensures that any transactions
   * involving committed offsets from the set of input partitions must be completed
   * before this call returns. 
   *
   * Unlike the no-arg initTransactions() API, this can be called multiple times
   * on the same instance. Typically it should be called immediately after receiving
   * a new partition assignment from the group coordinator.
   */
  void initTransactions(Set<TopicPartition> inputPartitions, String consumerGroupId, int generationId);
}

There are two main differences in the behavior of this API and the pre-existing `initTransactions`:

  • The first is that it is safe to call this API multiple times. In fact, it is required to be invoked after every consumer group rebalance. 
  • The second is that it is safe to call after receiving a `ProducerFencedException`. If a producer is fenced, all that is needed is to rejoin the associated consumer group and call this new `initTransactions` API.

The key to this proposal is allowing a single transaction coordinator to see the assignments of all members in the group. It can then maintain the invariant that only one producer is allowed to make progress at any time for a particular input partition. To enable this, we need two protocol changes. First we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group Id. Second, we need to extend the InitProducerId API to support consumer group aware initialization.

The schema for FindCoordinator does not need to change, but we need to add a new coordinator type:

FindCoordinatorRequest => CoordinatorKey CoordinatorType
  CoordinatorKey => STRING
  CoordinatorType => INT8 // 0 -> Consumer group coordinator, 1 -> Transaction coordinator, 2 -> Transaction "group" coordinator
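
On the client side, the three coordinator types could be modeled as an enum keyed by the INT8 value. The following is a sketch only: the enum name `CoordinatorTypeSketch` and the constant `TRANSACTION_GROUP` are hypothetical names chosen for illustration, and only the numeric values come from the schema above.

```java
/** Hypothetical enum mirroring the CoordinatorType values in the FindCoordinator schema. */
enum CoordinatorTypeSketch {
    GROUP((byte) 0),              // consumer group coordinator
    TRANSACTION((byte) 1),        // transaction coordinator, keyed by transactional Id
    TRANSACTION_GROUP((byte) 2);  // proposed: transaction coordinator, keyed by consumer group Id

    final byte id;

    CoordinatorTypeSketch(byte id) {
        this.id = id;
    }

    /** Maps the wire value back to a type, rejecting unknown values. */
    static CoordinatorTypeSketch forId(byte id) {
        for (CoordinatorTypeSketch type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown coordinator type: " + id);
    }
}
```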

Below we provide the new InitProducerId schema:

InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId ConsumerGroupGeneration AssignedPartitions
  TransactionalId => NullableString
  TransactionTimeoutMs => Int32
  ConsumerGroupId => NullableString         // NEW
  ConsumerGroupGeneration => Int32          // NEW
  AssignedPartitions => [Topic [Partition]] // NEW
    Topic => String
    Partition => Int32

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int32
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16

The new InitProducerId API accepts either a user-configured transactional Id or a consumer group Id and the set of assigned partitions. When a consumer group is provided, the transaction coordinator will check whether there are any ongoing transactions that include the assigned partitions. If there are, these transactions will be aborted and the corresponding producers will be fenced by bumping their epochs. Once transactions are complete, the call will return.
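
The coordinator-side check described above can be sketched as a toy in-memory model. Everything here is hypothetical: the class `GroupAwareInitSketch`, its method names, and the use of plain "topic-partition" strings are assumptions made for illustration, and real transaction state would be persisted in the transaction log.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of group-aware InitProducerId handling; not the broker implementation. */
class GroupAwareInitSketch {
    // Input partitions with offsets pending in each producer's open transaction.
    private final Map<Long, Set<String>> openTxnInputs = new HashMap<>();
    private final Map<Long, Integer> epochs = new HashMap<>();

    void beginTransaction(long producerId, Set<String> inputPartitions) {
        epochs.putIfAbsent(producerId, 0);
        openTxnInputs.put(producerId, new HashSet<>(inputPartitions));
    }

    /** Aborts other producers' transactions that overlap the assignment and returns the fenced producer ids. */
    Set<Long> initProducer(long callerId, Set<String> assignedPartitions) {
        Set<Long> fenced = new TreeSet<>();
        Iterator<Map.Entry<Long, Set<String>>> it = openTxnInputs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<String>> txn = it.next();
            long producerId = txn.getKey();
            boolean overlaps = !Collections.disjoint(txn.getValue(), assignedPartitions);
            if (producerId != callerId && overlaps) {
                it.remove();                                  // abort the ongoing transaction
                epochs.merge(producerId, 1, Integer::sum);    // bump the epoch to fence its owner
                fenced.add(producerId);
            }
        }
        epochs.putIfAbsent(callerId, 0);
        return fenced;
    }

    int epochOf(long producerId) {
        return epochs.getOrDefault(producerId, -1);
    }

    boolean hasOpenTransaction(long producerId) {
        return openTxnInputs.containsKey(producerId);
    }
}
```

Transactions that do not touch any of the newly assigned partitions are left alone, so unrelated group members keep making progress.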

Fencing

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we include the consumer group generation. Once the coordinator observes a generation bump for a group, it will refuse to handle requests from the previous generation. The only thing other group members can do is call InitProducerId themselves. This in fact would be the common case since transactions will usually be completed before a consumer joins a rebalance.
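
A minimal sketch of the generation check follows, assuming the coordinator simply remembers the highest generation it has seen for each group. `GenerationFenceSketch` and `admit` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of generation-based fencing of InitProducerId requests. */
class GenerationFenceSketch {
    // Highest generation observed so far for each consumer group.
    private final Map<String, Integer> latestGeneration = new HashMap<>();

    /** Returns true if the request is admitted, false if it comes from an older generation. */
    boolean admit(String groupId, int generationId) {
        int latest = latestGeneration.getOrDefault(groupId, -1);
        if (generationId < latest) {
            return false; // zombie: the group has already moved to a newer generation
        }
        latestGeneration.put(groupId, generationId);
        return true;
    }
}
```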

In order to pass the group generationId to `initTransactions`, we need to expose it from the consumer. We propose to add an overload to onPartitionsAssigned in the consumer's rebalance listener interface:

public interface ConsumerRebalanceListener {
  default void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
    onPartitionsAssigned(partitions);
  }
}
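
Because the overload is a default method, existing listeners keep compiling and behaving as before. The self-contained sketch below mirrors the shape of the interface with hypothetical stand-in types (`RebalanceListenerSketch`, partitions as plain strings) so that it runs without the Kafka client on the classpath.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Stand-in for the listener interface, with the proposed default overload. */
interface RebalanceListenerSketch {
    void onPartitionsAssigned(Collection<String> partitions);

    default void onPartitionsAssigned(Collection<String> partitions, int generationId) {
        // Listeners that do not care about the generation fall back to the existing callback.
        onPartitionsAssigned(partitions);
    }
}

/** An existing listener that only implements the single-argument callback. */
class LegacyListener implements RebalanceListenerSketch {
    final List<String> seen = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        seen.addAll(partitions);
    }
}

/** A new listener that overrides the overload to capture the generation. */
class GenerationAwareListener implements RebalanceListenerSketch {
    int lastGeneration = -1;

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        // Not used by this listener.
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions, int generationId) {
        lastGeneration = generationId;
    }
}
```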

With this proposal, the transactional id is no longer needed for proper fencing, but the coordinator still needs a way to identify producers individually as they are executing new transactions. There are two options: continue using the transactional id, or use the producer id which is generated by the coordinator in InitProducerId. Either way, the main challenge is authorization. We currently use the transactional Id to authorize transactional operations. In this KIP, we will keep this model unchanged. The producer must still provide a transactional Id, but now the only requirement is that it is defined uniquely for each producer in the application. It is no longer tied to exactly once guarantees.

We also need to change the on-disk format for transaction state in order to persist both the consumer group id and the assigned partitions. We propose to use a separate record type in order to store the group assignment. Transaction state records will not change.

Key => GroupId TransactionalId
  GroupId => String
  TransactionalId => String

Value => GenerationId AssignedPartitions
  GenerationId => Int32
  AssignedPartitions => [Topic [Partition]]
    Topic => String
    Partition => Int32

To allow Kafka Streams applications to adopt this new feature, we will introduce a new config that controls whether the new producer model is used:

StreamsConfig.java
public static final String CONSUMER_GROUP_AWARE_TRANSACTION = "consumer.group.aware.transaction"; // boolean config, defaults to false

When this is set to true and exactly-once is enabled, a Kafka Streams application will use a single producer per thread.

Compatibility fencing

To fence an old producer accessing the same topic partition, we will introduce a new exception type:

Errors.java
CONCURRENT_PRODUCER_COMMIT(85, "This producer attempted to commit offsets to a topic partition which is owned by another producer in this generation.", ConcurrentProducerCommitException::new),

To avoid concurrent processing during the upgrade, we also want to introduce an exception that tells the consumer to back off:

Errors.java
PENDING_TRANSACTION(86, "Could not consume from this topic partition due to pending transactions.", PendingTransactionException::new),

We discuss both in more detail in the Compatibility section.

Example

Below we provide an example of a simple read-process-write loop with consumer group-aware EOS processing.


  String groupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig());

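  consumer.subscribe(topics, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(new HashSet<>(partitions), groupId, generationId);
      }
  });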

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    producer.sendOffsetsToTransaction(consumedOffsets, groupId);
    producer.commitTransaction();
  }

The main points are the following:

  1. The new initTransactions API is used in the ConsumerRebalanceListener passed to subscribe.
  2. We no longer need to close the producer after a rebalance. We can call initTransactions multiple times.

Compatibility, Deprecation, and Migration Plan

This change spans both brokers and clients. Brokers must be upgraded first, with `inter.broker.protocol.version` set to the latest version. Any request sent with a higher version than the broker supports will automatically be fenced. If a Kafka Streams application runs into this, we recommend leaving the `CONSUMER_GROUP_AWARE_TRANSACTION` config unset so that the client can be upgraded without switching to the new per-thread producer.

To make the upgrade fully compatible with the current EOS transaction semantics, we need to be able to distinguish clients that are making progress on the same input source but using different transactional Ids. Two different types of clients can coexist within the same consumer group. Consider a Kafka Streams application in which half of the instances use the old per-task producer API while the other half use the new consumer group API. Fencing can be done by leveraging `AddOffsetsToTxnRequest`, which contains a consumer group id and topic partitions. The group coordinator can build a reverse mapping from topic partition to `producer.id`, remembering which producer has been contributing offset commits to each topic partition. After the upgrade to the new API, the group coordinator checks this map whenever it receives an `AddOffsetsToTxnRequest`. If the stored `producer.id` doesn't match the one in the request, the coordinator aborts any ongoing transaction associated with the stored producer and sends it a `ConcurrentProducerCommitException`. This gives a smooth upgrade without having to worry about old pending transactions.
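
The reverse mapping can be sketched as follows. This is a toy model under stated assumptions: `OffsetCommitOwnershipSketch` and `recordCommit` are hypothetical names, partitions are plain strings, and ownership is taken over by the latest committer.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the group coordinator's topic partition to producer id mapping. */
class OffsetCommitOwnershipSketch {
    // Producer id that last contributed offset commits to each topic partition.
    private final Map<String, Long> ownerByPartition = new HashMap<>();

    /**
     * Records that producerId is committing offsets for the partition.
     * Returns the previously stored producer id if it differs (the one to fence), or null otherwise.
     */
    Long recordCommit(String topicPartition, long producerId) {
        Long previous = ownerByPartition.put(topicPartition, producerId);
        return (previous != null && previous != producerId) ? previous : null;
    }
}
```

A non-null return value identifies the stored producer whose ongoing transaction would be aborted and which would receive the `ConcurrentProducerCommitException`.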

Besides an active fencing mechanism, we also need to guarantee correctness during the upgrade. This means no input data should be processed twice, even though we can no longer distinguish clients by transactional Id. The solution is to reject an offset fetch request from a new client with PendingTransactionException whenever there are pending transactional offset commits, so that the new client starts from a clean state instead of relying on transactional Id fencing. When a client receives PendingTransactionException, it backs off and retries fetching the input offsets until all pending transactional offsets are cleared. This trades availability for correctness; the worst case for availability is waiting out the transaction timeout, which should be acceptable. Compared with more elaborate fencing logic based on topic partition ownership, we believe this is a good trade-off.
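
The client-side back-off could look roughly like the sketch below. The fixed back-off, the attempt limit, and the names `OffsetFetchRetrySketch` and `fetchWithBackoff` are assumptions for illustration, and `PendingTransactionException` is defined locally as a stand-in for the proposed exception.

```java
import java.util.function.Supplier;

/** Local stand-in for the proposed exception type. */
class PendingTransactionException extends RuntimeException {
}

/** Sketch of retrying an offset fetch while transactional offset commits are pending. */
class OffsetFetchRetrySketch {
    static long fetchWithBackoff(Supplier<Long> fetch, long backoffMs, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                return fetch.get();
            } catch (PendingTransactionException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the error to the caller
                }
                try {
                    Thread.sleep(backoffMs); // wait for pending transactions to complete or time out
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
            }
        }
    }
}
```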

Rejected Alternatives

  • Producer Pooling:
  • Producer support multiple transactional ids:
  • Tricky rebalance synchronization:



3 Comments

  1. QQ: does the upgrade of the FindCoordinator API mean we would not support producers migrating from the old API to the new API, since their coordinator would then be different?

  2. Guozhang Wang By migrating, do you mean consolidating the old transaction log into the new transaction log? Since we don't have the topic partition ownership info in the old transaction log, this seems hard to achieve without a manual process.

  3. Also, since we will be handling transaction requests with a single coordinator, shall we pick the same broker as the group coordinator or a different one? Will this new transaction coordinator become a hotspot if we redirect all the transaction requests and log writes to it?