Status
| Discussion Thread | https://lists.apache.org/thread/vsnhhoyxw8x2r8bwvdsyq4l1vxhry154 |
|---|---|
| Vote Thread | |
| JIRA | FLINK-39546 |
| Target Release | FLINK - 2.4 |
| Current State | |
Motivation
"It's 2 AM. The on-call engineer wakes up to a pager. A streaming job that has been quietly running for six months suddenly stretches its checkpoint duration from 30 seconds to 12 minutes. Backpressure climbs. Watermarks lag. The dashboards show "checkpoint duration up", "checkpoint size unchanged", and nothing else. Was it the network? RocksDB? S3? They reach for
tcpdump. They reach for AWS CloudTrail. They reach, eventually, for guesswork."
This FLIP is about that moment.
Apache Flink's filesystem layer is one of the most heavily used components in production. Every checkpoint, every savepoint, every file sink, every state download passes through it. The most widely used filesystem is S3. And yet, when something goes wrong inside that layer, Flink tells the operator almost nothing. The metric surface today is dominated by aggregate file IO counters and after-the-fact checkpoint durations. There is no observability for:
whether S3 is throttling our job (HTTP 503 SlowDown).
whether our retries are masking real problems.
which S3 operation is slow (PUT? CompleteMultipartUpload? GET range read?).
whether our entropy prefix is actually spreading the load.
whether we are leaking incomplete multipart uploads (a real, recurring cost issue).
whether NativeS3InputStream is silently reopening because of intermittent EOFs.
which P99 latency moved when our checkpoints regressed.
Every one of these has been the root cause of a real production incident. Today, finding any of them requires correlating Flink logs with CloudTrail, capturing TCP packets, or, most often, guessing. Operators of production Flink+S3 systems consistently report this as one of their top operational pain points. It is the gap between "Flink works" and "Flink is operable."
Goal
Make S3 traffic from flink-s3-fs-native directly observable through Flink's existing metric system, with no additional dashboards or alerting pipelines.
Surface a small, high-signal set of metrics that answers the operational questions above.
Do this in a way that other filesystem plugins can adopt with the same plumbing.
Avoid two recurring failure modes in observability work: cardinality explosion and metric overhead that erodes the very performance we are trying to measure.
Non-Goal
Replacing AWS CloudWatch or OpenTelemetry-based pipelines for users who already operate them.
Rewriting the AWS SDK's instrumentation. We will reuse its built-in MetricPublisher mechanism.
Per-checkpoint-correlated metrics ("which PUT belonged to checkpoint 42?"). Possible, but deferred to a follow-up FLIP; see the Open Questions section.
Instrumenting flink-s3-fs-presto or flink-s3-fs-hadoop. Both are out of scope, but the API change is designed to allow them to adopt it cleanly.
What's Required for Debugging (the working set of questions)
The metrics shipped by this FLIP must answer the following, in production, in real time:
| Operational question | Metric this FLIP introduces |
|---|---|
| Are we being throttled by S3 right now? | |
| Which operation is slow? | |
| Is our retry rate masking errors? | |
| Are we leaking incomplete MPUs? | |
| Is | |
| Is our connection pool saturated? | |
| What's our actual S3 throughput? | |
| Is the entropy prefix spreading load? | |
We deliberately do not attempt to answer everything possible. The chosen set is the smallest one that turns "the dashboard is not useful" into "the dashboard is enough."
Approaches considered
There is more than one defensible place to put this work. The decision matters because the code's home dictates its lifecycle, distribution, and the user experience for an operator who picks up the plugin tomorrow morning.
Approach A: Outside Flink, via AWS SDK v2 → CloudWatch (or OpenTelemetry)
AWS SDK v2 already ships a MetricPublisher SPI that emits structured metrics for every API call: operation name, duration, retries, HTTP status, and throughput. There are off-the-shelf publishers for CloudWatch and for OpenTelemetry exporters. A user can wire S3AsyncClient.builder().overrideConfiguration(o -> o.addMetricPublisher(...)) and start collecting today, no Flink change needed.
Pros | Cons |
|---|---|
Zero new code in Flink. | Operators run two parallel observability systems (Flink Prometheus + AWS CloudWatch). |
Captures retries the application can't see. | No correlation with Flink concepts (jobID, checkpointID, subtask). |
Vendor-neutral via OTel. | Plugin classloader does not control which |
Maintained by AWS / OTel. | Adds operational complexity for users who don't already run a separate AWS pipeline. |
This is a fine option for sophisticated AWS-native shops. It is a poor option for the average Flink user who has a Flink reporter wired into Prometheus and wants the data to land there, alongside watermarks and backpressure.
Approach B: A generic stream wrapper inside flink-core
Wrap every FSDataInputStream and FSDataOutputStream returned from FileSystem in an instrumented decorator that times reads and writes and reports through a standard MetricGroup.
Pros | Cons |
|---|---|
Covers every filesystem plugin uniformly. | Captures only the stream surface — bytes, latency. Loses every S3-specific signal. |
No public API change. | Adds overhead to the per-record path for all file systems, even local. |
Operators get something for HDFS, Azure, GCS, etc. for free. | Cannot see retries, throttling, multipart lifecycle — exactly the high-value bits we set out to expose. |
This approach answers a different question than the one operators are asking. It is the right answer to "how much do I read and write through Flink's FS layer?" It is not the right answer to "Why is my S3 traffic slow?"
Approach C: Per-plugin, hand-rolled metrics inside flink-s3-fs-native
Instrument the plugin manually: count every s3Client.putObject(...) call, time it, classify exceptions, expose counters and histograms.
Pros | Cons |
|---|---|
Full control over metric semantics. | Reinvents what AWS SDK already publishes natively. |
Plugin-local; no public-API change. | Misses retries done inside the SDK that never surface to our code. |
Easy first version. | Code drift over time as SDK adds new operations. |
AWS SDK v2 makes most of it obsolete: the SDK already emits the right things, in the right shape, at the right granularity. We should consume that signal, not duplicate it.
[RECOMMENDED] Approach D: Bridge AWS SDK v2 MetricPublisher into Flink MetricGroup, with plugin-specific extensions on top
A two-layer design:
Tier 1: Bridge. Implement an AwsSdkMetricBridge (a MetricPublisher) inside flink-s3-fs-native that consumes the SDK's MetricCollection and emits Flink Counter / Histogram / Gauge against a plugin-scoped MetricGroup.
Tier 2: Flink-specific events. Instrument the things AWS SDK cannot see, because they are above the SDK layer: NativeS3InputStream reopens, the recoverable writer's multipart lifecycle, and (optionally) the entropy-prefix bucket distribution.
Pros | Cons |
|---|---|
Reuses the SDK's existing instrumentation; no duplicated logic. | Requires a small additive change to |
Metrics land in Flink's existing stack — same Grafana, same alerts. | Couples to AWS SDK v2 metric keys (mitigated by an internal abstraction). |
Plugin classloader keeps Flink's metric interfaces parent-loaded; isolation is preserved. | Slightly more design surface than (C). |
This is the approach this FLIP proposes.
Recommended design walkthrough
We will introduce three things:
A small, additive change to the public FileSystemFactory interface so plugins can receive a MetricGroup at init time.
An AwsSdkMetricBridge inside flink-s3-fs-native that registers as the SDK's MetricPublisher and translates SDK metric records into Flink metrics.
A focused, cardinality-disciplined metric taxonomy.
Public interface change
Today, FileSystemFactory#configure(Configuration) is the only entry point a plugin has to learn about its environment. We add a default method alongside it:
public interface FileSystemFactory extends Plugin {

    String getScheme();

    FileSystem create(URI fsUri) throws IOException;

    void configure(Configuration config);

    /**
     * Configures the factory with both Flink configuration and a metric group scoped to
     * this filesystem plugin. The default implementation delegates to {@link #configure(Configuration)}
     * for backward compatibility; plugins that emit metrics override this.
     *
     * @param config the Flink configuration
     * @param pluginMetrics a MetricGroup scoped to this plugin (per cluster component);
     *     never {@code null}, though it may be a {@link UnregisteredMetricsGroup}
     *     in environments where metrics are disabled.
     */
    default void configure(Configuration config, MetricGroup pluginMetrics) {
        configure(config);
    }
}
The default implementation preserves existing plugin behaviour byte-for-byte. Plugins that want metrics override the new method. There is no hard break.
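The backward-compatibility argument can be checked with a reduced model. The sketch below uses empty stand-in interfaces named Configuration, MetricGroup and FactoryLike; they are hypothetical placeholders, not Flink's real types, and exist only to show that a plugin implementing just the one-argument method keeps working when the runtime calls the two-argument form.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultMethodCompatSketch {

    // Stand-ins for Flink's Configuration and MetricGroup (hypothetical, empty on purpose).
    interface Configuration {}
    interface MetricGroup {}

    // Same shape as the proposed change, reduced to the two configure methods.
    interface FactoryLike {
        void configure(Configuration config);

        default void configure(Configuration config, MetricGroup pluginMetrics) {
            configure(config); // the metric group is ignored unless a plugin overrides this
        }
    }

    // A plugin written before the change: implements only the one-argument method.
    static class LegacyFactory implements FactoryLike {
        final List<String> calls = new ArrayList<>();

        @Override
        public void configure(Configuration config) {
            calls.add("configure(config)");
        }
    }

    // A plugin that wants metrics: overrides the new overload and keeps the group.
    static class MetricsAwareFactory implements FactoryLike {
        final List<String> calls = new ArrayList<>();
        MetricGroup metrics;

        @Override
        public void configure(Configuration config) {
            calls.add("configure(config)");
        }

        @Override
        public void configure(Configuration config, MetricGroup pluginMetrics) {
            this.metrics = pluginMetrics;
            calls.add("configure(config, metrics)");
            configure(config);
        }
    }

    public static void main(String[] args) {
        Configuration config = new Configuration() {};
        MetricGroup group = new MetricGroup() {};

        LegacyFactory legacy = new LegacyFactory();
        legacy.configure(config, group); // the caller always uses the two-argument form
        System.out.println(legacy.calls);

        MetricsAwareFactory aware = new MetricsAwareFactory();
        aware.configure(config, group);
        System.out.println(aware.calls + ", metrics set: " + (aware.metrics == group));
    }
}
```

The caller never needs to know which kind of plugin it is talking to: it always passes the metric group, and the default method decides whether anything is done with it.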
Where the MetricGroup comes from
FileSystem.initialize(Configuration, PluginManager) is called once per JVM, by TaskExecutor (TaskManagers) and by ClusterEntrypoint / JobManagerRunner (JobManager). Both of these already own a process-level MetricGroup (taskmanager and jobmanager respectively). We extend FileSystem.initialize with an optional MetricGroup parameter:
public static void initialize(
        Configuration config,
        @Nullable PluginManager pluginManager,
        @Nullable MetricGroup processMetricGroup) throws IOException;
The existing two-argument overload remains and supplies a no-op UnregisteredMetricsGroup for environments that don't pass one (mini cluster tests, embedded scenarios). Each FileSystemFactory then receives a child group, e.g.:
taskmanager.filesystem.s3
jobmanager.filesystem.s3
The plugin builds further sub-groups beneath that for operation-level breakdown.
The bridge
Inside flink-s3-fs-native, we register an AwsSdkMetricBridge as a MetricPublisher on every S3AsyncClient the factory creates:
final S3AsyncClient client = S3AsyncClient.builder()
        .overrideConfiguration(o -> o.addMetricPublisher(metricBridge))
        ...
        .build();
The bridge consumes MetricCollection records emitted by the SDK after every API call. It pulls a small, fixed set of fields:
SDK metric | Flink metric | Notes |
|---|---|---|
| label | bounded set (~15 values) |
|
| |
| partitions | |
| classified into | classify, do not expose raw code |
|
| reason from a closed enum |
| feeds | |
| filtering only | not exposed as label |
The bridge holds bounded Counter and Histogram instances (one per operation, prepared up front), so the hot path is a bounded Map lookup followed by a counter increment. There is no allocation per metric record.
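A minimal sketch of that hot path, under stated assumptions: the Op enum and its five members are hypothetical stand-ins for the closed operation set, and java.util.concurrent.atomic.LongAdder stands in for a Flink Counter so the example runs on the JDK alone. It shows counters being created once up front and unknown operation names folding into a single OTHER series instead of creating new ones.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class PreallocatedCounterSketch {

    // Hypothetical closed set; the real bridge would enumerate the S3 operations it tracks.
    enum Op { GET_OBJECT, PUT_OBJECT, UPLOAD_PART, COMPLETE_MULTIPART_UPLOAD, OTHER }

    // One counter per operation, created in the constructor and never afterwards.
    private final Map<Op, LongAdder> callCount = new EnumMap<>(Op.class);

    PreallocatedCounterSketch() {
        for (Op op : Op.values()) {
            callCount.put(op, new LongAdder());
        }
    }

    // Maps an operation name to the closed set; anything unknown becomes OTHER.
    static Op parse(String operationName) {
        switch (operationName) {
            case "GetObject": return Op.GET_OBJECT;
            case "PutObject": return Op.PUT_OBJECT;
            case "UploadPart": return Op.UPLOAD_PART;
            case "CompleteMultipartUpload": return Op.COMPLETE_MULTIPART_UPLOAD;
            default: return Op.OTHER;
        }
    }

    // Hot path: one lookup in a fixed-size map, one increment, no new objects.
    void record(String operationName) {
        callCount.get(parse(operationName)).increment();
    }

    long count(Op op) {
        return callCount.get(op).sum();
    }

    // The number of series is fixed by the enum, whatever names arrive at runtime.
    int seriesCount() {
        return callCount.size();
    }

    public static void main(String[] args) {
        PreallocatedCounterSketch bridge = new PreallocatedCounterSketch();
        bridge.record("UploadPart");
        bridge.record("UploadPart");
        bridge.record("SomeFutureOperation");
        System.out.println(bridge.count(Op.UPLOAD_PART) + " " + bridge.count(Op.OTHER)
                + " " + bridge.seriesCount());
    }
}
```

Because the map is fully populated in the constructor and only read afterwards, concurrent callers contend only on the individual adders, not on the map.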
Plugin-specific extensions (Tier 2)
Two things matter to operators that the SDK cannot see, because they live one layer above the SDK:
NativeS3InputStream reopens. When the input stream encounters EOF or a transient IOException mid-read, it reopens the underlying GET with an updated range. This event is invisible to the SDK (the SDK sees the new GET as a fresh call). We add flink_fs_s3n_stream_reopen_count{reason} and flink_fs_s3n_stream_reopen_latency_ms.
Recoverable writer / multipart lifecycle. Each call site of RecoverableWriter.open / persist / closeForCommit / commit / commitAfterRecovery / discard is a state transition: in-progress → pending → committed (or aborted). We track these as Counters. The most operationally important one is mpu_aborted_total: aborted MPUs accumulate parts in the bucket and incur cost until the bucket lifecycle policy reaps them.
The metric surface (v1)
| Name | Type | Labels | What it answers |
|---|---|---|---|
| | Counter | | Baseline traffic + success/failure split |
| | Histogram | | P50/P95/P99 latency, per-operation |
| | Counter | | Retry storm attribution |
| | Counter | | "Are we throttled?" (the most-asked question) |
| | Counter | | Final failures after retry exhaustion |
| | Counter | none | Bandwidth |
| | Counter | none | Bandwidth |
| | Counter | none | MPU lifecycle |
| | Counter | none | |
| | Counter | none | Cost-impacting leaks |
| | Gauge | none | Capacity |
| | Histogram | none | Slowest typical operation |
| | Counter | | Detect flaky reads |
| | Gauge | | Pool sizing |
| | Histogram | none | Detect pool saturation |
| | Counter | | Verify entropy works (opt-in) |
Cardinality discipline (a hard constraint)
This is where well-intended observability features turn into outages. The rules are simple:
Never label with a bucket name unless an operator opts in via s3.metrics.detailed.enabled.
Never label with key, prefix, or object name. Period.
Never label with full HTTP status; use a four-value classifier.
Always label with operation name (closed set, ≤ 15 distinct values for S3).
Retry reason is a closed enum: {throttled, timeout, conn_reset, 5xx, other}.
The opt-in prefix_hash for entropy verification uses a fixed-size hash modulo a small constant (default 16) so that no matter how many entropy values are generated, the cardinality is bounded.
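Two of these rules are easy to illustrate: classifying the HTTP status instead of exposing it, and bounding the entropy label with a hash modulo. The sketch below is illustrative only. The four StatusClass values are an assumption (the text says "four-value" without listing them), treating every 503 as throttling is a simplification, and String.hashCode is just one possible fixed-size hash.

```java
import java.util.HashSet;
import java.util.Set;

public class CardinalityGuardsSketch {

    // Assumed four-value classifier; the member names are hypothetical.
    enum StatusClass { SUCCESS, THROTTLED, CLIENT_ERROR, SERVER_ERROR }

    // Collapses any raw HTTP status into one of four label values.
    static StatusClass classify(int httpStatus) {
        if (httpStatus == 503) {
            return StatusClass.THROTTLED; // simplification: all 503s count as SlowDown
        }
        if (httpStatus >= 500) {
            return StatusClass.SERVER_ERROR;
        }
        if (httpStatus >= 400) {
            return StatusClass.CLIENT_ERROR;
        }
        return StatusClass.SUCCESS; // everything below 400 counts as success here
    }

    // Maps an arbitrary entropy prefix to a bucket in [0, cardinality).
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int prefixBucket(String entropyPrefix, int cardinality) {
        return Math.floorMod(entropyPrefix.hashCode(), cardinality);
    }

    public static void main(String[] args) {
        Set<Integer> buckets = new HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            buckets.add(prefixBucket("prefix-" + i, 16));
        }
        // However many distinct prefixes go in, at most 16 label values come out.
        System.out.println("distinct buckets: " + buckets.size());
        System.out.println(classify(503) + " " + classify(404) + " " + classify(200));
    }
}
```

The same pattern applies to any label whose raw value space is unbounded: map it into a closed set before it reaches the metric system.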
Configuration
We resist the temptation to add many flags. The configuration surface is:
s3.metrics.enabled: true # default true
s3.metrics.detailed.enabled: false # opt-in: bucket label, request-size hist
s3.metrics.histogram.window-size: 1024 # reservoir size; bounds memory
s3.metrics.entropy-bucket.cardinality: 16 # hash modulo for entropy verification
s3.metrics.enabled: true ships out of the box because the v1 surface is cheap. Detailed mode is opt-in because it widens cardinality.
Failure scenarios
| No. | Scenario | Effect | Mitigation |
|---|---|---|---|
| 1 | The metric backend is down. | | Flink's reporter framework already isolates reporters; an exception in a reporter does not propagate to the metric source. The bridge wraps its publish path in a try-catch as a defence in depth. IO is never affected. |
| 2 | The AWS SDK upgrades and renames metric keys. | Some metrics stop being populated. | The bridge reads |
| 3 | Cardinality blowup despite our rules. | Memory and time-series count grow unbounded. | Closed-set operation enum; status code is classified, not raw; entropy hash modulo. Cardinality test in CI asserts an upper bound. |
| 4 | High-QPS workload (10k+ ops/sec) overwhelms histograms. | Reservoir sampling skews; CPU goes up. | Reservoir size is fixed at 1024 by default; the |
| 5 | Plugin classloader can't see | | |
| 6 | Two filesystem instances created in the same component. | Metric registration conflict on duplicate names. | Plugin scopes metrics under |
| 7 | Operator sets | Cardinality blowup (opt-in). | Documentation flags this loudly. Detailed mode is off by default. |
| 8 | The bridge holds references that prevent classloader unload. | On dynamic plugin reload (rare), classloader leak. | |
Performance considerations
SDK publishing path. AWS SDK v2 calls MetricPublisher#publish asynchronously, after the response has completed, on the SDK's internal completion executor. The bridge does roughly:
- 1 enum lookup to map operation name → Counter
- 1 enum lookup to classify status code
- 1 to 3 counter increments (api_call_count, retry_count if any, throttle_count if any)
- 1 histogram update (api_call_duration_ms)
In a microbenchmark on a typical x86 host, this is on the order of a few hundred nanoseconds per API call. At 10,000 API calls per second on a single TaskManager, that is well under 1 % of one CPU. AWS SDK's own publishing overhead dominates ours.
Stream-level instrumentation. The reopen counter on NativeS3InputStream and the lifecycle counters on the recoverable writer are single counter increments at events that occur on the order of seconds, not milliseconds. The cost is too small to measure.
Memory. Sixteen metrics, ~50 time series, histogram reservoirs of 1024 longs each → a few hundred kilobytes per TaskManager. This is a rounding error against any real Flink JVM.
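The CPU and memory figures above can be sanity-checked with back-of-envelope arithmetic. In the sketch below, 300 ns per call is an assumed value for "a few hundred nanoseconds" and 20 histogram series is an assumed count; neither number comes from a measurement. Only the raw long arrays are counted, so real memory use would be somewhat higher.

```java
import java.util.Locale;

public class BridgeOverheadEstimate {

    // Fraction of one CPU core spent in the bridge: time per call times calls per second.
    static double cpuFraction(double nanosPerCall, double callsPerSecond) {
        return nanosPerCall * callsPerSecond / 1_000_000_000.0;
    }

    // Bytes held by histogram reservoirs, counting only the long values themselves.
    static long reservoirBytes(int windowSize, int histogramSeries) {
        return (long) windowSize * Long.BYTES * histogramSeries;
    }

    public static void main(String[] args) {
        // Assumption: 300 ns per API call at 10,000 calls per second.
        double cpu = cpuFraction(300, 10_000);
        System.out.printf(Locale.ROOT, "CPU: %.2f%% of one core%n", cpu * 100);

        // Assumption: 20 histogram series, each with the default window of 1024 longs.
        long bytes = reservoirBytes(1024, 20);
        System.out.printf(Locale.ROOT, "Reservoirs: %d KiB%n", bytes / 1024);
    }
}
```

Under these assumptions the bridge costs 0.3 % of one core and the reservoirs hold 160 KiB, which is consistent with the "well under 1 %" and "a few hundred kilobytes" estimates.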
What we explicitly avoided. We did not put metrics on every read() or write() byte; we did not allocate a wrapper object per IO; we did not introduce a new background thread.
Gains and losses
Gains.
Operators can answer the working set of S3 questions in real time.
Same observability stack as the rest of Flink — one Grafana, one alerting pipeline.
Lays the foundation for the rest of the filesystem plugin family (Hadoop S3, Azure, GCS, OSS) to adopt the same plumbing.
Closes a recurring gap that today is filled by guesswork, CloudTrail, and tcpdump.
Losses/costs.
One additive public-API change to FileSystemFactory to maintain.
A small amount of code coupling to AWS SDK v2 metric key names. Mitigated by an internal abstraction.
Marginally larger plugin JAR (~tens of kilobytes).
Net. The gains are operational. The losses are surface-level engineering costs. The gains far exceed the losses.
Compatibility, deprecation, migration
Source/binary compatibility. Additive default method on FileSystemFactory. Existing plugins continue to compile and run.
Behavioural compatibility. No metric is emitted by any existing plugin until that plugin opts in. An operator who upgrades Flink with no other change sees the new flink-s3-fs-native metrics appear automatically if they are using that plugin. No other plugin's behaviour changes.
Deprecation. None. The single-argument configure(Configuration) remains.
Migration plan. There is nothing to migrate from. Operators upgrading get the metrics automatically. Operators on flink-s3-fs-hadoop or -presto see no change until those plugins adopt the same hook (out of scope here).
Test plan
- Unit tests. The bridge's translation logic from MetricCollection to Flink counters is pure; we test it with synthetic MetricCollection instances covering each operation type, status class, and retry path.
- Cardinality bounds. A focused unit test asserts that exercising the bridge with 10,000 distinct keys, buckets, and statuses produces no more than the documented number of time series.
- Integration test against MinIO. A flink-fs-tests-style integration test runs a small streaming job with checkpointing onto MinIO, then asserts that the expected metrics are present with non-zero values. This also catches AWS SDK upgrade regressions because the SDK's metric keys are validated end-to-end.
- Failure injection. A second integration test runs against MinIO with a fault-injection proxy that returns 503 SlowDown on a configurable fraction of requests; the test asserts that throttle_count and retry_count{reason=throttled} increment, and that the IO succeeds.
- Performance test. A benchmark in flink-benchmarks measures the per-API-call overhead of the bridge. Acceptance bar: ≤ 1 % CPU overhead at 10k ops/sec.
Open questions for community discussion
Should s3.metrics.enabled default to true or false?
The v1 surface is cheap. Defaulting to true gives every user the value automatically. Defaulting to false is more conservative but trades operational benefit for a config knob nobody will know to flip. My preference is to default to true.
Per-checkpoint correlation.
Some operators want to know which PUT belonged to which checkpoint. This requires propagating a ThreadLocal<CheckpointId> through the snapshot path and into the SDK's request execution chain, plus a corresponding label on the metrics. It is feasible but doubles the design surface. Author's preference: defer to a successor FLIP.
APPENDIX
Appendix A: Worked example
Suppose a job that has been running fine starts seeing checkpoints stretch from 30 seconds to 12 minutes overnight. With this FLIP shipped, the operator's investigation looks like:
Open the Flink dashboard.
flink_fs_s3_native_throttle_count rate is up 50× compared to last week.
api_call_duration_ms{op="UploadPart"} P99 has gone from 200 ms to 9 s.
retry_count{op="UploadPart", reason="throttled"} is climbing in step.
In ten seconds, the operator knows the cause is S3 throttling on multipart uploads. They check whether entropy is working as configured: the entropy_bucket_count distribution should be flat across hashes, and it is not; one bucket dominates. They tune the entropy length, redeploy, and the regression resolves.
This sequence, today, would take hours and require a CloudTrail dump.
Appendix B: Worked example for cost
The same job, two weeks later, generates a cost alert. The bucket has accumulated 2 TB of "incomplete multipart upload" parts that the lifecycle policy hasn't reaped yet. With this FLIP:
mpu_initiated_total − mpu_completed_total − mpu_aborted_total per minute is the rate of "leaked" MPUs.
A simple Prometheus alert triggers when that rate is positive for more than five minutes.
The leak is caught the moment it happens, not when AWS bills.
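The leak arithmetic can be sketched with three counters. The class and method names below are hypothetical, and LongAdder stands in for a Flink Counter so the example runs on the JDK alone. Note that the difference also includes uploads that are legitimately still in flight, which is presumably why the alert waits for a sustained positive value rather than firing immediately.

```java
import java.util.concurrent.atomic.LongAdder;

public class MpuLifecycleSketch {

    private final LongAdder initiated = new LongAdder();
    private final LongAdder completed = new LongAdder();
    private final LongAdder aborted = new LongAdder();

    // Each method would be called from the matching recoverable-writer transition.
    void onInitiated() {
        initiated.increment();
    }

    void onCompleted() {
        completed.increment();
    }

    void onAborted() {
        aborted.increment();
    }

    // initiated - completed - aborted: uploads started but not yet finished either way.
    long outstanding() {
        return initiated.sum() - completed.sum() - aborted.sum();
    }

    public static void main(String[] args) {
        MpuLifecycleSketch mpu = new MpuLifecycleSketch();
        for (int i = 0; i < 5; i++) {
            mpu.onInitiated();
        }
        for (int i = 0; i < 3; i++) {
            mpu.onCompleted();
        }
        mpu.onAborted();
        // Five started, three completed, one aborted: one upload is unaccounted for.
        System.out.println("outstanding MPUs: " + mpu.outstanding());
    }
}
```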