Status

Current state: "Voting in progress"

Discussion thread: here

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Connect currently supports a single timestamp logical type (org.apache.kafka.connect.data.Timestamp) backed by java.util.Date, which is limited to millisecond precision. When source systems produce timestamps with microsecond or nanosecond precision — common in Avro, Parquet, databases like PostgreSQL, and event-driven systems — that precision is silently truncated.

For example, a source connector reading an Avro timestamp-micros field (e.g., 1617235200123456 microseconds) must currently either:

  • Map it to the Timestamp logical type, losing the sub-millisecond component (123456 µs → 123 ms)
  • Map it as a raw INT64, losing the semantic meaning of the field

KIP-808 added unix.precision configuration to TimestampConverter, but the SMT still documents: "This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components", because the intermediate representation is java.util.Date.
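The truncation can be reproduced with the JDK alone. The following is a minimal sketch, not Kafka Connect code: the class DateTruncationSketch and its methods are hypothetical, and it only imitates a millisecond-based path through java.util.Date using the microsecond value from the Avro example above.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical demo class; not part of Kafka Connect.
final class DateTruncationSketch {

    // Builds an Instant from microseconds since the Unix epoch.
    static Instant instantFromMicros(long micros) {
        return Instant.ofEpochSecond(
                Math.floorDiv(micros, 1_000_000L),
                Math.floorMod(micros, 1_000_000L) * 1_000L);
    }

    // Routes the value through java.util.Date (millisecond resolution)
    // and returns what survives, expressed again in microseconds.
    static long microsAfterDateRoundTrip(long micros) {
        Date viaDate = Date.from(instantFromMicros(micros));
        return viaDate.getTime() * 1_000L;
    }

    public static void main(String[] args) {
        long original = 1_617_235_200_123_456L; // value from the Avro example
        long survived = microsAfterDateRoundTrip(original);
        System.out.println("microseconds lost: " + (original - survived));
    }
}
```

For this input the round trip keeps 1617235200123000 and drops the trailing 456 µs, which is the loss the new logical types are meant to avoid.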

This KIP introduces two new logical types — TimestampMicros and TimestampNanos — that preserve full precision using java.time.Instant as the Java representation.

Why this matters

  • Precision Loss: Formats like Avro, Parquet, and Protocol Buffers support microsecond and nanosecond timestamps. Without matching logical types in Kafka Connect, the sub-millisecond component of those values is silently truncated.
  • Increasing Adoption of High-Precision Time: Systems like event streams, financial applications, observability platforms, and databases (PostgreSQL, ClickHouse) increasingly rely on microsecond or nanosecond timestamps throughout the entire data pipeline.
  • Data Fidelity: Supporting timestamp-micros and timestamp-nanos in Kafka Connect ensures the full precision of time data is preserved across the entire source → Kafka → sink pipeline without any loss of information.

Proposed Changes

1. New Logical Type: TimestampMicros

A new class org.apache.kafka.connect.data.TimestampMicros in the connect-api module.

  • Logical name: org.apache.kafka.connect.data.TimestampMicros
  • Underlying schema type: INT64
  • Encoded representation: microseconds since Unix epoch
  • Java representation: java.time.Instant

| Method | Signature | Description |
|---|---|---|
| builder() | static SchemaBuilder builder() | Returns a SchemaBuilder with name org.apache.kafka.connect.data.TimestampMicros, type INT64, version 1 |
| SCHEMA | static final Schema SCHEMA | The default schema instance |
| fromLogical(Schema, Instant) | static long fromLogical(Schema schema, Instant value) | Converts Instant → microseconds since epoch |
| toLogical(Schema, long) | static Instant toLogical(Schema schema, long value) | Converts microseconds since epoch → Instant |
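
The two conversions can be sketched with java.time alone. This is an illustration under stated assumptions, not the proposed implementation: MicrosCodecSketch is a hypothetical class, the Schema parameter and its name check are omitted, and flooring sub-microsecond digits is an assumption about how extra precision would be handled.

```java
import java.time.Instant;

// Hypothetical stand-in for the proposed conversions; schema checks omitted.
final class MicrosCodecSketch {
    private static final long MICROS_PER_SECOND = 1_000_000L;
    private static final long NANOS_PER_MICRO = 1_000L;

    // Instant -> microseconds since epoch. Digits below one microsecond are
    // dropped (assumption). Throws ArithmeticException if the result does
    // not fit in a long.
    static long fromLogical(Instant value) {
        long seconds = value.getEpochSecond();
        long micros = value.getNano() / NANOS_PER_MICRO; // 0..999_999
        if (seconds < 0 && micros > 0) {
            // Shift one second so the multiplication cannot overflow for
            // values that are still representable near Long.MIN_VALUE.
            seconds += 1;
            micros -= MICROS_PER_SECOND;
        }
        return Math.addExact(Math.multiplyExact(seconds, MICROS_PER_SECOND), micros);
    }

    // Microseconds since epoch -> Instant. floorDiv/floorMod keep the
    // sub-second field non-negative for values before 1970.
    static Instant toLogical(long micros) {
        return Instant.ofEpochSecond(
                Math.floorDiv(micros, MICROS_PER_SECOND),
                Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICRO);
    }
}
```

Using floorDiv and floorMod rather than / and % matters for pre-1970 values: -1 µs maps to epoch second -1 with 999999000 ns, which is what Instant expects.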

2. New Logical Type: TimestampNanos

A new class org.apache.kafka.connect.data.TimestampNanos in the connect-api module.

  • Logical name: org.apache.kafka.connect.data.TimestampNanos
  • Underlying schema type: INT64
  • Encoded representation: nanoseconds since Unix epoch
  • Java representation: java.time.Instant

| Method | Signature | Description |
|---|---|---|
| builder() | static SchemaBuilder builder() | Returns a SchemaBuilder with name org.apache.kafka.connect.data.TimestampNanos, type INT64, version 1 |
| SCHEMA | static final Schema SCHEMA | The default schema instance |
| fromLogical(Schema, Instant) | static long fromLogical(Schema schema, Instant value) | Converts Instant → nanoseconds since epoch |
| toLogical(Schema, long) | static Instant toLogical(Schema schema, long value) | Converts nanoseconds since epoch → Instant |

TimestampNanos uses INT64, which limits the representable range to approximately 1677-09-21 to 2262-04-11 (±2^63 nanoseconds). This matches the range supported by Avro's timestamp-nanos logical type.
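
The range limit can be checked directly with java.time. The sketch below is illustrative only: NanosCodecSketch is a hypothetical class, schema validation is omitted, and throwing ArithmeticException for out-of-range instants is an assumption (the KIP does not specify which exception the real class would raise).

```java
import java.time.Instant;

// Hypothetical stand-in for the proposed conversions; schema checks omitted.
final class NanosCodecSketch {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    // Instant -> nanoseconds since epoch. Throws ArithmeticException when
    // the instant lies outside the range a signed 64-bit count can hold.
    static long fromLogical(Instant value) {
        long seconds = value.getEpochSecond();
        long nanos = value.getNano(); // 0..999_999_999
        if (seconds < 0 && nanos > 0) {
            // Shift one second so Long.MIN_VALUE itself stays convertible.
            seconds += 1;
            nanos -= NANOS_PER_SECOND;
        }
        return Math.addExact(Math.multiplyExact(seconds, NANOS_PER_SECOND), nanos);
    }

    // Nanoseconds since epoch -> Instant; ofEpochSecond normalizes
    // negative adjustments, so pre-1970 values need no special case.
    static Instant toLogical(long nanos) {
        return Instant.ofEpochSecond(0L, nanos);
    }

    public static void main(String[] args) {
        System.out.println(toLogical(Long.MIN_VALUE)); // earliest representable instant
        System.out.println(toLogical(Long.MAX_VALUE)); // latest representable instant
    }
}
```

Running main prints instants dated 1677-09-21 and 2262-04-11, matching the bounds quoted above. An Instant just past the upper bound, such as 2262-04-12T00:00:00Z, makes fromLogical throw rather than wrap around.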

3. Updated: ConnectSchema

The LOGICAL_TYPE_CLASSES map gains two new entries:

    TimestampMicros.LOGICAL_NAME → List.of(java.time.Instant.class)
    TimestampNanos.LOGICAL_NAME → List.of(java.time.Instant.class)

This enables runtime type validation for the new logical types. Existing entries for Timestamp, Date, and Time remain unchanged.
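
The lookup-based check can be sketched without any Kafka dependency. Everything here is hypothetical: LogicalTypeValidationSketch and isValid are invented for illustration and only imitate the idea of mapping a logical name to its accepted Java classes; the real ConnectSchema validation does more than this.

```java
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;

// Hypothetical model of a logical-name -> accepted-classes lookup.
final class LogicalTypeValidationSketch {
    static final String MILLIS = "org.apache.kafka.connect.data.Timestamp";
    static final String MICROS = "org.apache.kafka.connect.data.TimestampMicros";
    static final String NANOS = "org.apache.kafka.connect.data.TimestampNanos";

    static final Map<String, List<Class<?>>> LOGICAL_TYPE_CLASSES = Map.of(
            MILLIS, List.<Class<?>>of(Date.class),     // existing entry
            MICROS, List.<Class<?>>of(Instant.class),  // proposed entry
            NANOS, List.<Class<?>>of(Instant.class));  // proposed entry

    // Returns true when the value is an instance of a class registered for
    // the logical name. Unregistered names are accepted in this sketch.
    static boolean isValid(String logicalName, Object value) {
        List<Class<?>> expected = LOGICAL_TYPE_CLASSES.get(logicalName);
        if (expected == null) {
            return true;
        }
        for (Class<?> candidate : expected) {
            if (candidate.isInstance(value)) {
                return true;
            }
        }
        return false;
    }
}
```

Under this model an Instant passes for TimestampMicros and TimestampNanos, while a java.util.Date is rejected for them and still accepted for Timestamp.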

4. Updated: ConnectHeaders

Two new methods are added to the Headers interface and its ConnectHeaders implementation:

| Method | Signature | Description |
|---|---|---|
| addTimestampMicros | Headers addTimestampMicros(String key, Instant value) | Add a header with TimestampMicros schema |
| addTimestampNanos | Headers addTimestampNanos(String key, Instant value) | Add a header with TimestampNanos schema |

5. Updated SMT: TimestampConverter

New target.type values

The target.type configuration accepts two additional values: TimestampMicros and TimestampNanos.

| target.type | Schema | Java type | Encoded as |
|---|---|---|---|
| Timestamp (existing) | Timestamp.SCHEMA | java.util.Date | milliseconds since epoch |
| TimestampMicros (new) | TimestampMicros.SCHEMA | java.time.Instant | microseconds since epoch |
| TimestampNanos (new) | TimestampNanos.SCHEMA | java.time.Instant | nanoseconds since epoch |

Precision-preserving unix conversion

When target.type=unix, the TimestampConverter currently loses sub-millisecond precision because it routes through java.util.Date. After this KIP:

  • Input with schema TimestampMicros → unix: full microsecond precision preserved
  • Input with schema TimestampNanos → unix: full nanosecond precision preserved
  • Input with schema Timestamp (existing) → unix: unchanged behavior
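
What a lossless unix conversion amounts to can be sketched as scaling an Instant directly, without passing through java.util.Date. This is a sketch under assumptions, not the TimestampConverter code: UnixPrecisionSketch, toUnix, and scale are hypothetical names, and the precision strings are taken to be the unix.precision values introduced by KIP-808.

```java
import java.time.Instant;

// Hypothetical helper; not the TimestampConverter implementation.
final class UnixPrecisionSketch {

    // Scales an Instant to an integer count of the requested unit since
    // the epoch, flooring any digits finer than that unit.
    static long toUnix(Instant value, String unixPrecision) {
        switch (unixPrecision) {
            case "seconds":
                return value.getEpochSecond();
            case "milliseconds":
                return scale(value, 1_000L);
            case "microseconds":
                return scale(value, 1_000_000L);
            case "nanoseconds":
                return scale(value, 1_000_000_000L);
            default:
                throw new IllegalArgumentException("Unknown precision: " + unixPrecision);
        }
    }

    // unitsPerSecond must divide 1_000_000_000 evenly. The Math.*Exact
    // calls throw ArithmeticException instead of wrapping on overflow.
    private static long scale(Instant value, long unitsPerSecond) {
        long nanosPerUnit = 1_000_000_000L / unitsPerSecond;
        long whole = Math.multiplyExact(value.getEpochSecond(), unitsPerSecond);
        return Math.addExact(whole, value.getNano() / nanosPerUnit);
    }
}
```

For an Instant at epoch second 1617235200 with 123456789 ns, the four precisions yield 1617235200, 1617235200123, 1617235200123456, and 1617235200123456789 respectively; only the precision requested by the user limits the digits, not the intermediate type.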

Updated timestampTypeFromSchema()

This method is extended to recognize TimestampMicros.LOGICAL_NAME and TimestampNanos.LOGICAL_NAME as valid input schema types, routing them to the appropriate translators.

Updated TimestampTranslator interface

The internal TimestampTranslator interface is updated to support java.time.Instant as the intermediate representation alongside java.util.Date, enabling lossless conversion for the new types. The specific implementation approach (e.g., overloaded methods, a common temporal interface, or separate translator maps) is left to the implementation PR.

6. Updated Converter: JsonConverter

Two new entries are added to LOGICAL_CONVERTERS:

| Logical Name | toJson | toConnect |
|---|---|---|
| TimestampMicros.LOGICAL_NAME | Instant → JSON number (microseconds since epoch) | JSON number → Instant |
| TimestampNanos.LOGICAL_NAME | Instant → JSON number (nanoseconds since epoch) | JSON number → Instant |

7. Updated: Values

New public conversion methods:

| Method | Signature | Description |
|---|---|---|
| convertToTimestampMicros | static Instant convertToTimestampMicros(Schema schema, Object value) | Converts various types to Instant with microsecond semantics |
| convertToTimestampNanos | static Instant convertToTimestampNanos(Schema schema, Object value) | Converts various types to Instant with nanosecond semantics |

These follow the same pattern as the existing convertToTimestamp(), convertToDate(), and convertToTime() methods.

8. Updated SMT: Cast

The encodeLogicalType() method is extended to handle the new types:

    case TimestampMicros.LOGICAL_NAME -> TimestampMicros.fromLogical(schema, (Instant) value);
    case TimestampNanos.LOGICAL_NAME -> TimestampNanos.fromLogical(schema, (Instant) value);

Converter Use Case (e.g., Avro Converter)

This KIP enables third-party converters (e.g., Confluent's Avro Converter) to properly map high-precision Avro timestamps. Before this KIP, a converter handling Avro's LONG type would fall through to raw INT64 for non-millis timestamps:

  • Before this KIP:

    case LONG:
        if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
            builder = Timestamp.builder();
        } else {
            // semantic meaning lost
            builder = SchemaBuilder.int64();
        }

  • After this KIP:

    case LONG:
        if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
            builder = Timestamp.builder();
        } else if (AVRO_LOGICAL_TIMESTAMP_MICROS.equalsIgnoreCase(logicalType)) {
            builder = TimestampMicros.builder();
        } else if (AVRO_LOGICAL_TIMESTAMP_NANOS.equalsIgnoreCase(logicalType)) {
            builder = TimestampNanos.builder();
        } else {
            builder = SchemaBuilder.int64();
        }

Summary of All Changed Files

| Module | File | Change |
|---|---|---|
| connect-api | TimestampMicros.java | New file: microsecond logical type |
| connect-api | TimestampNanos.java | New file: nanosecond logical type |
| connect-api | ConnectSchema.java | Add entries to LOGICAL_TYPE_CLASSES |
| connect-api | Values.java | Add convertToTimestampMicros() and convertToTimestampNanos() |
| connect-api | ConnectHeaders.java | Add addTimestampMicros() and addTimestampNanos() |
| connect-json | JsonConverter.java | Add LOGICAL_CONVERTERS entries |
| connect-transforms | TimestampConverter.java | Add new target types, fix precision loss |
| connect-transforms | Cast.java | Handle new logical types in encodeLogicalType() |

Compatibility, Deprecation, and Migration Plan

Backward Compatibility

This change is fully backward-compatible for existing deployments:

  • Existing connectors producing Timestamp schemas continue to work identically.
  • Existing TimestampConverter configurations are unaffected — no existing target.type values change behavior.
  • No existing API signatures are modified.

Behavioral Changes

The following behavioral changes apply only when the new types are actively used:

  • Converters (JSON, Avro, Protobuf) that encounter fields with schema name org.apache.kafka.connect.data.TimestampMicros or org.apache.kafka.connect.data.TimestampNanos will serialize/deserialize them as long (micros/nanos since epoch) instead of treating them as raw INT64. This is opt-in: it only happens when a connector explicitly produces the new schema types.
  • The TimestampConverter SMT accepts two new target.type values. Existing configurations are unaffected.

Rolling Upgrades

During a rolling upgrade where some workers have the new types and others do not:

  • Connectors should not be configured to use the new logical types until all workers in the cluster are upgraded.
  • Workers running the old version will not recognize schemas with the new logical type names and will treat them as raw INT64, potentially causing data misinterpretation.
  • Once all workers are upgraded, connectors can begin producing TimestampMicros or TimestampNanos schemas.

Migration Path

  • Users who want microsecond/nanosecond precision: upgrade the Kafka Connect runtime and update their connector/converter configurations to use the new logical types.
  • Users who do not need sub-millisecond precision: no changes required.

Test Plan

Unit Tests: TimestampMicros

  • fromLogical() / toLogical() round-trip correctness
  • Schema name validation (throws DataException for wrong schema)
  • Edge cases: epoch zero, negative timestamps (before 1970), max long boundary

Unit Tests: TimestampNanos

  • fromLogical() / toLogical() round-trip correctness
  • Schema name validation (throws DataException for wrong schema)
  • Edge cases: epoch zero, negative timestamps, INT64 overflow boundary (~292 years from epoch)

TimestampConverter Tests

  • Conversion from TimestampMicros → unix with microsecond precision preserved
  • Conversion from TimestampNanos → unix with nanosecond precision preserved
  • Conversion from unix → TimestampMicros / TimestampNanos
  • Conversion from Timestamp (millis) → TimestampMicros / TimestampNanos (upscaling)
  • Conversion from TimestampMicros / TimestampNanos → Timestamp (precision truncation is expected and documented)
  • Both schema-based and schemaless modes

JsonConverter Tests

  • Round-trip serialization/deserialization of TimestampMicros and TimestampNanos values
  • Correct JSON representation (JSON number, not string)

ConnectSchema Validation Tests

  • Instant values accepted for TimestampMicros / TimestampNanos schemas
  • java.util.Date values rejected for TimestampMicros / TimestampNanos schemas

Cast SMT Tests

  • Casting from TimestampMicros / TimestampNanos to primitive types (int64, string)

Rejected Alternatives

1. Modifying the existing Timestamp class to support multiple precisions

Changing Timestamp's Java type from java.util.Date to java.time.Instant would be a breaking change for all existing connectors. Existing code calls Timestamp.fromLogical(schema, (java.util.Date) value) — changing the parameter type would break compilation for every connector and converter in the ecosystem.

2. Adding precision variants within the existing Timestamp class

Introducing LOGICAL_NAME_MILLIS, LOGICAL_NAME_MICROS, and LOGICAL_NAME_NANOS as constants within the existing Timestamp class would require modifying its public API surface and could break schema name checks in user code and third-party converters that compare against Timestamp.LOGICAL_NAME.

3. Using java.util.Date for the new types

java.util.Date fundamentally cannot represent sub-millisecond precision, making it unsuitable for microsecond or nanosecond timestamps.
