Status
Current state: "Voting in progress"
Discussion thread: here
Vote thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Connect currently supports a single timestamp logical type (org.apache.kafka.connect.data.Timestamp) backed by java.util.Date, which is limited to millisecond precision. When source systems produce timestamps with microsecond or nanosecond precision — common in Avro, Parquet, databases like PostgreSQL, and event-driven systems — that precision is silently truncated.
For example, a source connector reading an Avro timestamp-micros field (e.g., 1617235200123456 microseconds) must currently either:
- Map it to the Timestamp logical type, losing the sub-millisecond component (123456 µs → 123 ms)
- Map it as a raw INT64, losing the semantic meaning of the field
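The truncation described above can be reproduced with the JDK alone. The following is a minimal sketch, not connector code: the class and method names are hypothetical, and it only shows what survives when the example value passes through java.util.Date.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;

// Hypothetical demo class; not part of Kafka Connect.
final class DateTruncationDemo {
    /** Round-trips a microsecond epoch value through java.util.Date and returns what survives. */
    static long roundTripThroughDate(long epochMicros) {
        Instant precise = Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
        Date date = Date.from(precise); // Date stores whole milliseconds only
        return ChronoUnit.MICROS.between(Instant.EPOCH, date.toInstant());
    }

    public static void main(String[] args) {
        long source = 1_617_235_200_123_456L; // the example value from the text
        long survived = roundTripThroughDate(source);
        System.out.println("lost microseconds: " + (source - survived));
    }
}
```

For the example value, the last three digits (456 µs) do not survive the round trip, which is the loss a Timestamp mapping incurs today.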
KIP-808 added unix.precision configuration to TimestampConverter, but the SMT still documents: "This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components", because the intermediate representation is java.util.Date.
This KIP introduces two new logical types — TimestampMicros and TimestampNanos — that preserve full precision using java.time.Instant as the Java representation.
Why this matters
- Precision Loss: Formats like Avro, Parquet, and Protocol Buffers support microsecond and nanosecond timestamps. Without matching logical types in Kafka Connect, the sub-millisecond component is silently truncated.
- Increasing Adoption of High-Precision Time: Systems like event streams, financial applications, observability platforms, and databases (PostgreSQL, ClickHouse) increasingly rely on microsecond or nanosecond timestamps throughout the entire data pipeline.
- Data Fidelity: Supporting timestamp-micros and timestamp-nanos in Kafka Connect ensures the full precision of time data is preserved across the entire source → Kafka → sink pipeline without any loss of information.
Proposed Changes
1. New Logical Type: TimestampMicros
A new class org.apache.kafka.connect.data.TimestampMicros in the connect-api module.
- Logical name: org.apache.kafka.connect.data.TimestampMicros
- Underlying schema type: INT64
- Encoded representation: microseconds since Unix epoch
- Java representation: java.time.Instant
| Method | Signature | Description |
|---|---|---|
| builder() | static SchemaBuilder builder() | Returns a SchemaBuilder with name org.apache.kafka.connect.data.TimestampMicros, type INT64, version 1 |
| SCHEMA | static final Schema SCHEMA | The default schema instance |
| fromLogical(Schema, Instant) | static long fromLogical(Schema schema, Instant value) | Converts Instant → microseconds since epoch |
| toLogical(Schema, long) | static Instant toLogical(Schema schema, long value) | Converts microseconds since epoch → Instant |
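The two conversions in the table could look roughly like the sketch below. It is a JDK-only stand-in under stated assumptions: the class name is hypothetical, the Schema parameter and its name check are omitted, and sub-microsecond nanoseconds are simply dropped, a choice this KIP does not specify.

```java
import java.time.Instant;

// Hypothetical stand-in for the proposed conversions; not the real TimestampMicros class.
final class TimestampMicrosSketch {
    private static final long MICROS_PER_SECOND = 1_000_000L;
    private static final long NANOS_PER_MICRO = 1_000L;

    /** Instant -> microseconds since epoch; throws ArithmeticException on long overflow. */
    static long fromLogical(Instant value) {
        long wholeSeconds = Math.multiplyExact(value.getEpochSecond(), MICROS_PER_SECOND);
        // Assumption: nanoseconds below one microsecond are discarded.
        return Math.addExact(wholeSeconds, value.getNano() / NANOS_PER_MICRO);
    }

    /** Microseconds since epoch -> Instant; floorDiv/floorMod keep pre-1970 values correct. */
    static Instant toLogical(long micros) {
        long seconds = Math.floorDiv(micros, MICROS_PER_SECOND);
        long microOfSecond = Math.floorMod(micros, MICROS_PER_SECOND);
        return Instant.ofEpochSecond(seconds, microOfSecond * NANOS_PER_MICRO);
    }
}
```

Using floor division rather than plain `/` and `%` matters for negative inputs: -1 µs has to become second -1 plus 999999 µs, because Instant requires a non-negative nanosecond field.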
2. New Logical Type: TimestampNanos
A new class org.apache.kafka.connect.data.TimestampNanos in the connect-api module.
- Logical name: org.apache.kafka.connect.data.TimestampNanos
- Underlying schema type: INT64
- Encoded representation: nanoseconds since Unix epoch
- Java representation: java.time.Instant
| Method | Signature | Description |
|---|---|---|
| builder() | static SchemaBuilder builder() | Returns a SchemaBuilder with name org.apache.kafka.connect.data.TimestampNanos, type INT64, version 1 |
| SCHEMA | static final Schema SCHEMA | The default schema instance |
| fromLogical(Schema, Instant) | static long fromLogical(Schema schema, Instant value) | Converts Instant → nanoseconds since epoch |
| toLogical(Schema, long) | static Instant toLogical(Schema schema, long value) | Converts nanoseconds since epoch → Instant |
TimestampNanos uses INT64, which limits the representable range to approximately 1677-09-21 to 2262-04-11 (±2^63 nanoseconds). This matches the range supported by Avro's timestamp-nanos logical type.
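The range limit can be checked directly with java.time. The sketch below is illustrative only: the class and method names are hypothetical, and the overflow handling (throwing ArithmeticException) is an assumption rather than behavior defined by this KIP.

```java
import java.time.Instant;

// Hypothetical helper; illustrates the INT64 nanosecond range only.
final class TimestampNanosRange {
    /** Nanoseconds since epoch -> Instant; ofEpochSecond normalizes the nanosecond adjustment. */
    static Instant toInstant(long epochNanos) {
        return Instant.ofEpochSecond(0L, epochNanos);
    }

    /**
     * Instant -> nanoseconds since epoch; throws ArithmeticException outside the INT64 range.
     * This simple form also rejects the final partial second just above Long.MIN_VALUE,
     * because the seconds term alone overflows before the nanoseconds are added.
     */
    static long toEpochNanos(Instant value) {
        long wholeSeconds = Math.multiplyExact(value.getEpochSecond(), 1_000_000_000L);
        return Math.addExact(wholeSeconds, value.getNano());
    }

    public static void main(String[] args) {
        System.out.println(toInstant(Long.MIN_VALUE)); // earliest representable instant
        System.out.println(toInstant(Long.MAX_VALUE)); // latest representable instant
    }
}
```

Running `main` prints the two boundary instants, which fall on the dates quoted above. An implementation would need to decide how to report an Instant outside this range; the exact-arithmetic methods used here fail loudly instead of wrapping around.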
3. Updated: ConnectSchema
The LOGICAL_TYPE_CLASSES map gains two new entries:
    TimestampMicros.LOGICAL_NAME → List.of(java.time.Instant.class)
    TimestampNanos.LOGICAL_NAME → List.of(java.time.Instant.class)
This enables runtime type validation for the new logical types. Existing entries for Timestamp, Date, and Time remain unchanged.
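As a rough illustration of what such a lookup enables, the sketch below models the map with plain JDK types. It is not the ConnectSchema implementation: the class name and the `isValidValue` helper are hypothetical, and the logical names are written out as string literals so the example runs without Kafka on the classpath.

```java
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;

// Hypothetical model of a logical-name -> accepted-classes lookup; not ConnectSchema itself.
final class LogicalTypeCheckSketch {
    static final String MILLIS = "org.apache.kafka.connect.data.Timestamp";
    static final String MICROS = "org.apache.kafka.connect.data.TimestampMicros";
    static final String NANOS = "org.apache.kafka.connect.data.TimestampNanos";

    static final Map<String, List<Class<?>>> LOGICAL_TYPE_CLASSES = Map.of(
            MILLIS, List.<Class<?>>of(Date.class),     // existing entry, unchanged
            MICROS, List.<Class<?>>of(Instant.class),  // new entry
            NANOS, List.<Class<?>>of(Instant.class));  // new entry

    /** Returns true when the value's class is accepted for the given logical name. */
    static boolean isValidValue(String logicalName, Object value) {
        List<Class<?>> expected = LOGICAL_TYPE_CLASSES.get(logicalName);
        if (expected == null) {
            return false; // unknown logical name in this sketch
        }
        return expected.stream().anyMatch(c -> c.isInstance(value));
    }
}
```

With these entries, an Instant passes for the two new names while a java.util.Date is rejected, and the reverse holds for the existing Timestamp name.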
4. Updated: ConnectHeaders
Two new methods added to the Headers interface and ConnectHeaders implementation:
| Method | Signature | Description |
|---|---|---|
| addTimestampMicros | Headers addTimestampMicros(String key, Instant value) | Add a header with TimestampMicros schema |
| addTimestampNanos | Headers addTimestampNanos(String key, Instant value) | Add a header with TimestampNanos schema |
5. Updated SMT: TimestampConverter
New target.type values
The target.type configuration accepts two additional values: TimestampMicros and TimestampNanos.
| target.type | Schema | Java type | Encoded as |
|---|---|---|---|
| Timestamp (existing) | Timestamp.SCHEMA | java.util.Date | milliseconds since epoch |
| TimestampMicros (new) | TimestampMicros.SCHEMA | java.time.Instant | microseconds since epoch |
| TimestampNanos (new) | TimestampNanos.SCHEMA | java.time.Instant | nanoseconds since epoch |
Precision-preserving unix conversion
When target.type=unix, the TimestampConverter currently loses sub-millisecond precision because it routes through java.util.Date. After this KIP:
- Input with schema TimestampMicros → unix: full microsecond precision preserved
- Input with schema TimestampNanos → unix: full nanosecond precision preserved
- Input with schema Timestamp (existing) → unix: unchanged behavior
Updated timestampTypeFromSchema()
This method is extended to recognize TimestampMicros.LOGICAL_NAME and TimestampNanos.LOGICAL_NAME as valid input schema types, routing them to the appropriate translators.
Updated TimestampTranslator interface
The internal TimestampTranslator interface is updated to support java.time.Instant as the intermediate representation alongside java.util.Date, enabling lossless conversion for the new types. The specific implementation approach (e.g., overloaded methods, a common temporal interface, or separate translator maps) is left to the implementation PR.
6. Updated Converter: JsonConverter
Two new entries added to LOGICAL_CONVERTERS:
| Logical Name | toJson | toConnect |
|---|---|---|
| TimestampMicros.LOGICAL_NAME | Instant → JSON number (microseconds since epoch) | JSON number → Instant |
| TimestampNanos.LOGICAL_NAME | Instant → JSON number (nanoseconds since epoch) | JSON number → Instant |
7. Updated: Values
New public conversion methods:
| Method | Signature | Description |
|---|---|---|
| convertToTimestampMicros | static Instant convertToTimestampMicros(Schema schema, Object value) | Converts various types to Instant with microsecond semantics |
| convertToTimestampNanos | static Instant convertToTimestampNanos(Schema schema, Object value) | Converts various types to Instant with nanosecond semantics |
These follow the same pattern as the existing convertToTimestamp(), convertToDate(), and convertToTime() methods.
8. Updated SMT: Cast
The encodeLogicalType() method is extended to handle the new types:
    case TimestampMicros.LOGICAL_NAME -> TimestampMicros.fromLogical(schema, (Instant) value);
    case TimestampNanos.LOGICAL_NAME -> TimestampNanos.fromLogical(schema, (Instant) value);
Converter Use Case (e.g., Avro Converter)
This KIP enables third-party converters (e.g., Confluent's Avro Converter) to properly map high-precision Avro timestamps. Before this KIP, a converter handling Avro's LONG type would fall through to raw INT64 for non-millis timestamps:
- Before this KIP:

      case LONG:
          if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
              builder = Timestamp.builder();
          } else {
              builder = SchemaBuilder.int64(); // semantic meaning lost
          }

- After this KIP:

      case LONG:
          if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
              builder = Timestamp.builder();
          } else if (AVRO_LOGICAL_TIMESTAMP_MICROS.equalsIgnoreCase(logicalType)) {
              builder = TimestampMicros.builder();
          } else if (AVRO_LOGICAL_TIMESTAMP_NANOS.equalsIgnoreCase(logicalType)) {
              builder = TimestampNanos.builder();
          } else {
              builder = SchemaBuilder.int64();
          }
Summary of All Changed Files
| Module | File | Change |
|---|---|---|
| connect-api | TimestampMicros.java | New file: microsecond logical type |
| connect-api | TimestampNanos.java | New file: nanosecond logical type |
| connect-api | ConnectSchema.java | Add entries to LOGICAL_TYPE_CLASSES |
| connect-api | Values.java | Add convertToTimestampMicros() and convertToTimestampNanos() |
| connect-api | ConnectHeaders.java | Add addTimestampMicros() and addTimestampNanos() |
| connect-json | JsonConverter.java | Add LOGICAL_CONVERTERS entries |
| connect-transforms | TimestampConverter.java | Add new target types, fix precision loss |
| connect-transforms | Cast.java | Handle new logical types in encodeLogicalType() |
Compatibility, Deprecation, and Migration Plan
Backward Compatibility
This change is fully backward-compatible for existing deployments:
- Existing connectors producing Timestamp schemas continue to work identically.
- Existing TimestampConverter configurations are unaffected: no existing target.type values change behavior.
- No existing API signatures are modified.
Behavioral Changes
The following behavioral changes apply only when the new types are actively used:
- Converters (JSON, Avro, Protobuf) that encounter fields with schema name org.apache.kafka.connect.data.TimestampMicros or org.apache.kafka.connect.data.TimestampNanos will serialize/deserialize them as long (micros/nanos since epoch) instead of treating them as raw INT64. This is opt-in: it only happens when a connector explicitly produces the new schema types.
- The TimestampConverter SMT accepts two new target.type values. Existing configurations are unaffected.
Rolling Upgrades
During a rolling upgrade where some workers have the new types and others do not:
- Connectors should not be configured to use the new logical types until all workers in the cluster are upgraded.
- Workers running the old version will not recognize schemas with the new logical type names and will treat them as raw INT64, potentially causing data misinterpretation.
- Once all workers are upgraded, connectors can begin producing TimestampMicros or TimestampNanos schemas.
Migration Path
- Users who want microsecond/nanosecond precision: upgrade the Kafka Connect runtime and update their connector/converter configurations to use the new logical types.
- Users who do not need sub-millisecond precision: no changes required.
Test Plan
Unit Tests: TimestampMicros
- fromLogical() / toLogical() round-trip correctness
- Schema name validation (throws DataException for wrong schema)
- Edge cases: epoch zero, negative timestamps (before 1970), max long boundary
Unit Tests: TimestampNanos
- fromLogical() / toLogical() round-trip correctness
- Schema name validation (throws DataException for wrong schema)
- Edge cases: epoch zero, negative timestamps, INT64 overflow boundary (~292 years from epoch)
TimestampConverter Tests
- Conversion from TimestampMicros → unix with microsecond precision preserved
- Conversion from TimestampNanos → unix with nanosecond precision preserved
- Conversion from unix → TimestampMicros / TimestampNanos
- Conversion from Timestamp (millis) → TimestampMicros / TimestampNanos (upscaling)
- Conversion from TimestampMicros / TimestampNanos → Timestamp (precision truncation is expected and documented)
- Both schema-based and schemaless modes
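The upscaling and truncation cases in this list can be pinned down with JDK conversions alone. The sketch below is one possible reading, not the TimestampConverter implementation: the class and method names are hypothetical, and it assumes truncation follows `Instant.toEpochMilli()` semantics, which round toward negative infinity for pre-1970 values.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helper; shows expected values for the upscaling/truncation tests.
final class PrecisionChangeSketch {
    /** Upscaling: every millisecond value is exactly representable as an Instant. */
    static Instant upscale(Date millisTimestamp) {
        return millisTimestamp.toInstant();
    }

    /** Truncation: drops the sub-millisecond part via Instant.toEpochMilli(). */
    static Date truncate(Instant highPrecision) {
        return Date.from(highPrecision);
    }

    public static void main(String[] args) {
        Instant micros = Instant.parse("2021-04-01T00:00:00.123456Z");
        System.out.println(truncate(micros).getTime());            // millisecond value kept
        System.out.println(upscale(new Date(1_617_235_200_123L))); // sub-millisecond digits are zero
    }
}
```

A test for the negative case is worth including, since an instant 1 µs before the epoch truncates to -1 ms rather than 0 under these semantics.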
JsonConverter Tests
- Round-trip serialization/deserialization of TimestampMicros and TimestampNanos values
- Correct JSON representation (JSON number, not string)
ConnectSchema Validation Tests
- Instant values accepted for TimestampMicros / TimestampNanos schemas
- java.util.Date values rejected for TimestampMicros / TimestampNanos schemas
Cast SMT Tests
- Casting from TimestampMicros / TimestampNanos to primitive types (int64, string)
Rejected Alternatives
1. Modifying the existing Timestamp class to support multiple precisions
Changing Timestamp's Java type from java.util.Date to java.time.Instant would be a breaking change for all existing connectors. Existing code calls Timestamp.fromLogical(schema, (java.util.Date) value) — changing the parameter type would break compilation for every connector and converter in the ecosystem.
2. Adding precision variants within the existing Timestamp class
Introducing LOGICAL_NAME_MILLIS, LOGICAL_NAME_MICROS, and LOGICAL_NAME_NANOS as constants within the existing Timestamp class would require modifying its public API surface and could break schema name checks in user code and third-party converters that compare against Timestamp.LOGICAL_NAME.
3. Using java.util.Date for the new types
java.util.Date fundamentally cannot represent sub-millisecond precision, making it unsuitable for microsecond or nanosecond timestamps.