This FLIP is co-authored by Sean Falconer, Chris Meyers and Xintong Song.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Flink Agents is a proposed Apache Flink sub-project (flink-agents) that introduces a native framework for building, testing, and running AI agents within Flink’s event-driven runtime. As enterprises adopt AI to automate complex decisions, agents increasingly need to operate on real-time data streams with low latency, strong fault tolerance, and seamless tool integration. Today, building such agents requires stitching together disjointed systems for stream processing, model inference, and orchestration, introducing complexity, fragility, and latency. This sub-project addresses that gap by enabling developers to build composable, long-running agents directly on Flink using its Table and DataStream APIs.

Note: By sub-project, we mean to create a separate git repository, managed by the Flink PMC and Committers. All Flink Committers should have write access to the new repository, and any release should be approved by the Flink PMC.

Our Worldview: Why This Approach Makes Sense

We believe agents should be built as event-driven microservices, that is, software processes that:

  • Consume and respond to business events in real time
  • Access rich contextual data using streaming pipelines
  • Use LLMs as reasoning engines to plan, decide, or act

This worldview has four key pillars:

  1. Event-Native by Design: Most enterprise workflows are not synchronous or prompt-based; they're asynchronous, stateful, and continuous. Agents built with Flink consume and process streams of events, allowing them to act over time, not in isolated, one-off requests.
  2. Agents Need Fresh, Contextual Data: Agents can't do anything useful without the right data. Whether it's detecting fraud, generating a recommendation, or planning a response, agents need a problem-specific view of live, accurate, and relevant context. Flink and streaming storage systems like Kafka together form the ideal substrate to capture, process, and retain that data in motion. This enables agents to access timely context on demand, at the moment a decision needs to be made, without relying on stale snapshots or brittle polling mechanisms.
  3. Replayability for Iteration and Safety: Event-driven systems enable replay of input data. This allows agents to be developed and evaluated using real data without invoking live side effects. It supports local testing, dark launches, A/B testing, and faster iteration.
  4. Every Engineer is an AI Engineer: By extending familiar Flink constructs and targeting Java and Python developers, not just data scientists, we aim to democratize agent development. Our goal is to let developers use what they already know (event-driven programming) to build something entirely new (AI agents).

This point of view leads to a simple idea: Agents won't succeed in the enterprise unless they're built like real software, long-running, asynchronous, and driven by live data. They need infrastructure that treats context, state, and observability as first-class concerns. Flink Agents gives them that. It turns agents from brittle demos into production systems.

Motivation

While model quality continues to improve, the real challenge in deploying agentic systems is infrastructure. Agents require access to live data, toolchains, and other agents. They must operate continuously, share outputs asynchronously, and integrate with multiple systems. These needs aren't met by static prompt chains or batch-based pipelines.

Flink is uniquely suited to meet these requirements. It offers a mature foundation for building long-running, autonomous systems that act on continuous streams of machine-generated data with low latency and high reliability.

The Gap

Flink today is a powerful stream processing engine, but it lacks native abstractions for building agent-oriented workflows. Developers must cobble together agents using external runtimes, various frameworks, orchestration layers, and disconnected tooling. This leads to:

  • Operational complexity: Managing dual-write pipelines, external agents, and intermediate data stores adds fragility and cost.
  • Limited visibility and control: Without native agent lifecycle support, debugging and optimizing agent workflows becomes opaque.
  • Slow iteration cycles: Without unified runtime and test utilities, shipping and validating agent logic is slow and error-prone.

At the same time, demand is growing across the industry: there is an urgent need for a composable, testable, and scalable way to run AI agents on real-time data. From anomaly detection to fraud prevention, predictive maintenance, supply chain optimization, and real-time personalization, enterprises need a way to bring agentic workflows natively into stream processing.

Apache Flink is uniquely suited to serve as the infrastructure backbone for the next generation of real-time, data-driven AI agents.

Out of the box, Flink provides:

  • A high-performance, low-latency runtime for continuous processing
  • Strong fault-tolerance and exactly-once semantics for stateful jobs
  • Built-in state management, enabling agents to persist and query context without external stores
  • Native integration with streaming storage systems like Kafka for durable, replayable messaging
  • Table and DataStream APIs for rich, declarative data transformations

These capabilities make Flink ideal for agentic workloads that aren’t just user-triggered, but system-triggered, from clickstreams and sensor telemetry to transactions and alerts. These agents must run continuously, act autonomously, and recover from failure without manual intervention. Flink was built for exactly these scenarios.

Seamless Integration of Data Processing and Agentic Workflow

One of Flink’s key advantages as an agent framework is its ability to natively integrate data processing with agentic workflows, avoiding the complexity of stitching together separate systems.

System-triggered agent workflows are often tightly coupled with upstream or downstream data transformations. For example:

  • Computing transaction metrics and joining with account profiles before detecting financial fraud
  • Aggregating by demographic groups after analyzing attitudes from social media posts

With separate frameworks, switching between data processing and agent execution is inefficient and error-prone, especially in large-scale systems where this boundary is crossed frequently.

With Flink, agents can consume arbitrary data streams as input and emit new data streams as output. This makes the connection between streaming data and agent reasoning natural and consistent. Flink’s checkpointing ensures end-to-end consistency across both processing and decision logic, no external coordination required.
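
To make this concrete, the sketch below combines ordinary Table API processing with an agent step in a single PyFlink job, mirroring the fraud-detection example above. It is an illustrative mockup only: the table and column names and the fraud_agent definition are hypothetical, and the AgentWorkflow/AgentRuntime calls reuse the exploratory APIs proposed later under Public Interfaces.

Python Sketch
from pyflink.table import EnvironmentSettings, TableEnvironment
from flink_agent import AgentRuntime, AgentWorkflow  # proposed (not yet existing) APIs

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# 1) Ordinary Flink data processing: aggregate per-account transaction metrics
#    and join them with account profiles (both tables assumed to be registered).
enriched = t_env.sql_query("""
    SELECT p.account_id, p.risk_tier, m.txn_count, m.txn_total
    FROM (
        SELECT account_id, COUNT(*) AS txn_count, SUM(amount) AS txn_total
        FROM transactions
        GROUP BY account_id
    ) AS m
    JOIN account_profiles AS p ON m.account_id = p.account_id
""")

# 2) Agentic step in the same job: Flink's checkpointing then covers both the
#    aggregation above and the agent's decisions below.
fraud_agent = AgentWorkflow(model=..., prompt="Flag suspicious account activity.")
alerts = AgentRuntime.from_table(enriched).apply(fraud_agent).to_table()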

Designed for Scale and Complexity

These agent use cases share several demanding characteristics:

  • High-throughput inputs across many partitions
  • Continuous, long-running execution
  • No human gating, requiring full autonomy and fault tolerance
  • Coordination across data domains and keys

Flink meets all of these needs natively, enabling agents to:

  • React in real time to massive volumes of events
  • Enrich and aggregate context over time
  • Share and persist state across invocations
  • Reprocess historical events for debugging, retraining, or audit

The Opportunity

The Flink Agents project fills the architectural gap between traditional stream processing and the emerging demands of compound and agentic AI. By bringing agent lifecycle, communication, replayability, and tool invocation natively into Flink, we give developers a coherent, scalable way to build intelligent systems that can react in real time without gluing together disconnected components.

Just as apps needed operating systems to thrive, agents need infrastructure purpose-built for long-lived, autonomous, data-driven operation. Flink Agents aims to be that infrastructure, natively within Apache Flink.

Proposed Solution

The proposed solution introduces a native framework for building, running, and scaling real-time AI agents. The goal is to provide the minimal runtime infrastructure needed to support long-running, event-driven agents directly within Flink, leveraging existing Flink primitives.

The focus of the initial phase is to deliver the core agent capabilities while maintaining simplicity, composability, and tight integration with Flink's existing Table and DataStream APIs.

Key Features:

  • Model Inference Support: Ability to define and call models for inference, enabling agents to perform reasoning, classification, or generation tasks based on incoming data or enriched context.
  • Tool Invocation & External APIs: Agents can call tools (e.g., SaaS apps, internal services, purpose-built functions) using UDFs with network access. Support for tool invocation via Model Context Protocol (MCP).
  • Contextual Search: Ability to gather external context (e.g., vector search, JDBC, REST) to enrich agent reasoning and decision-making.
  • Lifecycle and Context Management: Basic support for initializing, running, and tearing down agents, along with a mechanism for passing contextual information between invocations.
  • Agent Shell Runtime: A lightweight execution framework that enables agents to run as long-lived, stateful Flink jobs.
  • Replayability and Reprocessing: Native support for reprocessing historical event streams using streaming storage systems like Kafka and Flink’s bounded execution. Enables regression testing, agent debugging, model drift analysis, and post-hoc decision traceability (see the sketch after this list).
  • Observability and Evaluation: Support for understanding what’s happening inside the agentic workflow, as well as for evaluating the quality of agent outputs.
  • Inter-Agent Communication: Built-in support for asynchronous agent-to-agent communication using streaming storage systems like Kafka. Messages can be replayed, routed, and persisted.
  • Agent & Workflow Support: Support for developing statically planned workflows, dynamic ReAct-style agents, and mixtures of both.
  • Sample Agents: Working examples demonstrating how to build and deploy an agent using the framework, guiding adoption and best practices.
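
To make the replayability feature above concrete, the sketch below replays a fixed historical slice of a Kafka topic through an agent using PyFlink's bounded KafkaSource. The Kafka connector calls are existing PyFlink APIs; the AgentRuntime call and the my_agent definition are hypothetical and reuse the exploratory mockup interfaces shown under Public Interfaces.

Python Sketch
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from flink_agent import AgentRuntime  # proposed (not yet existing) API

env = StreamExecutionEnvironment.get_execution_environment()
my_agent = ...  # an AgentWorkflow defined as in the Public Interfaces examples

# Read from the earliest offset up to the latest offset at job start: the source
# becomes bounded, so the job finishes once the recorded events have been replayed.
source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("transactions") \
    .set_group_id("agent-replay") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_bounded(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

historical_events = env.from_source(source, WatermarkStrategy.no_watermarks(), "replay-source")

# Run the same agent definition over the replayed events, e.g. for regression
# testing or post-hoc decision traceability; side-effecting tools can be stubbed out.
replay_output = AgentRuntime.from_datastream(historical_events).apply(my_agent).to_datastream()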

Public Interfaces

The following examples illustrate how developers might use Flink Agents with Flink’s existing Table and DataStream APIs. These are demonstrative mockups intended to showcase the envisioned developer experience and potential abstractions. They are not finalized API designs and should be considered exploratory and subject to change as the project evolves.

Our goal is to reuse and extend familiar Flink constructs so that Flink developers can define, test, and operate agents without learning a completely new framework. As we gather feedback and validate core functionality during the MVP phase, these APIs will continue to evolve with the broader Flink community.

Java API (Flink Table API)

This example shows how to:

  • Connect to external resources like an OpenAI endpoint or an MCP tool registry [FLIP-529].
  • Register a model using the CREATE MODEL syntax from [FLIP-507].
  • Define an agent workflow that uses a model, tools, and prompts.
  • Apply the agent to a Table input and generate a Table output, all within Flink’s Table API.


Java Example
import org.apache.flink.table.api.*;
import org.apache.flink.agents.Agent;
import org.apache.flink.agents.AgentRuntime;
import org.apache.flink.agents.AgentWorkflow;

// Connections from FLIP-529
tableEnv.createConnection("my_mcp_server", ConnectionDescriptor
  .connectionOptions(
    'type' = 'mcp',
    'mcp.endpoint' = 'https://aaaaaa.com',
    'mcp.access_key' = 'xxxxxx')
  .build());
tableEnv.createConnection("openai_model_connection", ConnectionDescriptor
  .connectionOptions(
    'type' = 'openai',
    'openai.endpoint' = 'https://api.openai.com/v1/chat/completions',
    'openai.access_key' = 'xxxxxxx')
  .build());

Schema inputSchema = ...;
Schema outputSchema = ...;

// TableAPI Models from FLIP-507
tableEnv.createModel("my_openai_model", ModelDescriptor
  .inputSchema(inputSchema)
  .outputSchema(outputSchema)
  .modelOptions(
    'provider' = 'openai',
    'openai.connection' = 'openai_model_connection')
  // We don't provide a system prompt here, as the agent would override it.
  .build());

Agent myAgent = Agent.createAgent(AgentWorkflow
  .model("my_openai_model")
  .mcpServer("my_mcp_server")
  .prompt("...")
  // Alternatively, use a named prompt from the mcpServer:
  // .remotePrompt("mcp_server_system_prompt_1")
  // Allow a subset of tools from the server, to avoid pulling in too many.
  .tools(["get_weather", "list_user_orders"])
  // The list of tools can also include local Flink UDFs.
  .localTools(["local_flink_udf_tool"])
  .build());

Table inputTable = ...;
// Apply the agent to the TableAPI input and convert back to the output table.
Table result = AgentRuntime.fromTable(inputTable).apply(myAgent).toTable();

Python API (PyFlink Table and DataStream API)

This example illustrates:

  • Defining an agent using AgentWorkflow.
  • Using the agent with both Table and DataStream APIs.
  • Applying tools and MCP-discovered capabilities.
  • Executing the agent as part of a Flink streaming pipeline.


Python Example
from flink_agent import Workflow
from flink_agent import AgentWorkflow
from flink_agent import AgentRuntime
# The models, MCP server helpers, decorators, and event/context types used below
# are assumed to come from the same package in this mockup.
from flink_agent import OllamaChatModel, MCPServerStdio
from flink_agent import tool, mcp_server, model, step
from flink_agent import Event, InputEvent, OutputEvent, ChatModelEvent, ChatModelResponseEvent
from flink_agent import State, RunnerContext

# Defining an Agent Workflow
my_agent = AgentWorkflow( # AgentWorkflow is a built-in subclass of Workflow
   model = OllamaChatModel(
       host = "http://localhost:11434",
       model = "qwen2.5:7b",
   ),
   prompt = "...",
   mcp_servers = [
       MCPServerStdio(
           command = "...",
           args = ["...", ]
       ),
   ],
   local_tools = [PythonClass.my_tool],
   tools = ["get_weather", "list_user_orders"]
)

# Use with PyFlink Table
input_table = ...
output_table = AgentRuntime\
   .from_table(input_table)\
   .apply(my_agent)\
   .to_table()

# Use with PyFlink DataStream
input_datastream = ...
output_datastream = AgentRuntime\
   .from_datastream(input_datastream)\
   .apply(my_agent)\
   .to_datastream()

# ----------------------------
# Using a Customized workflow.
class MyWorkflow(Workflow):
   # Define a tool as a python function
   @tool
   @staticmethod
   def my_tool(args):
       """Google style description of the tool."""
       pass

   # Define an MCP server, from which the model can discover available tools
   @mcp_server
   @staticmethod
   def my_mcp_server():
       return MCPServerStdio(
           command = "...",
           args = ["...", ]
       )

   # Define a model, with system prompts, available tools and MCP servers
   @model
   @staticmethod
   def my_llm():
       return OllamaChatModel(
           host = "...",
           model = "...",
           prompt = "...",
           tools = ["my_tool"],
           mcp_servers = ["my_mcp_server"],
       )

   # Define a workflow step, which subscribes to InputEvent and ChatModelResponseEvent
   @step(InputEvent, ChatModelResponseEvent)
   def my_step(event: Event, state: State, ctx: RunnerContext):
       if isinstance(event, InputEvent): # Built-in InputEvent will be received for each incoming input
           input = event.input
           state.messages.append(input)
           ctx.send_event(ChatModelEvent(model = 'my_llm', messages = state.messages)) # send ChatModelEvent to invoke a defined model
       elif isinstance(event, ChatModelResponseEvent): # Built-in ChatModelResponseEvent will be received for each model response
           state.messages.append(event.response)
           ctx.send_event(OutputEvent(event.response)) # send OutputEvent to emit outputs

# Use with PyFlink DataStream
my_workflow = MyWorkflow()
input_datastream = ...
output_datastream = AgentRuntime\
   .from_datastream(input_datastream)\
   .apply(my_workflow)\
   .to_datastream()

Flink SQL

This example illustrates:

  • Equivalent functionality in SQL with a loaded module.
  • Default behavior with minimal customization using a SQL UDF.

SQL Example
-- Connections from FLIP-529
CREATE CONNECTION my_mcp_server
WITH (
  'type' = 'mcp',
  'mcp.endpoint' = 'https://aaaaaa.com',
  'mcp.access_key' = 'xxxxxx'
);

CREATE CONNECTION openai_model_connection
WITH (
  'type' = 'openai',
  'openai.endpoint' = 'https://api.openai.com/v1/chat/completions',
  'openai.access_key' = 'xxxxxxx'
);

-- Models from FLIP-437
CREATE MODEL `my_openai_model`
INPUT (input STRING)
OUTPUT (output STRING)
WITH (
  'provider' = 'openai',
  'openai.connection' = 'openai_model_connection',
  'task' = 'classification',
  'type' = 'remote'
);


-- Load the flink-agents module to access the AgentWorkflow UDF
-- This UDF supplies "default" agent behavior with minimal customization.
LOAD MODULE flink-agents;

SELECT * FROM input_table, LATERAL TABLE(
  AGENT_WORKFLOW(
    'my_openai_model', -- model
    'my_mcp_server', -- MCP Server
    'Agent System Prompt String', -- prompt
    ["get_weather", "list_user_orders"], -- Remote MCP Tools
    ["local_flink_udf_tool"] -- Local UDF Tools
  )
)

Operating Model

Flink Agents will be developed under the umbrella of Apache Flink, but with an execution-first approach that prioritizes velocity, early validation, and community growth.

  • Design discussions
    • The sub-project will initially operate outside the formal Apache FLIP process to enable rapid iteration and prototyping.
    • We expect most discussions to happen offline, but they will be reflected in GitHub Discussions for transparency and so the wider community can chime in.
    • Formal design reviews (FLIPs) and upstream alignment will begin after the beta phase, once key abstractions have been validated and stabilized.
  • Tasks & issues tracking:
    • We plan to use GitHub Issues rather than JIRA, to streamline collaboration and lower the barrier for new contributors.
    • It is also easier to track tasks and issues relevant only to the sub-project, without having to carefully set JIRA ticket fields.
  • Code changes
    • We will still require code changes to be reviewed and merged by Flink Committers, as required by ASF policy.

Roadmap

The Flink Agents project will follow a phased roadmap focused on delivering a usable MVP quickly, then expanding functionality based on adoption and feedback.

Phase                    | Key Deliverables                                                                                    | Target Date
MVP Eng Design           | Define core components, architecture, interfaces, and feature scope                                | May-June 2025
MVP Development          | Model support, tool invocation, contextual search, lifecycle and context management, replayability | July-August 2025
Multi-agent and examples | Inter-agent communication, sample applications                                                     | September 2025
MVP Launch               | MVP release and announcement                                                                       | October 2025
Post MVP Expansions      | Formalize FLIP process, add new features (multi-agent coordination, observability)                | Q4 2025 onward