This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion threadhttps://lists.apache.org/thread/1g1n2l8bc27zvw88ljw4l2gqzx6rg5zy
Vote threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA

FLINK-38158 - Getting issue details... STATUS

Release2.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When running a Flink job that reads data from multiple sources and writes to multiple sinks, it would be helpful to group metrics from each source/sink instance together, for example based on the underlying name of the source/sink table/topic.

Proposed Changes

This flip proposes to allow users to set custom metric variables to operators/transformations, that will be later passed on to the metrics and traces reporters.

For example, when setting:

fooSource =
        execEnv.fromSource(
                        kafkaSource,
                        getWatermarkStrategy(),
                        "KafkaSource-Foo")
                .addMetricVariable(
                        "table_name",
                        "Foo"");
barSource =
        execEnv.fromSource(
                        kafkaSource,
                        getWatermarkStrategy(),
                        "KafkaSource-Bar")
                .addMetricVariable(
                        "table_name",
                        "Bar"");

All metrics and traces produced by those sources would be additionally tagged with the table_name:Foo and table_name:Bar scopes. This would allow to easily filter those metrics/traces in external systems like NewRelic or DataDog.

Public Interfaces

@PublicEvolving
public SingleOutputStreamOperator<T> addMetricVariable(String key, String value) {
    this.transformation.addMetricVariable(key, value);
    return this;
}

addMetricVariable method will be added to the following transformation classes: Transformation, DataStreamSink, SingleOutputStreamOperator, and to DataStream.scala.

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users, there is no need for any migration

Test Plan

On top of automated tests, this feature is already used and tested inside Confluent. Before committing the final accepted version would be tested inside Confluent again.

Rejected Alternatives

None