Status

Current state: replaced by KIP-382: MirrorMaker 2.0

Discussion thread: here and  and here

JIRA:

Pull Request: https://github.com/apache/kafka/pull/6171 (replaced with: KIP-382: MirrorMaker 2.0)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

MirrorMaker 2.0 (KIP-382) needs to know the downstream offsets of replicated records in order to provide cross-cluster offset translation. Currently, WorkerSourceTask receives this information from KafkaProducer but discards it. Other Connectors may also benefit from this change; see, for example, KIP-381, which also proposes to notify SourceTasks of ACK'd records. In particular, this proposal lets a SourceTask distinguish records that have been durably stored from records that have been skipped altogether by a SourceConnector.

Public Interfaces

The callback commitRecord() will be overloaded with an extra parameter:


public abstract class SourceTask implements Task {
    ---%<---

    // existing method
    public void commitRecord(SourceRecord sourceRecord) {
        // nop
    }

    // new method calls old one by default
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        commitRecord(sourceRecord);
    }
    ---%<---
}


Proposed Changes

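Currently, SourceTask includes a commitRecord() callback, which is invoked both when a record has been ACK'd by the downstream producer and when a record has been skipped altogether and will therefore never be ACK'd.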

The new overloaded version will be invoked instead, with recordMetadata null when there is no ACK. To preserve backwards compatibility, the default implementation will call the old method.
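The dispatch described above can be illustrated with a small, self-contained sketch. It does not use the real Kafka Connect classes: SourceRecord, RecordMetadata and SourceTask below are simplified stand-ins that only mirror the two callbacks from this proposal, and LegacyTask, OffsetTrackingTask and deliver() are hypothetical names introduced for illustration. The sketch assumes the framework always calls the two-argument overload, passing null metadata when there is no ACK.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitRecordSketch {

    // Simplified stand-in for the Connect SourceRecord (assumption, not the real class).
    static final class SourceRecord {
        final String value;
        SourceRecord(String value) { this.value = value; }
    }

    // Simplified stand-in for the producer's RecordMetadata (assumption, not the real class).
    static final class RecordMetadata {
        final String topic;
        final int partition;
        final long offset;
        RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Mirrors the two callbacks from the Public Interfaces section.
    abstract static class SourceTask {
        // existing method
        public void commitRecord(SourceRecord sourceRecord) {
            // nop
        }

        // new method calls old one by default
        public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
            commitRecord(sourceRecord);
        }
    }

    // Hypothetical pre-existing task: overrides only the old callback.
    static final class LegacyTask extends SourceTask {
        final List<String> committed = new ArrayList<>();

        @Override
        public void commitRecord(SourceRecord sourceRecord) {
            committed.add(sourceRecord.value);
        }
    }

    // Hypothetical new task: uses the metadata to tell ACK'd records from skipped ones.
    static final class OffsetTrackingTask extends SourceTask {
        final List<String> events = new ArrayList<>();

        @Override
        public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
            if (recordMetadata == null) {
                events.add("skipped:" + sourceRecord.value);
            } else {
                events.add("acked:" + sourceRecord.value + "@" + recordMetadata.offset);
            }
        }
    }

    // Stand-in for the worker: one ACK'd record, then one record with no ACK.
    static void deliver(SourceTask task) {
        task.commitRecord(new SourceRecord("a"), new RecordMetadata("topic", 0, 42L));
        task.commitRecord(new SourceRecord("b"), null);
    }

    public static void main(String[] args) {
        LegacyTask legacy = new LegacyTask();
        OffsetTrackingTask tracking = new OffsetTrackingTask();
        deliver(legacy);
        deliver(tracking);
        System.out.println(legacy.committed); // prints [a, b]
        System.out.println(tracking.events);  // prints [acked:a@42, skipped:b]
    }
}
```

In this sketch the legacy task still sees every record through the old callback, while the new task can record the downstream offset or note that the record was never ACK'd.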

Compatibility, Deprecation, and Migration Plan

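This is a new callback. Existing SourceTask implementations that override only commitRecord(SourceRecord) are unaffected beyond one additional function call, because the default implementation of the new overload delegates to the old method.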

Rejected Alternatives