You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current state: Under Discussion

JIRA

KAFKA-10199 - Getting issue details... STATUS KAFKA-10575 - Getting issue details... STATUS

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State restoration is a key procedure in Kafka Streams when processing tasks are migrated in a rebalance, as well for maintaining standby tasks for failure recoveries.

This proposal aims to expose more visibilities around this procedure to users, and is composed of two components: 1) augmenting the metrics related to restoration, 2) add new APIs for StateRestoreListener.

Public Interfaces

Below summarizes the public API changes in this KIP.

Restoration metrics

All the metrics below would be on the thread-level. Note that we will have separate thread handling restoration procedures, and hence their thread id would be different from stream threads.

Metric tags are:

  • type=stream-state-metrics
  • thread-id=[threadId]

Recording level is: INFO


Metric Name

Type

DescriptionNotes
restoring-active-tasks
countThe number of active tasks currently undergoing restoration
restoring-standby-tasks
countThe number of active tasks currently undergoing restoration
paused-active-tasks
countThe number of active tasks paused restoring
paused-standby-tasks
countThe number of standby tasks paused restoring
idle-ratio
gauge (percentage)The fraction of time the thread spent on being idleidle-ratio + restore-ratio + checkpoint-ratio should be 1
restore-ratio
gauge (percentage)The fraction of time the thread spent on restoring tasksidle-ratio + restore-ratio + checkpoint-ratio should be 1
checkpoint-ratio
gauge (percentage)The fraction of time the thread spent on checkpointing restored progressidle-ratio + restore-ratio + checkpoint-ratio should be 1
restore-records-total
countThe total number of records restored
restore-records-rate
rateThe average per-second number of records restored
restore-call-rate
rateThe average per-second number of restore calls triggered


Along with these new metrics, we would also deprecate the metrics below:

Metric Name

Type

DescriptionNotes
standby-process-ratio
gaugeTask-level; the fraction of time the processing thread spent on processing this standby taskRemoved since standby tasks are not processed by stream thread


New Method in StateRestoreListener

When an active task starts restoration, StateStoreListener#onRestoreStart would be triggered. The restoring task could end in two possible ways:

1) Restoration completes and the task could now be processed normally with incoming stream records. At this time StateStoreListener#onRestoreEnd would be triggered. 

2) Restoration was paused before completes, e.g. since another rebalance is triggered and this task is suspended and potentially migrated out of the current host later. At this time no callbacks would be triggered.

We propose to cover the second case above with a new API, so that each onRestoreStart function would be paired with either an onRestoreEnd function or an onRestorePaused function. Note that if the suspended task was re-assigned back to the current host, another onRestoreStart would be triggered again.

public interface StateRestoreListener {

    void onRestoreStart(final TopicPartition topicPartition,
                        final String storeName,
                        final long startingOffset,
                        final long endingOffset);

    void onRestoreEnd(final TopicPartition topicPartition,
                      final String storeName,
                      final long totalRestored);

    ...

    /**
     * NEW FUNC. Method called when restoring the {@link StateStore} is paused due to the task being suspended from the host.
     *           If the task was resumed after suspension and restoration continues, another {@link onRestoreStart} would be called. 
     */
    default void onRestorePaused(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long totalRestored) {
        // do nothing
    } 
}


Compatibility, Deprecation, and Migration Plan

  • The default implementation of the new onRestorePaused function would be a no-op, to maintain backward compatibilities.
  • Deprecated metric would still be exposed, and only be removed in the next major release.

Rejected Alternatives

None.

  • No labels