Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when multiple producers can write to the same partition, the resulting state of the log is hard to predict or control: a producer can end up writing duplicate messages to Kafka, or multiple tasks can write to the same commit log and leave it in an inconsistent state.

This document proposes adding a simple concurrency primitive to the Kafka producer. The producer is allowed to attach an 'expected offset' to each published message — and if that offset does not match the upcoming offset at the time the message is appended to the log, the broker refuses the publish request. Like the common check-and-set operation, this 'conditional publish' lets the client confirm the current state before going ahead with the write. 

Again like check-and-set, this only makes good progress when contention for a single partition is low. Fortunately, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition — and in many of these cases, the 'conditional publish' turns out to be quite useful.

  • A producer can re-send a message indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail.
  • It's possible to 'bulk load' Kafka this way, by appending messages consecutively to a partition. Even if the list is much larger than the buffer size or the producer has to be restarted, each message will be appended to the log just once.
  • If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write.

In all of these cases, the 'conditional publish' operation can support much stronger guarantees than the existing publish semantics.
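The bulk-load case can be sketched as follows. This is only an illustration under stated assumptions: CheckedPartition is a hypothetical in-memory stand-in for a partition with the offset check enabled, BulkLoader is a hypothetical client, and the partition is assumed to hold nothing but a prefix of the input when loading starts. None of these names come from Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition with the offset check on.
class CheckedPartition {
    private final List<String> log = new ArrayList<>();

    // Appends only if expectedOffset equals the upcoming offset.
    // Returns true if the message was appended, false if it was refused.
    synchronized boolean conditionalAppend(long expectedOffset, String message) {
        if (expectedOffset != log.size()) {
            return false;
        }
        log.add(message);
        return true;
    }

    synchronized long nextOffset() {
        return log.size();
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(log);
    }
}

// Hypothetical loader: writes input.get(i) at offset i.
class BulkLoader {
    // Assumes the partition currently holds a prefix of `input` (possibly empty),
    // so calling this again after a restart resumes where the last run stopped.
    static void load(CheckedPartition partition, List<String> input) {
        for (long offset = partition.nextOffset(); offset < input.size(); offset++) {
            boolean appended = partition.conditionalAppend(offset, input.get((int) offset));
            if (!appended) {
                // Someone else wrote at this offset; stop rather than duplicate or interleave.
                throw new IllegalStateException("unexpected writer at offset " + offset);
            }
        }
    }
}
```

Running load on the first half of a list and then again on the whole list leaves each message in the log exactly once, and re-sending an already-appended message at its original offset is refused.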

Public Interfaces

The proposal requires a few additions to the public interface.

  • Add a new offset field to the ProducerRecord class, with a default value of -1.
  • Add a new check.expected.offsets property to the log config.
  • Add a new error code and exception type for an offset mismatch.

Proposed Changes

The current produce request already includes an offset for every message, which the server currently ignores. This proposal repurposes that field as the expected offset of the message. This requires two main changes:

  • Clients need an API to specify the expected offset of a message. This requires adding a new offset field to the existing produce record and using the value of that field (or the sentinel value -1, if unspecified) as the offset for that message in the outgoing message set.
  • The server needs to be modified to compare the expected offset to the upcoming offset for the matching partition, and refuse the produce request if the offsets don't match. This involves a couple of additional checks at offset-assignment time, and a new error code and exception type to signal the error to the client.
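A minimal sketch of the server-side check described above, simplified to one message per request. The names PartitionLog, OffsetMismatchException, and NO_EXPECTED_OFFSET are hypothetical and are not taken from the Kafka code base or the draft patch; the boolean flag stands in for the proposed check.expected.offsets setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception type signalling an offset mismatch to the client.
class OffsetMismatchException extends RuntimeException {
    final long expected;
    final long actual;

    OffsetMismatchException(long expected, long actual) {
        super("expected offset " + expected + " but upcoming offset is " + actual);
        this.expected = expected;
        this.actual = actual;
    }
}

// Hypothetical in-memory model of one partition's log.
class PartitionLog {
    // Sentinel meaning "no expected offset was specified"; the check is skipped.
    static final long NO_EXPECTED_OFFSET = -1L;

    private final boolean checkExpectedOffsets; // stands in for check.expected.offsets
    private final List<String> messages = new ArrayList<>();

    PartitionLog(boolean checkExpectedOffsets) {
        this.checkExpectedOffsets = checkExpectedOffsets;
    }

    // Assigns the upcoming offset to the message, refusing the append when the
    // check is enabled and the caller's expected offset does not match it.
    synchronized long append(long expectedOffset, String message) {
        long upcoming = messages.size();
        if (checkExpectedOffsets
                && expectedOffset != NO_EXPECTED_OFFSET
                && expectedOffset != upcoming) {
            throw new OffsetMismatchException(expectedOffset, upcoming);
        }
        messages.add(message);
        return upcoming;
    }

    synchronized long nextOffset() {
        return messages.size();
    }
}
```

With the flag off, the offset supplied by the client is ignored, which mirrors the current behavior in which the server disregards that field.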

This feature also requires a new config parameter, which can be enabled globally or per-topic.

The JIRA ticket includes a rough-draft patch, which should give a sense of the scope of the change.

Compatibility, Deprecation, and Migration Plan

This proposal 'hides' the offset check behind a per-log configuration flag. Without any config changes, new clients will work with the existing version of the server, and existing clients will work with the new server.

If the new configuration parameter is enabled, a user will also need to use the updated client: existing clients put arbitrary values in the offset field, so their produce requests might fail. Since the only reason to enable the config is to manually specify the offsets, which requires an up-to-date client anyway, this does not seem to be a serious limitation.

In a future major version of Kafka, it may make sense to have all clients default the offset to -1 and leave the offset check on for all partitions. However, this would be a breaking change.

Rejected Alternatives

Idempotent Producer or Transactional Messaging

A couple of designs or proposals exist for adding transactional or idempotency features to Kafka: Idempotent Producer and Transactional Messaging in Kafka. These proposals, if implemented, would likely overlap with the use-cases for this feature — for example, one can imagine using transactional messaging to implement a more reliable mirror maker.

However, this proposal is a much more conservative extension to the existing Kafka API; this makes the implementation very simple, and it imposes little to no runtime cost in memory or time. Even if these more elaborate coordination proposals are implemented, this feature would still be useful in cases where simplicity and performance are important.

Comparing Offsets by Key

A similar feature was suggested on the mailing list — instead of checking that the given offset was equal to the upcoming offset for the partition, it would check that the given offset was greater than the last offset for the message's key. This feature would have similar advantages to the current proposal, and would reduce contention in some situations: many producers could write to the same partition concurrently, as long as none of them sent messages with the same key.

Compared to the current proposal, this has a couple of downsides:

  • It requires an additional data structure, of size proportional to the number of unique keys present in the log, which would need to be maintained and checkpointed.
  • There are other use cases where checking offsets at the partition level is important: for example, a KV store in Samza might use a partition as its commit log, and it would want to ensure that no other task is writing to that partition, not just to some particular key. (Of course, per-key or per-partition checking could be selected by a config flag.)

While this may make a good future extension, it seems simpler to exclude it from the first iteration.
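To make the extra bookkeeping concrete, here is a rough sketch of the per-key variant. KeyedOffsetChecker and its methods are hypothetical names, the acceptance rule is one reading of the description above (the given offset must be greater than the last offset recorded for the key, and unseen keys are always accepted), and checkpointing is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key checker; not part of Kafka or of this proposal.
class KeyedOffsetChecker {
    // One entry per unique key in the log: this map is the additional
    // data structure that would need to be maintained and checkpointed.
    private final Map<String, Long> lastOffsetByKey = new HashMap<>();
    private long nextOffset = 0L;

    // Accepts the write only if expectedOffset is greater than the last offset
    // recorded for this key; returns the offset assigned to the message.
    synchronized long append(String key, long expectedOffset) {
        Long last = lastOffsetByKey.get(key);
        if (last != null && expectedOffset <= last) {
            throw new IllegalStateException(
                "key " + key + " was last written at offset " + last);
        }
        long assigned = nextOffset++;
        lastOffsetByKey.put(key, assigned);
        return assigned;
    }

    synchronized int trackedKeys() {
        return lastOffsetByKey.size();
    }
}
```

Writers using different keys never conflict under this rule, but the map grows with every new key, which is the cost noted in the first bullet.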
