Motivation

In FLIP-437/FLIP-525, Apache Flink introduced initial Large Language Model (LLM) capabilities, enabling semantic understanding and real-time processing in streaming data pipelines. This integration has been technically validated in scenarios such as log classification and real-time question-answering systems. However, the current architecture only lets Flink use embedding models to convert unstructured data (e.g., text, images) into high-dimensional vector features, which are then persisted to downstream storage systems (e.g., Milvus, MongoDB). It lacks real-time online querying and similarity analysis capabilities for vector spaces. To address this limitation, this FLIP proposes the VECTOR_SEARCH function, which enables users to perform streaming vector similarity searches and real-time context retrieval (e.g., Retrieval-Augmented Generation, RAG) directly within Flink.

Public Interfaces

SQL API

Grammar

Vector search works as follows: when a record arrives, the operator queries an external system for matching data and emits the results. Based on other systems and FLIP-517, we propose the following function signature:

VECTOR_SEARCH(
    <SEARCH_TABLE>, 
    <COLUMN_TO_SEARCH>,
    <COLUMN_TO_QUERY>, 
    <TOP_K>[,
    <ON_TIME>,
    <CONFIG>])


The parameters are defined in the following table.

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| SEARCH_TABLE | TABLE | Required | The vector table that can return rows matching an input vector. |
| COLUMN_TO_SEARCH | DESCRIPTOR | Required | The column of search_table that is compared against the input data to compute the similarity. |
| COLUMN_TO_QUERY | ARRAY<FLOAT>/ARRAY<DOUBLE> | Required | The column of the input record that is compared with column_to_search to compute the similarity. |
| TOP_K | INT | Required | Number of output records. |
| ON_TIME | TIMESTAMP/TIMESTAMP_LTZ | Optional | Time attribute for VECTOR_SEARCH. Only an event-time attribute is allowed here, to align the behaviour with PTF. If users specify this field, the operator uses the event time of the input data to retrieve a historical snapshot of search_table. If `on_time` is not provided, the query uses the latest snapshot of search_table to get matched rows. |
| CONFIG | MAP<STRING, STRING> | Optional | Options for the VECTOR_SEARCH operator. |



The function’s output is structured as follows:

| Columns | Description |
| --- | --- |
| ${search_table} | The output contains all the columns in the ${search_table} |
| score | Similarity score |

If users specify the on_time attribute, the output is structured as follows (the behaviour is the same as for PTF):

| Columns | Description |
| --- | --- |
| ${search_table} | The output contains all the columns in the ${search_table} |
| score | Similarity score |
| rowtime | Rowtime column to indicate event time |

Examples

The following example shows how to find the top 3 matched products for every user.


-- users schema: id, name, features
-- products schema: id, name, prices, index 
-- output schema: id, name, features, id0, name0, prices, index, score
SELECT * FROM users, LATERAL TABLE (VECTOR_SEARCH(
  TABLE products,
  DESCRIPTOR(`index`),
  users.features,
  3
))

Here the conflicting columns id and name from the search table are implicitly renamed to avoid name conflicts.
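The renaming can be pictured with a small, self-contained Java sketch. It is only an illustration: the class and method names are hypothetical, and the numeric-suffix scheme is an assumption inferred from the `id0` column in the example above, not a statement of how the planner actually derives names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that derives output column names; not part of the proposed API. */
final class OutputSchemaSketch {

    /** Left-table columns first, then the search-table columns, then the score column. */
    static List<String> outputColumns(List<String> leftColumns, List<String> searchTableColumns) {
        List<String> out = new ArrayList<>(leftColumns);
        Set<String> used = new HashSet<>(leftColumns);
        for (String column : searchTableColumns) {
            out.add(unique(column, used));
        }
        out.add(unique("score", used));
        return out;
    }

    /** Appends 0, 1, 2, ... until the name is unused (assumed naming scheme). */
    private static String unique(String name, Set<String> used) {
        String candidate = name;
        int suffix = 0;
        while (!used.add(candidate)) {
            candidate = name + suffix++;
        }
        return candidate;
    }
}
```

For the users and products tables above, this sketch keeps `id`, `name`, and `features` from the left side, renames the search-table duplicates, and appends the `score` column produced by the function.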

Other problems

Strict Mode

Strict Mode refers to how the system handles the case where several records share the same score and including all of them would exceed the number specified by the topK parameter. Here, I prefer to compare it with the behaviors of RANK(), DENSE_RANK(), and TOP-N:

  • RANK(): Rows with the same value share the same rank, and subsequent ranks skip the number of tied rows.
  • DENSE_RANK(): Rows with the same value share the same rank, and subsequent ranks are continuous (no gaps).
  • TOP-N: Rows with the same value do not share ranks; instead, ranks are assigned in a first-come-first-served manner.

In this implementation, we adopt a strategy similar to TOP-N. If future extensions are needed, additional functions with similar logic can be introduced to support new behaviors.
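The TOP-N style of tie handling can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the operator's code: `TopNTieBreaking` and `Candidate` are hypothetical names, a higher score is assumed to mean a better match, and "first come" is modelled as position in the input list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper illustrating TOP-N tie handling; not part of the proposed API. */
final class TopNTieBreaking {

    /** A candidate row: an id plus its similarity score (higher is assumed better). */
    static final class Candidate {
        final String id;
        final double score;

        Candidate(String id, double score) {
            this.id = id;
            this.score = score;
        }
    }

    /**
     * Returns at most k candidates ordered by descending score. List.sort is stable,
     * so candidates with equal scores keep their arrival order, and the earliest
     * arrivals win when a tie straddles the cut-off.
     */
    static List<Candidate> topK(List<Candidate> arrivalOrder, int k) {
        List<Candidate> sorted = new ArrayList<>(arrivalOrder);
        sorted.sort(Comparator.comparingDouble((Candidate c) -> c.score).reversed());
        return new ArrayList<>(sorted.subList(0, Math.min(k, sorted.size())));
    }
}
```

With scores 0.9, 0.8, 0.8, 0.8 and k = 2, the sketch returns the 0.9 row and the first 0.8 row only, whereas RANK()-style semantics would have returned all three tied rows.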

NULL value

If the input vector is NULL, no record can match. With LEFT JOIN LATERAL TABLE, exactly one row is output for the input record, with the search-table columns set to NULL. With an inner LATERAL join, the input record is filtered out and nothing is output.
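The following Java sketch illustrates these NULL semantics for a single input record. It is a simplified model: rows are plain strings, `NullQuerySketch` and `correlate` are hypothetical names, and skipping the external call for a NULL vector is an assumption rather than a documented behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration of NULL handling; not part of the proposed API. */
final class NullQuerySketch {

    /**
     * Joins one left row with its search results. Each output row is a two-element
     * array {left, match}; match is null for the padded row of a LEFT JOIN.
     */
    static List<String[]> correlate(
            String leftRow,
            float[] queryVector,
            Function<float[], List<String>> search,
            boolean leftOuter) {
        // Assumption: a NULL query vector matches nothing, so the search is skipped.
        List<String> matches = queryVector == null ? List.of() : search.apply(queryVector);
        List<String[]> out = new ArrayList<>();
        for (String match : matches) {
            out.add(new String[] {leftRow, match});
        }
        if (out.isEmpty() && leftOuter) {
            // LEFT JOIN keeps the left row and pads the right side with NULL.
            out.add(new String[] {leftRow, null});
        }
        return out;
    }
}
```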

Runtime Implementation

Depending on whether an external vector database is used and on the time attribute, the implementation falls into four categories.

| | Event Time | Processing Time |
| --- | --- | --- |
| Use state to build vector store | The logic is similar to the temporal join operator. | No implementation. |
| Leverage an external vector store | Similar to lookup join, but it requires the external vector store to support time travel. | Similar to lookup join. |

In this FLIP, we focus exclusively on the fourth implementation: leveraging an external system for vector search based on processing time. Future FLIPs will discuss other implementations.

Runtime API

The vector search API is very similar to the lookup source API. The following picture outlines the relationships among the interfaces.

[Figure: relationships among the vector search interfaces]

VectorSearchTableSource
public interface VectorSearchTableSource extends DynamicTableSource {

  VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context);

  interface VectorSearchContext extends DynamicTableSource.Context {
  
     /**
      * Returns an array of key index paths that should be used during the search. The indices
      * are 0-based and support composite keys within (possibly nested) structures.
      *
      * <p>For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2
      * INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and
      * {@code s2} are used for performing a search.
      *
      * @return array of key index paths
      */
     int[][] getSearchColumns();

     /**
      * Runtime config provided to the provider. The config can be used by the planner or the
      * vector search provider at runtime. For example, async options can be used by the planner
      * to choose async search. Other config such as http timeout or retry can be used to
      * configure search functions.
      */
     ReadableConfig runtimeConfig();

  }
  
  interface VectorSearchRuntimeProvider {  }
}
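The index paths returned by getSearchColumns() can be read as follows. The sketch below is a plain-Java illustration of resolving such a path against a nested schema; `IndexPathSketch` and its `Field` type are hypothetical stand-ins and do not use Flink's data type classes.

```java
import java.util.List;

/** Hypothetical illustration of 0-based index paths into a nested row type. */
final class IndexPathSketch {

    /** A named field with optional nested children (empty for atomic types). */
    static final class Field {
        final String name;
        final List<Field> children;

        Field(String name, Field... children) {
            this.name = name;
            this.children = List.of(children);
        }
    }

    /** Follows one index path from the top-level fields and returns the dotted field name. */
    static String resolve(List<Field> topLevel, int[] path) {
        List<Field> level = topLevel;
        StringBuilder name = new StringBuilder();
        for (int index : path) {
            Field field = level.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(field.name);
            level = field.children;
        }
        return name.toString();
    }
}
```

For the type ROW < i INT, s STRING, r ROW < i2 INT, s2 STRING > > from the Javadoc, the path [0] resolves to i and the path [2, 1] resolves to r.s2.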

VectorSearchFunctionProvider
interface VectorSearchFunctionProvider extends VectorSearchRuntimeProvider {

  VectorSearchFunction createVectorSearchFunction();
  
}


VectorSearchFunction
// TableFunction is a class, so the search function is an abstract class rather than an interface.
public abstract class VectorSearchFunction extends TableFunction<RowData> {

    /**
     * Using vector, search topK results. The search function should return most similar topK
     * results.
     *
     * <p>Notice: search function should return at most topK results even if multiple results have
     * same scores.
     */
    public abstract Collection<RowData> vectorSearch(int topK, RowData queryData) throws IOException;

    /** Invoke {@link #vectorSearch} and handle exceptions. */
    public final void eval(Object... args) {
        int topK = (int) args[0];
        GenericRowData argsData = GenericRowData.of(args[1]);
        try {
            Collection<RowData> results = vectorSearch(topK, argsData);
            if (results == null) {
                return;
            }
            results.forEach(this::collect);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format("Failed to execute search with input row %s.", argsData), e);
        }
    }
}
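To make the vectorSearch contract concrete, here is a brute-force in-memory sketch in plain Java. It is not a connector implementation and does not extend the Flink class: vectors are `float[]` instead of `RowData`, cosine similarity is an assumed metric, all vectors are assumed non-zero and of equal length, and `InMemoryVectorSearchSketch` and `Scored` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical brute-force search over an in-memory list of vectors. */
final class InMemoryVectorSearchSketch {

    /** Index of a stored vector together with its similarity to the query. */
    static final class Scored {
        final int rowIndex;
        final double score;

        Scored(int rowIndex, double score) {
            this.rowIndex = rowIndex;
            this.score = score;
        }
    }

    /** Cosine similarity; assumes equal-length, non-zero vectors. */
    static double cosine(float[] a, float[] b) {
        double dot = 0;
        double normA = 0;
        double normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** Returns at most topK stored vectors, most similar first. */
    static List<Scored> vectorSearch(int topK, float[] query, List<float[]> stored) {
        List<Scored> scored = new ArrayList<>();
        for (int i = 0; i < stored.size(); i++) {
            scored.add(new Scored(i, cosine(query, stored.get(i))));
        }
        scored.sort(Comparator.comparingDouble((Scored s) -> s.score).reversed());
        return new ArrayList<>(scored.subList(0, Math.min(topK, scored.size())));
    }
}
```

A real connector would delegate the ranking to the external vector store; the point of the sketch is only that the result never contains more than topK rows.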
AsyncVectorSearchFunctionProvider
interface AsyncVectorSearchFunctionProvider extends VectorSearchRuntimeProvider {

  AsyncVectorSearchFunction createAsyncVectorSearchFunction();  

}
AsyncVectorSearchFunction
// AsyncTableFunction is a class, so the search function is an abstract class rather than an interface.
public abstract class AsyncVectorSearchFunction extends AsyncTableFunction<RowData> {

    /**
     * Using vector, search topK results. The search function should return most similar topK
     * results.
     *
     * <p>Notice: search function should return at most topK results even if multiple results have
     * same scores.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch(int topK, RowData queryData);

    /** Invoke {@link #asyncVectorSearch} and handle exceptions. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... args) {
        int topK = (int) args[0];
        GenericRowData argsData = GenericRowData.of(args[1]);
        asyncVectorSearch(topK, argsData)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                // Format argsData, not the raw varargs array, which
                                // String.format would expand into separate arguments.
                                future.completeExceptionally(
                                        new TableException(
                                                String.format(
                                                        "Failed to asynchronously search entries with key '%s'",
                                                        argsData),
                                                exception));
                                return;
                            }
                            future.complete(result);
                        });
    }
}
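The completion bridging in eval can be tried out without Flink using the sketch below. It mirrors the whenComplete pattern with plain JDK types only: `AsyncBridgeSketch` is a hypothetical name, results are strings instead of `RowData`, and a `RuntimeException` stands in for `TableException`.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Hypothetical stand-in for the async eval bridge, using only JDK types. */
final class AsyncBridgeSketch {

    /**
     * Runs the asynchronous search and forwards its outcome to resultFuture:
     * a failure is wrapped with the query as context, a success is passed through.
     */
    static void eval(
            CompletableFuture<Collection<String>> resultFuture,
            int topK,
            String query,
            BiFunction<Integer, String, CompletableFuture<Collection<String>>> asyncSearch) {
        asyncSearch
                .apply(topK, query)
                .whenComplete(
                        (result, error) -> {
                            if (error != null) {
                                resultFuture.completeExceptionally(
                                        new RuntimeException(
                                                String.format("Failed to search with query '%s'", query),
                                                error));
                                return;
                            }
                            resultFuture.complete(result);
                        });
    }
}
```

Because the outcome is always forwarded to the caller-supplied future, the caller sees either the result collection or a single wrapped exception that carries the original cause.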

Options

| Name | Meaning |
| --- | --- |
| max-concurrent-operations | The maximum number of async I/O operations that the async search can trigger. |
| timeout | The total time that can pass before the invocation (including retries) is considered timed out and task execution fails. |
| retry-strategy | The retry strategy. FIXED_DELAY retries after a fixed amount of time. |
| retry-delay | The time to wait between retries for the FIXED_DELAY strategy. Could be the base delay time for a (not yet proposed) exponential backoff. |
| max-attempts | The maximum number of attempts while retrying. |
| output-mode | Output mode for the async operation. Can be ORDERED (default) or ALLOW_UNORDERED. |
| async | Whether to run an async search function or not. Defaults to false. |

The config options for the above parameters are:

  • table.exec.async-vector-search.max-concurrent-operations
  • table.exec.async-vector-search.timeout
  • table.exec.async-vector-search.retry-strategy
  • table.exec.async-vector-search.fixed-delay
  • table.exec.async-vector-search.max-attempts
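As a rough illustration of how retry-strategy, retry-delay, and max-attempts interact, the following plain-Java sketch retries a call with a fixed delay. It is not how the Flink async operator is implemented: `FixedDelayRetrySketch` is a hypothetical name, the retry is synchronous for brevity, and counting the first call as attempt 1 is an assumption.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical synchronous illustration of the FIXED_DELAY retry options. */
final class FixedDelayRetrySketch {

    /**
     * Calls the supplier up to maxAttempts times (the first call counts as attempt 1,
     * which is an assumption) and waits retryDelay between failed attempts.
     */
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts, Duration retryDelay) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(retryDelay);
                }
            }
        }
        throw last;
    }

    private static void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

The timeout option would additionally bound the total time across all attempts; that part is left out of the sketch.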


Implementation Details

This section briefly describes how the planner generates the plan and the operator for the following example SQL.

SELECT * FROM users, LATERAL TABLE (VECTOR_SEARCH(
  TABLE products,
  DESCRIPTOR(`index`),
  users.features,
  3
))

Because VECTOR_SEARCH refers to a column of the left table, the initial plan is:

LogicalProject(inputs=[0..5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
   :- LogicalTableScan(table=[[users]])
   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'index'), $cor0.features, 3)])
      +- LogicalProject(inputs=[0..2])
         +- LogicalTableScan(table=[[products]])

In the physical phase, we convert the Correlate to PhysicalVectorSearch. During optimization, the rule checks whether the right input of the vector search node is simple. If the check fails, an exception is thrown to notify users. Finally, we get the following plan:

VectorSearch(joinType=[INNER], searchTable=[products], queryColumn=[2], searchColumn=[2], topK=[3])
+- TableSourceScan(table=[[users]], fields=[id, name, features])

In the ExecNode, the planner generates an operator to perform the search. Note: the search operator is almost the same as the lookup join operator.


A simple PoC is available at https://github.com/apache/flink/compare/master...fsk119:flink:vector-search?expand=1


Future Work

Many engines provide interesting extensions for vector search. Here we discuss how these features could be used in Flink.

Prefilter

Uncorrelated Filter

Here is an example: output the top 3 matched toy products for every user.

CREATE VIEW p AS SELECT * FROM products WHERE cat = 'toy';

SELECT * FROM users, LATERAL TABLE(VECTOR_SEARCH(
  TABLE p,
  DESCRIPTOR(`index`),
  users.features,
  3
))

Correlated Filter

Here is an example: output the top 3 matched products for every user, restricted to products whose price is below that user's threshold.

SELECT * FROM users, LATERAL TABLE(VECTOR_SEARCH(
  (SELECT * FROM products WHERE prices < users.threshold),
  DESCRIPTOR(`index`),
  users.features,
  3
))

This does not work in Flink today: the current parser requires the query here to be partitioned by specific columns.

Hybrid Search

Hybrid search combines the results of multiple vector searches and outputs the records that satisfy the given conditions. I prefer to introduce a new function for this because it requires users to supply a weight for each vector search result.

SELECT * FROM users, LATERAL TABLE(HYBRID_SEARCH(
  base_table => TABLE products,
  search_column => DESCRIPTOR(`sparse_index`, `dense_index`),
  probe_column => (users.features1, users.features2),
  weight => ARRAY(0.3, 0.7),
  distance_type => 'cosine',
  top_k => 3
)) AS p

Rejected Alternatives

Expose ID field in VECTOR_SEARCH results

While many vector databases support returning ID fields in their search results, this implementation does not treat the ID as an intrinsic output of the vector search computation. Instead, the ID field is conditionally included in the results only when explicitly present in the vector table schema. This design choice reflects the ID's role as a physical data identifier, distinct from the mathematical or semantic relationships derived through vector similarity calculations.
