Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-386 is building on top of FLIP-384. The intention here is to add a capability for state backends to attach custom attributes during recovery to recovery spans. For example RocksDBIncrementalRestoreOperation could report both remote download time and time to actually clip/ingest the RocksDB instances after rescaling.
Proposed Changes
Recovery metrics are first collected for each subtask and then sent back to the JobManager for the aggregation to a single number per Job (from all of the subtasks), as described in FLIP-384. From all of those aggregated metrics, a single recovery span trace is created on the JobManager. Custom metrics will be aggregated the same way how the built-in metrics are aggregated in the FLIP-384 (using max and sum).
Public Interfaces
It will be possible to set the custom attributes via the following interfaces:
@Experimental public interface CustomInitializationMetrics { void addMetric(String name, long value); void addMetric(String name, double value); } // intendend usage: customInitializationMetrics.addMetric("DownloadTime", value)
The remaining issue is to pass CustomInitializationMetrics instance to for example RocksDBIncrementalRestoreOperation . In this FLIP I’m proposing to do it via adding the following interfaces to the org.apache.flink.runtime.state.StateBackend interface:
@PublicEvolving public interface StateBackend { (...) @PublicEvolving pubic interface KeyedStateBackendParameters<K> { // already existing parameters of createKeyedStateBackend() method Environment getEnv(); JobID getJobID(); String getOperatorIdentifier(); TypeSerializer<K> getKeySerializer(); int getNumberOfKeyGroups(); KeyGroupRange getKeyGroupRange(); TaskKvStateRegistry getKvStateRegistry(); TtlTimeProvider getTtlTimeProvider(); @Nonnull Collection<KeyedStateHandle> getStateHandles(); CloseableRegistry getCancelStreamRegistry(); double getManagedMemoryFraction(); // newly added parameter CustomInitializationMetrics getCustomInitializationMetrics(); } @PublicEvolving public interface OperatorStateBackendParameters { // already existing parameters of createOperatorStateBackend() method Environment getEnv(); String getOperatorIdentifier(); @Nonnull Collection<OperatorStateHandle> getStateHandles(); CloseableRegistry getCancelStreamRegistry(); // newly added parameter CustomInitializationMetrics getCustomInitializationMetrics(); } }
With the following new methods:
@PublicEvolving public interface StateBackend { (...) <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend( KeyedStateBackendParameters<K> parameters); OperatorStateBackend createOperatorStateBackend( OperatorStateBackendParameters parameters); }
With all of those changes state backends, like incremental RocksDB, will be able to measure custom metrics during the restore operation and report them to be included in recovery span.
Compatibility, Deprecation, and Migration Plan
This FLIP is proposing to clean up the StateBackend by deprecating all of the other previously used versions of the StateBackend#createKeyedStateBackend and StateBackend#createOperatorStateBackend methods. The newly proposed versions from this FLIP will be more future proof by making adding new parameters much easier while at the same time not braking compatibility for the users.
Test Plan
Change will be covered by automatic tests and tested manually inside the Confluent.
Rejected Alternatives
None