Status

Current state: "Accepted"

Discussion thread: http://mail-archives.apache.org/mod_mbox/flink-dev/201712.mbox/%3C8a9d718b-5dae-0fe2-1da6-a8d557d45582%40apache.org%3E

JIRA:

Released: 1.5

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

SQL is undoubtedly the most widely used language for data analytics. It is declarative and can be optimized and efficiently executed by most query processors. Applying those concepts to stream processors as well is a logical step toward making streaming accessible to a broader audience and enabling faster development without exact knowledge of the underlying runtime.

Flink's Table & SQL API allows queries to be declared in SQL. However, a SQL query must be embedded within a table program that is written in either Java or Scala. The table program must be packaged with Maven before it can be submitted to a cluster. This limits the usage of Flink mostly to Java/Scala programmers.

In the long term, with the SQL Client proposed in this document, we want to:


The goal of this FLIP is to have an initial minimum viable product for using Flink with SQL without an IDE. We will use this product to incrementally refine the requirements based on the feedback from users and contributors. Further FLIPs and design documents might follow in order to define REST/JDBC capabilities or materialized view semantics.

Public Interfaces

Proposed Changes

General Architecture

The SQL Client can be executed in two modes: a gateway mode and an embedded mode. In this FLIP we mostly focus on the embedded mode, but we also consider a later gateway mode at a conceptual level.

The communication to Flink happens through the ClusterClient. By using the ClusterClient we can ensure that the SQL client will run Flink on YARN, Mesos, and other supported environments without additional effort.

For the embedded mode, a general communication flow would look like:

Gateway Mode

 

Embedded Mode

Configuration

Independent of the execution mode, the SQL client can be configured globally (sql-defaults.conf) and/or for every CLI session (sql-context.conf). The configuration specifies settings that would otherwise be applied programmatically to an ExecutionEnvironment/StreamExecutionEnvironment and a TableEnvironment. It contains catalog information as well as job-specific parameters.

The global configuration is located in a dedicated file and is read during startup. It applies to all CLI sessions. A context configuration file can be specified when starting the CLI client and is attached to any query executed in the current CLI session. Thus, context properties might override global default properties. In future versions, the configuration might also be refined by using DDL statements such as:

DDL statements will have the highest priority and override the properties specified in configuration files.
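The precedence described above (global defaults, then session context, then DDL statements) can be illustrated with a minimal sketch. The property names used here (`execution.parallelism`, `execution.type`) and the function `effective_config` are hypothetical and only serve the illustration; this FLIP does not define them.

```python
from collections import ChainMap


def effective_config(global_defaults, session_context, ddl_overrides=None):
    """Merge the three configuration layers into one dictionary.

    ChainMap looks keys up from left to right, so the leftmost mapping
    (DDL overrides) wins over the session context, which in turn wins
    over the global defaults.
    """
    return dict(ChainMap(ddl_overrides or {}, session_context, global_defaults))


# Hypothetical property names, used only for illustration.
global_defaults = {"execution.parallelism": 1, "execution.type": "streaming"}
session_context = {"execution.parallelism": 4}
ddl_overrides = {"execution.type": "batch"}

config = effective_config(global_defaults, session_context, ddl_overrides)
```

In this sketch the session context raises the parallelism, and the DDL layer changes the execution type, while any property set in only one layer is passed through unchanged.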

Here are some properties that might need to be defined in the long run:

Result Retrieval

 

In the future, we can use different options for retrieving materialized results both for debugging purposes and long-running maintained views. The retrieval is managed by the executor.

There are three materialization modes:

 


The supported materialization mode also depends on the query type:


Query Type            | Internal Mode              | External Mode*
Batch                 | collect() -> Heap/Database | File table sink
Append Stream         | collect() -> Heap/Database | Kafka/file table sink
Retract/Upsert Stream | collect() -> Heap/Database | (Compacted Kafka)/Cassandra table sink


We might use regular heap space at the beginning. The internal database can be any JDBC database. External materialization modes (*) are not included in the first version. In the future, Kafka would be read by general Kafka utility functions, and files would be read similarly, with support for different file systems.

Result Maintenance

While batch queries have bounded results, streaming queries are potentially never-ending and, therefore, require special treatment for keeping the results up to date and consistent. The streaming query can be considered as a view and the running streaming application as the view maintenance. Results might need to be supplied to systems that were not made for streaming queries, e.g., Java applications that read from a JDBC API. In those cases, every requested result set must be a snapshot (materialized view) of the running query at a point in time. The snapshot must be immutable until all result rows have been consumed by the application or a new result set is requested. We distinguish between two types of results that will require different materialization semantics: a materialized view and a materialized result stream.
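The snapshot requirement can be sketched as follows. This is a minimal illustration, not the proposed implementation: the class `SnapshotResultStore` and its methods are hypothetical names, and the sketch assumes the running query simply replaces the full live result on every update.

```python
class SnapshotResultStore:
    """Holds the live result of a running query and hands out snapshots."""

    def __init__(self):
        # Rows currently maintained by the running query.
        self._live = []

    def update(self, rows):
        # Called by the running query whenever its result changes.
        self._live = list(rows)

    def snapshot(self):
        # A tuple copy is unaffected by later updates, so a consumer can
        # read it row by row while the query keeps running.
        return tuple(self._live)


store = SnapshotResultStore()
store.update([("alice", 1)])
first = store.snapshot()

# The query continues and changes the live result.
store.update([("alice", 2), ("bob", 1)])
second = store.snapshot()
```

Here `first` keeps the state the query had when the snapshot was requested, and only the newly requested `second` reflects the later update.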

Materialized View

A consistent materialized view of results for production use cases. Materialized views are not part of this FLIP but might be added in future versions. They require another design document for the DDL statement and execution, but here are some properties we aim for:

SQL: CREATE MATERIALIZED VIEW ...

Materialized Result Stream

A materialized stream of results for getting immediate insights into the running SQL query.

SQL: SELECT ...


We focus on simple SELECT queries first that are materialized on the heap of the executor (internal materialization mode).
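Materializing a result on the heap can be sketched as applying a stream of changes to an in-memory collection. The following is an illustration under stated assumptions, not the executor's actual code: it assumes changes arrive as `(is_insert, row)` pairs, where `True` adds a row and `False` retracts a previously added row, and the function name `materialize` is hypothetical.

```python
from collections import Counter


def materialize(changes):
    """Apply (is_insert, row) messages to an in-memory multiset of rows."""
    rows = Counter()
    for is_insert, row in changes:
        if is_insert:
            rows[row] += 1
        else:
            rows[row] -= 1
            # Drop rows whose count reaches zero so they vanish from the result.
            if rows[row] <= 0:
                del rows[row]
    # Expand the multiset back into a sorted list of rows.
    return sorted(rows.elements())


# A count per user that is updated once: the old value is retracted
# before the new value is inserted.
changes = [
    (True, ("alice", 1)),
    (False, ("alice", 1)),
    (True, ("alice", 2)),
    (True, ("bob", 1)),
]
result = materialize(changes)
```

For an append-only stream every message is an insert, so the same function reduces to collecting all rows.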

Compatibility, Deprecation, and Migration Plan

No compatibility changes or deprecations are necessary.

Implementation Plan

1. Basic Embedded SQL Client 

Add the basic features to play around with Flink's streaming SQL.

2. Full Embedded SQL Client

Add important features to fully use the SQL client for a variety of use cases.

3. Discuss/design further features

Discuss and prioritize other features that are not part of this FLIP. 

Rejected Alternatives
