Motivation:

  • The current Gobblin Kafka High Level Consumer uses the Kafka 0.8 consumer APIs, and Gobblin support for them will be deprecated. The primary goal of the redesign is to replace old Kafka consumer APIs such as ConsumerConnector and MessageAndMetadata with a Gobblin Kafka consumer abstraction, GobblinKafkaConsumerClient.
  • The old design uses Kafka's auto-commit feature, which can lose messages when offsets are committed and the system fails before those messages are processed. To address this, the API has been enhanced to support committing offsets manually.
  • Consumer throughput can be increased by separating consumption and processing so they can be scaled individually.

Full Implementation of High level consumer can be found here

Proposed Change

GobblinKafkaConsumerClient is a simplified, generic wrapper client for communicating with Kafka. The interface does not depend on classes defined in the kafka-clients library, which allows Gobblin job constructs to work with different versions of Kafka. Concrete classes implementing this interface use a specific version of the kafka-clients library.

The high level consumer records topic partition and offset information only AFTER the messages are processed and commits it to Kafka periodically. This ensures at-least-once delivery in case of a failure. Additionally, APIs are provided to subscribe to a topic along with a GobblinConsumerRebalanceListener, which provides hooks for when a consumer joins or leaves a consumer group. In this case, we commit remaining offsets and clear offset caches. (See the interface definitions in the next section.)

The new design consists of 3 parts:

  • Consumption

A separate thread consumes from Kafka by calling consumerClient.poll() and adds messages to queues. The partition of each message is hashed so that all messages from a given partition land on the same queue, which preserves ordering within that partition.

  • Processing 

A pool of threads (usually as many as there are partitions), each of which processes one queue indefinitely. When a message is polled from the queue, it is sent for processing. After the message is processed, its partition and latest offset are recorded. This guarantees that only offsets of messages that have been processed will be committed.

  • Manual offset commit

A separate thread periodically commits the latest offsets to Kafka, at a configurable interval. Using a separate thread ensures that we are not committing too frequently.
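
The three parts above can be sketched as follows. This is a minimal, single-threaded illustration under stated assumptions, not the Gobblin implementation: HighLevelConsumerSketch, its nested Record type, and the methods queueIndex, enqueue, processQueue and snapshotOffsets are hypothetical names, Record stands in for KafkaConsumerRecord, and mapping a partition to a queue with a modulo is only one possible way to do the hashing described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of the consume / process / commit split; not the Gobblin class. */
class HighLevelConsumerSketch {

  /** Simplified stand-in for KafkaConsumerRecord: partition id, offset and payload. */
  static final class Record {
    final int partition;
    final long offset;
    final String value;

    Record(int partition, long offset, String value) {
      this.partition = partition;
      this.offset = offset;
      this.value = value;
    }
  }

  private final List<BlockingQueue<Record>> queues = new ArrayList<>();
  // Latest processed offset per partition, written only AFTER processing.
  private final Map<Integer, Long> processedOffsets = new ConcurrentHashMap<>();

  HighLevelConsumerSketch(int numQueues) {
    for (int i = 0; i < numQueues; i++) {
      queues.add(new LinkedBlockingQueue<>());
    }
  }

  /** All records of one partition map to the same queue, keeping their order. */
  int queueIndex(int partition) {
    return Math.floorMod(partition, queues.size());
  }

  /** Consumption: the polling thread would call this for every record it receives. */
  void enqueue(Record record) {
    queues.get(queueIndex(record.partition)).add(record);
  }

  /** Processing: drains what is queued now; a real worker thread would loop on take(). */
  int processQueue(int queueId, Consumer<Record> processor) {
    int processed = 0;
    Record record;
    while ((record = queues.get(queueId).poll()) != null) {
      processor.accept(record);
      // Recorded only after the processor returned without throwing.
      processedOffsets.put(record.partition, record.offset);
      processed++;
    }
    return processed;
  }

  /** Manual commit: a periodic commit thread would pass this snapshot to the client. */
  Map<Integer, Long> snapshotOffsets() {
    return new HashMap<>(processedOffsets);
  }

  public static void main(String[] args) {
    HighLevelConsumerSketch consumer = new HighLevelConsumerSketch(2);
    consumer.enqueue(new Record(0, 41L, "a"));
    consumer.enqueue(new Record(1, 7L, "b"));
    consumer.enqueue(new Record(0, 42L, "c"));
    consumer.processQueue(0, r -> System.out.println("processed " + r.value));
    System.out.println("offsets to commit: " + consumer.snapshotOffsets());
  }
}
```

In this run only queue 0 is drained, so the snapshot contains partition 0 with offset 42 and nothing for partition 1, whose record is still waiting in its queue. A failure at this point would therefore cause the partition 1 record to be redelivered rather than lost.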

New or Changed Public Interfaces:

GobblinKafkaConsumerClient

The API has been enhanced to allow manual committing of offsets (commitOffsetsAsync and commitOffsetsSync) and to subscribe to a topic with a GobblinConsumerRebalanceListener.

Full interface can be found here and an implementation with Kafka (version 09) can be found here

public interface GobblinKafkaConsumerClient extends Closeable {
 
  /**
   * API to consume records from Kafka
   * @return an iterator over the consumed records (empty by default)
   */
  default Iterator<KafkaConsumerRecord> consume() {
    return Collections.emptyIterator();
  }

  /**
   * Subscribe to a topic
   * @param topic
   */
  default void subscribe(String topic) {
    return;
  }

  /**
   * Subscribe to a topic along with a GobblinConsumerRebalanceListener
   * @param topic
   * @param listener
   */
  default void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
    return;
  }

  /**
   * Commit offsets manually to Kafka asynchronously
   */
  default void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
    return;
  }

  /**
   * Commit offsets manually to Kafka synchronously
   */
  default void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
    return;
  }

  /**
   * returns the last committed offset for a KafkaPartition
   * @param partition
   * @return last committed offset or -1 for invalid KafkaPartition
   */
  default long committed(KafkaPartition partition) {
    return -1L;
  }

}

KafkaConsumerRecord

Added getTopic(), getOffset() and getPartition() so that the HighLevelConsumer can access them and record each partition with its latest offset.

Full interface can be found here and an implementation with Kafka (version 09) can be found here

/**
 * A kafka message/record consumed from {@link GobblinKafkaConsumerClient}. This interface provides APIs to read message
 * metadata. Extension interfaces like {@link DecodeableKafkaRecord} or {@link ByteArrayBasedKafkaRecord} provide APIs
 * to read the actual message/record.
 */
public interface KafkaConsumerRecord {

  /**
   * Offset of this record
   */
  long getOffset();

  /**
   * Next offset after this record
   */
  long getNextOffset();

  /**
   * @return Partition id for this record
   */
  int getPartition();

  /**
   * @return topic for this record
   */
  String getTopic();

}

GobblinConsumerRebalanceListener

Listener that provides hooks for when a consumer joins or leaves a consumer group. In this case, we commit remaining offsets and clear offset caches.

Full class can be found here

/**
 * A listener that is called when Kafka partitions are re-assigned because a consumer leaves/joins a group
 *
 * For more details, see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
 */

public interface GobblinConsumerRebalanceListener {

  void onPartitionsRevoked(Collection<KafkaPartition> partitions);

  void onPartitionsAssigned(Collection<KafkaPartition> partitions);

}
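
As an illustration of the "commit remaining offsets and clear offset caches" behaviour described above, a listener could look roughly like the sketch below. It is hedged throughout: CommitOnRebalanceSketch, its nested RebalanceListener interface, and the commitAndClear factory are hypothetical names, partitions are keyed by a plain integer id instead of KafkaPartition so that the example is self-contained, and the syncCommit callback stands in for a call such as commitOffsetsSync.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch: flush processed offsets on rebalance, then drop the cache. */
class CommitOnRebalanceSketch {

  /** Simplified stand-in for GobblinConsumerRebalanceListener, keyed by partition id. */
  interface RebalanceListener {
    void onPartitionsRevoked(Collection<Integer> partitions);

    void onPartitionsAssigned(Collection<Integer> partitions);
  }

  /**
   * @param offsetCache latest processed offset per partition, filled by the processing threads
   * @param syncCommit  stand-in for a synchronous commit call on the consumer client
   */
  static RebalanceListener commitAndClear(Map<Integer, Long> offsetCache,
                                          Consumer<Map<Integer, Long>> syncCommit) {
    return new RebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<Integer> partitions) {
        // Commit a copy first, so the cache is cleared only after the commit returned.
        Map<Integer, Long> remaining = new HashMap<>(offsetCache);
        if (!remaining.isEmpty()) {
          syncCommit.accept(remaining);
        }
        offsetCache.clear();
      }

      @Override
      public void onPartitionsAssigned(Collection<Integer> partitions) {
        // Nothing to do in this sketch: the cache was already cleared on revocation.
      }
    };
  }
}
```

Committing synchronously in onPartitionsRevoked is a design choice of this sketch: the callback runs before the partitions are handed to another consumer, so a blocking commit narrows the window in which the new owner would re-read already processed messages.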

Rejected Alternatives:

  • An alternative considered was to let each queue thread commit offsets separately, but this could lead to committing offsets too frequently and bogging down the system.