Current state: Released

Discussion thread: -



Released: Flink 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The MetricRegistry will contain a MetricQueryService, which acts like an unscheduled reporter.
The service is a separate actor that creates and returns a Key-Value representation of the entire metric space when queried.

The keys represent the name of the metric; formatted according to the following scope format strings:


As there aren't any details as to how the separation will work, specifically whether a TaskManager -> WebInterface heartbeat will exist, I will assume that there is no message that we can piggyback on.

The WebRuntimeMonitor will contain a MetricFetcher which queries the JobManager for all available TaskManagers, and then queries each of them for a metric dump.

This will be done in a separate Thread inside the WebRuntimeMonitor, which also has the responsibility to merge the returned dumps.

Metrics are only fetched if they are actually accessed via REST calls, with a minimum time period (10 seconds) between updates.

The fetched metrics are merged and kept in a central location inside the MetricFetcher, available to different handlers.
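The "fetch only on access, at most once per 10 seconds" behavior could be sketched roughly as follows. This is only an illustration under assumptions: the class name ThrottledFetcher, the injected clock and the fetchAction callback are hypothetical and not part of this proposal; fetchAction stands in for querying the JobManager and the TaskManagers.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: triggers a metric fetch at most once per interval. */
class ThrottledFetcher {
    private final long minIntervalMillis;
    private final LongSupplier clock;    // injected so the sketch can be tested without sleeping
    private final Runnable fetchAction;  // assumption: stands in for querying JobManager/TaskManagers
    private long lastUpdateMillis;
    private boolean fetchedOnce;

    ThrottledFetcher(long minIntervalMillis, LongSupplier clock, Runnable fetchAction) {
        this.minIntervalMillis = minIntervalMillis;
        this.clock = clock;
        this.fetchAction = fetchAction;
    }

    /** Called by a REST handler before it reads metrics; returns true if a fetch was triggered. */
    synchronized boolean update() {
        long now = clock.getAsLong();
        if (fetchedOnce && now - lastUpdateMillis < minIntervalMillis) {
            return false; // the last fetch is recent enough, serve the stored metrics
        }
        lastUpdateMillis = now;
        fetchedOnce = true;
        fetchAction.run();
        return true;
    }
}
```

A handler would call update() on every request, so no fetching happens while nobody looks at the metrics.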


class MetricStore {
	// adds a single metric to the store that matches its name
	void addMetric(String name, Object value);

	JobManagerMetricStore jobManager;

	class JobManagerMetricStore {
		Map<String, Object> metrics;
	}

	// keyed by TaskManager ID
	Map<String, TaskManagerMetricStore> taskmanagers;

	class TaskManagerMetricStore {
		Map<String, Object> metrics;
	}

	// keyed by job ID
	Map<String, JobMetricStore> jobs;

	class JobMetricStore {
		Map<String, Object> metrics;
		Map<String, TaskMetricStore> tasks;
	}

	class TaskMetricStore {
		Map<String, Object> metrics;
		Map<String, SubtaskMetricStore> subtasks;
	}

	class SubtaskMetricStore {
		Map<String, Object> metrics;
	}
}

Access to the structure should be guarded by a lock, so that not every map must be a concurrent one.
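As a rough illustration of the lock-guarded access, the following sketch uses plain HashMaps behind a single lock. The class and method names are hypothetical, and only the JobManager and TaskManager levels of the structure are shown.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one lock guards plain HashMaps instead of using concurrent maps. */
class LockedMetricStore {
    private final Object lock = new Object();
    private final Map<String, Object> jobManagerMetrics = new HashMap<>();
    private final Map<String, Map<String, Object>> taskManagerMetrics = new HashMap<>();

    void addJobManagerMetric(String name, Object value) {
        synchronized (lock) {
            jobManagerMetrics.put(name, value);
        }
    }

    void addTaskManagerMetric(String taskManagerId, String name, Object value) {
        synchronized (lock) {
            taskManagerMetrics
                .computeIfAbsent(taskManagerId, id -> new HashMap<String, Object>())
                .put(name, value);
        }
    }

    /** Returns a copy, so handlers never iterate a map while the fetcher mutates it. */
    Map<String, Object> getJobManagerMetrics() {
        synchronized (lock) {
            return new HashMap<String, Object>(jobManagerMetrics);
        }
    }

    /** Returns a copy of the metrics of one TaskManager, or an empty map if it is unknown. */
    Map<String, Object> getTaskManagerMetrics(String taskManagerId) {
        synchronized (lock) {
            Map<String, Object> metrics = taskManagerMetrics.get(taskManagerId);
            if (metrics == null) {
                return new HashMap<String, Object>();
            }
            return new HashMap<String, Object>(metrics);
        }
    }
}
```

Returning copies keeps the lock scope small; a real implementation might instead hold the lock while a handler serializes its response.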

Data Retirement

  • JobManager metrics are kept indefinitely, as they have a limited size
  • TaskManager metrics are kept as long as the given TaskManager is registered on the JobManager
  • Job and Task metrics are kept as long as they are running or archived. In other words, if the job is listed in the WebInterface as either running or completed, the metrics are still available.
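The retirement rules above amount to dropping every entry whose ID is no longer known. A minimal sketch, assuming the store keeps one map entry per TaskManager or job ID and that the set of still-valid IDs is obtained elsewhere (the helper name retainKnown is hypothetical):

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of data retirement for ID-keyed metric maps. */
class RetirementSketch {
    /**
     * Removes all entries whose key is not in liveIds, e.g. TaskManagers that are no
     * longer registered, or jobs that are neither running nor archived.
     */
    static <V> void retainKnown(Map<String, V> store, Set<String> liveIds) {
        store.keySet().retainAll(liveIds);
    }
}
```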

Access from the WebInterface

Several new Handlers will be added (one for each category) that will access the central MetricStore.

The REST calls for the list of all available metrics look like this:

  • JobManager: "/jobmanager/metrics"
  • TaskManager: "/taskmanagers/:taskmanagerid/metrics"
  • Job: "/jobs/:jobid/metrics"
  • Task: "/jobs/:jobid/vertices/:vertexid/metrics"

This will return a JSON array containing the names of all available metrics.

The values for a list of metrics can be requested by appending "?get=[<metric_name1>[,<metric_nameX>]]". For a task, such a call could look like this: /jobs/JOB_ID/vertices/TASK_ID/metrics?get=elements-in,elements-out,bytes-per-second-out

This will return a JSON array containing "id":"values" pairs.
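The two response shapes could be produced by a handler along the following lines. This is a sketch under assumptions: the class name, the method signature and the exact JSON layout of the pairs ({"id":...,"value":...}) are illustrative and not fixed by this proposal; names and values are not JSON-escaped, and unknown metric names are skipped silently.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/** Hypothetical sketch of a metrics handler for one category (e.g. a task). */
class MetricsHandlerSketch {
    /**
     * @param metrics  the metrics stored for the requested entity
     * @param getParam value of the "get" query parameter, or null if it is absent
     */
    static String handle(Map<String, Object> metrics, String getParam) {
        StringJoiner json = new StringJoiner(",", "[", "]");
        if (getParam == null || getParam.isEmpty()) {
            // No "get" parameter: list the names of all available metrics, sorted.
            for (String name : new TreeMap<>(metrics).keySet()) {
                json.add("\"" + name + "\"");
            }
            return json.toString();
        }
        // "get" parameter present: return one pair per requested metric that exists.
        for (String name : getParam.split(",")) {
            Object value = metrics.get(name);
            if (value != null) {
                json.add("{\"id\":\"" + name + "\",\"value\":\"" + value + "\"}");
            }
        }
        return json.toString();
    }
}
```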


A working prototype that follows this FLIP can be found here.
It allows the WebInterface to display task/operator metrics. Credits to Piotr Godek, who provided the code to display task metrics in the WebInterface.


Everything can be tested with unit tests.

Rejected Alternatives