| Discussion thread |                 |
| Vote thread       |                 |
| JIRA              |                 |
| Release           | <Flink Version> |
| Status            | In discussion   |

Motivation

FLIP-437 introduced the model resource and model inference in Flink SQL, and FLIP-529 introduced the connection resource. As AI agents built on large language models (LLMs), function calling, and the Model Context Protocol (MCP) become increasingly popular, this FLIP introduces agent and tool resources in Flink SQL to bridge real-time data processing and AI agents at the SQL level. By building on the existing resource types, it lets users combine models, external tools defined with connections, and prompts into powerful, adaptable agents for a variety of use cases.

Public Interfaces

Tool Resource CRUD Operation

An AI agent uses tools to perform the actions requested by the LLM, for example a function call or an MCP tool call. We introduce the tool resource to encapsulate these different types of tools. Treating tools as resources is common practice in agent frameworks such as Google ADK, CrewAI, and LlamaIndex.

Create Tool

CREATE TOOL [IF NOT EXISTS] [[catalog.]database.]<tool_name>
[
USING FUNCTION <function_identifier> 
| 
USING CONNECTION <connection_identifier>
]
[COMMENT <comment_string>]
WITH (<option_list>)

Describe Tool

{ DESCRIBE | DESC } TOOL [EXTENDED] [[catalog_name.]database_name.]<tool_name>

// Example output
| Resource | Type       |
| func1    | function   |
| conn1    | connection |

Show Tools

SHOW TOOLS [ ( FROM | IN ) [catalog_name.]database_name ]
[ [NOT] LIKE <sql_like_pattern> ]
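The [NOT] LIKE filter of SHOW TOOLS presumably follows the same semantics as SHOW TABLES: % matches any sequence of characters and _ matches exactly one character. The hypothetical LikeFilter helper below is a minimal sketch of that filtering over tool names, assuming case-sensitive matching and no ESCAPE clause; it is not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper sketching SHOW TOOLS [NOT] LIKE filtering (case-sensitive, no ESCAPE). */
final class LikeFilter {

    /** Translates a SQL LIKE pattern into an equivalent Java regular expression. */
    static Pattern toRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%' || c == '_') {
                // Flush pending literal text, quoted so regex metacharacters such as '.' are inert.
                if (literal.length() > 0) {
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c == '%' ? ".*" : ".");
            } else {
                literal.append(c);
            }
        }
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns the names that match the pattern, or that do not match it when {@code not} is true. */
    static List<String> filter(List<String> toolNames, String likePattern, boolean not) {
        Pattern pattern = toRegex(likePattern);
        List<String> result = new ArrayList<>();
        for (String name : toolNames) {
            if (pattern.matcher(name).matches() != not) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tools = List.of("mcp_server", "convert_to_celsius_tool", "mcp_backup");
        System.out.println(filter(tools, "mcp%", false)); // SHOW TOOLS LIKE 'mcp%'
        System.out.println(filter(tools, "mcp%", true));  // SHOW TOOLS NOT LIKE 'mcp%'
    }
}
```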

Show Create Tool

SHOW CREATE TOOL [[catalog_name.]database_name.]<tool_name>

Alter Tool

ALTER TOOL [IF EXISTS] [[catalog.]database.]<tool_name>
RENAME TO [[catalog.]database.]<new_tool_name>

ALTER TOOL [IF EXISTS] [[catalog.]database.]<tool_name>
{
SET (<key1>=<val1>[,<key>=<val>]*)
|
RESET (<key1>[,<key>]*)
}
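ALTER TOOL ... SET adds or overwrites options and RESET removes them; the resulting map could then be handed to CatalogTool#copy(Map). The sketch below assumes exactly that behaviour, plus the assumption that resetting a key that is not set is a no-op. The class ToolOptionChanges is hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of computing the new option map for ALTER TOOL ... SET / RESET. */
final class ToolOptionChanges {

    /** SET: adds new keys and overwrites existing ones; the input map is left untouched. */
    static Map<String, String> set(Map<String, String> current, Map<String, String> updates) {
        Map<String, String> result = new HashMap<>(current);
        result.putAll(updates);
        return Collections.unmodifiableMap(result);
    }

    /** RESET: removes the given keys; keys that are not set are ignored (an assumption). */
    static Map<String, String> reset(Map<String, String> current, List<String> keys) {
        Map<String, String> result = new HashMap<>(current);
        keys.forEach(result::remove);
        return Collections.unmodifiableMap(result);
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of("type", "mcp", "read_timeout", "30s");
        System.out.println(set(options, Map.of("read_timeout", "60s")));
        System.out.println(reset(options, List.of("read_timeout")));
    }
}
```

Returning a fresh map keeps the stored tool immutable; the catalog would then persist the copy produced from these options.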

Drop Tool

DROP TOOL [IF EXISTS] [[catalog.]database.]<tool_name>

Agent Resource CRUD Operations

The agent resource will be defined using DDL (Data Definition Language), similar to other Flink SQL resources. It encapsulates a reference to a model, a prompt string, and references to tool resources.

Create Agent

CREATE AGENT [IF NOT EXISTS] [[catalog.]database.]<agent_name>
USING MODEL <model_identifier>
USING PROMPT <prompt_string>
[USING TOOLS <tool_identifier>[, <tool_identifier>]*]
[COMMENT <comment_string>]
WITH (<option_list>)

Show Agents

SHOW AGENTS [ ( FROM | IN ) [catalog_name.]database_name ]
[ [NOT] LIKE <sql_like_pattern> ]

Show Create Agent

SHOW CREATE AGENT [[catalog_name.]database_name.]<agent_name>

Describe Agent

{ DESCRIBE | DESC } AGENT [EXTENDED] [[catalog_name.]database_name.]<agent_name>

// Example output:
| Resource | Type   |
| tool1    | tool   |
| tool2    | tool   |
| model1   | model  |
| 'prompt' | prompt |

Alter Agent

ALTER AGENT [IF EXISTS] [[catalog_name.]database_name.]<agent_name>
{
SET MODEL <model_identifier>
|
SET PROMPT <prompt_string>
|
{ ADD | REMOVE } TOOL <tool_identifier>
|
SET (<key1>=<val1>[,<key>=<val>]*)
|
RESET (<key1>[,<key>]*)
}
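ADD TOOL and REMOVE TOOL edit the list of tools referenced by an agent. Below is a minimal sketch of that list update using plain strings for tool identifiers. Rejecting a duplicate ADD, or a REMOVE of a tool the agent does not use, is an assumption of this sketch rather than something the proposal specifies, and AgentToolChanges is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of ALTER AGENT ... ADD TOOL / REMOVE TOOL on an agent's tool list. */
final class AgentToolChanges {

    static List<String> addTool(List<String> tools, String tool) {
        if (tools.contains(tool)) {
            throw new IllegalArgumentException("Tool " + tool + " is already used by the agent.");
        }
        List<String> result = new ArrayList<>(tools);
        result.add(tool); // new tools are appended, preserving the existing order
        return List.copyOf(result);
    }

    static List<String> removeTool(List<String> tools, String tool) {
        if (!tools.contains(tool)) {
            throw new IllegalArgumentException("Tool " + tool + " is not used by the agent.");
        }
        List<String> result = new ArrayList<>(tools);
        result.remove(tool);
        return List.copyOf(result);
    }

    public static void main(String[] args) {
        List<String> tools = List.of("cat.db.mcp_server");
        List<String> added = addTool(tools, "cat.db.convert_to_celsius_tool");
        System.out.println(added);
        System.out.println(removeTool(added, "cat.db.mcp_server"));
    }
}
```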

Drop Agent

DROP AGENT [IF EXISTS] [[catalog.]database.]<agent_name>

Agent DDL Examples

-- Create the MCP server connection
CREATE CONNECTION mcp_connection
WITH (
  'type' = 'api-key',
  'api-key' = 'some key'
);

-- Function that converts a temperature to Celsius
CREATE FUNCTION convert_to_celsius
AS '<function_class_name>'
USING JAR 'celsius.jar';

CREATE TOOL mcp_server
USING CONNECTION mcp_connection
WITH (
  'type' = 'mcp',
  'allowed_tools' = 'tool1,tool2',
  'read_timeout' = '30s'
);

CREATE TOOL convert_to_celsius_tool
USING FUNCTION convert_to_celsius
WITH (
  'type' = 'function',
  'description' = 'This function can be used by the model to convert a temperature to Celsius'
);

CREATE MODEL openai
INPUT (text STRING)
OUTPUT (res STRING)
WITH (
  'provider' = 'openai',
  'model' = 'gpt-4o',
  'api_key' = 'some-key'
);

CREATE AGENT weather_agent
USING MODEL openai
USING PROMPT 'Find weather info for provided city'
USING TOOLS mcp_server, convert_to_celsius_tool
WITH (
  'provider' = 'default'
);

Catalog Resources and APIs

CatalogTool

/** Interface for a tool in a catalog. */
@PublicEvolving
public interface CatalogTool {

    /**
     * Get the function identifier associated with the tool, if any.
     *
     * @return a nullable function identifier
     */
    @Nullable
    ObjectIdentifier getFunction();

    /**
     * Get the connection identifier associated with the tool, if any.
     *
     * @return a nullable connection identifier
     */
    @Nullable
    ObjectIdentifier getConnection();

    /**
     * Create a deep copy of the tool.
     *
     * @return a deep copy of "this" instance
     */
    CatalogTool copy();

    /**
     * Create a deep copy of the tool with new options.
     *
     * @param newOptions the new options to use for the copied tool
     * @return a deep copy of "this" instance with the provided options
     */
    CatalogTool copy(Map<String, String> newOptions);

    /**
     * Get comment of the tool.
     *
     * @return comment of the tool
     */
    @Nullable
    String getComment();

    /**
     * Returns a map of string-based tool options.
     *
     * @return the tool options
     */
    default Map<String, String> getOptions() {
        return Collections.emptyMap();
    }

    /** Creates a new builder for configuring and creating instances of {@link CatalogTool}. */
    @PublicEvolving
    static CatalogTool.Builder newBuilder() {
        return new Builder();
    }

    /** Builder for configuring and creating instances of {@link CatalogTool}. */
    @PublicEvolving
    class Builder {
        private Builder() {}

        public Builder functionIdentifier(@Nullable ObjectIdentifier functionIdentifier);

        public Builder connectionIdentifier(@Nullable ObjectIdentifier connectionIdentifier);

        public Builder options(Map<String, String> options);

        public Builder comment(@Nullable String comment);

        public CatalogTool build();
    }
}
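To make the builder contract concrete, here is a self-contained sketch of one possible default implementation. It simplifies the proposal: identifiers are plain strings instead of ObjectIdentifier, only a subset of the methods is included, and the names SimpleCatalogTool and DefaultTool are hypothetical. Rejecting a tool that references both a function and a connection mirrors the CREATE TOOL syntax, where USING FUNCTION and USING CONNECTION are alternatives.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for CatalogTool; identifiers are plain strings instead of ObjectIdentifier. */
interface SimpleCatalogTool {
    String getFunction();   // nullable: set only for USING FUNCTION tools

    String getConnection(); // nullable: set only for USING CONNECTION tools

    String getComment();    // nullable

    Map<String, String> getOptions();

    SimpleCatalogTool copy(Map<String, String> newOptions);

    static Builder newBuilder() {
        return new Builder();
    }

    /** Fluent builder; every setter returns {@code this} so calls can be chained. */
    final class Builder {
        private String function;
        private String connection;
        private String comment;
        private Map<String, String> options = Collections.emptyMap();

        private Builder() {}

        public Builder functionIdentifier(String function) { this.function = function; return this; }

        public Builder connectionIdentifier(String connection) { this.connection = connection; return this; }

        public Builder options(Map<String, String> options) { this.options = options; return this; }

        public Builder comment(String comment) { this.comment = comment; return this; }

        public SimpleCatalogTool build() {
            // USING FUNCTION and USING CONNECTION are alternatives in CREATE TOOL.
            if (function != null && connection != null) {
                throw new IllegalStateException("A tool references a function or a connection, not both.");
            }
            return new DefaultTool(function, connection, comment, options);
        }
    }
}

/** Hypothetical immutable implementation returned by the builder. */
final class DefaultTool implements SimpleCatalogTool {
    private final String function;
    private final String connection;
    private final String comment;
    private final Map<String, String> options;

    DefaultTool(String function, String connection, String comment, Map<String, String> options) {
        this.function = function;
        this.connection = connection;
        this.comment = comment;
        // Defensive, read-only copy so later changes to the caller's map do not leak in.
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    @Override public String getFunction() { return function; }

    @Override public String getConnection() { return connection; }

    @Override public String getComment() { return comment; }

    @Override public Map<String, String> getOptions() { return options; }

    @Override
    public SimpleCatalogTool copy(Map<String, String> newOptions) {
        // Same references and comment, new options; the original instance stays unchanged.
        return new DefaultTool(function, connection, comment, newOptions);
    }
}
```

Because copy(Map) returns a new instance, an ALTER TOOL ... SET could produce the updated tool without mutating the one stored in the catalog.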

Catalog API for Tool

    /**
     * Get names of all tools under this database. An empty list is returned if none exists.
     *
     * @param databaseName name of the database
     * @return a list of the names of all tools in this database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listTools(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Returns a {@link CatalogTool} identified by the given {@link ObjectPath}.
     *
     * @param toolPath path of the tool
     * @return the requested tool
     * @throws ToolNotExistException if the target does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default CatalogTool getTool(ObjectPath toolPath)
            throws ToolNotExistException, CatalogException;

    /**
     * Check if a tool exists in this catalog.
     *
     * @param toolPath Path of the tool
     * @return true if the given tool exists in the catalog, false otherwise
     * @throws CatalogException in case of any runtime exception
     */
    default boolean toolExists(ObjectPath toolPath) throws CatalogException;

    /**
     * Drop a tool.
     *
     * @param toolPath Path of the tool to drop
     * @param ignoreIfNotExists if set to true, it does not throw an exception if the tool doesn't
     *     exist
     * @throws ToolNotExistException if the tool does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void dropTool(ObjectPath toolPath, boolean ignoreIfNotExists)
            throws ToolNotExistException, CatalogException;

    /**
     * Rename an existing tool.
     *
     * @param toolPath Path of the tool to rename
     * @param newToolName new name of the tool
     * @param ignoreIfNotExists if set to true, it does not throw an exception if the tool doesn't
     *     exist
     * @throws ToolNotExistException if the tool does not exist
     * @throws ToolAlreadyExistException if a tool with the target name already exists
     * @throws CatalogException in case of any runtime exception
     */
    default void renameTool(ObjectPath toolPath, String newToolName, boolean ignoreIfNotExists)
            throws ToolNotExistException, ToolAlreadyExistException, CatalogException;

    /**
     * Creates a new tool.
     *
     * @param toolPath path of the tool
     * @param tool the tool to be created
     * @param ignoreIfExists flag to specify behavior if a tool with the given name already exists:
     *     if set to false, it throws a ToolAlreadyExistException, if set to true, nothing happens.
     * @throws ToolAlreadyExistException if the tool already exists
     * @throws DatabaseNotExistException if the database in toolPath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default void createTool(ObjectPath toolPath, CatalogTool tool, boolean ignoreIfExists)
            throws ToolAlreadyExistException, DatabaseNotExistException, CatalogException;

    /**
     * Modifies an existing tool.
     *
     * @param toolPath path of the tool
     * @param newTool the updated tool
     * @param ignoreIfNotExists flag to specify behavior if the tool does not exist: if set to
     *     false, throw an exception; if set to true, nothing happens
     * @throws ToolNotExistException if the tool does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void alterTool(ObjectPath toolPath, CatalogTool newTool, boolean ignoreIfNotExists)
            throws ToolNotExistException, CatalogException;

    /**
     * Modifies an existing tool with a list of tool changes.
     *
     * @param toolPath path of the tool
     * @param newTool the updated tool
     * @param toolChanges list of changes to apply to the tool
     * @param ignoreIfNotExists flag to specify behavior if the tool does not exist: if set to
     *     false, throw an exception; if set to true, nothing happens
     * @throws ToolNotExistException if the tool does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void alterTool(
            ObjectPath toolPath,
            CatalogTool newTool,
            List<ToolChange> toolChanges,
            boolean ignoreIfNotExists)
            throws ToolNotExistException, CatalogException;
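The ignoreIfExists and ignoreIfNotExists flags follow the usual Flink catalog contract. The in-memory sketch below illustrates that contract for the tool methods. It is hypothetical and simplified: tools are stored as option maps, paths are split into database and tool name strings instead of ObjectPath, and the exception classes are local stand-ins for the proposed ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Local stand-ins for the proposed checked exceptions (hypothetical, simplified constructors).
class ToolNotExistException extends Exception {
    ToolNotExistException(String path) { super("Tool " + path + " does not exist."); }
}

class ToolAlreadyExistException extends Exception {
    ToolAlreadyExistException(String path) { super("Tool " + path + " already exists."); }
}

class DatabaseNotExistException extends Exception {
    DatabaseNotExistException(String database) { super("Database " + database + " does not exist."); }
}

/** Hypothetical in-memory catalog; a tool is stored as its option map, keyed by database and name. */
final class InMemoryToolCatalog {
    private final Set<String> databases;
    // database -> (tool name -> options); TreeMap keeps listTools() sorted by name.
    private final Map<String, Map<String, Map<String, String>>> tools = new HashMap<>();

    InMemoryToolCatalog(Set<String> databases) {
        this.databases = Set.copyOf(databases);
    }

    List<String> listTools(String database) throws DatabaseNotExistException {
        return new ArrayList<>(toolsIn(database).keySet());
    }

    boolean toolExists(String database, String name) {
        Map<String, Map<String, String>> dbTools = tools.get(database);
        return dbTools != null && dbTools.containsKey(name);
    }

    Map<String, String> getTool(String database, String name) throws ToolNotExistException {
        if (!toolExists(database, name)) {
            throw new ToolNotExistException(database + "." + name);
        }
        return tools.get(database).get(name);
    }

    void createTool(String database, String name, Map<String, String> options, boolean ignoreIfExists)
            throws ToolAlreadyExistException, DatabaseNotExistException {
        Map<String, Map<String, String>> dbTools = toolsIn(database);
        if (dbTools.containsKey(name)) {
            if (!ignoreIfExists) {
                throw new ToolAlreadyExistException(database + "." + name);
            }
            return; // the existing definition is kept unchanged
        }
        dbTools.put(name, Map.copyOf(options));
    }

    void dropTool(String database, String name, boolean ignoreIfNotExists) throws ToolNotExistException {
        if (!toolExists(database, name)) {
            if (!ignoreIfNotExists) {
                throw new ToolNotExistException(database + "." + name);
            }
            return;
        }
        tools.get(database).remove(name);
    }

    void renameTool(String database, String name, String newName, boolean ignoreIfNotExists)
            throws ToolNotExistException, ToolAlreadyExistException {
        if (!toolExists(database, name)) {
            if (!ignoreIfNotExists) {
                throw new ToolNotExistException(database + "." + name);
            }
            return;
        }
        Map<String, Map<String, String>> dbTools = tools.get(database);
        if (dbTools.containsKey(newName)) {
            throw new ToolAlreadyExistException(database + "." + newName);
        }
        dbTools.put(newName, dbTools.remove(name));
    }

    private Map<String, Map<String, String>> toolsIn(String database) throws DatabaseNotExistException {
        if (!databases.contains(database)) {
            throw new DatabaseNotExistException(database);
        }
        return tools.computeIfAbsent(database, db -> new TreeMap<>());
    }
}
```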

CatalogAgent

/** Interface for an agent in a catalog. */
@PublicEvolving
public interface CatalogAgent {

    /**
     * Get the model identifier associated with the agent.
     *
     * @return the model identifier
     */
    ObjectIdentifier getModel();

    /**
     * Get the prompt string associated with the agent.
     *
     * @return the prompt string
     */
    String getPrompt();

    /**
     * Get the tool identifiers associated with the agent, if any.
     *
     * @return a list of tool identifiers, empty if none
     */
    List<ObjectIdentifier> getTools();

    /**
     * Create a deep copy of the agent.
     *
     * @return a deep copy of "this" instance
     */
    CatalogAgent copy();

    /**
     * Get comment of the agent.
     *
     * @return comment of the agent
     */
    @Nullable
    String getComment();

    /**
     * Returns a map of string-based agent options.
     *
     * @return the agent options
     */
    Map<String, String> getOptions();

    /** Creates a new builder for configuring and creating instances of {@link CatalogAgent}. */
    @PublicEvolving
    static CatalogAgent.Builder newBuilder() {
        return new Builder();
    }

    /** Builder for configuring and creating instances of {@link CatalogAgent}. */
    @PublicEvolving
    class Builder {
        private Builder() {}

        public Builder modelIdentifier(ObjectIdentifier modelIdentifier);

        public Builder prompt(String prompt);

        public Builder toolIdentifiers(List<ObjectIdentifier> toolIdentifiers);

        public Builder options(Map<String, String> options);

        public Builder comment(@Nullable String comment);

        public CatalogAgent build();
    }
}

Catalog API for Agent

    /**
     * Get names of all agents under this database. An empty list is returned if none exists.
     *
     * @param databaseName name of the database
     * @return a list of the names of all agents in this database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listAgents(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Returns a {@link CatalogAgent} identified by the given {@link ObjectPath}.
     *
     * @param agentPath path of the agent
     * @return the requested agent
     * @throws AgentNotExistException if the target does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default CatalogAgent getAgent(ObjectPath agentPath)
            throws AgentNotExistException, CatalogException;

    /**
     * Check if an agent exists in this catalog.
     *
     * @param agentPath Path of the agent
     * @return true if the given agent exists in the catalog, false otherwise
     * @throws CatalogException in case of any runtime exception
     */
    default boolean agentExists(ObjectPath agentPath) throws CatalogException;

    /**
     * Create an agent.
     *
     * @param agentPath Path of the agent to create
     * @param agent The agent definition
     * @param ignoreIfExists Flag to specify behavior when an agent with the given name already
     *     exists: if set to false, throw an AgentAlreadyExistException, if set to true, do nothing.
     * @throws AgentAlreadyExistException if the given agent already exists and ignoreIfExists is
     *     false
     * @throws DatabaseNotExistException if the database in agentPath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default void createAgent(ObjectPath agentPath, CatalogAgent agent, boolean ignoreIfExists)
            throws AgentAlreadyExistException, DatabaseNotExistException, CatalogException;

    /**
     * Drop an agent.
     *
     * @param agentPath Path of the agent to drop
     * @param ignoreIfNotExists if set to true, it does not throw an exception if the agent doesn't
     *     exist
     * @throws AgentNotExistException if the agent does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void dropAgent(ObjectPath agentPath, boolean ignoreIfNotExists)
            throws AgentNotExistException, CatalogException;

    /**
     * Rename an existing agent.
     *
     * @param agentPath Path of the agent to rename
     * @param newAgentName new name of the agent
     * @param ignoreIfNotExists if set to true, it does not throw an exception if the agent doesn't
     *     exist
     * @throws AgentNotExistException if the agent does not exist
     * @throws AgentAlreadyExistException if an agent with the target name already exists
     * @throws CatalogException in case of any runtime exception
     */
    default void renameAgent(ObjectPath agentPath, String newAgentName, boolean ignoreIfNotExists)
            throws AgentNotExistException, AgentAlreadyExistException, CatalogException;

    /**
     * Modifies an existing agent.
     *
     * @param agentPath path of the agent
     * @param newAgent the updated agent
     * @param ignoreIfNotExists flag to specify behavior if the agent does not exist: if set to
     *     false, throw an exception; if set to true, nothing happens
     * @throws AgentNotExistException if the agent does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void alterAgent(ObjectPath agentPath, CatalogAgent newAgent, boolean ignoreIfNotExists)
            throws AgentNotExistException, CatalogException;

    /**
     * Modifies an existing agent with a list of agent changes applied to the provided agent.
     *
     * @param agentPath path of the agent
     * @param newAgent the updated agent
     * @param agentChanges list of changes to apply to the agent
     * @param ignoreIfNotExists flag to specify behavior if the agent does not exist: if set to
     *     false, throw an exception; if set to true, nothing happens
     * @throws AgentNotExistException if the agent does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void alterAgent(
            ObjectPath agentPath,
            CatalogAgent newAgent,
            List<AgentChange> agentChanges,
            boolean ignoreIfNotExists)
            throws AgentNotExistException, CatalogException;

Agent Runtime

A new SQL table function, agent_run, will be introduced to execute an agent on the rows of an input table with specific input parameters.

Function Signature

agent_run(
  INPUT     => <input_table_identifier>, 
  AGENT     => <agent_identifier>, 
  INPUT_COL => <input_column_descriptor>,
  [CONFIG   => <config>]
)
  • <input_table_identifier>: The input table, passed as TABLE <table_name> with an optional PARTITION BY clause.

  • <agent_identifier>: The identifier of the agent resource to execute.

  • <input_column_descriptor>: A DESCRIPTOR of the input table column(s) passed to the agent as input.

  • <config>: An optional map of runtime configuration options for the agent.

  • Returns a ROW type: an agent_output column holding the result of the agent execution, together with the original table columns. If an input column is already named agent_output, the suffix 0 is appended, so the output column is named agent_output0.
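The naming rule for the output column can be sketched as follows. The proposal only specifies appending 0 on a conflict; continuing with agent_output1, agent_output2, and so on when agent_output0 is also taken is an assumption of this sketch, and AgentRunOutputSchema is a hypothetical name.

```java
import java.util.List;

/** Hypothetical sketch of choosing the agent_run output column name. */
final class AgentRunOutputSchema {
    static final String BASE = "agent_output";

    static String outputColumnName(List<String> inputColumns) {
        if (!inputColumns.contains(BASE)) {
            return BASE; // no conflict: use the plain name
        }
        // Conflict: append 0 as proposed; further increments are an assumption of this sketch.
        int suffix = 0;
        while (inputColumns.contains(BASE + suffix)) {
            suffix++;
        }
        return BASE + suffix;
    }

    public static void main(String[] args) {
        System.out.println(outputColumnName(List.of("session_id", "city")));
        System.out.println(outputColumnName(List.of("city", "agent_output")));
    }
}
```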

Example Usage

-- Run weather_agent on the city column, partitioned by session_id
SELECT * FROM agent_run(
  INPUT     => TABLE city_table PARTITION BY session_id,
  AGENT     => AGENT weather_agent,
  INPUT_COL => DESCRIPTOR(city),
  CONFIG    => MAP[]);

Agent runtime interfaces

@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface ArgumentHint {
    /** Description of the argument (new attribute). */
    String description() default "";
}
// Existing class; only the proposed addition is shown.
public class StaticArgument {
    /** Optional description of the argument (new method). */
    String description();
}
// FunctionTool is generated at runtime from a Flink function definition and passed to the
// agent provider, because user code has no access to this planner-internal information.
public interface FunctionTool {
    List<StaticArgument> getInputArguments();

    StaticArgument getOutputArgument();

    Object call(Object... args);
}
public interface AgentProviderFactory {
    AgentProvider createAgentProvider(Context context);
    
    interface Context {
        /** Fully qualified identifier of the agent (catalog, database, and agent name). */
        ObjectIdentifier getObjectIdentifier();
 
        /**
         * Returns the resolved catalog agent. A resolved agent contains the resolved catalog model
         * and the resolved tool instances, together with their identifiers. Connections referenced
         * by tools should already be decrypted when passed to createAgentProvider.
         */
        ResolvedCatalogAgent getCatalogAgent();
        
        /** Gives read-only access to the configuration of the current session. */
        ReadableConfig getConfiguration();

        /**
         * Returns the class loader of the current session.
         *
         * <p>The class loader is in particular useful for discovering further (nested) factories.
         */
        ClassLoader getClassLoader();

        /** Whether the agent is temporary. */
        boolean isTemporary();
    }
} 
public interface AgentProvider {
    /** Context for creating runtime providers. */
    @PublicEvolving
    interface Context {

        /** Resolved catalog agent. */
        ResolvedCatalogAgent getCatalogAgent();

        /** Generated Flink function tools. */
        Map<ObjectIdentifier, FunctionTool> getFunctionTools();

        /**
         * Runtime config provided to provider. The config can be used by planner or agent provider
         * at runtime. For example, stateful options can be used by planner to choose stateful agent runtime.
         * Other config such as http timeout or retry can be used to configure agent provider
         * runtime http client when calling external model providers such as OpenAI.
         */
        Configuration runtimeConfig();
    }
} 
/** Agent provider that creates an {@link AgentRunner} for runtime execution. */
public interface AgentRuntimeProvider extends AgentProvider {
    AgentRunner createAgentRunner(Context context);
}
@PublicEvolving
public abstract class AgentRunner extends TableFunction<RowData> {

    /**
     * Synchronously run agent based on input row.
     *
     * @param inputRow - A {@link RowData} that wraps input for agent runner.
     * @return A collection of agent run results.
     */
    public abstract Collection<RowData> run(RowData inputRow);

    /** Invokes {@link #run}, emits each result, and wraps failures in a FlinkRuntimeException. */
    public final void eval(Object... args) {
        GenericRowData argsData = GenericRowData.of(args);
        try {
            Collection<RowData> results = run(argsData);
            if (results == null) {
                return;
            }
            results.forEach(this::collect);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format("Failed to execute agent runner with input row %s.", argsData), e);
        }
    }
}
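To illustrate how an agent provider might use the FunctionTool map from AgentProvider.Context, the sketch below dispatches a tool call requested by the model to a tool by its identifier. It is hypothetical and simplified: identifiers are strings rather than ObjectIdentifier, Arg stands in for StaticArgument, and ConvertToCelsiusTool hand-writes what the planner would generate from the convert_to_celsius function (assumed here to take degrees Fahrenheit).

```java
import java.util.List;
import java.util.Map;

/** Simplified stand-in for StaticArgument: name, SQL type name, and the proposed description. */
final class Arg {
    final String name;
    final String type;
    final String description;

    Arg(String name, String type, String description) {
        this.name = name;
        this.type = type;
        this.description = description;
    }
}

/** Simplified stand-in for the proposed FunctionTool interface. */
interface SimpleFunctionTool {
    List<Arg> getInputArguments();

    Arg getOutputArgument();

    Object call(Object... args);
}

/** Hypothetical tool standing in for a UDF generated from convert_to_celsius. */
final class ConvertToCelsiusTool implements SimpleFunctionTool {
    @Override
    public List<Arg> getInputArguments() {
        return List.of(new Arg("fahrenheit", "DOUBLE", "Temperature in degrees Fahrenheit"));
    }

    @Override
    public Arg getOutputArgument() {
        return new Arg("celsius", "DOUBLE", "Temperature in degrees Celsius");
    }

    @Override
    public Object call(Object... args) {
        if (args.length != getInputArguments().size()) {
            throw new IllegalArgumentException("Expected 1 argument but got " + args.length);
        }
        double fahrenheit = ((Number) args[0]).doubleValue();
        return (fahrenheit - 32.0) * 5.0 / 9.0;
    }
}

/** Routes a tool call requested by the model to the matching tool by its identifier. */
final class ToolDispatcher {
    private final Map<String, SimpleFunctionTool> tools;

    ToolDispatcher(Map<String, SimpleFunctionTool> tools) {
        this.tools = Map.copyOf(tools);
    }

    Object dispatch(String toolIdentifier, Object... args) {
        SimpleFunctionTool tool = tools.get(toolIdentifier);
        if (tool == null) {
            throw new IllegalArgumentException("Unknown tool: " + toolIdentifier);
        }
        return tool.call(args); // the argument array is passed through unchanged
    }
}
```

The argument descriptions are what a provider would presumably send to the model as the tool schema, which is why the proposal adds a description to ArgumentHint and StaticArgument.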

Proposed Changes

During the physical rewrite phase, a rule can rewrite the logical plan into a new exec node that stores the ContextResolvedAgent and processes each row. ContextResolvedAgent stores the ContextResolvedModel, the prompt, and a list of ContextResolvedTool instances. These resources need to be serializable.

Model execution

Model execution is implemented by the agent developer, using the ResolvedCatalogModel inside the ResolvedCatalogAgent. Flink can additionally provide built-in interfaces for tool-calling model execution.
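Tool-calling model execution typically runs a loop: on each turn the model either requests a tool call, whose result is appended to the conversation, or returns a final answer. The sketch below shows such a loop with a step limit. ModelReply, ToolCallingModel, and AgentLoop are hypothetical names, not the built-in interfaces Flink may provide, and tools are modelled as plain Java functions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** One model turn: either a request to call a tool or a final answer. */
final class ModelReply {
    final String toolName; // non-null for a tool call
    final Object[] toolArgs;
    final String answer;   // non-null for a final answer

    private ModelReply(String toolName, Object[] toolArgs, String answer) {
        this.toolName = toolName;
        this.toolArgs = toolArgs;
        this.answer = answer;
    }

    static ModelReply toolCall(String toolName, Object... args) {
        return new ModelReply(toolName, args, null);
    }

    static ModelReply answer(String answer) {
        return new ModelReply(null, new Object[0], answer);
    }
}

/** Hypothetical tool-calling model: given the conversation so far, decide the next step. */
interface ToolCallingModel {
    ModelReply next(List<String> conversation);
}

/** Minimal agent loop: ask the model, run requested tools, feed results back, stop at an answer. */
final class AgentLoop {
    static String run(
            ToolCallingModel model,
            Map<String, Function<Object[], Object>> tools,
            String prompt,
            String input,
            int maxSteps) {
        List<String> conversation = new ArrayList<>();
        conversation.add("system: " + prompt);
        conversation.add("user: " + input);
        for (int step = 0; step < maxSteps; step++) {
            ModelReply reply = model.next(conversation);
            if (reply.answer != null) {
                return reply.answer;
            }
            Function<Object[], Object> tool = tools.get(reply.toolName);
            if (tool == null) {
                throw new IllegalStateException("Model requested unknown tool " + reply.toolName);
            }
            // Append the tool result so the model can use it on the next turn.
            conversation.add("tool " + reply.toolName + ": " + tool.apply(reply.toolArgs));
        }
        throw new IllegalStateException("No final answer after " + maxSteps + " steps");
    }
}
```

A real provider would call the model referenced by the ResolvedCatalogModel instead of a Java lambda; the step limit guards against a model that keeps requesting tools indefinitely.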

MCP execution

For external tools such as MCP servers, users implement the communication with the MCP server using the tool's connection information. Flink can provide built-in helper functions to list MCP tools and execute them.
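The mcp_server tool in the DDL examples sets 'allowed_tools' = 'tool1,tool2'. A plausible interpretation, assumed here rather than specified by the proposal, is an allow-list applied to the tools reported by the MCP server, where a missing option exposes every tool and unknown names are silently ignored. McpToolFilter is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper applying the 'allowed_tools' option to the tools listed by an MCP server. */
final class McpToolFilter {

    static List<String> exposedTools(List<String> serverTools, Map<String, String> toolOptions) {
        String allowed = toolOptions.get("allowed_tools");
        if (allowed == null || allowed.isBlank()) {
            return List.copyOf(serverTools); // assumption: no allow-list exposes every tool
        }
        Set<String> allowList = new HashSet<>();
        for (String name : allowed.split(",")) {
            if (!name.isBlank()) {
                allowList.add(name.trim());
            }
        }
        // Keep the server's order; names not offered by the server are ignored.
        List<String> result = new ArrayList<>();
        for (String tool : serverTools) {
            if (allowList.contains(tool)) {
                result.add(tool);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> serverTools = List.of("tool1", "tool2", "tool3");
        System.out.println(exposedTools(serverTools, Map.of("type", "mcp", "allowed_tools", "tool1,tool2")));
    }
}
```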

Function/Tool execution

A function used as a tool must satisfy the following properties:

  • It is a scalar function.

  • Its InputTypeStrategy is defined by StaticArguments or by a sequence of ArgumentTypeStrategy instances.

  • Its output type strategy is an explicit data type.

  • Its FunctionDefinition provides a runtime implementation (eval methods), so that the function can be executed if the model decides to call it. The exec node code-generates these UDFs with a customized collector and builds the FunctionTool map from the generated UDFs.
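The proposal checks these properties on the function's type inference (InputTypeStrategy and output type strategy). As a rough, reflection-based approximation of the runtime requirement only, the hypothetical sketch below looks for exactly one public, non-static eval method with a non-void return type. Requiring a single eval overload is an assumption made so that the model sees one unambiguous tool signature; CelsiusUdf and OverloadedUdf are illustrative classes.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Illustrative UDF with a single scalar eval method. */
class CelsiusUdf {
    public double eval(double fahrenheit) {
        return (fahrenheit - 32.0) * 5.0 / 9.0;
    }
}

/** Illustrative UDF whose overloaded eval methods make the tool signature ambiguous. */
class OverloadedUdf {
    public int eval(int value) { return value; }

    public long eval(long value) { return value; }
}

/** Hypothetical reflection-based pre-check; not the type-inference check the proposal describes. */
final class ToolEligibility {

    /** Returns the single public, non-static, non-void eval method, or throws if there is none. */
    static Method findToolEval(Class<?> udfClass) {
        List<Method> evals = new ArrayList<>();
        for (Method m : udfClass.getMethods()) {
            if (m.getName().equals("eval") && !Modifier.isStatic(m.getModifiers())) {
                evals.add(m);
            }
        }
        if (evals.size() != 1) {
            throw new IllegalArgumentException(
                    udfClass.getSimpleName() + " must have exactly one public eval method, found " + evals.size());
        }
        Method eval = evals.get(0);
        if (eval.getReturnType() == void.class) {
            throw new IllegalArgumentException("eval of " + udfClass.getSimpleName() + " must return a value");
        }
        return eval;
    }
}
```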

Relationship with flink-agent project

The flink-agent project provides a superset of the functionality that agent SQL can offer in Flink SQL. As an analogy, the flink-agent project operates at the DataStream level, while agent SQL focuses on the SQL and Table API level. The agent SQL runtime can also leverage flink-agent runtime primitives by delegating LLM, MCP, and tool calls to the flink-agent runtime. This can be done by constructing flink-agent definitions in the AgentProvider from the model, prompt, and tool definitions of the ResolvedCatalogAgent.

Compatibility, Deprecation, and Migration Plan

NA

Test Plan

  • All existing tests pass

  • Add new unit tests and integration tests for any new code changes

Rejected Alternatives

  • Providing fine-grained interfaces, such as a ToolCallingModelProvider for tool calling, and adding them to the AgentProvider context. This was rejected because users may already implement tool calling through other libraries, and we don't want to force them to implement this logic in Flink. Such an interface could instead be provided for the default AgentProvider.