
Status

Current state: "Under Discussion - Duplicate"

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP introduces support for the timestamp-micros logical type within Kafka Connect.
While formats such as Avro and Parquet support higher-precision timestamps, including microseconds (see https://avro.apache.org/docs/1.11.0/spec.html), Kafka Connect has been limited to millisecond precision (the Timestamp logical type).
As a result, timestamp data expressed in microseconds is truncated when passed between Kafka Connect sources and sinks, leading to a loss of precision and data fidelity.
This change aims to extend the logical timestamp support in Kafka Connect to include timestamp-micros, ensuring that the full precision of time data can be communicated accurately across the entire pipeline without loss of information.
It addresses:

  • Precision Loss: Many data storage formats, such as Avro, have long supported timestamp precision at the microsecond level. However, because Kafka Connect cannot handle timestamp-micros, this precision is lost when converting or sending data between Kafka and sink systems.

  • Increasing Adoption of High-Precision Time: As data systems increasingly rely on high-precision timestamps (e.g., logs, event streams, or financial applications), it is crucial to maintain this precision throughout the entire data flow, especially when integrating through Kafka Connect.

Supporting timestamp-micros in Kafka Connect will enhance compatibility with existing data storage formats, make it possible to retain high-precision timestamps, and improve data quality.

Although KIP-808 introduced precision support for microseconds (micros) and nanoseconds (nanos), the Kafka Connect framework currently supports only a single logical type: Timestamp. Internally, this Timestamp type relies on java.util.Date, which is limited to millisecond precision. As a result, when Kafka Connect processes timestamps with microsecond or nanosecond precision, it inadvertently truncates the additional precision, treating the input as milliseconds. This leads to data corruption, as the original microsecond-level details are lost.
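
For illustration, here is a minimal, hypothetical sketch (the class name PrecisionLossDemo is purely illustrative) of how microsecond precision is lost once a value is forced through java.util.Date, and how java.time.Instant preserves it:

import java.time.Instant;
import java.util.Date;

public class PrecisionLossDemo {
    public static void main(String[] args) {
        // A timestamp with microsecond precision: 2024-01-01T00:00:00.123456Z
        long micros = 1_704_067_200_123_456L;

        // java.util.Date (the backing type of the existing Timestamp logical type)
        // can only hold milliseconds, so the last three digits are dropped.
        Date asDate = new Date(micros / 1_000);
        long backToMicros = asDate.getTime() * 1_000;
        System.out.println(micros - backToMicros); // 456 microseconds lost

        // java.time.Instant can represent the full microsecond value.
        Instant asInstant = Instant.ofEpochSecond(micros / 1_000_000, (micros % 1_000_000) * 1_000);
        System.out.println(asInstant); // 2024-01-01T00:00:00.123456Z
    }
}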

Proposed Changes

This KIP proposes adding a new logical type, TimestampMicros, to handle the microsecond-level precision supported by Avro and other formats. Internally it uses java.time.Instant to represent microsecond-precision epoch values.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.data;
 
import org.apache.kafka.connect.errors.DataException;
 
import java.time.Instant;
 
public class TimestampMicros {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.TimestampMicros";
 
    /**
     * Returns a SchemaBuilder for a TimestampMicros. By returning a SchemaBuilder you can override additional schema settings such
     * as required/optional, default value, and documentation.
     * @return a SchemaBuilder
     */
    public static SchemaBuilder builder() {
        return SchemaBuilder.int64()
                .name(LOGICAL_NAME)
                .version(1);
    }
 
    public static final Schema SCHEMA = builder().schema();
 
    /**
     * Convert a value from its logical format ({@link java.time.Instant}) to its encoded format (long).
     * @param value the logical value
     * @return the encoded value
     */
    public static long fromLogical(Schema schema, Instant value) {
        if (!(LOGICAL_NAME.equals(schema.name())))
            throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
        // Whole seconds scaled to microseconds, plus the microsecond portion of the nanosecond field.
        return value.getEpochSecond() * 1_000_000 + value.getNano() / 1_000;
    }
 
    /**
     * Convert a value from its encoded format (long) to its logical format ({@link java.time.Instant}).
     * @param value the encoded value
     * @return the logical value
     */
    public static Instant toLogical(Schema schema, long value) {
        if (!(LOGICAL_NAME.equals(schema.name())))
            throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
        // Interpret the long as microseconds since the epoch, converting to nanoseconds for Instant.
        return Instant.ofEpochSecond(0, value * 1_000L);
    }
}

This only adds support for another logical type without touching any other existing data-types or logical types.
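
As a rough usage sketch, assuming the class above is merged as-is, converting back and forth would look like this (the wrapper class name is illustrative only):

import java.time.Instant;

import org.apache.kafka.connect.data.TimestampMicros;

public class TimestampMicrosRoundTrip {
    public static void main(String[] args) {
        // Logical value with microsecond precision.
        Instant original = Instant.parse("2024-01-01T00:00:00.123456Z");

        // Logical -> encoded: the Instant becomes a long count of microseconds since the epoch.
        long encoded = TimestampMicros.fromLogical(TimestampMicros.SCHEMA, original);
        System.out.println(encoded); // 1704067200123456

        // Encoded -> logical: the long is restored to an Instant with no precision lost.
        Instant restored = TimestampMicros.toLogical(TimestampMicros.SCHEMA, encoded);
        System.out.println(restored.equals(original)); // true
    }
}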

How this will be used by converters:

Currently, a converter (for example, the Avro converter) does the following:

case LONG:
    if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
        builder = Timestamp.builder();
    } else {
        builder = SchemaBuilder.int64();
    }
    break;


Since there is currently no class to handle the micros logical type, whenever logicalType does not match AVRO_LOGICAL_TIMESTAMP_MILLIS (for example, when it is "timestamp-micros"), the converter simply builds a plain int64 schema. After this change, the micros case can be handled as follows:

case LONG:
    if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
        builder = Timestamp.builder();
    } else if (AVRO_LOGICAL_TIMESTAMP_MICROS.equalsIgnoreCase(logicalType)) {
        builder = TimestampMicros.builder();
    } else {
        builder = SchemaBuilder.int64();
    }
    break;


Once this is approved and merged, a corresponding fix will be applied to the KIP-808 code path in the following way:

TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() {
    @Override
    public Object toRaw(Config config, Object orig) { // return type widened here so the translator can carry an Instant
        if (!(orig instanceof Long unixTime))
            throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());
        switch (config.unixPrecision) {
            case UNIX_PRECISION_MICROS:
                // Pass the microsecond value through unchanged instead of truncating it to milliseconds.
                return TimestampMicros.toLogical(TimestampMicros.SCHEMA, unixTime);
            // ... other precisions handled as today ...
        }
    }
});


Compatibility, Deprecation, and Migration Plan

  • No breaking changes.

  • Users upgrading an existing Kafka Connect cluster do not need to change any configurations.

  • Backward-compatible, meaning existing data will continue to function as expected.

  • Users who want to use TimestampMicros simply need to upgrade the Kafka Connect client.

Test Plan

  1. Unit Tests: Ensure correctness in TimestampMicros conversions.
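
A rough sketch of such a unit test, written in JUnit 5 style (class and method names are illustrative, not part of the proposal):

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Instant;

import org.apache.kafka.connect.data.TimestampMicros;
import org.junit.jupiter.api.Test;

public class TimestampMicrosTest {

    @Test
    public void shouldRoundTripMicrosecondPrecision() {
        Instant original = Instant.parse("2024-01-01T00:00:00.123456Z");

        // Encode to a long of microseconds since the epoch, then decode back.
        long encoded = TimestampMicros.fromLogical(TimestampMicros.SCHEMA, original);
        assertEquals(1_704_067_200_123_456L, encoded);

        Instant decoded = TimestampMicros.toLogical(TimestampMicros.SCHEMA, encoded);
        assertEquals(original, decoded);
    }

    @Test
    public void shouldExposeExpectedSchemaName() {
        assertEquals(TimestampMicros.LOGICAL_NAME, TimestampMicros.SCHEMA.name());
    }
}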

Rejected Alternatives

  1. Reusing the existing Timestamp logical type:

    • This would lead to data corruption and precision loss when handling microsecond timestamps.

    • Since java.util.Date only supports milliseconds, the microsecond data would be lost.
