
Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP introduces support for the timestamp-micros and timestamp-nanos logical types within Kafka Connect.
While formats such as Avro and Parquet support higher-precision timestamps, including microseconds and nanoseconds (see the Avro specification: https://github.com/apache/avro/blob/7a155a51e4f222bf024e32b3afb336640cc3e32d/doc/content/en/docs/1.12.0/Specification/_index.md?plain=1#L879), Kafka Connect has been limited to handling only millisecond precision (timestamp).
As a result, any timestamp data expressed in microseconds or nanoseconds is truncated when communicated between a Kafka Connect source and sink, leading to loss of precision and data fidelity.
This change aims to extend the logical timestamp support in Kafka Connect to include timestamp-micros and timestamp-nanos. By doing so, it ensures that the full precision of time data can be communicated accurately across the entire pipeline without any loss of information.
It handles:

  • Precision Loss: Many data storage formats, such as Avro, have long supported timestamp precision at the microsecond and nanosecond level. However, because Kafka Connect cannot handle timestamp-micros, this precision is lost when converting or sending data between Kafka and sink systems.

  • Increasing Adoption of High-Precision Time: As data systems increasingly rely on high-precision timestamps (e.g., in logs, event streams, or financial applications), it is crucial to maintain this precision throughout the entire data flow, especially when integrating through Kafka Connect.

Supporting timestamp-micros and timestamp-nanos in Kafka Connect will enhance compatibility with existing data storage formats, retain high-precision timestamps, and improve data quality.

Although KIP-808 introduced precision support for microseconds (micros) and nanoseconds (nanos), the Kafka Connect framework currently supports only a single logical type: Timestamp. Internally, this Timestamp type relies on java.util.Date, which is limited to millisecond precision. As a result, when Kafka Connect processes timestamps with microsecond or nanosecond precision, it inadvertently truncates the additional precision, treating the input as milliseconds. This leads to data corruption, as the original microsecond-level details are lost.
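The truncation described above can be illustrated with a short standalone example (not KIP code): routing a microsecond-precision Instant through java.util.Date, as Connect's existing Timestamp logical type does, silently drops everything below the millisecond.

```java
import java.time.Instant;
import java.util.Date;

public class PrecisionLossDemo {
    public static void main(String[] args) {
        // An instant with microsecond detail: 1 s + 123456 µs after the epoch.
        Instant original = Instant.ofEpochSecond(1, 123_456_000L);

        // Routing it through java.util.Date (what Connect's Timestamp type does)
        // keeps only whole milliseconds.
        Date asDate = Date.from(original);
        Instant roundTripped = asDate.toInstant();

        System.out.println(original);      // 1970-01-01T00:00:01.123456Z
        System.out.println(roundTripped);  // 1970-01-01T00:00:01.123Z
    }
}
```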

Proposed Changes

This KIP proposes adding two new logical types, “TimestampMicros” and “TimestampNanos”, to handle the microsecond- and nanosecond-level precision supported by Avro and other formats. Both use “java.time.Instant” internally to represent epoch values.

Code Block
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.data;

import org.apache.kafka.connect.errors.DataException;

import java.time.Instant;

public class TimestampMicros {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.TimestampMicros";

    /**
     * Returns a SchemaBuilder for a TimestampMicros. By returning a SchemaBuilder you can override additional schema settings such
     * as required/optional, default value, and documentation.
     * @return a SchemaBuilder
     */
    public static SchemaBuilder builder() {
        return SchemaBuilder.int64()
                .name(LOGICAL_NAME)
                .version(1);
    }

    public static final Schema SCHEMA = builder().schema();

    /**
     * Convert a value from its logical format ({@link java.time.Instant}) to its encoded format (long).
     * @param schema the schema associated with the value (must be of type TimestampMicros)
     * @param value the logical value
     * @return the encoded value as a long representing microseconds since epoch
     * @throws DataException if the schema is not a TimestampMicros schema
     */
    public static long fromLogical(Schema schema, Instant value) {
        if (!(LOGICAL_NAME.equals(schema.name())))
            throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
        return value.getEpochSecond() * 1_000_000L + value.getNano() / 1_000;
    }

    /**
     * Convert a value from its encoded format (long representing microseconds since epoch) to its
     * logical format ({@link java.time.Instant}).
     * @param schema the schema associated with the value (must be of type TimestampMicros)
     * @param value the encoded value
     * @return the logical value
     * @throws DataException if the schema is not a TimestampMicros schema
     */
    public static Instant toLogical(Schema schema, long value) {
        if (!(LOGICAL_NAME.equals(schema.name())))
            throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
        return Instant.ofEpochSecond(value / 1_000_000L, (value % 1_000_000L) * 1_000L);
    }
}

This only adds support for another logical type without touching any other existing data types or logical types.
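A minimal standalone sketch of the encoding arithmetic above; the method names mirror the proposed class, but this is illustrative, not the KIP code itself:

```java
import java.time.Instant;

public class MicrosRoundTrip {
    // Encode an Instant as microseconds since the epoch, as in TimestampMicros.fromLogical.
    static long toMicros(Instant value) {
        return value.getEpochSecond() * 1_000_000L + value.getNano() / 1_000;
    }

    // Decode microseconds since the epoch back to an Instant, as in TimestampMicros.toLogical.
    static Instant fromMicros(long micros) {
        return Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2024-05-01T10:15:30.123456Z");
        long encoded = toMicros(t);
        // The round trip is lossless because the value has no sub-microsecond detail.
        System.out.println(fromMicros(encoded).equals(t)); // true
    }
}
```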

...


Code Block
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.data;

import org.apache.kafka.connect.errors.DataException;

import java.time.Instant;

public class TimestampNanos {

    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.TimestampNanos";

    /**
     * Returns a SchemaBuilder for a TimestampNanos. By returning a SchemaBuilder you can override
     * additional schema settings such as required/optional, default value, and documentation.
     * 
     * @return a SchemaBuilder for a 64-bit nanosecond-precision timestamp
     */
    public static SchemaBuilder builder() {
        return SchemaBuilder.int64()
                .name(LOGICAL_NAME)
                .version(1);
    }

    public static final Schema SCHEMA = builder().schema();

    /**
     * Convert a value from its logical format ({@link java.time.Instant}) to its encoded format (long).
     * The conversion preserves full nanosecond precision from the Instant.
     * 
     * @param schema the schema associated with the value (must be of type TimestampNanos)
     * @param value the logical value as an Instant
     * @return the encoded value as a long representing nanoseconds since epoch
     * @throws DataException if the schema is not a TimestampNanos schema
     */
    public static long fromLogical(Schema schema, Instant value) {
        if (!(LOGICAL_NAME.equals(schema.name())))
            throw new DataException("Requested conversion of TimestampNanos object but the schema does not match.");
        return value.getEpochSecond() * 1_000_000_000L + value.getNano();
    }

    /**
     * Convert a value from its encoded format (long representing nanoseconds since epoch) to its
     * logical format ({@link java.time.Instant}). The conversion preserves full nanosecond precision.
     * 
     * @param schema the schema associated with the value (must be of type TimestampNanos)
     * @param value the encoded value as a long representing nanoseconds since epoch
     * @return the logical value as an Instant
     * @throws DataException if the schema is not a TimestampNanos schema
     */
    public static Instant toLogical(Schema schema, long value) {
        if (!(LOGICAL_NAME.equals(schema.name())))
            throw new DataException("Requested conversion of TimestampNanos object but the schema does not match.");
        return Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L);
    }
}


Likewise, this only adds support for another logical type without touching any other existing data types or logical types.
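The nanosecond encoding can be sketched the same way (illustrative, not the KIP code). Note that a signed 64-bit nanosecond count can only represent instants up to roughly the year 2262, a bound the nanos encoding inherently shares.

```java
import java.time.Instant;

public class NanosRoundTrip {
    // Encode an Instant as nanoseconds since the epoch, as in TimestampNanos.fromLogical.
    static long toNanos(Instant value) {
        return value.getEpochSecond() * 1_000_000_000L + value.getNano();
    }

    // Decode nanoseconds since the epoch back to an Instant, as in TimestampNanos.toLogical.
    static Instant fromNanos(long nanos) {
        return Instant.ofEpochSecond(nanos / 1_000_000_000L, nanos % 1_000_000_000L);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2024-05-01T10:15:30.123456789Z");
        long encoded = toNanos(t);
        // Full nanosecond precision survives the round trip.
        System.out.println(fromNanos(encoded).equals(t)); // true
    }
}
```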

How this will be used by converters

Currently, a converter (say, the Avro converter) does the following:

Code Block
case LONG:
if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
	builder = Timestamp.builder();
} else {
	builder = SchemaBuilder.int64();
}
break;


Since there is currently no class to handle the micros and nanos logical types, whenever logicalType does not match “AVRO_LOGICAL_TIMESTAMP_MILLIS” (for example, when it is “timestamp-micros” or “timestamp-nanos”), the converter simply builds a plain “int64” schema. After this change, those cases can be handled as follows:

Code Block
case LONG:
if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
	builder = Timestamp.builder();
} else if (AVRO_LOGICAL_TIMESTAMP_MICROS.equalsIgnoreCase(logicalType)) {
	builder = TimestampMicros.builder();
} else if (AVRO_LOGICAL_TIMESTAMP_NANOS.equalsIgnoreCase(logicalType)){
	builder = TimestampNanos.builder();
} else {
	builder = SchemaBuilder.int64();
}
break;


After these logical types are introduced, KIP-808's TimestampConverter will be fixed in the following way:

Code Block
        TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() {
            @Override
            public Instant toRaw(Config config, Object orig) {
                if (!(orig instanceof Long))
                    throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());
                long unixTime = (Long) orig;
                return switch (config.unixPrecision) {
                    case UNIX_PRECISION_SECONDS ->
                            Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime)).toInstant();
                    case UNIX_PRECISION_MICROS ->
                            TimestampMicros.toLogical(TimestampMicros.SCHEMA, unixTime);
                    case UNIX_PRECISION_NANOS ->
                            TimestampNanos.toLogical(TimestampNanos.SCHEMA, unixTime);
                    default -> Timestamp.toLogical(Timestamp.SCHEMA, unixTime).toInstant();
                };
            }

            @Override
            public Schema typeSchema(boolean isOptional) {
                return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA;
            }

            @Override
            public Long toType(Config config, Instant orig, String logicalType) {
                // Normalize to nanoseconds first so the precision of the logical type
                // and the requested unix precision are never mixed.
                long epochNanos = switch (logicalType) {
                    case TimestampMicros.LOGICAL_NAME ->
                            TimeUnit.MICROSECONDS.toNanos(TimestampMicros.fromLogical(TimestampMicros.SCHEMA, orig));
                    case TimestampNanos.LOGICAL_NAME ->
                            TimestampNanos.fromLogical(TimestampNanos.SCHEMA, orig);
                    default ->
                            TimeUnit.MILLISECONDS.toNanos(Timestamp.fromLogical(Timestamp.SCHEMA, new Date(orig.toEpochMilli())));
                };
                return switch (config.unixPrecision) {
                    case UNIX_PRECISION_SECONDS -> TimeUnit.NANOSECONDS.toSeconds(epochNanos);
                    case UNIX_PRECISION_MICROS -> TimeUnit.NANOSECONDS.toMicros(epochNanos);
                    case UNIX_PRECISION_NANOS -> epochNanos;
                    default -> TimeUnit.NANOSECONDS.toMillis(epochNanos);
                };
            }
        });
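
As an aside on the translator's unit handling (a standalone illustration, not KIP code): normalizing to a single base unit before converting to the requested unix precision avoids accidentally mixing units, which is the pitfall when a variable holds milliseconds in one branch and microseconds in another.

```java
import java.util.concurrent.TimeUnit;

public class PrecisionConversionDemo {
    public static void main(String[] args) {
        // One fixed point in time, expressed as nanoseconds since the epoch.
        long epochNanos = 1_714_558_530_123_456_789L;

        // Converting from the normalized base unit to each unix precision.
        System.out.println(TimeUnit.NANOSECONDS.toSeconds(epochNanos)); // 1714558530
        System.out.println(TimeUnit.NANOSECONDS.toMillis(epochNanos));  // 1714558530123
        System.out.println(TimeUnit.NANOSECONDS.toMicros(epochNanos));  // 1714558530123456
    }
}
```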


Compatibility, Deprecation, and Migration Plan

  • No breaking changes.

  • Users upgrading an existing MirrorMaker 2 (MM2) cluster do not need to change any configurations.

  • Backward-compatible, meaning existing data will continue to function as expected.

  • Users who want to use TimestampMicros or TimestampNanos simply need to upgrade the Kafka Connect client.

Test Plan

  1. Unit Tests: Ensure correctness of TimestampMicros and TimestampNanos conversions.
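
A sketch of the kind of unit test intended here, written as plain Java against the conversion arithmetic rather than the proposed classes (which do not exist yet):

```java
import java.time.Instant;

public class TimestampConversionTests {
    public static void main(String[] args) {
        // Property 1: encode/decode is lossless at microsecond granularity.
        Instant micros = Instant.ofEpochSecond(10, 123_456_000L);
        long encoded = micros.getEpochSecond() * 1_000_000L + micros.getNano() / 1_000;
        Instant decoded = Instant.ofEpochSecond(encoded / 1_000_000L, (encoded % 1_000_000L) * 1_000L);
        if (!decoded.equals(micros)) throw new AssertionError("micros round trip failed");

        // Property 2: sub-microsecond detail is truncated (floored), not rounded.
        Instant fine = Instant.ofEpochSecond(10, 123_456_999L);
        long encodedFine = fine.getEpochSecond() * 1_000_000L + fine.getNano() / 1_000;
        if (encodedFine != encoded) throw new AssertionError("expected truncation to 123456 us");

        System.out.println("all assertions passed");
    }
}
```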

Rejected Alternatives

  1. Reusing the existing Timestamp class, with new logical names for each precision added to it, as below:

    Code Block
      public static final String LOGICAL_NAME_MILLIS = "org.apache.kafka.connect.data.TimestampMillis";
      public static final String LOGICAL_NAME_MICROS = "org.apache.kafka.connect.data.TimestampMicros";
      public static final String LOGICAL_NAME_NANOS = "org.apache.kafka.connect.data.TimestampNanos";
      public static final Schema SCHEMA_MILLIS = builder(LogicalType.MILLIS).schema();
      public static final Schema SCHEMA_MICROS = builder(LogicalType.MICROS).schema();
      public static final Schema SCHEMA_NANOS = builder(LogicalType.NANOS).schema();

    This was rejected because it would have introduced a breaking, incompatible change with respect to old clients.

  2. Reusing the existing Timestamp logical type unchanged:

    • This would lead to data corruption and precision loss when handling microsecond timestamps.

    • Since java.util.Date only supports milliseconds, the microsecond data would be lost.