Skip to end of metadata
Go to start of metadata

Status

Current state"Accepted"

Discussion threadhttp://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrating-Flink-Table-API-amp-SQL-with-CEP-td17964.html

JIRA:   FLINK-6935 - Integration of SQL and CEP Open

Released: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink's CEP library is a great library for complex event processing, more and more customers are expressing their interests in it. But it also has some limitations that users usually have to write a lot of code even for a very simple pattern match use case as it currently only supports the Java API.

CEP DSLs and SQLs strongly resemble each other. CEP's additional features compared to SQL boil down to pattern detection. So It will be awesome to consolidate CEP and SQL. It makes SQL more powerful to support more usage scenario. And it gives users the ability to easily and quickly to build CEP applications.

Public Interfaces

The feature will support Row Pattern Recognition syntax in SQL.  In December 2016, ISO released a new version of the international SQL standard (ISO/IEC 9075:2016) including the Row Pattern Recognition for complex event processing. Oracle's SQL has supported this feature in 12cR1. With this extension it is possible to detect complex patterns between rows in a table. In the context of CEP each row can be considered as an event and the events are ordered according to the insertion order. The following shows an example query.

 

SELECT *
FROM Ticker MATCH_RECOGNIZE (
     PARTITION BY symbol
     ORDER BY tstamp
     MEASURES  STRT.tstamp AS start_tstamp,
               LAST(DOWN.tstamp) AS bottom_tstamp,
               LAST(UP.tstamp) AS end_tstamp
     ONE ROW PER MATCH
     AFTER MATCH SKIP TO LAST UP
     PATTERN (STRT DOWN+ UP+)
     DEFINE
        DOWN AS DOWN.price < PREV(DOWN.price),
        UP AS UP.price > PREV(UP.price)
     ) MR
ORDER BY MR.symbol, MR.start_tstamp;
 
// input
SYMBOL  TSTAMP   PRICE
------- -------- ------
'ACME' '01-Apr-11' 12
'ACME' '02-Apr-11' 17
'ACME' '03-Apr-11' 19
'ACME' '04-Apr-11' 21
'ACME' '05-Apr-11' 25
'ACME' '06-Apr-11' 12
'ACME' '07-Apr-11' 15
'ACME' '08-Apr-11' 20
'ACME' '09-Apr-11' 24
'ACME' '10-Apr-11' 25
'ACME' '11-Apr-11' 19

// output
SYMBOL     START_TST BOTTOM_TS END_TSTAM
---------- --------- --------- ---------
ACME       05-APR-11 06-APR-11 10-APR-11

Row Pattern Recognition in SQL is performed using the MATCH_RECOGNIZE clause. MATCH_RECOGNIZE enables you to do the following tasks:

  • Logically partition and order the data that is used in the MATCH_RECOGNIZE clause with its PARTITION BY and ORDER BY clauses.
  • Define patterns of rows to seek using the PATTERN clause of the MATCH_RECOGNIZE clause. These patterns use regular expression syntax, a powerful and expressive feature, applied to the pattern variables you define.
  • Specify the logical conditions required to map a row to a row pattern variable in the DEFINE clause.
  • Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause.

Proposed Changes

We should support MATCH_RECOGNIZE in Calcite first. Fortunately, there is an undergoing issue in Calcite to add MATCH_RECOGNIZE syntax, see CALCITE-1570. And the newest Calcite version 1.12.0 (which is also the version Flink master is dependent on) has included a basic implementation of MATCH_RECOGNIZE syntax, see 1.12.0 release notes.  And the left features will be released in the next version. We might want to wait with CEP integration into flink-table until Calcite 1.13 is out and we updated the dependency though.

MATCH_RECOGNIZE is still an ongoing work in Calcite, the summary of its implementation status and its capacity compared with Flink-CEP is as follows:

PARTITION BY -- optional

  • Syntax: Specify the partition columns.
  • Flink-CEP: DataStream.keyBy()
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13

ORDER BY -- mandatory

  • Syntax: Defines the order of the rows within a partition. Users need to explicitly order by processing or event time.
  • Flink-CEP: can be automatically handled by Flink CEP
  • Status in Calcite: Not supported in 1.12.0, will be supported in 1.13

MEASURES -- optional

  • Syntax: Defines the exported columns.
  • Flink-CEP: PatternStream.select(IterativeCondition)
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13

{ONE ROW | ALL ROWS} PER MATCH

  • Syntax: ONE ROW PER MATCH: (Default) create a single summary row for each match 
                 ALL ROWS PER MATCH: create one row for each row of each match 
  • Flink-CEP: ONE ROW PER MATCH: PatternStream.select(PatternSelectFunction)
                      ALL ROWS PER MATCH: PatternStream.flatSelect(PatternFlatSelectFunction)
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13

AFTER MATCH SKIP {TO NEXT ROW | PAST LAST ROW | TO FIRST <variable> | TO LAST <variable> | TO <variable>}

  • Syntax: This syntax determines the starting point of the next match pattern after a match has been found.  Default is AFTER MATCH SKIP PAST LAST ROW.
                 TO NEXT ROW: starting from the next row of the first row of the current match
                 PAST LAST ROW: starting from the next row after the last row the current match
                TO FIRST <variable>: starting from the first row of the designated group
                TO LAST <variable>: starting from the last row of the designated group
                TO <variable>: similar to TO LAST <variable>
  • Flink-CEP: TO NEXT ROW: Supported by default in Flink-CEP
                      PAST LAST ROW: Depends on feature#4 in the “Required features” section
                      TO FIRST <variable>: Depends on feature#4 in the “Required features” section
                      TO LAST <variable>: Depends on feature#4 in the “Required features” section
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13

PATTERN -- mandatory

  • Syntax: Used to specify a regular expression, the regular expression is built from variable names defined the define clause.
  • Flink-CEP: Partially supported in Pattern, more functionalities depend on feature#(1,2,3) In the “Required features” section
  • Status-in-Calcite: Supported in 1.12.0

SUBSET -- optional

  • Syntax: This syntax defines variables which are correlation variables defined in the pattern clause. This named variable can be used in the measures clause and define clause.
  • Flink-CEP: Easy to implement
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13

DEFINE -- mandatory

  • Syntax: Used to define the boolean condition that defines a variable name that is declared in the pattern.
  • Flink-CEP: Can be supported with IterativeCondition
  • Status-in-Calcite: Supported in 1.12.0

CLASSIFIER -- optional

  • Syntax: This syntax is only available with ALL ROWS PER MATCH. It’s used to specify the variable name in the pattern that the row matched.
  • Flink-CEP: Easy to implement
  • Status-in-Calcite: Supported in 1.12.0

MATCH_NUMBER -- optional

  • Syntax: This syntax allows the user to receive the sequential number of the current match.
  • Flink-CEP: Easy to implement
  • Status-in-Calcite: Supported in 1.12.0

WITHIN -- optional

  • Syntax: Output a pattern match iff the match occurs within the specified time duration.
  • Flink-CEP: Pattern.within(Time)
  • Status-in-Calcite: Not supported yet

Currently, MATCH_RECOGNIZE in Calcite only support PATTERN and DEFINE. We will explain translation with a simple example.

SELECT *
FROM T MATCH_RECOGNIZE (
     PATTERN (A B+ C)
     DEFINE
        A AS A.name = ‘a’,
        B AS B.name = ‘b’,
        C AS C.name = ‘c’
     ) MR

Giving the above simple SQL, Calcite will translate it to a relational tree:

LogicalProject[select=”id, name”]
  LogicalMatch[pattern=”A B+ C”, definitions=”...”]
    LogicalTableScan[table=”T”]

What we need to do is translating “LogicalMatch” node to calls on Flink-CEP APIs. The most important work is translating PATTERN, DEFINE, and MEASURE.

There are six kinds of PATTERNs:

  • PATTERN_CONCAT: such as pattern A B is a pattern means that event B has to directly succeed the previous matching event A. This can be expressed by Flink-CEP API pattern.next(String).
  • PATTERN_QUANTIFIER: such as pattern B{3, 5} is a pattern that matches between 3 and 5 occurrences of B. And B+ is a pattern that matches one or more occurrences of B. B+ can be expressed by CEP API pattern.oneOrMore(), B{n, n} can be expressed by pattern.times(n), B? Can be expressed by pattern.optional(). B* can be expressed as pattern.oneOrMore().optional(). But B{n, m} and B* are not supported in CEP API.
  • PATTERN_ALTER: such as pattern A | B is a pattern that matches A or B. This can’t be expressed by CEP API currently. But I find a issue related to this feature. See FLINK-4641
  • PATTERN_GROUPING: such as ((A B){3} C) attempts to match the group (A B) three times and then seeks one occurrence of C. NOT supported in CEP API currently.
  • PATTERN_PERMUTE: such as PERMUTE (A, B) is a pattern that matches all permutations of A and B. NOT supported in CEP API currently.
  • PATTERN_EXCLUDE: such as {- A -} is a pattern that matches A but excludes A from the output.

And we can translate DEFINE into CEP’s IterativeCondition by code generation, and apply it into pattern.where(...). Most of the definition can be translated by current CodeGenerator, besides some specific calls such as PREV, LAST, FIRST.

MEASURES can be translated into CEP’s PatternSelectFunction by code generation. The generation can heavily reuse the current CodeGenerator. NOTE: However, Calcite 1.12.0 doesn’t support MEASURES, so we don’t know what output do users want. As a workaround, in the propotype, we hardcode the PatternSelectFunction to only output the last event in the matched events.

As a result, the above example will be translated into CEP API like this:

val pattern = Pattern
  .begin(“A”).where(_.name == “a”)
  .next(“B”).oneOrMore().where(_.name == “b”)
  .next(“C”).where(_.name == “c”)
val patternStream = CEP.pattern(partitionedInput, pattern)
val output = patternStream.select(PatternSelectFunction))
  1. Support .times(from, to) on Pattern
  2. Support branching CEP patterns. FLINK-4641
  3. Support permutation on Pattern
  4. Support .prune(long pruningTimestamp) on NFA
  5. IterativeCondition support open() method to compile generated code and not de/serialized with NFA state.
  6. Support pattern grouping in Flink CEP

Compatibility, Deprecation, and Migration Plan

This FLIP proposes new functionality and syntax for SQL and CEP library. The behavior of existing operators is not modified.  To integrating CEP and SQL needs to add flink-cep dependency to flink-table. IMO, the flink-cep is a very slim library with almost none external dependencies, so it won't be a problem.

Test Plan

The features added to CEP library will be tested in flink-cep. The integration test will take place in flink-table. 

Rejected Alternatives

No rejected alternatives yet.


Implementation Plan

The implementation of this effort can be divided into several subtasks. The following subtasks should wait until Calcite 1.13 released:

  1. Implementation of the basic framework to support CEP on SQL. This includes supporting PATTERN and DEFINE clauses for MATCH_RECOGNIZE. see prototype
  2. Support MEASURES clause in MATCH_RECOGNIZE
  3. Support AFTER MATCH sub-clause of MATCH_RECOGNIZE
  4. Support SUBSET clause in MATCH_RECOGNIZE
  5. Support ALL ROWS PER MATCH for MATCH_RECOGNIZE
  6. Support PARTITION BY for MATCH_RECOGNIZE
  7. Support ORDER BY for MATCH_RECOGNIZE
  8. Support CLASSIFIER for MATCH_RECOGNIZE
  9. Support MATCH_NUMBER for MATCH_RECOGNIZE
  10. Support WITHIN clause for MATCH_RECOGNIZE
  11. Support LAST, FIRST, PREV, NEXT, FINAL, RUNNING aggregates for MATCH_RECOGNIZE


The following subtasks are about the missing features of the CEP library and can start working right away:

  1. Support .times(from, to) in Flink CEP Pattern
  2. Support branching CEP patterns in Flink CEP . FLINK-4641
  3. Support permutation in Flink CEP Pattern.
  4. Support .prune(long pruningTimestamp) on Flink CEP NFA.
  5. IterativeCondition support open() method to compile generated code and not de/serialized with NFA state.
  6. Support pattern grouping in Flink CEP
  • No labels