Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the numRecordsIn and numBytesIn metrics for sources and the numRecordsOut and numBytesOut metrics for sinks are always 0 on the Flink web dashboard. This is especially confusing for simple ETL jobs that consist of a single chained operator showing 0 input rate and 0 output rate. For years, new Flink users have been asking "Why does my job have zero consumption rate and zero production rate? Is it actually working?" (see FLINK-7286 for example).

FLINK-11576 brings us these metrics at the operator level, but it does not integrate them at the task level. The summary metrics on the job overview page, however, are based on the task-level I/O metrics. As a result, even though new connectors supporting FLIP-33 metrics report operator-level I/O metrics, we still cannot see these metrics on the dashboard.

This FLIP proposes to integrate the operator-level I/O metrics of sources and sinks into the task-level I/O metrics, so that users can see the source input and sink output metrics on the dashboard.

Public Interfaces

The semantics of the task-level I/O metrics change slightly:

  • Before: task-level I/O metrics will not take I/O with external source/sink systems into account.
  • After: task-level I/O metrics will take I/O with external source/sink systems into account, as exposed by the source/sink operator-level I/O metrics defined in FLIP-33.

Proposed Changes

Since all sources/sinks that implement the new Source/Sink APIs come down to SourceOperator/SinkWriterOperator at the operator level, it suffices to reuse the input/output metrics of SourceOperator/SinkWriterOperator for the task. See https://github.com/apache/flink/pull/23998 for a reference implementation.
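The idea of reusing an operator-level counter at the task level can be sketched as follows. This is a minimal illustration, not the code from the referenced pull request: SketchCounter, SimpleSketchCounter, SumSketchCounter and TaskIoReuseSketch are hypothetical stand-ins, not Flink classes, and the actual wiring inside Flink may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a metric counter; not Flink's actual Counter interface. */
interface SketchCounter {
    void inc(long n);
    long getCount();
}

/** Plain counter, as an operator would own it. */
class SimpleSketchCounter implements SketchCounter {
    private long count;
    public void inc(long n) { count += n; }
    public long getCount() { return count; }
}

/** Task-level counter: reports its own count plus the count of every reused operator counter. */
class SumSketchCounter implements SketchCounter {
    private final SimpleSketchCounter own = new SimpleSketchCounter();
    private final List<SketchCounter> reused = new ArrayList<>();

    void reuse(SketchCounter operatorCounter) { reused.add(operatorCounter); }

    public void inc(long n) { own.inc(n); }

    public long getCount() {
        long sum = own.getCount();
        for (SketchCounter c : reused) {
            sum += c.getCount();
        }
        return sum;
    }
}

class TaskIoReuseSketch {
    /** Simulates a source-only task and returns the task-level numRecordsIn after n records are read. */
    static long taskRecordsInAfterReading(long n, boolean reuseOperatorCounter) {
        SketchCounter sourceOperatorRecordsIn = new SimpleSketchCounter(); // operator-level metric
        SumSketchCounter taskRecordsIn = new SumSketchCounter();           // task-level metric
        if (reuseOperatorCounter) {
            taskRecordsIn.reuse(sourceOperatorRecordsIn);
        }
        sourceOperatorRecordsIn.inc(n); // the source reads n records from the external system
        return taskRecordsIn.getCount();
    }

    public static void main(String[] args) {
        System.out.println("without reuse: " + taskRecordsInAfterReading(42, false)); // prints 0
        System.out.println("with reuse:    " + taskRecordsInAfterReading(42, true));  // prints 42
    }
}
```

Without reuse, the task-level counter never observes the records read by the source and stays at 0, which matches the dashboard behavior described in the Motivation section. With reuse, the task-level value follows the operator-level one.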

Further, the metric reuse won't cause duplication on:

  • The I/O bytes metrics: previously, Flink only accounted for internal traffic in the input/output bytes metrics, so the external I/O reported by the source/sink operators does not overlap with it.
  • The sink output records metrics: the output records metric is already intentionally dropped for SinkWriterOperator in OperatorChain#getOperatorRecordsOutCounter. FLIP-33 defines the metric "numRecordsOut" as the total number of records written to the external system, but in AbstractStreamOperator this metric is already occupied as the number of records sent to downstream operators, which for a sink writer is the number of Committable batches sent to the SinkCommitter.
  • The source input records metrics: no input records metric is collected for SourceOperatorStreamTask.
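The sink case can be illustrated with a small sketch. It assumes a simplified model in which the task-level numRecordsOut is the sum of whatever counters are attached to it. SinkRecordsOutSketch and its parameters are hypothetical names introduced for illustration only, and the numbers are made up.

```java
import java.util.concurrent.atomic.LongAdder;

class SinkRecordsOutSketch {
    /**
     * Returns the task-level numRecordsOut for a sink task in this simplified model.
     *
     * @param recordsWritten      records written to the external system (FLIP-33 numRecordsOut)
     * @param committablesEmitted Committable batches sent downstream to the committer
     * @param dropChainCounter    whether the chain-level "records sent downstream" counter is dropped
     */
    static long taskRecordsOut(long recordsWritten, long committablesEmitted, boolean dropChainCounter) {
        LongAdder writerRecordsOut = new LongAdder(); // operator-level metric per FLIP-33
        LongAdder chainRecordsOut = new LongAdder();  // records sent to downstream operators

        writerRecordsOut.add(recordsWritten);
        chainRecordsOut.add(committablesEmitted);

        long taskLevel = writerRecordsOut.sum();
        if (!dropChainCounter) {
            // If the chain counter were kept, Committable batches would be mixed into the record count.
            taskLevel += chainRecordsOut.sum();
        }
        return taskLevel;
    }

    public static void main(String[] args) {
        System.out.println("chain counter dropped: " + taskRecordsOut(1000, 3, true));  // prints 1000
        System.out.println("chain counter kept:    " + taskRecordsOut(1000, 3, false)); // prints 1003
    }
}
```

Because the chain-level counter is already dropped for the sink writer, only the records written to the external system contribute to the task-level value in this model.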

Compatibility, Deprecation, and Migration Plan

This feature requires connectors that properly implement the FLIP-33 connector metric standard. Otherwise, it falls back to the previous behavior: 0 input stats for sources and 0 output stats for sinks.

No deprecation or migration is involved.

Test Plan

The feature will be covered by integration tests.

Rejected Alternatives

None.