Status

Discussion Thread

https://lists.apache.org/thread/vsnhhoyxw8x2r8bwvdsyq4l1vxhry154

Vote Thread
JIRA

FLINK-39546

Target Release

Flink 2.4

Current State



Motivation

"It's 2 AM. The on-call engineer wakes up to a pager. A streaming job that has been quietly running for six months suddenly stretches its checkpoint duration from 30 seconds to 12 minutes. Backpressure climbs. Watermarks lag. The dashboards show "checkpoint duration up", "checkpoint size unchanged", and nothing else. Was it the network? RocksDB? S3? They reach for tcpdump. They reach for AWS CloudTrail. They reach, eventually, for guesswork."

This FLIP is about that moment.


Apache Flink's filesystem layer is one of the most heavily used components in production. Every checkpoint, every savepoint, every file sink and every state download passes through it, and the most widely used filesystem is S3. Yet when something goes wrong inside that layer, Flink tells the operator almost nothing. Today's metric surface is dominated by aggregate file IO counters and after-the-fact checkpoint durations. There is no visibility into:

  • whether S3 is throttling our job (HTTP 503 SlowDown).

  • whether our retries are masking real problems.

  • which S3 operation is slow (PUT? CompleteMultipartUpload? GET range read?).

  • whether our entropy prefix is actually spreading the load.

  • whether we are leaking incomplete multipart uploads (a real, recurring cost issue).

  • whether NativeS3InputStream is silently reopening because of intermittent EOFs.

  • which P99 latency moved when our checkpoints regressed.

Every one of these has been the root cause of a real production incident. Today, finding any of them requires correlating Flink logs with CloudTrail, capturing TCP packets, or, most often, guessing. Operators of production Flink+S3 systems consistently report this as one of their top operational pain points. It is the gap between "Flink works" and "Flink is operable."

Goal

  1. Make S3 traffic from flink-s3-fs-native directly observable through Flink's existing metric system, with no additional dashboards or alerting pipelines.

  2. Surface a small, high-signal set of metrics that answers the operational questions above.

  3. Do this in a way that other filesystem plugins can adopt through the same plumbing.

  4. Avoid two recurring failure modes in observability work: cardinality explosion and metric overhead that erodes the very performance we are trying to measure.

Non-Goal

  1. Replacing AWS CloudWatch or OpenTelemetry-based pipelines for users who already operate them.

  2. Rewriting the AWS SDK's instrumentation. We will reuse its built-in MetricPublisher mechanism.

  3. Per-checkpoint-correlated metrics ("which PUT belonged to checkpoint 42?"). Possible, but deferred to a follow-up FLIP — see the Open Questions section.

  4. Instrumenting flink-s3-fs-presto or flink-s3-fs-hadoop. The API change is, however, designed so that they can adopt it cleanly.

What's Required for Debugging (the working set of questions)

The metrics shipped by this FLIP must answer the following, in production, in real time:

| Operational question | Metric this FLIP introduces |
|---|---|
| Are we being throttled by S3 right now? | flink_fs_s3n_throttle_count |
| Which operation is slow? | flink_fs_s3n_api_call_duration_ms{op} (P50/P95/P99) |
| Is our retry rate masking errors? | flink_fs_s3n_retry_count{reason} |
| Are we leaking incomplete multipart uploads (MPUs)? | flink_fs_s3n_mpu_aborted_total and flink_fs_s3n_mpu_in_flight |
| Is NativeS3InputStream reopening a lot? | flink_fs_s3n_stream_reopen_count{reason} |
| Is our connection pool saturated? | flink_fs_s3n_connection_acquire_duration_ms |
| What is our actual S3 throughput? | flink_fs_s3n_bytes_{read,written} |
| Is the entropy prefix spreading load? | flink_fs_s3n_entropy_bucket_count{prefix_hash} (opt-in) |

We deliberately do not attempt to answer everything possible. The chosen set is the smallest one that turns "the dashboard is not useful" into "the dashboard is enough."

Approaches considered

There is more than one defensible place to put this work. The decision matters because the code's home dictates its lifecycle, distribution, and the user experience for an operator who picks up the plugin tomorrow morning.

Approach A: Outside Flink, via AWS SDK v2 → CloudWatch (or OpenTelemetry)

AWS SDK v2 already ships a MetricPublisher SPI that emits structured metrics for every API call: operation name, duration, retries, HTTP status, and throughput. There are off-the-shelf publishers for CloudWatch and for OpenTelemetry exporters. A user can wire S3AsyncClient.builder().overrideConfiguration(o -> o.addMetricPublisher(...)) and start collecting today, with no Flink change needed.

| Pros | Cons |
|---|---|
| Zero new code in Flink. | Operators run two parallel observability systems (Flink Prometheus + AWS CloudWatch). |
| Captures retries the application can't see. | No correlation with Flink concepts (job ID, checkpoint ID, subtask). |
| Vendor-neutral via OTel. | Flink and its plugin classloader have no control over which MetricPublisher users wire in, so the out-of-the-box experience varies. |
| Maintained by AWS / OTel. | Adds operational complexity for users who don't already run a separate AWS pipeline. |

This is a fine option for sophisticated AWS-native shops. It is a poor option for the average Flink user who has a Flink reporter wired into Prometheus and wants the data to land there, alongside watermarks and backpressure.

Approach B: Generic stream instrumentation in Flink's FileSystem layer

Wrap every FSDataInputStream and FSDataOutputStream returned from FileSystem in an instrumented decorator that times reads and writes and reports them through a standard MetricGroup.

| Pros | Cons |
|---|---|
| Covers every filesystem plugin uniformly. | Captures only the stream surface (bytes, latency) and loses every S3-specific signal. |
| No public API change. | Adds overhead to the per-record path for all filesystems, even local ones. |
| Operators get something for HDFS, Azure, GCS, etc. for free. | Cannot see retries, throttling, or the multipart lifecycle: exactly the high-value signals we set out to expose. |

This approach answers a different question than the one operators are asking. It is the right answer to "how much do I read and write through Flink's FS layer?" It is not the right answer to "why is my S3 traffic slow?"

Approach C: Manual instrumentation inside the plugin

Instrument the plugin manually: count every s3Client.putObject(...) call, time it, classify exceptions, and expose counters and histograms.

| Pros | Cons |
|---|---|
| Full control over metric semantics. | Reinvents what AWS SDK already publishes natively. |
| Plugin-local; no public API change. | Misses retries performed inside the SDK that never surface to our code. |
| Easy first version. | Drifts over time as the SDK adds new operations. |

AWS SDK v2 makes most of it obsolete: the SDK already emits the right things, in the right shape, at the right granularity. We should consume that signal, not duplicate it.

[RECOMMENDED] Approach D: Bridge AWS SDK v2 MetricPublisher into Flink MetricGroup, with plugin-specific extensions on top

A two-layer design:

  1. Tier 1 — Bridge. Implement an AwsSdkMetricBridge (a MetricPublisher) inside flink-s3-fs-native that consumes the SDK's MetricCollection and emits Flink Counter / Histogram / Gauge against a plugin-scoped MetricGroup.

  2. Tier 2 — Flink-specific events. Instrument the things AWS SDK cannot see, because they are above the SDK layer: NativeS3InputStream reopens, the recoverable writer's multipart lifecycle, and (optionally) the entropy-prefix bucket distribution.

| Pros | Cons |
|---|---|
| Reuses the SDK's existing instrumentation; no duplicated logic. | Requires a small additive change to FileSystemFactory so plugins receive a MetricGroup at init time. |
| Metrics land in Flink's existing stack: same Grafana, same alerts. | Couples to AWS SDK v2 metric keys (mitigated by an internal abstraction). |
| The plugin classloader keeps Flink's metric interfaces parent-loaded; isolation is preserved. | Slightly more design surface than Approach C. |

This is the approach this FLIP proposes.

Recommended design walkthrough

We will introduce three things:

  1. A small, additive change to the public FileSystemFactory interface so plugins can receive a MetricGroup at init time.

  2. An AwsSdkMetricBridge inside flink-s3-fs-native that registers as the SDK's MetricPublisher and translates SDK metric records into Flink metrics.

  3. A focused, cardinality-disciplined metric taxonomy.

Public interface change

Today, FileSystemFactory#configure(Configuration) is the only entry point a plugin has for learning about its environment. We add an additive default method:

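package org.apache.flink.core.fs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.Plugin;
import org.apache.flink.metrics.MetricGroup;

import java.io.IOException;
import java.net.URI;

public interface FileSystemFactory extends Plugin {

    String getScheme();

    FileSystem create(URI fsUri) throws IOException;

    // void configure(Configuration config) is inherited from Plugin as a default no-op.
    // It is deliberately not redeclared here: redeclaring it as abstract would break
    // every existing factory that does not override it.

    /**
     * Configures the factory with both Flink configuration and a metric group scoped to
     * this filesystem plugin. The default implementation delegates to {@link #configure(Configuration)}
     * for backward compatibility; plugins that emit metrics override this.
     *
     * @param config         the Flink configuration
     * @param pluginMetrics  a MetricGroup scoped to this plugin (per cluster component);
     *                       never {@code null}, though it may be an
     *                       {@link org.apache.flink.metrics.groups.UnregisteredMetricsGroup}
     *                       in environments where metrics are disabled.
     */
    default void configure(Configuration config, MetricGroup pluginMetrics) {
        configure(config);
    }
}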
The default implementation preserves existing plugin behaviour exactly. Plugins that want metrics override the new method. There is no hard break.
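The sketch below shows why the default method keeps old plugins working. It is minimal and self-contained. Config, Metrics, FactoryContract, LegacyFactory and MetricsAwareFactory are hypothetical stand-ins for Configuration, MetricGroup and real plugin factories, used so the example runs without Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.flink.configuration.Configuration.
final class Config {
    final Map<String, String> values = new HashMap<>();
}

// Hypothetical stand-in for org.apache.flink.metrics.MetricGroup.
interface Metrics {
    void counter(String name);
}

// Simplified model of the proposed factory contract.
interface FactoryContract {
    void configure(Config config);

    // New overload: the default simply falls back to the single-argument method.
    default void configure(Config config, Metrics pluginMetrics) {
        configure(config);
    }
}

// A plugin written before the change; it never sees metrics.
final class LegacyFactory implements FactoryContract {
    Config seen;

    @Override
    public void configure(Config config) {
        this.seen = config;
    }
}

// A plugin that opts in by overriding the two-argument method.
final class MetricsAwareFactory implements FactoryContract {
    Config seen;
    Metrics metrics;

    @Override
    public void configure(Config config) {
        this.seen = config;
    }

    @Override
    public void configure(Config config, Metrics pluginMetrics) {
        configure(config); // keep the single-argument path as the source of truth
        this.metrics = pluginMetrics;
        pluginMetrics.counter("api_call_count");
    }
}
```

When the runtime calls the two-argument method on a factory that behaves like LegacyFactory, the call ends up in that factory's existing configure(Config), and no metrics are registered.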

Where the MetricGroup comes from

FileSystem.initialize(Configuration, PluginManager) is called once per JVM, by TaskExecutor (TaskManagers) and by ClusterEntrypoint / JobManagerRunner (JobManager). Both of these already own a process-level MetricGroup (taskmanager and jobmanager respectively). We extend FileSystem.initialize with an optional MetricGroup parameter:

public static void initialize(
        Configuration config,
        @Nullable PluginManager pluginManager,
        @Nullable MetricGroup processMetricGroup) throws IOException;
The existing two-argument overload remains and supplies a no-op UnregisteredMetricsGroup for environments that don't pass one (mini cluster tests, embedded scenarios). Each FileSystemFactory then receives a child group, e.g.:
taskmanager.filesystem.s3
jobmanager.filesystem.s3

The plugin builds further sub-groups beneath that for operation-level breakdown.
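The following sketch shows how the two initialize overloads could give each factory its child group. Group, ScopedGroup, NoOpGroup and FileSystemInit are hypothetical stand-ins for MetricGroup, UnregisteredMetricsGroup and FileSystem.initialize. A plain list of schemes replaces plugin discovery through the PluginManager.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal metric-group model (MetricGroup#addGroup in Flink).
interface Group {
    Group addGroup(String name);
    String scope();
}

// A group that remembers its dotted scope, so the naming scheme is visible.
final class ScopedGroup implements Group {
    private final String scope;
    ScopedGroup(String scope) { this.scope = scope; }
    @Override public Group addGroup(String name) { return new ScopedGroup(scope + "." + name); }
    @Override public String scope() { return scope; }
}

// Stand-in for UnregisteredMetricsGroup: every child is also a no-op.
final class NoOpGroup implements Group {
    static final NoOpGroup INSTANCE = new NoOpGroup();
    @Override public Group addGroup(String name) { return this; }
    @Override public String scope() { return ""; }
}

final class FileSystemInit {
    // Scopes handed to each factory, recorded for illustration.
    static final List<String> handedOut = new ArrayList<>();

    // Models the existing overload: no metric group is supplied.
    static void initialize(List<String> schemes) {
        initialize(schemes, null);
    }

    // Models the proposed overload with a nullable process-level group.
    static void initialize(List<String> schemes, Group processMetricGroup) {
        Group root = processMetricGroup != null ? processMetricGroup : NoOpGroup.INSTANCE;
        for (String scheme : schemes) {
            Group pluginGroup = root.addGroup("filesystem").addGroup(scheme);
            handedOut.add(pluginGroup.scope());
            // factory.configure(config, pluginGroup) would be called here
        }
    }
}
```

With a TaskManager group named taskmanager, the S3 factory would receive taskmanager.filesystem.s3. Through the old overload it would receive a no-op group.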

The bridge

Inside flink-s3-fs-native, we register an AwsSdkMetricBridge as a MetricPublisher on every S3AsyncClient the factory creates:

final S3AsyncClient client = S3AsyncClient.builder()
        .overrideConfiguration(o -> o.addMetricPublisher(metricBridge))
        ...
        .build();

The bridge consumes MetricCollection records emitted by the SDK after every API call. It pulls a small, fixed set of fields:

| SDK metric | Flink metric | Notes |
|---|---|---|
| CoreMetric.OPERATION_NAME | label op | bounded set (~15 values) |
| CoreMetric.API_CALL_DURATION | api_call_duration_ms{op} (Histogram) | |
| CoreMetric.API_CALL_SUCCESSFUL | partitions api_call_count{op,status_class} | |
| HttpMetric.HTTP_STATUS_CODE | classified into {2xx, 4xx, 5xx, throttled} | classify; never expose the raw code |
| CoreMetric.RETRY_COUNT | retry_count{op,reason} | reason from a closed enum |
| HttpMetric.MAX_RETRY_EXCEEDED | feeds max_retries_exceeded_total | |
| CoreMetric.SERVICE_ID (= "S3") | filtering only | not exposed as a label |

The bridge holds bounded Counter and Histogram instances (one per operation, prepared up front), so the hot path is a bounded Map lookup followed by a counter increment. There is no allocation per metric record.
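Below is a minimal sketch of the bridge's hot path, under stated assumptions. ApiCallRecord is a hypothetical, flattened stand-in for the SDK's MetricCollection. LongAdder stands in for Flink Counters, and a per-operation duration sum stands in for the Histogram. The S3Op enum lists only an illustrative subset of operations. The sketch shows the two properties claimed above: every series is created up front, and publish() performs only lookups and increments.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Closed set of S3 operations (illustrative subset). Unknown names fold into OTHER,
// so the `op` label can never exceed values().length distinct values.
enum S3Op {
    PUT_OBJECT("PutObject"), GET_OBJECT("GetObject"), HEAD_OBJECT("HeadObject"),
    DELETE_OBJECT("DeleteObject"), LIST_OBJECTS_V2("ListObjectsV2"),
    CREATE_MULTIPART_UPLOAD("CreateMultipartUpload"), UPLOAD_PART("UploadPart"),
    COMPLETE_MULTIPART_UPLOAD("CompleteMultipartUpload"),
    ABORT_MULTIPART_UPLOAD("AbortMultipartUpload"), OTHER("Other");

    final String sdkName;
    S3Op(String sdkName) { this.sdkName = sdkName; }

    private static final Map<String, S3Op> BY_SDK_NAME = new HashMap<>();
    static {
        for (S3Op op : values()) BY_SDK_NAME.put(op.sdkName, op);
    }

    static S3Op fromSdkName(String name) {
        return name == null ? OTHER : BY_SDK_NAME.getOrDefault(name, OTHER);
    }
}

// Four-value classifier: the raw HTTP status code is never used as a label.
enum StatusClass {
    SUCCESS_2XX, CLIENT_4XX, SERVER_5XX, THROTTLED;

    static StatusClass classify(int httpStatus) {
        if (httpStatus == 503 || httpStatus == 429) return THROTTLED; // 503 SlowDown; 429 is an assumption
        if (httpStatus >= 200 && httpStatus < 300) return SUCCESS_2XX;
        if (httpStatus >= 400 && httpStatus < 500) return CLIENT_4XX;
        return SERVER_5XX; // assumption: other 5xx, no response (0) and anything unexpected
    }
}

// Hypothetical flattened view of one SDK MetricCollection.
final class ApiCallRecord {
    final String operationName;
    final long durationMs;
    final int retryCount;
    final int httpStatus;

    ApiCallRecord(String operationName, long durationMs, int retryCount, int httpStatus) {
        this.operationName = operationName;
        this.durationMs = durationMs;
        this.retryCount = retryCount;
        this.httpStatus = httpStatus;
    }
}

final class BridgeSketch {
    // Every series is created up front; publish() only looks up and increments.
    private final LongAdder[][] apiCalls = new LongAdder[S3Op.values().length][StatusClass.values().length];
    private final EnumMap<S3Op, LongAdder> retries = new EnumMap<>(S3Op.class);
    private final EnumMap<S3Op, LongAdder> throttles = new EnumMap<>(S3Op.class);
    private final EnumMap<S3Op, LongAdder> durationSumMs = new EnumMap<>(S3Op.class); // sum only; Histogram stand-in

    BridgeSketch() {
        for (S3Op op : S3Op.values()) {
            for (StatusClass sc : StatusClass.values()) {
                apiCalls[op.ordinal()][sc.ordinal()] = new LongAdder();
            }
            retries.put(op, new LongAdder());
            throttles.put(op, new LongAdder());
            durationSumMs.put(op, new LongAdder());
        }
    }

    // Called once per completed API call (MetricPublisher#publish in the real bridge).
    void publish(ApiCallRecord r) {
        S3Op op = S3Op.fromSdkName(r.operationName);
        StatusClass sc = StatusClass.classify(r.httpStatus);
        apiCalls[op.ordinal()][sc.ordinal()].increment();
        durationSumMs.get(op).add(r.durationMs);
        if (r.retryCount > 0) retries.get(op).add(r.retryCount);
        if (sc == StatusClass.THROTTLED) throttles.get(op).increment();
    }

    long apiCallCount(S3Op op, StatusClass sc) { return apiCalls[op.ordinal()][sc.ordinal()].sum(); }
    long retryCount(S3Op op) { return retries.get(op).sum(); }
    long throttleCount(S3Op op) { return throttles.get(op).sum(); }
}
```

One simplification is worth noting: the sketch classifies a single status code per call. As far as we understand the SDK's metric model, per-attempt values such as the HTTP status code are reported in child collections of the call's MetricCollection. A real bridge would therefore likely count throttled attempts there.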

Plugin-specific extensions (Tier 2)

Two things matter to operators that the SDK cannot see, because they live one layer above the SDK:

  1. NativeS3InputStream reopens. When the input stream encounters EOF or a transient IOException mid-read, it reopens the underlying GET with an updated range. This event is invisible to the SDK (the SDK sees the new GET as a fresh call). We add flink_fs_s3n_stream_reopen_count{reason} and flink_fs_s3n_stream_reopen_latency_ms.

  2. Recoverable writer / multipart lifecycle. Each call site of RecoverableWriter.open / persist / closeForCommit / commit / commitAfterRecovery / discard is a state transition: in-progress → pending → committed (or aborted). We track these as Counters. The most operationally important one is mpu_aborted_total: aborted MPUs accumulate parts in the bucket and incur cost until the bucket lifecycle policy reaps them.
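Both Tier 2 signals reduce to a handful of counters. In the sketch below, Tier2Metrics, ReopenReason and the on* hooks are hypothetical names. Which stream and RecoverableWriter call sites would invoke these hooks is an assumption. The open-ended Range header also assumes that the reopened GET reads to the end of the object.

```java
import java.util.EnumMap;
import java.util.concurrent.atomic.LongAdder;

// Closed set of reopen reasons (hypothetical values; the FLIP only requires {reason} to be bounded).
enum ReopenReason { EOF, IO_EXCEPTION }

final class Tier2Metrics {
    private final EnumMap<ReopenReason, LongAdder> reopens = new EnumMap<>(ReopenReason.class);
    private final LongAdder mpuInitiated = new LongAdder();
    private final LongAdder mpuCompleted = new LongAdder();
    private final LongAdder mpuAborted = new LongAdder();

    Tier2Metrics() {
        for (ReopenReason r : ReopenReason.values()) {
            reopens.put(r, new LongAdder());
        }
    }

    // Called by the input stream before it re-issues the GET. Returns the HTTP Range
    // header for the new request so reading resumes at `position` (open-ended range assumed).
    String onReopen(ReopenReason reason, long position) {
        reopens.get(reason).increment();
        return "bytes=" + position + "-";
    }

    long reopenCount(ReopenReason reason) {
        return reopens.get(reason).sum();
    }

    // Multipart-upload lifecycle hooks (call sites in the recoverable writer are assumed).
    void onMpuInitiated() { mpuInitiated.increment(); }
    void onMpuCompleted() { mpuCompleted.increment(); }
    void onMpuAborted() { mpuAborted.increment(); }

    // Backs the mpu_in_flight gauge. The three sums are read separately, so under
    // concurrent updates the value is approximate, which is acceptable for a gauge.
    long mpuInFlight() {
        return mpuInitiated.sum() - mpuCompleted.sum() - mpuAborted.sum();
    }
}
```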

The metric surface (v1)

| Name | Type | Labels | What it answers |
|---|---|---|---|
| flink_fs_s3n_api_call_count | Counter | {op, status_class} | Baseline traffic and success/failure split |
| flink_fs_s3n_api_call_duration_ms | Histogram | {op} | P50/P95/P99 latency per operation |
| flink_fs_s3n_retry_count | Counter | {op, reason} | Retry-storm attribution |
| flink_fs_s3n_throttle_count | Counter | {op} | "Are we throttled?", the most-asked question |
| flink_fs_s3n_max_retries_exceeded_total | Counter | {op} | Final failures after retry exhaustion |
| flink_fs_s3n_bytes_read | Counter | | Bandwidth |
| flink_fs_s3n_bytes_written | Counter | | Bandwidth |
| flink_fs_s3n_mpu_initiated_total | Counter | | MPU lifecycle |
| flink_fs_s3n_mpu_completed_total | Counter | | |
| flink_fs_s3n_mpu_aborted_total | Counter | | Cost-impacting leaks |
| flink_fs_s3n_mpu_in_flight | Gauge | | Capacity |
| flink_fs_s3n_mpu_complete_duration_ms | Histogram | | Slowest typical operation |
| flink_fs_s3n_stream_reopen_count | Counter | {reason} | Detect flaky reads |
| flink_fs_s3n_active_streams | Gauge | {kind=in/out} | Pool sizing |
| flink_fs_s3n_connection_acquire_duration_ms | Histogram | | Detect pool saturation |
| flink_fs_s3n_entropy_bucket_count | Counter | {prefix_hash} | Verify entropy works (opt-in) |

Cardinality discipline (a hard constraint)

This is where well-intended observability features turn into outages. The rules are simple:

  • Never label with a bucket name unless an operator opts in via s3.metrics.detailed.enabled.

  • Never label with key, prefix, or object name. Period.

  • Never label with full HTTP status; use a four-value classifier.

  • Always label with operation name (closed set, ≤ 15 distinct values for S3).

  • Retry reason is a closed enum: {throttled, timeout, conn_reset, 5xx, other}.

  • The opt-in prefix_hash label for entropy verification is a hash of the entropy value modulo a small constant (default 16). However many entropy values are generated, the cardinality stays bounded.
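Two of the rules above are small enough to show directly: the closed retry-reason enum and the bounded entropy hash. This is a sketch. Mapping SocketTimeoutException to timeout and SocketException to conn_reset, and treating 429 like 503, are assumptions and not part of the proposal.

```java
import java.net.SocketException;
import java.net.SocketTimeoutException;

// Closed retry-reason enum: {throttled, timeout, conn_reset, 5xx, other}.
enum RetryReason {
    THROTTLED("throttled"), TIMEOUT("timeout"), CONN_RESET("conn_reset"), SERVER_5XX("5xx"), OTHER("other");

    final String label; // value emitted as the `reason` label
    RetryReason(String label) { this.label = label; }

    // Assumed inputs: the status of the failed attempt (0 if no response arrived)
    // and the exception that triggered the retry, or null.
    static RetryReason classify(int httpStatus, Throwable cause) {
        if (httpStatus == 503 || httpStatus == 429) return THROTTLED;
        if (cause instanceof SocketTimeoutException) return TIMEOUT;
        if (cause instanceof SocketException) return CONN_RESET; // coarse: any socket-level failure
        if (httpStatus >= 500 && httpStatus < 600) return SERVER_5XX;
        return OTHER;
    }
}

final class EntropyBuckets {
    // Maps any entropy value onto one of `cardinality` label values (default 16).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int prefixHash(String entropyValue, int cardinality) {
        return Math.floorMod(entropyValue.hashCode(), cardinality);
    }
}
```

The algorithm behind String.hashCode is specified by the JDK, so a given entropy value maps to the same prefix_hash on every TaskManager.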

Configuration

We resist the temptation to add many flags. The configuration surface is:

s3.metrics.enabled: true                # default true
s3.metrics.detailed.enabled: false      # opt-in: bucket label, request-size hist
s3.metrics.histogram.window-size: 1024  # reservoir size; bounds memory
s3.metrics.entropy-bucket.cardinality: 16  # hash modulo for entropy verification

s3.metrics.enabled is true out of the box because the v1 surface is cheap. Detailed mode is opt-in because it widens cardinality.
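For illustration, the four options and their defaults can be modelled as follows. S3MetricsOptions and fromMap are hypothetical. A real implementation would presumably declare Flink ConfigOptions rather than parse a Map. Rejecting non-positive sizes is an added assumption.

```java
import java.util.Map;

// Plain-Java model of the s3.metrics.* options and their documented defaults.
final class S3MetricsOptions {
    final boolean enabled;
    final boolean detailed;
    final int histogramWindowSize;
    final int entropyBucketCardinality;

    private S3MetricsOptions(boolean enabled, boolean detailed, int window, int cardinality) {
        this.enabled = enabled;
        this.detailed = detailed;
        this.histogramWindowSize = window;
        this.entropyBucketCardinality = cardinality;
    }

    static S3MetricsOptions fromMap(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault("s3.metrics.enabled", "true"));
        boolean detailed = Boolean.parseBoolean(conf.getOrDefault("s3.metrics.detailed.enabled", "false"));
        int window = Integer.parseInt(conf.getOrDefault("s3.metrics.histogram.window-size", "1024"));
        int cardinality = Integer.parseInt(conf.getOrDefault("s3.metrics.entropy-bucket.cardinality", "16"));
        // Both numeric options bound memory or cardinality, so reject non-positive values early.
        if (window <= 0) {
            throw new IllegalArgumentException("s3.metrics.histogram.window-size must be > 0, got " + window);
        }
        if (cardinality <= 0) {
            throw new IllegalArgumentException("s3.metrics.entropy-bucket.cardinality must be > 0, got " + cardinality);
        }
        return new S3MetricsOptions(enabled, detailed, window, cardinality);
    }
}
```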

Failure scenarios

| # | Scenario | Effect | Mitigation |
|---|---|---|---|
| 1 | The metric backend is down. | MetricGroup calls become no-ops or fail. | Flink's reporter framework already isolates reporters; an exception in a reporter does not propagate to the metric source. As defence in depth, the bridge wraps its publish path in a try-catch. IO is never affected. |
| 2 | An AWS SDK upgrade renames metric keys. | Some metrics stop being populated. | The bridge reads CoreMetric.X constants by class reference, so a renamed constant is a compile-time break, not a silent regression. A CI integration test runs against the SDK version we ship; unexpected silence in that test fails the build. |
| 3 | Cardinality blowup despite our rules. | Memory and time-series count grow unbounded. | Closed-set operation enum; status code is classified, not raw; entropy hash modulo. A cardinality test in CI asserts an upper bound. |
| 4 | A high-QPS workload (10k+ ops/sec) overwhelms histograms. | Reservoir sampling skews; CPU goes up. | Reservoir size is fixed at 1024 by default; the update(long) call is O(1) with Flink's existing histogram implementations. Internal benchmark target: ≤ 1% CPU overhead at 10k ops/sec on a single TM. |
| 5 | The plugin classloader can't see MetricGroup. | NoClassDefFoundError at plugin init. | org.apache.flink.metrics is already on the parent-first allowlist. An ArchUnit rule guards against reaching beyond that. |
| 6 | Two filesystem instances are created in the same component. | Metric registration conflict on duplicate names. | The plugin scopes metrics under <component>.filesystem.<scheme>.<authority_or_default>. FileSystem.get(URI) already deduplicates per (scheme, authority), so this is naturally bounded. |
| 7 | An operator sets s3.metrics.detailed.enabled=true on a job that touches thousands of buckets. | Cardinality blowup (opt-in). | Documentation flags this loudly. Detailed mode is off by default. |
| 8 | The bridge holds references that prevent classloader unload. | Classloader leak on dynamic plugin reload (rare). | FileSystem.initialize clears the FS cache and unregisters metric groups before reloading. We add a corresponding close() on the bridge. |
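Mitigations 1 and 8 both come down to a guard around the publish path. The sketch below is hypothetical: GuardedPublisher and its translate callback are not names from the proposal. It swallows and counts runtime exceptions from the translation step, and it stops touching metrics once close() has been called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

final class GuardedPublisher<T> implements AutoCloseable {
    private final Consumer<T> translate; // hypothetical: record -> Flink metric updates
    private final AtomicBoolean closed = new AtomicBoolean(false);
    final LongAdder publishFailures = new LongAdder();
    final LongAdder dropped = new LongAdder();

    GuardedPublisher(Consumer<T> translate) {
        this.translate = translate;
    }

    void publish(T record) {
        if (closed.get()) {
            dropped.increment(); // after close(), never touch metric objects again
            return;
        }
        try {
            translate.accept(record);
        } catch (RuntimeException e) {
            publishFailures.increment(); // swallowed: metrics must never affect IO
        }
    }

    @Override
    public void close() {
        closed.set(true); // a real bridge would also release its metric-group references here
    }
}
```

This sketch deliberately catches RuntimeException rather than Throwable, so errors such as OutOfMemoryError still propagate.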

Performance considerations

SDK publishing path. AWS SDK v2 calls MetricPublisher#publish asynchronously, after the response has completed, on the SDK's internal completion executor. The bridge does roughly:

- 1 enum lookup to map operation name → Counter
- 1 enum lookup to classify status code
- 1 to 3 counter increments (api_call_count, retry_count if any, throttle_count if any)
- 1 histogram update (api_call_duration_ms)

In a microbenchmark on a typical x86 host, this is on the order of a few hundred nanoseconds per API call. At 10,000 API calls per second on a single TaskManager, that is well under 1 % of one CPU. AWS SDK's own publishing overhead dominates ours.
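The ≤ 1% target can be turned into a per-call time budget. Assuming 300 ns per call, an illustrative value within "a few hundred nanoseconds" and not a measurement, the headroom is:

```latex
\frac{1\%\ \text{of one core}}{10^{4}\ \text{calls/s}}
  = \frac{10^{-2}\ \text{s/s}}{10^{4}\ \text{calls/s}}
  = 10^{-6}\ \text{s} = 1\ \mu\text{s per call}
\qquad
300\ \text{ns/call} \times 10^{4}\ \text{calls/s} = 3 \times 10^{-3}\ \text{s/s} = 0.3\%\ \text{of one core}
```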

Stream-level instrumentation. The reopen counter on NativeS3InputStream and the lifecycle counters on the recoverable writer are single counter increments at events that occur on the order of seconds, not milliseconds. The cost is negligible.

Memory. Sixteen metrics, ~50 time series, histogram reservoirs of 1024 longs each → a few hundred kilobytes per TaskManager. This is a rounding error against any real Flink JVM.
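Here is a rough check of the memory estimate. It assumes each histogram window stores 1,024 eight-byte samples. It also assumes about 18 histogram instances: one api_call_duration_ms per operation for ~15 operations, plus mpu_complete_duration_ms, connection_acquire_duration_ms and stream_reopen_latency_ms.

```latex
1024 \times 8\ \text{B} = 8192\ \text{B} = 8\ \text{KiB per histogram}
\qquad
18 \times 8\ \text{KiB} = 144\ \text{KiB}
```

Counters, gauges and object overhead come on top of this, which keeps the total in the same order of magnitude as the estimate above.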

What we explicitly avoided. We did not put metrics on every read() or write() byte; we did not allocate a wrapper object per IO; we did not introduce a new background thread.

Gains and losses

Gains.

  • Operators can answer the working set of S3 questions in real time.

  • Same observability stack as the rest of Flink — one Grafana, one alerting pipeline.

  • Lays the foundation for the rest of the filesystem plugin family (Hadoop S3, Azure, GCS, OSS) to adopt the same plumbing.

  • Closes a recurring gap that today is filled by guesswork, CloudTrail, and tcpdump.

Losses/costs.

  • One additive public-API change to FileSystemFactory to maintain.

  • A small amount of code coupling to AWS SDK v2 metric key names. Mitigated by an internal abstraction.

  • Marginally larger plugin JAR (~tens of kilobytes).

Net. The gains are operational. The losses are surface-level engineering costs. The gains far exceed the losses.

Compatibility, deprecation, migration

Source/binary compatibility. Additive default method on FileSystemFactory. Existing plugins continue to compile and run.

Behavioural compatibility. No metric is emitted by any existing plugin until that plugin opts in. An operator who upgrades Flink with no other change sees the new flink-s3-fs-native metrics appear automatically if they are using that plugin. No other plugin's behaviour changes.

Deprecation. None. The single-argument configure(Configuration) remains.

Migration plan. There is nothing to migrate from. Operators upgrading get the metrics automatically. Operators on flink-s3-fs-hadoop or -presto see no change until those plugins adopt the same hook (out of scope here).

Test plan

  1. Unit tests. The bridge's translation logic from MetricCollection to Flink counters is pure; we test it with synthetic MetricCollection instances covering each operation type, status class, and retry path.
  2. Cardinality bounds. A focused unit test asserts that exercising the bridge with 10,000 distinct keys, buckets, and statuses produces no more than the documented number of time series.
  3. Integration test against MinIO. A flink-fs-tests-style integration test runs a small streaming job with checkpointing onto MinIO, then asserts that the expected metrics are present with non-zero values. This also catches AWS SDK upgrade regressions because the SDK's metric keys are validated end-to-end.
  4. Failure injection. A second integration test runs against MinIO with a fault-injection proxy that returns 503 SlowDown on a configurable fraction of requests; the test asserts that throttle_count and retry_count{reason=throttled} increment, and that the IO succeeds.
  5. Performance test. A benchmark in flink-benchmarks measures the per-API-call overhead of the bridge. Acceptance bar: ≤ 1 % CPU overhead at 10k ops/sec.
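The cardinality-bounds test (item 2) can be sketched without the SDK. SeriesLabels is a hypothetical label builder that follows the rules in the cardinality section: an illustrative subset of operations, four status classes, a bucket label only in detailed mode, and never the key. distinctSeries drives it with 10,000 distinct keys, buckets, statuses and operation names, then counts the resulting series.

```java
import java.util.HashSet;
import java.util.Set;

final class SeriesLabels {
    private static final Set<String> KNOWN_OPS =
            Set.of("PutObject", "GetObject", "UploadPart", "CompleteMultipartUpload"); // illustrative subset

    // The object key is accepted but intentionally never used as a label.
    static String apiCallSeries(String op, String bucket, String key, int status, boolean detailed) {
        String opLabel = KNOWN_OPS.contains(op) ? op : "Other";
        String statusClass = (status == 503 || status == 429) ? "throttled"
                : (status >= 200 && status < 300) ? "2xx"
                : (status >= 400 && status < 500) ? "4xx"
                : "5xx";
        String labels = "op=" + opLabel + ",status_class=" + statusClass;
        if (detailed) {
            labels += ",bucket=" + bucket; // opt-in only
        }
        return "flink_fs_s3n_api_call_count{" + labels + "}";
    }

    // Upper bound asserted in default mode: (known ops + "Other") x 4 status classes.
    static int maxSeries() {
        return (KNOWN_OPS.size() + 1) * 4;
    }

    // Drives n synthetic calls with distinct keys, buckets, statuses and op names.
    static int distinctSeries(int n, boolean detailed) {
        Set<String> series = new HashSet<>();
        for (int i = 0; i < n; i++) {
            String op = (i % 3 == 0) ? "UploadPart" : "UnknownOp" + i;
            int status = 100 + (i % 500); // 100..599
            series.add(apiCallSeries(op, "bucket-" + i, "key-" + i, status, detailed));
        }
        return series.size();
    }
}
```

In default mode the count stays at or below maxSeries() however many inputs are used. In detailed mode it grows with the number of buckets, which is why that mode is opt-in.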


Open questions for community discussion

  1. Should s3.metrics.enabled default to true or false?

    The v1 surface is cheap. Defaulting to true gives every user the value automatically. Defaulting to false is more conservative, but it trades operational benefit for a config knob nobody will know to flip. Author's preference: default to true.

  2. Per-checkpoint correlation.

    Some operators want to know which PUT belonged to which checkpoint. This requires propagating a ThreadLocal<CheckpointId> through the snapshot path and into the SDK's request execution chain, plus a corresponding label on the metrics. It is feasible but doubles the design surface. Author's preference: defer to a successor FLIP.

APPENDIX

Appendix A: Worked example

Suppose a job that has been running fine starts seeing checkpoints stretch from 30 seconds to 12 minutes overnight. With this FLIP shipped, the operator's investigation looks like:

  1. Open the Flink dashboard. The flink_fs_s3n_throttle_count rate is up 50× compared to last week.

  2. api_call_duration_ms{op="UploadPart"} P99 has gone from 200 ms to 9 s.

  3. retry_count{op="UploadPart", reason="throttled"} is climbing in step.

In ten seconds, the operator knows the cause is S3 throttling on multipart uploads. They check whether entropy is configured: the entropy_bucket_count distribution should be flat across hash values, and it is not; one bucket dominates. They tune the entropy length, redeploy, and the regression resolves.

This sequence, today, would take hours and require a CloudTrail dump.
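The "flat across hash values" check in the worked example can be made concrete. EntropySkew is a hypothetical helper that compares the busiest prefix_hash bucket with an even share. A ratio near 1.0 means the load is spread, and a value near the bucket count means one bucket dominates. Any alert threshold on top of it is the operator's choice.

```java
final class EntropySkew {
    // bucketCounts[i] = entropy_bucket_count for prefix_hash = i over some window.
    // Returns max / mean: 1.0 for a perfectly flat distribution,
    // bucketCounts.length when a single bucket takes all the traffic.
    static double maxToMeanRatio(long[] bucketCounts) {
        long total = 0;
        long max = 0;
        for (long c : bucketCounts) {
            total += c;
            max = Math.max(max, c);
        }
        if (total == 0) {
            return 1.0; // no traffic: treat as flat
        }
        double mean = (double) total / bucketCounts.length;
        return max / mean;
    }
}
```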

Appendix B: Worked example for cost

The same job, two weeks later, generates a cost alert. The bucket has accumulated 2 TB of "incomplete multipart upload" parts that the lifecycle policy hasn't reaped yet. With this FLIP:

  • mpu_initiated_total - mpu_completed_total - mpu_aborted_total, taken per minute, is the rate of "leaked" MPUs.

  • A simple Prometheus alert triggers when that rate is positive for more than five minutes.

The leak is caught the moment it happens, not when AWS bills.
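The alert rule can be stated precisely over per-minute samples of the three counters. MpuLeakAlert is a hypothetical helper. In practice, the same logic would be expressed in the alerting system's own query language.

```java
final class MpuLeakAlert {
    // Each array holds per-minute samples of a cumulative counter.
    // leaked(t) = delta(initiated) - delta(completed) - delta(aborted) over minute t.
    // Fires once leaked(t) has been positive for more than `minutes` consecutive minutes.
    static boolean shouldFire(long[] initiated, long[] completed, long[] aborted, int minutes) {
        int positiveRun = 0;
        for (int t = 1; t < initiated.length; t++) {
            long leaked = (initiated[t] - initiated[t - 1])
                    - (completed[t] - completed[t - 1])
                    - (aborted[t] - aborted[t - 1]);
            positiveRun = leaked > 0 ? positiveRun + 1 : 0;
            if (positiveRun > minutes) {
                return true;
            }
        }
        return false;
    }
}
```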
