Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/w4qrgopg6o41hvx1lg59z2drtw3jy7fd

JIRA: https://issues.apache.org/jira/browse/FLINK-39203

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Inspecting intermediate data in a running Flink job is a long-standing pain point. Today, the only options are modifying the job (print() sink, executeAndCollect(), or log statements, all of which require a restart) or deploying external infrastructure (extra Kafka topics, debug sinks to external storage). Both approaches are slow, disruptive, and disproportionately expensive for what is fundamentally a quick "what does the data look like here?" question.

This pain recurs across the entire job lifecycle:

  • Pre-launch development & testing — Verifying transformation logic requires adding a temporary debug sink, recompiling, redeploying, and restarting. Each iteration takes minutes; in staging environments the cost is even higher because state must be rebuilt from checkpoints. A native sampling capability turns this into a seconds-long point-and-click check in the WebUI.
  • Runtime data exploration — Once a job is in production, there is no way to see what intermediate operators emit. For example, in Source → Parse → Filter → Aggregate → Sink, you cannot inspect what Parse outputs or what Filter drops without modifying the job. Data sampling provides non-invasive, on-demand visibility into any vertex.
  • Anomaly investigation & troubleshooting — When a job produces unexpected output, diagnosing it requires hypothesizing which operator is at fault, adding debug output, restarting the job, and waiting for the anomaly to reoccur — repeating until the root cause is found. This can take hours to days for intermittent issues. With data sampling, an on-call engineer can immediately inspect each vertex's output and narrow down the faulty operator without any restart or code change.

Flink already has a proven architecture for this kind of on-demand runtime inspection: FlameGraph (FLINK-13550), which follows REST API → JM coordination → TM-side sampling → WebUI rendering, all without restarting the job.

Proposal: Introduce native runtime data sampling at the output of any job vertex, with REST API trigger and Flink WebUI visualization, enabling users to inspect intermediate data in a running job without restart or topology modification.

Scope

Goals:

  1. On-demand data sampling at the output of any job vertex in a running job.
  2. REST API to trigger and retrieve sampling results.
  3. WebUI visualization using toString() for readable types. Schema-aware formatting for Table/SQL binary formats requires a separate design (see Future Work).
  4. Minimal overhead: zero when disabled; ~54ns/record when enabled-idle; bounded during active rounds.
  5. Safety mechanisms: rate limiting, time budget, buffer caps, refresh-interval-based cache expiration.

Non-Goals: PII redaction, persistent storage, operator input-side sampling, intra-chain sampling.

Public Interfaces

Configuration Options (RestOptions.java)

| Key | Type | Default | Description |
|---|---|---|---|
| rest.data-sampling.enabled | Boolean | false | Master switch. When false, no sampling infrastructure initialized. |
| rest.data-sampling.max-sample-rate | Integer | 100 | Max records sampled per subtask per second. Range: 1–10000. |
| rest.data-sampling.max-record-length | Integer | 10000 | Max toString() characters per record (truncated if exceeded). |
| rest.data-sampling.sampling-window | Duration | 3s | Duration of each sampling round. Range: 1s–30s. |
| rest.data-sampling.refresh-interval | Duration | 60s | Time after which cached results are stale and a new round is triggered. |
| rest.data-sampling.timeout | Duration | 30s | Overall timeout for a complete round. |
| rest.data-sampling.tostring-budget-ms | Integer | 50 | Max ms/s budget for toString() calls per output. |

Design notes:

  • Default disabled — sampling intercepts the data hot path and may expose sensitive data.
  • Internal caps (hardcoded): max 5000 total records per response, max 5 concurrent rounds per JM.
  • Cache key: (jobId, vertexId). Query parameters (subtaskIndex, maxRecords) are post-filters on cached results.

REST API

GET /jobs/:jobid/vertices/:vertexid/data-sample[?subtaskIndex=N&maxRecords=M]

Uses a Lazy Trigger + Polling pattern: GET returns results and may initiate a sampling round if the cache is empty or expired.

Response example:

{
    "status": "COMPLETE",
    "endTimestamp": 1700000000000,
    "roundId": 42,
    "stale": false,
    "totalRecordCount": 3,
    "totalTruncated": false,
    "droppedByContention": 0,
    "droppedByRateLimit": 5,
    "errorCode": null,
    "failedSubtasks": [],
    "samples": [
        {
            "subtaskIndex": 0,
            "records": [
                {
                    "sampleTimestamp": 1700000000100,
                    "data": "Row(name=Alice, age=30, city=Beijing)",
                    "dataType": "org.apache.flink.types.Row",
                    "truncated": false
                }
            ]
        }
    ]
}

Status Enum:

| Status | Meaning |
|---|---|
| PENDING | Sampling triggered, waiting for results |
| COMPLETE | All subtasks returned data |
| PARTIAL | Some subtasks succeeded, some failed; partial data displayed |
| NO_DATA | Round finished, no records captured |
| DISABLED | Feature disabled via configuration |
| FAILED | All subtasks failed or admission rejected |

Error Codes (when FAILED or PARTIAL): TIMEOUT · TM_UNREACHABLE · TOO_MANY_CONCURRENT_ROUNDS · TASK_TERMINATED · INTERNAL_ERROR

Polling behavior:

  1. First request → triggers round, returns PENDING.
  2. While in progress → returns previous cache (stale=true) or PENDING.
  3. Round completed → returns COMPLETE / PARTIAL / NO_DATA.
  4. WebUI polls every 3s; backend returns cache until refresh-interval expires, then triggers new round.
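
The polling rules above can be read as a small decision function evaluated on every GET. The following is a minimal sketch under stated assumptions, not the proposed handler: the class name, the Action enum, and the use of -1 for "nothing cached" are all hypothetical, and it assumes that an expired cache entry is still served with stale=true while the replacement round runs.

```java
import java.time.Duration;

/** Sketch of the lazy-trigger decision made for one GET request. All names are hypothetical. */
final class LazyTriggerSketch {

    /** What the handler does for a single GET. */
    enum Action {
        SERVE_CACHE,             // cache is fresh: return it with stale=false
        SERVE_STALE,             // cache expired, round already running: return it with stale=true
        SERVE_STALE_AND_TRIGGER, // cache expired, no round running: return it and start a round
        PENDING,                 // nothing cached, round already running
        PENDING_AND_TRIGGER      // nothing cached, no round running: start one
    }

    /**
     * @param cachedEndTimestampMs end time of the cached round, or -1 if nothing is cached
     * @param roundInProgress whether a sampling round for this (jobId, vertexId) is running
     */
    static Action onGet(long cachedEndTimestampMs, boolean roundInProgress,
                        long nowMs, Duration refreshInterval) {
        boolean hasCache = cachedEndTimestampMs >= 0;
        boolean fresh = hasCache && nowMs - cachedEndTimestampMs < refreshInterval.toMillis();
        if (fresh) {
            return Action.SERVE_CACHE;
        }
        if (hasCache) {
            return roundInProgress ? Action.SERVE_STALE : Action.SERVE_STALE_AND_TRIGGER;
        }
        return roundInProgress ? Action.PENDING : Action.PENDING_AND_TRIGGER;
    }

    public static void main(String[] args) {
        Duration refresh = Duration.ofSeconds(60);
        // First request for a vertex: nothing cached and no round running.
        System.out.println(onGet(-1, false, 0L, refresh));
    }
}
```

Because the decision depends only on the cache age and on whether a round is running, any number of WebUI clients polling every 3s share one round per refresh-interval.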

Proposed Changes

Sampling Round Lifecycle

Unlike thread sampling, which is naturally bounded, data sampling on the hot path must be explicitly bounded. Each round follows a round-scoped lifecycle:

  1. Trigger: REST request + cache miss → Coordinator starts round with unique roundId.
  2. Single RPC: Coordinator sends requestDataSamples(roundId, samplingWindow, maxBufferCapacity) to TMs. Each TM autonomously: enables → captures for sampling-window → disables → returns result.
  3. Fill-and-stop: Captures until window expires or buffer full.
  4. Auto-disable: samplingEnabled = false set automatically after window.
  5. Cache: Coordinator assembles results, caches for refresh-interval (default 60s).

Key properties: samplingEnabled = true lasts at most sampling-window (default 3s). Multiple WebUI clients share the same round via cache. At most 5 concurrent rounds per JM.
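
To make the TM-side part of the lifecycle concrete, here is a minimal sketch of the enable, fill-and-stop, and auto-disable steps. It is an illustration under assumptions rather than the proposed implementation: the class and method names are hypothetical, records are stored as plain strings, and the timer that ends the window is represented by a direct call to disableSampling().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical TM-side sampler for one output; not the proposed class. */
final class RoundScopedSamplerSketch {
    // Flipped off by a timer thread and read on the record path, hence volatile.
    private volatile boolean samplingEnabled = false;
    // Single-writer: only the record-processing (mailbox) thread touches the buffer.
    private final List<String> buffer = new ArrayList<>();
    private int capacity;
    private long roundId = -1;

    /** Starts a round; clearing here keeps results of different rounds apart. */
    void startRound(long roundId, int maxBufferCapacity) {
        buffer.clear();
        this.roundId = roundId;
        this.capacity = maxBufferCapacity;
        samplingEnabled = true;
    }

    /** Called per record after forwarding; costs one volatile read while idle. */
    void sampleRecord(Object record) {
        if (!samplingEnabled) {
            return;
        }
        if (buffer.size() < capacity) {
            buffer.add(String.valueOf(record));
        }
        if (buffer.size() >= capacity) {
            samplingEnabled = false; // fill-and-stop: buffer is full
        }
    }

    /** Called when the sampling window expires (a timer in the proposal). */
    void disableSampling() {
        samplingEnabled = false;
    }

    boolean isEnabled() {
        return samplingEnabled;
    }

    long currentRoundId() {
        return roundId;
    }

    /** Returns a copy of what was captured in the current round. */
    List<String> snapshot() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        RoundScopedSamplerSketch sampler = new RoundScopedSamplerSketch();
        sampler.startRound(42L, 2);
        sampler.sampleRecord("a");
        sampler.sampleRecord("b");
        sampler.sampleRecord("c"); // ignored: the round stopped when the buffer filled
        System.out.println(sampler.snapshot());
    }
}
```

In this sketch, sampling stops on whichever comes first, a full buffer or the end of the window, so the enabled state is bounded even if the coordinator never calls back.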

State Machine

[Figure: State Machine diagram]

Five-Layer Architecture

[Figure: Five-Layer Architecture diagram]

Core Design Decisions

1. SamplingRecordWriterOutput (extends RecordWriterOutput)

Installed at the chain's tail end in OperatorChain.createChainOutputs(), capturing records after all chained operators have processed them.

  • Forward-first: super.collectAndCheckIfChained() executes before sampling — record forwarding never blocked.
  • Inheritance over composition: OperatorChain.streamOutputs[] is typed RecordWriterOutput<?>[]. Inheritance provides type compatibility with zero changes to checkpoint/broadcast paths.
  • toString() safety: Reflection-cached detection (ClassValue<Boolean>) of classes without custom toString() override, with time budget (50ms/s) and rate limiter (100/s/subtask) as defense-in-depth.
  • Exception isolation: toString() failures caught at DEBUG level — never block forwarding.
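
The toString() safety and exception isolation points can be sketched with the JDK's ClassValue cache. This is a hedged illustration, not the proposed code: the class name and render() signature are hypothetical, and returning the class name for types without a custom toString() is an assumption (the proposal does not say what is shown in that case).

```java
/** Hypothetical helper that renders a sampled record defensively. */
final class SafeToStringSketch {

    // Caches, per class, whether toString() is overridden somewhere below java.lang.Object.
    private static final ClassValue<Boolean> HAS_CUSTOM_TO_STRING = new ClassValue<Boolean>() {
        @Override
        protected Boolean computeValue(Class<?> type) {
            try {
                return type.getMethod("toString").getDeclaringClass() != Object.class;
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
    };

    /**
     * Returns a display string of at most maxRecordLength characters,
     * or null if toString() failed and the record should be skipped.
     */
    static String render(Object record, int maxRecordLength) {
        if (record == null) {
            return "null";
        }
        Class<?> type = record.getClass();
        if (!HAS_CUSTOM_TO_STRING.get(type)) {
            // Assumption: show the type name instead of Object's default "Type@hash" output.
            return type.getName();
        }
        try {
            String text = record.toString();
            return text.length() > maxRecordLength ? text.substring(0, maxRecordLength) : text;
        } catch (RuntimeException e) {
            // Exception isolation: a failing toString() must never affect record forwarding.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(render("Row(name=Alice, age=30, city=Beijing)", 10));
    }
}
```

The reflective lookup runs once per class; later records of the same type only pay for a cache read.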

2. Coordinator with Partial Success

Standalone implementation (does not extend TaskStatsRequestCoordinator) because the base class assumes all-or-nothing semantics. Data sampling results from individual subtasks are independently valuable.

  • Fan-out single RPC to all relevant TMs (grouped by TM location).
  • Three disjoint sets: remainingGroups → successGroups / failedGroups.
  • PARTIAL and FAILED complete the future normally (not exceptionally) — ensures cacheability.
  • Proportional fair truncation when total records exceed 5000 cap.
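
One way to realize proportional fair truncation is a largest-remainder allocation: each subtask keeps a share of the cap proportional to what it captured, and leftover slots go to the largest fractional remainders. The sketch below assumes that scheme; the proposal does not specify the rounding rule, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical largest-remainder allocation of a global record cap across subtasks. */
final class ProportionalTruncationSketch {

    /** Returns how many records each subtask may keep so that the sum does not exceed cap. */
    static int[] allocate(int[] counts, int cap) {
        long total = 0;
        for (int c : counts) {
            total += c;
        }
        if (total <= cap) {
            return counts.clone(); // under the cap: nothing is truncated
        }

        int[] quota = new int[counts.length];
        long[] remainder = new long[counts.length];
        int assigned = 0;
        for (int i = 0; i < counts.length; i++) {
            long scaled = (long) counts[i] * cap;
            quota[i] = (int) (scaled / total); // proportional share, rounded down
            remainder[i] = scaled % total;     // fractional part, scaled by total
            assigned += quota[i];
        }

        // Hand the leftover slots to the subtasks with the largest remainders.
        Integer[] order = new Integer[counts.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        Arrays.sort(order, (a, b) -> Long.compare(remainder[b], remainder[a]));
        for (int k = 0; k < cap - assigned; k++) {
            quota[order[k]]++;
        }
        return quota;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(allocate(new int[] {4000, 2000, 2000}, 5000)));
    }
}
```

With counts of 4000, 2000, and 2000 and the 5000-record cap, the subtasks keep 2500, 1250, and 1250 records, so no single busy subtask crowds out the others.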

3. Thread Safety Model

| Thread | Role | Synchronization |
|---|---|---|
| Mailbox thread | collect() → sampleRecord() → buffer write; startRound() | Single-writer model for buffer |
| Timer thread | disableSampling() after window | volatile write only |
| RPC handler | Submits to mailbox + schedules timer | No direct buffer access |

samplingEnabled is volatile for cross-thread visibility. BoundedSampleBuffer is not thread-safe: all writes come from the mailbox thread only. startRound() runs on the mailbox thread to avoid clear-vs-write races.

4. WebUI: "Data Sample" Tab

Auto-polling WebUI tab: subscribes to jobLocalService.jobWithVertexChanges(), polls every 3s. Backend returns cached results until refresh-interval (default 60s) expires, then automatically triggers a new sampling round. Features: subtask selector, record table (timestamp/data/type), truncation indicators, status-driven display (PENDING/PARTIAL/DISABLED/FAILED states), stale indicator (shown when cached result age exceeds refresh-interval).

Overhead Control

| State | Per-Record Overhead |
|---|---|
| Disabled | Zero: SamplingRecordWriterOutput not instantiated |
| Enabled, idle | ~54ns (virtual dispatch + volatile read) |
| Active sampling | ~84ns (idle + rate limiter check; toString() called ≤100 times/s) |
| Between rounds | Same as idle |

Safety mechanisms: rate limiter (100/s/subtask), toString time budget (50ms/s), bounded buffer (≤1000/subtask), total cap (5000 records), record length limit, byte-level response cap, round-scoped auto-disable, concurrent round limit (5).
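
The rate limiter and the toString() time budget can share one per-second accounting window. The following is a minimal sketch under assumptions: it uses fixed one-second windows (the proposal does not state whether the limiter is a fixed window or a token bucket), takes the clock as an explicit argument so it can be exercised deterministically, and uses hypothetical names throughout.

```java
/** Hypothetical per-subtask guard combining a record-count limit and a toString() time budget. */
final class PerSecondBudgetSketch {
    private static final long WINDOW_NANOS = 1_000_000_000L;

    private final int maxPerSecond;          // e.g. max-sample-rate (default 100)
    private final long budgetNanosPerSecond; // e.g. tostring-budget-ms (default 50) in nanos

    private boolean started;
    private long windowStartNanos;
    private int usedCount;
    private long usedNanos;

    PerSecondBudgetSketch(int maxPerSecond, long budgetNanosPerSecond) {
        this.maxPerSecond = maxPerSecond;
        this.budgetNanosPerSecond = budgetNanosPerSecond;
    }

    /** Returns true if one more record may be sampled in the current one-second window. */
    boolean tryAcquire(long nowNanos) {
        rollWindowIfNeeded(nowNanos);
        if (usedCount >= maxPerSecond || usedNanos >= budgetNanosPerSecond) {
            return false;
        }
        usedCount++;
        return true;
    }

    /** Records how long a toString() call took, charged against the current window. */
    void charge(long elapsedNanos) {
        usedNanos += elapsedNanos;
    }

    private void rollWindowIfNeeded(long nowNanos) {
        if (!started || nowNanos - windowStartNanos >= WINDOW_NANOS) {
            started = true;
            windowStartNanos = nowNanos;
            usedCount = 0;
            usedNanos = 0;
        }
    }

    public static void main(String[] args) {
        PerSecondBudgetSketch guard = new PerSecondBudgetSketch(100, 50_000_000L);
        long now = System.nanoTime();
        if (guard.tryAcquire(now)) {
            String text = String.valueOf(42); // stands in for record.toString()
            guard.charge(System.nanoTime() - now);
            System.out.println("sampled " + text);
        }
    }
}
```

Either limit alone stops sampling for the rest of the window, so a type with a cheap toString() is bounded by count and a type with an expensive toString() is bounded by time.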

Performance Benchmark

Methodology

  • Topology: NumberSequenceSource → rebalance → Map(~1μs CPU) → DiscardingSink, chaining disabled.
  • Environment: macOS, standalone single-TM, parallelism=4.
  • Protocol: 90s warmup → 6 rounds at 60s intervals → discard round 1 → average rounds 2–6.

Results

| Scenario | Avg Throughput (records/s) | vs. Baseline |
|---|---|---|
| A. Baseline (disabled) | 1,201,794 | 100% |
| B. Enabled-Idle (no API calls) | 1,182,556 | -1.60% |
| C. Active Sampling (continuous) | 1,172,342 | -2.45% |

Subtask skew: 0% across all scenarios. Inter-round variance: < 1%.

Per-Record Overhead

| Comparison | Extra Latency | Source |
|---|---|---|
| A → B (Idle) | ~54ns | Virtual dispatch + volatile read |
| A → C (Active) | ~84ns | + rate limiter (~30ns marginal) |
| B → C (Sampling only) | ~30ns | Rate limiter branch |

Impact by Workload Type

| Workload | Per-Record Cost | Idle (54ns) as % |
|---|---|---|
| Lightest ETL (~1μs, this benchmark) | ~3.3μs | 1.6% |
| Typical ETL (parsing, filtering) | ~10μs | 0.54% |
| Stateful computation (windows, joins) | ~100μs | 0.05% |
| Production workloads | ≥1ms | <0.01% |

Conclusion: all targets are met (idle < 2%, active < 3%). For typical production workloads (≥10μs/record), total impact stays below 1%: at 10μs/record, the 54ns idle cost is 0.54% and the 84ns active cost is 0.84%.
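
The per-record figures can be re-derived from the throughput table. The sketch below assumes that the reported throughput is the aggregate across all 4 subtasks, so the per-record time on one subtask is parallelism / throughput; the class and method names are hypothetical.

```java
/** Re-derives the per-record overhead numbers from the measured throughputs. */
final class OverheadArithmeticSketch {

    /** Per-record time on one subtask, assuming throughput is summed over all subtasks. */
    static double nanosPerRecord(double recordsPerSecond, int parallelism) {
        return parallelism * 1e9 / recordsPerSecond;
    }

    /** Extra nanoseconds per record of a scenario relative to the baseline. */
    static double extraNanos(double baselineRps, double scenarioRps, int parallelism) {
        return nanosPerRecord(scenarioRps, parallelism) - nanosPerRecord(baselineRps, parallelism);
    }

    /** Overhead expressed as a percentage of a workload's per-record cost. */
    static double percentOf(double overheadNanos, double perRecordNanos) {
        return 100.0 * overheadNanos / perRecordNanos;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        double baseline = 1_201_794;
        double idle = 1_182_556;
        double active = 1_172_342;
        System.out.printf("baseline per record: %.0f ns%n", nanosPerRecord(baseline, parallelism));
        System.out.printf("idle overhead:       %.1f ns%n", extraNanos(baseline, idle, parallelism));
        System.out.printf("active overhead:     %.1f ns%n", extraNanos(baseline, active, parallelism));
        System.out.printf("idle at 10us/record: %.2f %%%n", percentOf(54, 10_000));
    }
}
```

Under that assumption the baseline works out to roughly 3.3μs per record, and the throughput drops of 1.60% and 2.45% correspond to about 54ns and 84ns per record.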


Compatibility, Deprecation, and Migration Plan

  • No breaking changes: Purely additive — new REST endpoints, configurations, and classes.
  • No deprecations or migration: Default disabled.
  • All new classes @Internal: No public API surface.

Test Plan

Functional: E2E sampling flow, disabled mode, partial success (kill TM → PARTIAL), BATCH mode, concurrent round limit, auto-disable after window, subtask filter, record truncation, JM failover, task cancellation.

Performance: Three-state overhead validation, bounded memory consumption, throttling effectiveness.

Open Questions

  1. Default max-sample-rate: 100/s (conservative) vs. 500/s (better UX)?
  2. Per-vertex enable/disable: Support in initial version or defer to Future Work?
  3. Force refresh: Expose in REST API, or rely on refresh-interval expiration?
  4. Security: Require additional auth beyond Flink's existing REST auth?

Rejected Alternatives

| Alternative | Rejection Reason |
|---|---|
| OperatorEvent / CoordinationRequest | High invasiveness: every operator must implement handler |
| Special Sink (like CollectSink) | Cannot modify topology without restart |
| Continuous sampling with configurable rate | Persistent overhead even when unused; round-scoped model has near-zero idle cost |
| Task-level interceptor | Captures input not output; Output decorator is established pattern |

Implementation Plan

| Phase | Content | Target |
|---|---|---|
| Phase 1 | Core: SamplingRecordWriterOutput, BoundedSampleBuffer, data models, TM-side RPC | 2 weeks |
| Phase 2 | JM: DataSampleRequestCoordinator, VertexDataSampleTracker, REST handler | 2 weeks |
| Phase 3 | WebUI: Data Sample tab with auto-polling, subtask selector, status handling | 1.5 weeks |
| Phase 4 | Testing + Documentation | 1 week |

Future Work

  1. Schema-Aware Formatting: Structured display of BinaryRowData for Table/SQL jobs.
  2. PII redaction: Field-level masking rules.
  3. Intra-chain sampling: Target specific operator inside a chain.
  4. Input-side sampling: Before/after comparison.
  5. Record filtering: Server-side predicates (e.g., age > 30).
