Status
Current state: "Under discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The SourceTask interface provides a commit() hook, documented as follows:
/**
* This method is invoked periodically when offsets are committed for this source task. Note that the offsets
* being committed won't necessarily correspond to the latest offsets returned by this source task via
* {@link #poll()}. Also see {@link #commitRecord(SourceRecord, RecordMetadata)} which allows for a more
* fine-grained tracking of records that have been successfully delivered.
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
*/
public void commit() throws InterruptedException {
// This space intentionally left blank.
}
The documentation explicitly states that this hook:
will be run periodically when offsets are committed.
does not guarantee that the offsets being committed are the latest offsets returned by this source task’s poll().
That is, through commit() a task periodically learns that some offsets were committed to the offsets topic, but not which ones. This makes the hook of little practical use for tracking progress.
What's missing?
The SourceTask interface provides another hook, commitRecord(*). This hook is invoked by the Kafka producer thread exactly once for each record, if and only if the corresponding SourceRecord was successfully published to its Kafka topic. For most source connector designs, this hook can be used to cache progress for every record and to signal progress to the external system when appropriate.
For advanced source connector designs, this hook alone does not provide the following:
No ordering guarantee in commitRecord() invocations
The KafkaProducer thread invokes commitRecord(*) after the successful delivery of each record. Consider a SourceTask that fetches the following records from one JMS queue:
(format: Record:Position, where Record = data and Position = the message's position in the JMS queue)
A1:0, B1:1, A2:2, B2:3, A3:4, B3:5 ….
The user then configures an SMT that routes A* records to TopicA and B* records to TopicB.
The KafkaProducer for this SourceTask delivers records to TopicA and TopicB respectively, potentially on different Kafka brokers. As a result, A1 (position:0) may be delivered after B3 (position:5).
That is, the KafkaProducer thread might invoke commitRecord(*) for B3 before invoking it for A1.
Before signalling position:5 (for B3), the SourceTask must explicitly check and wait for the commitRecord invocation of every record before B3.
A SourceTask can do this tracking itself, but the requirement is non-obvious and easy to miss, and explicit tracking in asynchronous designs makes them more complicated than necessary.
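The explicit tracking described above could look roughly like the following. This is a minimal sketch, not code from any connector: InOrderAckTracker and its methods are hypothetical names, and positions are assumed to be plain long values as in the JMS example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical helper (not part of Kafka Connect): records acks that may arrive
 * out of order and exposes the highest position whose predecessors are all acked.
 */
final class InOrderAckTracker {
    private static final class Entry {
        final long position;
        boolean acked;
        Entry(long position) { this.position = position; }
    }

    // Entries in the order the task returned them from poll().
    private final Deque<Entry> pending = new ArrayDeque<>();
    // -1 means no position is safe to signal yet.
    private long safePosition = -1L;

    /** Call when a record is returned from poll(). */
    synchronized void submitted(long position) {
        pending.addLast(new Entry(position));
    }

    /** Call from commitRecord(...); acks may arrive in any order. */
    synchronized void acked(long position) {
        for (Entry e : pending) {
            if (e.position == position) {
                e.acked = true;
                break;
            }
        }
        // Advance only across the contiguous acked prefix.
        while (!pending.isEmpty() && pending.peekFirst().acked) {
            safePosition = pending.pollFirst().position;
        }
    }

    /** Highest position that is safe to signal to the external system. */
    synchronized long safePosition() {
        return safePosition;
    }
}
```

With the example above, an ack for B3 (position 5) leaves safePosition() unchanged until A1 through A3 have been acked as well. Every task that needs ordering has to carry bookkeeping of this kind today.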
No information on commit of corresponding offset
This is not a shortcoming of the commitRecord(*) hook, and it is generally not required for simpler source connectors. Currently, neither commitRecord(*) nor commit() tells the task exactly when the offset of a specific SourceRecord can be considered committed.
This is much needed for source connectors that talk to an external system that maintains a watermark (for cleanup). These connectors maintain offsets in the offsets topic and also signal a low watermark to the external system. Generally, the low watermark cannot be sent out for every record, whereas publishing offsets at record level is more suitable. When a task is restarted, it must resume from the last committed offset, so the watermark must never be ahead of the last committed offset. The watermark must therefore be updated only from the latest committed offset, not from the latest SourceRecords published to the Kafka topic.
Example: Debezium PostgresSourceConnector
During commit(), PostgresConnectorTask explicitly:
reads the offsets from the offset topic (codeRef), which were committed just before commit() was called.
figures out the LSN to be flushed back to Postgres from the offsets read (codeRef).
Ticket → https://issues.redhat.com/browse/DBZ-7816
Public Interfaces
Changes in SourceTask interface
/**
* This method is invoked periodically when offsets are committed for this source task. Note that the offsets
* being committed won't necessarily correspond to the latest offsets returned by this source task via
* {@link #poll()}. Also see {@link #commitRecord(SourceRecord, RecordMetadata)} which allows for a more
* fine-grained tracking of records that have been successfully delivered.
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* <p>
* Note: Exceptions thrown by this method will be logged and ignored, and will not result in task failure.
*
* @param latestCommittedOffsets Latest committed source-offsets produced by this task. For each key ({SourceRecord.sourcePartition}),
* the associated value ({SourceRecord.sourceOffset}) represents the most recent committed source-offset.
* This ensures that all preceding sourceOffsets, as delivered by task.poll() for the same sourcePartition,
* have also been committed.
* @throws InterruptedException
*/
public void commit(Map<Map<String, Object>, Map<String, Object>> latestCommittedOffsets) throws InterruptedException {
this.commit();
}
commit() will not be deprecated.
Proposed Changes
Current behaviour :
Record Tracking
Before submitting records to the producer, WorkerSourceTask buffers sourceRecords in the SubmittedRecords class. SubmittedRecords stores offset data in a Map (records), where:
key: sourcePartition
value: Deque of SubmittedRecord objects. SubmittedRecord stores the sourcePartition and sourceOffset of a record.
That is, for every sourcePartition in records, SubmittedRecord objects are stored in order of their submission. (code-ref).
In the producer.send(*) callback, the corresponding SubmittedRecord is ack’d by the producer thread. (code-ref). As we’ve seen before, these per-message acks can arrive out of order.
SubmittedRecords is then used to fetch an ordered list of offsets (ordered per sourcePartition) using the committableOffsets() method. (ref).
committableOffsets() always returns an ordered list. Even if the 100th element is ack’d, it will not be returned until all elements before it are also ack’d.
Offset commit
Offset commits happen periodically in WorkerSourceTask. An independent offset-committer thread gets the latest ack’d SubmittedRecords (committableOffsets) and writes these offsets to the offsets topic.
If the offsets are written to the offsets topic successfully, this thread invokes task.commit() (code-ref).
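The record tracking described above can be modelled with a per-partition deque whose ack'd prefix is drained on each commit cycle. The sketch below is a simplified, hypothetical model for illustration only: SubmittedRecordsModel is not Kafka's SubmittedRecords class, and its committableOffsets() returns a plain map rather than whatever type the runtime uses internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of per-partition in-order tracking. */
final class SubmittedRecordsModel {
    static final class Submitted {
        final Map<String, Object> offset;
        private volatile boolean acked;
        Submitted(Map<String, Object> offset) { this.offset = offset; }
        /** Called from the producer callback; may happen in any order. */
        void ack() { acked = true; }
    }

    // sourcePartition -> records in submission order
    private final Map<Map<String, Object>, Deque<Submitted>> records = new HashMap<>();

    synchronized Submitted submit(Map<String, Object> partition, Map<String, Object> offset) {
        Submitted s = new Submitted(offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(s);
        return s;
    }

    /**
     * Drains the contiguous ack'd prefix of each deque and returns, per
     * partition, the offset of the last record drained. A record that was
     * ack'd behind an un-ack'd one stays queued.
     */
    synchronized Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        for (Map.Entry<Map<String, Object>, Deque<Submitted>> e : records.entrySet()) {
            Deque<Submitted> queue = e.getValue();
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                result.put(e.getKey(), queue.pollFirst().offset);
            }
        }
        return result;
    }
}
```

In this model, the offset-committer step amounts to calling committableOffsets(), writing the result to the offsets topic, and then notifying the task.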
Proposed behaviour :
This KIP proposes passing committableOffsets as a parameter to task.commit(). The task can then use it to determine the set of offsets committed up to this point, with the guarantee that:
these are the offsets most recently committed by this task.
for every sourceOffset (per sourcePartition), all sourceOffsets before it (in the order they were delivered by task.poll()) have also been committed.
How is order maintained?
As we’ve seen before, task.commit() simply receives committableOffsets, which always provides offsets in the order they were delivered by the task.
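A task might consume the proposed hook along the following lines. This is a hedged sketch under stated assumptions: WatermarkingTaskSketch is a hypothetical class that mirrors the proposed commit(Map) signature without extending SourceTask, the "position" offset key is assumed, and the map of signalled watermarks stands in for a real call to the external system.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical task-side handler for the proposed commit(Map) hook. */
final class WatermarkingTaskSketch {
    // Last watermark signalled per sourcePartition (stand-in for the external system).
    final Map<Map<String, Object>, Long> signalledWatermarks = new ConcurrentHashMap<>();

    /** Same parameter shape as the proposed SourceTask.commit(Map). */
    void commit(Map<Map<String, Object>, Map<String, Object>> latestCommittedOffsets) {
        latestCommittedOffsets.forEach((partition, offset) -> {
            // "position" is an assumed key; real connectors define their own offset layout.
            Object position = offset.get("position");
            if (position instanceof Long) {
                // Only ever move the watermark forward, and only up to a committed offset.
                signalledWatermarks.merge(partition, (Long) position, Long::max);
            }
        });
    }
}
```

Because the watermark is derived only from offsets the runtime reports as committed, it cannot run ahead of the position the task would resume from after a restart.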
Notes
While the proposed commit() hook will be useful for source connectors, users should be aware that it must not be used when:
SourceTasks need to read global offsets. They must read the offset topic instead. The proposed commit() hook can only be used by a SourceTask to detect the latest committed offsets written by itself.
SourceTasks need to read the offset of the latest record that was committed to Kafka. They must use commitRecord() instead.
Compatibility, Deprecation, and Migration Plan
No breaking changes.
Changes are backward compatible. The new commit(Map) overload delegates to the existing commit() by default, so existing SourceTask implementations will not break.