Status

Current stateVoting

Discussion threadhere

JIRAKAFKA-12793

Motivation

When Kafka is used to build data pipeline in mission critical business scenarios, availability and throughput are the most important operational goals that need to be maintained in presence of transient or permanent local failure. One typical situation that requires Ops intervention is disk failure, some partitions have long write latency caused by extremely high disk utilization; since all partitions share the same buffer under the current producer thread model, the buffer will be filled up quickly and eventually the good partitions are impacted as well. The cluster level success rate and timeout ratio will degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit problematic partitions during transient failure. Similar approach is applied in other distributed systems and RPC frameworks.

Public Interfaces

New producer config option is added:


Add a ProducerMuteManager class that manages the partition metadata related to this mechanism

/**
 * A class which maintains mute of TopicPartitions. Also keeps the number of TopicPartitions accumulated-batches and in-flight requests.
 */
public class ProducerMuteManager implements Closeable {

    /**
     * Add mute of TopicPartition
     *
     * @param topicPartition
     */
    public void mute(TopicPartition topicPartition);

    /**
     * Remove muted of TopicPartition
     *
     * @param topicPartition
     */
    public void unmute(TopicPartition topicPartition);


    public boolean isMute(TopicPartition topicPartition);

    /**
     *  Return muted of TopicPartitions
     *
     * @return
     */
    public Set<TopicPartition> getMutedPartitions();

    public void close();

    /**
     *  Return the number of TopicPartition accumulated-batches requests
     *
     * @return
     */
    public Map<TopicPartition, Integer> getAccumulatedBatches();

    void setInFlightBatches(Map<TopicPartition, List<ProducerBatch>> inFlightBatches);

    /**
     * Return the number of TopicPartition in-flight requests
     *
     * @return The request count.
     */
    public Map<TopicPartition, Integer> getInFlightRequestCount();


Add a 'initialize' method in Partitioner class

public interface Partitioner extends Configurable, Closeable {

    /**
     * This method is called when the Producer is built to initialize the partition mute manager
     *
     * @param producerMuteManager
     */
    default void initialize(ProducerMuteManager producerMuteManager) {
    }

    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
}


Add a 'initialize' method in ProducerInterceptor class

public interface ProducerInterceptor extends Configurable, Closeable {

    /**
     * This method is called when the Producer is built to initialize the partition mute manager
     *
     * @param producerMuteManager
     */
    default void initialize(ProducerMuteManager producerMuteManager) {
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
}

Proposed Changes

We propose to add a configuration driven circuit breaking mechanism that allows Kafka client to mute’ partitions when certain condition is met. The mechanism adds callbacks in Sender class workflow that allows to filtering partitions based on certain policy.

The client can choose proper implementation that fits a special failure scenario, Client-side custom implementation of Partitioner and ProducerInterceptor

Muting partitions have impact when the topic contains keyed message as messages will be written to more than one partitions during period of recovery. We believe this can be an explicit trade-off the application makes between availability and message ordering.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

The proposed solution is only beneficial to applications with Kafka clients upgraded to the new version. Large organizations almost surely have mixed clients which will not all be protected. Similar mechanism can also be implemented on the server side and benefit all clients regardless of their version. We argue that client-side circuit breaking and server side broker high availability are complementary instead of conflicting. On one hand it is not likely (or extremely expensive) to implement broker HA in the control plane; on the other hand we have also often seen client side mechanism used to mitigate network problem between client and broker.