Status
Current state: Discarded – covered by KIP-1146
Discussion thread: (to be added after posting to dev@kafka.apache.org)
JIRA: KAFKA-13678
Motivation
Kafka Streams provides periodic callbacks via:
ProcessorContext#schedule(Duration interval,
PunctuationType type,
Punctuator callback)
Punctuations are designed to trigger periodically based on:
PunctuationType.STREAM_TIME
PunctuationType.WALL_CLOCK_TIME
Currently, the first punctuation is aligned relative to:
The timestamp of the first processed record (STREAM_TIME)
The wall-clock time at scheduling (WALL_CLOCK_TIME)
As a result, punctuation timestamps depend on when the application starts and on the first observed record.
Example:
Assume:
interval = 10 seconds
If the first processed record has timestamp 12:
12, 22, 32, 42...
If, after restart (due to topic retention), the first processed record has timestamp 26:
26, 36, 46...
This means punctuation alignment is non-deterministic across restarts.
Some use cases require periodic callbacks aligned to fixed time boundaries independent of the first record, for example:
0, 10, 20, 30...
or with a shift:
5, 15, 25, 35...
As discussed in KAFKA-13678, this KIP proposes extending the existing schedule() API by adding a time-shift parameter to allow explicit control over punctuation alignment.
Public Interfaces
This KIP introduces a new overloaded method in:
org.apache.kafka.streams.processor.ProcessorContext
New API:
Cancellable schedule(Duration interval,
PunctuationType type,
Duration timeShift,
Punctuator callback);
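A minimal sketch of how a caller might use the proposed overload. Because the overload does not exist in released Kafka Streams, the types below (PunctuationType, Punctuator, Cancellable, and the hypothetical ShiftAwareContext) are local stand-ins that only mirror the shape of the signature above; they are assumptions for illustration, not the real classes.

```java
import java.time.Duration;

// Stand-in types that mirror the shape of the Kafka Streams API, for illustration only.
enum PunctuationType { STREAM_TIME, WALL_CLOCK_TIME }

interface Punctuator { void punctuate(long timestamp); }

interface Cancellable { void cancel(); }

// Hypothetical slice of ProcessorContext holding only the proposed overload.
interface ShiftAwareContext {
    Cancellable schedule(Duration interval,
                         PunctuationType type,
                         Duration timeShift,
                         Punctuator callback);
}

class ScheduleUsageSketch {
    // Registers a 10-second stream-time punctuation shifted by 5 seconds,
    // i.e. boundaries at 5, 15, 25, ... seconds relative to the epoch.
    static Cancellable register(ShiftAwareContext context, StringBuilder log) {
        return context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.STREAM_TIME,
            Duration.ofSeconds(5),
            timestamp -> log.append("punctuate@").append(timestamp).append(' '));
    }
}
```

The returned Cancellable plays the same role as with the existing three-argument overload: calling cancel() stops further callbacks.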
Parameter Semantics
interval: existing interval duration
type: existing punctuation type
timeShift: alignment shift relative to epoch time
callback: existing punctuator
timeShift Behavior
timeShift == null
→ preserves existing behavior (relative to first event or scheduling time)
timeShift == Duration.ZERO
→ aligns punctuations to epoch-aligned boundaries
timeShift == X
→ aligns to epoch boundaries shifted by X
Example:
If:
interval = 10 seconds
timeShift = Duration.ofSeconds(5)
Punctuations occur at:
5, 15, 25, 35...
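The three timeShift cases can be compared side by side with a small, self-contained sketch. The class and method names are hypothetical. The null branch assumes the sequence from the Motivation example, where punctuations are anchored at the start timestamp itself; the non-null branches round up to the next shifted epoch boundary.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class TimeShiftModes {
    // Returns the first `count` punctuation timestamps (in ms) at or after startMs.
    // A null timeShift models the existing behavior from the Motivation example,
    // where the sequence is anchored at the start timestamp itself.
    static List<Long> firstPunctuations(long startMs, Duration interval,
                                        Duration timeShift, int count) {
        long step = interval.toMillis();
        long first;
        if (timeShift == null) {
            first = startMs;
        } else {
            long shift = timeShift.toMillis();
            // Round (startMs - shift) up to a multiple of step, then shift back.
            first = Math.floorDiv(startMs - shift + step - 1, step) * step + shift;
        }
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(first + i * step);
        }
        return result;
    }
}
```

With a start of 12 seconds and a 10-second interval, a null shift gives 12 s, 22 s, 32 s; Duration.ZERO gives 20 s, 30 s, 40 s; and a 5-second shift gives 15 s, 25 s, 35 s.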
No existing APIs are modified or removed.
No behavior changes occur unless the new overload is used.
No protocol, configuration, or binary format changes are introduced.
Proposed Changes
1. Alignment Calculation
When timeShift is provided, the next punctuation timestamp is computed as:
next = ceil((currentTime - shift) / interval) * interval + shift
Where:
currentTime is:
stream time for STREAM_TIME
system time for WALL_CLOCK_TIME
shift = timeShift.toMillis()
If timeShift is null, existing scheduling logic remains unchanged.
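The formula can be written in integer arithmetic as follows. This is a sketch under the assumption that all values are in milliseconds and that interval is positive; the class and method names are hypothetical. Math.floorDiv is used so that the ceiling stays correct when currentTime - shift is negative, for example when the shift is larger than the current time.

```java
class AlignmentFormula {
    // next = ceil((currentTime - shift) / interval) * interval + shift
    static long nextAligned(long currentTime, long interval, long shift) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        long adjusted = currentTime - shift;
        // Ceiling division for a positive divisor; floorDiv rounds toward
        // negative infinity, so negative adjusted values are handled too.
        long periods = Math.floorDiv(adjusted + interval - 1, interval);
        return periods * interval + shift;
    }
}
```

For instance, nextAligned(26, 10, 0) is 30, nextAligned(26, 10, 5) is 35, and nextAligned(3, 10, 25) is 5. A current time that already sits on a boundary is returned unchanged: nextAligned(30, 10, 0) is 30.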
2. STREAM_TIME Semantics
If:
interval = 10
timeShift = 0
And stream time reaches 26:
next = 30
Punctuations occur at:
0, 10, 20, 30, 40...
Independent of first processed record.
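To illustrate the independence from the first record, the following simplified simulation replays two record sequences, one starting at timestamp 12 and one starting at 26 as after a restart. It is a sketch, not the Streams implementation: stream time is modeled as the maximum timestamp seen so far, the scheduled boundary is what gets recorded, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class StreamTimeRestartSketch {
    // Replays record timestamps and returns the boundaries at which an aligned
    // STREAM_TIME punctuation would become due.
    static List<Long> dueBoundaries(long[] recordTimestamps, long interval, long shift) {
        List<Long> fired = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        long next = 0L;
        boolean scheduled = false;
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts);
            if (!scheduled) {
                // First boundary at or after the first observed stream time.
                next = Math.floorDiv(streamTime - shift + interval - 1, interval) * interval + shift;
                scheduled = true;
            }
            // Fire every boundary that stream time has reached or passed.
            while (next <= streamTime) {
                fired.add(next);
                next += interval;
            }
        }
        return fired;
    }
}
```

With interval 10 and shift 0, the records 12, 18, 27, 33, 41 produce boundaries 20, 30, 40, while the records 26, 27, 33, 41 produce 30, 40. Both runs land on the same grid of multiples of 10.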
3. WALL_CLOCK_TIME Semantics
If:
interval = 10
timeShift = 5
Punctuations occur at:
5, 15, 25, 35...
Relative to epoch time.
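For wall-clock time the same calculation can be driven by a java.time.Clock, which also makes it testable with a fixed clock. This is a sketch with hypothetical names; it assumes the shift and interval are converted to milliseconds as described above.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

class WallClockAlignmentSketch {
    // Computes the next epoch-aligned wall-clock boundary at or after clock.millis().
    static Instant nextBoundary(Clock clock, Duration interval, Duration timeShift) {
        long now = clock.millis();
        long step = interval.toMillis();
        long shift = timeShift.toMillis();
        long next = Math.floorDiv(now - shift + step - 1, step) * step + shift;
        return Instant.ofEpochMilli(next);
    }
}
```

With a clock fixed at epoch second 1,700,000,003, a 10-second interval and a 5-second shift, the next boundary is epoch second 1,700,000,005; with a zero shift it is 1,700,000,010.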
Streams Internal Implementation
This section describes how the change integrates with the Kafka Streams internals.
Current Scheduling Flow
Relevant Classes
Located under:
streams/src/main/java/org/apache/kafka/streams/processor/internals/
Primary classes involved:
ProcessorContextImpl
StreamTask
PunctuationQueue
PunctuationSchedule
ProcessorContextImpl
Add overloaded schedule() method accepting timeShift.
Delegate to StreamTask.schedule() including shift value.
The existing schedule(Duration, PunctuationType, Punctuator) remains unchanged.
StreamTask
Extend internal scheduling logic to accept optional timeShift.
Add helper method:
private long computeNextTimestamp(long currentTime, long interval, Long shift)
Pseudo-implementation:
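if (shift == null) {
    return currentTime + interval; // existing behavior
}
// Round (currentTime - shift) up to the next multiple of interval.
// Math.floorDiv keeps the ceiling correct when the adjusted value is negative.
long adjusted = currentTime - shift;
long next = Math.floorDiv(adjusted + interval - 1, interval) * interval;
return next + shift;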
This calculation is used:
During initial scheduling
During rescheduling after punctuation firing
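One detail worth checking when the same calculation is reused for rescheduling: the ceiling form returns currentTime itself when currentTime is already a boundary, so a punctuation that fires exactly on a boundary would be rescheduled for the same timestamp. The sketch below contrasts the ceiling form with one possible variant that returns the first boundary strictly after currentTime. The strictlyAfter method is an assumption for illustration and is not part of the KIP text.

```java
class RescheduleSketch {
    // Ceiling alignment from the formula: returns currentTime when it is already a boundary.
    static long atOrAfter(long currentTime, long interval, long shift) {
        return Math.floorDiv(currentTime - shift + interval - 1, interval) * interval + shift;
    }

    // Hypothetical variant for rescheduling: the first boundary strictly after currentTime.
    static long strictlyAfter(long currentTime, long interval, long shift) {
        return Math.floorDiv(currentTime - shift, interval) * interval + interval + shift;
    }
}
```

With interval 10 and shift 0, atOrAfter(30, 10, 0) is 30 while strictlyAfter(30, 10, 0) is 40. Off a boundary both agree: atOrAfter(26, 10, 0) and strictlyAfter(26, 10, 0) are both 30.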
PunctuationSchedule
Add new field:
private final Long timeShiftMillis;
Used when computing subsequent timestamps.
No changes to queue ordering logic are required.
PunctuationQueue
No structural modifications.
Queue continues to order schedules by nextTimestamp.
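Why the queue needs no changes can be seen in a simplified model: the shift only influences how nextTimestamp is computed, and ordering compares nextTimestamp alone. The sketch below uses a stand-in Schedule class, not the real PunctuationSchedule, and assumes a priority queue keyed by the next timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ShiftedQueueSketch {
    // Simplified stand-in for PunctuationSchedule.
    static final class Schedule {
        final String name;
        final long nextTimestamp;
        final Long timeShiftMillis; // null means no alignment was requested

        Schedule(String name, long nextTimestamp, Long timeShiftMillis) {
            this.name = name;
            this.nextTimestamp = nextTimestamp;
            this.timeShiftMillis = timeShiftMillis;
        }
    }

    // Drains schedules in the order a timestamp-ordered queue would serve them.
    static List<String> drainOrder(List<Schedule> schedules) {
        PriorityQueue<Schedule> queue =
            new PriorityQueue<>(Comparator.comparingLong((Schedule s) -> s.nextTimestamp));
        queue.addAll(schedules);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }
}
```

Three schedules created at time 26 with interval 10 would carry next timestamps 30 (zero shift), 35 (shift 5), and 36 (null shift, 26 + 10), and are served in that order regardless of insertion order.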
Architecture Diagram
[Two diagrams, not reproduced here: "Current" and "With timeShift"]
Compatibility, Deprecation, and Migration Plan
Existing schedule(Duration, PunctuationType, Punctuator) remains unchanged.
Default behavior is preserved.
New behavior is opt-in.
No migration required.
No deprecation of existing APIs.
Test Plan
Unit Tests
Add tests covering:
STREAM_TIME with null shift (existing behavior)
STREAM_TIME with zero shift
STREAM_TIME with custom shift
WALL_CLOCK_TIME with zero shift
Multiple punctuations with different shifts in same task
Restart simulation ensuring deterministic alignment
Integration Tests
Add Streams integration test:
Start topology.
Produce records with varying first timestamps.
Restart application.
Verify punctuation alignment remains consistent when shift is specified.
Regression Validation
All existing punctuation tests must pass without modification.
Rejected Alternatives
1. Alignment Strategy Enum
Rejected to keep the change minimal and close to the original discussion in KAFKA-13678.
A numeric shift parameter provides greater flexibility with less API surface.
2. Separate Timer API
Rejected because periodic punctuations already provide the required abstraction.
3. Changing Default Behavior
Rejected due to backward compatibility concerns.
Conclusion
This KIP extends Kafka Streams punctuation scheduling with an optional time-shift parameter.
It enables deterministic and configurable alignment of periodic callbacks while preserving existing semantics and maintaining full backward compatibility.


