Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Many users hit roadblocks early when implementing a pipeline in Flink SQL. This has multiple root causes:

  • Users come from an imperative programming language.
  • Users don’t know the full SQL standard. Some advanced concepts such as MATCH_RECOGNIZE or OVER windows are unknown or the syntax is too complicated.
  • Most importantly: The use case is not purely analytical. Instead, a user wants to express some event-driven if/then/else logic potentially with multiple inputs, timeouts, state, etc.

The SQL engine’s lack of extensibility leads to dead ends in SQL or Table API projects, or, if the logic is expressible in SQL, to highly inefficient plans involving many joins and expensive changelog operations. Even basic stream processing operations that can easily be expressed in the DataStream API force users to leave the SQL ecosystem.

Flink’s engine is very powerful with flexible state and timer services for dealing with streams. The SQL engine uses some of those features for built-in operators such as joins or aggregations. However, advanced users cannot access the same streaming primitives without writing new optimizer rules and diving deep into a historically grown code base.

Stream processing (and especially doing it efficiently) is hard:

  • Data platform teams would like to provide libraries and/or complex topology pieces.
  • Users would like to benefit from all the conveniences of the SQL ecosystem i.e. well-defined type system, catalog and connector integration, built-in functions, optimizer that is tightly integrated with connectors.
  • Make simple things easy, and complex ones possible.

Let’s face it:

  • SQL is great, but it cannot get the job done for event-driven applications - yet. It's mostly targeting analytical use cases.
  • Abstractions (such as catalog integration, type system, and API stability) are superior, but Flink’s engine cannot be fully leveraged.
  • We need extensions to make Flink SQL a success but should do our best to stay standard-compliant.

Vision

This document proposes a new kind of user-defined function (UDF) that enables implementing user-defined SQL operators: ProcessTableFunction (PTF).

Currently, Flink has two core APIs: DataStream API and Table & SQL API. The new UDF kind should open the Table & SQL API towards the capabilities of the DataStream API while staying in the SQL ecosystem and using all of its benefits.

PTFs should look and feel familiar for both someone coming from the DataStream API world as well as the SQL world.

From SQL:

  • Similar types and type inference as ScalarFunction, AggregateFunction, or TableFunction
  • Registration in catalog, usage of inline/temporary UDF, built-in system functions in the future
  • Usage in both SQL and Table API
  • Very important: Standard-compliant syntax using Polymorphic Table Functions (see section below)

From DataStream API:

  • Familiar naming like ProcessFunction
  • Access to Map, List and Value state
  • Ability to both keyBy() and connect() streams. Long-term also broadcast side functionality.
  • Ability to deal with watermarks and time
  • Support of query evolution (terminology as defined in FLIP-190)

In that sense, a PTF combines:

  • Look and feel of a TableFunction
    (dealing with types and parameters)
  • Semantics of a DynamicTableSource and DynamicTableSink for the optimizer
    (consuming/producing changelogs)
  • State handling like an AggregateFunction
    (accumulator-like with MapView and ListView)
  • Additional features of DataStream API
    (timer services, watermark access, co-partitioning like connect(), mapping state on restore)

Background: Polymorphic Table Functions

The SQL 2016 standard introduced a way of defining custom SQL operators, specified by ISO/IEC 19075-7:2021 (Part 7: Polymorphic table functions). ~200 pages define how this new kind of function can consume and produce tables with various execution properties. Unfortunately, this part of the standard is not publicly available.

A summary can be found in section 3 of this SIGMOD paper:
https://sigmodrecord.org/publications/sigmodRecord/1806/pdfs/08_Industry_Michels.pdf

A tutorial from Oracle can be found here:
https://asktom.oracle.com/pls/apex/asktom.search?oh=6108

Polymorphic Table Functions are already used by Flink’s window functions such as TUMBLE, HOP, SESSION, and CUMULATE, as well as by the upcoming ML functions ML_PREDICT and ML_EVALUATE in FLIP-437.

Examples from the SQL Standard

-- PTF as CSV source
SELECT * FROM TABLE(
    CSVreader(
        File => 'abc.csv',
        Floats => DESCRIPTOR("principal", "interest"),
        Dates => DESCRIPTOR("due_date")
    )
) AS S

-- PTF as script executor
SELECT D.Region, R.Name, R.Value
FROM TABLE (
  ExecR(
    Script => 'some R script',
    Input => TABLE(data) AS D PARTITION BY Region,
    Rowtype => DESCRIPTOR(Name VARCHAR(100), Value DOUBLE)
  )
) AS R

-- PTF as user-defined join
SELECT E.*, D.* FROM TABLE(
    UDJoin(
        T1 => TABLE(Emp) AS E
              PARTITION BY Deptno,
        T2 => TABLE(Dept) AS D
              PARTITION BY Deptno 
              ORDER BY Tstamp
              COPARTITION(Emp,Dept)
    )
)

Important Characteristics

  • PTFs define the concept of a virtual processor: a processing unit capable of executing a sequential algorithm, similar to a parallel instance of Flink’s ProcessFunction, potentially scoped under a key context.
  • PTFs have 4 lifecycle phases: describe, start, fulfill, finish. Similar to Flink’s planning phase (calling e.g. getTypeInference()) and open()/eval()/close() methods.

Type of Arguments for a PTF

The types of arguments a PTF can take play an important role in its semantics.

Regular

Scalar parameters as known from other functions

Descriptor

DESCRIPTOR(a) declares a column list of names, or of names with data types such as DESCRIPTOR(a INT). The latter is currently not supported by Calcite.

Table

Input is a table with further parameters, as follows:

Row vs. Set Semantics:

  • WITH ROW SEMANTICS
    • result is determined on a row-by-row basis
    • rows can be assigned to arbitrary virtual processors
    • at most one input table can have row semantics
    • PTF may still produce more than one row for a given input row
  • WITH SET SEMANTICS
    • outcome of the function depends on how the data is partitioned
    • all rows of a partition should be processed on the same virtual processor
    • only tables with set semantics may be partitioned and/or ordered
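
To illustrate the difference, here is a hedged sketch using two hypothetical functions f_row (row semantics) and f_set (set semantics); function and table names are made up for illustration:

-- Rows of t may be distributed to arbitrary virtual processors
SELECT * FROM TABLE(
    f_row(Input => TABLE(t))
)

-- All rows with the same user_id are processed by the same virtual processor
SELECT * FROM TABLE(
    f_set(Input => TABLE(t) PARTITION BY user_id)
)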

Partition and Order Semantics:

  • PARTITION BY
    • only allowed on tables with set semantics
    • in the absence of PARTITION BY, the input forms a single partition
    • each partition must be processed on a separate virtual processor
    • in case of two or more inputs, a cross product
  • ORDER BY
    • only allowed on tables with set semantics
    • engine sorts before passing table
  • COPARTITION
    • no cross product
    • collocate rows of two tables or more tables in the same virtual processor
    • Calcite currently doesn’t support this clause!

Empty Semantics:

  • KEEP WHEN EMPTY
    • generate result rows even if the input table is empty
    • in case of two or more inputs, other tables drive the emission
    • e.g. makes left outer join possible
    • it’s fine if all tables are marked as keep; this powers, for example, the ExecR above that still produces some script output
    • the DBMS must instantiate a virtual processor (or more than one virtual processor in the presence of other input tables)
  • PRUNE WHEN EMPTY
    • no output when the input table is empty
    • for algorithms that cannot work with empty data
    • optimizer doesn’t have to even invoke the PTF when this table is empty
    • tables with row semantics are always effectively “prune when empty”, so this choice is not relevant to them

Pass Through Semantics:

  • Regardless of the semantics, PARTITION BY columns are part of the output for set semantic tables
  • PASS THROUGH
    • for each input row, the PTF makes the entire input row available in the output
    • PTF might still add additional output columns
    • f(TABLE(t) AS D) AS P exposes D and P for SELECT D.*, P.*
      Currently, not supported by Calcite!
    • concatenation order (in parameter order):
      pass through columns | partition columns | PTF columns
    • optional feature and may not be available in every implementation of polymorphic table functions
  • NO PASS THROUGH
    • output columns are defined by the PTF
    • concatenation order (in parameter order):
      partition columns | PTF columns
    • Note: SESSION window TVF in Flink currently violates the standard in this regard,
      because the partition columns are not at the beginning of the output!
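
As a hedged illustration of the column concatenation order (table, column, and function names are hypothetical): given a set-semantics table t(k INT, v STRING) partitioned by k and a PTF f that declares a single output column c, the NO PASS THROUGH output schema is (k, c): the partition column first, followed by the PTF's own columns.

-- Output schema: (k, c)
SELECT * FROM TABLE(
    f(Input => TABLE(t) PARTITION BY k)
)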

Scoping and Simplifications

PTFs are very powerful, but we need to scope them down to get a first version into Flink.

Row vs. Set Semantics:

  • We will support both WITH ROW SEMANTICS and WITH SET SEMANTICS
  • Following our existing functions for windows and ML functions.

Partition and Order Semantics:

  • Since Calcite doesn’t support COPARTITION yet, we start with single-table PTFs and later extend support to two tables once the Calcite fix is in.
  • More than two tables is out of scope for this FLIP.
  • By default, we require a partitioning for tables with set semantics.
  • Once COPARTITION is supported, all tables must be copartitioned. No “broadcasting” semantics in the first version.
  • ORDER BY in streaming mode only supports time-based sorting on the first column, similar to MATCH_RECOGNIZE

Empty Semantics:

  • An AggregateFunction has KEEP WHEN EMPTY semantics if the input is not partitioned.
  • A ProcessFunction has PRUNE WHEN EMPTY semantics.
  • A StreamOperator with finish() has KEEP WHEN EMPTY semantics.
  • We go with KEEP WHEN EMPTY semantics, which can still produce PRUNE WHEN EMPTY results depending on the implementation.

Pass Through Semantics:

  • We only support NO PASS THROUGH

  • As written in the standard, PASS THROUGH is an optional feature.
    It can be modeled with a function return type strategy as well in Flink.

Time Semantics:

  • PTFs support event-time semantics only.

  • Processing time doesn’t go well with batch mode, and thus a unified API should be built on event time.
    The proposed onWatermark timers nevertheless allow for progress-driven, key-independent processing and should cover most processing-time use cases.

  • We won’t support late events. Late events are generally unsupported in Flink SQL. Thus, a time attribute will always be greater than the current watermark.

  • The runtime around a PTF will filter out late events.

State Semantics:

  • We start with value state (similar to AggregateFunction accumulators)
  • Later extend with ListView and MapView

Changelog Semantics:

  • PTFs will support append and retract mode. Similar to AggregateFunction.

Motivating Example

Implementation

This example reimplements the ProcessFunction example from the DataStream API docs. It aligns with existing UDFs. By extending and generalizing some parts, we get similar functionality but with less code than the current DataStream API:


public class CountWithTimeout extends ProcessTableFunction<Count> {

  // Define state as a Java object
  static class Count {
    public Long count = 0;
  }

  // Implement an eval() method with the full signature
  public void eval(Context ctx, @StateHint Count state, @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
    // Update the state content
    state.count++;

    // Set (or replace an existing) timer
    Timers<Instant> timers = ctx.getTimersAs(Instant.class);
    timers.registerOnTime("timeout", timers.currentTimestamp().plus(Duration.ofMinutes(1)));
  }

  // Implement an onTimer() method with state signature
  public void onTimer(OnTimerContext ctx, @StateHint Count state) {
    // Outputs a result
    collect(state);

    // Clears all state and timers
    ctx.clearAll();
  }
}

Usage

For SQL, the call style looks as follows. The implicitly added arguments (ON_TIME and UID) are covered in a dedicated section below. They are an integral part of the proposed design.


SELECT user_id, count
FROM
  CountWithTimeout(

    -- Split the input table into partitions per virtual processor
    input => TABLE(data) PARTITION BY user_id,

    -- Let the system know what "timers" and "time" mean
    on_time => DESCRIPTOR(rowtime),

    -- Support for query evolution
    uid => 'main-counting'
  )

Public Interfaces

Rough overview of interface changes. More explanation of how those interfaces fit into the big picture can be found below.

ProcessTableFunction

Notes:

  • The name Process is chosen
    • to avoid confusion with the rather meaningless “Polymorphic“ term
    • to highlight that this is a very rich function with state and time access, and more.
  • The base class is similar to TableFunction.
  • The collector is globally available for eval(), finish(), and onTimer()
  • TypeInferenceExtractor will extract state, tables, descriptors, and scalar arguments from the signature similar to AggregateFunction.


package org.apache.flink.table.functions;

/**
 * Base class for a user-defined process table function. A user-defined process table function maps zero, one, or
 * multiple tables and scalar values to zero, one, or multiple rows (or structured types).
 *
 * <p>The behavior of a {@link ProcessTableFunction} can be defined by implementing a custom evaluation
 * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
 * 
 * ... More explanation will follow ...
 */
public class ProcessTableFunction<T> extends UserDefinedFunction {

    /** The code generated collector used to emit rows. */
    private transient Collector<T> collector;

    /** Internal use. Sets the current collector. */
    public final void setCollector(Collector<T> collector) {
        this.collector = collector;
    }

    @Override
    public FunctionKind getKind() {
        return FunctionKind.PROCESS_TABLE;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInferenceExtractor.forProcessTableFunction(typeFactory, getClass());
    }

    public void finish() throws Exception {
        // empty by default
    }

    /** Context for eval() method. */
    public interface Context {

      /** 
       * Gives access to time.
       * 
       * @param conversionClass defines the target class e.g. Instant, Long, LocalDateTime, TimestampData.
       */
      <TimeType> Timers<TimeType> getTimersAs(Class<TimeType> conversionClass);

      /** 
       * Gives access to the virtual partition of a table with set semantics.
       * 
       * @param tableArgName name of the table argument
       */
      SetSemantics getSetSemanticsFor(String tableArgName);
      
      /**
       * Clears the state of a virtual partition. Semantically equal to setting
       * all fields of the row/structured type to null.
       */
      void clearState(String stateArgName);

      /**
       * Clears all state of a virtual partition.
       */
      void clearAllState();

      /**
       * Clears all timers of a virtual partition.
       */
      void clearAllTimers();

      /**
       * Clears the virtual partition incl. timers and state.
       */
      void clearAll();
    }

    /** Context for onTimer() methods. */
    public interface OnTimerContext extends Context {

        /** Returns the name of the currently firing timer.
         *
         * @return {@code null} if no name was used.
         */
        String currentTimer();
    }
}

Timers

Notes:

  • The Timers class is similar to the DataStream API’s TimerService but avoids name clashes.
  • It is closer to Apache Beam when it comes to identifying timers. Name-spacing timers is more user-friendly. For use cases where many timers need to be registered, a name is optional.
  • It solves frequent tasks such as periodic triggers or operating on watermarks. The latter enables StreamOperator-like behavior to execute processWatermark.
  • The generic T is required as Flink SQL supports both TIMESTAMP and TIMESTAMP_LTZ as watermarks and time attributes, leading to either LocalDateTime or Instant Java time classes, or a pure Long.


/**
 * Gives access to time.
 * 
 * <p> Timers are scoped to a virtual processor (i.e. partition). The timer can only be registered
 * and deleted in the current virtual processor.
 */
interface Timers<T> {

    /**
     * Returns the current event-time defined by the {@code ON_TIME} argument descriptor.
     * If multiple tables are defined as input, it returns the timestamp of the currently
     * processed table.
     *
     * @return {@code null} if no time attribute is present.
     */
     T currentTime();

    /**
     * Returns the current event-time watermark.
     *
     * @return {@code null} if no watermark has been received yet.
     */
    T currentWatermark();
    
   /**
     * Returns the next event-time watermark in case of {@link #registerOnWatermark}.
     * Allows flushing buffers shortly before the new watermark arrives.
     *
     * @return {@code null} if no watermark has been received yet.
     */
    T followingWatermark();

    /**
     * Registers a timer to be fired when the event time watermark passes the given time.
     * Replaces an existing timer under the same name.
     */
    void registerOnTime(String name, T time);

    /**
     * Registers a timer to be fired when the event time watermark passes the given time.
     * Has no effect if an existing timer exists under the same time.
     */
    void registerOnTime(T time);

    /**
     * Registers a periodic timer to be fired whenever the event time watermark passes the given time.
     * Replaces an existing timer under the same name.
     */
    void registerOnPeriodicTime(String name, T relativeTime, Duration period);

    /**
     * Registers a timer to be fired before a watermark passes.
     * Timer will get the name "watermark".
     */
    void registerOnWatermark();

    /**
     * Registers a periodic timer to be fired every time before a watermark passes.
     * Timer will get the name "watermark".
     */
    void registerOnPeriodicWatermark();

    /**
     * Deletes an event-time timer. This method has only an effect if such a timer was previously
     * registered and did not already expire.
     */
    void deleteTimer(String name);

    /**
     * Deletes an event-time timer without name. This method has only an effect if such a timer was
     * previously registered and did not already expire.
     */
    void deleteTimer(T time);

}
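
For illustration, a minimal sketch of how the watermark-based timers could be used to buffer rows and flush them once per watermark. The Buffered function, its Buffer state class, and the input table argument are hypothetical; the calls follow the interfaces proposed in this FLIP:

public class Buffered extends ProcessTableFunction<Row> {

  // Hypothetical value state holding the buffered rows of the current virtual partition
  public static class Buffer {
    public List<Row> rows = new ArrayList<>();
  }

  public void eval(
      Context ctx,
      @StateHint Buffer state,
      @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
    // Buffer the row and make sure a timer fires shortly before the next watermark
    state.rows.add(input);
    ctx.getTimersAs(Instant.class).registerOnWatermark();
  }

  public void onTimer(OnTimerContext ctx, @StateHint Buffer state) {
    // Emit and clear the buffer when the "watermark" timer fires
    state.rows.forEach(this::collect);
    state.rows.clear();
  }
}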

SetSemantics

Notes:

  • Gives access to the information about
    TABLE(Dept) AS D PARTITION BY Deptno ORDER BY Tstamp COPARTITION(Dept, Emp)

  • In particular to key columns, sort columns, co-partitioning, and time attributes. They can be used to project input rows.


/**
 * Gives access to set semantics of a table argument.
 */
interface SetSemantics {

  /**
   * An array of index paths that specifies the passed PARTITION BY columns.
   */
  int[][] partitionByColumns();
  
  /**
   * An array of index paths that specifies the passed ORDER BY columns.
   */
  int[][] orderByColumns();
  
  /**
   * Position of the passed "ON_TIME" column.
   */
  int timeColumn();
  
  /**
   * List of table argument names that are copartitioned with this table argument.
   */
  List<String> coPartitionArgs();
}
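
A hedged sketch of how a PTF could use this information inside eval() to access the partition key of the current row (the argument name "input" and the surrounding function are hypothetical):

public void eval(
    Context ctx,
    @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
  SetSemantics semantics = ctx.getSetSemanticsFor("input");
  // Index paths of the PARTITION BY columns of the "input" table argument
  int[][] keyPaths = semantics.partitionByColumns();
  // For a top-level key column, the first index of the path addresses the field in the row
  Object firstKey = input.getField(keyPaths[0][0]);
  // ... use the key e.g. for constructing the output row
}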

TypeInference

Notes:

  • Because PTFs have more complex signatures, parts of the TypeInference class receive an update for generalization. Those fit nicely into the existing concepts.
  • TypeInference already contains accumulatorTypeStrategy (i.e. a strategy for inferring the type of accumulator state), this will be generalized to stateTypeStrategies taking more than just one state entry. Accumulators will be put into the map under the name acc.
  • TypeInference already contains namedArguments, optionalArguments, typedArguments; those will be generalized to a list of StaticArguments powering the existing properties plus more in the future, including information whether the argument is scalar, a table with row or set semantics, or a model.


package org.apache.flink.table.types.inference;

public final class TypeInference {

  /**
   * Returns a map of state declarations, keyed by state name.
   *
   * <p>For example, an accumulator for {@link AggregateFunction} or 
   * one or more intermediates for {@link ProcessTableFunction}.
   */
  public LinkedHashMap<String, TypeStrategy> getStateTypeStrategies();
  
  /**
   * Returns a list of static, non-overloaded, non-vararg arguments.
   */
  public Optional<List<StaticArgument>> getStaticArguments();
  
  /**
   * Argument that is part of a static, non-overloaded, non-vararg signature.
   */
  static class StaticArgument {

      /**
       * Declares a scalar argument such as f(12) or f(otherField).
       *
       * @param name name of the argument, enables the assignment operator e.g. f(myArg => 12)
       * @param type explicit type to which the argument is cast
       * @param isOptional whether the argument is optional and might be filled 
       *                   with a default value
       */
      static StaticArgument scalar(String name, DataType type, boolean isOptional);
      
      /**
       * Declares a table argument such as f(myTable) or f(TABLE myTable).
       * Uses {@link StaticArgumentTraits#TABLE_AS_ROW} by default.
       *
       * <p>If no data type is defined, it operates in a "polymorphic" fashion,
       * accepting all kinds of tables, but requires a row class (Row or RowData).
       *
       * @param traits allows to specify further characteristics such
       *        as {@link StaticArgumentTraits#TABLE_AS_ROW}
       *        or {@link StaticArgumentTraits#TABLE_AS_SET}
       */
      static StaticArgument table(
        String name,
        Class<?> conversionClass,
        boolean isOptional,
        EnumSet<StaticArgumentTraits> traits);
        
      /**
       * Declares a table argument such as f(myTable) or f(TABLE myTable).
       * Uses {@link StaticArgumentTraits#TABLE_AS_ROW} by default.
       *
       * @param type data structure in which the table is represented, must be a
       *        structured or row type. If a data type is set, the function loses the
       *        "polymorphic" behavior and expects a certain table structure.
       * @param traits allows to specify further characteristics such
       *        as {@link StaticArgumentTraits#TABLE_AS_ROW}
       *        or {@link StaticArgumentTraits#TABLE_AS_SET}
       */
      static StaticArgument table(
        String name,
        DataType type,
        boolean isOptional,
        EnumSet<StaticArgumentTraits> traits);

      /**
       * Declares a model argument such as f(myModel).
       * Only intended for built-in functions.
       */
      @Internal
      static StaticArgument model(
        String name,
        boolean isOptional,
        EnumSet<StaticArgumentTraits> traits);

      public DataType getDataType(DataType input);

      public boolean isOptional();

      public String getName();

      public Set<StaticArgumentTraits> getTraits();
    }
    
    /**
     * List of characteristics an argument can declare.
     * Kept generic for future extensions.
     */
    enum StaticArgumentTraits {
      SCALAR(),
      TABLE_AS_ROW(),
      TABLE_AS_SET(),
      MODEL(),
      OPTIONAL_PARTITION_BY(TABLE_AS_SET),
      SUPPORTS_NESTED_PARTITION_BY(TABLE_AS_SET),
      REQUIRES_ORDER_BY(TABLE_AS_SET),
      SUPPORTS_NESTED_ORDER_BY(TABLE_AS_SET),
      OPTIONAL_COPARTITION(TABLE_AS_SET, OPTIONAL_PARTITION_BY),
      REQUIRES_TIME_ATTRIBUTE(TABLE_AS_SET);
      
      private final Set<StaticArgumentTraits> requires;
      
      StaticArgumentTraits(StaticArgumentTraits... requires) {
        this.requires = Set.of(requires);
      }
    }
    
    /** Builder for configuring and creating instances of {@link TypeInference}. */
    public static class Builder {
    
      public Builder staticArguments(StaticArgument... staticArgument);

      public Builder staticArguments(List<StaticArgument> staticArgument);

      public Builder staticArgument(StaticArgument staticArgument);

      public Builder stateTypeStrategy(String name, TypeStrategy strategy);

      public Builder stateTypeStrategies(
        LinkedHashMap<String, TypeStrategy> stateTypeStrategies);
    }
}


CallContext

Notes:

  • CallContext needs a redefinition of some methods:
    • getArgumentValue can also access descriptor values; a descriptor is treated as a scalar argument.
    • getArgumentDataTypes() can also return the data type of the table that is passed.
    • Thus, an output TypeStrategy can dynamically decide on the output table for built-in functions or advanced users.
  • Additionally, CallContext gives access to SetSemantics for powering the validation.


interface CallContext {
     /** 
       * Gives access to the set semantics of a table argument.
       * 
       * @param pos position of the input argument
       */
      SetSemantics getSetSemantics(int pos); 
}


FunctionDefinition

Notes:

  • Following the specification of FunctionDefinition:
    Instances of this class provide all details necessary to validate a function call and perform planning.
  • We add two additional methods to the interface: getInputChangelogMode() and getOutputChangelogMode()
  • These two methods are inspired by DynamicTableSource and DynamicTableSink but leave out the requestedMode to avoid back-and-forth communication between planner and function, as is common for connectors.
  • The defaults are hard-coded for all other subclasses of UserDefinedFunction except for ProcessTableFunction.


/**
 * Definition of a function. Instances of this class provide all details necessary to validate a
 * function call and perform planning.
 */
public interface FunctionDefinition {
  
   /**
     * Returns the set of changes that the function accepts during runtime.
     *
     * Note: This method only applies to {@link FunctionKind#PROCESS_TABLE},
     * {@link FunctionKind#AGGREGATE}, and {@link FunctionKind#TABLE_AGGREGATE}.
     */
  default ChangelogMode getInputChangelogMode() {
    return ChangelogMode.insertOnly();
  }
  
   /**
     * Returns the set of changes that the function produces during runtime.
     *
     * Note: This method only applies to {@link FunctionKind#PROCESS_TABLE},
     * {@link FunctionKind#AGGREGATE}, and {@link FunctionKind#TABLE_AGGREGATE}.
     */
  default ChangelogMode getOutputChangelogMode() {
    return ChangelogMode.insertOnly();
  }
}

DescriptorType, LogicalTypeRoot, DataTypes

Notes:

  • The SQL standard defines a new type for describing a list of columns.
  • The Flink type system should be updated for this type.
  • Since Calcite currently only supports a list of column names, we won’t support names with data types in the first version.
  • We can support a dedicated ColumnList conversion class. This name is chosen to avoid name clashes with TableDescriptor and CatalogDescriptor.
  • CallContext#getArgumentValue will be able to request objects of this class for dynamic type inference for built-in functions.
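
For illustration, a hedged sketch of how a built-in function's output strategy could read the descriptor value during planning. ColumnList is the conversion class proposed above; the argument position and the derived output type are made up:

TypeStrategy outputStrategy = callContext -> {
    // The DESCRIPTOR argument behaves like a scalar value during planning
    Optional<ColumnList> onTime = callContext.getArgumentValue(2, ColumnList.class);
    // ... derive the output row type from the referenced columns
    return Optional.of(DataTypes.ROW(DataTypes.FIELD("result", DataTypes.STRING())));
};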


package org.apache.flink.table.types.logical;

public enum LogicalTypeRoot {
  DESCRIPTOR(LogicalTypeFamily.PREDEFINED),
}


package org.apache.flink.table.types.logical;

public final class DescriptorType extends LogicalType {

    private static final String FORMAT = "DESCRIPTOR";

    public DescriptorType(boolean isNullable) {
        super(isNullable, LogicalTypeRoot.DESCRIPTOR);
    }

    public DescriptorType() {
        this(true);
    }

    @Override
    public String asSummaryString() {
        return withNullability(FORMAT);
    }

    @Override
    public String asSerializableString() {
        return withNullability(FORMAT);
    }

    @Override
    public boolean supportsInputConversion(Class<?> clazz) {
        return ColumnList.class.isAssignableFrom(clazz);
    }

    @Override
    public boolean supportsOutputConversion(Class<?> clazz) {
        return ColumnList.class.isAssignableFrom(clazz);
    }

    @Override
    public Class<?> getDefaultConversion() {
        return ColumnList.class;
    }
}



package org.apache.flink.table.api;

public final class DataTypes {

  /**
   * Data type for passing a list of columns to function calls.
   */
  public static DataType DESCRIPTOR();
}

StateHint

Notes:

  • Similar to ArgumentHint, but for the state parts of the eval() signature
  • Can be extended with additional state config in the future.


/**
 * A hint that marks an argument as managed state.
 */
public @interface StateHint {

  /**
   * The name of the state.
   *
   * <p>This can be used to provide a descriptive name for the state
   * and to reference it, e.g. in {@code ctx.clearState}.
   */
  String name() default "";

  /**
   * The data type hint for the argument.
   *
   * <p>This can be used to provide additional information about the expected data type of the
   * argument. The {@link DataTypeHint} annotation can be used to specify the data type explicitly
   * or provide hints for the reflection-based extraction of the data type.
   */
  DataTypeHint type() default @DataTypeHint();

  // we can add state TTL parameters here as well
}

ArgumentHint

Notes:

  • Extend the existing ArgumentHint by a traits field.
  • By using "value", it is possible to declare @ArgumentHint(ArgumentTrait.TABLE_AS_SET)


/**
 * A hint that provides additional information about an argument.
 *
 */
public @interface ArgumentHint {

    /**
     * Traits that describe the argument. A scalar argument by default.
     */
    ArgumentTrait[] value() default {ArgumentTrait.SCALAR};

    /**
     * The name of the argument.
     *
     * <p>This can be used to provide a descriptive name for the argument.
     */
    String name() default "";

    /**
     * Specifies whether the argument is optional or required.
     *
     * <p>If set to {@code true}, the argument is considered optional. If the user does not
     * specify this parameter when calling, 'null' will be passed in. By default, an argument is
     * considered required.
     */
    boolean isOptional() default false;

    /**
     * The data type hint for the argument.
     *
     * <p>This can be used to provide additional information about the expected data type of the
     * argument. The {@link DataTypeHint} annotation can be used to specify the data type explicitly
     * or provide hints for the reflection-based extraction of the data type.
     */
    DataTypeHint type() default @DataTypeHint();
}


Notes:

  • Put an annotation-specific enum into the right package.
  • Omits internal features such as model.


package org.apache.flink.table.annotation;

/**
 * Traits that specify the usage of an argument.
 * Parameters for ArgumentHint.
 */
public enum ArgumentTrait {
    SCALAR(),
    TABLE_AS_SET(),
    TABLE_AS_ROW(),
    OPTIONAL_PARTITION_BY(TABLE_AS_SET),
    SUPPORTS_NESTED_PARTITION_BY(TABLE_AS_SET),
    REQUIRES_ORDER_BY(TABLE_AS_SET),
    SUPPORTS_NESTED_ORDER_BY(TABLE_AS_SET),
    OPTIONAL_COPARTITION(OPTIONAL_PARTITION_BY),
    REQUIRES_TIME_ATTRIBUTE(TABLE_AS_SET);

    private final Set<ArgumentTrait> requires;

    ArgumentTrait(ArgumentTrait... requires) {
        this.requires = Set.of(requires);
    }
}

Proposed Changes

As outlined in the Vision section, we propose to add a new kind of user-defined function (UDF) that enables implementing user-defined SQL operators: ProcessTableFunction (PTF).

The following examples show how PTFs can serve various use cases. Both for user-defined logic and for built-in system functions in the future.

Complex Examples

OneMatchJoin

In production use cases, joins exist in various flavors and are one of the most requested operators that need customization. This example shows a PTF that waits for exactly one matching record for a given key.

It cleans up most of its state after this match has been found.

Implementation:

public class OneMatchJoin extends ProcessTableFunction<JoinResult> {

  public static class JoinState {
    public boolean matched = false;
    public Customer customer;
    public Order order;
  }

  public static class JoinResult {
    public Customer customer;
    public Order order;
  }

  public static class Customer {
    public String name;
    public String address;
  }

  public static class Order {
    public Long orderId;
    public Double price;
  }

  public void eval(
    Context ctx,
    @StateHint JoinState state,
    @ArgumentHint(TABLE_AS_SET) Customer customer,
    @ArgumentHint(TABLE_AS_SET) Order order) {

    if (state.matched) {
      return;
    }

    // Process left input
    if (customer != null) {
      if (state.order != null) {
        matchFound(state, customer, state.order);
      } else {
        state.customer = customer;
      }
    }
    // Process right input
    if (order != null) {
      if (state.customer != null) {
        matchFound(state, state.customer, order);
      } else {
        state.order = order;
      }
    }
  }

  private void matchFound(JoinState state, Customer customer, Order order) {
    state.matched = true;
    state.customer = null;
    state.order = null;

    JoinResult result = new JoinResult();
    result.customer = customer;
    result.order = order;
    collect(result);
  }
}

Call:

In SQL the given join can be called with minimal arguments. Specifying PARTITION BY and COPARTITION is mandatory as we don't support cross products (i.e. broadcasting) in the first version.

SELECT *
FROM
  OneMatchJoin(
    customer => TABLE(data1) PARTITION BY customer_id,
    order => TABLE(data2) PARTITION BY customer_id COPARTITION(data1, data2)
  )


BuiltInOneMatchJoin

The OneMatchJoin above is statically typed. A built-in function would implement this with a strategy that derives the output type from the inputs. The state type is also derived from the inputs.

A built-in join would most likely also require a timeout to keep the state size small. This is expressed via the StaticArgumentTraits that make a time attribute mandatory. Once the timeout has passed, all state is deleted.

Implementation:

public class BuiltInOneMatchJoin extends ProcessTableFunction<Row> {

  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      .staticArguments(
        StaticArgument.table(
          "left", Row.class, false,
          TABLE_AS_SET, REQUIRES_TIME_ATTRIBUTE),
        StaticArgument.table(
          "right", Row.class, false,
          TABLE_AS_SET, REQUIRES_TIME_ATTRIBUTE),
        StaticArgument.scalar(
          "ttl", DataTypes.INTERVAL(SECOND(3)), false))
      .stateTypeStrategy("state", callContext -> {
        return Optional.of(
          DataTypes.ROW(
            FIELD("matched", DataTypes.BOOLEAN()),
            FIELD("left", callContext.getArgumentDataTypes(0)),
            FIELD("right", callContext.getArgumentDataTypes(1))));
      })
      .outputTypeStrategy(callContext -> {
        return Optional.of(
          DataTypes.ROW(
            FIELD("left", callContext.getArgumentDataTypes(0)),
            FIELD("right", callContext.getArgumentDataTypes(1))));
      })
      .build();
  }

  
  public void eval(Context ctx, Row state, Row left, Row right, Duration ttl) {
    Timers<Instant> timers = ctx.getTimersAs(Instant.class);
    timers.registerOnTime("timeout", timers.currentTimestamp().plus(ttl));

    if (Boolean.TRUE.equals(state.getField("matched"))) {
      return;
    }

    // Process left input
    if (left != null) {
      if (state.getField("right") != null) {
        matchFound(state, left, state.getFieldAs("right"));
      } else {
        state.setField("left", left);
      }
    }
    // Process right input
    if (right != null) {
      if (state.getField("left") != null) {
        matchFound(state, state.getFieldAs("left"), right);
      } else {
        state.setField("right", right);
      }
    }
  }

  public void onTimer(OnTimerContext ctx, Row state) {
    ctx.clearAll();
  }

  private void matchFound(Row state, Row left, Row right) {
    state.setField("matched", true);
    state.setField("left", null);
    state.setField("right", null);

    collect(Row.of(left, right));
  }
}


Call:

In SQL the given join can be called with the following parameters. The on_time descriptor takes two arguments in this case to forward the time attributes of each side.

SELECT *
FROM
  BuiltInOneMatchJoin(
    left => TABLE(customers) PARTITION BY customer_id,
    right => TABLE(orders) PARTITION BY customer_id COPARTITION(customers, orders),
    ttl => INTERVAL '5' MINUTE,
    on_time => DESCRIPTOR(c_rowtime, o_rowtime) 
  )


TimedLastValue

The function chooses the row with the most recent time attribute as the new output.

It illustrates how to output a retract changelog.

Implementation:

@FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
public class TimedLastValue extends ProcessTableFunction<Row> {

  @Override
  public ChangelogMode getOutputChangelogMode() {
    return ChangelogMode.all();
  }

  public static class LastValue {
    public Instant time;
    public String value;
  }

  public void eval(
      Context ctx,
      @StateHint LastValue state,
      @ArgumentHint(TABLE_AS_SET, REQUIRES_TIME_ATTRIBUTE) Row input) {
    Timers<Instant> timers = ctx.getTimersAs(Instant.class);
    Instant inputTime = timers.currentTimestamp();
    String inputValue = input.getFieldAs("value");
    // State is empty
    if (state.time == null) {
      state.time = inputTime;
      state.value = inputValue;
      collect(Row.ofKind(INSERT, inputValue));
    }
    // State is not empty
    else {
      if (inputTime.isAfter(state.time)) {
        collect(Row.ofKind(UPDATE_BEFORE, state.value));
        collect(Row.ofKind(UPDATE_AFTER, inputValue));
        state.time = inputTime;
        state.value = inputValue;
      }
    }
  }
}

Call:

In SQL the given function can be called as follows. The on_time descriptor references the time attribute of the single input table.

SELECT customer_id, s
FROM
  TimedLastValue(
    input => TABLE(customers) PARTITION BY customer_id,
    on_time => DESCRIPTOR(c_rowtime) 
  )

BuiltInChangelog

The function converts a given table (in retract mode) into an append-only changelog.

It illustrates how to consume a retract changelog.

The resulting table would get an additional column with a change flag. The table argument has ROW semantics.

Implementation:

public class BuiltInChangelog extends ProcessTableFunction<Row> {
  
  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      .staticArgument(StaticArgument.table("input", Row.class, false))
      .outputTypeStrategy(callContext -> {
        // Prepend a STRING column for the change flag to the columns of the input table
        List<DataTypes.Field> fields = Stream
          .concat(
            Stream.of(FIELD("f0", DataTypes.STRING())),
            DataType.getFields(callContext.getArgumentDataTypes(0)).stream())
          .collect(toList());
        return Optional.of(ROW(fields));
      })
      .build();
  }

  @Override
  public ChangelogMode getInputChangelogMode() {
    return ChangelogMode.all();
  }

  public void eval(Row input) {
    Row flag = Row.of(input.getKind().shortString());
    collect(Row.join(flag, input));
  }
}

Call:

In SQL the function can then be used to convert retract to append (following the simplified syntax mentioned in FLIP-437):

SELECT f0 AS changeflag, c1, c2 FROM BuiltInChangelog(customers);


Implementation Details

In CompiledPlan, a PTF will be mapped to a separate StreamExecProcessTableFunction (following naming of StreamExecWindowTableFunction).

Time Semantics

Time semantics mostly follow the semantics of DataStream API. The biggest difference is that the event-time timestamp is not part of the StreamRecord but a column of the table. After a join, a table can contain multiple columns with event-time timestamps.

on_time Argument

The on_time argument solves these issues for us:

  • Every PTF takes an on_time DESCRIPTOR argument.
  • on_time is optional by default
  • It is mandatory for an input table, if one of the TABLE_AS_SET args contains a REQUIRES_TIME_ATTRIBUTE trait.
  • Each column in on_time must reference an existing column in the input tables. Similar to TIMECOL of current window TVFs.
  • The DESCRIPTOR can take up to one column name per input table. In other words: A join after a join must select a single time attribute for the joined table.
  • The validation will make sure that each input table gets a time attribute assigned:
    • If the columns have different names:
      e.g. Join(left => TABLE(l), right => TABLE(r), on_time => DESCRIPTOR(l_time, r_time))
    • If the same column name exists in both tables, a single column name is enough:
      e.g. Join(left => TABLE(l), right => TABLE(r), on_time => DESCRIPTOR(rowtime))
    • Any kind of conflict needs to be solved with projecting in the inputs otherwise.

Watermark by default

If no on_time has been specified in the SQL call and the REQUIRES_TIME_ATTRIBUTE trait is not present, timers will fire on watermark by default. This means:

  • A PTF can still make progress in time without an explicit on_time attribute.
  • It is still possible to register a timer and Timers.currentTimestamp() will return the timestamp with which the timer has been registered in onTimer().
  • This behavior is important to support users that might not understand time attributes or have no time attributes present at the current location where they call SQL.
    e.g. the time attribute has been modified and diverged from watermarks (e.g. it cannot be used anymore for time-based operations) but the watermarks are still correct and can be used as a good default by omitting the on_time attribute
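
For illustration, a hedged sketch of such a call (function and table names are hypothetical): no on_time argument is passed, so timers registered by the PTF fire purely based on the incoming watermarks.

SELECT *
FROM
  SessionizeEvents(
    -- No on_time argument: timers fire based on incoming watermarks
    input => TABLE(events) PARTITION BY user_id
  )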

Output timestamp

  • If on_time is specified, a PTF outputs a corresponding rowtime column, similar to how every window PTF returns window_time.
  • The value of rowtime is fully defined by the framework, similar to how the DataStream API defines the output StreamRecord timestamp of a ProcessFunction:
    • within eval() the output timestamp is equal to the input timestamp of the processed row
    • within onTimer() the output timestamp is equal to the firing timer; in case of onWatermark, the output is rowtime = Timers.followingWatermark() - 1
  • If on_time is not specified, the PTF will not return a rowtime column as part of its schema.
    This behavior is important to support users that might not understand time attributes.
    Using the above "Watermark by default" assumption, nested PTF calls work fully without any time columns being present,
    neither in the input nor in the output.
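
For illustration, a variant of the CountWithTimeout call from the motivating example: since on_time is specified, the output schema additionally contains the rowtime column and it can be selected downstream.

SELECT user_id, count, rowtime
FROM
  CountWithTimeout(
    input => TABLE(data) PARTITION BY user_id,
    on_time => DESCRIPTOR(rowtime)
  )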

Changelog Semantics

  • As mentioned in the scoping, only retract and append are supported for input and output.
  • The FlinkChangelogModeInferenceProgram will be updated to follow source/sink behavior.
  • Change flags can only be read with Row or RowData conversion classes. Structured types (i.e. POJOs) are only supported in append similar to DataStream API.

Query Evolution

So far we did not have a story for query evolution in SQL. It is not possible (or requires very complex CompiledPlan magic) to restore the state of a join operator, aggregation, or window while modifying the rest of the SQL pipeline.

Query evolution for PTFs mostly follows the semantics of DataStream API. The biggest difference is that there is still the possibility of an optimizer where appropriate.

In other words: The optimizer optimizes around PTFs. But PTFs (both user-defined and also future built-in ones) remain as a constant but portable stateful building block in the pipeline.

uid Argument

The uid argument solves these issues for us:

  • Every PTF takes a uid STRING argument.
  • It allows to restore building blocks from old SQL statements from a Savepoint.
    Potentially even combine multiple building blocks from multiple SQL statements and their Savepoints in the future.
  • The uid argument is optional by default
  • If the uid argument is not specified, the function name will be used by the framework. This allows for one unique PTF invocation per statement which is usually enough.
  • If a PTF is used more than once, the validation will request passing a manual uid. It will check that the uid is unique for the entire StreamExecGraph.

Side output semantics

The uid uniquely identifies PTFs also across multiple statements in a STATEMENT SET. This enables behavior similar to side outputs in the DataStream API:

EXECUTE STATEMENT SET BEGIN
   INSERT INTO main SELECT a, b, target_table FROM FunctionWithSideOutput(input => TABLE(data), uid => 'only_once') WHERE target_table = 'main'; 
   INSERT INTO side SELECT a, b, target_table FROM FunctionWithSideOutput(input => TABLE(data), uid => 'only_once') WHERE target_table = 'side'; 
END;


The optimizer will ensure that the given PTF only exists once in a StreamExecGraph.

Usage of PTFs in Table API

We also add a syntax for PTFs in Table API. This allows for constructing a Table API pipeline with stateful building blocks.

TableEnvironment.from() API

Given the SQL of the BuiltInOneMatchJoin example above:

SELECT *
FROM
  BuiltInOneMatchJoin(
    left => TABLE(customers) PARTITION BY customer_id,
    right => TABLE(orders) PARTITION BY customer_id COPARTITION(customers, orders),
    ttl => INTERVAL '5' MINUTE,
    on_time => DESCRIPTOR(c_rowtime, o_rowtime) 
  )

Table API would look like:

// PTF call without named args
env
    .from(
      call(
        BuiltInOneMatchJoin.class,
        orders.partitionBy($("customer_id")).asArgument("orderById"),
        payments.partitionBy($("order_id")).copartitionWith("orderById"),
        Duration.ofHours(2)
      )
    )
    .execute()
    .print();

// PTF call with named args
env
    .from(
      call(
        BuiltInOneMatchJoin.class,
        customers
          .partitionBy($("customer_id"))
          .asArgument("left"),
        orders
          .partitionBy($("customer_id"))
          .copartitionWith("left")
          .asArgument("orders"),
        lit(Duration.ofMinutes(5)))
          .asArgument("ttl")
        descriptor("c_rowtime", "o_rowtime")
          .asArgument("on_time")
      )
    )
    .execute()
    .print();


Note: The from(ptf) can be used for element generators in the future.

Table.process() API

Or use fluent invocation similar to .map() or .flatAggregate().

Given the SQL of the CountWithTimeout example above:

SELECT user_id, count
FROM
  CountWithTimeout(

    -- Split the input table into partitions per virtual processor
    input => TABLE(data) PARTITION BY user_id,

    -- Let the system know what "timers" and "time"  means
    on_time => DESCRIPTOR(rowtime),

    -- Support for query evolution
    uid => 'main-counting'
  )

Table API would look like:

env
  .from("data")
  .partitionBy($("user_id"))
  // Same signatures as call() for avoiding one level of nesting expressions.
  // The table will be passed as the first argument of the PTF.
  .process(
    CountWithTimeout.class,
    descriptor("rowtime").asArgument("on_time"),
    lit("main-counting").asArgument("uid"))


Compatibility, Deprecation, and Migration Plan

  • All changes fit nicely into existing concepts. Or are generalizations of existing ones.
  • We should discuss whether we want to reorder the columns of SESSION windows for Flink 2.0 because they currently violate the SQL standard by putting PARTITION BY columns at the end (instead of at the beginning) of the PTF output.
  • We should discuss whether we want to rename TIMECOL of TUMBLE/HOP/SESSION/CUMULATE to on_time for Flink 2.0 to better fit to this FLIP. This would make the PTF story round.

Test Plan

The following tests should be added:

  • RestoreTests
  • PlanTests
  • Many unit tests for individual components

Rejected Alternatives

None

Future Work

This FLIP is only the tip of the iceberg. Future FLIPs can build on top of the PTF infrastructure:

  • Add additional ability interfaces to support more StreamOperator features
  • Extend ArgumentTraits to support more StreamOperator features
  • Extend StateHint to support state TTL
  • Add built-in PTFs for common operators for maximum efficiency.
  • Convert all existing SQL operators to the PTF stack. A theoretical suggestion, but maybe a cleaner and more maintainable code base?