Motivation
In FLIP-437 and FLIP-525, Apache Flink took its first steps toward integrating Large Language Model (LLM) capabilities, enabling semantic understanding and real-time processing in streaming data pipelines. This integration has been technically validated in scenarios such as log classification and real-time question-answering systems. However, the current architecture only allows Flink to use embedding models to convert unstructured data (e.g., text, images) into high-dimensional vector features, which are then persisted to downstream storage systems (e.g., Milvus, MongoDB). Flink cannot yet query those vector spaces online or run similarity analysis over them in real time. To address this limitation, this FLIP proposes a VECTOR_SEARCH function that lets users perform streaming vector similarity search and real-time context retrieval (e.g., for Retrieval-Augmented Generation, RAG) directly within Flink.
Public Interfaces
SQL API
Grammar
Vector search works as follows: for each record that enters the operator, Flink queries an external system for matching data and emits the matched results. Following the design of other systems and FLIP-517, we propose the following function signature:
VECTOR_SEARCH( <SEARCH_TABLE>, <COLUMN_TO_SEARCH>, <COLUMN_TO_QUERY>, <TOP_K>[, <ON_TIME>, <CONFIG>])
The definition of the parameters is in the following table.
Name | Type | Required | Description |
SEARCH_TABLE | TABLE | Required | The vector table to search. It must be able to return data for a given input vector. |
COLUMN_TO_SEARCH | DESCRIPTOR | Required | The column in SEARCH_TABLE whose values are compared with the input data to compute similarity. |
COLUMN_TO_QUERY | ARRAY<FLOAT>/ARRAY<DOUBLE> | Required | The column in the input record that is compared with COLUMN_TO_SEARCH to compute similarity. |
TOP_K | INT | Required | Maximum number of matched records output for each input record. |
ON_TIME | TIMESTAMP/TIMESTAMP_LTZ | Optional | Time attribute for VECTOR_SEARCH. Only event-time attributes are allowed here, to align the behaviour with PTF. If specified, the operator uses the event time of each input record to retrieve a historical snapshot of SEARCH_TABLE. If ON_TIME is not provided, the query uses the latest snapshot of SEARCH_TABLE to find matching rows. |
CONFIG | MAP<STRING, STRING> | Optional | Options for the VECTOR_SEARCH operator. |
The function’s output is structured as follows:
Columns | Description |
${search_table} | The output contains all the columns in the ${search_table} |
score | Similarity score |
If users specify the ON_TIME attribute, the output is structured as follows (the behaviour is the same as PTF):
Columns | Description |
${search_table} | The output contains all the columns in the ${search_table} |
score | Similarity score |
rowtime | Rowtime column to indicate event time |
Examples
The following example shows how to find the top 3 matched products for every user.
-- users schema: id, name, features
-- products schema: id, name, prices, index
-- output schema: id, name, features, id0, name0, prices, index, score
SELECT * FROM users,
  LATERAL TABLE(VECTOR_SEARCH(
    TABLE products,
    DESCRIPTOR(`index`),
    users.features,
    3
  ))
Here the columns id and name of products are implicitly renamed (e.g., id becomes id0) to avoid conflicts with the columns of users.
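The FLIP does not spell out the renaming rule. The hypothetical helper below is only a sketch. It assumes the planner appends the first free numeric suffix to a conflicting column name, which reproduces the `id0` of the example. It also appends the `score` column from the output table. The real planner may use a different uniquification scheme.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: derives the output column names of a VECTOR_SEARCH lateral join. */
final class OutputSchemaSketch {

    private OutputSchemaSketch() {}

    /**
     * Concatenates the input columns, the search-table columns and the score column. A column
     * whose name is already taken gets the first free numeric suffix: id -> id0, id1, ...
     */
    static List<String> outputColumns(List<String> inputColumns, List<String> searchColumns) {
        List<String> result = new ArrayList<>(inputColumns);
        Set<String> used = new HashSet<>(inputColumns);
        List<String> appended = new ArrayList<>(searchColumns);
        appended.add("score"); // VECTOR_SEARCH always adds the similarity score
        for (String name : appended) {
            String candidate = name;
            int suffix = 0;
            // Try name0, name1, ... until the name is unique in the combined row type.
            while (used.contains(candidate)) {
                candidate = name + suffix++;
            }
            used.add(candidate);
            result.add(candidate);
        }
        return result;
    }
}
```

For the users/products example, this yields `id, name, features, id0, name0, prices, index, score`.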
Other problems
Strict Mode
Strict Mode refers to how the system handles ties, i.e., cases where multiple records have the same score and together exceed the number specified by TOP_K. Its behavior can be compared with that of RANK(), DENSE_RANK(), and TOP-N:
- RANK(): Rows with the same value share the same rank, and subsequent ranks skip the number of tied rows.
- DENSE_RANK(): Rows with the same value share the same rank, and subsequent ranks are continuous (no gaps).
- TOP-N: Rows with the same value do not share ranks; instead, ranks are assigned in a first-come-first-served manner.
This implementation adopts the TOP-N strategy, so at most TOP_K records are returned even when there are ties. If other behaviors are needed in the future, additional functions with similar logic can be introduced.
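To make the TOP-N behavior concrete, here is a minimal sketch. The class and record names are hypothetical. It ranks candidates by score and keeps at most TOP_K of them, breaking ties by arrival order. It assumes candidates arrive as a list in arrival order. An actual connector would typically delegate ranking to the external vector store.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of TOP-N tie handling for VECTOR_SEARCH results. */
final class TopKSketch {

    private TopKSketch() {}

    /** A candidate row from the search table with its similarity score. */
    record Candidate(String id, double score) {}

    /**
     * Returns at most k candidates, highest score first. Candidates are given in arrival order;
     * because List.sort is stable, equal scores keep that order (first come, first served),
     * and tied candidates beyond position k are dropped rather than sharing a rank.
     */
    static List<Candidate> topK(List<Candidate> candidates, int k) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(Comparator.comparingDouble(Candidate::score).reversed());
        return new ArrayList<>(sorted.subList(0, Math.max(0, Math.min(k, sorted.size()))));
    }
}
```

Take scores 0.9, 0.8, 0.8, 0.8, 0.5 with TOP_K = 3. The sketch keeps the first candidate plus the first two of the three tied ones. Under RANK() semantics, all three tied rows would be kept.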
NULL value
If the query vector is NULL, no record can match. With LEFT JOIN LATERAL TABLE, exactly one row is output for the input record. With an inner lateral join, the input record is filtered out and nothing is output.
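The NULL semantics can be sketched as follows. The sketch models rows as `Object[]` and the search as a plain function. `LateralJoinSketch` and its parameters are hypothetical. Padding the search-side columns with NULLs for the LEFT case is an assumption that follows standard outer-join semantics, not something this FLIP spells out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of how an input row is joined with its VECTOR_SEARCH results. */
final class LateralJoinSketch {

    private LateralJoinSketch() {}

    enum JoinType { INNER, LEFT }

    /**
     * @param inputRow    columns of the input (left) row
     * @param queryVector value of COLUMN_TO_QUERY; null means no record can match
     * @param searchArity number of columns VECTOR_SEARCH produces (search-table columns + score)
     * @param search      the vector search itself, returning one Object[] per matched row
     */
    static List<Object[]> join(Object[] inputRow, double[] queryVector, int searchArity,
                               JoinType joinType, Function<double[], List<Object[]>> search) {
        // A NULL query vector never reaches the search function.
        List<Object[]> matches = queryVector == null ? List.of() : search.apply(queryVector);
        List<Object[]> out = new ArrayList<>();
        for (Object[] match : matches) {
            out.add(concat(inputRow, match));
        }
        if (out.isEmpty() && joinType == JoinType.LEFT) {
            // LEFT JOIN LATERAL: emit the input row exactly once, padded with NULLs.
            out.add(concat(inputRow, new Object[searchArity]));
        }
        // INNER: an input row without matches produces no output.
        return out;
    }

    private static Object[] concat(Object[] left, Object[] right) {
        Object[] row = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, row, left.length, right.length);
        return row;
    }
}
```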
Runtime Implementation
Based on whether an external vector store is used and on the time attribute, the runtime implementation falls into four categories.
Implementation | Event Time | Processing Time |
Use state to build vector store | The logic is similar to the temporal join operator. | No implementation. |
Leverage an external vector store | Similar to lookup join, but requires the external vector store to support time travel. | Similar to lookup join. |
In this FLIP, we focus exclusively on the fourth implementation: leveraging an external system for vector search based on processing time. Future FLIPs will discuss other implementations.
Runtime API
The vector search API is very similar to the lookup source API. The following picture describes the relationship among all interfaces.
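import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public interface VectorSearchTableSource extends DynamicTableSource {

    /** Returns a provider of the runtime implementation that performs the vector search. */
    VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context);

    /** Context for creating the runtime implementation of the vector search. */
    interface VectorSearchContext extends DynamicTableSource.Context {

        /**
         * Returns an array of key index paths that should be used during the search. The indices
         * are 0-based and support composite keys within (possibly nested) structures.
         *
         * <p>For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2
         * INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and
         * {@code s2} are used for performing the search.
         *
         * @return array of key index paths
         */
        int[][] getSearchColumns();

        /**
         * Runtime config provided to the provider. The config can be used by the planner or the
         * vector search provider at runtime. For example, async options can be used by the
         * planner to choose async search. Other config such as HTTP timeout or retry can be used
         * to configure search functions.
         */
        ReadableConfig runtimeConfig();
    }

    /** Marker interface for providers of the vector search runtime implementation. */
    interface VectorSearchRuntimeProvider {}
}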
interface VectorSearchFunctionProvider extends VectorSearchTableSource.VectorSearchRuntimeProvider {
    VectorSearchFunction createVectorSearchFunction();
}

/** Synchronous vector search. TableFunction is an abstract class, so this is a class as well. */
abstract class VectorSearchFunction extends TableFunction<RowData> {

    /**
     * Searches for the topK records most similar to the query vector in {@code queryData}.
     *
     * <p>Notice: the search function should return at most topK results even if multiple
     * results have the same score.
     */
    public abstract Collection<RowData> vectorSearch(int topK, RowData queryData) throws IOException;

    /**
     * Invokes {@link #vectorSearch} and handles exceptions. {@code args[0]} is topK and
     * {@code args[1]} is the query vector.
     */
    public final void eval(Object... args) {
        int topK = (int) args[0];
        GenericRowData argsData = GenericRowData.of(args[1]);
        try {
            Collection<RowData> results = vectorSearch(topK, argsData);
            if (results == null) {
                return;
            }
            results.forEach(this::collect);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format("Failed to execute search with input row %s.", argsData), e);
        }
    }
}
interface AsyncVectorSearchFunctionProvider extends VectorSearchTableSource.VectorSearchRuntimeProvider {
    AsyncVectorSearchFunction createAsyncVectorSearchFunction();
}

/** Asynchronous vector search. AsyncTableFunction is an abstract class, so this is a class as well. */
abstract class AsyncVectorSearchFunction extends AsyncTableFunction<RowData> {

    /**
     * Searches for the topK records most similar to the query vector in {@code queryData}.
     *
     * <p>Notice: the search function should return at most topK results even if multiple
     * results have the same score.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch(int topK, RowData queryData);

    /** Invokes {@link #asyncVectorSearch} and handles exceptions. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... args) {
        int topK = (int) args[0];
        GenericRowData argsData = GenericRowData.of(args[1]);
        asyncVectorSearch(topK, argsData)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                // Report the input row, not the raw args array, for readability.
                                future.completeExceptionally(
                                        new TableException(
                                                String.format(
                                                        "Failed to asynchronously search entries with input row %s.",
                                                        argsData),
                                                exception));
                                return;
                            }
                            future.complete(result);
                        });
    }
}
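The runtime API leaves the search algorithm to the connector, which normally delegates to an external vector store. The following sketch makes the `vectorSearch` contract concrete without Flink dependencies. It performs exact (brute-force) cosine-similarity search over an in-memory list. Plain Java types stand in for `RowData`, and the class and record names are hypothetical. The FLIP does not fix a similarity metric, so cosine is an assumption here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical in-memory stand-in for a VectorSearchFunction backed by a vector store. */
final class BruteForceVectorSearch {

    /** A row of the search table: regular columns plus the indexed vector. */
    record Row(String id, String name, double[] index) {}

    /** An output row: all search-table columns plus the similarity score. */
    record Match(Row row, double score) {}

    private final List<Row> table;

    BruteForceVectorSearch(List<Row> table) {
        this.table = List.copyOf(table);
    }

    /** Cosine similarity; defined as 0 when either vector has zero norm. */
    static double cosine(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Vector dimensions differ: " + a.length + " vs " + b.length);
        }
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return (normA == 0 || normB == 0) ? 0.0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** Mirrors the vectorSearch(topK, queryData) contract: at most topK matches, best first. */
    List<Match> vectorSearch(int topK, double[] query) {
        List<Match> scored = new ArrayList<>();
        for (Row row : table) {
            scored.add(new Match(row, cosine(query, row.index())));
        }
        scored.sort(Comparator.comparingDouble(Match::score).reversed());
        return new ArrayList<>(scored.subList(0, Math.max(0, Math.min(topK, scored.size()))));
    }
}
```

Exact search scans every row for each query. This is why real connectors are expected to call a vector store with an approximate index instead.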
Options
name | meaning |
max-concurrent-operations | The maximum number of async I/O operations that the async search can trigger. |
timeout | The total time that can pass before the invocation (including retries) is considered timed out and the task execution fails. |
retry-strategy | The retry strategy. FIXED_DELAY retries after a fixed amount of time. |
retry-delay | The time to wait between retries for the FIXED_DELAY strategy. It could also serve as the base delay for a (not yet proposed) exponential backoff. |
max-attempts | The maximum number of attempts when retrying. |
output-mode | Output mode for async operations. Either ORDERED (default) or ALLOW_UNORDERED. |
async | Whether to run an async search function. Defaults to false. |
The config options for the above parameters are:
- table.exec.async-vector-search.max-concurrent-operations
- table.exec.async-vector-search.timeout
- table.exec.async-vector-search.retry-strategy
- table.exec.async-vector-search.fixed-delay
- table.exec.async-vector-search.max-attempts
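Under the FIXED_DELAY strategy, the retry-related options combine roughly as follows. This is a hypothetical, dependency-free sketch:

- The class and method names are invented.
- The options are passed as plain arguments instead of being read from `ReadableConfig`.
- timeout is read as covering all attempts, as the options table describes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch of FIXED_DELAY retries bounded by max-attempts and an overall timeout. */
final class FixedDelayRetrySketch {

    private FixedDelayRetrySketch() {}

    static <T> CompletableFuture<T> withFixedDelayRetry(
            Supplier<CompletableFuture<T>> call, int maxAttempts, long retryDelayMillis, long timeoutMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 1, maxAttempts, retryDelayMillis, result);
        // The timeout covers the whole invocation, including all retries.
        return result.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> call, int n, int maxAttempts,
                                    long retryDelayMillis, CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            future = CompletableFuture.failedFuture(e); // treat a synchronous throw as a failed attempt
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (n >= maxAttempts || result.isDone()) {
                // Out of attempts (or already timed out): surface the last failure.
                result.completeExceptionally(error);
            } else {
                // FIXED_DELAY: wait retryDelayMillis, then try again.
                Executor delayed = CompletableFuture.delayedExecutor(retryDelayMillis, TimeUnit.MILLISECONDS);
                delayed.execute(() -> attempt(call, n + 1, maxAttempts, retryDelayMillis, result));
            }
        });
    }
}
```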
Implementation Details
Below we briefly describe how the planner generates the plan and the operator for the following example SQL.
SELECT * FROM users,
  LATERAL TABLE(VECTOR_SEARCH(
    TABLE products,
    DESCRIPTOR(`index`),
    users.features,
    3
  ))
Because VECTOR_SEARCH references a column of the left table (users.features), the initial plan is:
LogicalProject(inputs=[0..5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
   :- LogicalTableScan(table=[[users]])
   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'index'), $cor0.features, 3)])
      +- LogicalProject(inputs=[0..2])
         +- LogicalTableScan(table=[[products]])
In the physical phase, the Correlate is converted to a PhysicalVectorSearch node. During optimization, the rule checks whether the right input of the vector search node is simple (e.g., a plain scan of the search table, as in the plan above). If the check fails, an exception is thrown to notify users. Finally, we get the following plan:
VectorSearch(joinType=[INNER], searchTable=[products], queryColumn=[2], searchColumn=[2], topK=[3])
+- TableSourceScan(table=[[users]], fields=[id, name, features])
In the ExecNode, the planner generates the operator that performs the search. Note that this search operator is almost identical to the lookup join operator.
A simple PoC is available at https://github.com/apache/flink/compare/master...fsk119:flink:vector-search?expand=1
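The async variant of the operator follows the lookup join design, so the `output-mode` and `max-concurrent-operations` options are expected to behave as they do for async lookups. The single-threaded sketch below illustrates ORDERED mode. Its names are hypothetical, and it is not Flink's actual operator code. Searches run concurrently up to the limit, but a result is emitted only after all earlier inputs have produced theirs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch of ORDERED output: concurrent searches, results released in input order. */
final class OrderedAsyncEmitter<I, O> {

    private final Function<I, CompletableFuture<O>> asyncSearch;
    private final int maxConcurrentOperations;
    private final Deque<CompletableFuture<O>> inFlight = new ArrayDeque<>();
    private final List<O> emitted = new ArrayList<>();

    OrderedAsyncEmitter(Function<I, CompletableFuture<O>> asyncSearch, int maxConcurrentOperations) {
        if (maxConcurrentOperations < 1) {
            throw new IllegalArgumentException("maxConcurrentOperations must be >= 1");
        }
        this.asyncSearch = asyncSearch;
        this.maxConcurrentOperations = maxConcurrentOperations;
    }

    /** Starts a search for one input record, blocking on the oldest one if the limit is reached. */
    void process(I input) {
        if (inFlight.size() >= maxConcurrentOperations) {
            emitted.add(inFlight.removeFirst().join()); // back-pressure on the oldest request
        }
        inFlight.addLast(asyncSearch.apply(input));
        drainCompletedHead();
    }

    /** Emits results only while the oldest request is complete; later ones must wait. */
    private void drainCompletedHead() {
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            emitted.add(inFlight.removeFirst().join());
        }
    }

    List<O> emittedSoFar() {
        return new ArrayList<>(emitted);
    }

    /** Waits for all outstanding requests and returns every result in input order. */
    List<O> finish() {
        while (!inFlight.isEmpty()) {
            emitted.add(inFlight.removeFirst().join());
        }
        return new ArrayList<>(emitted);
    }
}
```

With ALLOW_UNORDERED, each result could instead be emitted as soon as its own future completes. This trades output order for lower latency.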
Future Work
Many engines provide interesting extensions for vector search. This section discusses how these features could be supported in Flink.
Prefilter
Uncorrelated Filter
Here is an example: output the top 3 matched toy products for every user.
CREATE VIEW p AS SELECT * FROM products WHERE cat = 'toy';
SELECT * FROM users,
  LATERAL TABLE(VECTOR_SEARCH(
    TABLE p,
    DESCRIPTOR(`index`),
    users.features,
    'cosine',
    3
  ))
Correlated Filter
Here is an example: output the top 3 matched products for every user, keeping only products whose prices are below that user's threshold.
SELECT * FROM users,
  LATERAL TABLE(VECTOR_SEARCH(
    (SELECT * FROM products WHERE prices < users.threshold),
    DESCRIPTOR(`index`),
    users.features,
    'cosine',
    3
  ))
This does not work in Flink yet: the current parser requires the query used here to be partitioned by specific columns.
Hybrid Search
Hybrid search combines the results of multiple vector searches and outputs the records that satisfy the conditions. I prefer to introduce a new function for this, because it requires users to provide a weight for each vector search result.
SELECT * FROM users,
  LATERAL TABLE(HYBRID_SEARCH(
    base_table => TABLE products,
    search_column => DESCRIPTOR(`sparse_index`, `dense_index`),
    probe_column => (users.features1, users.features2),
    weight => ARRAY[0.3, 0.7],
    distance_type => 'cosine',
    top_k => 3
  )) AS p
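The FLIP leaves the fusion rule of the proposed hybrid function open. One common choice is a weighted sum of per-search scores, and the sketch below assumes that rule. It also assumes the scores of the different searches are on comparable scales. A record missing from one search's results contributes 0 for that search. Other engines use alternatives such as reciprocal rank fusion. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical weighted-sum fusion for a future HYBRID_SEARCH function. */
final class WeightedHybridSketch {

    private WeightedHybridSketch() {}

    /**
     * @param perSearchScores one map per searched column: record id -> similarity score
     * @param weights         one weight per searched column, e.g. {0.3, 0.7}
     * @return at most topK (id, fused score) pairs, highest fused score first
     */
    static List<Map.Entry<String, Double>> fuse(
            List<Map<String, Double>> perSearchScores, double[] weights, int topK) {
        if (perSearchScores.size() != weights.length) {
            throw new IllegalArgumentException("Expected one weight per vector search");
        }
        Map<String, Double> fused = new LinkedHashMap<>();
        for (int i = 0; i < weights.length; i++) {
            double weight = weights[i];
            // A record absent from one search's results contributes 0 for that search.
            perSearchScores.get(i).forEach((id, score) -> fused.merge(id, weight * score, Double::sum));
        }
        List<Map.Entry<String, Double>> ranked = new ArrayList<>(fused.entrySet());
        ranked.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        return new ArrayList<>(ranked.subList(0, Math.max(0, Math.min(topK, ranked.size()))));
    }
}
```

Take weights 0.3 and 0.7, a sparse search that scores a = 1.0 and b = 0.5, and a dense search that scores b = 1.0 and c = 0.8. The fused scores are b = 0.85, c = 0.56 and a = 0.3.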
Rejected Alternatives
Expose ID field in VECTOR_SEARCH results
While many vector databases support returning ID fields in their search results, this implementation does not treat the ID as an intrinsic output of the vector search computation. Instead, the ID field is conditionally included in the results only when explicitly present in the vector table schema. This design choice reflects the ID's role as a physical data identifier, distinct from the mathematical or semantic relationships derived through vector similarity calculations.
