Status

Current state: Accepted

Discussion threadhere

Voting threadhere

JIRA: KAFKA-10000 - Getting issue details... STATUS KAFKA-6080 - Getting issue details... STATUS

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Background and References

KIP-98

KIP-98 added support for exactly-once delivery guarantees with Kafka and its Java clients.

The idempotent producer

One common source of duplicate records from applications that write to Kafka was automatic producer retries, where a producer would re-send a batch to Kafka under certain circumstances even if that batch had already been committed by the broker. KIP-98 allowed users to configure their producer to perform these retries idempotently, such that downstream applications would only see one instance of any message in an automatically-resent producer batch, even if the batch were sent to Kafka several times by the producer.

The transactional producer

Another major component of the user-facing changes from that KIP was the introduction of the transactional producer, which can create transactions that span multiple topic partitions and perform atomic writes to them. Furthermore, the transactional producer is capable of "fencing" out other producers, which involves claiming ownership over a transactional ID and barring any prior producer instances with the same transactional ID from being able to make any subsequent writes to Kafka. This is especially useful in preventing zombie instances of a client application from sending duplicate messages.

Terminology

"Fencing" is used here as a general term to describe disabling an older instance or group of instances of some type of application.

  • "Fencing" out a transactional producer means disabling that producer from being able to perform any further writes to Kafka (one way to do this, for example, is to instantiate a new transactional producer with the same transactional ID)
  • "Fencing" out a generation of source tasks means disabling every source task in that generation from being able to produce any more source records to Kafka, or commit any more source offsets

"Source partition" has two meanings here. It can refer to:

  • A part of the external system that a source connector reads from and is able to assign to a single task. For example, a table in a database, or a topic partition in a Kafka source connector such as MirrorMaker 2
  • The key in a key/value pair that a source task provides to the Connect framework along with a source record (see the SourceRecord constructor, for example) to use for tracking progress in consuming from a source partition

"Source offset" can be used to refer to either the value in the above-mentioned key/value pair for which a "source partition" is the key, or it may refer to the entire key/value pair (in the same way that "offset" can be used to refer to either the combination of a topic partition and some position in it, or just the last coordinate in a topic/partition/offset trio)

Motivation

It's been possible for sink connectors to implement exactly-once delivery for a while now, and in systems with features like unique key constraints, even relatively easy. However, given Kafka's lack of such features that allow for easy remediation and/or prevention of duplicates, the same cannot be said for source connectors, and without framework-level adaptations, exactly-once delivery for source connectors remains impossible.

There are two key reasons for this:

Source offset accuracy: The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka. This provides at-least-once delivery guarantees for most source connectors, but allows for the possibility of duplicate writes in cases where source records are written to Kafka but the worker is interrupted before it can write the offsets for those records to Kafka.

Zombies: Some scenarios can cause multiple tasks to run and produce data for the same source partition (such as a database table or a Kafka topic partition) at the same time. This can also cause the framework to produce duplicate records.

Another common source of duplicate records from source connectors is automatic producer retries. However, users can already address this by configuring source connectors to use an idempotent producer by the worker-level producer.enable.idempotence  or connector-level producer.override.enable.idempotence  properties.

In order to support exactly-once delivery guarantees for source connectors, the framework should be expanded to atomically write source records and their source offsets to Kafka, and to prevent zombie tasks from producing data to Kafka.

With these changes, any source connector for which each source partition is assigned to at most one task at a time, and which is capable of resuming consumption from the upstream source on startup based on the source offsets that prior tasks have provided to the framework, should be capable of exactly-once delivery.

Goals

  • Require as few per-connector changes as possible. The Connect ecosystem is fairly mature at this point and even something as lightweight as a new SourceTask  method would have to be applied to potentially dozens if not hundreds of connectors. The current proposal requires no such changes for existing connectors, as long as they use they use the source offsets API correctly.
  • Minimize configuration changes. The implementation of this feature may grow fairly complex, but it should be easy for users to enable and understand. KIP-98, for example, added exactly-once support for Kafka and its Java clients, and only exposed three producer properties and one consumer property to users. The current proposal only adds five user-facing configuration properties.
  • Minimize operational burden on users. It's likely that this feature will be used heavily and may even be enabled by default with a later release of Connect. A wide range of Connect deployment styles and environments should be supported with as little pain as possible, including but not limited to security-conscious and resource-conscious environments. The current proposal's requirements in security-conscious environments are well-defined and easy-to-anticipate, and it does not significantly alter the resource allocation story of Connect.
  • Minimize gotchas and potential footguns. Nobody likes fine print. If we can address a common use case for Connect users or Connect developers, we should; leaving things out for the sake of a simpler design or smaller changes should only be done as a last resort. Even if well-documented, known gaps in use cases (especially those that vary from connector to connector) may not be propagated to or understood by end users, and may be seen as a fault in the quality of this feature.
  • Overall, make this a feature that gives people joy to use, not pain. Jury's out on this one until the vote thread passes!

Public Interfaces

New properties

A single worker-level configuration property will be added for distributed mode:

Name

Type

Default

Importance

Doc

exactly.once.source.support 

STRING 

disabled 

HIGH 

Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled.

Permitted values are "disabled", "preparing", and "enabled". In order to safely enable exactly-once support for source connectors, all workers in the cluster must first be updated to use the "preparing" value for this property. Once this has been done, a second update of all of the workers in the cluster should be performed to change the value of this property to "enabled".

And four new per-connector configuration properties will be added:

Name

Type

Default

Importance

Doc

exactly.once.support STRING"requested" MEDIUM Permitted values are "requested" and "required". If set to "required", forces a preflight check for the connector to ensure that it can provide exactly-once delivery with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to Connect that they support this; in that case, documentation for the connector should be consulted carefully before creating it, and the value for this property should be set to "requested". Additionally, if the value is set to "required" but the worker that performs preflight validation does not have exactly-once support enabled for source connectors, requests to create or validate the connector will fail.
transaction.boundary STRING "poll" MEDIUM Permitted values are "poll", "connector", and "interval". If set to "poll", a new producer transaction will be started and committed for every batch of records that each task from this connector provides to Connect. If set to "connector", relies on connector-defined transaction boundaries; note that not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to create them with this property set to "connector" will fail. Finally, if set to "interval", commits transactions only after a user-defined time interval has passed.

offsets.storage.topic 

STRING 

null 

LOW 

The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used. If specified, the offsets topic will be created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).

transaction.boundary.interval.ms LONG null LOW If "transaction.boundary" is set to "interval", determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level "offset.flush.interval.ms" property.

Connector API expansions

Any newly-introduced interfaces, classes, etc. here will be added to the connect-api  artifact, since they will be part of the public API for Connect.

A new ExactlyOnceSupport  enum is introduced:

ExactlyOnce
package org.apache.kafka.connect.source;

/**
 * An enum to represent the level of support for exactly-once delivery from a source connector.
 */
public enum ExactlyOnceSupport {
    /**
     * Signals that a connector supports exactly-once delivery.
     */
	SUPPORTED,
    /**
     * Signals that a connector does not support exactly-once delivery.
     */
    UNSUPPORTED
}

The SourceConnector  API is expanded to allow developers to specify whether their connector supports exactly-once delivery:

SourceConnector
package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
	// Existing fields and methods omitted

    /**
     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
     * The default implementation will return {@code null}.
     * @param connectorConfigs the configuration that will be used for the connector.
     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
     * connector cannot.
     */
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
		return null;
    }
}

A new TransactionContext  interface is introduced:

TransactionContext
package org.apache.kafka.connect.source;

/**
 * Provided to source tasks to allow them to define their own producer transaction boundaries when
 * exactly-once support is enabled.
 */
public interface TransactionContext {
    /**
     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
     * is processed.
     */
    void commitTransaction();

    /**
     * Request a transaction commit after a source record is processed. The source record will be the
     * last record in the committed transaction.
     * @param record the record to commit the transaction after.
     */
    void commitTransaction(SourceRecord record);

    /**
     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
     * the records in that transaction will be discarded and will not appear in a committed transaction..
     */
    void abortTransaction();

    /**
     * Requests a transaction abort after a source record is processed. The source record will be the
     * last record in the aborted transaction. All of the records in that transaction will be discarded
     * and will not appear in a committed transaction.
     * @param record the record to abort the transaction after.
     */
    void abortTransaction(SourceRecord record);
}

The SourceTaskContext  interface is expanded to provide developers access to a TransactionContext  instance (Javadocs largely copied from existing docs on the SinkTaskContext::errantRecordReporter ):

SourceTaskContext
package org.apache.kafka.connect.source;

public interface SourceTaskContext {
	// Existing fields and methods omitted

    /**
     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
     * when exactly-once support is enabled for the connector.
     *
     * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will result in a
     * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the source connector is deployed to
     * Connect runtimes older than Kafka 3.0. For example:
     * <pre>
     *     TransactionContext transactionContext;
     *     try {
     *         transactionContext = context.transactionContext();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         transactionContext = null;
     *     }
     * </pre>
     *
     * @return the transaction context, or null if the user does not want the connector to define
     * its own transaction boundaries
     * @since 3.0
     */
	default TransactionContext transactionContext() {
		return null
 	}
}

A new ConnectorTransactionBoundaries  enum is introduced:

SourceConnector
package org.apache.kafka.connect.source;

/**
 * An enum to represent the level of support for connector-defined transaction boundaries.
 */
public enum ConnectorTransactionBoundaries {
    /**
     * Signals that a connector can define its own transaction boundaries.
     */
	SUPPORTED,
    /**
     * Signals that a connector cannot define its own transaction boundaries.
     */
    UNSUPPORTED
}

And the SourceConnector  API is expanded with a second new method to allow developers to specify whether their connector can define its own transaction boundaries:

SourceConnector
package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
	// Existing fields and methods omitted

    /**
     * Signals whether the connector can define its own transaction boundaries with the proposed
     * configuration. Developers must override this method if they wish to add connector-defined
     * transaction boundary support; if they do not, users will be unable to create instances of
     * this connector that use connector-defined transaction boundaries. The default implementation
     * will return {@code UNSUPPORTED}.
     * @param connectorConfigs the configuration that will be used for the connector
     * @return whether the connector can define its own transaction boundaries  with the given
     * config.
     */
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
		return ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}

REST API pre-flight validation

When a user submits a new connector configuration today, Connect takes steps to ensure that the connector configuration is valid before writing it to the config topic. These steps are performed synchronously and, if any errors are detected or unexpected failures occur, reported to the user in the body and status of the HTTP response for their request.

This validation takes place on the following endpoints:

  • POST /connectors/
  • PUT /connectors/{connector}/config
  • PUT /connector-plugins/{connectorType}/config/validate

Some new pre-flight validation logic will be added for exactly-once logic that also applies to these endpoints (and any other endpoints in the future that perform similar pre-flight connector config validation).

When exactly-once support is required

If the exactly.once.support connector configuration is set to required , the connector's SourceConnector::exactlyOnceSupport  method will be queried.

If the result is UNSUPPORTED , then an error will be reported with the exactly.once.support  property stating that the connector does not provide exactly-once support with the given configuration.

If the result is null , then an error will be reported with the exactly.once.support  property stating that Connect cannot determine whether the connector provides exactly-once guarantees, and that the user should carefully consult documentation for the connector before proceeding by possibly setting the property to "requested".

If the result is SUPPORTED , no error will be reported.

When connector-defined transaction boundaries are required

If the transaction.boundary  connector configuration is set to connector , the connector's SourceConnector::canDefineTransactionBoundaries  method will be queried.

If the result is ConnectorTransactionBoundaries.UNSUPPORTED or null , then an error will be reported with the transaction.boundary  property stating that the connector does not support defining its own transaction boundaries, and the user will be encouraged to configure the connector with a different transaction boundary type.

If the result is ConnectorTransactionBoundaries.SUPPORTED , no error will be reported.

New metrics

Three new task-level JMX properties will be added:

MBean namekafka.connect:type=source-task-metrics,connector=([-.\w]+),task=([\d]+)

Metric nameDescription
transaction-size-min The number of records in the smallest transaction the task has committed so far.
transaction-size-max The number of records in the largest transaction the task has committed so far.
transaction-size-avg The average number of records in the transactions the task has committed so far.

Proposed Changes

Atomic offset writes

Offset reads

When exactly.once.source.support is set to enabled for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all.

Users will be not able to explicitly override this in the worker configuration with the property consumer.isolation.level  or in the connector configuration with the property consumer.override.isolation.level. If they attempt to do so, the user-provided value will be ignored and a warning message will be logged notifying them of this fact.

Offset (and record) writes

When exactly.once.source.support is set to enabled for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for that transaction to be committed, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.

Users will be able to configure the transaction boundaries for connectors with the newly-introduced transaction.boundary connector configuration property (described in greater detail in the "Public Interfaces" section).

Once an offset commit is complete, if the connector is (implicitly or explicitly) configured with a separate offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

Task producers will be given a transactional ID of ${groupId}-${connector}-${taskId}, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). Users will not be able to override this with the worker-level producer.transactional.id  or connector-level producer.override.transactional.id  property. If they attempt to do so, the user-provided value will be ignored and a warning message will be logged notifying them of this fact.

The worker-level offset.flush.timeout.ms property will be ignored for exactly-once source tasks. They will be allowed to take as long as necessary to complete an offset commit, since the cost of failure at that point is to fail the source task. Currently, all source task offset commits take place on a single shared worker-global thread. In order to support source task commits without a timeout, but also prevent  laggy tasks from disrupting the availability of other tasks on the cluster, the worker will be modified to permit simultaneous source task offset commits.

It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune their producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval (if using interval-based transaction boundaries), or switch to the poll  value for the transaction.boundary  property. We will include include these steps in the error message for a task that fails due to producer transaction timeout.

SourceTask record commit API

The SourceTask::commit and SourceTask::commitRecord methods will be invoked immediately after each successful offset commit, including the end-of-life offset commit that takes place during task shutdown. First, commitRecord will be invoked for every record in the committed batch, followed by a call to commit. It is possible that the worker or task may suddenly die in between a successful offset commit and the completion of these calls.

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely off of the offset information provided to the Connect framework with each record it has produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.

Per-connector offsets topics

Regardless of the value for the exactly.once.source.support property on a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, per-connector offsets topics must be reasoned about, to allow for smooth upgrades and downgrades where large gaps in offsets data are not created unnecessarily (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets. For users and developers familiar with Kafka Streams, this is the major differentiating factor that makes a global offsets topic impossible.

Second, it closes a potential security loophole where malicious connectors can corrupt the offsets information available to other connectors on the cluster. This is preventable at the moment by configuring connectors with separate principals from that of the worker, and only granting write access to the offsets topic to the worker principal. However, since the same producer will now be used to both write offset data and write source task records, that approach will no longer work. Instead, allowing connectors to use their own offsets topics should allow administrators to maintain the security of their cluster, especially in multitenant environments.

Finally, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, and task A initiates a transaction on that offsets topic right before task B starts up, then task A dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Configuration

Only the name of the per-connector offsets topic will be configurable by users. Other properties, such as the number of partitions and the replication factor, will be derived from the worker config using the behavior described in KIP-605.

Hosting Kafka cluster

Regardless of the value for the exactly.once.source.support property on a worker, if a connector configuration contains a value for the offsets.storage.topic  property, it will use an offsets topic with that name on the Kafka cluster that it produces data to (which may be different from the one that hosts the worker's global offsets topic).

Implicit usage

A per-connector offsets topic might be implicitly configured under certain circumstances. Specifically, this will occur (when exactly-once source support is enabled) for connectors whose configurations do not contain the offsets.storage.topic  property, but do contain an overridden bootstrap.servers  value that causes the connector to target a different Kafka cluster than the one that hosts the worker's global offsets topic.

Connectors like this will be said to be "implicitly configured" to use a separate offsets topic.

The name of this topic will be the same as the name of the global offsets topic, which is controlled by the offsets.storage.topic  property in the worker config.

Creation

If a connector is explicitly or implicitly configured to use a separate offsets topic but that topic does not exist yet, the worker will automatically try to create the topic before startup for any task or Connector  instances belonging to that connector. This will be done using an admin client constructed from the connector's configuration (using the various admin.override.*  connector properties, admin.* worker properties, and top-level worker properties in descending order of precedence), using the same logic that the worker already uses for its internal topics.

Smooth migration

When a separate offsets topic is created for the first time, it will naturally be empty. In order to avoid losing offset information that may be stored in the global offsets topic, when a connector or task instance requests offsets from the worker, it will be given a combined view of the offset information present in both its separate offsets topic and the worker's global offsets topic. Precedence will be given to offset information present in the separate offsets topic.

For example, if the offsets stored in the global offsets topic for a connector are:

Offsets present in worker's global offsets topic
{
  "partition": {
    "subreddit": "apachekafka"
  },
  "offset": {
    "timestamp": "4761"
  }
}
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2112"
  }
}

And the offsets in the connector's separate offsets topic are:

Offsets present in separate offsets topic
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2169"
  }
}
{
  "partition": {
    "subreddit": "grilledcheese"
  },
  "offset": {
    "timestamp": "489"
  }
}

The offsets passed to the connector by the worker will be:

Offsets presented to the task
{
  "partition": {
    "subreddit": "apachekafka"
  },
  "offset": {
    "timestamp": "4761" // No offset for this partition was present in the separate offsets topic, so the one in the global offsets topic is used instead
  }
}
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2169" // Preference is given to the offset for this partition that comes from the separate offsets topic
  }
}
{
  "partition": {
    "subreddit": "grilledcheese"
  },
  "offset": {
    "timestamp": "489"
  }
}

Zombie Fencing

Task count records

A new type of record, a “task count record”, will be used in the config topic. This record explicitly tracks the number of task producers that will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task configurations, and will implicitly track whether it is necessary to perform a round of fencing before starting tasks for a connector.

An example of this record for a connector named “reddit-source” with 11 tasks would look like:

    key: “tasks-count-reddit-source”
value: {“tasks”: 11}

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, it will be safe for workers to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

    offset 0: task-reddit-source-0
offset 1: task-reddit-source-1
offset 2: commit-reddit-source
offset 3: tasks-count-reddit-source

then workers will be able to safely bring up tasks for the reddit-source connector.

However, if the contents of the config topic are this:

    offset 0: tasks-count-reddit-source
offset 1: task-reddit-source-0
offset 2: task-reddit-source-1
offset 3: commit-reddit-source

then workers will not be able to safely bring up tasks for the connector and will require a round of zombie fencing first (described below).

Zombie fencing by the leader

A new internal endpoint will be added to the REST API:

PUT /connectors/{connector}/fence

This endpoint will be secured by the session key mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints and will only be used for inter-worker communication; users should not query it directly. It will be available regardless of the value of the exactly.once.source.support  value for the worker.

When a worker receives a request to this endpoint, it will:

  1. Check to see if a rebalance is pending and, if it is, serve an HTTP 409 CONFLICT response. (This step is actually unnecessary and has been removed from the implementation; see KAFKA-15059 - Getting issue details... STATUS )
  2. Check to see if it is the leader. If it is not the leader, either forward the request to the leader or fail the request, using at most two hops. This two-hop strategy is already implemented in Connect for other REST endpoints that require forwarding of requests, so it is not described in great detail here.
  3. Verify that the connector in the request URL both exists and is a source connector; if not, fail the request.
  4. Read to the end of the config topic.
  5. If there is a task count record for the connector in the config topic after the latest set of task configs for the connector, serve an empty-bodied 200 response.
  6. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
    1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector. This will be done in parallel.
    2. If the leader receives a request to write new task configs for the connector during this period, the round of fencing will be immediately cancelled and an HTTP 409 CONFLICT response will be served.
  7. Write the new task count record for the fenced-out connector to the config topic
  8. Read to the end of the config topic to verify that the task count record has been written successfully.
  9. Serve an empty-bodied 200 response.

Step 6 will take place on a separate thread in order to avoid blocking the worker's herder tick thread (which is essential for detecting and handling rebalances, and performing the work for some REST requests). Once it is complete, steps 7 through 9 will be handled on the worker's herder tick thread, in order to ensure that the worker has an accurate view of whether new task configs for the connector have been submitted during step 6.

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. This same logic does not apply for multi-task connectors, even when the number of tasks is unchanged after a reconfiguration; for details on why, see the rejected alternative Non-generational task fencing.

Preparation for rebalance

When exactly.once.source.support  is set to enabled  and a new set of task configurations for a connector is detected, then workers will preemptively stop source tasks for that connector. In greater detail:

When a rebalance is triggered, before rejoining the cluster group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers), and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms  worker property) will be abandoned and allowed to continue running. They will be disabled later on if/when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and the current number of threads used to perform en-masse task stops is eight, it is highly unlikely that this will hinder a worker's ability to rejoin a group before the rebalance timeout (which defaults to sixty seconds) times out.

This section was the result of a misunderstanding of the herder's rebalance mechanics, and is actually unnecessary. The worker already performs this preemptive stop for all reconfigured tasks (if using incremental rebalancing) or all tasks (if using eager rebalancing) before (re)joining the group during a rebalance.

Source task startup

When exactly.once.source.support  is set to enabled  for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

Before a worker starts a source task, it will first send a request to the leader's internal zombie fencing endpoint for the task's connector. If that request fails for any reason, the task will be marked as FAILED  and startup will be aborted.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker begins to start a source task (due to manual restart by the user, rebalance, reconfiguration, etc.)
  2. Worker issues a fencing request to the leader; if this request fails for any reason, the task is marked as FAILED  and startup is aborted
  3. Worker reads to the end of the config topic and verifies that a task count record is now present for the connector after the latest set of task configurations for the connector; if this is not the case, task startup is abandoned *
  4. If the connector is implicitly or explicitly configured to use a separate offsets topic:
    1. Worker attempts to create the topic (silently swallowing the error if the topic already exists); if this fails for a non-acceptable reason (using the same logic as the worker already does when creating its internal topics to differentiate between acceptable and non-acceptable errors), the task is marked as FAILED and startup is aborted
  5. Worker instantiates a transactional producer for the task
  6. Worker reads to the end of the config topic
  7. If a new set of task configurations has since been generated for the connector, task startup is abandoned *
    1. Otherwise, begin polling the task for data

* - If this happens, a new task will be automatically brought up in place of this task, in response to the new set of task configurations in the config topic. No action (such as restarting the task) will be necessary on the part of the user.

Leader access to config topic

When exactly.once.source.support  is set to preparing  or enabled  for a worker, if the worker is the leader of the cluster, it will now use a transactional producer to guarantee that at most one worker is capable of writing to the config topic at any time. In greater detail:

After a rebalance, if a worker discovers that it has become the leader of the cluster, it will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster. If users try to override this by setting the transactional.id property in their worker config, the user-provided value will be ignored and a warning message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual--why use a transactional producer if transactions aren’t really necessary?--but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a connector, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it’s possible that the zombie leader may become unblocked and then attempt to write a task count record to the config topic after a new set of task configurations are written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector’s tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then. We also address existing edge cases in Connect where a zombie leader may write incorrect information to the config topic.

Admin API to Fence out Transactional Producers

Java API additions

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID. This same functionality is already possible by employing one or more transactional producers and invoking their initTransactions  methods, but is replicated on the admin client to make it easier to use for other cases (for more details, see the Fencing-only producers rejected alternative):

Admin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options  		   The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}

New classes for the API will be added as well.

The FenceProducersResult  class:

FenceProducersResult.java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
		// Implementation goes here
    }

	/**
	 * Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Long> producerId(String transactionalId) {
		// Implementation goes here
	}

	/**
 	 * Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Short> epochId(String transactionalId) {
		// Implementation goes here
	}

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
		// Implementation goes here
    }

}

And the FenceProducersOptions  class:

FenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}

Implementation

Under the hood, this will be implemented by finding the transaction coordinator for each transactional ID via the FindCoordinator protocol, then initializing a new transaction with that ID by invoking the InitProducerId protocol with no producer ID and no producer epoch. In the words of KIP-98, this:

  1. Bumps up the epoch of the PID, so that the any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.

ACLs

The ACLs required for this new API will be the same as the ones required to use a transactional producer for each of the specified transactional IDs. Specifically, this amounts to grants for the Write  and Describe  operations on the TransactionalId  resource, and a grant for the IdempotentWrite operation on the Cluster  resource.

Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Limitations

  • Distributed mode is required; standalone mode will not be supported (yet) for exactly-once source connectors
  • Exactly-once source support can only be enabled for all source connectors or none; it cannot be toggled on a per-connector basis
  • In order to be viable for exactly-once delivery, connectors must assign source partitions to at most one task at a time (otherwise duplicate writes may occur)
  • In order to be viable for exactly-once delivery, connectors must use the Connect source offset API for tracking progress (otherwise duplicate writes or dropped records may occur)

Addressed failure/degradation scenarios

These scenarios will not compromise a cluster’s exactly-once delivery guarantees.

Leader cannot write task count record to config topic during zombie fencing

The leader will try indefinitely during a round of zombie fencing to write the task count to the config topic and verify that it has been written by reading it back from the topic after writing.

If this takes too long, the worker may fall out of the group and a new leader may be elected. If the (now-dethroned) leader becomes unblocked during this process and attempts to write to the config topic, it will have been fenced out by its replacement. If it has been blocked on reading from the config topic, it will wake up and finish serving a response. In the next iteration of the herder tick thread, it will discover that it has fallen out of the group and attempt to rejoin.

If the attempt to read from/write to the config topic does not block but instead fails for some reason, the leader will immediately leave the group and then attempt to rejoin, serving an HTTP 500 ERROR response containing a stack trace for the failure to interact with the config topic.

Cannot fence out producers during zombie fencing

The leader may be unable to fence out producers during a round of zombie fencing if, for example, it is unable to reach or locate a transaction coordinator, or the connector's admin principal lacks the ACLs necessary to fence out the producers for its tasks.

If this happens, the leader will serve an HTTP 500 ERROR response containing a stack trace for the failure to fence out the producer(s).

For some errors (such as those caused by insufficient ACLs), the stack trace will include a helpful error message containing instructions on how to remediate the failure (such as granting the connector's admin principal permission to fence out producers with a list or description of applicable transactional IDs).

The worker that receives this 500 response will mark the Task object FAILED in the status topic, and use the stack trace contained in the response to populate the trace  field for the status message.

Manual restart of fenced-out tasks

If a task fails because its producer has been fenced out, the user may try to manually restart that task through the REST API. If the worker’s view of the config topic is up-to-date, this will simply result in the restarted task replacing any existing instances of the same task, and exactly-once delivery guarantees will be preserved. However, if the worker has an outdated view of the config topic, restarting that task may compromise exactly-once guarantees if that task ends up producing data from a source partition that has already been assigned to a different task in the latest set of task configurations for the connector.

In order to prevent manual user restarts from compromising exactly-once delivery guarantees, Connect workers will read to the end of the config topic before proceeding with any source task restarts. If the worker is unable to read to the end of the config topic, the restart request will be met with an HTTP 500 TIMEOUT response. If the worker discovers during its read to the end of the topic that the task no longer exists (because the connector has been deleted or its task count reduced), it serve the usual HTTP 404 NOT FOUND response. If a rebalance becomes expected because of new information read from the config topic, an HTTP 409 CONFLICT response will be served.

Worker lag after being assigned source task but before instantiating producer

If a worker is assigned a source task and sees a task count record in the config topic after the latest set of task configurations for that connector, it comes with the assumption that all producers for all prior task generations of that connector have been fenced out. And, at the time of the fencing performed by the leader, this assumption likely holds. However, it's possible that a worker may be assigned a source task, observe a task count record in the config topic that indicates that it is safe to start the task, then block for some large period of time before it is able to construct a producer for that task. For example, consider a scenario with workers F (follower), O (other follower), and L (leader), operating on task T of source connector C:

  1. Connector C is reconfigured and new task configurations for it are written to the config topic
  2. A rebalance is triggered and worker L assigns task T to worker F
  3. Worker F receives its assignment and, after requesting a successful round of fencing from worker L, is able to read the most-recent task count record for connector C from the config topic
  4. Worker F blocks, before it is able to instantiate a transactional producer for task T
  5. Connector C is reconfigured and new task configurations for it are written to the config topic
  6. A rebalance is triggered and worker L assigns task T to worker O as F has fallen out of the group
  7. Worker O receives its assignment and, after requesting a successful round of fencing from worker L, is able to read the most-recent task count record for connector C from the config topic, instantiate a transactional producer for task T, and then begin processing data from that task
  8. Worker F becomes unblocked, instantiates a transactional producer for task T, and then begins processing data for that task

In this scenario, if a source partition initially assigned to task T is reassigned to a different task during the reconfiguration in step 5, the task instance created by worker F in step 8 will begin producing duplicate data and exactly-once delivery guarantees will be compromised.

This is possible because it is not guaranteed at the time of the fencing performed by the leader that all producers that could be active for a connector have been created yet. If a producer has not yet been created, it cannot be fenced out; if it is created after the fencing occurs, a different approach must be taken to ensure that no duplicate data is produced.

This case should be covered by the additional read to the end of the config topic by the worker after it has brought up a transactional producer for a source task. If the connector is reconfigured after the worker became blocked and a round of producer fencing has occurred, then the worker will bring up a transactional producer for its source task, but then discover the connector reconfiguration and abort startup. If the worker has brought up a transactional producer and then become blocked before completing its read to the end of the config topic, the round of producer fencing by the leader should fence out that producer and any subsequent restart attempts will block until/unless the worker is able to complete a read to the end of the config topic (and handle any rebalances necessitated by new information read from the topic in the process).

Accidental task commit of dropped messages

If a transactional producer is successfully instantiated, it will not attempt to contact the broker again until an attempt is made to actually send a record. This can lead to interesting behavior: the producer is brought up and able to make initial contact with the transaction coordinator, then the entire Kafka cluster dies, then the producer begins and commits a transaction without attempting to send any records, and no exception or other indication of failure is generated by the producer at any point in this sequence of events.

When might this occur? If a task is configured with an aggressive SMT that drops all the records in a given batch, its producer will never attempt to send any records before committing the current transaction. And, once this occurs, the worker will invoke SourceTask::commit, which may cause the task to drop the data from the upstream system (for example, by acknowledging a batch of messages from a JMS broker). Even if this occurs, it should not be a problem: it’s fine to drop messages from the upstream system that are meant to be dropped by the connector, regardless of whether source offsets for those records have been committed to Kafka, as the end result is the same either way.

Permitted failure scenarios

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery (either because exactly.once.source.support  is not set to enabled , or the worker is running an older version of Connect for which the feature is not available), the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

This failure scenario will need to be called out in public documentation so that users understand that they are responsible for the operational burden of ensuring that no non-compliant workers are active.

Non-worker, non-leader producer to config topic

If anything besides a valid leader worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether a fencing is necessary can be invalidated. This should never happen in a reasonably-secured Connect cluster and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Before setting exactly.once.source.support  to preparing  or enabled , the producer principal for the worker must be given the following permissions on the Kafka cluster it writes to:

Operation

Resource Type

Resource Name

Write

TransactionalId

connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster

Describe

TransactionalId

connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster

IdempotentWrite *

Cluster

Kafka cluster targeted by the Connect cluster

* - Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Connector principal permissions

Producer

With exactly-once source support enabled, each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation

Resource Type

Resource Name

Write

TransactionalId

${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Describe

TransactionalId

${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Write

Topic

Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

IdempotentWrite *

Cluster

Kafka cluster targeted by the connector.

* - Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation

Resource Type

Resource Name

Read

Topic

Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

Note that there is currently no use case for Connect that requires a consumer principal to be configured for source connectors. As a result, this proposed change technically introduces "new" configuration properties for source connectors: consumer-level overrides prefixed with consumer.override. , as described in KIP-458: Connector Client Config Override Policy.

Admin

With exactly-once source support enabled, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation

Resource Type

Resource Name

WriteTransactionalId${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
DescribeTransactionalId${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

If the (implicitly- or explicitly-configured) offsets topic for a connector does not exist yet, its admin principal must be given the following permissions on the Kafka cluster that will host the offsets topic:

Create

Topic

Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided and the connector targets the same Kafka cluster that the worker uses for its internal topics, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector's admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

OperationResource TypeResource Name
DescribeTopicOffsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided and the connector targets the same Kafka cluster that the worker uses for its internal topics, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.

Rolling upgrade(s)

At most two rolling upgrades will be required to enable exactly-once source support on a Connect cluster.

The first rolling upgrade will be to upgrade every worker to a version of Connect that can provide exactly-once source support, and to set exactly.once.source.support  to preparing .

The second rolling upgrade will be to actually enable exactly-once source support on each worker (by setting exactly.once.source.support  to enabled).

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them, and that no non-leader workers are able to write to the config topic during the upgrade.

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.support  to either disabled  or preparing  for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that does not support exactly-once sources at all (a “hard” downgrade).

Soft downgrade

Soft downgrades should be possible via a cluster roll, where each worker is shut down, its configuration is altered to disable exactly-once source support, and then it is restarted. During the process, all connectors and their tasks should continue to run and process data, thanks to the same logic that makes a rolling upgrade possible.

Hard downgrade

Offsets accuracy

Because source task offsets on upgraded workers are still written to the worker’s global offsets topic, even if a downgraded worker does not support per-connector offsets topics, it can still pick up on relatively-recent source offsets for its connectors. Some of these offsets may be out-of-date or older than the ones in the connector’s separate offsets topic, but the only consequence of this would be duplicate writes by the connector, which will be possible on any cluster without exactly-once support enabled. Without any writes to the global offsets topic, all records processed by a connector since a switch to a dedicated offsets topic would be re-processed after the downgrade and would likely result in a flood of duplicates. While technically permissible given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be quite bad, so a little extra effort on the part of Connect to significantly reduce the fallout of downgrades in this case is warranted.

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, the worker will give precedence to the existing separate offsets topic, even if the data in that topic is stale and there is actually newer information present in the global offsets topic.

Two-step downgrade

The safest way to perform a hard downgrade is to follow the same steps for a rolling upgrade, but in reverse. Specifically:

Perform an initial rolling downgrade wherein exactly.once.source.support  is set to false  for every worker.

Perform a second rolling downgrade where each worker is modified to use an earlier version of Connect.

All connectors and tasks should continue to run and process data, thanks to the same logic that makes a rolling upgrade possible.

Single-step downgrade

If a hard downgrade is performed in a single-step rolling downgrade (i.e., after each worker is shut down, it is immediately downgraded to a lower version of Connect), some tasks may begin to fail as their workers will be unable to reach the internal fencing endpoint on workers that have already been downgraded. Once the downgrade is complete, it should be sufficient to restart these tasks in order to get them running again.

Additional configuration properties

The worker-level exactly.once.source.support  and connector-level offsets.storage.topic , transaction.boundary , exactly.once.support , and transaction.boundary.interval.ms  configuration properties introduced here should not violate backwards compatibility, as they all come with default values that preserve existing behavior. Technically, an existing connector may expose a offsets.storage.topic  configuration property that will now conflict with this newly-introduced framework property, but the risk is low enough to be acceptable.

The newly-introduced source connector consumer.override. -prefixed properties will also technically be backwards incompatible, but for the same reason that these properties were introduced in the first place for sink connectors, and the similar producer.override. - and admin.override. -prefixed properties were introduced for source connectors, this should be acceptable. The risk is low enough.

New Admin API

The new Admin  methods for fencing producers will be implemented using existing protocols (specifically, FindCoordinator and InitProducerId), so no changes to the Kafka binary protocol will be required. As a result, these new methods should work on any broker that supports those protocols.

Rejected Alternatives

Non-generational task fencing

Summary: instead of proactively fencing out task producers after Connector startup, allow each task to bring up its own producer and let it fence out older instances of itself in the process.

Rejected because:

  • This doesn't work for cases where a source partition is reassigned across tasks. For example, if task T1 of a connector is responsible for reading table A, and then the connector reassigns A to task T2, task T1 needs to be shut down completely before task T2 starts up. This can be referred informally to as the "source partition reshuffling problem".
  • This doesn't work for cases where the number of tasks in a connector is reduced. For example, if the number of tasks is brought down from 8 to 5, then tasks 6, 7, and 8 will never be fenced out by successors since none will be brought up.

Fencing during rebalance

Summary: instead of exposing an internal REST endpoint, perform zombie fencing automatically during rebalances.

Rejected because: tightly coupling zombie fencing and rebalancing makes fencing fairly heavyweight and complicates the rebalancing process. On top of that, if fencing fails and the user wants to retry the operation, requiring a rebalance for that retry attempt is unnecessarily high-cost.

Connector owners performing fencing and writing directly to config topic

Summary: instead of only allowing the leader to perform zombie fencing and write task count records and task configs to the config topic, let individual workers handle this responsibility. The owner of the Connector object for a connector would be given all three of these responsibilities.

Rejected because: in order to maintain the integrity of the config topic, it's imperative that only a single worker be able to access it at a time for a given connector. This could be accomplished by allowing each worker to write to the config topic with a transactional producer whose transactional ID is mapped in a 1:1 fashion from the name of the connector. However, if a rebalancing bug occurs and two non-zombie workers believe they both own the same Connector object, it's unclear how the cluster could gracefully recover from this, and it's likely that manual intervention by the user would be required.

Per-connector exactly-once property

Summary: either instead of or in addition to allowing exactly-once support to be configured on a worker-wide basis, allow it to be enabled on a per-connector basis.

Rejected because: even if well-behaved workers close the producers for tasks that lag on shutdown, zombie workers may still run tasks that the cluster has since reassigned to other workers. In this case, while the worker remains a zombie, manual intervention is required from the cluster administrator to shut down the tasks hosted on the zombie worker, either by shutting down the worker (possible today) or by explicitly revoking tasks from the worker (not possible today, but another option that would work). Requiring timely manual intervention on the part of the user in order to preserve delivery guarantees should be avoided if possible.

If a non-exactly-once source task were to become a zombie because it, its converter, one of its transforms, etc. became blocked, or because its worker became a zombie, then its connector were reconfigured to enable exactly-once support, the zombie task might eventually cause duplicate writes to occur. To make this scenario less likely, exactly-once support is provided in an all-or-nothing fashion that makes it impossible for non-transactional producers to stay lying around in a cluster after all of the workers have been upgraded and configured to provide exactly-once source support.

Deduplication of connector-provided records

Summary: track messages produced by source connectors and perform deduplication automatically.

Rejected because: this requires every message to be uniquely and deterministically identifiable, which is not possible in all cases (although this is likely possible in all cases where connectors properly utilize the offsets API provided by the Connect framework). When it is possible, deduplication can already be implemented by a downstream application such as Kafka Streams. Finally, it’s unclear how all prior IDs for records produced by a connector might be tracked and used efficiently by a Connect cluster without incurring a noticeable performance penalty.

Per-source partition producers

Summary: instead of using one producer for each task, use a producer for each source partition of the task.

Rejected because:

  • Although this would provide exactly-once support for most of the same connectors that the proposed design would (specifically, connectors that allocate each source partition to at most one task at a time), there are some cases that this would miss. This is because the assumption that the connector-specified source partitions that it gives to the Connect framework correspond to the actual source partitions that it consumes from (such as database tables) doesn't always hold. If, for example, a connector stores a version field in its source partitions, a change in that version field (either across connector versions or even during runtime for the connector) would allow zombie task instances for that connector to continue running and processing data, compromising exactly-once delivery.
  • Another issue with this approach is that the framework would only be able to instantiate a transactional producer for a source partition after a task communicates its intent to use that partition. There is no explicit mechanism for this communication right now. One implicit mechanism might be to create a transactional producer for a source partition the first time that a task produces data for that partition, but that wouldn't fence out zombie task instances early enough as they might be able to produce data in between when newer task instances read offset data (and choose where to begin consuming from the upstream system) and when they produce the first record for that data, which would then lead to duplicate record delivery. Another implicit mechanism might be to create a transactional producer for a source partition the first time that a task tries to read the offset for that source partition, but this would only provide exactly-once delivery guarantees for tasks that query the framework for source offsets of every source partition they write to before they perform that write. One common use case where this may not happen is if a task discovers a new object in the upstream system that it knows (by creation timestamp, for example) was created after the task was started; in this case, it's possible and even likely that a connector developer wouldn't see a need to check for source offsets for that object since it was just created.
  • The number of producers created for a connector could grow quite large, at which point, allocating enough buffer memory, broker connections, and other resources for each of these may have adverse impacts on the performance of the worker. This would also go against the pattern established by the Connect framework that, to scale a connector, its task count should be increased, as it would make the number of producers set up for a connector the same no matter if it’s running a single task or hundreds. This would be especially problematic if, for example, MirrorMaker 2 were set up to consume from 1000 slowly-moving topics with a single of task; if the average partition count per topic were 5, that'd lead to 5000 producers being created. This approach was initially used by Kafka Streams, but due to the scalability issues it presented, the follow-up KIP-447 has since been introduced to eliminate excessive construction of producers.
  • It's unclear exactly how a transactional ID could be derived from a source partition, since source partitions consist of structured data (specifically, a Java Map<String, ?> ). We could try to serialize these as JSON and then use the resulting string as the transactional ID (likely in conjunction with the name of the connector in order to avoid collisions with other connectors), but that would lead to some extremely ugly transactional IDs. A hash of the JSON-serialized source partition could be taken, but that would make it even harder to predict, and might even lead to collisions which, though unlikely, would be preferable to avoid.
  • The operational burden on users in security-conscious environments would be significantly heavier, as the transactional IDs required to run a connector would be difficult to predict and could dynamically expand at runtime depending on the implementation of the connectors being run. At best, per-connector documentation would be required to expose a mostly-internal detail so that users could understand exactly which transactional IDs their connector's producer would need.
  • Because no guarantee is made that prior generations of tasks are fenced out, it becomes possible for a task running on a worker with an outdated configuration for the connector to fence out a task running with a correct configuration. This would not compromise delivery guarantees, but may lead to some serious user ire if, for example, the connector were reconfigured with a different transformation chain, converter, or another property that altered the format of data produced by it.

Throwaway fencing-only producers

Summary: instead of expanding the Admin  API to add producer-fencing functionality, just instantiate a throwaway producer for the sole purpose of calling Producer::initTransactions .

Rejected because: this is a bit of an abuse of the producer API and using a dedicated admin API is cleaner. It requires no broker-side changes and will be fairly lightweight to implement. There also may be other legitimate use cases for fencing out producers without wanting to instantiate one in other client applications.

Cleanup of task count records on connector deletion

Summary: if a connector is deleted, perform a final round of producer fencing for its tasks before writing a tombstone task count record for the connector to the config topic.

Rejected because: in the worst case, the connector is never recreated and a worker (presumably the leader) will have expended unnecessary effort on fencing out producers that aren’t going to compromise delivery guarantees and won’t have any bigger consequences than such tasks would already have on a Connect cluster today. In the best case, the connector is recreated immediately, in which case, a fencing would be performed anyways if no task count tombstone record were written to the config topic. Ultimately, no work is saved by doing this fencing on connector deletion, and some work may be unnecessary.

Full migration of offsets topics

Summary: instead of falling back on the global offsets topic, when creating a per-connector offsets topic for the first time (or if there are no offsets for a connector present in a pre-existing per-connector offsets topic), copy all relevant offset information over from the global offsets topic into the new per-connector offsets topic, and rely solely on the content of the per-connector offsets topic.

Rejected because: this approach was significantly more complex, involved duplicating existing data, and provided little if any advantages over the current proposal.

Non-configurable transaction boundaries

Summary: instead of allowing users to configure the transaction boundaries for their connector, define all transaction boundaries the same way (by interval, poll batch, connector definition, connector with a fallback of poll, etc.).

Rejected because: there is no one-size-fits-all strategy for transaction boundaries that can be expected to accommodate every reasonable combination of connector and use case. Defining transactions on the batches returned from SourceTask::poll  would heavily limit throughput for connectors that frequently produce small record batches. Defining transactions on an interval would add a latency penalty to the records at the beginning of these transactions and, in the case of very large transactions, would inflate the memory requirements of downstream consumers (which would have to buffer the entire transaction locally within each topic-partition before beginning to process any of the records in it (this is actually incorrect; consumers do not have to buffer transactions locally)). And some connectors may have no reasonable way to define their own transaction boundaries at all.

Future Work

Per-connector granularity

We may want to enable exactly-once to be enabled on a per-connector basis. This could be implemented by instantiating each source task with a "fencible producer" that uses a transactional ID to write to Kafka, but does not actually produce records inside transactions. This way, if exactly-once becomes enabled for that connector, the first round of zombie fencing for it will be able to fence out all prior producer instances, even if they aren't using the traditional transactional producer.

Standalone mode support

Since the design for atomic writes of source records and their offsets relies on source offsets being stored in a Kafka topic, standalone mode is not eligible. If there is sufficient demand, we may add this capability to standalone mode in the future.

Enable by default

We may want to enable this feature by default after it has been available for several releases and it appears to be sufficiently stable.

More conservative task fencing

It may not be necessary to fence out all tasks from a previous generation if the division of source partitions among those tasks doesn't change. However, it's impossible to know the actual assignment of source partitions to tasks since there's no API for connectors to be able to provide this information to the Connect framework. An extension to the Connect framework to add this API could be made, but is left for future work. One important question to consider with such an API would be whether it would take into account configuration changes that are invisible to the connector itself, such as converter or transformation chain changes.

  • No labels