
...

Page properties

...


Discussion thread

...

...

Jira: FLINK-14132 (ASF JIRA)

Release: 1.10


Google Doc: https://docs.google.com/document/d/17CPMpMbPDjvM4selUVEfh_tqUK_oV0TODAUA9dfHakc/edit?usp=sharing

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Enable users to integrate Flink with the core and built-in objects of other systems, so that users can seamlessly reuse whatever they are familiar with in other SQL systems as core and built-in objects of Flink SQL and the Table API.
  2. Empower users to write code and do customized development for the Flink table core.

Modules define a set of metadata, including functions, user-defined types, operators, rules, etc. Prebuilt modules will be provided, or users may choose to write their own. Flink will take metadata from modules as extensions of its core built-in system that users can take advantage of. For example, users can define their own geo functions and geo data types and plug them into Flink Table as built-in objects. As another example, users can load an off-the-shelf Hive module to use Hive built-in functions as part of Flink's built-in functions.
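To make the geo example concrete, here is a minimal, self-contained sketch of a user-written module. `SimpleModule`, `GeoModule` and the `geo_hypot` function are hypothetical names introduced only for illustration. The sketch mirrors the `listFunctions()` / `getFunctionDefinition()` pair used later in this FLIP, but it returns plain Java `DoubleBinaryOperator`s instead of Flink `FunctionDefinition`s.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.DoubleBinaryOperator;

// Hypothetical, simplified stand-in for the Module interface: the real interface
// returns FunctionDefinition objects, not plain Java lambdas.
interface SimpleModule {
  Set<String> listFunctions();

  Optional<DoubleBinaryOperator> getFunctionDefinition(String name);
}

// Hypothetical user-written module exposing a geo function as a "built-in".
final class GeoModule implements SimpleModule {
  private final Map<String, DoubleBinaryOperator> functions = new HashMap<>();

  GeoModule() {
    // Length of the planar vector (dx, dy); a real geo module would do proper geodesy.
    functions.put("geo_hypot", Math::hypot);
  }

  @Override
  public Set<String> listFunctions() {
    return Collections.unmodifiableSet(functions.keySet());
  }

  @Override
  public Optional<DoubleBinaryOperator> getFunctionDefinition(String name) {
    // Empty when this module does not provide the requested function.
    return Optional.ofNullable(functions.get(name));
  }
}
```

Keeping the function list and the lookup separate lets a manager ask "which module has `f`?" cheaply before fetching the definition itself.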

...

In this FLIP we’ll design and develop a generic mechanism for pluggable modules in the Flink table core, with a focus on built-in functions.

We’ll specifically create two module implementations in this FLIP:

  • CoreModule, with existing Flink built-in functions only
  • HiveModule, supporting Hive built-in functions and numerous Hive versions

...

Users have to be fully aware of the consequences of resetting modules, as doing so might make some objects impossible to reference or change the resolution order of some objects. E.g. “CAST” and “AS” cannot be overridden in CoreModule, and users should be fully aware of that.

How to Load

...

Modules

To load modules, users have to make sure the relevant classes are already on the classpath.

...

// new APIs to TableEnvironment
interface TableEnvironment {
    // load a module instance under the given name and append it to the end of the module list
    void loadModule(String name, Module m);

    // unload a module instance from the module list; the other modules keep their relative positions
    void unloadModule(String name); // the name the module was loaded under

    // list the names of all loaded modules, in module-list order
    List<String> listModules();
}

// modules are ordered as they are loaded: "a", "b", "c"
tableEnv.loadModule("a", MyModule.INSTANCE);
tableEnv.loadModule("b", new Xxx(properties));
tableEnv.loadModule("c", new Yyy());
tableEnv.unloadModule("a");
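The ordering contract of these three methods can be sketched with a `LinkedHashMap`, which keeps insertion order and leaves the remaining entries in place on removal. `OrderedModuleRegistry` is a hypothetical class for illustration only; modules are plain `Object`s here, and rejecting duplicate or unknown names is an assumption of this sketch, not something the FLIP specifies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the ordering semantics behind loadModule/unloadModule/listModules.
final class OrderedModuleRegistry {
  // LinkedHashMap preserves insertion order, i.e. the order in which modules were loaded.
  private final Map<String, Object> modules = new LinkedHashMap<>();

  void loadModule(String name, Object module) {
    // Assumption: a name may only be used once at a time.
    if (modules.containsKey(name)) {
      throw new IllegalArgumentException("Module already loaded: " + name);
    }
    modules.put(name, module); // appended to the end of the order
  }

  void unloadModule(String name) {
    // Assumption: unloading an unknown name is an error.
    if (modules.remove(name) == null) {
      throw new IllegalArgumentException("No module loaded with name: " + name);
    }
    // Removal leaves the remaining modules in their original relative order.
  }

  List<String> listModules() {
    return new ArrayList<>(modules.keySet());
  }
}
```

Running the sequence above (load "a", "b", "c", then unload "a") leaves the list as [b, c]; loading "a" again appends it at the end rather than restoring its old position.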

...

In case users forget to specify the core module, the “modules” section will be commented out in the YAML file as follows:

#modules: # modules are ordered as they are specified
#  - name: core
#    type: core

SQL:

  • SHOW MODULES: show the names of the modules in the current module list, in order
  • LOAD MODULE 'name' [WITH ('type'='xxx', 'prop'='myProp', ...)]: load a module with the given name and append it to the end of the module list
  • UNLOAD MODULE 'name': unload a module by name from the module list; the other modules keep their relative positions


NOTE: the SQL syntax has been discussed again and received some modifications, see FLINK-21045 and the discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLINK-21045-Support-load-module-and-unload-module-SQL-syntax-td48398.html


Resolution Order

Objects are resolved against modules in the order in which the modules are defined, either in the program or in the YAML config. When several objects share the same name, the resolution logic goes through the modules in order and returns the first one found; objects of the same name in modules later in the order are ignored. E.g. if modules are set as “xxx, yyy” and both the xxx and yyy modules have a function named “f”, then “f” will always be resolved as the one in module xxx.
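The first-match rule can be sketched as a walk over modules in load order. `FirstMatchResolver` is a hypothetical class for illustration; each "module" is reduced to a map from function name to a description string, whereas real modules return `FunctionDefinition`s.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of first-match resolution over modules in load order.
final class FirstMatchResolver {
  // Insertion order of this map is the resolution order.
  private final LinkedHashMap<String, Map<String, String>> modules = new LinkedHashMap<>();

  void loadModule(String name, Map<String, String> functions) {
    modules.put(name, functions);
  }

  // Returns the definition from the first module that has the function;
  // same-named functions in later modules are shadowed.
  Optional<String> resolve(String functionName) {
    for (Map<String, String> functions : modules.values()) {
      String definition = functions.get(functionName);
      if (definition != null) {
        return Optional.of(definition);
      }
    }
    return Optional.empty();
  }
}
```

With “xxx” loaded before “yyy” and both defining “f”, `resolve("f")` returns xxx's definition, while a function that only yyy defines is still found in yyy.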

...

interface ModuleFactory extends TableFactory {
   Module createModule(String name, Map<String, String> properties);
}
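A factory is selected by matching its `requiredContext()` (e.g. `type=hive`) against the properties from YAML or from `LOAD MODULE ... WITH (...)`. The sketch below shows only that matching step. `SimpleModuleFactory`, `FactoryDiscovery` and `ofType` are hypothetical names. Factories are passed in explicitly instead of being discovered from the classpath as Flink's table factory discovery does. Failing on zero or multiple matches is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified factory contract modeled on ModuleFactory above.
interface SimpleModuleFactory {
  Map<String, String> requiredContext();

  Object createModule(String name, Map<String, String> properties);
}

final class FactoryDiscovery {
  // Returns the single factory whose required context entries all appear in the properties.
  static SimpleModuleFactory find(List<SimpleModuleFactory> factories, Map<String, String> properties) {
    SimpleModuleFactory match = null;
    for (SimpleModuleFactory factory : factories) {
      boolean matches = factory.requiredContext().entrySet().stream()
          .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
      if (matches) {
        if (match != null) {
          throw new IllegalStateException("Ambiguous module factories for " + properties);
        }
        match = factory;
      }
    }
    if (match == null) {
      throw new IllegalArgumentException("No module factory matches " + properties);
    }
    return match;
  }

  // Hypothetical helper: a factory requiring 'type'=<type> that returns a label, not a real Module.
  static SimpleModuleFactory ofType(String type) {
    return new SimpleModuleFactory() {
      @Override
      public Map<String, String> requiredContext() {
        return Collections.singletonMap("type", type);
      }

      @Override
      public Object createModule(String name, Map<String, String> properties) {
        return type + " module '" + name + "'";
      }
    };
  }
}
```

Given factories for "core" and "hive", properties `{type=hive, hive-version=...}` select the hive factory, which then builds the module under the user-chosen name.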

...

We currently only move built-in functions into CoreModule.

public class CoreModule implements Module {
  // stateless singleton
  public static final CoreModule INSTANCE = new CoreModule();

  private CoreModule() { }

  @Override
  public Set<String> listFunctions() {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
        .map(BuiltInFunctionDefinition::getName)
        .collect(Collectors.toSet());
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    // exact (case-sensitive) name match against Flink's built-in function definitions
    return BuiltInFunctionDefinitions.getDefinitions().stream()
        .filter(f -> f.getName().equals(name))
        .findFirst()
        .map(f -> (FunctionDefinition) f); // widen Optional<BuiltInFunctionDefinition>
  }
}

...

class CoreModuleFactory implements ModuleFactory {
  @Override
  public Map<String, String> requiredContext() {
    // this factory is selected when the module's 'type' property is 'core'
    Map<String, String> context = new HashMap<>();
    context.put("type", "core");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    return Collections.emptyList(); // the core module takes no properties
  }

  @Override
  public Module createModule(String name, Map<String, String> properties) {
    return CoreModule.INSTANCE;
  }
}

...

ModuleManager is responsible for loading all the modules, managing their life cycles, and resolving module objects.

public class ModuleManager {
  // modules keyed by name; LinkedHashMap keeps load order, which is also the resolution order
  private final LinkedHashMap<String, Module> modules;

  public ModuleManager() {
    this.modules = new LinkedHashMap<>();

    // the core module is loaded by default
    modules.put("core", CoreModule.INSTANCE);
  }

  public void loadModule(String name, Module module) { ... }

  public void unloadModule(String name) { ... }

  // union of the function names exposed by all loaded modules
  public Set<String> listFunctions() {
    return modules.values().stream()
        .flatMap(m -> m.listFunctions().stream())
        .collect(Collectors.toSet());
  }

  // resolve a function from the first module, in load order, that lists it
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    Optional<Module> module = modules.values().stream()
        .filter(m -> m.listFunctions().contains(name))
        .findFirst();

    return module.flatMap(m -> m.getFunctionDefinition(name));
  }

  // addUserDefinedTypes(), getUserDefinedTypes(), etc
}

...

FunctionCatalog will hold a ModuleManager to resolve built-in functions.

...

public class HiveModuleFactory implements ModuleFactory {

  @Override
  public Module createModule(String name, Map<String, String> properties) {
    return new HiveModule(properties.get("hive-version"));
  }

  @Override
  public Map<String, String> requiredContext() {
    // this factory is selected when the module's 'type' property is 'hive'
    Map<String, String> context = new HashMap<>();
    context.put("type", "hive");
    return context;
  }
  }

  @Override
  public List<String> supportedProperties() {
    return Arrays.asList("hive-version");
  }
}

...

Java/Scala:

tableEnv.loadModule("hive", new HiveModule("2.2.1"));

...

As mentioned above, though this FLIP provides a generic design and mechanism for all module object types we want to support, we will only implement functions. Other objects can be added incrementally later on.

...

...