Motivation

Flink’s SupportsFilterPushDown interface allows source connectors to accept predicates on physical columns for optimizations such as partition pruning or predicate push-down into storage. However, predicates on metadata columns (columns declared with METADATA or METADATA VIRTUAL in the table schema) cannot use this mechanism safely, because they break when a compiled plan is restored.

Metadata columns (e.g., Kafka offset, timestamp, partition) are appended to the row type after the physical columns. The existing FilterPushDownSpec serializes predicates as RexNode trees whose RexInputRef indices refer to positions in the scan’s produced row type. This breaks during compiled plan restoration. When the spec is applied, the source’s produced type (which ProjectPushDownSpec may have narrowed) does not contain the metadata columns. The metadata column indices in the serialized predicates are then out of bounds, and restoration fails with an ArrayIndexOutOfBoundsException.

This means metadata-column predicates cannot safely flow through the existing filter push-down path. Without a dedicated metadata filter path, sources that could benefit from metadata-based optimizations must ignore metadata predicates entirely and lose these optimization opportunities. Examples include Kafka offset seeking by timestamp range and pruning by partition metadata in file-based sources.

This FLIP introduces a dedicated metadata filter push-down path that solves the serialization problem and enables metadata-aware source optimizations.

Example

Consider a Kafka-backed table using the open source Kafka connector, which already exposes topic, partition, offset, timestamp, timestamp-type, and headers as readable metadata columns:

CREATE TABLE orders (
    order_id   BIGINT,
    amount     DECIMAL(10, 2),
    kafka_partition INT METADATA FROM 'partition' VIRTUAL,
    event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    ...
);

A query filtering on metadata:

SELECT * FROM orders WHERE event_time >= TIMESTAMP '2024-01-01 00:00:00';

Without this FLIP, the event_time predicate remains a runtime filter that is applied after all records have been read. With this FLIP, the predicate is pushed to the source via applyMetadataFilters(). This lets the connector optimize reads, for example by seeking to the appropriate offset based on the timestamp. In a file-based connector, predicates on other metadata columns could be pushed down in the same way.

Public Interfaces

New Methods and Result Type on SupportsReadingMetadata

Two new default methods and a dedicated result type are added to org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata:

/**
 * Returns true if this source supports metadata filter push-down.
 * When true, the planner may call applyMetadataFilters() with
 * predicates expressed in metadata key names (not SQL aliases).
 */
default boolean supportsMetadataFilterPushDown() {
    return false;
}

/**
 * Applies metadata filter predicates. Same contract as
 * SupportsFilterPushDown.applyFilters(), but for metadata columns.
 */
default MetadataFilterResult applyMetadataFilters(
        List<ResolvedExpression> metadataFilters) {
    return MetadataFilterResult.of(
            Collections.emptyList(), metadataFilters);
}

/**
 * Result of a metadata filter push down. Communicates the source's
 * response to the planner during optimization.
 */
@PublicEvolving
final class MetadataFilterResult {
    private final List<ResolvedExpression> acceptedFilters;
    private final List<ResolvedExpression> remainingFilters;

    public static MetadataFilterResult of(
            List<ResolvedExpression> acceptedFilters,
            List<ResolvedExpression> remainingFilters) { ... }

    public List<ResolvedExpression> getAcceptedFilters() { ... }
    public List<ResolvedExpression> getRemainingFilters() { ... }
}

Opt-in design: Existing sources are unaffected. Sources must explicitly return true from supportsMetadataFilterPushDown() to receive metadata predicates.

Dedicated result type: MetadataFilterResult is a separate type from SupportsFilterPushDown.Result, scoped to the SupportsReadingMetadata interface. This decouples the metadata filter contract from the physical filter contract, allowing each to evolve independently.

Predicate naming: Predicates passed to applyMetadataFilters() use metadata key names (the internal names from listReadableMetadata()), not SQL column aliases. For example, a column declared as msg_offset BIGINT METADATA FROM 'offset' will have its predicate expressed as offset >= 1000, not msg_offset >= 1000. The planner handles the alias-to-key translation.
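To make the contract concrete, the following is a minimal, Flink-free sketch of how a Kafka-like source might answer applyMetadataFilters(). The sketch relies on several hypothetical stand-ins. The MetadataPredicate record stands in for ResolvedExpression and names one metadata key, an operator, and a literal. SketchFilterResult is a local mirror of MetadataFilterResult. TimestampAwareSource is not the actual Kafka connector. The choice to accept only ">=" bounds on timestamp is also an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a ResolvedExpression comparing one metadata key
// with a literal, e.g. "timestamp >= 1704067200000". Keys are metadata key
// names (as in listReadableMetadata()), never SQL aliases.
record MetadataPredicate(String key, String op, long value) {}

// Local mirror of MetadataFilterResult (accepted vs. remaining filters).
record SketchFilterResult(List<MetadataPredicate> accepted, List<MetadataPredicate> remaining) {}

// Hypothetical Kafka-like source; not the actual Kafka connector.
class TimestampAwareSource {
    private final List<MetadataPredicate> pushed = new ArrayList<>();

    // Opt-in flag: the planner only calls applyMetadataFilters() when this is true.
    boolean supportsMetadataFilterPushDown() {
        return true;
    }

    // Accepts lower bounds on 'timestamp' to choose start offsets. All filters,
    // including the accepted ones, are also returned as remaining: with
    // producer-assigned timestamps, records in a partition are not guaranteed
    // to be in timestamp order, so the runtime Filter must still check them.
    SketchFilterResult applyMetadataFilters(List<MetadataPredicate> filters) {
        List<MetadataPredicate> accepted = new ArrayList<>();
        for (MetadataPredicate p : filters) {
            if (p.key().equals("timestamp") && p.op().equals(">=")) {
                accepted.add(p);
            }
        }
        pushed.addAll(accepted);
        return new SketchFilterResult(accepted, new ArrayList<>(filters));
    }

    // Lowest record timestamp the reader must start from, or Long.MIN_VALUE
    // if no bound was pushed (i.e. use the configured start position).
    long startTimestamp() {
        long start = Long.MIN_VALUE;
        for (MetadataPredicate p : pushed) {
            start = Math.max(start, p.value());
        }
        return start;
    }
}
```

Keeping an accepted filter in the remaining list fits the "same contract as SupportsFilterPushDown.applyFilters()" wording. Under that contract, an accepted filter may be applied on a best-effort basis. A source that can evaluate a predicate exactly would instead drop it from the remaining list.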

New Ability Spec: MetadataFilterPushDownSpec

A new SourceAbilitySpec implementation for serializing metadata predicates in compiled plans:

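@JsonTypeName("MetadataFilterPushDown")
public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
    // Metadata predicates, serialized with the existing RexNode serde
    private final List<RexNode> predicates;
    // Row type captured at push-down time; uses metadata key names, not SQL aliases
    private final RowType predicateRowType;

    // Constructor, apply(), and the remaining SourceAbilitySpec methods omitted
}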

Key design decisions:

  1. predicateRowType snapshot: The row type is captured at push-down time and stored alongside predicates. During compiled plan restoration, this stored type is used instead of the context’s current row type (which may have been narrowed by ProjectPushDownSpec). This solves the RexInputRef index mismatch that makes the existing FilterPushDownSpec unsafe for metadata predicates.

  2. Pre-translated field names: The predicateRowType stores metadata key names (e.g., timestamp) rather than SQL aliases (e.g., event_time). The alias-to-key translation happens once at optimization time, when the ResolvedSchema is available. This means the serialized spec is self-contained — no column-to-key mapping needs to be persisted, and restoration simply uses the stored row type directly.

  3. needAdjustFieldReferenceAfterProjection() returns true: This prevents scan reuse when metadata filter specs differ, analogous to FilterPushDownSpec.
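The snapshot decision (point 1) can be illustrated with a small, self-contained model. In this sketch, a row type is reduced to an array of field names. The hypothetical InputRefPredicate record holds only an input index, as a serialized RexInputRef does. The sketch does not use Flink's RexNode or RowType. It shows why resolving against a narrowed row type fails and why resolving against the stored predicateRowType succeeds.

```java
// Hypothetical model of a serialized predicate "$index op literal": the input
// reference is only an index into the row type it was built against.
record InputRefPredicate(int inputIndex, String op, long literal) {}

class SpecRestoreSketch {
    // Looks up the referenced field name; throws ArrayIndexOutOfBoundsException
    // when the index lies outside the given row type.
    static String resolveField(InputRefPredicate p, String[] rowTypeFieldNames) {
        return rowTypeFieldNames[p.inputIndex()];
    }

    // Restore path of the sketch: resolve against the predicateRowType snapshot
    // stored in the spec, never against the (possibly narrowed) context row type.
    static String restore(InputRefPredicate p, String[] storedPredicateRowType) {
        return resolveField(p, storedPredicateRowType) + " " + p.op() + " " + p.literal();
    }
}
```

Consider a predicate with index 3 that was built against {"order_id", "amount", "partition", "timestamp"}. It resolves to "timestamp >= 1000" when restored from the snapshot. Resolving the same index against a narrowed type {"order_id", "amount"} throws, which mirrors the failure described in the Motivation section.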

Proposed Changes

Predicate Classification in PushFilterIntoTableSourceScanRule

The existing rule is extended to classify predicates into three categories:

  1. Physical predicates: Reference only physical columns (every referenced RexInputRef index < physicalColumnCount). Pushed through SupportsFilterPushDown.applyFilters() as before.

  2. Metadata predicates: Reference only metadata columns (every referenced RexInputRef index >= physicalColumnCount). Pushed through SupportsReadingMetadata.applyMetadataFilters() when the source opts in.

  3. Mixed predicates: Reference both physical and metadata columns. Remain as runtime filters (not pushed). A future enhancement could decompose conjunctive mixed predicates, but this is out of scope.

Classification uses RelOptUtil.InputFinder.bits() to determine which columns each predicate references, compared against the physical column count from the resolved schema.

Important: Predicates are only classified as metadata when the source supports metadata filter push-down (supportsMetadataFilterPushDown() returns true). Otherwise, all predicates flow through the physical path. This preserves the FilterPushDownSpec([]) guard that prevents rule re-firing and maintains scan reuse invariants.
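The classification rule can be sketched with java.util.BitSet, which here plays the role of the ImmutableBitSet that RelOptUtil.InputFinder.bits() returns. PredicateClassifier and its Category enum are hypothetical names for this illustration. The sketch sends predicates that reference no columns at all to the physical path; this is an assumption, since the text above does not specify that case.

```java
import java.util.BitSet;

class PredicateClassifier {
    enum Category { PHYSICAL, METADATA, MIXED }

    // 'referenced' stands in for RelOptUtil.InputFinder.bits(predicate):
    // bit i is set if the predicate reads input field i. Physical columns occupy
    // indices [0, physicalColumnCount); metadata columns follow them.
    static Category classify(BitSet referenced, int physicalColumnCount,
                             boolean sourceSupportsMetadataFilters) {
        // Without opt-in, every predicate stays on the physical path.
        if (!sourceSupportsMetadataFilters) {
            return Category.PHYSICAL;
        }
        boolean readsPhysical = !referenced.get(0, physicalColumnCount).isEmpty();
        boolean readsMetadata = referenced.nextSetBit(physicalColumnCount) >= 0;
        if (readsPhysical && readsMetadata) {
            return Category.MIXED;    // remains a runtime filter
        }
        if (readsMetadata) {
            return Category.METADATA; // goes to applyMetadataFilters()
        }
        // Physical-only (or, in this sketch, column-free) predicates go to applyFilters().
        return Category.PHYSICAL;
    }
}
```

In the orders example, there are two physical columns (order_id, amount). With that schema, `event_time >= ...` references index 3 and is classified as METADATA. `amount > 10 OR kafka_partition = 0` references indices 1 and 2 and is classified as MIXED.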

Two-Path Push-Down Flow

Filter condition
    |
    v
extractPredicates() -> convertible[] + unconvertible[]
    |
    v
classify convertible predicates:
    |
    +-> physicalPredicates[]  ----> SupportsFilterPushDown.applyFilters()
    |                                   -> accepted[] + remaining[]
    |                                   -> FilterPushDownSpec
    |
    +-> metadataPredicates[]  ----> SupportsReadingMetadata.applyMetadataFilters()
    |                                   -> accepted[] + remaining[]
    |                                   -> MetadataFilterPushDownSpec
    |
    v
Combine remaining from both paths + unconvertible
    -> remaining condition on Filter operator (or eliminate Filter if empty)
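The final step of the diagram combines the remaining predicates from both paths. The sketch below shows one way to do this, assuming the two source calls are modeled as plain functions that return a hypothetical Split record. Split stands in for both SupportsFilterPushDown.Result and MetadataFilterResult; TwoPathPushDown is likewise a hypothetical name. The sketch also appends mixed predicates (category 3), because they stay on the Filter operator just like the unconvertible ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class TwoPathPushDown {
    // Local stand-in for SupportsFilterPushDown.Result and MetadataFilterResult.
    record Split<T>(List<T> accepted, List<T> remaining) {}

    // Returns the conjuncts the Filter operator must still evaluate, or
    // Optional.empty() when the Filter can be eliminated.
    static <T> Optional<List<T>> remainingCondition(
            List<T> physical,
            List<T> metadata,
            List<T> mixed,
            List<T> unconvertible,
            Function<List<T>, Split<T>> applyFilters,
            Function<List<T>, Split<T>> applyMetadataFilters) {
        List<T> remaining = new ArrayList<>();
        // Physical path: accepted conjuncts would be recorded in a FilterPushDownSpec.
        remaining.addAll(applyFilters.apply(physical).remaining());
        // Metadata path: accepted conjuncts would be recorded in a MetadataFilterPushDownSpec.
        if (!metadata.isEmpty()) {
            remaining.addAll(applyMetadataFilters.apply(metadata).remaining());
        }
        // Mixed and unconvertible predicates are never pushed.
        remaining.addAll(mixed);
        remaining.addAll(unconvertible);
        return remaining.isEmpty() ? Optional.empty() : Optional.of(remaining);
    }
}
```

If both source calls accept everything and there are no mixed or unconvertible predicates, the result is empty and the Filter operator can be removed.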

Alias-to-Key Translation

Metadata columns can be aliased in SQL (msg_offset BIGINT METADATA FROM 'offset'). The planner’s row type uses the SQL alias (msg_offset), but the source expects the metadata key (offset). Translation happens once at optimization time in the push-down rule, before predicates are serialized:

  1. Build an alias-to-key map from the resolved schema’s MetadataColumn entries
  2. Create a translated RowType with metadata key names replacing SQL aliases
  3. Store this translated RowType as the predicateRowType in the spec
  4. At restore time, the stored row type already has the correct names — no mapping is needed
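Steps 1 and 2 can be sketched with plain collections. MetadataColumnInfo is a hypothetical stand-in for a MetadataColumn entry of the resolved schema, and AliasToKeyTranslation is a hypothetical helper name. An empty key models a column declared without FROM; in that case, Flink uses the column name itself as the metadata key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class AliasToKeyTranslation {
    // Hypothetical stand-in for a metadata column of the resolved schema.
    // An empty metadataKey means no FROM clause: the column name is the key.
    record MetadataColumnInfo(String name, Optional<String> metadataKey) {}

    // Step 1: build the alias-to-key map from the metadata columns.
    static Map<String, String> aliasToKey(List<MetadataColumnInfo> metadataColumns) {
        Map<String, String> map = new HashMap<>();
        for (MetadataColumnInfo c : metadataColumns) {
            map.put(c.name(), c.metadataKey().orElse(c.name()));
        }
        return map;
    }

    // Step 2: produce the field names of the translated row type. Physical
    // column names are not in the map and pass through unchanged.
    static List<String> translateFieldNames(List<String> rowTypeFieldNames,
                                            Map<String, String> aliasToKey) {
        List<String> translated = new ArrayList<>();
        for (String name : rowTypeFieldNames) {
            translated.add(aliasToKey.getOrDefault(name, name));
        }
        return translated;
    }
}
```

Consider the orders table extended with a `topic STRING METADATA VIRTUAL` column. Its field names (order_id, amount, kafka_partition, event_time, topic) would translate to (order_id, amount, partition, timestamp, topic). The resulting names are what step 3 stores as predicateRowType.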

Compiled Plan Serialization

MetadataFilterPushDownSpec is registered in the Jackson @JsonSubTypes list on SourceAbilitySpec with type name "MetadataFilterPushDown". It serializes:

  • predicates: The RexNode list (using existing Flink RexNode serde)
  • predicateRowType: The row type at push-down time, with field names already translated to metadata key names

During restoration, apply() uses the stored predicateRowType (not the context’s potentially-narrowed row type) to convert predicates. Since the stored row type already uses metadata key names, the resulting expressions can be passed directly to applyMetadataFilters() on the source without any translation.

Compatibility, Deprecation, and Migration Plan

  • No behavioral change for existing sources. The default supportsMetadataFilterPushDown() returns false, and the rule falls back to the physical-only path.
  • Existing compiled plans deserialize unchanged, because they contain no MetadataFilterPushDown entries.
  • @JsonIgnoreProperties(ignoreUnknown = true) makes deserialization skip properties it does not recognize. As a result, new fields can be added to MetadataFilterPushDownSpec later without breaking serialized plans.
  • No migration required. Sources can adopt metadata filter push-down incrementally by overriding the two new default methods.

Test Plan

  1. Planner rule tests: Verify predicate classification and push-down behavior, including basic metadata push-down, opt-out when source declines, aliased metadata keys, mixed physical+metadata separation, partial acceptance, and interaction with projection push-down.

  2. Serde roundtrip tests: Verify MetadataFilterPushDownSpec survives compiled plan serialization and restoration with correct predicate replay.

  3. Regression tests: Existing physical filter push-down tests must continue to pass unchanged. Scan reuse with metadata-capable sources must not regress.

Rejected Alternatives

1. Extending FilterPushDownSpec to Handle Metadata

Rejected because: FilterPushDownSpec serializes RexNode predicates with RexInputRef indices relative to the current produced row type. ProjectPushDownSpec may narrow this row type during compiled plan restoration, making metadata column indices invalid. Fixing this would require a fundamentally different approach to index stability in FilterPushDownSpec, which would be a much larger and riskier change.

2. Using a Separate SupportsMetadataFilterPushDown Interface

Rejected because: Metadata filter push-down is semantically tied to SupportsReadingMetadata (only sources that read metadata can filter on it). Adding a separate interface would fragment the API surface. Default methods on the existing interface achieve the same extensibility.

3. Translating Metadata Predicates at the Source Level

Rejected because: Requiring each source to handle alias-to-key translation duplicates logic across connectors. Centralizing translation in the planner ensures consistency and reduces the implementation burden on connectors. Additionally, a source has no suitable way to update the compiled plan with its translation.

4. Decomposing Mixed Physical+Metadata Predicates

Rejected because: Decomposing conjunctive predicates that reference both physical and metadata columns adds complexity with limited benefit. In practice, most metadata predicates are simple range conditions on individual columns. Mixed predicates can be addressed in a future enhancement.
