Status
Current state: Under Discussion
Discussion thread: here (<- link to https://lists.apache.org/thread/w4qrgopg6o41hvx1lg59z2drtw3jy7fd)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-39203)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Inspecting intermediate data in a running Flink job is a long-standing pain point. Today, the only options are modifying the job (print() sink, executeAndCollect(), log statements — all require a restart) or deploying external infrastructure (extra Kafka topics, debug sinks to external storage). Both approaches are slow, disruptive, and disproportionately expensive for what is fundamentally a quick "what does the data look like here?" question.
This pain recurs across the entire job lifecycle:
- Pre-launch development & testing: Verifying transformation logic requires adding a temporary debug sink, recompiling, redeploying, and restarting. Each iteration takes minutes; in staging environments the cost is even higher because state must be rebuilt from checkpoints. A native sampling capability turns this into a seconds-long point-and-click check in the WebUI.
- Runtime data exploration: Once a job is in production, there is no way to see what intermediate operators emit. For example, in Source → Parse → Filter → Aggregate → Sink, you cannot inspect what Parse outputs or what Filter drops without modifying the job. Data sampling provides non-invasive, on-demand visibility into any vertex.
- Anomaly investigation & troubleshooting: When a job produces unexpected output, diagnosing it requires hypothesizing which operator is at fault, adding debug output, restarting the job, and waiting for the anomaly to reoccur, repeating until the root cause is found. This can take hours to days for intermittent issues. With data sampling, an on-call engineer can immediately inspect each vertex's output and narrow down the faulty operator without any restart or code change.
Flink already has a proven architecture for this kind of on-demand runtime inspection — FlameGraph (FLINK-13550): REST API → JM coordination → TM-side sampling → WebUI rendering, all without restarting the job.
Proposal: Introduce native runtime data sampling at the output of any job vertex, with REST API trigger and Flink WebUI visualization, enabling users to inspect intermediate data in a running job without restart or topology modification.
Scope
Goals:
- On-demand data sampling at the output of any job vertex in a running job.
- REST API to trigger and retrieve sampling results.
- WebUI visualization using toString() for readable types. (Schema-aware formatting for Table/SQL binary formats needs a separate design.)
- Minimal overhead: zero when disabled; ~54ns/record when enabled-idle; bounded during active rounds.
- Safety mechanisms: rate limiting, time budget, buffer caps, refresh-interval-based cache expiration.
Non-Goals: PII redaction, persistent storage, operator input-side sampling, intra-chain sampling.
Public Interfaces
Configuration Options (RestOptions.java)
| Key | Type | Default | Description |
|---|---|---|---|
| rest.data-sampling.enabled | Boolean | false | Master switch. When false, no sampling infrastructure is initialized. |
| rest.data-sampling.max-sample-rate | Integer | 100 | Max records sampled per subtask per second. Range: 1–10000. |
| rest.data-sampling.max-record-length | Integer | 10000 | Max toString() characters per record (truncated if exceeded). |
| rest.data-sampling.sampling-window | Duration | 3s | Duration of each sampling round. Range: 1s–30s. |
| rest.data-sampling.refresh-interval | Duration | 60s | Time after which cached results are stale and a new round is triggered. |
| rest.data-sampling.timeout | Duration | 30s | Overall timeout for a complete round. |
| rest.data-sampling.tostring-budget-ms | Integer | 50 | Max milliseconds per second spent in toString() calls, per output. |
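The ranges in the table can be enforced when the options are read. The following is a minimal sketch under stated assumptions: DataSamplingSettings is a hypothetical holder class, not part of Flink or of this proposal, and it covers only three of the options.

```java
import java.time.Duration;

/** Hypothetical holder for a few of the proposed options; not a Flink class. */
final class DataSamplingSettings {
    final boolean enabled;
    final int maxSampleRate;
    final Duration samplingWindow;

    DataSamplingSettings(boolean enabled, int maxSampleRate, Duration samplingWindow) {
        // Range from the table: 1 to 10000 records per subtask per second.
        if (maxSampleRate < 1 || maxSampleRate > 10_000) {
            throw new IllegalArgumentException("max-sample-rate out of range: " + maxSampleRate);
        }
        // Range from the table: 1s to 30s.
        if (samplingWindow.compareTo(Duration.ofSeconds(1)) < 0
                || samplingWindow.compareTo(Duration.ofSeconds(30)) > 0) {
            throw new IllegalArgumentException("sampling-window out of range: " + samplingWindow);
        }
        this.enabled = enabled;
        this.maxSampleRate = maxSampleRate;
        this.samplingWindow = samplingWindow;
    }

    /** Defaults as listed in the table: disabled, 100 records/s, 3s window. */
    static DataSamplingSettings defaults() {
        return new DataSamplingSettings(false, 100, Duration.ofSeconds(3));
    }
}
```

Rejecting out-of-range values at startup, rather than clamping them silently, is a design choice of this sketch; the proposal does not say which behavior is intended.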
Design notes:
- Default disabled — sampling intercepts the data hot path and may expose sensitive data.
- Internal caps (hardcoded): max 5000 total records per response, max 5 concurrent rounds per JM.
- Cache key: (jobId, vertexId). Query parameters (subtaskIndex, maxRecords) are post-filters on cached results.
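Because the query parameters do not take part in the cache key, they can be applied to a cached result without touching the sampler. A minimal sketch follows; SamplePostFilter and CachedRecord are hypothetical names, and treating maxRecords as a cap on the total number of returned records (rather than per subtask) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical post-filter over a cached sampling result. */
final class SamplePostFilter {
    /** One cached record, tagged with the subtask that produced it. */
    static final class CachedRecord {
        final int subtaskIndex;
        final String data;

        CachedRecord(int subtaskIndex, String data) {
            this.subtaskIndex = subtaskIndex;
            this.data = data;
        }
    }

    /** Applies subtaskIndex and maxRecords; a null parameter means "no filter". */
    static List<CachedRecord> apply(
            List<CachedRecord> cached, Integer subtaskIndex, Integer maxRecords) {
        List<CachedRecord> out = new ArrayList<>();
        for (CachedRecord r : cached) {
            if (maxRecords != null && out.size() >= maxRecords) {
                break; // assumed semantics: cap on the total returned
            }
            if (subtaskIndex == null || r.subtaskIndex == subtaskIndex) {
                out.add(r);
            }
        }
        return out; // the cached list itself is never modified
    }
}
```

Two requests with different parameters therefore read the same cached round, which is what lets several WebUI clients share one sampling round.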
REST API
GET /jobs/:jobid/vertices/:vertexid/data-sample[?subtaskIndex=N&maxRecords=M]
Uses a lazy-trigger-plus-polling pattern: a GET returns whatever results are available and may start a new sampling round if the cache is empty or expired.
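From a client's point of view the endpoint is polled until the status leaves PENDING. The sketch below uses the JDK HttpClient (Java 11+); DataSampleClient, its method names, and the 3s pause are illustrative assumptions, and the substring check on the body is a crude stand-in for a real JSON parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client helper for the proposed endpoint. */
final class DataSampleClient {
    /** Builds the endpoint URI; null query parameters are omitted. */
    static URI sampleUri(String restBase, String jobId, String vertexId,
                         Integer subtaskIndex, Integer maxRecords) {
        StringBuilder sb = new StringBuilder(restBase)
                .append("/jobs/").append(jobId)
                .append("/vertices/").append(vertexId)
                .append("/data-sample");
        String sep = "?";
        if (subtaskIndex != null) {
            sb.append(sep).append("subtaskIndex=").append(subtaskIndex);
            sep = "&";
        }
        if (maxRecords != null) {
            sb.append(sep).append("maxRecords=").append(maxRecords);
        }
        return URI.create(sb.toString());
    }

    /** Issues GETs until the body no longer reports PENDING or attempts run out. */
    static String pollUntilSettled(HttpClient client, URI uri, int maxAttempts)
            throws Exception {
        String body = "";
        for (int i = 0; i < maxAttempts; i++) {
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
            body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            if (!body.contains("\"PENDING\"")) {
                return body; // COMPLETE, PARTIAL, NO_DATA, DISABLED or FAILED
            }
            Thread.sleep(3000); // same cadence the WebUI is described as using
        }
        return body;
    }
}
```

A call such as pollUntilSettled(HttpClient.newHttpClient(), sampleUri("http://localhost:8081", jobId, vertexId, 0, 100), 10) would be a typical use; the host and port here are placeholders.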
Response example:
{
  "status": "COMPLETE",
  "endTimestamp": 1700000000000,
  "roundId": 42,
  "stale": false,
  "totalRecordCount": 3,
  "totalTruncated": false,
  "droppedByContention": 0,
  "droppedByRateLimit": 5,
  "errorCode": null,
  "failedSubtasks": [],
  "samples": [
    {
      "subtaskIndex": 0,
      "records": [
        {
          "sampleTimestamp": 1700000000100,
          "data": "Row(name=Alice, age=30, city=Beijing)",
          "dataType": "org.apache.flink.types.Row",
          "truncated": false
        }
      ]
    }
  ]
}
Status Enum:
| Status | Meaning |
|---|---|
| PENDING | Sampling triggered, waiting for results |
| COMPLETE | All subtasks returned data |
| PARTIAL | Some subtasks succeeded, some failed; partial data displayed |
| NO_DATA | Round finished, no records captured |
| DISABLED | Feature disabled via configuration |
| FAILED | All subtasks failed or admission rejected |
Error Codes (when FAILED or PARTIAL): TIMEOUT · TM_UNREACHABLE · TOO_MANY_CONCURRENT_ROUNDS · TASK_TERMINATED · INTERNAL_ERROR
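One way to read the status table is as a decision over per-round counters. The following sketch is an interpretation, not the proposal's implementation: SampleStatusRules and its parameters are hypothetical, and the precedence between PARTIAL and NO_DATA (when some subtasks fail and the rest return nothing) is an assumption.

```java
/** Hypothetical mapping from round outcome to the status enum in the table. */
final class SampleStatusRules {
    enum Status { PENDING, COMPLETE, PARTIAL, NO_DATA, DISABLED, FAILED }

    static Status resolve(boolean featureEnabled, boolean roundFinished,
                          int succeededSubtasks, int failedSubtasks, int recordCount) {
        if (!featureEnabled) {
            return Status.DISABLED;
        }
        if (!roundFinished) {
            return Status.PENDING;
        }
        if (succeededSubtasks == 0 && failedSubtasks > 0) {
            return Status.FAILED;   // every subtask failed
        }
        if (failedSubtasks > 0) {
            return Status.PARTIAL;  // assumed to take precedence over NO_DATA
        }
        if (recordCount == 0) {
            return Status.NO_DATA;  // round finished cleanly but captured nothing
        }
        return Status.COMPLETE;
    }
}
```

Admission rejection (TOO_MANY_CONCURRENT_ROUNDS) would also yield FAILED according to the table; it is left out here because it is decided before a round starts.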
Polling behavior:
- First request → triggers round, returns PENDING.
- While in progress → returns previous cache (stale=true) or PENDING.
- Round completed → returns COMPLETE/PARTIAL/NO_DATA.
- WebUI polls every 3s; backend returns cache until refresh-interval expires, then triggers new round.
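The polling rules above reduce to a small decision made on every GET. A sketch under stated assumptions: SamplePollPolicy and Decision are hypothetical names, time is passed in explicitly for testability, and serving the previous result with stale=true once refresh-interval has expired is this sketch's reading of the list.

```java
import java.time.Duration;

/** Hypothetical per-request decision for the lazy-trigger cache. */
final class SamplePollPolicy {
    static final class Decision {
        final boolean triggerRound;   // start a new sampling round
        final boolean servePrevious;  // return the cached result instead of PENDING
        final boolean stale;          // value of the "stale" flag in the response

        Decision(boolean triggerRound, boolean servePrevious, boolean stale) {
            this.triggerRound = triggerRound;
            this.servePrevious = servePrevious;
            this.stale = stale;
        }
    }

    /** cachedAtMillis is null when nothing has been cached for (jobId, vertexId). */
    static Decision decide(Long cachedAtMillis, boolean roundInProgress,
                           long nowMillis, Duration refreshInterval) {
        boolean hasCache = cachedAtMillis != null;
        boolean fresh = hasCache
                && nowMillis - cachedAtMillis < refreshInterval.toMillis();
        if (fresh) {
            return new Decision(false, true, false);
        }
        // Empty or expired cache: trigger unless a round is already running, and
        // fall back to the previous result (marked stale) when one exists.
        return new Decision(!roundInProgress, hasCache, hasCache);
    }
}
```

With the default refresh-interval of 60s and a 3s WebUI poll, roughly one request in twenty triggers a round and the rest are served from cache.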
Proposed Changes
Sampling Round Lifecycle
Unlike thread sampling, which is naturally bounded, data sampling on the hot path must be explicitly bounded. Each round follows a round-scoped lifecycle:
- Trigger: REST request + cache miss → Coordinator starts round with unique roundId.
- Single RPC: Coordinator sends requestDataSamples(roundId, samplingWindow, maxBufferCapacity) to TMs. Each TM autonomously: enables → captures for sampling-window → disables → returns result.
- Fill-and-stop: Captures until window expires or buffer full.
- Auto-disable: samplingEnabled = false set automatically after window.
- Cache: Coordinator assembles results, caches for refresh-interval (default 60s).
Key properties: samplingEnabled = true lasts at most sampling-window (default 3s). Multiple WebUI clients share the same round via cache. At most 5 concurrent rounds per JM.
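The TM-side part of the lifecycle (enable, fill-and-stop capture, auto-disable) can be sketched as follows. RoundScopedSampler is a hypothetical, single-threaded illustration: in the proposal the window end is driven by a timer and the writes happen on the mailbox thread, whereas here endWindow() is simply called by the test or caller.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical TM-side sampler illustrating one round-scoped lifecycle. */
final class RoundScopedSampler {
    private volatile boolean samplingEnabled = false; // checked on every record
    private final List<String> buffer = new ArrayList<>();
    private int capacity = 0;
    private long roundId = -1;

    /** Begins a round: clears the previous round's buffer and enables capture. */
    void startRound(long roundId, int maxBufferCapacity) {
        buffer.clear();
        this.roundId = roundId;
        this.capacity = maxBufferCapacity;
        samplingEnabled = true;
    }

    /** Hot path: a single flag read when no round is active. */
    void onRecord(Object record) {
        if (!samplingEnabled) {
            return;
        }
        if (buffer.size() >= capacity) {
            return; // fill-and-stop: once full, later records are ignored
        }
        buffer.add(String.valueOf(record));
    }

    /** Ends the sampling window; capture stops until the next startRound(). */
    void endWindow() {
        samplingEnabled = false;
    }

    boolean isEnabled() {
        return samplingEnabled;
    }

    long currentRoundId() {
        return roundId;
    }

    /** Snapshot of what was captured in the current round. */
    List<String> result() {
        return new ArrayList<>(buffer);
    }
}
```

Because the flag is cleared at the end of every window, a client that stops polling leaves nothing running on the TM.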
State Machine
Five-Layer Architecture
Core Design Decisions
1. SamplingRecordWriterOutput (extends RecordWriterOutput)
Installed at the chain's tail end in OperatorChain.createChainOutputs(), capturing records after all chained operators have processed them.
- Forward-first: super.collectAndCheckIfChained() executes before sampling, so record forwarding is never blocked.
- Inheritance over composition: OperatorChain.streamOutputs[] is typed RecordWriterOutput<?>[]. Inheritance provides type compatibility with zero changes to checkpoint/broadcast paths.
- toString() safety: Reflection-cached detection (ClassValue<Boolean>) of classes without custom toString() override, with time budget (50ms/s) and rate limiter (100/s/subtask) as defense-in-depth.
- Exception isolation: toString() failures are caught and logged at DEBUG level; they never block forwarding.
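The forward-first, toString() safety, and exception-isolation points can be illustrated without any Flink types. In this sketch ToStringSafety and forwardFirst are hypothetical names, a plain Consumer stands in for the record writer output, and skipping records whose class does not override toString() is an assumption about what the detection is used for.

```java
import java.util.function.Consumer;

/** Hypothetical illustration of forward-first sampling with toString() guards. */
final class ToStringSafety {
    /** Caches, per class, whether toString() is overridden below Object. */
    private static final ClassValue<Boolean> HAS_CUSTOM_TO_STRING =
            new ClassValue<Boolean>() {
                @Override
                protected Boolean computeValue(Class<?> type) {
                    try {
                        return type.getMethod("toString").getDeclaringClass()
                                != Object.class;
                    } catch (NoSuchMethodException e) {
                        return false; // not expected: every class has toString()
                    }
                }
            };

    static boolean hasCustomToString(Object record) {
        return record != null && HAS_CUSTOM_TO_STRING.get(record.getClass());
    }

    /** Wraps a downstream consumer; the downstream call always happens first. */
    static <T> Consumer<T> forwardFirst(Consumer<T> downstream,
                                        Consumer<String> sampleSink) {
        return record -> {
            downstream.accept(record); // forwarding is never gated on sampling
            try {
                if (hasCustomToString(record)) {
                    sampleSink.accept(record.toString());
                }
            } catch (RuntimeException e) {
                // The proposal logs this at DEBUG; the sketch only swallows it.
            }
        };
    }
}
```

ClassValue keeps the reflective lookup to one call per record class, so the per-record cost of the check is a cache read.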
2. Coordinator with Partial Success
Standalone implementation (does not extend TaskStatsRequestCoordinator) because the base class assumes all-or-nothing semantics. Data sampling results from individual subtasks are independently valuable.
- Fan-out single RPC to all relevant TMs (grouped by TM location).
- Three disjoint sets: remainingGroups → successGroups / failedGroups.
- PARTIAL and FAILED complete the future normally (not exceptionally), which ensures cacheability.
- Proportional fair truncation when total records exceed 5000 cap.
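Proportional fair truncation can be sketched as giving each subtask a share of the cap proportional to the number of records it returned. FairTruncation is a hypothetical name, and the way rounding leftovers are handed out (one each, in subtask order) is an assumption; the proposal does not specify it.

```java
/** Hypothetical proportional truncation of per-subtask record counts. */
final class FairTruncation {
    /** Returns how many records to keep per subtask so the sum stays within cap. */
    static int[] quotas(int[] counts, int cap) {
        long total = 0;
        for (int c : counts) {
            total += c;
        }
        int[] keep = counts.clone();
        if (total <= cap) {
            return keep; // nothing to truncate
        }
        long assigned = 0;
        for (int i = 0; i < counts.length; i++) {
            // Floor of the proportional share; long math avoids int overflow.
            keep[i] = (int) ((long) counts[i] * cap / total);
            assigned += keep[i];
        }
        // Slots lost to rounding go, one each, to subtasks that still have
        // records left over.
        long remaining = cap - assigned;
        for (int i = 0; remaining > 0 && i < counts.length; i++) {
            if (keep[i] < counts[i]) {
                keep[i]++;
                remaining--;
            }
        }
        return keep;
    }
}
```

For example, counts of 6000, 3000 and 1000 under the 5000 cap give quotas of 3000, 1500 and 500, so no single busy subtask crowds out the others.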
3. Thread Safety Model
| Thread | Role | Synchronization |
|---|---|---|
| Mailbox thread | collect() → sampleRecord() → buffer write; startRound() | Single-writer model for buffer |
| Timer thread | disableSampling() after window | volatile write only |
| RPC handler | Submits to mailbox + schedules timer | No direct buffer access |
samplingEnabled is volatile for cross-thread visibility. BoundedSampleBuffer is not thread-safe — all writes from mailbox thread only. startRound() runs on mailbox thread to avoid clear-vs-write races.
4. WebUI: "Data Sample" Tab
Auto-polling WebUI tab: subscribes to jobLocalService.jobWithVertexChanges(), polls every 3s. Backend returns cached results until refresh-interval (default 60s) expires, then automatically triggers a new sampling round. Features: subtask selector, record table (timestamp/data/type), truncation indicators, status-driven display (PENDING/PARTIAL/DISABLED/FAILED states), stale indicator (shown when cached result age exceeds refresh-interval).
Overhead Control
| State | Per-Record Overhead |
|---|---|
| Disabled | Zero — SamplingRecordWriterOutput not instantiated |
| Enabled, idle | ~54ns (virtual dispatch + volatile read) |
| Active sampling | ~84ns (idle + rate limiter check; toString() called ≤100 times/s) |
| Between rounds | Same as idle |
Safety mechanisms: rate limiter (100/s/subtask), toString time budget (50ms/s), bounded buffer (≤1000/subtask), total cap (5000 records), record length limit, byte-level response cap, round-scoped auto-disable, concurrent round limit (5).
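The per-subtask rate limit is the first of these safety mechanisms a record meets. The sketch below is a fixed-window limiter, chosen for brevity; it is not claimed to be the algorithm used in the proposal, PerSecondRateLimiter is a hypothetical name, and the clock is injected as a nanosecond value (such as System.nanoTime()) so the behavior is deterministic.

```java
/** Hypothetical fixed-window limiter: at most maxPerSecond permits per window. */
final class PerSecondRateLimiter {
    private static final long WINDOW_NANOS = 1_000_000_000L;

    private final int maxPerSecond;
    private long windowStartNanos;
    private int usedInWindow;

    PerSecondRateLimiter(int maxPerSecond, long nowNanos) {
        this.maxPerSecond = maxPerSecond;
        this.windowStartNanos = nowNanos;
        this.usedInWindow = 0;
    }

    /** Returns true if a record may be sampled at nowNanos. */
    boolean tryAcquire(long nowNanos) {
        if (nowNanos - windowStartNanos >= WINDOW_NANOS) {
            windowStartNanos = nowNanos; // a new one-second window begins
            usedInWindow = 0;
        }
        if (usedInWindow >= maxPerSecond) {
            return false; // would be counted as droppedByRateLimit
        }
        usedInWindow++;
        return true;
    }
}
```

Like the buffer, this limiter is not thread-safe; it relies on being called from a single thread, in line with the single-writer model described for the mailbox thread.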
Performance Benchmark
Methodology
- Topology: NumberSequenceSource → rebalance → Map(~1μs CPU) → DiscardingSink, chaining disabled.
- Environment: macOS, standalone single-TM, parallelism=4.
- Protocol: 90s warmup → 6 rounds at 60s intervals → discard round 1 → average rounds 2–6.
Results
| Scenario | Avg Throughput (records/s) | vs. Baseline |
|---|---|---|
| A. Baseline (disabled) | 1,201,794 | 100% |
| B. Enabled-Idle (no API calls) | 1,182,556 | -1.60% |
| C. Active Sampling (continuous) | 1,172,342 | -2.45% |
Subtask skew: 0% across all scenarios. Inter-round variance: < 1%.
Per-Record Overhead
| Comparison | Extra Latency | Source |
|---|---|---|
| A → B (Idle) | ~54ns | Virtual dispatch + volatile read |
| A → C (Active) | ~84ns | + rate limiter (~30ns marginal) |
| B → C (Sampling only) | ~30ns | Rate limiter branch |
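The per-record figures are consistent with dividing the parallelism by the aggregate throughput, that is, treating the reported records/s as the sum over the 4 subtasks. That reading is an assumption of this sketch rather than something the benchmark description states; OverheadArithmetic is a hypothetical helper.

```java
/** Hypothetical helper reproducing the per-record overhead arithmetic. */
final class OverheadArithmetic {
    /** Nanoseconds one subtask spends per record, given aggregate records/s. */
    static double perRecordNanos(double aggregateRecordsPerSecond, int parallelism) {
        return parallelism * 1e9 / aggregateRecordsPerSecond;
    }

    /** Extra nanoseconds per record of a scenario relative to the baseline. */
    static double extraNanos(double baselineThroughput, double scenarioThroughput,
                             int parallelism) {
        return perRecordNanos(scenarioThroughput, parallelism)
                - perRecordNanos(baselineThroughput, parallelism);
    }
}
```

With the throughputs from the Results table, extraNanos(1201794, 1182556, 4) rounds to 54 and extraNanos(1201794, 1172342, 4) rounds to 84, and the baseline per-record cost comes out at about 3.3μs.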
Impact by Workload Type
| Workload | Per-Record Cost | Idle (54ns) as % |
|---|---|---|
| Lightest ETL (~1μs, this benchmark) | ~3.3μs | 1.6% |
| Typical ETL (parsing, filtering) | ~10μs | 0.54% |
| Stateful computation (windows, joins) | ~100μs | 0.05% |
| Production workloads | ≥1ms | <0.01% |
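Conclusion: All targets met on the lightest workload: idle overhead < 2%, active overhead < 3%. For workloads at or above ~10μs/record, the same per-record costs amount to less than 1% (54ns idle is 0.54% and 84ns active is 0.84% of 10μs), and the share shrinks further as per-record cost grows.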
Compatibility, Deprecation, and Migration Plan
- No breaking changes: Purely additive — new REST endpoints, configurations, and classes.
- No deprecations or migration: Default disabled.
- All new classes @Internal: No public API surface.
Test Plan
Functional: E2E sampling flow, disabled mode, partial success (kill TM → PARTIAL), BATCH mode, concurrent round limit, auto-disable after window, subtask filter, record truncation, JM failover, task cancellation.
Performance: Three-state overhead validation, bounded memory consumption, throttling effectiveness.
Open Questions
- Default max-sample-rate: 100/s (conservative) vs. 500/s (better UX)?
- Per-vertex enable/disable: Support in initial version or defer to Future Work?
- Force refresh: Expose in REST API, or rely on refresh-interval expiration?
- Security: Require additional auth beyond Flink's existing REST auth?
Rejected Alternatives
| Alternative | Rejection Reason |
|---|---|
| OperatorEvent / CoordinationRequest | High invasiveness — every operator must implement handler |
| Special Sink (like CollectSink) | Cannot modify topology without restart |
| Continuous sampling with configurable rate | Persistent overhead even when unused; round-scoped model has near-zero idle cost |
| Task-level interceptor | Captures input not output; Output decorator is established pattern |
Implementation Plan
| Phase | Content | Target |
|---|---|---|
| Phase 1 | Core: SamplingRecordWriterOutput, BoundedSampleBuffer, data models, TM-side RPC | 2 weeks |
| Phase 2 | JM: DataSampleRequestCoordinator, VertexDataSampleTracker, REST handler | 2 weeks |
| Phase 3 | WebUI: Data Sample tab with auto-polling, subtask selector, status handling | 1.5 weeks |
| Phase 4 | Testing + Documentation | 1 week |
Future Work
- Schema-Aware Formatting: Structured display of BinaryRowData for Table/SQL jobs.
- PII redaction: Field-level masking rules.
- Intra-chain sampling: Target specific operator inside a chain.
- Input-side sampling: Before/after comparison.
- Record filtering: Server-side predicates (e.g., age > 30).
References
- FLINK-13550: FlameGraph architecture reference
- Flink REST API Documentation


