Status

Current state: Discarded – covered by KIP-1146

Discussion thread: (to be added after posting to dev@kafka.apache.org)

JIRA: KAFKA-13678

Motivation

Kafka Streams provides periodic callbacks via:

ProcessorContext#schedule(Duration interval,
                           PunctuationType type,
                           Punctuator callback)

Punctuations are designed to trigger periodically based on:

  • PunctuationType.STREAM_TIME

  • PunctuationType.WALL_CLOCK_TIME

Currently, the first punctuation is aligned relative to:

  • The timestamp of the first processed record (STREAM_TIME)

  • The wall-clock time at scheduling (WALL_CLOCK_TIME)

As a result, punctuation timestamps depend on when the application starts and on the first observed record.


Example:

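Assume interval = 10 seconds, with all timestamps given in seconds.

If the first processed record has timestamp 12, punctuations occur at:
12, 22, 32, 42...
If, after a restart, the first processed record has timestamp 26 (for example, because older records were removed by topic retention), punctuations occur at:
26, 36, 46...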

This means punctuation alignment depends on the first record seen after each start, so it is not stable across restarts.

Some use cases require periodic callbacks aligned to fixed time boundaries independent of the first record, for example:

0, 10, 20, 30...
or with a shift:
5, 15, 25, 35...
As discussed in KAFKA-13678, this KIP proposes extending the existing schedule() API by adding a time-shift parameter to allow explicit control over punctuation alignment.

Public Interfaces

This KIP introduces a new overloaded method in:

org.apache.kafka.streams.processor.ProcessorContext

New API:

Cancellable schedule(Duration interval,
                     PunctuationType type,
                     Duration timeShift,
                     Punctuator callback);

Parameter Semantics

  • interval — existing interval duration

  • type — existing punctuation type

  • timeShift — alignment shift relative to epoch time

  • callback — existing punctuator

timeShift Behavior

  • timeShift == null
    → preserves existing behavior (relative to first event or scheduling time)

  • timeShift == Duration.ZERO
    → align punctuations to epoch-aligned boundaries

  • timeShift == X
    → align to epoch boundaries shifted by X


Example:

If:

interval = 10 seconds
timeShift = Duration.ofSeconds(5)


Punctuations occur at:

5, 15, 25, 35...
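
The three timeShift cases can be compared side by side with a small, self-contained sketch. It is an illustration only, not part of the proposed API: the class TimeShiftCases and the method firstPunctuations are hypothetical names, timestamps are plain epoch milliseconds, and the null branch follows the existing behavior as this KIP describes it in the Motivation section.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Kafka Streams.
class TimeShiftCases {

    // Returns the first `count` punctuation timestamps (epoch millis) for a
    // schedule that starts observing time at `startMillis`.
    static List<Long> firstPunctuations(Duration interval, Duration timeShift, long startMillis, int count) {
        long step = interval.toMillis();
        long first;
        if (timeShift == null) {
            // Existing behavior as described in this KIP: anchored to the first observed time.
            first = startMillis;
        } else {
            // Epoch-aligned: first boundary k * step + shift that is >= startMillis.
            long shift = timeShift.toMillis();
            first = -Math.floorDiv(-(startMillis - shift), step) * step + shift;
        }
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(first + i * step);
        }
        return result;
    }

    public static void main(String[] args) {
        Duration tenSeconds = Duration.ofSeconds(10);
        System.out.println(firstPunctuations(tenSeconds, null, 12_000L, 3));                  // [12000, 22000, 32000]
        System.out.println(firstPunctuations(tenSeconds, Duration.ZERO, 12_000L, 3));         // [20000, 30000, 40000]
        System.out.println(firstPunctuations(tenSeconds, Duration.ofSeconds(5), 12_000L, 3)); // [15000, 25000, 35000]
    }
}
```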


No existing APIs are modified or removed.

No behavior changes occur unless the new overload is used.

No protocol, configuration, or binary format changes are introduced.


Proposed Changes

1. Alignment Calculation

When timeShift is provided, the next punctuation timestamp is computed as:

next = ceil((currentTime - shift) / interval) * interval + shift


Where:

  • currentTime is:

    • stream time for STREAM_TIME

    • system time for WALL_CLOCK_TIME

  • shift = timeShift.toMillis()

If timeShift is null, existing scheduling logic remains unchanged.
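
The formula above can be written in integer arithmetic roughly as follows. This is a minimal sketch, not the proposed implementation: the class and method names are hypothetical, and the check for a positive interval is an added assumption. Math.floorDiv is used so that the ceiling is also correct when currentTime is smaller than shift.

```java
// Hypothetical illustration of the alignment formula; names are not from Kafka Streams.
class AlignmentFormula {

    // next = ceil((currentTime - shift) / interval) * interval + shift
    static long nextAligned(long currentTime, long interval, long shift) {
        if (interval <= 0) {
            // Assumption for this sketch: reject non-positive intervals.
            throw new IllegalArgumentException("interval must be positive");
        }
        // -floorDiv(-x, n) equals ceil(x / n) for any sign of x.
        long intervals = -Math.floorDiv(-(currentTime - shift), interval);
        return intervals * interval + shift;
    }

    public static void main(String[] args) {
        System.out.println(nextAligned(26, 10, 0));  // 30
        System.out.println(nextAligned(26, 10, 5));  // 35
        System.out.println(nextAligned(30, 10, 0));  // 30, already on a boundary
        System.out.println(nextAligned(2, 10, 25));  // 5, shift larger than the interval
    }
}
```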

2. STREAM_TIME Semantics

If:

interval = 10
timeShift = 0


And stream time reaches 26:

next = 30


Punctuations occur at:

0, 10, 20, 30, 40...

This sequence does not depend on the timestamp of the first processed record.
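
Written out with the formula from section 1, the arithmetic for this example is shown below. The second line uses a first record at stream time 12 (the value from the Motivation example) to show that a different starting point lands on the same grid.

```latex
\begin{align*}
\text{next} &= \left\lceil \frac{26 - 0}{10} \right\rceil \cdot 10 + 0 = 3 \cdot 10 = 30 \\
\text{next} &= \left\lceil \frac{12 - 0}{10} \right\rceil \cdot 10 + 0 = 2 \cdot 10 = 20
\end{align*}
```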

3. WALL_CLOCK_TIME Semantics

If:

interval = 10
timeShift = 5


Punctuations occur at:

5, 15, 25, 35...

These timestamps are measured from the Unix epoch rather than from the time schedule() was called.
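
For wall-clock time, the same calculation determines how long to wait until the first aligned boundary. The sketch below assumes a java.time.Clock as the time source so that the example is deterministic; WallClockAlignment and delayUntilNextBoundary are hypothetical names, and Kafka Streams uses its own internal time abstraction rather than this one.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical illustration; not Kafka Streams code.
class WallClockAlignment {

    // Time to wait from the clock's current instant until the next aligned boundary.
    static Duration delayUntilNextBoundary(Clock clock, Duration interval, Duration timeShift) {
        long now = clock.millis();
        long step = interval.toMillis();
        long shift = timeShift.toMillis();
        // First boundary k * step + shift that is >= now.
        long next = -Math.floorDiv(-(now - shift), step) * step + shift;
        return Duration.ofMillis(next - now);
    }

    public static void main(String[] args) {
        // A fixed clock, 12 seconds after the epoch.
        Clock clock = Clock.fixed(Instant.ofEpochSecond(12), ZoneOffset.UTC);
        Duration delay = delayUntilNextBoundary(clock, Duration.ofSeconds(10), Duration.ofSeconds(5));
        System.out.println(delay.getSeconds()); // 3, because the next boundary is at 15 s
    }
}
```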

Streams Internal Implementation

This section describes how the change integrates with the Kafka Streams internals.

Current Scheduling Flow

Relevant Classes

Located under:

streams/src/main/java/org/apache/kafka/streams/processor/internals/


Primary classes involved:

  • ProcessorContextImpl

  • StreamTask

  • PunctuationQueue

  • PunctuationSchedule

ProcessorContextImpl

Add overloaded schedule() method accepting timeShift.

Delegate to StreamTask.schedule() including shift value.

The existing schedule(Duration, PunctuationType, Punctuator) remains unchanged.

StreamTask

Extend internal scheduling logic to accept optional timeShift.

Add helper method:

private long computeNextTimestamp(long currentTime, long interval, Long shift)


Pseudo-implementation:

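if (shift == null) {
    return currentTime + interval; // existing behavior
}

// Round (currentTime - shift) up to the next multiple of interval.
// Math.floorDiv keeps the rounding correct when currentTime < shift;
// plain '/' truncates toward zero and can skip a boundary in that case.
long adjusted = currentTime - shift;
long next = Math.floorDiv(adjusted + interval - 1, interval) * interval;
// Note: if currentTime is already on a boundary, this returns currentTime itself.
return next + shift;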


This calculation is used:

  • During initial scheduling

  • During rescheduling after punctuation firing
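
One detail matters when the same calculation is reused for rescheduling: a ceiling returns currentTime itself when currentTime is already on a boundary, so rescheduling right after a punctuation fired at that boundary would yield the same timestamp again. The sketch below shows one possible way to handle this, using a "strictly after" variant for rescheduling. All names are hypothetical, and skipping boundaries that were missed during a gap is an assumption of this sketch, not something this KIP specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative and not from Kafka Streams.
class ReschedulingSketch {

    // Initial scheduling: first boundary at or after currentTime.
    static long firstBoundary(long currentTime, long interval, long shift) {
        return -Math.floorDiv(-(currentTime - shift), interval) * interval + shift;
    }

    // Rescheduling after a punctuation fired: first boundary strictly after currentTime.
    static long boundaryAfter(long currentTime, long interval, long shift) {
        return (Math.floorDiv(currentTime - shift, interval) + 1) * interval + shift;
    }

    // Returns the boundaries that fire while stream time advances through streamTimes.
    static List<Long> firedBoundaries(long[] streamTimes, long interval, long shift) {
        List<Long> fired = new ArrayList<>();
        if (streamTimes.length == 0) {
            return fired;
        }
        long next = firstBoundary(streamTimes[0], interval, shift);
        for (long t : streamTimes) {
            if (t >= next) {
                fired.add(next);
                // Assumption: boundaries missed during a gap are skipped, not replayed.
                next = boundaryAfter(t, interval, shift);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Boundary 40 is skipped because stream time jumps from 26 to 47.
        System.out.println(firedBoundaries(new long[] {12, 19, 20, 26, 47}, 10, 0)); // [20, 30]
    }
}
```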

PunctuationSchedule

Add new field:

private final Long timeShiftMillis;

Used when computing subsequent timestamps.

No changes to queue ordering logic are required.

PunctuationQueue

No structural modifications.

Queue continues to order schedules by nextTimestamp.
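
To illustrate why ordering by nextTimestamp is sufficient, the sketch below places three schedules with different shifts into a java.util.PriorityQueue and drains them. The Entry class and the queue here are hypothetical stand-ins, assuming priority-queue-style ordering; they are not the real PunctuationSchedule or PunctuationQueue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-ins; the real PunctuationQueue and PunctuationSchedule differ.
class QueueOrderingSketch {

    static final class Entry {
        final String label;
        final long nextTimestamp;

        Entry(String label, long nextTimestamp) {
            this.label = label;
            this.nextTimestamp = nextTimestamp;
        }
    }

    static long nextTimestamp(long currentTime, long interval, Long shift) {
        if (shift == null) {
            return currentTime + interval; // existing behavior
        }
        return -Math.floorDiv(-(currentTime - shift), interval) * interval + shift;
    }

    // Builds three schedules at time 12 with interval 10 and returns the order they are polled in.
    static List<String> exampleOrder() {
        PriorityQueue<Entry> queue =
            new PriorityQueue<Entry>(Comparator.comparingLong((Entry e) -> e.nextTimestamp));
        long now = 12;
        long interval = 10;
        queue.add(new Entry("no shift", nextTimestamp(now, interval, null))); // 22
        queue.add(new Entry("shift=0", nextTimestamp(now, interval, 0L)));    // 20
        queue.add(new Entry("shift=5", nextTimestamp(now, interval, 5L)));    // 15
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().label);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(exampleOrder()); // [shift=5, shift=0, no shift]
    }
}
```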


Architecture Diagram

[Figure: Current scheduling flow]

[Figure: Scheduling flow with timeShift]


Compatibility, Deprecation, and Migration Plan

  • Existing schedule(Duration, PunctuationType, Punctuator) remains unchanged.

  • Default behavior is preserved.

  • New behavior is opt-in.

  • No migration required.

  • No deprecation of existing APIs.

Test Plan

Unit Tests

Add tests covering:

  • STREAM_TIME with null shift (existing behavior)

  • STREAM_TIME with zero shift

  • STREAM_TIME with custom shift

  • WALL_CLOCK_TIME with zero shift

  • Multiple punctuations with different shifts in same task

  • Restart simulation ensuring deterministic alignment
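
The restart case could be checked along the following lines. This is a plain-Java sketch without a test framework and without Kafka Streams test utilities; RestartAlignmentCheck and firedBoundaries are hypothetical names, and stream time is simulated as advancing one unit per record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restart check in plain Java; names are illustrative.
class RestartAlignmentCheck {

    // Boundaries fired for a run whose stream time advances from firstTimestamp
    // to lastTimestamp in steps of one.
    static List<Long> firedBoundaries(long firstTimestamp, long lastTimestamp, long interval, long shift) {
        List<Long> fired = new ArrayList<>();
        long next = -Math.floorDiv(-(firstTimestamp - shift), interval) * interval + shift;
        for (long t = firstTimestamp; t <= lastTimestamp; t++) {
            if (t >= next) {
                fired.add(next);
                next += interval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> beforeRestart = firedBoundaries(12, 60, 10, 0); // [20, 30, 40, 50, 60]
        List<Long> afterRestart = firedBoundaries(26, 60, 10, 0);  // [30, 40, 50, 60]
        // Every boundary seen after the restart must also be a boundary of the first run.
        if (!beforeRestart.containsAll(afterRestart)) {
            throw new AssertionError("alignment changed across restart");
        }
        System.out.println(afterRestart);
    }
}
```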

Integration Tests

Add Streams integration test:

  1. Start topology.

  2. Produce records with varying first timestamps.

  3. Restart application.

  4. Verify punctuation alignment remains consistent when shift is specified.

Regression Validation

All existing punctuation tests must pass without modification.

Rejected Alternatives

1. Alignment Strategy Enum

Rejected to keep the change minimal and close to the original discussion in KAFKA-13678.

A numeric shift parameter provides greater flexibility with less API surface.

2. Separate Timer API

Rejected because periodic punctuations already provide the required abstraction.

3. Changing Default Behavior

Rejected due to backward compatibility concerns.

Conclusion

This KIP extends Kafka Streams punctuation scheduling with an optional time-shift parameter.

It enables deterministic and configurable alignment of periodic callbacks while preserving existing semantics and maintaining full backward compatibility.

