Status

Discussion thread: https://lists.apache.org/thread/tm4qqfb4fxr7bc6nq5mwty1fqz8sj39x
Vote thread: https://lists.apache.org/thread/wbn0gtnskq6t2pmw5vtq15db69dq0949
JIRA: FLINK-33137
Release: flink-connectors-prometheus-1.0.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As a data processing framework, Apache Flink is a popular choice to process and generate metrics. For example, users might want to consume a stream of metrics, reduce the cardinality of the metrics space, and publish the result to a metrics database (e.g. Prometheus).

At the moment, Flink provides MetricReporters that allow users to publish metrics to an external metrics store. This works well for metrics about the health of the Flink cluster and job, but it does not scale to use cases where the metric is the desired output of the job, which require at-least-once/exactly-once semantics, batching, parallelism scaling, and the job retry mechanism.

As such, we propose to introduce a Prometheus sink. This sink will allow users to write metrics to a Prometheus cluster as part of their job graph, providing first-class data-processing support for writing metrics.

With this sink, MetricReporters should be used to publish internal metrics about the health and operation of the Flink cluster and job, while the Prometheus sink should be used by Flink jobs that process external metrics as data.

Public Interfaces

We will add a Prometheus sink based on the Async Sink framework, with support for batch and streaming execution and for the Table API. Source support is not planned, but can be implemented at a later date.

Proposed Changes

The new sink will be housed in a new connector repository, similar to Elasticsearch [1]. The repository name will be: flink-connector-prometheus

Below are the considerations for the new sink:

Supports batch and streaming

The new sink connector will be based on the Async Sink framework (FLIP-171) and will support both bounded (batch) and unbounded (streaming) execution.

Supports DataStream API and Table API

The connector will support DataStream and Table API. 
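What Table API usage could look like is sketched below. This is purely illustrative: the 'prometheus' connector identifier, the column layout, and the option names (e.g. 'endpoint-url') are assumptions made for the sake of the example and not part of this proposal.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrometheusTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical DDL: connector identifier and options are illustrative only.
        tEnv.executeSql(
            "CREATE TABLE metrics ("
                + "  metric_name STRING,"
                + "  labels MAP<STRING, STRING>,"
                + "  sample_value DOUBLE,"
                + "  sample_ts TIMESTAMP_LTZ(3)"
                + ") WITH ("
                + "  'connector' = 'prometheus',"
                + "  'endpoint-url' = 'https://example.com/api/v1/remote_write'"
                + ")");

        // Any INSERT INTO metrics ... statement would then publish time series through the sink.
    }
}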

Prometheus integration using remote-write API

The connector will use the Prometheus Remote-Write API v1.0 and comply with its specification, including the prescribed write retry behaviour.

The connector input will be a stream of objects, each representing a TimeSeries.
To optimise write throughput to Prometheus, the connector will buffer the input and write it in batches to the remote-write endpoint.
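To make the input model concrete, below is a minimal, hypothetical sketch of a TimeSeries input object. The class name (PrometheusTimeSeries), its builder, and the method names are assumptions used for illustration, not a finalized API.

// Hypothetical input object: one Prometheus time series, identified by metric name
// and labels, carrying one or more samples (value + timestamp in milliseconds).
PrometheusTimeSeries series = PrometheusTimeSeries.builder()
        .withMetricName("vehicle_temperature")        // becomes the __name__ label
        .addLabel("vehicleId", "ABC123")              // identifying label
        .addSample(42.0, System.currentTimeMillis())  // value, timestamp
        .addSample(42.5, System.currentTimeMillis())
        .build();

// The sink then consumes a DataStream of such objects:
// DataStream<PrometheusTimeSeries> input = ...;
// input.sinkTo(sink);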

Restrictions on data input

Due to the strict ordering and label format requirements of the Prometheus remote-write API, any malformed or out-of-order data written to Prometheus is rejected.

  • Only one write-request (a batch of TimeSeries) will be in-flight at any time.
  • For efficiency, the connector will not verify whether the input is well formed and in order; any malformed or out-of-order input will be rejected on write by Prometheus and discarded.
  • To allow the user to identify data issues causing lost writes (rejected writes cannot be retried), the connector will explicitly log every rejected write with the reason provided by the remote-write endpoint; counters will also be provided for successful and rejected writes.
  • A KeySelector will be provided to partition the connector input while retaining ordering (TimeSeries with the same Labels go to the same partition), as sketched after this list. This is necessary to allow writing with parallelism > 1 without introducing out-of-order writes when the sink input is in order.
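Below is a minimal sketch of this partitioning, assuming a hypothetical PrometheusTimeSeries input type that exposes its labels; the key selector hashes the label set so that identical series always land on the same sink subtask.

// Hypothetical key selector: TimeSeries with the same label set are routed to the
// same partition, preserving per-series ordering at parallelism > 1.
KeySelector<PrometheusTimeSeries, Integer> byLabels =
        timeSeries -> timeSeries.getLabels().hashCode();

input
        .keyBy(byLabels)   // partition while retaining per-series order
        .sinkTo(sink);     // each parallel subtask keeps a single in-flight write-request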

Rate limiting 

Rate limiting caused by excessive writes to the destination will be baked into the remote-write retry mechanism.
The sink will follow the Prometheus Remote-Write retry specification: on HTTP 429 the sink will retry the request with exponential backoff. Because the sink has only one in-flight write-request at a time (per parallel subtask), retrying causes buffering and generates backpressure upstream, without overwhelming the destination Prometheus.
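As an illustration of the intended retry behaviour, the sketch below computes a capped exponential backoff delay for retriable responses such as HTTP 429. The parameter values are assumptions for illustration, not finalized configuration defaults.

// Illustrative capped exponential backoff; values are assumptions, not defaults.
static long retryDelayMillis(int attempt) {
    long initialDelayMs = 30;    // delay before the first retry
    long maxDelayMs = 5_000;     // upper bound on the delay
    return Math.min(maxDelayMs, initialDelayMs * (1L << attempt)); // 30, 60, 120, ...
}
// While this delay elapses, the single in-flight request per subtask means incoming
// records accumulate in the sink buffer and backpressure propagates upstream.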


Request signing

The connector will support configurable request signing.
The initial version will include support for 1/ unsigned requests (default), 2/ Amazon Managed Prometheus request signing, and 3/ an interface to implement custom request signing.
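As an indication of what the custom signing hook could look like, below is a minimal sketch of a generic signer interface; the interface name and method signature are assumptions, not a finalized API.

import java.io.Serializable;
import java.util.Map;

/**
 * Hypothetical request signer contract: given the HTTP headers and the serialized
 * remote-write request body, add or replace any headers required to authenticate
 * the request (e.g. SigV4 headers for Amazon Managed Prometheus).
 */
public interface PrometheusRequestSigner extends Serializable {
    void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody);
}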

Packaging

To avoid polluting the Prometheus connector with AWS-specific dependencies, the connector will only contain a generic request signer interface. The Amazon Managed Prometheus request signer implementation will be distributed as a separate dependency.


Example usage

A sample using the connector is shown below:

Properties requestSignerProperties = new Properties();
// Supports write request signing, e.g. for Amazon Managed Prometheus.
// Static credentials are used for the sake of this example only:
// the AmazonPrometheusRequestSigner will support different credential providers.
requestSignerProperties.setProperty(AWS_ACCESS_KEY_ID, ...);
requestSignerProperties.setProperty(AWS_SECRET_ACCESS_KEY, ...);
requestSignerProperties.setProperty(AWS_REGION, ...);

stream.sinkTo(PrometheusSink.builder()
        .setPrometheusRemoteWriteUrl(endpointUrl)
        .setRequestSigner(new AmazonPrometheusRequestSigner(requestSignerProperties))
        .build());

Versioning Strategy

The flink-connector-prometheus version will be independent of Flink. We will follow the same versioning strategy as Flink in terms of feature freeze windows, release candidates and branching/tagging. We will publish a Flink support matrix in the connector README and also update Flink documentation to reference supported connectors. The initial release of flink-connector-prometheus will target 1.0.0 and support Flink 1.15.x, 1.16.x and 1.17.x.

The flink-prometheus-aws-request-signer package, containing the Amazon Managed Prometheus request signer implementation, will follow independent version numbering; its major.minor version will match the major.minor version of the connector with a compatible signer API.

Compatibility, Deprecation, and Migration Plan

The connector is compatible with Prometheus. With respect to Flink, this is a new feature, so no compatibility, deprecation, or migration plan is expected.

Test Plan

We will add the following tests:

  • Unit tests
  • Integration tests that perform end-to-end tests against a Prometheus localstack container
  • End-to-end tests that hit the Amazon Managed Prometheus service. These tests will only be enabled when credentials are defined.

Rejected Alternatives

None
