Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: https://lists.apache.org/thread/6lbdw7ndgq2fnhyoomh0wyczp41wkbsf

JIRA or Github Issue: 

Released: <Doris Version>


Motivation

With the current import methods, users generally use Stream Load to push real-time data to Doris. This requires users to buffer multiple records on the client side and then send them to Doris as a batch.
Users need to batch data for the following reasons:

  1. Every data write, including an insert statement, is a separate import job.

  2. Each import job is associated with one transaction, coordinated by Fe. Each job also generates a RowSet for every tablet in Be.

  3. If many import jobs each carry a small amount of data, Be accumulates too many RowSets, which hurts compaction and query performance. Doris currently guards against this by limiting the number of versions for a single tablet.



But buffering batches on the client side makes writing to Doris more complicated for users:

  1. They must write code to accumulate batches, which may be difficult for some users;

  2. How much to buffer, whether by elapsed time or by data volume, depends on the scenario.
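
To make the burden concrete, here is a minimal sketch of the kind of batching code a client has to maintain today. The class name `ClientBatcher`, the thresholds, and the `send` callback are all hypothetical; in a real client `send` would issue a Stream Load request.

```python
import time
from typing import Callable, List, Optional


class ClientBatcher:
    """Hypothetical client-side buffer that flushes by row count or by age."""

    def __init__(self, send: Callable[[List[str]], None],
                 max_rows: int = 1000, max_age_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._send = send              # assumed to perform the actual load
        self._max_rows = max_rows      # illustrative threshold
        self._max_age_s = max_age_s    # illustrative threshold
        self._clock = clock
        self._rows: List[str] = []
        self._first_row_at: Optional[float] = None

    def add(self, row: str) -> None:
        if not self._rows:
            self._first_row_at = self._clock()
        self._rows.append(row)
        # The age check only runs when a row arrives; a real client would
        # also need a timer so that an idle buffer is flushed eventually.
        if self._should_flush():
            self.flush()

    def _should_flush(self) -> bool:
        too_many = len(self._rows) >= self._max_rows
        too_old = (self._clock() - self._first_row_at) >= self._max_age_s
        return too_many or too_old

    def flush(self) -> None:
        if self._rows:
            self._send(self._rows)
            self._rows = []
            self._first_row_at = None
```

Even this small version leaves open questions (idle flush, retries, thresholds per workload), which is the complexity the proposal aims to move to the server.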



For the above reasons, it is necessary to improve usability in real-time push write scenarios. Using the insert statement is the most user-friendly approach.



To use insert statements, the transaction and RowSet problems described above must be solved. Therefore, we propose that data buffering and automatic commits be done on the server side, as shown in the following picture:

  1. The client buffers data and uses Stream Load;

  2. The client uses insert statements to write data. The Doris cluster provides a buffer client, which buffers the data and uses Stream Load; this is transparent to users;

  3. The client uses insert statements to write data. For ease of implementation, a Be can act as the buffer client;


Data visibility of the insert statement differs between the new batch mode and the existing mode:

  1. Normal mode: The data is visible as soon as the write succeeds.

  2. Batch mode: The data is not visible until the server has buffered enough data and committed it.

Related Research


Detailed Design

We want to achieve the following 3 goals:

  1. Guarantee that once data is accepted, it is eventually imported successfully
    • WAL (Write Ahead Log)
  2. Data is not imported more than once
    • Use a unique label
  3. Commit order is the same as the user's write order
    • Each table uses a fixed Be to buffer data, splits the data into groups, and commits them in write order


The basic logic is shown in the following picture:


  1. Add an auto_batch_load='true'/'false' property for tables. If true, some inserts on the table will use this feature.
  2. For every table that enables "auto_batch_load", Fe maintains a fixed Be for buffering data, called the "Load Be". Since the Load Be of each table is fixed, the commit order is guaranteed to match the order in which data was written.
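
The fixed table-to-Be mapping can be sketched as follows. This is only an illustration: `LoadBeRegistry` is a hypothetical name, and the round-robin choice of the first Be is an assumption, since the proposal does not say how Fe picks the Load Be.

```python
from typing import Dict, List


class LoadBeRegistry:
    """Hypothetical Fe-side map from a table name to its fixed Load Be."""

    def __init__(self, backends: List[str]) -> None:
        if not backends:
            raise ValueError("at least one Be is required")
        self._backends = list(backends)
        self._assigned: Dict[str, str] = {}

    def load_be_for(self, table: str) -> str:
        # The first request for a table picks a Be (round-robin here, as an
        # assumption); every later request reuses the same Be, so all rows
        # for that table pass through a single buffer in arrival order.
        if table not in self._assigned:
            idx = len(self._assigned) % len(self._backends)
            self._assigned[table] = self._backends[idx]
        return self._assigned[table]
```

Because one Be sees every insert for the table, it can commit groups in the order the rows arrived without coordinating with other Bes.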



The details of Insert:

  1. Fe receives the insert statement. If it meets the 'auto_batch_load' conditions, Fe skips generating execution plan fragments and sends the row data to the Load Be.
  2. The Load Be receives the row data. If it is the first insert (the last transaction is null or committed), it opens a WAL and asks Fe to begin a transaction with a unique label; Fe creates a transaction and a corresponding StreamLoadPipe (the Load Be is also the Coordinator Be).
  3. The Load Be writes the WAL;
  4. The Load Be writes the rows to the stream load pipe;
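
The insert path on the Load Be can be sketched roughly as below. All names (`FakeFe`, `LoadBe`, `on_insert`, the label format) are hypothetical, and in-memory lists stand in for the on-disk WAL and the stream load pipe; the sketch only shows the ordering of the steps.

```python
import uuid
from typing import List, Optional


class FakeFe:
    """Stand-in for Fe: records each label and returns a transaction id."""

    def __init__(self) -> None:
        self.labels: List[str] = []

    def begin_txn(self, label: str) -> int:
        self.labels.append(label)
        return len(self.labels)


class LoadBe:
    """Hypothetical Load Be handling rows forwarded by Fe."""

    def __init__(self, fe: FakeFe) -> None:
        self._fe = fe
        self.txn_id: Optional[int] = None  # None: no open transaction
        self.wal: List[str] = []           # stands in for the WAL file
        self.pipe: List[str] = []          # stands in for the load pipe

    def on_insert(self, rows: List[str]) -> None:
        if self.txn_id is None:
            # First insert after a commit: open a new WAL and begin a
            # transaction under a fresh unique label.
            label = f"auto_batch_{uuid.uuid4().hex}"
            self.wal = []
            self.pipe = []
            self.txn_id = self._fe.begin_txn(label)
        self.wal.extend(rows)   # write the WAL first
        self.pipe.extend(rows)  # then hand the rows to the pipe

    def on_committed(self) -> None:
        # Called once Fe has committed; the next insert starts a new group.
        self.txn_id = None
```

Several inserts therefore share one transaction until a commit happens, which is what keeps the number of RowSets low.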



The details of Auto Commit:

  1. A background thread checks whether the commit conditions are satisfied and, if so, sends a commit request to Fe. The commit conditions are:

    • The number of rows;
    • The data size of the rows;
    • The time elapsed since the transaction began;
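
A minimal sketch of the commit check follows, assuming that meeting any one of the three conditions triggers a commit (the proposal lists the conditions but does not say how they combine). `CommitPolicy` and its default thresholds are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class CommitPolicy:
    """Hypothetical thresholds evaluated by the background thread."""

    max_rows: int = 10_000               # illustrative value
    max_bytes: int = 64 * 1024 * 1024    # illustrative value
    max_age_s: float = 10.0              # illustrative value

    def should_commit(self, rows: int, size_bytes: int, age_s: float) -> bool:
        if rows == 0:
            return False  # nothing buffered, nothing to commit
        # Assumption: any single condition is enough to trigger a commit.
        return (rows >= self.max_rows
                or size_bytes >= self.max_bytes
                or age_s >= self.max_age_s)
```

The age condition bounds how long written data can stay invisible in batch mode, while the row and size conditions bound memory and WAL growth.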



Load Be restart and WAL recovery:

  1. Scan the WAL directory and recover the WALs one by one in WAL id order;
  2. Each WAL uses a unique label. If the WAL was already committed before the Be went down, the new commit is rejected by Fe. This guarantees that data is not duplicated.
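
The recovery procedure can be sketched as below. The names `FakeFe` and `recover` are hypothetical, a dict stands in for the WAL directory, and the begin/load/commit sequence is collapsed into a single `commit` call for brevity; the point is only the id ordering and the label-based rejection.

```python
from typing import Dict, List, Set, Tuple


class FakeFe:
    """Stand-in for Fe: accepts each label at most once."""

    def __init__(self) -> None:
        self.committed: Set[str] = set()
        self.data: List[str] = []

    def commit(self, label: str, rows: List[str]) -> bool:
        if label in self.committed:
            return False  # label already used: reject the duplicate
        self.committed.add(label)
        self.data.extend(rows)
        return True


def recover(wals: Dict[int, Tuple[str, List[str]]], fe: FakeFe) -> List[int]:
    """Replay WALs in id order and return the ids that were committed."""
    replayed: List[int] = []
    for wal_id in sorted(wals):
        label, rows = wals[wal_id]
        if fe.commit(label, rows):
            replayed.append(wal_id)
    return replayed
```

A WAL whose label Fe has already seen is skipped, so replaying the whole directory after a crash is safe to repeat.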



Now, let's review the previous 3 goals again:

  1. Guarantee that once data is accepted, it is eventually imported successfully.
    • In the current design the WAL has a single replica, so there is a risk of data loss

    • Compatibility with Schema Change:

      1. For each insert, we first begin a transaction. This blocks schema change and ensures the import succeeds.

      2. If the Load Be is restarted and the schema has changed in the meantime, the data in the WAL may not be loadable (SchemaHash could be considered to solve this).

  2. Data is not imported more than once (YES)
  3. Commit order is the same as the user's write order
    • The commit order cannot be guaranteed if Load Be1 goes down and cannot be restarted: the new Load Be2 may commit first, and Load Be1 may later start and recover its WAL. In this case, the old data gets the newer version. (consider seq_id or some other solution)

    • The order between conditional deletion and auto_batch_load is not guaranteed. (conditional deletion statements could perhaps also be handled in auto_batch_load mode)

    • Currently, only insert statements that contain all columns are supported; support for inserts that do not contain all columns is under consideration



FAQ

  1. In what scenarios should it be used
    • Currently, we use it for log writing

  2. How to use it
    • Enable auto_batch_load for the table
    • The client uses "insert into...values(),(),()" statements to write data
  3. How does it work with stream load
    • The commit order cannot be guaranteed
  4. Why not use "begin... insert... commit"
    • From the user's point of view, "begin... insert... commit" ensures the atomicity of multiple rows. Many use cases do not require this capability and can use insert directly.

    • "begin... insert... commit" requires the user to control commits, and each commit is still associated with a transaction and RowSet, so commits cannot be too frequent; auto_batch_load can commit data from multiple clients.

    • With "begin... insert... commit", data may be lost if the user does not commit; auto_batch_load writes the WAL first to ensure that the data is eventually loaded.

  5. The performance impact of too many RPCs
    • Many KV systems are used in this way, and the RPC overhead has little impact

    • In our tests, Fe can parse and redirect 20,000+ SQL statements per second. Fe is also scalable.


Scheduling

specific implementation steps and approximate scheduling.


1 Comment

  1. If a user does not want to do the complicated batching work by writing code, they can just put data into Doris with the new auto_batch_load, which would be a much easier job than before. Regarding data durability, for most use cases a WAL is enough to provide relatively high durability, especially since many users nowadays use cloud disks that provide durability of at least seven nines.