Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Page properties


Discussion threadhttps://lists.apache.org/thread/rflwv5qn5xl1gf3054b5p7roc4fxzlxc
Vote threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)/thread/vrx1k67r8fm7k19zdd4s0ohhymcljh6c
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-36703

JIRAhere (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Release<Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • The name Process is chosen
    • to avoid confusion with the rather meaningless “Polymorphic“ term
    • to highlight that this is a very rich function with state and time access, and more.
  • The base class is similar to TableFunction.
  • The collector is globally available for both eval() , finish(), and onTimer()
  • TypeInferenceExtractor will extract state, tables, descriptors, and scalar arguments from the signature similar to AggregateFunction.

...

Code Block
languagejava
linenumberstrue
package org.apache.flink.table.types.inference;

public final class TypeInference {

  /**
   * Returns a list of state declarations.
   *
   * <p>For example, an accumulator for {@link AggregateFunction} or 
   * one or more intermediates for {@link ProcessTableFunction}.
   */
  public LinkedHashMap<String, StateTypeStrategy> getStateTypeStrategies();
  
  /**
   * Returns a list of static, non-overloaded, non-vararg arguments.
   */
  public Optional<List<StaticArgument>> getStaticArguments();
  
  /**
   * Argument that is part of static, not overloaded, not vararg signature.
   */
  static class StaticArgument {

      /**
       * Declares a scalar argument such as f(12) or f(otherField).
       *
       * @param name whether the assignment operator can be used e.g. f(myArg => 12)
       * @param type explicit type to which the argument is cast
       * @param isOptional whether the argument is optional and might be filled 
       *                   with a default value
       */
      static StaticArgument scalar(String name, DataType type, boolean isOptional);
      
      /**
       * Declares a table argument such as f(myTable) or f(TABLE myTable).
       * Uses {@link StaticArgumentTraits#TABLE_AS_ROW} by default.
       *
       * <p>If no data type is defined, it operates in a "Polymorphic" behavior
       * accepting all kind of tables. But requires a row class (Row or RowData).
       *
       * @param traits allows to specify further characteristics such
       *.       as {@link StaticArgumentTraits#TABLE_AS_ROW}
       *        or {@link StaticArgumentTraits#TABLE_AS_SET}
       */
      static StaticArgument table(
        String name,
        Class<?> conversionClass,
        boolean isOptional,
        EnumSet<ArgTrait>EnumSet<StaticArgumentTraits> traits);
        
      /**
       * Declares a table argument such as f(myTable) or f(TABLE myTable).
       * Uses {@link StaticArgumentTraits#TABLE_AS_ROW} by default.
       *
       * @param type data structure how the table is represented, must be a
       *        structured or row type. If a data type is set, the function looses the
       *         "Polymorphic" behavior and expects a certain table structure.
       * @param traits allows to specify further characteristics such
       *.       as {@link StaticArgumentTraits#TABLE_AS_ROW}
       *        or {@link StaticArgumentTraits#TABLE_AS_SET}
       */
      static StaticArgument table(
        String name,
        DataType type,
        boolean isOptional,
        EnumSet<StaticArgumentTraits> traits);

      /**
       * Declares a model argument such as f(myModel).
       * Only intended for built-in functions.
       */
      @Internal
      static StaticArgument model(
        String name,
        boolean isOptional,
        EnumSet<StaticArgumentTraits> traits);

      public DataType getDataType(DataType input);

      public boolean isOptional();

      public String getName();

      public Set<StaticArgumentTraits> getTraits();
    }
    
    /**
     * List of characteristics an argument can declare.
     * Kept generic for future extensions.
     */
    enum StaticArgumentTraits {
      SCALAR(null),
      TABLE_AS_ROW(null),
      TABLE_AS_SET(null),
      MODEL(null),
      OPTIONAL_PARTITION_BY(TABLE_AS_SET),
      SUPPORTS_NESTED_PARTITION_BY(TABLE_AS_SET),
      REQUIRES_ORDER_BY(TABLE_AS_SET),
      SUPPORTS_NESTED_ORDER_BY(TABLE_AS_SET),
      OPTIONAL_COPARTITION(TABLE_AS_SET, OPTIONAL_PARTITION_BY),
      REQUIRES_TIME_ATTRIBUTE(TABLE_AS_SET),
      SUPPORTS_RETRACT(TABLE_AS_SET);
      
      private final Set<ArgumentTrait> requires;
      
      ArgumentTrait(ArgumentTrait... requires) {
        this.requires = Set.of(requires);
      }
    }
    
    /** Builder for configuring and creating instances of {@link TypeInference}. */
    public static class Builder {
    
      public Builder staticArguments(StaticArgument... staticArgument);

      public Builder staticArguments(List<StaticArgument> staticArgument);

      public Builder staticArgument(StaticArgument staticArgument);

      public Builder stateTypeStrategy(String name, StateTypeStrategy strategy);

      public Builder stateTypeStrategies(
        LinkedHashMap<String, StateTypeStrategy> strategies);
    }
}

...