Status

Current state: "Under Discussion"

Discussion thread: -

JIRA:

Released: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With the introduction of the metric system it is now time to make it easily accessible to users. As the WebInterface is the first stop for users for any details about Flink, it seems appropriate to expose the gathered metrics there as well.

Public Interfaces

There will be no changes to public interfaces.

Proposed Changes

The changes can be roughly broken down into 4 steps:

  1.     Create a data-structure on the Job-/TaskManager containing a metrics snapshot
  2.     Transfer this snapshot to the WebInterface backend
  3.     Store the snapshot in the WebRuntimeMonitor in an easily accessible way
  4.     Expose the stored metrics to the WebInterface via REST API

Note that this FLIP does not include the display of metrics.

Creating the snapshot on the Job-/TaskManager

The MetricRegistry will contain a MetricDumper, which acts similarly to an unscheduled reporter.
The Dumper creates and returns a key-value representation of the entire metric space when queried by the Job-/TaskManager.

The keys represent the name of the metric, formatted according to the following scope format strings:

Configuration key            Scope format
metrics.scope.jm             0.<name>
metrics.scope.tm             1.<tm_id>.<name>
metrics.scope.jm.job         2.<job_id>.<name>
metrics.scope.tm.job         2.<job_id>.<name>
metrics.scope.tm.task        3.<job_id>.<task_id>.<subtask_index>.<name>
metrics.scope.tm.operator    4.<job_id>.<task_id>.<subtask_index>.<operator_name>.<name>

The initial number serves as a category for the WebInterface, and allows for faster handling as we don't have to parse the entire string before deciding what category it belongs to.
  0 = JobManager
  1 = TaskManager
  2 = Job
  3 = Task
  4 = Operator
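
The category lookup described above can be sketched as follows. This is only an illustration: the class name MetricCategories, the enum, and the assumption that a key starts with the category digit followed by a '.' separator are hypothetical and not part of the existing code base.

```java
import java.util.Optional;

/** Hypothetical helper: maps the leading digit of a dumped metric key to its category. */
final class MetricCategories {
    // Declaration order matches the digits 0..4 listed in this FLIP.
    enum Category { JOB_MANAGER, TASK_MANAGER, JOB, TASK, OPERATOR }

    private static final Category[] BY_DIGIT = Category.values();

    /** Inspects only the first character; the rest of the key is not parsed here. */
    static Optional<Category> categoryOf(String key) {
        if (key == null || key.isEmpty()) {
            return Optional.empty();
        }
        int digit = key.charAt(0) - '0';
        if (digit < 0 || digit >= BY_DIGIT.length) {
            return Optional.empty(); // unknown prefix, caller decides how to handle it
        }
        return Optional.of(BY_DIGIT[digit]);
    }
}
```

A caller would branch on the returned category first and only then split the remainder of the key into IDs and metric name.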

For this to work we need to be able to use a scope format that differs from the one set in the user configuration, and we also need to cache the resulting strings.
For now we can hard-code a separate scopeString field in the AbstractMetricGroup; a more general solution would be to allow separate ScopeFormat configurations for each reporter, which is a natural follow-up to .

The value is the value returned by the metric, or by one of the metric's methods (a Histogram, for example, exposes several).

Whether the value is stringified is TBD. Using strings would solve the serialization problem for Gauge metrics, but will require the generation of many short-lived objects on the JM/TM and additional parsing if we want to aggregate metrics in the WebInterface.

The key-value pairs can be stored in a simple list-like data structure.
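
A minimal sketch of such a dumper, under stated assumptions: metrics are modelled as plain Supplier objects registered under their already formatted scope string, and the class MetricDumperSketch with its register and dump methods is hypothetical. It is not the actual MetricRegistry API, and it is not thread-safe.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical dumper sketch; metrics are stand-in Suppliers, not Flink metric types. */
final class MetricDumperSketch {
    // Registered metrics, keyed by their pre-formatted (cached) scope string.
    private final Map<String, Supplier<?>> metrics = new LinkedHashMap<>();

    void register(String scopedName, Supplier<?> metric) {
        metrics.put(scopedName, metric);
    }

    /** Reads every metric once and returns the result as a flat list of key-value pairs. */
    List<Map.Entry<String, Object>> dump() {
        List<Map.Entry<String, Object>> snapshot = new ArrayList<>(metrics.size());
        for (Map.Entry<String, Supplier<?>> entry : metrics.entrySet()) {
            Object value = entry.getValue().get();
            snapshot.add(new SimpleImmutableEntry<String, Object>(entry.getKey(), value));
        }
        return snapshot;
    }
}
```

The values stay as objects here; if stringification is chosen instead, the conversion would happen at the point where the value is read.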

Transfer to the WebInterface

With the "upcoming" separation of JobManager and WebInterface, TaskManager metrics should not be routed entirely through the JobManager; instead, the TaskManagers should communicate directly with the WebInterface.

As there aren't any details yet as to how the separation will work, specifically whether a TaskManager -> WebInterface heartbeat will exist, I will assume that there is no message that we can piggyback on.

As such the WebInterface will regularly query the JobManager for all available TaskManagers, and then query each of them for a metric dump.

This will be done in a separate thread inside the WebRuntimeMonitor, which also has the responsibility to merge the returned dumps.

The merged dump is kept in a central location inside the WebRuntimeMonitor, available to different handlers.
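
The polling loop could look roughly like the sketch below. Everything in it is an assumption made for illustration: the class MetricFetcherSketch, the two functional parameters standing in for the JobManager lookup and the per-TaskManager query, and the use of a Map for each dump. The real communication layer is not specified in this FLIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical fetcher: polls all TaskManagers and publishes one merged dump. */
final class MetricFetcherSketch {
    private final Supplier<List<String>> taskManagerLookup;          // stands in for the JobManager query
    private final Function<String, Map<String, Object>> dumpQuery;  // stands in for the TaskManager query
    private final AtomicReference<Map<String, Object>> merged =
            new AtomicReference<>(Collections.<String, Object>emptyMap());

    MetricFetcherSketch(Supplier<List<String>> taskManagerLookup,
                        Function<String, Map<String, Object>> dumpQuery) {
        this.taskManagerLookup = taskManagerLookup;
        this.dumpQuery = dumpQuery;
    }

    /** One polling round: query every known TaskManager and merge the dumps. */
    void fetchOnce() {
        Map<String, Object> result = new HashMap<>();
        for (String taskManagerId : taskManagerLookup.get()) {
            try {
                result.putAll(dumpQuery.apply(taskManagerId));
            } catch (RuntimeException e) {
                // An unreachable TaskManager is skipped for this round.
            }
        }
        merged.set(Collections.unmodifiableMap(result));
    }

    /** Runs fetchOnce periodically on a single dedicated thread. */
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            try {
                fetchOnce();
            } catch (RuntimeException e) {
                // Swallowed so that a failed lookup does not cancel later rounds.
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** The most recent merged dump, readable by any handler. */
    Map<String, Object> latest() {
        return merged.get();
    }
}
```

Publishing a fresh immutable map per round keeps readers from ever seeing a half-merged dump, at the cost of rebuilding the map each time.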

Storage in the WebRuntimeMonitor

My (rough) proposal for a data structure is the following:

class MetricStore {
	// Files a dumped metric under the entry that its name refers to.
	void addMetric(String name, Object value);

	JobManager jobManager;

	class JobManager {
		Map<String, Object> metrics;
	}

	Map<String, TaskManager> taskmanagers;

	class TaskManager {
		Map<String, Object> metrics;
	}

	Map<String, Job> jobs;

	class Job {
		Map<String, Object> metrics;
		Map<String, Task> tasks;
	}

	class Task {
		Map<String, Object> metrics;
		Map<String, Subtask> subtasks;
	}

	class Subtask {
		Map<String, Object> metrics;
	}
}

 

Note that only one MetricStore instance will exist at any given time.

 The WebInterface will execute a variety of queries regarding metrics of

The proposed data structure supports these queries without requiring any joins of substructures. For example, in order to access a task metric we do not have to iterate through structures representing the different task managers, which would be the case if we were to mirror the MetricGroup structure.

Access to the structure should be guarded by a lock, so that not every map must be a concurrent one.
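
A reduced sketch of the locking idea, covering only the JobManager and TaskManager categories. The class LockedMetricStoreSketch, its getters, and the assumption that IDs contain no '.' character are hypothetical; the job, task and operator categories would follow the same pattern.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store: plain HashMaps, with every access going through one lock. */
final class LockedMetricStoreSketch {
    private final Object lock = new Object();
    private final Map<String, Object> jobManagerMetrics = new HashMap<>();
    private final Map<String, Map<String, Object>> taskManagerMetrics = new HashMap<>();

    /** Routes a dumped key such as "0.<name>" or "1.<tm_id>.<name>" into the store. */
    void addMetric(String name, Object value) {
        String[] parts = name.split("\\.", 3);
        synchronized (lock) {
            if (parts[0].equals("0") && parts.length >= 2) {
                // Everything after the "0." prefix is the metric name.
                jobManagerMetrics.put(name.substring(2), value);
            } else if (parts[0].equals("1") && parts.length == 3) {
                Map<String, Object> perTaskManager = taskManagerMetrics.get(parts[1]);
                if (perTaskManager == null) {
                    perTaskManager = new HashMap<String, Object>();
                    taskManagerMetrics.put(parts[1], perTaskManager);
                }
                perTaskManager.put(parts[2], value);
            }
            // Other categories and malformed keys are ignored in this sketch.
        }
    }

    /** Returns a copy, so callers never touch the guarded map outside the lock. */
    Map<String, Object> getJobManagerMetrics() {
        synchronized (lock) {
            return new HashMap<String, Object>(jobManagerMetrics);
        }
    }

    Map<String, Object> getTaskManagerMetrics(String taskManagerId) {
        synchronized (lock) {
            Map<String, Object> found = taskManagerMetrics.get(taskManagerId);
            if (found == null) {
                return new HashMap<String, Object>();
            }
            return new HashMap<String, Object>(found);
        }
    }
}
```

Handing out copies trades some allocation for simplicity; handlers that read many metrics per request might instead hold the lock for the duration of the read.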

There aren't any plans for data retirement right now.

Access from the WebInterface

Several new Handlers will be added (one for each category) that will access the central MetricStore.

The REST call for a list of all available metrics of a task could look like this: /jobs/JOB_ID/vertices/TASK_ID/metrics

The REST call for specific metrics of a task could look like this: /jobs/JOB_ID/vertices/TASK_ID/metrics?get=elements-in,elements-out,bytes-per-second-out
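
How a handler might serve these two calls can be sketched as follows. The class MetricQuerySketch and both method names are hypothetical, the handler wiring is left out, and it is an assumption of this sketch that unknown names in the get parameter are silently dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical query helper operating on the metric map of a single task. */
final class MetricQuerySketch {
    /** Backs the call without a "get" parameter: the sorted names of all metrics. */
    static List<String> listNames(Map<String, Object> metrics) {
        List<String> names = new ArrayList<>(metrics.keySet());
        Collections.sort(names);
        return names;
    }

    /** Backs "?get=a,b,c": the values of the requested metrics, in request order. */
    static Map<String, Object> select(Map<String, Object> metrics, String getParam) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (getParam == null || getParam.isEmpty()) {
            return result;
        }
        for (String raw : getParam.split(",")) {
            String name = raw.trim();
            Object value = metrics.get(name);
            if (value != null) {
                result.put(name, value); // unknown names are skipped
            }
        }
        return result;
    }
}
```

With the task's metric map taken from the MetricStore, the handler would call listNames for the first URL and select for the second, then serialize the result.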

Prototype

A working prototype that follows this FLIP can be found here.
    
It allows the WebInterface to display task/operator metrics. Credits to Piotr Godek, who provided the code to display task metrics in the WebInterface.

Compatibility, Deprecation, and Migration Plan

-

Test Plan

Everything can be tested with unit tests.

Rejected Alternatives

-