Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Streaming and batch users have different interests when probing a job. While streaming users mainly care about the real-time status of a running job (TPS, delay, backpressure, etc.), batch users care more about the overall status of the job across its entire execution (queueing / execution time, total amount of data, etc.).
As Flink grows into a unified streaming & batch processor and is adopted by more and more batch users, the experience of inspecting completed jobs has become more important than ever.
We compared Flink with other popular batch processors and spotted several potential improvements. Most of them involve WebUI & REST API changes, which should be discussed and voted on as FLIPs. However, creating a separate FLIP for each improvement might be overkill, because the changes needed by each one are quite small. Thus, we include all of these potential improvements in this one FLIP.
Public Interfaces
This FLIP contains WebUI and REST API changes. See Proposed Changes for details.
Proposed Changes
We propose the following improvements.
- Metrics
  - Add time breakdown metrics
  - Support metric aggregation
- Add environmental information
- Support viewing logs in history server
Some of the changes apply to both the JobManager and the history server, while others only affect the history server. We explain them in detail in the following sections.
In addition to the proposed improvements, we discuss how to identify a JobManager / TaskManager in the context of the history server in the Rejected Alternatives section.
Metrics
We propose to add the following metrics to help users understand where the time is spent:
- Duration that a task stays in each status
  - Statuses included: INITIALIZING, CREATED, SCHEDULED, RUNNING, DEPLOYING
  - Statuses excluded (their durations are of less interest): FINISHED, CANCELING, CANCELED, RECONCILING
- Accumulated time that a running task is busy / idle / back-pressured
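To illustrate how the per-status durations could be derived, here is a minimal Python sketch. It assumes the JobManager records the timestamp (in milliseconds) at which a task enters each status; the function name `status_durations` and its input shape are hypothetical and not part of Flink. A status lasts until the next transition, and the current status is measured up to "now".

```python
from typing import Dict, List, Tuple

# Statuses whose durations are reported (see the list above).
REPORTED_STATUSES = {"CREATED", "SCHEDULED", "INITIALIZING", "DEPLOYING", "RUNNING"}


def status_durations(transitions: List[Tuple[str, int]], now_ms: int) -> Dict[str, int]:
    """Return the time (ms) a task spent in each reported status.

    Hypothetical helper: `transitions` holds (status, entered_at_ms) pairs in
    chronological order. A status lasts until the next transition; the latest
    status is measured up to `now_ms`, so a still-RUNNING task keeps accumulating.
    """
    durations: Dict[str, int] = {}
    for i, (status, entered_at) in enumerate(transitions):
        left_at = transitions[i + 1][1] if i + 1 < len(transitions) else now_ms
        if status in REPORTED_STATUSES:
            # Sum repeated stays in case a status is entered more than once.
            durations[status] = durations.get(status, 0) + (left_at - entered_at)
    return durations
```

Terminal statuses such as FINISHED are simply skipped, matching the exclusion list above.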
Moreover, we propose to support aggregating task metrics.
- Supported scopes: JobVertex, TaskManager
- Supported aggregations: min, max, avg, sum, median, p25, p75, p95
This change applies to both the JobManager and the history server.
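A minimal Python sketch of aggregating one metric over all subtasks in a scope (a JobVertex or a TaskManager), producing the eight aggregations used in the REST API (min, max, avg, sum, median, p25, p75, p95). The FLIP does not specify a percentile definition; this sketch assumes linear interpolation between the closest ranks (the same convention as NumPy's default). The function names are hypothetical.

```python
import math
from typing import Dict, List


def percentile(ordered: List[float], q: float) -> float:
    """Percentile q (0-100) of pre-sorted values, interpolating linearly between ranks."""
    rank = (len(ordered) - 1) * q / 100.0
    lower, upper = math.floor(rank), math.ceil(rank)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)


def aggregate(values: List[float]) -> Dict[str, float]:
    """Aggregate one metric over all subtasks of a scope (hypothetical helper)."""
    if not values:
        raise ValueError("cannot aggregate an empty scope")
    ordered = sorted(values)
    total = sum(ordered)
    return {
        "min": ordered[0],
        "max": ordered[-1],
        "avg": total / len(ordered),
        "sum": total,
        "median": percentile(ordered, 50),
        "p25": percentile(ordered, 25),
        "p75": percentile(ordered, 75),
        "p95": percentile(ordered, 95),
    }
```

Sorting once and reusing the ordered list keeps each aggregation cheap, which matters when a vertex has high parallelism.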
WebUI Preview
Split the per-subtask metrics and aggregated metrics into tabs. Add a table of aggregated metrics including the following content:
- Aggregations: min, max, avg, sum, median, p25, p75, p95
- Metrics:
  - Status durations: CREATED, SCHEDULED, INITIALIZING, DEPLOYING, RUNNING
  - Read/write records
  - Read/write bytes
  - Accumulated backpressured/idle/busy time
Extend the original subtasks table by adding extra columns:
- "Accumulated Time"
- "Status Durations"
Under the TaskManagers tab, each TaskManager row provides an action for viewing its "Aggregated Metrics". The table in the resulting modal is identical to the one under the subtasks tab.
REST API Changes
Accumulated busy / idle / back-pressured time
We propose to introduce three new fields, "accumulated-busy-time", "accumulated-idle-time", and "accumulated-backpressured-time", to IOMetricsInfo, which represent the accumulated busy, idle, and back-pressured time of a running task, respectively.
The newly introduced fields will be added to the response of the following REST APIs:
/jobs/:jobid
/jobs/:jobid/vertices/:vertexid/taskmanagers
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex
/jobs/:jobid/vertices/:vertexid
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt
For example, the metrics field in the response of /jobs/:jobid will be changed to:
"metrics": {
  "read-bytes": 0,
  "read-bytes-complete": true,
  "write-bytes": 655360,
  "write-bytes-complete": true,
  "read-records": 0,
  "read-records-complete": true,
  "write-records": 85087,
  "write-records-complete": true,
  "accumulated-backpressured-time": 2340,
  "accumulated-idle-time": 120,
  "accumulated-busy-time": 100.0
}
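As a sketch of how a client might consume these fields, the Python function below computes, for each vertex in a /jobs/:jobid response, the share of time spent busy, idle, and back-pressured. It assumes each entry of the response's "vertices" array carries a "name" and the "metrics" object shown above, and that the three accumulated times together partition the task's running time; the function name is hypothetical.

```python
import json
from typing import Dict


def time_breakdown(job_details_body: str) -> Dict[str, Dict[str, float]]:
    """Share of busy / idle / back-pressured time per vertex (hypothetical helper)."""
    details = json.loads(job_details_body)
    breakdown: Dict[str, Dict[str, float]] = {}
    for vertex in details.get("vertices", []):
        metrics = vertex.get("metrics", {})
        busy = float(metrics.get("accumulated-busy-time", 0))
        idle = float(metrics.get("accumulated-idle-time", 0))
        backpressured = float(metrics.get("accumulated-backpressured-time", 0))
        total = busy + idle + backpressured
        if total == 0:
            # Nothing accumulated yet, or a server that predates these fields.
            continue
        breakdown[vertex["name"]] = {
            "busy": busy / total,
            "idle": idle / total,
            "backpressured": backpressured / total,
        }
    return breakdown
```

With the example values above, such a vertex would show roughly 91% back-pressured time, pointing at a slow downstream operator rather than the vertex itself.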
Status duration of tasks
We propose to add a new field "status-duration", which represents the duration that a task stays in each status, to the following REST APIs:
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex
/jobs/:jobid/vertices/:vertexid
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt
An example of the "status-duration":
"status-duration": {
  "CREATED": 12,
  "SCHEDULED": 340,
  "INITIALIZING": 1203,
  "DEPLOYING": 3210,
  "RUNNING": 304930
}
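One way a user might read this field is to compare the time spent getting ready to run with the time actually spent running. The Python sketch below does that, assuming the durations are in milliseconds; the helper name and the grouping of "pre-running" statuses are illustrative assumptions, not part of the proposal.

```python
from typing import Dict

# Statuses a task passes through before it starts processing data (assumption).
PRE_RUNNING_STATUSES = ("CREATED", "SCHEDULED", "INITIALIZING", "DEPLOYING")


def startup_overhead(status_duration: Dict[str, int]) -> Dict[str, float]:
    """Summarize a "status-duration" object: time before RUNNING vs. time RUNNING."""
    startup_ms = sum(status_duration.get(s, 0) for s in PRE_RUNNING_STATUSES)
    running_ms = status_duration.get("RUNNING", 0)
    total_ms = startup_ms + running_ms
    return {
        "startup_ms": startup_ms,
        "running_ms": running_ms,
        # Fraction of the reported lifetime spent before the task started running.
        "startup_ratio": startup_ms / total_ms if total_ms else 0.0,
    }
```

For the example above, the task spends 4765 ms (about 1.5% of its reported lifetime) before it starts running.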
Aggregating task metrics
We propose to add the aggregated task metrics, under the key "aggregated", to the following REST APIs:
As shown below, the "aggregated" field will contain the aforementioned aggregations of the tasks' read/write bytes and records, their accumulated busy / idle / back-pressured time, and their status durations:
"aggregated": {
  "metrics": {
    "read-bytes": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "write-bytes": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "read-records": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "write-records": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "accumulated-backpressured-time": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "accumulated-idle-time": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "accumulated-busy-time": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    }
  },
  "status-duration": {
    "CREATED": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "SCHEDULED": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "INITIALIZING": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "DEPLOYING": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    },
    "RUNNING": {
      "min": 0,
      "max": 0,
      "avg": 0,
      "sum": 0,
      "median": 0,
      "p25": 0,
      "p75": 0,
      "p95": 0
    }
  }
}
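A typical use of the "aggregated" field in batch jobs is spotting data or scheduling skew. The Python sketch below flags any metric whose max is at least `threshold` times its median; the helper name and the default threshold of 2.0 are arbitrary illustrative choices, not something the FLIP defines.

```python
from typing import Any, Dict, List


def skewed_metrics(aggregated: Dict[str, Any], threshold: float = 2.0) -> List[str]:
    """Flag metrics whose max is at least `threshold` times the median (hypothetical)."""
    flagged = []
    for group in ("metrics", "status-duration"):
        for name, stats in aggregated.get(group, {}).items():
            median = stats.get("median", 0)
            # A zero median gives no meaningful ratio, so such metrics are skipped.
            if median > 0 and stats.get("max", 0) / median >= threshold:
                flagged.append(f"{group}.{name}")
    return sorted(flagged)
```

Comparing max with the median (rather than the average) keeps a single extreme subtask from hiding itself by inflating the baseline.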
Environmental Information
In addition to the Flink configuration, we propose to also display the environment variables and JVM arguments, and to include them in the job archive. Currently, this information can only be found in the logs, which are not always available after the job finishes (e.g., due to log rotation, or when the job is viewed from the history server).
This change applies to both the JobManager and the history server.
WebUI Preview
Changes to the history server:
Add a "Cluster Configuration" tab, and make its tables scrollable for better readability.
Rename the original "Configuration" tab to "Job Configuration" to avoid confusion.
Under the runtime Web UI, the JobManager Configuration page is made identical to this UI.
REST API Changes
We propose to introduce REST APIs for environmental information at both the cluster level and the job level. Besides the existing /jobmanager/config, we introduce three new REST APIs:
- /jobmanager/environment
- /jobs/:jobid/jobmanager/environment
- /jobs/:jobid/jobmanager/config
For now, the cluster-level and job-level environment (and config) APIs return the same results; they may return different things in the future. The response of /jobs/:jobid/jobmanager/config is the same as that of /jobmanager/config, while the response of /jobmanager/environment and /jobs/:jobid/jobmanager/environment is shown below:
{
  "environment": [
    {"key": "FLINK_HOME", "value": "/foo/bar/flink"},
    {"key": "JAVA_HOME", "value": "/foo/bar/java"},
    ...
  ],
  "jvm": {
    "version": "OpenJDK 64-Bit Server VM 1.8.0_252-b09",
    "arch": "amd64",
    "options": [
      "-Xmx1073741824",
      "-Xms1073741824",
      ...
    ]
  },
  "classpaths": [
    "/foo/bar/dependencies",
    ...
  ]
}
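A small Python sketch of reading this response on the client side. It assumes "environment" is a list of {"key", "value"} objects and "jvm.options" is a list of strings, as in the example; the helper names are hypothetical. Since the JVM honours the last occurrence of a repeated -Xmx flag, the sketch keeps the last one it finds.

```python
import json
from typing import Dict, Optional


def environment_variables(body: str) -> Dict[str, str]:
    """Turn the list of {"key", "value"} objects into a plain dict."""
    info = json.loads(body)
    return {item["key"]: item["value"] for item in info.get("environment", [])}


def max_heap_option(body: str) -> Optional[str]:
    """Return the value of the effective -Xmx option, if any."""
    info = json.loads(body)
    value = None
    for option in info.get("jvm", {}).get("options", []):
        if option.startswith("-Xmx"):
            # Keep the last occurrence: later JVM options override earlier ones.
            value = option[len("-Xmx"):]
    return value
```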
Logs
Currently, the Flink history server does not support viewing logs. This is mostly because logs are usually too large to be included in the job archives.
We believe most users who run Flink in production have their own mechanisms for collecting logs from terminated Flink instances, which usually depend heavily on company-internal infrastructure. Therefore, instead of building yet another log collection mechanism and expecting users to migrate to it, we decide to provide a way for users to easily integrate their existing log viewing services into the history server.
We propose to introduce a configuration option "historyserver.log.[jobmanager|taskmanager].url-pattern" that allows users to define a URL pattern pointing to an external log viewing service. The Flink history server will replace the special placeholders (<jobid>, <tmid>) in this pattern with the corresponding values to generate the actual URLs.
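A minimal Python sketch of the placeholder substitution, assuming a pattern such as "https://logs.example.com/flink/<jobid>/<tmid>/taskmanager.log" for the taskmanager variant of the option (the host and path layout are hypothetical). Percent-encoding the IDs is a design choice of this sketch, not something the FLIP prescribes; it keeps characters such as ':' in a TaskManager ID from breaking the URL.

```python
from typing import Optional
from urllib.parse import quote


def render_log_url(pattern: str, job_id: str, tm_id: Optional[str] = None) -> str:
    """Fill the <jobid> / <tmid> placeholders of a log URL pattern (hypothetical helper)."""
    # Percent-encode the ids so characters such as ':' cannot break the URL.
    url = pattern.replace("<jobid>", quote(job_id, safe=""))
    if "<tmid>" in url:
        if tm_id is None:
            raise ValueError("pattern contains <tmid> but no TaskManager id was given")
        url = url.replace("<tmid>", quote(tm_id, safe=""))
    return url
```

A JobManager pattern would typically contain only <jobid>, so the TaskManager ID can be omitted for it.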
When served by the JobManager rather than the history server, the log URL APIs will return the URLs of the log pages of the running JobManager and TaskManagers.
WebUI Preview
Job status information is rearranged into a clearer structure, with an additional "actions" section containing a button that navigates to the JobManager log.
In the history server, this button redirects to the external JobManager log URL.
In the history server, the subtask and TaskManager views of a job vertex now support navigating to the external TaskManager log. The action button is disabled while the HTTP request for the log URL is loading or if it has failed.
In the runtime Web UI, the action remains unchanged: it redirects to the TaskManager log page.
REST API Changes
We propose to introduce "/jobs/:jobid/jobmanager/log-url" and "/jobs/:jobid/taskmanagers/:taskmanagerid/log-url", which return the generated log URL of the JobManager or the given TaskManager for the given job. The response is shown below:
{
"url": "http://localhost:8081/task-manager/taskmanager-1/logs"
}
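A Python client sketch for these two endpoints, using only the standard library. The base URL (e.g. the history server's address) is supplied by the caller, and the helper names are hypothetical; `fetch_log_url` performs a real HTTP GET, while the other two helpers only build the path and parse the response shown above.

```python
import json
from typing import Optional
from urllib.parse import quote
from urllib.request import urlopen


def log_url_endpoint(base: str, job_id: str, tm_id: Optional[str] = None) -> str:
    """Build the REST path of the JobManager or TaskManager log-url API."""
    job = quote(job_id, safe="")
    root = base.rstrip("/")
    if tm_id is None:
        return f"{root}/jobs/{job}/jobmanager/log-url"
    return f"{root}/jobs/{job}/taskmanagers/{quote(tm_id, safe='')}/log-url"


def parse_log_url(body: str) -> str:
    """Extract the generated log URL from the API response."""
    return json.loads(body)["url"]


def fetch_log_url(base: str, job_id: str, tm_id: Optional[str] = None, timeout: float = 10.0) -> str:
    """Query a running server for the log URL of the JobManager or a TaskManager."""
    with urlopen(log_url_endpoint(base, job_id, tm_id), timeout=timeout) as response:
        return parse_log_url(response.read().decode("utf-8"))
```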
Compatibility, Deprecation, and Migration Plan
The changes proposed in this FLIP are backward compatible. No deprecation or migration is needed.
Test Plan
Backend changes will be covered by unit tests.
Frontend changes will be manually verified.
Rejected Alternatives
We have rejected two alternatives for identifying the JM / TM in the history server REST API.
- Using a cluster-id instead of the job-id in the URLs. This is rejected because it adds complexity on the user side: users would have to understand an additional concept. Instead, the mapping from a job-id to the specific cluster / JM / TM can be handled by the backend.
- Using different URLs for the same API, i.e., prefixing the JM / TM related APIs with "/jobs/:jobid/" only for the history server. This is rejected because it breaks the simple property that the history server REST APIs are a subset of the JobManager REST APIs. Moreover, the current approach is more flexible if, in the future, we want to return different information for job-level and cluster-level JM / TM queries.