- The current Gobblin Kafka high-level consumer uses the Kafka Consumer (0.8) APIs, and Gobblin support for them will be deprecated. The primary goal of the redesign is to replace old Kafka consumer APIs such as ConsumerConnector and MessageAndMetadata with a Gobblin Kafka consumer abstraction, GobblinKafkaConsumerClient.
- The old design uses Kafka's auto-commit feature, which can lose messages when offsets are committed and the system fails before those messages are processed. To address this, the API has been enhanced to support committing offsets manually.
- Consumer throughput can be increased by separating consumption and processing so they can be scaled individually.
Full Implementation of High level consumer can be found here
GobblinKafkaConsumerClient is a simplified, generic wrapper client for communicating with Kafka. This interface does not depend on classes defined in the kafka-clients library, which allows Gobblin job constructs to work with different versions of Kafka. Concrete classes implementing this interface use a specific version of the kafka-clients library.

The high-level consumer stores topic partition and offset information only AFTER the messages are processed and commits it periodically to Kafka. This ensures at-least-once delivery in case of a failure. Additionally, APIs are provided to subscribe to a topic along with a GobblinKafkaRebalanceListener that provides hooks for when a consumer joins or leaves a consumer group. In this case, we commit remaining offsets and clear offset caches. (See interface definition in next section)
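The at-least-once guarantee described above can be illustrated with a small simulation. This is a sketch under stated assumptions, not Gobblin code: `AtLeastOnceSketch` and all of its members are hypothetical names, the "partition" is an in-memory list, and the committed value follows the Kafka convention of storing the next offset to read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of "record the offset only after processing". */
final class AtLeastOnceSketch {
  private final List<String> log;   // stand-in for one Kafka partition
  private long committed = 0;       // durable: next offset to read after a restart
  private long position = 0;        // in-memory: next offset to process
  final List<String> processed = new ArrayList<>();

  AtLeastOnceSketch(List<String> log) {
    this.log = log;
  }

  /** Processes up to maxMessages; the offset advances only after processing. */
  void run(int maxMessages) {
    for (int i = 0; i < maxMessages && position < log.size(); i++) {
      processed.add(log.get((int) position)); // process first...
      position++;                             // ...then record the offset
    }
  }

  /** Periodic commit: persists how far processing has actually progressed. */
  void commit() {
    committed = position;
  }

  /** A crash loses in-memory state; the consumer resumes from the last commit. */
  void crashAndRestart() {
    position = committed;
  }

  long committedOffset() {
    return committed;
  }

  public static void main(String[] args) {
    AtLeastOnceSketch c = new AtLeastOnceSketch(Arrays.asList("m0", "m1", "m2", "m3", "m4"));
    c.run(2);
    c.commit();
    c.run(2);            // m2 and m3 are processed but not yet committed
    c.crashAndRestart(); // failure before the next periodic commit
    c.run(10);
    c.commit();
    System.out.println(c.processed);
  }
}
```

In this run, m2 and m3 are processed twice because the crash happened before their offsets were committed. No message is skipped, which is the trade-off at-least-once delivery makes: duplicates are possible, loss is not.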
The new design consists of three parts:
A separate thread consumes from Kafka by calling consumerClient.poll() and adds messages to queues. Each message's partition is hashed so that messages from a given partition always land on the same queue, which preserves per-partition ordering.
A pool of threads (usually one per partition), each of which processes a queue indefinitely. When a message is polled from the queue, it is sent for processing. After the message is processed, the partition and latest offset are recorded. This guarantees that only offsets for messages that have been processed will be committed.
- Manual offset commit
A separate thread periodically commits the latest offsets to Kafka at a configurable interval. Using a single dedicated thread ensures that we are not committing too frequently.
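The three parts above can be sketched as follows. This is a minimal illustration, not the actual HighLevelConsumer: `PipelineSketch`, `Msg`, `queueIndexFor`, `commitOnce`, and `startCommitter` are hypothetical names, message processing is left as a comment, and the "commit" writes to an in-memory map instead of Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PipelineSketch {
  /** Hypothetical message carrying only what the sketch needs. */
  static final class Msg {
    final int partition;
    final long offset;
    Msg(int partition, long offset) { this.partition = partition; this.offset = offset; }
  }

  private static final Msg STOP = new Msg(-1, -1); // poison pill that ends a worker

  private final List<BlockingQueue<Msg>> queues = new ArrayList<>();
  private final List<Thread> workers = new ArrayList<>();
  private final Map<Integer, Long> latestProcessed = new ConcurrentHashMap<>();
  private final Map<Integer, Long> committed = new ConcurrentHashMap<>(); // stand-in for Kafka

  PipelineSketch(int numQueues) {
    for (int i = 0; i < numQueues; i++) {
      BlockingQueue<Msg> q = new LinkedBlockingQueue<>();
      queues.add(q);
      Thread t = new Thread(() -> drain(q)); // Part 2: one worker per queue
      workers.add(t);
      t.start();
    }
  }

  /** Same partition always maps to the same queue, preserving its ordering. */
  static int queueIndexFor(int partition, int numQueues) {
    return Math.floorMod(Integer.hashCode(partition), numQueues);
  }

  /** Part 1: the consumer thread would call this for every polled message. */
  void enqueue(Msg m) {
    queues.get(queueIndexFor(m.partition, queues.size())).add(m);
  }

  private void drain(BlockingQueue<Msg> q) {
    try {
      for (Msg m = q.take(); m != STOP; m = q.take()) {
        // process(m) would run here; the offset is recorded only afterwards
        latestProcessed.merge(m.partition, m.offset, Math::max);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Part 3: one commit pass. A real Kafka commit typically stores offset + 1. */
  Map<Integer, Long> commitOnce() {
    committed.putAll(latestProcessed);
    return new HashMap<>(committed);
  }

  /** Runs commitOnce on a timer; the caller must shut the executor down. */
  ScheduledExecutorService startCommitter(long periodMillis) {
    ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
    ses.scheduleAtFixedRate(this::commitOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    return ses;
  }

  /** Stops all workers after they finish what is already queued. */
  void shutdown() {
    for (BlockingQueue<Msg> q : queues) q.add(STOP);
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

Because only the workers write to `latestProcessed` and only the committer reads it, the number of queue threads and the commit interval can be tuned independently of each other.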
New or Changed Public Interfaces:
The API has been enhanced to allow manual committing of offsets (commitOffsetsAsync and commitOffsetsSync) and to subscribe to a topic with a GobblinConsumerRebalanceListener.
Added getTopic(), getOffset(), and getPartition() so that the HighLevelConsumer can record each partition together with its latest offset.
A listener that provides hooks for when a consumer joins or leaves a consumer group. In this case, we commit remaining offsets and clear offset caches.
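As a rough illustration of how the record accessors and the rebalance hook fit together, the sketch below keeps a per-partition offset cache and flushes it on rebalance. Everything here is an assumption made for the example: the `Record` and `RebalanceHook` interfaces, their method return types, the `"topic-partition"` cache key, and the in-memory `commits` list that stands in for a synchronous commit to Kafka. The sketch is single-threaded; a real consumer would need a thread-safe cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RebalanceSketch {
  /** Hypothetical record view exposing only the three accessors named above. */
  interface Record {
    String getTopic();
    int getPartition();
    long getOffset();
  }

  /** Hypothetical hook shape; the real listener's methods are not shown here. */
  interface RebalanceHook {
    void onRebalance();
  }

  // Cache of "topic-partition" -> latest processed offset.
  private final Map<String, Long> offsetCache = new HashMap<>();
  // Stand-in for commits sent to Kafka, one map per commit call.
  final List<Map<String, Long>> commits = new ArrayList<>();

  /** Called after a record has been processed. */
  void recordProcessed(Record r) {
    offsetCache.merge(r.getTopic() + "-" + r.getPartition(), r.getOffset(), Math::max);
  }

  /** On rebalance: commit whatever is still cached, then clear the cache. */
  RebalanceHook hook() {
    return () -> {
      if (!offsetCache.isEmpty()) {
        commits.add(new HashMap<>(offsetCache));
      }
      offsetCache.clear();
    };
  }

  int cachedPartitions() {
    return offsetCache.size();
  }

  /** Convenience factory for building records in examples. */
  static Record record(String topic, int partition, long offset) {
    return new Record() {
      @Override public String getTopic() { return topic; }
      @Override public int getPartition() { return partition; }
      @Override public long getOffset() { return offset; }
    };
  }
}
```

Clearing the cache after the commit matters because the partitions assigned after a rebalance may differ, and stale entries would otherwise be committed for partitions this consumer no longer owns.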
Full class can be found here
- An alternative considered was to let each queue thread commit its offsets separately, but this could lead to committing offsets too frequently and bogging down the system.