
Page properties

JIRA: FLINK-6935
Release: 1.7

Status

Current state: "Accepted"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrating-Flink-Table-API-amp-SQL-with-CEP-td17964.html


...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The feature will support Row Pattern Recognition syntax in SQL. In December 2016, ISO released a new version of the international SQL standard (ISO/IEC 9075:2016) that includes Row Pattern Recognition for complex event processing. Oracle Database has supported this feature since 12c Release 1 (12cR1). With this extension, it is possible to detect complex patterns across rows in a table. In the context of CEP, each row can be considered an event, and the events are ordered according to the insertion order. The following shows an example query.

...

SELECT *
FROM Ticker MATCH_RECOGNIZE (
     PARTITION BY symbol
     ORDER BY tstamp
     MEASURES  STRT.tstamp AS start_tstamp,
               LAST(DOWN.tstamp) AS bottom_tstamp,
               LAST(UP.tstamp) AS end_tstamp
     ONE ROW PER MATCH
     AFTER MATCH SKIP TO LAST UP
     PATTERN (STRT DOWN+ UP+)
     DEFINE
        DOWN AS DOWN.price < PREV(DOWN.price),
        UP AS UP.price > PREV(UP.price)
     ) MR
ORDER BY MR.symbol, MR.start_tstamp;
 
// input
SYMBOL  TSTAMP   PRICE
------- -------- ------
'ACME' '01-Apr-11' 12
'ACME' '02-Apr-11' 17
'ACME' '03-Apr-11' 19
'ACME' '04-Apr-11' 21
'ACME' '05-Apr-11' 25
'ACME' '06-Apr-11' 12
'ACME' '07-Apr-11' 15
'ACME' '08-Apr-11' 20
'ACME' '09-Apr-11' 24
'ACME' '10-Apr-11' 25
'ACME' '11-Apr-11' 19
 
 
// output
SYMBOL     START_TST BOTTOM_TS END_TSTAM
---------- --------- --------- ---------
ACME       05-APR-11 06-APR-11 10-APR-11
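To make the example concrete, here is a minimal Python sketch of how the query evaluates on the sample input. It is not Flink or Oracle code. It partitions by symbol, orders by timestamp, maps the first row to STRT (which has no DEFINE condition), extends DOWN+ and UP+ using the DEFINE conditions, emits one row per match, and resumes at the last UP row. The tuple layout and the name `match_ticker` are illustrative assumptions. The sketch handles only this one pattern. Greedy extension without backtracking is enough here because no row can satisfy both DOWN and UP.

```python
from datetime import datetime
from itertools import groupby

# Sample input from the table above: (symbol, tstamp, price).
TICKER = [
    ("ACME", "01-Apr-11", 12), ("ACME", "02-Apr-11", 17),
    ("ACME", "03-Apr-11", 19), ("ACME", "04-Apr-11", 21),
    ("ACME", "05-Apr-11", 25), ("ACME", "06-Apr-11", 12),
    ("ACME", "07-Apr-11", 15), ("ACME", "08-Apr-11", 20),
    ("ACME", "09-Apr-11", 24), ("ACME", "10-Apr-11", 25),
    ("ACME", "11-Apr-11", 19),
]


def parse_ts(s):
    return datetime.strptime(s, "%d-%b-%y")


def match_ticker(rows):
    """Evaluate PATTERN (STRT DOWN+ UP+) with ONE ROW PER MATCH and
    AFTER MATCH SKIP TO LAST UP. Returns one
    (symbol, start_tstamp, bottom_tstamp, end_tstamp) tuple per match."""
    out = []
    # PARTITION BY symbol, ORDER BY tstamp
    ordered = sorted(rows, key=lambda r: (r[0], parse_ts(r[1])))
    for symbol, group in groupby(ordered, key=lambda r: r[0]):
        part = list(group)
        i = 0
        while i < len(part):
            # STRT has no DEFINE condition, so row i always maps to STRT.
            # DOWN AS DOWN.price < PREV(DOWN.price): extend greedily.
            j = i + 1
            while j < len(part) and part[j][2] < part[j - 1][2]:
                j += 1
            last_down = j - 1
            # UP AS UP.price > PREV(UP.price): extend greedily.
            k = j
            while k < len(part) and part[k][2] > part[k - 1][2]:
                k += 1
            last_up = k - 1
            if last_down > i and last_up > last_down:
                # MEASURES: STRT.tstamp, LAST(DOWN.tstamp), LAST(UP.tstamp)
                out.append((symbol, part[i][1], part[last_down][1], part[last_up][1]))
                i = last_up  # AFTER MATCH SKIP TO LAST UP
            else:
                i += 1  # no match starts at row i; try the next row
    # ORDER BY MR.symbol, MR.start_tstamp
    return sorted(out, key=lambda r: (r[0], parse_ts(r[1])))


print(match_ticker(TICKER))
# [('ACME', '05-Apr-11', '06-Apr-11', '10-Apr-11')]
```

After the match, evaluation resumes at 10-Apr-11. From there, 11-Apr-11 is a DOWN row, but no UP row follows it, so no second match is produced.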

 

Row Pattern Recognition in SQL is performed using the MATCH_RECOGNIZE clause. MATCH_RECOGNIZE enables you to do the following tasks:

  • Logically partition and order the data that is used in the MATCH_RECOGNIZE clause with its PARTITION BY and ORDER BY clauses.
  • Define patterns of rows to seek using the PATTERN clause of the MATCH_RECOGNIZE clause. These patterns use regular expression syntax, a powerful and expressive feature, applied to the pattern variables you define.
  • Specify the logical conditions required to map a row to a row pattern variable in the DEFINE clause.

...

  • Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause.

...

  • Syntax: Defines the output columns (measures) computed for each match.
  • Flink-CEP: PatternStream.select(PatternSelectFunction)
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
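In recent Flink versions, a PatternSelectFunction receives each match as a Map<String, List<T>> from pattern names to the events accepted for them. The following Python sketch mirrors that shape with a dict and computes the example's MEASURES from it. The helper names `first`, `last`, and `select_measures` are hypothetical and stand in for what generated code might do. They are not part of any Flink API.

```python
# One match of the example query, shaped like the Map<String, List<T>> that
# Flink CEP hands to a PatternSelectFunction: variable name -> matched rows.
MATCH = {
    "STRT": [{"tstamp": "05-Apr-11", "price": 25}],
    "DOWN": [{"tstamp": "06-Apr-11", "price": 12}],
    "UP": [
        {"tstamp": "07-Apr-11", "price": 15},
        {"tstamp": "08-Apr-11", "price": 20},
        {"tstamp": "09-Apr-11", "price": 24},
        {"tstamp": "10-Apr-11", "price": 25},
    ],
}


def first(match, var, col):
    """FIRST(var.col): the value from the first row mapped to `var`."""
    return match[var][0][col]


def last(match, var, col):
    """LAST(var.col): the value from the last row mapped to `var`."""
    return match[var][-1][col]


def select_measures(match):
    """Plays the role of PatternSelectFunction.select for the MEASURES clause."""
    return {
        "start_tstamp": first(match, "STRT", "tstamp"),  # STRT maps to one row
        "bottom_tstamp": last(match, "DOWN", "tstamp"),
        "end_tstamp": last(match, "UP", "tstamp"),
    }


print(select_measures(MATCH))
# {'start_tstamp': '05-Apr-11', 'bottom_tstamp': '06-Apr-11', 'end_tstamp': '10-Apr-11'}
```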

{ONE ROW | ALL ROWS} PER MATCH

...

  • Syntax: Specifies a regular expression over pattern variables, whose conditions are given in the DEFINE clause.
  • Flink-CEP: Partially supported by Pattern; further functionality depends on features #1, #2, and #3 in the “Required features” section.
  • Status-in-Calcite: Supported in 1.12.0
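Because PATTERN is a regular expression over pattern variables, one way to picture it is to label each row with the variable its DEFINE condition selects and then run an ordinary regex over the labels. This is an illustration only, not how Calcite or Flink evaluate patterns. It assumes the conditions are mutually exclusive, cover every row, and use single-letter variable names. It also drops the spaces, so SQL `A B+ C` is written `AB+C`. With overlapping conditions, such as STRT in the example, which matches every row, a real engine must try alternative assignments. The DEFINE conditions below are hypothetical.

```python
import re

# Hypothetical, mutually exclusive DEFINE conditions, one letter per variable.
DEFINE = {
    "A": lambda p: p < 10,
    "B": lambda p: 10 <= p < 20,
    "C": lambda p: p >= 20,
}


def label(prices):
    """Map each row to the single variable whose condition it satisfies."""
    return "".join(next(v for v, cond in DEFINE.items() if cond(p)) for p in prices)


def find_matches(pattern, prices):
    """Return (start, end) row ranges (end exclusive) of non-overlapping
    matches; resuming after each match mirrors AFTER MATCH SKIP PAST LAST ROW."""
    return [m.span() for m in re.finditer(pattern, label(prices))]


prices = [5, 12, 15, 25, 8, 11, 30]
print(label(prices))                      # ABBCABC
print(find_matches("AB+C", prices))       # [(0, 4), (4, 7)]
print(find_matches("AB{3,5}C", prices))   # []
```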

...

SUBSET -- optional

  • Syntax: Defines union variables, i.e. named unions of pattern variables declared in the PATTERN clause. A union variable can be used in the MEASURES and DEFINE clauses.
  • Flink-CEP: Easy to implement
  • Status-in-Calcite: Not supported in 1.12.0, will be supported in 1.13
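A SUBSET variable stands for all rows mapped to any of its member variables. The Python sketch below shows how an aggregate over a union variable could be evaluated. The subset `STDN = (STRT, DOWN)` and the helper names are hypothetical examples, not taken from the query above.

```python
# SUBSET STDN = (STRT, DOWN): a union variable covering all rows mapped to
# either STRT or DOWN, usable e.g. as AVG(STDN.price) in MEASURES.
SUBSETS = {"STDN": {"STRT", "DOWN"}}


def subset_rows(match, members):
    """Rows of `match` mapped to any variable in `members`, in row order."""
    return [row for var, row in match if var in members]


def avg(rows, col):
    return sum(r[col] for r in rows) / len(rows)


# A classified match: (variable, row) pairs in row order.
match = [
    ("STRT", {"price": 25}),
    ("DOWN", {"price": 12}),
    ("UP", {"price": 15}),
    ("UP", {"price": 20}),
]
print(avg(subset_rows(match, SUBSETS["STDN"]), "price"))  # 18.5
```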

 

DEFINE -- mandatory

  • Syntax: Defines the Boolean condition that a row must satisfy to be mapped to a pattern variable declared in the PATTERN clause.
  • Flink-CEP: Can be supported with IterativeCondition
  • Status-in-Calcite: Supported in 1.12.0
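Each DEFINE entry is a predicate over the current row, which may look at neighbouring rows through functions such as PREV. A variable that appears in PATTERN but not in DEFINE, such as STRT in the example, is implicitly TRUE. The Python sketch below models the example's conditions as predicates. Passing the previous row as an explicit argument is a simplifying assumption of this sketch.

```python
# DEFINE conditions from the example query. Each takes the current row and the
# previous row of the partition; PREV() of the first row is NULL, modelled as
# None, and a comparison with NULL is never TRUE.
DEFINE = {
    "DOWN": lambda row, prev: prev is not None and row["price"] < prev["price"],
    "UP": lambda row, prev: prev is not None and row["price"] > prev["price"],
}


def condition_for(var):
    """Variables declared in PATTERN but missing from DEFINE (such as STRT)
    are implicitly TRUE, so they match any row."""
    return DEFINE.get(var, lambda row, prev: True)


print(condition_for("STRT")({"price": 25}, None))           # True
print(condition_for("DOWN")({"price": 12}, {"price": 25}))  # True
print(condition_for("UP")({"price": 12}, {"price": 25}))    # False
```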

...

CLASSIFIER -- optional

  • Syntax: This syntax is only available with ALL ROWS PER MATCH. It returns the name of the pattern variable to which the row was mapped.
  • Flink-CEP: Easy to implement
  • Status-in-Calcite: Supported in 1.12.0
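With ALL ROWS PER MATCH, every matched row is emitted, and CLASSIFIER() tells which pattern variable it was mapped to. Here is a small Python sketch under the assumption that a match is available as (variable, row) pairs in row order. The `classifier` column name is illustrative.

```python
def all_rows_per_match(match):
    """ALL ROWS PER MATCH: emit every row of a match, each extended with a
    "classifier" column holding CLASSIFIER(), the name of the pattern
    variable the row was mapped to."""
    return [dict(row, classifier=var) for var, row in match]


# A classified match of the example query: (variable, row) pairs in row order.
match = [
    ("STRT", {"tstamp": "05-Apr-11", "price": 25}),
    ("DOWN", {"tstamp": "06-Apr-11", "price": 12}),
    ("UP", {"tstamp": "07-Apr-11", "price": 15}),
]
for row in all_rows_per_match(match):
    print(row)
```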

...

MATCH_NUMBER -- optional

  • Syntax: Returns the sequential number of the current match within its partition.
  • Flink-CEP: Easy to implement
  • Status-in-Calcite: Supported in 1.12.0
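MATCH_NUMBER() numbers matches within a row pattern partition, starting at 1 in the order they are found. The Python sketch below assigns these numbers. The input shape and the symbol `GLOBEX` are hypothetical.

```python
def assign_match_numbers(matches):
    """MATCH_NUMBER(): given (partition_key, match) pairs in the order the
    matches were found, return (partition_key, match_number, match) triples.
    Numbering starts at 1 and restarts in every partition."""
    counters = {}
    numbered = []
    for key, match in matches:
        counters[key] = counters.get(key, 0) + 1
        numbered.append((key, counters[key], match))
    return numbered


found = [("ACME", "m1"), ("GLOBEX", "m2"), ("ACME", "m3")]
print(assign_match_numbers(found))
# [('ACME', 1, 'm1'), ('GLOBEX', 1, 'm2'), ('ACME', 2, 'm3')]
```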

...

WITHIN -- optional

  • Syntax: Outputs a pattern match only if the match occurs within the specified time interval.
  • Flink-CEP: Pattern.within(Time)
  • Status-in-Calcite: Not supported yet
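The WITHIN semantics reduce to a bound on the time span of a match. The Python sketch below applies that bound as a filter on complete matches. A streaming implementation such as Flink CEP's Pattern.within(Time) would instead discard partial matches once they exceed the interval. Whether the bound is inclusive is an assumption of this sketch, and Flink's exact boundary handling may differ.

```python
from datetime import datetime, timedelta


def within(match_rows, interval, ts_key="tstamp"):
    """WITHIN: accept a match only if its last row occurs no later than
    `interval` after its first row (inclusive bound: an assumption)."""
    return match_rows[-1][ts_key] - match_rows[0][ts_key] <= interval


rows = [{"tstamp": datetime(2011, 4, 5)}, {"tstamp": datetime(2011, 4, 10)}]
print(within(rows, timedelta(days=7)))  # True
print(within(rows, timedelta(days=3)))  # False
```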

...

LogicalProject[select="id, name"]
  LogicalMatch[pattern="A B+ C", definitions="..."]
    LogicalTableScan[table="T"]

 

What we need to do is translate the LogicalMatch node into calls to the Flink CEP API. The most important work is translating the PATTERN, DEFINE, and MEASURES clauses.

There are six kinds of PATTERNs:

  • PATTERN_CONCAT: for example, A B means that event B must directly follow the matching event A. This can be expressed with the Flink CEP API pattern.next(String).
  • PATTERN_QUANTIFIER: for example, B{3, 5} matches between 3 and 5 occurrences of B, and B+ matches one or more occurrences of B. B+ can be expressed with the CEP API pattern.oneOrMore(), B{n, n} with pattern.times(n), B? with pattern.optional(), and B* with pattern.oneOrMore().optional(). However, B{n, m} with n < m is not supported by the CEP API.
  • PATTERN_ALTER: for example, A | B matches either A or B. This cannot currently be expressed with the CEP API; FLINK-4641 is a related issue.
  • PATTERN_GROUPING: for example, ((A B){3} C) attempts to match the group (A B) three times and then seeks one occurrence of C. This is currently NOT supported by the CEP API.
  • PATTERN_PERMUTE: for example, PERMUTE (A, B) matches all permutations of A and B. This is currently NOT supported by the CEP API.
  • PATTERN_EXCLUDE: for example, {- A -} matches A but excludes the rows mapped to A from the output.
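The mapping of PATTERN_CONCAT and PATTERN_QUANTIFIER onto the CEP API can be sketched as a tiny translator. The following Python sketch emits a Java call chain as a string, starting from Pattern.<Row>begin(...) and chaining next(...) for each further variable. It rejects the kinds the CEP API cannot express. The AST classes `Var` and `Unsupported` are hypothetical. Attaching DEFINE conditions via where(...) is omitted. One deviation from the text is deliberate: quantified steps also get consecutive(), which Flink CEP provides to make looping patterns strictly contiguous. This is our assumption about what SQL's contiguous-row semantics require, while the text itself lists plain oneOrMore().

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass
class Var:
    """A pattern variable with an optional quantifier: None, "+", "*", "?",
    or an (n, m) pair for {n, m}."""
    name: str
    quantifier: Optional[Union[str, Tuple[int, int]]] = None


@dataclass
class Unsupported:
    """Placeholder for PATTERN_ALTER, PATTERN_GROUPING, PATTERN_PERMUTE, ..."""
    kind: str


def quantifier_calls(q):
    """Flink CEP method calls for a PATTERN_QUANTIFIER."""
    if q is None:
        return ""
    if q == "+":
        return ".oneOrMore().consecutive()"
    if q == "*":
        return ".oneOrMore().consecutive().optional()"
    if q == "?":
        return ".optional()"
    n, m = q
    if n == m:
        return f".times({n}).consecutive()"
    raise NotImplementedError(f"{{{n}, {m}}} has no CEP equivalent")


def translate(pattern):
    """Translate a PATTERN_CONCAT of variables into Java Flink CEP code;
    next(...) gives strict contiguity between consecutive variables."""
    code = []
    for i, term in enumerate(pattern):
        if isinstance(term, Unsupported):
            raise NotImplementedError(f"{term.kind} is not supported by the CEP API")
        step = f'Pattern.<Row>begin("{term.name}")' if i == 0 else f'.next("{term.name}")'
        code.append(step + quantifier_calls(term.quantifier))
    return "".join(code)


print(translate([Var("A"), Var("B", "+"), Var("C")]))
# Pattern.<Row>begin("A").next("B").oneOrMore().consecutive().next("C")
```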

...

We can translate each DEFINE condition into a CEP IterativeCondition through code generation and apply it with pattern.where(...). Most definitions can be translated by the existing CodeGenerator, except for specific calls such as PREV, LAST, and FIRST.
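Why PREV, LAST, and FIRST need special treatment is easier to see from what an IterativeCondition can observe. Its filter(value, ctx) method sees the candidate event plus, through the context (ctx.getEventsForPattern(name) in Flink), the events already accepted for the partial match. The Python sketch below uses a hypothetical `Context` stand-in. It shows one possible translation: because matched rows are contiguous, PREV of the candidate row is the last accepted event, and LAST/FIRST read from the events accepted for a variable. The stand-in returns None when PREV reaches before the start of the match. That differs from SQL, where PREV can see the row preceding the match, and it is exactly the kind of case that generated code must handle specially.

```python
class Context:
    """Hypothetical stand-in for Flink's IterativeCondition context: exposes
    the (variable, row) pairs already accepted for the partial match."""

    def __init__(self, accepted):
        self.accepted = accepted  # (variable, row) pairs in row order

    def get_events_for_pattern(self, name):
        return [row for var, row in self.accepted if var == name]

    def previous_row(self):
        # Matched rows are contiguous, so PREV() of the candidate row is the
        # most recently accepted row (None at the start of a match).
        return self.accepted[-1][1] if self.accepted else None


def down_condition(row, ctx):
    """What generated code for DOWN AS DOWN.price < PREV(DOWN.price) might do."""
    prev = ctx.previous_row()
    return prev is not None and row["price"] < prev["price"]


def last(ctx, var, col):
    """LAST(var.col) over the rows accepted so far for `var`."""
    rows = ctx.get_events_for_pattern(var)
    return rows[-1][col] if rows else None


ctx = Context([("STRT", {"price": 25}), ("DOWN", {"price": 12})])
print(down_condition({"price": 10}, ctx))  # True
print(last(ctx, "DOWN", "price"))          # 12
```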

...

...

Compatibility, Deprecation, and Migration Plan

...