Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink state is becoming increasingly valuable, and users naturally want to run ad-hoc queries to see what is inside.

The State Processor API already provides a certain level of state access. The near-term goal is a more user-friendly experience when querying state content.


If the UID of an operator is known, the values inside its state can be queried, but the UID is not always known.

When the Flink application is complex, is maintained by many developers, or is operated by a separate on-call team, it is not always clear which operator to look for.


Normally, an operator is defined in a Flink application like this, with a UID and a name assigned to it:


stream.map(new ValueProcessUDF())
       .uid("value-process-uid")
       .name("value-process-name");


When a checkpoint is taken, the state metadata file currently contains only the hashed value of an operator UID, for example:

  • Operator UID: value-process-uid
  • Operator Hash: ca4f5fe9a637b656f09ea78b3e7a15b9

It is important to understand that the original UID cannot be recovered from the hash.

Suppose somebody has no idea which operators are inside the state, and the number of operators is relatively large.

With an internal class (SavepointLoader) the list of operator hashes can be obtained, but since only the hashed UID is available, one must manually collect the UIDs from the original application (a step that already takes considerable time), then convert them all to hashes and try to match them against the metadata file. This process is quite inconvenient, especially when the code is scattered across multiple repositories.
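
The manual matching described above can be sketched roughly as follows. This is only an illustration of the workflow, not Flink code: the class UidHashMatcher, the UID "other-uid", and the placeholder "<unknown>" are hypothetical, and standInHash uses MD5 purely as a stand-in, so it does not produce the hashes Flink writes into the metadata file. In practice the hasher argument would have to be the same function Flink uses to derive the operator hash from the UID.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: maps operator hashes found in a metadata file back to known UIDs. */
final class UidHashMatcher {

    /**
     * Builds a hash -> UID table from UIDs collected by hand from the application code,
     * then looks up every hash listed in the metadata file. Hashes without a matching
     * UID are reported as "<unknown>".
     */
    static Map<String, String> resolve(
            List<String> knownUids,
            List<String> hashesInMetadata,
            Function<String, String> hasher) {
        Map<String, String> uidByHash = new LinkedHashMap<>();
        for (String uid : knownUids) {
            uidByHash.put(hasher.apply(uid), uid);
        }
        Map<String, String> resolved = new LinkedHashMap<>();
        for (String hash : hashesInMetadata) {
            resolved.put(hash, uidByHash.getOrDefault(hash, "<unknown>"));
        }
        return resolved;
    }

    /** Stand-in hash (MD5 rendered as hex). NOT the function Flink uses; illustration only. */
    static String standInHash(String uid) {
        try {
            byte[] digest =
                    MessageDigest.getInstance("MD5").digest(uid.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // UIDs gathered manually from the application sources.
        List<String> knownUids = Arrays.asList("value-process-uid", "other-uid");
        // Hashes as they would be listed from the metadata file; the second one has no known UID.
        List<String> hashes = Arrays.asList(standInHash("value-process-uid"), "0123456789abcdef");
        resolve(knownUids, hashes, UidHashMatcher::standInHash)
                .forEach((hash, uid) -> System.out.println(hash + " -> " + uid));
    }
}
```

Every hash that resolves to "<unknown>" means another search through the code base, which is the cost this proposal aims to remove.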


A solution is to store the operator UID and name inside the state metadata file. The gain is clear: the human-readable intent of each operator is immediately visible, which can save considerable time.

Exposing the metadata information, where the list of operators can be checked, is planned for later but is not in scope here.

Public Interfaces

There are no new or changing public interfaces.

Proposed Changes

Add operator UID and name to the state metadata file next to the hashed UID.

The 2 new fields are either empty or filled with the user-provided values.

Note that no existing field inside the metadata file is touched; only the 2 new fields are added.

Compatibility, Deprecation, and Migration Plan

Migration will be automatic once the code is added. The following cases are intended to be covered:

  • Old state files are read: the old deserializer reads the data and leaves the 2 new fields as empty strings
  • A new checkpoint is written: the new serializer writes out the 2 new string fields, which may come from user code
  • New state files are read: the new deserializer reads the data, including the 2 new string fields

The metadata file contains a version field and based on this field the proper deserializer can be chosen.
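
The three cases above can be sketched with a heavily simplified, hypothetical codec. It is not Flink's metadata serializer: the class OperatorEntryCodec, the version numbers 1 and 2, and the layout (a version int followed by UTF strings) are assumptions made only to show how the version field selects the read path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified codec for one operator entry of the metadata file. */
final class OperatorEntryCodec {

    // Illustrative version numbers, not Flink's actual metadata versions.
    static final int VERSION_WITHOUT_UID_AND_NAME = 1;
    static final int VERSION_WITH_UID_AND_NAME = 2;

    static final class OperatorEntry {
        final String operatorHash;
        final String uid;  // empty when the user set none or the file predates the field
        final String name; // same rule as uid

        OperatorEntry(String operatorHash, String uid, String name) {
            this.operatorHash = operatorHash;
            this.uid = uid == null ? "" : uid;
            this.name = name == null ? "" : name;
        }
    }

    /** New serializer: version, hash, then the 2 new string fields. */
    static byte[] write(OperatorEntry entry) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION_WITH_UID_AND_NAME);
            out.writeUTF(entry.operatorHash);
            out.writeUTF(entry.uid);
            out.writeUTF(entry.name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Old layout without the new fields; present only to simulate an existing state file. */
    static byte[] writeOldLayout(String operatorHash) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION_WITHOUT_UID_AND_NAME);
            out.writeUTF(operatorHash);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Picks the read path from the version field at the start of the data. */
    static OperatorEntry read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            String hash = in.readUTF();
            switch (version) {
                case VERSION_WITHOUT_UID_AND_NAME:
                    // Old file: the new fields do not exist, so they stay empty.
                    return new OperatorEntry(hash, "", "");
                case VERSION_WITH_UID_AND_NAME:
                    String uid = in.readUTF();
                    String name = in.readUTF();
                    return new OperatorEntry(hash, uid, name);
                default:
                    throw new IllegalStateException("Unsupported metadata version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String hash = "ca4f5fe9a637b656f09ea78b3e7a15b9";
        OperatorEntry fromNew =
                read(write(new OperatorEntry(hash, "value-process-uid", "value-process-name")));
        OperatorEntry fromOld = read(writeOldLayout(hash));
        System.out.println("new file: uid=" + fromNew.uid + ", name=" + fromNew.name);
        System.out.println("old file: uid='" + fromOld.uid + "', name='" + fromOld.name + "'");
    }
}
```

Because the existing fields keep their position and meaning in this sketch, a reader for the old layout is unaffected, and only the branch for the new version has to know about the 2 additional strings.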

Test Plan

Automated tests are planned to cover the cases listed in the migration plan.

Rejected Alternatives

None.