Current state: Accepted

Discussion thread:

JIRA:   FLINK-14256 - Getting issue details... STATUS

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Filesystem is a very important connector in the table/sql world.

  • Most important connector for batch job.
  • Startup for both streaming and batch.
  • Streaming sink to FileSystem/Hive is a very common case for data import of data warehouse.

But now, we only have Filesystem with csv, and it has many shortcomes:

  • Not support partitions.
  • Inserting into twice will lead to an exception.
  • Not support insert overwrite.
  • Old csv is not standard csv.
  • Not support batch failover.
  • Not support streaming exactly-once writing.

This FLIP aims to introduce an all round file system connector.

Proposed Change


CREATE TABLE table_name (


) [PARTITIONED BY (col_name ...)]


    ‘connector.type’ = ‘filesystem’,

    ‘connector.path’ = ..., -- should be a directory

    ‘format.type’ = … -- csv, orc, parquet, json, avro



Partition Support [1]:

  • Bring “CREATE TABLE … PARTITIONED BY(col_name ...)” grammar back.
  • The partition information should be in the file system path, whether it's a temporary table or a catalog table.


“INSERT INTO” means appending to directory.

INSERT INTO TABLE filesystem_table ...

“INSERT OVERWRITE” means overwriting connector directory or partition directory.

INSERT OVERWRITE TABLE filesystem_table ...

“INSERT INTO … PARTITION()” can specify static partition columns.

INSERT INTO TABLE filesystem_table PARTITION(partcol1=val1, partcol2=val2 ...) ...

Batch sink

Visibility: Only after the job finish, the written file is visible, the intermediate status is not visible.

Streaming sink


Streaming source:

CREATE TABLE kafka_table (

    user_id STRING,

    order_amount double,

    log_ts TIMESTAMP(3),

    WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- [2]

) WITH (



Filesystem sink:

CREATE TABLE filesystem_table (

    user_id STRING,

    order_amount double,

    day_ts STRING,

    hour_ts STRING

) PARTITIONED BY (day_ts, hour_ts)




Insert into Filesystem connector:

INSERT INTO TABLE filesystem_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;


Introduce rolling-policy to roll files. Rolling currently open files and opens a new one.

‘connector.sink.rolling-policy.record-number’ = ‘0’,

‘connector.sink.rolling-policy.file-size’ = ‘0’,

‘connector.sink.rolling-policy.time.interval’ = ‘0’


A single task may be writing multiple files at the same time. If the parallelism is too high, it may lead to a large number of small files. If the parallelism is too small, the performance is not enough. This requires that the user can specify parallelism.

  • Default is the same as upstream transformation
  • Users can specify parallelism too.

‘connector.sink.parallelism’ = ...

Before sink, we can shuffle by dynamic partition fields to sink parallelisms, this can greatly reduce the number of files. But filesystem tables are often partitioned by time, because input records are ordered by time, so unlike batch jobs, there won't be too many partitions at the same time, which also makes it unnecessary to shuffle by dynamic partitions before sink.

‘connector.sink.shuffle-by-partition.enable’ = ...

Partition Commit

Committing a partition is to notify the downstream application that the partition has finished writing, the partition is ready to be read. The filesystem is different from hive:

  • Hive: add partition to metastore
  • Filesystem: Add “.succes” file to directory (success file name is configurable too)

‘connector.sink.partition-commit.policy’ = ... -- ‘metastore’ or ‘success-file’ or ‘metastore,success-file’

‘’ = …

When to commit partitions? We must know there are no elements belong to this partition. Most partitions are related to time, so we can use the watermark to monitor progress of partition writing. If a user wants to commit partition, he should define a watermark source [2]. 

We provide PartitionCommitTrigger to trigger commit by watermark and partition:

public interface PartitionCommitTrigger {

  canCommit(LinkedHashMap<String, String> partitionSpec, long watermark);


We provide built-in triggers for:

  • day: one partition column, format is ‘yyyy-MM-dd’. Extract and parse the first partition field and check whether it should be committed.
  • day-hour: two partition columns. The first partition field like above, represents day. The second format is ‘HH’ represents hour.

Users also can use custom triggers.

‘connector.sink.partition-commit.trigger’ = … -- day or day-hour or custom

‘connector.sink.partition-commit.trigger.class’ = … -- Implementation of extractor


  • Introduce FileSystemTableFactory, FileSystemTableSource, FileSystemTableSink to provide table related implementations, this should not be blocked by FLIP-95, but will migrate to new interfaces after FLIP-95 finished.
  • Read: Reusing FileInputFormat, this should not be blocked by FLIP-27, but will migrate to new interfaces after FLIP-27 finished.
  • Write: Reusing current batch sink and DataStream StreamingFileSink 
  • Formats should do:
    • Data format: Format should use BaseRow, after FLIP-95, we use BaseRow to be source/sink data format.
    • Read: Format should provide InputFormat with partition fields support.
    • Write: Format should provide BulkWriter.Factory or Encoder for unify sink implementation (Now for StreamingFileSink).
  • We plan to implement CSV, JSON, PARQUET, ORC first.

Migration Plan and Compatibility

  • Deprecated old filesystem-csv connector, not conflict with it.
    • CsvAppendTableFactory: Need UPDATE_MODE, new one don't need.
    • CsvBatchTableFactory: It is BatchTableFactory, new one is StreamTableFactory.
  • Others are new features

Rejected Alternatives


Future plan (Without voting)

Auto compact for streaming sink

Without shuffle by partitions, and users configure a small checkpoint interval for better latency. There may be lots of small files, this is not good to the downside reading, and not good to HDFS too.

Flink provides an auto compact mechanism. When it is opened, Flink will compact files into 128MB or so as much as possible.

‘’ = ...

NOTE: This mechanism only works when open the partition commit.

NOTE: This mechanism increases the risk of duplicate data. Before opening it, users should think about how to reduce file number through parallelism and checkpoint interval.

External HDFS:

Users may want to read&write to external HDFS, which is different from the HDFS configuration&username of Flink cluster.

‘connector.hadoop.conf.dir’ = …

‘connector.hadoop.username’ = … -- If external HDFS has security


Users can close the checkpoint, then the streaming sink can work too. Then will commit files when rolling files and end inputs. (No guarantees, may lose data, may more data)

‘connector.sink.semantic’ = ...’ -- 'exactly-once' or 'none'




  • No labels