Status

Current state: Approved

Discussion thread: https://lists.apache.org/thread/t7fbxcr41lxojydrsvy569bgfm91o3j6

Voting thread: https://lists.apache.org/thread/d0oqdx6gs4g17zmy0zb8lyxbc4d80no2

JIRA: KAFKA-7699

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams does not provide an easy way to trigger periodic callbacks at specific times. Wall-clock time punctuation allows scheduling periodic callbacks based on wall-clock time progress, but the punctuation time starts when the punctuation is scheduled. As a result, the callback is triggered at a non-deterministic time. It would be useful to allow a punctuation to be triggered at a fixed (anchored) time, independent of when the punctuation was registered. For instance, this would allow triggering a punctuation at the start of every hour, i.e. at HH:00:00.

Real-life use cases can be taken from the energy sector. The automatic closed-loop balancing of the power grid requires calculations to be published at the start of every 10th second. Today, this is solved by a workaround that uses a "has published this interval" flag and a state store to ensure that a punctuation is only triggered once per interval:

Anchored punctuation - work around
class PunctuateProcessor(
    private val hasForwardedStoreName: String,
    private val forwardTime: Duration,
) : ContextualProcessor<String, avro_value, String, avro_value>(),
    ILogging by Logging<PunctuateProcessor>() {
    private lateinit var hasForwardStore: WindowStore<String, Boolean>
    private lateinit var forwardSchedule: Cancellable

    private val hasForwardStoreKey = "hasPublished"

    override fun init(context: ProcessorContext<String, avro_value>) {
        super.init(context)
        this.hasForwardStore = context.getStateStore(hasForwardedStoreName)

        forwardSchedule =
            context().schedule(Duration.ofMillis(500), PunctuationType.WALL_CLOCK_TIME) { forwardRecordsIfTime() }
    }

    override fun process(record: Record<String, avro_value>) {
        // Store incoming records
    }

    private fun forwardRecordsIfTime() {
        val currentTime = Duration.ofMillis(context().currentSystemTimeMs())
        val flooredTime = Duration.ofMillis(floorTo10Second(currentTime.toMillis()))

        if (isTimeToForward(currentTime) && !hasForwardedThisInterval(flooredTime)) {
            forwardRecords()
            hasForwardStore.put(hasForwardStoreKey, true, flooredTime.toMillis())
        }
    }

    private fun isTimeToForward(currentTime: Duration): Boolean =
        (currentTime.toSecondsPart() % 10) >= (forwardTime.toSecondsPart() % 10)

    private fun hasForwardedThisInterval(intervalStart: Duration): Boolean =
        hasForwardStore.fetch(hasForwardStoreKey, intervalStart.toMillis()) ?: false

    // forwardRecords() and floorTo10Second() are omitted here.
}

Generally, the work-around can be characterised as flag-polling, where the poll frequency is determined by a wall-clock punctuation. Hence, the punctuation trigger time is only as precise as the wall-clock punctuation trigger interval. A more frequent wall-clock punctuation will give a more precise trigger time, at the cost of firing an increased number of flag checks (i.e. punctuations). This is a trade-off that can be removed with anchored punctuations. 

Public API Interfaces

The goal is to introduce an anchored wall-clock punctuation that is functionally similar to running a cron job. In short, the user should be able to specify the start time for the schedule, instead of relying on the non-deterministic time at which the schedule was registered.

The proposed API change is to extend the `schedule()` method in the `ProcessingContext` interface with an `Instant` parameter, `startTime`, that represents the schedule's anchor time, without forcing any changes on existing users:

Proposed API change
package org.apache.kafka.streams.processor.api;

public interface ProcessingContext {

	// New method allowing for anchored punctuation
	Cancellable schedule(final Duration interval, final Instant startTime, final PunctuationType type, final Punctuator callback);

	// Existing method, delegating to the new overload with no anchor (null startTime)
	default Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
		return schedule(interval, null, type, callback);
	}
}
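As a hedged illustration of how a caller might derive the `startTime` argument for the top-of-the-hour case from the Motivation section, the sketch below uses only `java.time`. The class `AnchorTimes` and its helper `startOfHour` are hypothetical names, and the commented-out `schedule(...)` call mirrors the proposed signature; it is not runnable against any released Kafka version.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Kafka Streams.
final class AnchorTimes {

    // Truncates the given instant to the start of its UTC hour (HH:00:00.000).
    static Instant startOfHour(final Instant now) {
        return now.truncatedTo(ChronoUnit.HOURS);
    }

    public static void main(final String[] args) {
        final Instant anchor = startOfHour(Instant.parse("2024-05-01T13:37:42.123Z"));
        System.out.println(anchor); // prints 2024-05-01T13:00:00Z

        // With the proposed overload, a processor's init() might then call (assumed usage):
        // context.schedule(Duration.ofHours(1), anchor, PunctuationType.WALL_CLOCK_TIME, callback);
    }
}
```

Because the proposal skips forward when the start time lies in the past, any earlier hour boundary would serve equally well as the anchor. Note that the truncation is done in UTC, so zones with a non-whole-hour offset would need a zone-aware calculation instead.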

Proposed changes

PunctuationType support

Both system time and stream time will be supported in this first iteration, since gracefully handling unsupported punctuation types is deemed more of a hassle than implementing support for both.

Punctuation semantics

The startTime, together with the interval and the current time, will determine the next trigger time for the callback. The method for calculating the next trigger time could look something like:

Calculation of next trigger time
long currentTime = currentSystemTimeMillis();	// or currentStreamTime();
long startTimeMillis = startTime.toEpochMilli();
long intervalMillis = interval.toMillis();

// Start time is in the future: wait until the anchor itself
if (currentTime < startTimeMillis) {
	return startTimeMillis - currentTime;
}

// Start time is in the past: skip forward to the next anchored trigger time
long elapsedTime = currentTime - startTimeMillis;
long intervalsPassed = elapsedTime / intervalMillis;
long nextTriggerTime = startTimeMillis + (intervalsPassed + 1) * intervalMillis;

return nextTriggerTime - currentTime;
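For experimentation, the calculation above can be packaged as a self-contained method. This is a minimal sketch, not the final implementation: the class `AnchoredSchedule` and the method `delayUntilNextTrigger` are illustrative names, the current time is passed in explicitly rather than read from the processor context, and a positive interval is assumed.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only; names are assumptions, not part of the Kafka Streams API.
final class AnchoredSchedule {

    // Returns the number of milliseconds from currentTimeMs until the next anchored trigger.
    static long delayUntilNextTrigger(final long currentTimeMs, final Instant startTime, final Duration interval) {
        final long startTimeMs = startTime.toEpochMilli();
        final long intervalMs = interval.toMillis();
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        // Anchor is still in the future: the first trigger is the anchor itself.
        if (currentTimeMs < startTimeMs) {
            return startTimeMs - currentTimeMs;
        }
        // Anchor is in the past: skip forward to the next multiple of the interval.
        final long intervalsPassed = (currentTimeMs - startTimeMs) / intervalMs;
        final long nextTriggerMs = startTimeMs + (intervalsPassed + 1) * intervalMs;
        return nextTriggerMs - currentTimeMs;
    }

    public static void main(final String[] args) {
        final Instant start = Instant.ofEpochSecond(100);
        final Duration interval = Duration.ofSeconds(10);
        System.out.println(delayUntilNextTrigger(95_000L, start, interval));  // prints 5000
        System.out.println(delayUntilNextTrigger(101_000L, start, interval)); // prints 9000
        System.out.println(delayUntilNextTrigger(110_000L, start, interval)); // prints 10000
    }
}
```

The last call shows a consequence of the formula as written: when the current time falls exactly on an anchored trigger time after the start, the returned delay is one full interval.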

Edge cases, and possibly unintuitive cases, will be described in the following paragraphs. 

Start time is defined in the past

If the startTime is defined as a point in time that is before the current time, the implementation will just skip forward and calculate the next trigger time.

If we have startTime at t=90, the current time is t=101, and the interval is 10s, the next trigger time will be t=110. 

As a result, several schedule configurations yield the same trigger times: start times in the past that differ by a whole multiple of the interval are equivalent.
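The equivalence can be checked with a few lines of arithmetic. The sketch below is an assumption-laden illustration that works in whole seconds to match the t=... notation used here; `EquivalentAnchors` and `nextTriggerTime` are hypothetical names.

```java
// Illustrative only; names are assumptions, not part of the Kafka Streams API.
final class EquivalentAnchors {

    // Returns the absolute time of the next trigger, given the current time, anchor and interval.
    static long nextTriggerTime(final long current, final long start, final long interval) {
        if (current < start) {
            return start;
        }
        return start + ((current - start) / interval + 1) * interval;
    }

    public static void main(final String[] args) {
        final long current = 101;
        final long interval = 10;
        // Anchors 70, 90 and 100 differ by multiples of the interval, so they agree.
        System.out.println(nextTriggerTime(current, 70, interval));  // prints 110
        System.out.println(nextTriggerTime(current, 90, interval));  // prints 110
        System.out.println(nextTriggerTime(current, 100, interval)); // prints 110
        // An anchor offset by half an interval gives a different schedule.
        System.out.println(nextTriggerTime(current, 95, interval));  // prints 105
    }
}
```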

Starts and restarts

If the Streams app starts or restarts close to a scheduled trigger time, there is a possibility that callbacks will be missed.

Take, for example, a schedule configured with a start time of t=100 and an interval of 10 seconds. If the Streams app starts up at t=101, it waits until t=110 to trigger the first punctuation, and then proceeds to punctuate as usual in 10s increments.

However, if the app happened to shut down at t=109, and did not come back up again before t=111, the app will miss the punctuation at t=110 and will have to wait until t=120 for the next punctuation.
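The missed callback can be reproduced with a small simulation. This sketch assumes, as described above, that a trigger fires only if the app is running at the trigger time and that missed triggers are not caught up after a restart; `RestartSimulation` and `firedTriggers` are hypothetical names, and times are in whole seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only; names are assumptions, not part of the Kafka Streams API.
final class RestartSimulation {

    // Lists the anchored trigger times in [from, to] at which the app is up.
    // The app is assumed to be down for downFrom <= t < downUntil.
    static List<Long> firedTriggers(final long start, final long interval,
                                    final long from, final long to,
                                    final long downFrom, final long downUntil) {
        final List<Long> fired = new ArrayList<>();
        // First anchored time at or after 'from'.
        long t = from <= start
            ? start
            : start + ((from - start + interval - 1) / interval) * interval;
        for (; t <= to; t += interval) {
            final boolean appIsUp = t < downFrom || t >= downUntil;
            if (appIsUp) {
                fired.add(t);
            }
        }
        return fired;
    }

    public static void main(final String[] args) {
        // App starts at t=101, is down from t=109 until t=111, observed until t=130.
        System.out.println(firedTriggers(100, 10, 101, 130, 109, 111)); // prints [120, 130]
        // Same schedule with no outage (empty down window).
        System.out.println(firedTriggers(100, 10, 101, 130, 0, 0));     // prints [110, 120, 130]
    }
}
```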

Compatibility, Deprecation, and Migration Plan

The goal is to not enforce any changes upon the existing users of the schedule method, but instead expand the current schedule options with a new anchored schedule option. This can be achieved by using method overloading.

Test Plan

The plan is to test the new schedule option in the same way that the current schedule options are tested. The anchored wall-clock punctuation is a new feature, and the feature should therefore not affect any current features or users. 

Rejected Alternatives

Cron job

Creating a new schedule method that takes in a cron job expression as a parameter:

Cron job punctuation
package org.apache.kafka.streams.processor.api;

public interface ProcessingContext {

	// New method allowing for anchored punctuation using cron expressions
	Cancellable schedule(final String cronExpression, final Punctuator callback);

	// Existing method
 	Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback);
}

Using cron expressions would require the inclusion of a new dependency, such as Quartz. Generally, we wish to avoid bringing in new dependencies where possible. It is also wise to start simple when implementing a new feature, and then build it up incrementally. Finally, the added value of supporting cron expressions is not clear, as schedules can be precisely configured using "only" a start time and an interval.
