Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
1. TL;DR
The shape of data processing workloads is shifting. Beyond traditional structured-record processing, a coherent class of workloads — AI-oriented data processing: multimodal, inference-heavy, accelerator-backed, and long-running — is now in production at multiple Flink users, and the same workload class is the target of active development in adjacent data systems beyond Flink. The current engine has structural gaps in supporting this workload class — both in the individual capabilities required (GPU resource declaration, async batching, failure isolation under long-running operators, etc.) and in the coordinated runtime contract these capabilities should jointly form. This umbrella proposal lays out an evolution along five directions, decomposed into 11 sub-FLIPs organized into three engineering layers (core runtime primitives, AI workload expression and execution, production-grade operational guarantees). All proposed changes are incremental and backward-compatible.
2. Scope, Non-goals, and Consensus Criteria
In scope for this document. Whether the evolution directions are reasonable, whether each sub-FLIP's motivation and proposed approach are well-founded, and whether the boundaries and dependencies between sub-FLIPs are clear.
Out of scope for this document. The detailed design, API specifics, and implementation plans of each sub-FLIP — these will be developed in their own FLIPs. Acceptance of this umbrella does not lock in any sub-FLIP's design; sub-FLIPs may be adjusted, deferred, or withdrawn as they progress.
Non-goals. This proposal explicitly does not attempt to:
- Turn Flink into an AI framework, ML platform, or model serving system.
- Own the model training lifecycle (experiment tracking, model versioning, hyperparameter tuning) — a known mismatch from the Flink ML era that we do not repeat.
- Bake LLM-specific APIs, agent frameworks, or prompt/tool protocols into the engine — those belong in the ecosystem and SDK layer, where they can iterate at their own pace.
Consensus criteria. Reaching consensus on the overall direction is sufficient for this umbrella to pass.
3. Background: AI-oriented Data Processing Workloads
This section formalizes the workload model that the rest of the FLIP references. These characteristics co-occur in a coherent class of workloads, and their co-occurrence breaks assumptions baked into the current engine.
We define AI-oriented data processing workloads as the class of data processing workloads exhibiting all or most of the following characteristics:
- Per-record processing cost is 3–4 orders of magnitude higher than traditional row-at-a-time processing. A single inference typically costs 10ms–2s; long reasoning traces can exceed 10 minutes. Compare this with microseconds per row in BI/ETL.
- Execution involves heterogeneous resources, primarily CPU + GPU, where GPU is expensive, scarce, and frequently provisioned on Spot / preemptible instances. Resource churn is the norm, not the exception.
- Data objects are multimodal and large: images, video, audio, tensors, embeddings, or references to such objects in external storage. Payloads of 10KB–10MB per record are routine, against the few-bytes-to-KB norm of structured records.
- Computation logic includes model inference or service-style invocations as part of the pipeline, either through remote serving (Triton, vLLM, internal LLM gateways) or local GPU execution.
- Topologies are often shuffle-light and stateless, with the canonical shape Source → preprocess → inference → sink. Many pipelines have no KeyBy and no keyed-state migration.
- Throughput is typically 1k–100k QPS per pipeline.
The implications of this combination (high per-record cost, expensive heterogeneous resources, large multimodal payloads, service-style invocation, and shuffle-light topology) show up throughout the rest of the document. They drive the runtime contract proposed in §6 and the layering in §7.
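Per-record latency and throughput together determine how many requests a pipeline must keep outstanding. The back-of-the-envelope sketch below applies Little's law (average in-flight requests = arrival rate × latency) to the ranges above. The 10 µs per BI/ETL row is an assumption; the text only says "microseconds".

```python
import math

def required_in_flight(qps: int, latency_ms: int) -> int:
    """Little's law: average requests in flight = arrival rate x time each request spends in flight."""
    return math.ceil(qps * latency_ms / 1000)

def per_record_cost_ratio(inference_us: int, etl_row_us: int) -> float:
    """How many times more expensive one inference is than one ETL row (both in microseconds)."""
    return inference_us / etl_row_us

print(required_in_flight(1_000, 2_000))   # 2000 in flight at 1k QPS with 2 s latency
print(required_in_flight(100_000, 10))    # 1000 in flight at 100k QPS with 10 ms latency
print(per_record_cost_ratio(10_000, 10))  # 1000.0, i.e. three orders of magnitude (assumed 10 us/row)
```

At both ends of the range the pipeline must sustain on the order of a thousand concurrent requests. A synchronous, one-record-at-a-time operator can do that only at a parallelism of that order.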
4. Why Now: Evidence from Production Scenarios
This workload class is already visible in production and in the broader ecosystem.
At Alibaba, multimodal data processing on Flink is already in production across image, video, audio, and text modalities. Public discussion on the mailing-list thread for this FLIP also describes large-scale PyFlink/Flink usage at ByteDance for multimodal feature generation, including ID-based ETL, multimodal asset fetching, RPC or local GPU inference, and multimodal lake ingestion. Tencent also reports production experience with streaming workloads that involve GPU-based training/inference and autoscaling.
The same pattern appears in customer scenarios such as autonomous driving data pipelines, where image/video data goes through annotation, frame extraction, quality filtering, structured/unstructured transformation, and model inference. Today these workflows often combine a stream engine, a separate multimodal data processing framework, external model serving, and object storage, which introduces intermediate materialization and cross-system consistency challenges.
The ecosystem trend is also visible in systems such as Ray Data, Daft, Data-Juicer, and LAS, as well as the planned Alibaba–NVIDIA multimodal demo at Flink Forward Asia. The proposal focuses on stable engine-level capabilities — resource declaration, service-style invocation, OBJECT_REF, columnar transport, long-running async execution, and checkpoint/recovery — rather than fast-changing LLM APIs or agent protocols.
5. Why Flink: Runtime Foundation and Existing Data Infrastructure
Flink's opportunity in AI-oriented data processing does not come from replacing AI systems. It comes from extending its existing strengths as a production dataflow engine to a workload class where inference, multimodal objects, and GPU-backed execution become part of the data pipeline itself.
Flink has two foundations that make it a natural fit for this class of workloads.
First, Flink has a mature streaming runtime and checkpointing machinery accumulated over more than a decade. This matters because AI-oriented workloads change the cost model of failure. In traditional ETL or BI-style processing, replaying a small record is usually cheap. In multimodal inference pipelines, a single record may involve fetching large objects, decoding images or video, running GPU inference, or calling a model service. A single inference can take seconds or minutes. Under this cost model, fault tolerance is no longer a background runtime feature; it directly determines whether the pipeline is production-viable.
Flink already provides a strong foundation here: streaming execution, checkpointing, recovery, backpressure, and stateful dataflow coordination. The goal of this FLIP is to extend these runtime strengths to AI-oriented workloads, where inference operators, GPU-backed services, large objects, and long-running async calls need to participate in the same production-grade dataflow semantics.
Second, Flink is already deployed as core data infrastructure in many enterprises. Many users already run Flink for ETL, CDC, feature engineering, real-time analytics, stream-batch pipelines, and data lake ingestion. AI-oriented workloads often do not start from a separate AI system; they start from existing data pipelines that now need to fetch multimodal content, perform inference, generate embeddings, update indexes, evaluate predictions, or write multimodal results back to lakes and serving systems.
For these users, integrating inference and multimodal processing into existing Flink pipelines can be operationally natural. It avoids turning Flink into only a preprocessing layer in front of a separate AI stack, and instead lets Flink remain the dataflow backbone where data ingestion, transformation, inference, feedback, and sink operations are coordinated together.
This proposal does not aim to replace adjacent systems. Dedicated serving systems such as Triton, vLLM, KServe, BentoML, or internal model gateways remain the right place for optimized model serving. Python-native AI data systems provide valuable ecosystem capabilities. Agent frameworks focus on orchestration-level workflows.
The role of this FLIP is narrower: to evolve Flink's execution layer so that multimodal data, service-style inference, GPU-backed operators, checkpointing, scaling, and recovery can participate coherently in Flink jobs.
This is also the key difference from the Flink ML era. This proposal does not attempt to own the model training lifecycle, experiment management, or notebook-centric ML workflows. It stays on the inference and data-processing side, where Flink's existing runtime foundation and production deployment footprint are directly applicable.
6. Why an Umbrella: Shared Runtime Contract
Several of these sub-FLIPs, viewed in isolation, are general engine improvements. Their value emerges from coordinated design: together they form the runtime contract that AI-oriented data processing requires. If designed independently, they may produce abstractions that are locally usable but globally inconsistent.
A representative production pipeline makes this concrete:
Kafka / CDC / object storage
→ fetch multimodal content
→ preprocess: decode, resize, tokenize, chunk
→ inference: remote serving or local GPU
→ postprocess
→ sink: Paimon / Iceberg / Kafka / KV store
At first glance, each required improvement may look independent. Multimodal types look like a type-system extension. OBJECT_REF looks like a large-object optimization. Columnar transport looks like a Python execution optimization. RpcOperator looks like a service abstraction. GPU resource declaration looks like a scheduling improvement. Non-disruptive scaling and Pipeline Region checkpoints look like general runtime mechanisms.
However, in this pipeline, these capabilities are not independent from the user's point of view. They jointly determine whether Flink can execute the pipeline as one coherent, production-grade dataflow.
The pipeline starts by reading structured records that reference images, video, audio, documents, or embeddings in external storage. The source and format layer needs to expose these objects with consistent multimodal type information. Large objects should not be repeatedly serialized through every operator boundary; they should be represented as OBJECT_REF where possible. When data enters Python preprocessing or inference operators, the Java/Python boundary should preserve the same object and schema semantics, preferably through columnar transport, instead of repeatedly converting rows and materializing large payloads.
The inference stage then introduces runtime-control requirements. If inference is performed by a GPU-backed service inside the Flink job, RpcOperator is needed to represent that service as part of the job's execution model. If inference calls are issued from a CPU-side operator, AsyncOperator needs to track in-flight requests, backend latency, failures, and backpressure. Built-in AI functions and Python UDFs should not each invent their own batching, timeout, retry, and concurrency behavior; they should compile into a common invocation model.
Resource management is tied to the same contract. A GPU-backed inference operator cannot be treated like an ordinary CPU operator. New GPU workers may need to allocate memory, load models, warm up, and register before receiving traffic. During scale-in, they must stop receiving new requests and drain in-flight inference calls before resources are released. Therefore, GPU resource declaration, RpcOperator lifecycle states, load balancing, and non-disruptive scaling must share consistent semantics.
Fault tolerance is also part of the same problem. A single inference may take seconds or minutes, and a GPU OOM or Spot eviction should not force the entire pipeline to roll back and recompute unrelated CPU work. Pipeline Region independent checkpoints need to align with service-style execution boundaries so that GPU failures can be isolated where correctness allows. Unaligned Checkpoint (UAC) improvements are needed because long-running async inference can delay checkpoint barriers and cause checkpoint timeouts under load.
This is why these sub-FLIPs should be discussed under one umbrella. The umbrella is not claiming that every mechanism is AI-only. It is saying that, for this workload class, the mechanisms must agree on shared semantics:
- multimodal type and OBJECT_REF semantics across source, Python execution, built-in functions, and sink;
- invocation semantics for batching, concurrency, timeout, retry, backpressure, and draining;
- resource semantics for GPU declaration, model readiness, warmup, load balancing, and scaling;
- recovery semantics for checkpoint boundaries, region isolation, and in-flight requests;
- API semantics so SQL/Table, Java DataStream, and Python DataFrame do not expose incompatible versions of the same concept.
If these capabilities are designed as isolated tactical changes, each may be useful locally, but users will still need to stitch together type handling, service invocation, GPU scheduling, scaling, and recovery manually. The umbrella aligns the workload model and runtime contract so that independently evolving sub-FLIPs converge on one coherent execution model rather than a patchwork of locally optimized abstractions.
7. Evolution Directions and Key Implementation Path
The umbrella describes the evolution from two complementary perspectives.
From a functional perspective, the work is grouped into five evolution directions:
- Development paradigm: provide a Pythonic DataFrame API and service-style inference invocation.
- Data model: introduce multimodal types and OBJECT_REF for large-object handling.
- Heterogeneous resources: support GPU resource declaration and independent deployment of GPU-backed operators.
- Execution efficiency: optimize the Java/Python boundary, vectorized processing, and large-object data transfer.
- Fault tolerance and elasticity: improve regional checkpointing, UAC behavior, and non-disruptive scaling for long-running inference workloads.
From an engineering perspective, these directions are decomposed into 11 sub-FLIPs organized into three layers according to technical coupling:
- Core Runtime Primitives
- AI Workload Expression and Execution
- Production-Grade Operational Guarantees
This section summarizes each sub-FLIP at the umbrella level — clarifying why it belongs to this umbrella and how it relates to the shared workload model. Detailed API design, runtime protocol, compatibility guarantees, and implementation plans will be discussed in the individual sub-FLIPs.
Layer 1 — Core Runtime Primitives
Layer 1 introduces foundational runtime and data-model abstractions that upper-layer capabilities depend on.
FLIP-XXX: Supporting RpcOperator — Independently Deployed and Scaled RPC Service Operators
Model inference can be part of the logical dataflow, but its execution characteristics differ significantly from ordinary CPU operators. Inference may run on GPUs, require model loading and warmup, expose service-style endpoints, and need independent scaling and recovery.
This FLIP introduces RpcOperator as a first-class service-style operator abstraction. It allows GPU-backed or Flink-managed service-style computation to participate in a Flink job with explicit lifecycle, service discovery, health, backpressure, failover, and scaling semantics.
RpcOperator is complementary to existing async I/O. Async I/O remains the right tool for calling externally deployed services. RpcOperator targets the case where the service is part of the Flink job's resource, lifecycle, and failure domain.
At the umbrella level, this FLIP defines the runtime primitive that other capabilities depend on: GPU independent deployment, non-disruptive GPU scaling, service-side load balancing, and inference-aware failure isolation. The detailed service protocol, deployment model, and integration with existing task scheduling will be defined in the sub-FLIP.
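To make "explicit lifecycle" concrete, here is a minimal sketch of a state machine that an RpcOperator instance might follow. The state names, the class RpcServiceInstance, and the transition table are hypothetical illustrations, not the sub-FLIP's design. The sketch encodes only one property from the text: an instance receives traffic after loading and warmup, and stops receiving it before its resources are released.

```python
from enum import Enum, auto

class ServiceState(Enum):
    """Hypothetical lifecycle states for one RpcOperator instance."""
    ALLOCATING = auto()  # acquiring a container and GPU memory
    LOADING = auto()     # loading model weights
    WARMING_UP = auto()  # warmup requests, kernel compilation, cache fill
    READY = auto()       # registered for service discovery; receives traffic
    DRAINING = auto()    # no new requests; in-flight calls finishing
    STOPPED = auto()     # resources released (also the target of any failure)

_TRANSITIONS = {
    ServiceState.ALLOCATING: {ServiceState.LOADING, ServiceState.STOPPED},
    ServiceState.LOADING: {ServiceState.WARMING_UP, ServiceState.STOPPED},
    ServiceState.WARMING_UP: {ServiceState.READY, ServiceState.STOPPED},
    ServiceState.READY: {ServiceState.DRAINING, ServiceState.STOPPED},
    ServiceState.DRAINING: {ServiceState.STOPPED},
    ServiceState.STOPPED: set(),
}

class RpcServiceInstance:
    """Hypothetical instance wrapper that enforces the transition table above."""
    def __init__(self, name: str):
        self.name = name
        self.state = ServiceState.ALLOCATING

    def transition(self, target: ServiceState) -> None:
        if target not in _TRANSITIONS[self.state]:
            raise ValueError(f"{self.name}: illegal transition {self.state.name} -> {target.name}")
        self.state = target

    @property
    def accepts_traffic(self) -> bool:
        # Only READY instances should be visible to the load balancer.
        return self.state is ServiceState.READY

inst = RpcServiceInstance("gpu-worker-0")
print(inst.accepts_traffic)  # False: still allocating
for step in (ServiceState.LOADING, ServiceState.WARMING_UP, ServiceState.READY):
    inst.transition(step)
print(inst.accepts_traffic)  # True
```

Making the states explicit lets scaling, load balancing, and failover share a single definition of "ready" instead of each inferring it from timeouts.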
FLIP-XXX: Multimodal Data Type System and Object Reference Mechanism
Flink's current type system does not natively represent multimodal objects such as images, audio, video, tensors, and embeddings. Users often encode them as VARBINARY, strings, or external URIs, which loses semantic information and prevents framework-level optimization.
This FLIP introduces native multimodal types such as Tensor, Image, and Embedding, together with an OBJECT_REF mechanism for large-object handling. Instead of repeatedly serializing large objects through the pipeline, Flink can pass lightweight references where possible and materialize the object only when needed.
These types should be engine-level concepts and visible across SQL/Table, Java DataStream, and Python DataFrame. The sub-FLIP will define type representation, SerDes behavior, object lifecycle, cleanup, recovery semantics, and fallback strategies for connectors and formats that do not initially support these types.
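The reference-passing idea can be sketched as follows. ObjectRef, InMemoryObjectStore, and the s3:// URI are hypothetical stand-ins, not the proposed type or a real connector. Operators that need only metadata work on the small reference, and the payload is fetched once, by the operator that actually needs the bytes.

```python
import json
from dataclasses import asdict, dataclass

@dataclass(frozen=True)
class ObjectRef:
    """Hypothetical lightweight handle to a large object in external storage."""
    uri: str
    size_bytes: int
    content_type: str

class InMemoryObjectStore:
    """Stand-in for object storage, keyed by URI."""
    def __init__(self):
        self._blobs = {}
        self.reads = 0  # how many times a payload was actually materialized

    def put(self, uri: str, payload: bytes, content_type: str) -> ObjectRef:
        self._blobs[uri] = payload
        return ObjectRef(uri, len(payload), content_type)

    def materialize(self, ref: ObjectRef) -> bytes:
        self.reads += 1
        return self._blobs[ref.uri]

def serialized_size(ref: ObjectRef) -> int:
    """Bytes needed to ship the reference (not the payload) across an operator boundary."""
    return len(json.dumps(asdict(ref)).encode("utf-8"))

store = InMemoryObjectStore()
ref = store.put("s3://bucket/frames/0001.jpg", b"\xff" * 5_000_000, "image/jpeg")

# A filter or routing operator decides using only the metadata carried by the reference...
is_large_image = ref.content_type.startswith("image/") and ref.size_bytes > 1_000_000
# ...and only the inference step materializes the 5 MB payload.
pixels = store.materialize(ref) if is_large_image else b""

print(serialized_size(ref) < 100)  # True: the reference is well under 100 bytes
print(store.reads)                 # 1
```

The open questions the sub-FLIP lists (object lifecycle, cleanup, recovery) are exactly what this toy ignores. For example, it never asks what happens if the referenced object is deleted before a replay.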
Layer 2 — AI Workload Expression and Execution
Layer 2 focuses on how users express AI-oriented workloads and how those workloads are compiled and executed efficiently.
A shared concern across this layer is inference invocation: batching, concurrency, timeout, retry, backpressure, and draining should not be reimplemented differently in every UDF, built-in function, or service operator. The individual sub-FLIPs may expose different APIs, but they should align on a common runtime model for inference invocation.
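A minimal asyncio sketch of such a shared invocation model follows. InvocationPolicy, invoke_all, and FakeModelServer are hypothetical names, and the policy fields are illustrative rather than proposed configuration. Batching, bounded concurrency, per-attempt timeout, and retry live in one place, so a UDF, a built-in function, or a service operator would only supply the backend call.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Sequence

@dataclass(frozen=True)
class InvocationPolicy:
    """Hypothetical policy shared by async UDFs, built-in AI functions, and service operators."""
    max_batch_size: int = 8
    max_concurrency: int = 4  # cap on in-flight batches; further batches wait
    timeout_s: float = 1.0    # per-attempt deadline
    max_retries: int = 2      # retries on timeout or transient connection errors

Backend = Callable[[List[str]], Awaitable[List[str]]]

async def invoke_all(records: Sequence[str], backend: Backend,
                     policy: InvocationPolicy) -> List[str]:
    """Batch records, bound concurrency, apply timeout/retry, and preserve input order."""
    semaphore = asyncio.Semaphore(policy.max_concurrency)

    async def call_batch(batch: List[str]) -> List[str]:
        async with semaphore:  # waiting here is where backpressure builds up
            attempt = 0
            while True:
                try:
                    return await asyncio.wait_for(backend(batch), policy.timeout_s)
                except (asyncio.TimeoutError, ConnectionError):
                    attempt += 1
                    if attempt > policy.max_retries:
                        raise

    batches = [list(records[i:i + policy.max_batch_size])
               for i in range(0, len(records), policy.max_batch_size)]
    per_batch = await asyncio.gather(*(call_batch(b) for b in batches))
    return [item for batch_out in per_batch for item in batch_out]

class FakeModelServer:
    """Simulated inference backend: fails once, then upper-cases each record."""
    def __init__(self):
        self.calls = 0
        self.in_flight = 0
        self.peak_in_flight = 0

    async def __call__(self, batch: List[str]) -> List[str]:
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("transient failure")  # exercises the retry path
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        await asyncio.sleep(0.01)  # simulated inference latency
        self.in_flight -= 1
        return [r.upper() for r in batch]

server = FakeModelServer()
out = asyncio.run(invoke_all([f"r{i}" for i in range(20)], server, InvocationPolicy()))
print(out[:3])                     # ['R0', 'R1', 'R2']
print(server.calls)                # 4: three batches plus one retry
print(server.peak_in_flight <= 4)  # True
```

Draining, which the text also lists, would fit in the same place. For example, the operator could stop admitting new batches and await the outstanding gather before shutting down.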
FLIP-XXX: A More Pythonic DataFrame API for Python Users
Many AI workload owners are data scientists, ML engineers, and algorithm engineers who expect Python and DataFrame-style programming. PyFlink's current APIs are powerful but still expose Java-centric abstractions in many places.
This FLIP introduces a more Pythonic DataFrame API as a first-class Flink API surface. The goal is to let Python users express distributed data processing logic naturally while still relying on Flink's planner, runtime, state management, checkpointing, and resource scheduling.
At the umbrella level, this FLIP is responsible for the user-facing Python programming model. It should integrate with multimodal types, OBJECT_REF, batch-wise UDFs, async UDFs, GPU resource declaration, and the shared inference invocation semantics. It should not become a separate execution engine.
FLIP-XXX: Connector API for Multimodal Data Source/Sink
Multimodal data is often stored outside structured tables: in object storage, video streams, document stores, or lake formats with binary/vector/variant fields. Users need a consistent way to ingest, reference, process, and sink such data in Flink.
This FLIP defines how Flink integrates multimodal sources, sinks, and external storage/lake formats. It covers source-side ingestion patterns such as object storage traversal, URI-based content fetching, and streaming media sources, as well as sink-side mappings to lake formats such as Paimon, Iceberg, and Lance.
The boundary is important: this FLIP should define source/sink integration and format mapping, but it should not force every connector to implement every multimodal type immediately. Detailed physical layout, field-level SerDes, and advanced type semantics should be coordinated with the multimodal type system sub-FLIP and the corresponding format implementations.
FLIP-XXX: Resource Declaration and Independent Deployment for GPU Workloads
AI-oriented workloads often require GPUs, but current Flink resource declarations are not expressive enough for common inference requirements. Users need to declare GPU type, quantity, memory requirements, parallelism, and whether computation should run locally or through a service-style operator.
This FLIP enables UDF-level and operator-level GPU resource declaration, and allows the planner/runtime to separate GPU-heavy computation from CPU dataflow where appropriate. GPU-backed operators may then be deployed, scheduled, scaled, and recovered independently.
At the umbrella level, this FLIP connects the programming model to the runtime resource model. It should align with RpcOperator, non-disruptive GPU scaling, and the shared inference invocation contract. The detailed syntax, resource model, and scheduling integration will be defined in the sub-FLIP.
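For illustration only, the declaration fields named above (GPU type, quantity, memory, parallelism, local vs. service execution) could be captured and matched against workers as in the following sketch. GpuRequirement, GpuWorker, and the greedy place function are hypothetical. They do not model Flink's existing external-resource mechanism, slot sharing, or the scheduler.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass(frozen=True)
class GpuRequirement:
    """Hypothetical per-operator GPU declaration; field names are illustrative."""
    gpu_type: str           # e.g. "A10"
    gpus_per_instance: int
    min_memory_gib: int     # per GPU
    parallelism: int
    execution: str = "local"  # "local" or "service" (RpcOperator-style)

@dataclass(frozen=True)
class GpuWorker:
    worker_id: str
    gpu_type: str
    free_gpus: int
    memory_gib: int

def place(req: GpuRequirement, workers: List[GpuWorker]) -> Optional[List[str]]:
    """Greedy placement: one worker id per parallel instance, or None if the job cannot fit."""
    free = {w.worker_id: w.free_gpus for w in workers}  # local copy; inputs are not mutated
    eligible = [w for w in workers
                if w.gpu_type == req.gpu_type and w.memory_gib >= req.min_memory_gib]
    assignment: List[str] = []
    for _ in range(req.parallelism):
        target = next((w for w in eligible if free[w.worker_id] >= req.gpus_per_instance), None)
        if target is None:
            return None  # insufficient GPUs of the declared type and memory
        free[target.worker_id] -= req.gpus_per_instance
        assignment.append(target.worker_id)
    return assignment

workers = [GpuWorker("w1", "A10", 2, 24), GpuWorker("w2", "A10", 1, 24), GpuWorker("w3", "L4", 4, 24)]
print(place(GpuRequirement("A10", 1, 16, parallelism=3), workers))  # ['w1', 'w1', 'w2']
print(place(GpuRequirement("A10", 1, 16, parallelism=4), workers))  # None
```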
FLIP-XXX: Built-in Multimodal Operators and AI Functions
Today, users repeatedly implement common multimodal and AI operations themselves: content fetching, image decoding, resizing, video frame extraction, text chunking, embedding generation, prediction, classification, and extraction. This increases boilerplate and prevents framework-level optimization.
This FLIP introduces built-in operators and functions for common multimodal and AI-oriented operations. These functions should be available to SQL and Python users first, with DataStream integration evaluated through native APIs, UDFs, or Table conversion where appropriate.
The goal is not to reimplement all AI libraries inside Flink. Built-ins should wrap or integrate mature existing libraries where possible, while exposing common operations in a way that Flink can optimize: batching, connection reuse, retry, timeout, resource declaration, model sharing, and observability.
FLIP-XXX: Columnar Data Transport and Processing Optimization
Multimodal and Python-heavy workloads are sensitive to row/column conversion and cross-language serialization overhead. When pipelines chain preprocessing, inference, and postprocessing in Python, repeated Java/Python conversion can become a major bottleneck.
This FLIP optimizes columnar data transport and batch-wise execution, starting from the Java/Python boundary. Arrow-compatible transport and vectorized Python execution can reduce conversion overhead and improve throughput for Python-heavy multimodal pipelines.
The scope is intentionally staged. This FLIP does not claim to deliver full end-to-end columnar execution across Java, SQL, connectors, and runtime in the first phase. Broader Java/SQL columnar execution should be discussed in a separate FLIP. The design here should remain compatible with that future direction.
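The following sketch shows why batch-wise transport helps. It uses plain Python lists as a stand-in for Arrow record batches and counts each call as one simulated Java/Python boundary crossing. Real costs also include serialization and memory copies, which it does not model. All function names are hypothetical.

```python
from typing import Callable, Dict, List, Sequence, Tuple

Row = Dict[str, object]
Columns = Dict[str, List[object]]

def to_columns(rows: Sequence[Row]) -> Columns:
    """Pivot a batch of rows into one list per field (an Arrow-like layout, stdlib only)."""
    columns: Columns = {name: [] for name in rows[0]} if rows else {}
    for row in rows:
        for name, values in columns.items():
            values.append(row[name])
    return columns

def process_rows(rows: Sequence[Row], fn: Callable[[Row], object]) -> Tuple[List[object], int]:
    """Row-at-a-time: one simulated boundary crossing per record."""
    out: List[object] = []
    crossings = 0
    for row in rows:
        crossings += 1
        out.append(fn(row))
    return out, crossings

def process_batches(rows: Sequence[Row], fn: Callable[[Columns], List[object]],
                    batch_size: int) -> Tuple[List[object], int]:
    """Batch-wise: rows are pivoted to columns once per batch; one crossing per batch."""
    out: List[object] = []
    crossings = 0
    for start in range(0, len(rows), batch_size):
        crossings += 1
        out.extend(fn(to_columns(rows[start:start + batch_size])))
    return out, crossings

rows = [{"id": i, "width": 1920, "height": 1080} for i in range(10_000)]
row_out, row_crossings = process_rows(rows, lambda r: r["width"] * r["height"])
batch_out, batch_crossings = process_batches(
    rows, lambda c: [w * h for w, h in zip(c["width"], c["height"])], batch_size=1024)
print(row_out == batch_out)            # True
print(row_crossings, batch_crossings)  # 10000 10
```

The column-wise function is also the shape that vectorized Python libraries expect, which is where most of the throughput gain would come from in practice.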
Layer 3 — Production-Grade Operational Guarantees
Layer 3 focuses on elasticity, checkpointing, and recovery. These mechanisms are not AI-exclusive, but AI workload characteristics push existing implementations past their limits:
- A per-record cost 3–4 orders of magnitude higher means the cost of any failover or global rollback is also 3–4 orders of magnitude higher. Mechanisms whose blast radius was tolerable in BI become catastrophic in inference workloads.
- Because GPUs are frequently provisioned on Spot instances, resource churn is routine rather than exceptional, so stop-and-restart elasticity is not viable.
- With asynchronous, long-tailed inference operators, mailbox blocking and UAC's barrier-priority behavior become first-order bottlenecks rather than edge cases.
The workload model in §3 is what makes these behaviors load-bearing for production viability.
FLIP-XXX: Non-Disruptive Scaling for CPU Operators
Traffic in AI-oriented pipelines can vary significantly. CPU preprocessing and postprocessing stages may need to scale without stopping the entire job. Restart-based scaling wastes resources during low traffic and causes disruption during peaks.
This FLIP explores non-disruptive scaling for CPU operators through checkpoint-anchored Pipeline Region handover. New regions can be initialized before cutover; old regions can drain in-flight data before being released.
At the umbrella level, this FLIP provides the CPU-side elasticity foundation. The initial scope should focus on topology classes where correctness can be clearly proven, such as stateless, shuffle-light, or simple region-based pipelines. Arbitrary keyed-state migration and complex all-to-all topologies may be out of scope for the first phase.
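The handover idea for the simplest in-scope case (a stateless map) can be sketched as follows. StatelessRegion and rescale_at_barrier are hypothetical, everything runs in one process, and the new region is assumed to be initialized before cutover. Records before the checkpoint barrier stay with the old region, which drains them. Records after it go to the new region at its new parallelism, and every record is processed exactly once.

```python
from collections import Counter, deque
from typing import Deque, List, Tuple

class StatelessRegion:
    """Toy stand-in for a Pipeline Region running a stateless map at a fixed parallelism."""
    def __init__(self, parallelism: int):
        self.queues: List[Deque[int]] = [deque() for _ in range(parallelism)]  # per-subtask buffers
        self.emitted: List[int] = []

    def enqueue(self, record: int) -> None:
        self.queues[record % len(self.queues)].append(record)

    def drain(self) -> None:
        """Process every buffered record; afterwards the region can be released."""
        for queue in self.queues:
            while queue:
                self.emitted.append(queue.popleft() * 10)  # the stateless map itself

def rescale_at_barrier(records: List[int], barrier_pos: int,
                       old_parallelism: int, new_parallelism: int) -> Tuple[List[int], List[int]]:
    old = StatelessRegion(old_parallelism)
    new = StatelessRegion(new_parallelism)  # assumed started and initialized before cutover
    for pos, record in enumerate(records):
        # Cutover is anchored at the checkpoint barrier position in the stream.
        (old if pos < barrier_pos else new).enqueue(record)
    old.drain()  # old region finishes in-flight data up to the barrier, then is released
    new.drain()
    return old.emitted, new.emitted

old_out, new_out = rescale_at_barrier(list(range(100)), barrier_pos=60,
                                      old_parallelism=2, new_parallelism=3)
counts = Counter(old_out + new_out)
print(len(old_out), len(new_out))            # 60 40
print(all(c == 1 for c in counts.values()))  # True: no record lost or duplicated
```

Keyed state would break this toy immediately, because records for the same key could land on different subtasks on either side of the barrier. That is one reason the text keeps keyed-state migration out of the first phase.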
FLIP-XXX: Non-Disruptive Scaling for GPU Operators
GPU-backed inference operators have stronger elasticity requirements than ordinary CPU operators. GPU resources are expensive, model loading can be slow, and workers may need warmup before receiving traffic. Killing or restarting GPU workers during scaling can waste expensive computation and break inference SLAs.
This FLIP introduces non-disruptive scaling for GPU-backed operators and RpcOperator instances. Scale-out should register new workers only after model loading and warmup complete. Scale-in should stop routing new requests, drain in-flight inference calls, and release resources only after the handover is safe.
This FLIP depends on RpcOperator and service lifecycle semantics. It should also align with GPU resource declaration, load balancing, backpressure, and Pipeline Region checkpointing.
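The scale-out and scale-in rules above can be sketched as a small router with least-in-flight load balancing. InferenceRouter and its methods are hypothetical names, not a proposed API. A worker becomes routable only after it is marked warm. Scale-in first stops routing to a worker, and the worker is released only once its in-flight count reaches zero.

```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class Worker:
    worker_id: str
    ready: bool = False     # set only after model loading and warmup complete
    draining: bool = False  # set at the start of scale-in
    in_flight: int = 0

class InferenceRouter:
    """Hypothetical router: warmup-gated scale-out, drain-based scale-in."""
    def __init__(self):
        self.workers: Dict[str, Worker] = {}

    def add_worker(self, worker_id: str) -> None:
        self.workers[worker_id] = Worker(worker_id)  # registered, but not yet routable

    def mark_warm(self, worker_id: str) -> None:
        self.workers[worker_id].ready = True

    def dispatch(self) -> str:
        """Route one request to the routable worker with the fewest in-flight calls."""
        candidates = [w for w in self.workers.values() if w.ready and not w.draining]
        if not candidates:
            raise RuntimeError("no routable worker: apply backpressure upstream")
        target = min(candidates, key=lambda w: w.in_flight)
        target.in_flight += 1
        return target.worker_id

    def complete(self, worker_id: str) -> None:
        self.workers[worker_id].in_flight -= 1

    def begin_scale_in(self, worker_id: str) -> None:
        self.workers[worker_id].draining = True  # stop routing new requests

    def try_release(self, worker_id: str) -> bool:
        """Release the worker's GPU only after all of its in-flight calls finished."""
        w = self.workers[worker_id]
        if w.draining and w.in_flight == 0:
            del self.workers[worker_id]
            return True
        return False

router = InferenceRouter()
router.add_worker("gpu-0")
router.mark_warm("gpu-0")
router.add_worker("gpu-1")          # still loading its model: receives no traffic
first = router.dispatch()           # "gpu-0"
router.mark_warm("gpu-1")
second = router.dispatch()          # "gpu-1" (fewest in-flight calls)
router.begin_scale_in("gpu-0")
print(router.try_release("gpu-0"))  # False: one call still in flight
router.complete("gpu-0")
print(router.try_release("gpu-0"))  # True
```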
FLIP-XXX: Unaligned Checkpoint Enhancements for Long-Running AI Workloads
Long-running async inference can make checkpoint completion unstable. Async queues may block progress, checkpoint barriers may be delayed behind business messages, and some common forward/pointwise topologies cannot currently benefit from Unaligned Checkpoint.
This FLIP explores improvements to Unaligned Checkpoint behavior for long-running async operators. Potential directions include reducing mailbox blocking, prioritizing checkpoint barriers where appropriate, and allowing explicit opt-in for selected forward/pointwise edges when ordering is not required.
The default behavior should remain unchanged. Any relaxation of ordering-related restrictions must be explicit and limited to safe topology classes. The detailed correctness conditions will be defined in the sub-FLIP.
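The following toy sketch shows barrier prioritization. Flink's unaligned checkpoints already persist in-flight buffers as part of the checkpoint rather than waiting for them to be processed. Here, a barrier may overtake data that was queued ahead of it, and the overtaken records are returned so they can be stored with that checkpoint. PrioritizedMailbox is hypothetical and is not Flink's mailbox or network-stack implementation. A real design would apply this only to explicitly opted-in edges.

```python
import heapq
import itertools
from typing import List, Tuple

BARRIER_PRIORITY, DATA_PRIORITY = 0, 1

class PrioritizedMailbox:
    """Toy input queue in which checkpoint barriers overtake buffered data."""
    def __init__(self):
        self._heap: List[Tuple[int, int, str]] = []
        self._seq = itertools.count()  # preserves FIFO order within a priority

    def put_data(self, record: str) -> None:
        heapq.heappush(self._heap, (DATA_PRIORITY, next(self._seq), record))

    def put_barrier(self, checkpoint_id: int) -> None:
        heapq.heappush(self._heap, (BARRIER_PRIORITY, next(self._seq), f"barrier-{checkpoint_id}"))

    def take(self) -> Tuple[str, List[str]]:
        """Return (message, overtaken_data); overtaken_data is non-empty only for barriers.

        Raises IndexError if the mailbox is empty.
        """
        priority, seq, payload = heapq.heappop(self._heap)
        if priority != BARRIER_PRIORITY:
            return payload, []
        # Data enqueued *before* the barrier but not yet processed belongs to this checkpoint.
        overtaken = sorted((s, p) for pr, s, p in self._heap if pr == DATA_PRIORITY and s < seq)
        return payload, [p for _, p in overtaken]

box = PrioritizedMailbox()
for r in ("img-1", "img-2", "img-3"):
    box.put_data(r)
box.put_barrier(7)
box.put_data("img-4")            # arrives after the barrier: belongs to the next checkpoint
msg, in_flight = box.take()
print(msg)                       # barrier-7
print(in_flight)                 # ['img-1', 'img-2', 'img-3']
print(box.take())                # ('img-1', [])
```

The barrier completes without waiting behind slow inference records. The price is that the overtaken records become checkpoint state, which is why the correctness conditions (such as ordering on the affected edges) need to be spelled out per topology class.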
FLIP-XXX: Independent Checkpoints Based on Pipeline Region
In multimodal inference pipelines, a local failure can cause expensive global recomputation. If one GPU-heavy region fails, rolling back unrelated CPU regions may waste significant completed work.
This FLIP explores Pipeline Region independent checkpointing and recovery. The goal is to isolate checkpoint progress and recovery where correctness allows, so that a failure in one region does not necessarily force the entire job to roll back.
The initial scope should be conservative: simple topologies, shuffle-light pipelines, stateless or lightly stateful computation, and regions that can be reasoned about independently. The sub-FLIP must define how region snapshots compose into job-level recovery, how source progress is coordinated, and how stateful or keyed operators are handled or excluded initially.
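A rough way to see the benefit is to compare redone work under both schemes. In a forward-only topology, each parallel chain can form its own region. Under global checkpointing, the newest usable checkpoint is the newest one that every region completed, so one lagging region holds everyone back. All figures below are illustrative assumptions, not measurements. The sketch also ignores source-progress coordination and state, which the sub-FLIP must define.

```python
from dataclasses import dataclass
from typing import List

CHECKPOINT_INTERVAL = 1_000  # records between checkpoints (illustrative)

@dataclass
class Region:
    processed: int             # records processed so far
    latest_checkpoint_id: int  # newest checkpoint this region has completed
    cost_ms_per_record: int

def restore_offset(checkpoint_id: int) -> int:
    return checkpoint_id * CHECKPOINT_INTERVAL

def rework_global(regions: List[Region]) -> int:
    """Global checkpoint with full rollback: every region restarts from the newest
    checkpoint that *all* regions completed. Returns milliseconds of redone work."""
    global_id = min(r.latest_checkpoint_id for r in regions)
    return sum((r.processed - restore_offset(global_id)) * r.cost_ms_per_record for r in regions)

def rework_regional(regions: List[Region], failed: int) -> int:
    """Independent region checkpoints: only the failed region restores, from its own snapshot."""
    r = regions[failed]
    return (r.processed - restore_offset(r.latest_checkpoint_id)) * r.cost_ms_per_record

# Four forward-connected inference chains at 500 ms/record; region 3's snapshots lag.
regions = [Region(9_500, 9, 500), Region(9_500, 9, 500),
           Region(9_500, 9, 500), Region(9_500, 5, 500)]
print(rework_global(regions) // 1000)       # 9000 seconds of inference redone
print(rework_regional(regions, 0) // 1000)  # 250 seconds
```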
8. API Surface Principle
Capabilities introduced under this umbrella must remain consistent across Flink's user-facing APIs. The principle:
Engine-level primitives — RpcOperator, multimodal types, OBJECT_REF, and GPU resource declaration — should be designed with SQL/Table, Java DataStream, and Python DataFrame in mind. Single-API designs are acceptable only when the capability is intrinsically API-bound (such as the Pythonic DataFrame API), or when a sub-FLIP explicitly justifies a staged rollout.
Built-in AI functions should target SQL and Python first, with DataStream integration provided through selected native APIs, UDFs, or Table conversion where appropriate. Columnar transport initially focuses on the Java/Python boundary while remaining compatible with future Java/SQL columnar work. Layer 3 runtime mechanisms apply to supported topology classes regardless of API surface.
9. Compatibility, Deprecation, and Migration Plan
Each sub-FLIP will address its specific compatibility considerations. At the umbrella level:
- All proposed changes are incremental — no existing API or behavior is removed or altered.
- The Python DataFrame API is a new module (pyflink.dataframe) and does not affect the existing pyflink.table or pyflink.datastream APIs.
- GPU resource scheduling extends the existing external-resource mechanism, providing better integration and ease of use rather than replacing it.
- New data types (OBJECT_REF, Tensor, Image, Embedding) are new logical types and do not affect existing types.
- Built-in multimodal functions are additions to the function catalog.
- RpcOperator is a newly introduced operator abstraction that coexists with the existing StreamOperator and does not affect the behavior of existing StreamOperator-based jobs.
- Pipeline Region independent checkpoints are provided as a configurable capability; the default behavior remains consistent with the existing global checkpoint.
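- Enabling Unaligned Checkpoint on forward / pointwise edges and barrier priority scheduling are runtime optimizations available only behind explicit configuration. They do not weaken the consistency guarantees of checkpoints; any relaxation of record ordering applies only to edges the user explicitly opts in, within safe topology classes.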
- Non-disruptive scaling preserves exactly-once guarantees through checkpoint-anchored Pipeline Region handover, equivalent to the existing recovery path.
No deprecations are proposed. No migration is required for existing users.