- Status
- Motivation
- Proposed Changes
- Converters use case
- Compatibility, Deprecation, and Migration Plan
- Test Plan
- Rejected Alternatives
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP introduces support for the timestamp-micros and timestamp-nanos logical types within Kafka Connect.
While formats like Avro, Parquet, and others support higher-precision timestamps, including microseconds and nanoseconds (see https://github.com/apache/avro/blob/7a155a51e4f222bf024e32b3afb336640cc3e32d/doc/content/en/docs/1.12.0/Specification/_index.md?plain=1#L879), Kafka Connect has been limited to millisecond precision (the Timestamp logical type).
As a result, any timestamp expressed in microseconds or nanoseconds is truncated when passed between a Kafka Connect source and sink, leading to loss of precision and data fidelity.
This change aims to extend the logical timestamp support in Kafka Connect to include timestamp-micros and timestamp-nanos. By doing so, it ensures that the full precision of time data can be communicated accurately across the entire pipeline without any loss of information.
It addresses the following concerns:
- Precision loss: many data storage formats, such as Avro, have long supported timestamp precision at the microsecond and nanosecond level. Because Kafka Connect cannot handle timestamp-micros, this precision is lost when converting or sending data between Kafka and sink systems.
- Increasing adoption of high-precision time: as data systems increasingly rely on high-precision timestamps (e.g., logs, event streams, financial applications), it is crucial to maintain this precision throughout the entire data flow, especially when integrating through Kafka Connect.
- Compatibility and data quality: supporting timestamp-micros and timestamp-nanos in Kafka Connect enhances compatibility with existing data storage formats, retains high-precision timestamps, and improves data quality.
Although KIP-808 introduced precision support for microseconds (micros) and nanoseconds (nanos), the Kafka Connect framework currently supports only a single logical type: Timestamp. Internally, this Timestamp type relies on java.util.Date, which is limited to millisecond precision. As a result, when Kafka Connect processes timestamps with microsecond or nanosecond precision, it inadvertently truncates the additional precision, treating the input as milliseconds. This leads to data corruption, as the original microsecond-level details are lost.
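The truncation described above is easy to reproduce with plain JDK types. The following standalone sketch (class name is illustrative) shows a microsecond-precision Instant losing its sub-millisecond digits after a round trip through java.util.Date, the type behind Connect's existing Timestamp logical type:

```java
import java.time.Instant;
import java.util.Date;

public class PrecisionLossDemo {
    public static void main(String[] args) {
        // An instant with microsecond precision: .123456 seconds past the whole second
        Instant original = Instant.ofEpochSecond(1_700_000_000L, 123_456_000L);

        // Round-tripping through java.util.Date keeps only milliseconds
        Date asDate = Date.from(original);
        Instant roundTripped = asDate.toInstant();

        System.out.println(original.getNano());      // 123456000
        System.out.println(roundTripped.getNano());  // 123000000 -- the trailing 456 micros are gone
    }
}
```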
Proposed Changes
Add two new logical types, TimestampMicros and TimestampNanos:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.data;
import org.apache.kafka.connect.errors.DataException;
import java.time.Instant;
public class TimestampMicros {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.TimestampMicros";
/**
* Returns a SchemaBuilder for a TimestampMicros. By returning a SchemaBuilder you can override additional schema settings such
* as required/optional, default value, and documentation.
* @return a SchemaBuilder
*/
public static SchemaBuilder builder() {
return SchemaBuilder.int64()
.name(LOGICAL_NAME)
.version(1);
}
public static final Schema SCHEMA = builder().schema();
/**
* Convert a value from its logical format ({@link java.time.Instant}) to its encoded format
* (long representing microseconds since epoch).
* @param schema the schema associated with the value (must be a TimestampMicros schema)
* @param value the logical value
* @return the encoded value
* @throws DataException if the schema is not a TimestampMicros schema
*/
public static long fromLogical(Schema schema, Instant value) {
if (!(LOGICAL_NAME.equals(schema.name())))
throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
return value.getEpochSecond() * 1_000_000 + value.getNano() / 1_000;
}
/**
* Convert a value from its encoded format (long representing microseconds since epoch) to its
* logical format ({@link java.time.Instant}).
* @param schema the schema associated with the value (must be a TimestampMicros schema)
* @param value the encoded value
* @return the logical value
* @throws DataException if the schema is not a TimestampMicros schema
*/
public static java.time.Instant toLogical(Schema schema, long value) {
if (!(LOGICAL_NAME.equals(schema.name())))
throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
// Split seconds from sub-second micros so the nanosecond adjustment cannot overflow a long
return Instant.ofEpochSecond(Math.floorDiv(value, 1_000_000L), Math.floorMod(value, 1_000_000L) * 1_000L);
}
}
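The encoding math above can be checked in isolation. This standalone sketch (class and helper names are illustrative, without the Connect Schema plumbing) mirrors the encode and decode directions of the proposed microsecond type and round-trips an Instant losslessly:

```java
import java.time.Instant;

public class MicrosRoundTrip {
    // Encode: same math as the proposed TimestampMicros.fromLogical
    static long toMicros(Instant value) {
        return value.getEpochSecond() * 1_000_000L + value.getNano() / 1_000;
    }

    // Decode: split seconds and sub-second micros to stay within long nanoseconds
    static Instant fromMicros(long micros) {
        return Instant.ofEpochSecond(Math.floorDiv(micros, 1_000_000L),
                Math.floorMod(micros, 1_000_000L) * 1_000L);
    }

    public static void main(String[] args) {
        Instant original = Instant.ofEpochSecond(1_700_000_000L, 123_456_000L); // .123456
        long encoded = toMicros(original);
        Instant decoded = fromMicros(encoded);
        System.out.println(encoded);                   // 1700000000123456
        System.out.println(decoded.equals(original));  // true
    }
}
```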
package org.apache.kafka.connect.data;
import org.apache.kafka.connect.errors.DataException;
import java.time.Instant;
public class TimestampNanos {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.TimestampNanos";
/**
* Returns a SchemaBuilder for a TimestampNanos. By returning a SchemaBuilder you can override
* additional schema settings such as required/optional, default value, and documentation.
*
* @return a SchemaBuilder for a 64-bit nanosecond-precision timestamp
*/
public static SchemaBuilder builder() {
return SchemaBuilder.int64()
.name(LOGICAL_NAME)
.version(1);
}
public static final Schema SCHEMA = builder().schema();
/**
* Convert a value from its logical format ({@link java.time.Instant}) to its encoded format (long).
* The conversion preserves full nanosecond precision from the Instant.
*
* @param schema the schema associated with the value (must be of type TimestampNanos)
* @param value the logical value as an Instant
* @return the encoded value as a long representing nanoseconds since epoch
* @throws DataException if the schema is not a TimestampNanos schema
*/
public static long fromLogical(Schema schema, Instant value) {
if (!(LOGICAL_NAME.equals(schema.name())))
throw new DataException("Requested conversion of TimestampNanos object but the schema does not match.");
return value.getEpochSecond() * 1_000_000_000L + value.getNano();
}
/**
* Convert a value from its encoded format (long representing nanoseconds since epoch) to its
* logical format ({@link java.time.Instant}). The conversion preserves full nanosecond precision.
*
* @param schema the schema associated with the value (must be of type TimestampNanos)
* @param value the encoded value as a long representing nanoseconds since epoch
* @return the logical value as an Instant
* @throws DataException if the schema is not a TimestampNanos schema
*/
public static java.time.Instant toLogical(Schema schema, long value) {
if (!(LOGICAL_NAME.equals(schema.name())))
throw new DataException("Requested conversion of TimestampNanos object but the schema does not match.");
return Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L);
}
}
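An analogous standalone check for the nanosecond encoding (again with illustrative names) shows that all nine fractional digits of an Instant survive the round trip, including the sub-microsecond digits that the microsecond encoding would drop:

```java
import java.time.Instant;

public class NanosRoundTrip {
    // Encode: same math as the proposed TimestampNanos.fromLogical
    static long toNanos(Instant value) {
        return value.getEpochSecond() * 1_000_000_000L + value.getNano();
    }

    // Decode: split seconds and sub-second nanos back out of the long
    static Instant fromNanos(long nanos) {
        return Instant.ofEpochSecond(Math.floorDiv(nanos, 1_000_000_000L),
                Math.floorMod(nanos, 1_000_000_000L));
    }

    public static void main(String[] args) {
        Instant original = Instant.ofEpochSecond(1_700_000_000L, 123_456_789L); // .123456789
        Instant decoded = fromNanos(toNanos(original));
        System.out.println(decoded.equals(original));  // true: full nanosecond precision kept
    }
}
```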
This change only adds new logical types; it does not touch any existing data types or logical types.
Converters use case
Currently, a converter such as the Avro converter does the following when mapping an Avro long to a Connect schema:
case LONG:
if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
builder = Timestamp.builder();
} else {
builder = SchemaBuilder.int64();
}
break;
Since there is currently no class to handle the micros and nanos logical types, whenever logicalType does not match AVRO_LOGICAL_TIMESTAMP_MILLIS (including when it is timestamp-micros or timestamp-nanos), the converter falls back to a plain int64 builder. After this change, all three precisions can be handled:
case LONG:
if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
builder = Timestamp.builder();
} else if (AVRO_LOGICAL_TIMESTAMP_MICROS.equalsIgnoreCase(logicalType)) {
builder = TimestampMicros.builder();
} else if (AVRO_LOGICAL_TIMESTAMP_NANOS.equalsIgnoreCase(logicalType)){
builder = TimestampNanos.builder();
} else {
builder = SchemaBuilder.int64();
}
break;
After these logical types are introduced, the unix timestamp translator added in KIP-808 can be fixed in the following way:
TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() {
@Override
public Instant toRaw(Config config, Object orig) {
if (!(orig instanceof Long))
throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());
long unixTime = (Long) orig;
return switch (config.unixPrecision) {
case UNIX_PRECISION_SECONDS ->
Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime)).toInstant();
case UNIX_PRECISION_MICROS ->
TimestampMicros.toLogical(TimestampMicros.SCHEMA, unixTime);
case UNIX_PRECISION_NANOS ->
TimestampNanos.toLogical(TimestampNanos.SCHEMA, unixTime);
default -> Timestamp.toLogical(Timestamp.SCHEMA, unixTime).toInstant();
};
}
@Override
public Schema typeSchema(boolean isOptional) {
return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA;
}
@Override
public Long toType(Config config, Instant orig, String logicalType) {
// Convert the Instant directly at the requested precision; going through
// milliseconds first (as the old Timestamp-only path did) would truncate
// the micro/nanosecond detail this KIP is meant to preserve.
return switch (config.unixPrecision) {
case UNIX_PRECISION_SECONDS -> orig.getEpochSecond();
case UNIX_PRECISION_MICROS -> TimestampMicros.fromLogical(TimestampMicros.SCHEMA, orig);
case UNIX_PRECISION_NANOS -> TimestampNanos.fromLogical(TimestampNanos.SCHEMA, orig);
default -> Timestamp.fromLogical(Timestamp.SCHEMA, Date.from(orig));
};
}
});
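With the fixed translator in place, a connector configuration can request microsecond precision from the TimestampConverter SMT via the unix.precision property introduced by KIP-808 (the transform alias and field name below are illustrative):

```properties
transforms=ts
transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ts.field=event_time
transforms.ts.target.type=unix
transforms.ts.unix.precision=microseconds
```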
Compatibility, Deprecation, and Migration Plan
No breaking changes.
Users upgrading an existing MirrorMaker 2 (MM2) cluster do not need to change any configurations.
The change is backward compatible: existing data will continue to function as expected.
Users who want to use TimestampMicros or TimestampNanos simply need to upgrade the Kafka Connect client.
Test Plan
Unit Tests: Ensure correctness of TimestampMicros and TimestampNanos conversions.
Rejected Alternatives
- Reusing the existing Timestamp class and adding new logical names (such as "org.apache.kafka.connect.data.TimestampMicros") to it, as below:
public static final String LOGICAL_NAME_MILLIS = "org.apache.kafka.connect.data.TimestampMillis";
public static final String LOGICAL_NAME_MICROS = "org.apache.kafka.connect.data.TimestampMicros";
public static final String LOGICAL_NAME_NANOS = "org.apache.kafka.connect.data.TimestampNanos";
public static final Schema SCHEMA_MILLIS = builder(LogicalType.MILLIS).schema();
public static final Schema SCHEMA_MICROS = builder(LogicalType.MICROS).schema();
public static final Schema SCHEMA_NANOS = builder(LogicalType.NANOS).schema();
This was rejected because it would have introduced a breaking, incompatible change for old clients.
- Reusing the existing Timestamp logical type as-is: this would lead to data corruption and precision loss when handling microsecond timestamps. Since java.util.Date only supports milliseconds, the microsecond data would be lost.