Status

Current state: Accepted

Discussion thread: here

JIRAKAFKA-3846

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Timestamps were added to Kafka record types in the 0.10 release (KIP-32), however this does not get propagated automatically to Connect because it uses custom wrappers to add fields and rename some for clarity.

The addition of timestamps is trivial, but can be really useful:

  • in source connectors for topics where CreateTime timestamps are configured
  • in sink connectors for topics where the timestamp info is only present in the record metadata

Public Interfaces

  • SourceRecord, which is initialized by source connectors as record data from external sources → Kafka
  • SinkRecord, which is initialized by the Connect runtime as record data from Kafka → external sinks

Proposed change

  • Add timestamp field to SourceRecord
  • Add timestamp and TimestampType (i.e. NoTimestampType / CreateTime / LogAppendTime) fields to SinkRecord

New or Changed Public Interfaces

New constructor variants and getters for the field on SourceRecord and SinkRecord.

SourceRecord.java
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                    String topic, Integer partition,
                    Schema keySchema, Object key,
                    Schema valueSchema, Object value,
                    Long timestamp) { .. }
  
public Long timestamp() { .. }
SinkRecord.java
public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset, Long timestamp, TimestampType timestampType) { .. }

public TimestampType timestampType() { .. }
  
public Long timestamp() { .. }

Migration Plan and Compatibility

Existing constructors for SourceRecord and SinkRecord will continue to exist so there are no compatibility concerns.

Test Plan

Unit tests that validate

  • SourceRecord timestamps are propagated correctly to Kafka ProducerRecord
  • Kafka ConsumerRecord timestamp and timestamp type are propagated to SinkRecord

Rejected Alternatives

In addition to timestamps, we could have also exposed record metadata that was added to Kafka's ConsumerRecord in KIP-42 (Interceptors), i.e. checksum and serialized size. There doesn't seem to be a use-case for exposing these fields to sink connectors.

  • No labels