Attachment: Doris load data设计.pdf

Motivation

Doris currently supports only stream_load for loading client data files into a Doris cluster.
However, some users still prefer the MySQL LOAD DATA SQL syntax for loading small amounts of data from clients.

Related Research

Some other MySQL-compatible database engines also support this feature.

Detailed Design

1. Syntax definition

We follow the MySQL LOAD DATA syntax, leaving out a few clauses that Doris does not support.

Code Block
languagesql
LOAD DATA
    [LOCAL]
    INFILE 'file_name'
    [REPLACE | IGNORE]
    INTO TABLE tbl_name
    [PARTITION (partition_name [, partition_name] ...)]
    [{FIELDS | COLUMNS}
        [TERMINATED BY 'string']
    ]
    [LINES
        [TERMINATED BY 'string']
    ]
    [IGNORE number {LINES | ROWS}]
    [(col_name_or_user_var
        [, col_name_or_user_var] ...)]
    [SET col_name={expr | DEFAULT}
        [, col_name={expr | DEFAULT}] ...]

This syntax is very similar to BrokerLoad, but BrokerLoad requires a label definition and MySQL Load does not.
In addition, BrokerLoad is an asynchronous statement, while MySQL Load is synchronous.


2. Implementation

In Doris, StreamLoad can already transfer local files from a client or an FE node to the BE storage engine.
So we will not implement a new load execution path for MySQL LOAD DATA. Instead, we add an adapter at the syntax layer that converts a MySQL LOAD DATA statement into a StreamLoad request.
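
As a rough illustration of the adapter idea, the sketch below maps a few parsed LOAD DATA clauses onto Stream Load HTTP headers. The class `LoadDataToStreamLoad` and the method `toHeaders` are hypothetical names, and the clause-to-header mapping (`column_separator`, `line_delimiter`, `partitions`, `columns`) is an assumption about how the conversion could work, not the final design.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical adapter: turns parsed LOAD DATA clauses into Stream Load headers. */
final class LoadDataToStreamLoad {

    /**
     * Builds the header map for a Stream Load request.
     * A null (or empty) argument means the clause was omitted in the statement.
     */
    static Map<String, String> toHeaders(List<String> partitions,
                                         String fieldTerminator,
                                         String lineTerminator,
                                         List<String> columns) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Expect", "100-continue");
        if (fieldTerminator != null) {
            // FIELDS TERMINATED BY 'string' (assumed mapping)
            headers.put("column_separator", fieldTerminator);
        }
        if (lineTerminator != null) {
            // LINES TERMINATED BY 'string' (assumed mapping)
            headers.put("line_delimiter", lineTerminator);
        }
        if (partitions != null && !partitions.isEmpty()) {
            // PARTITION (p1, p2, ...) (assumed mapping)
            headers.put("partitions", String.join(",", partitions));
        }
        if (columns != null && !columns.isEmpty()) {
            // (col_name_or_user_var, ...) (assumed mapping)
            headers.put("columns", String.join(",", columns));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = toHeaders(
                Arrays.asList("p1", "p2"), ",", "\n", Arrays.asList("k1", "v1"));
        headers.forEach((name, value) -> System.out.println(name + ": " + value));
    }
}
```

Clauses without an obvious counterpart in this sketch, such as SET, IGNORE number LINES, and REPLACE | IGNORE, are deliberately left out here.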

The keyword LOCAL means the file is on the client node; without it, the file is on the server node.
In Doris, the server node is the FE node.

We do not distinguish between the master FE and follower FE nodes. The user must make sure the server-local file is on the FE node that receives the statement.

Server local file

In this mode, the file is already on the FE node.

Image Added

  1. The client executes the LOAD DATA statement.
  2. The FE node checks the local file and chooses a BE node to receive the StreamLoad request.
  3. The FE node reads the local file as a file stream and wraps it in the HTTP request body.
  4. The BE node receives the StreamLoad request and loads the stream data into the table.
  5. When the StreamLoad finishes, the server responds to the client with a result such as Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
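
The result line in the last step could be produced with a small helper like the one below. This is only a sketch: `LoadResultMessage` is a hypothetical name, and which StreamLoad counters feed each field is not specified in this design, so the parameters are plain placeholders.

```java
/** Hypothetical helper that formats the result line returned to the MySQL client. */
final class LoadResultMessage {

    /** The four counters are placeholders; their source is an open design question. */
    static String format(long records, long deleted, long skipped, long warnings) {
        return String.format("Records: %d Deleted: %d Skipped: %d Warnings: %d",
                records, deleted, skipped, warnings);
    }

    public static void main(String[] args) {
        System.out.println(format(1, 0, 0, 0));
    }
}
```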

POC code that streams a file into an HTTP request body:

Code Block
languagejava
collapsetrue
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import okio.Okio;
import okio.Source;

public class StreamLoadPoc {

    public static RequestBody createInputStreamRequestBody(final InputStream stream) {
        return new RequestBody() {
            @Override
            public MediaType contentType() {
                // Set the body MIME type; a binary stream is used here as an example.
                return MediaType.get("application/octet-stream");
            }

            @Override
            public long contentLength() throws IOException {
                // Returning -1 means the body length is unknown, which enables HTTP chunked transfer.
                // RequestBody also returns -1 by default.
                return -1;
            }

            @Override
            public void writeTo(BufferedSink sink) throws IOException {
                // Copy the input stream into the request body; the stream is closed afterwards.
                try (Source source = Okio.source(stream)) {
                    sink.writeAll(source);
                }
            }
        };
    }

    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient().newBuilder().build();
        RequestBody body = createInputStreamRequestBody(Files.newInputStream(Paths.get("/Users/bytedance/Downloads/def")));
        Request request = new Request.Builder()
                .url("http://doris:8040/api/hzw/t2/_stream_load")
                .method("PUT", body)
                .addHeader("Expect", "100-continue")
                .addHeader("Authorization", "Basic cm9vdDo=")
                .addHeader("Content-Type", "text/plain")
                .build();
        // Close the response after printing the StreamLoad result.
        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.body().string());
        }
    }
}


Client local file

Client mode differs from server mode because the file is on the client node. It must first be transferred to the server as a network stream and then wrapped in an HTTP request to the BE node.

The MySQL network protocol for LOAD DATA works as follows:

Image Added

  1. The client sends a COM_QUERY containing the LOAD DATA statement.
  2. The server responds with 0xFB + filename.
  3. The client sends the file content as a sequence of packets.
  4. At the end of the file transfer, the client sends an empty packet.
  5. When the server finishes loading the data, it sends an OK packet to the client.
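
The server side of steps 2 to 4 can be sketched as follows, assuming the standard MySQL packet framing (a 3-byte little-endian payload length followed by a 1-byte sequence id). `LocalInfileProtocol`, `localInfileRequest`, and `receiveFile` are hypothetical names for illustration; sequence id checking and error handling are omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the server side of the LOCAL INFILE exchange. */
final class LocalInfileProtocol {

    /** Step 2: payload of the server response, 0xFB followed by the file name. */
    static byte[] localInfileRequest(String fileName) {
        byte[] name = fileName.getBytes(StandardCharsets.UTF_8);
        byte[] payload = new byte[name.length + 1];
        payload[0] = (byte) 0xFB;
        System.arraycopy(name, 0, payload, 1, name.length);
        return payload;
    }

    /** Steps 3 and 4: copy packet payloads to sink until an empty packet arrives. */
    static long receiveFile(InputStream client, OutputStream sink) throws IOException {
        long total = 0;
        byte[] header = new byte[4];
        while (true) {
            readFully(client, header, 4);
            // Bytes 0..2 hold the payload length (little-endian); byte 3 is the sequence id.
            int length = (header[0] & 0xFF) | ((header[1] & 0xFF) << 8) | ((header[2] & 0xFF) << 16);
            if (length == 0) {
                return total; // the empty packet marks the end of the file
            }
            byte[] payload = new byte[length];
            readFully(client, payload, length);
            sink.write(payload);
            total += length;
        }
    }

    private static void readFully(InputStream in, byte[] buf, int len) throws IOException {
        int off = 0;
        while (off < len) {
            int n = in.read(buf, off, len - off);
            if (n < 0) {
                throw new EOFException("connection closed in the middle of a packet");
            }
            off += n;
        }
    }

    public static void main(String[] args) throws IOException {
        // Two data packets ("abc", "de") followed by the empty terminating packet.
        byte[] wire = {3, 0, 0, 2, 'a', 'b', 'c', 2, 0, 0, 3, 'd', 'e', 0, 0, 0, 4};
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long bytes = receiveFile(new ByteArrayInputStream(wire), sink);
        System.out.println(bytes + " bytes: " + sink.toString("UTF-8"));
    }
}
```

In the real flow the sink would be the stream forwarded to the BE node rather than an in-memory buffer.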

All other steps are the same as in server mode; the only difference is the NetworkStream shown in the picture.

Image Added
To reduce disk usage on the FE node, we use a blocking producer-consumer network stream that receives bytes from the client and forwards them to the BE node, rather than staging the file on the FE disk.
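
A minimal sketch of such a blocking producer-consumer stream is shown below, assuming a bounded queue of byte chunks. `BlockingChunkStream`, `offerChunk`, and `finish` are hypothetical names: the thread reading MySQL packets acts as the producer, and the HTTP request body writer reads from the stream as the consumer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded stream: the producer blocks when full, the consumer when empty. */
final class BlockingChunkStream extends InputStream {
    private static final byte[] EOF = new byte[0]; // sentinel, compared by identity
    private final BlockingQueue<byte[]> queue;
    private byte[] current;
    private int pos;
    private boolean finished;

    BlockingChunkStream(int maxChunks) {
        this.queue = new ArrayBlockingQueue<>(maxChunks);
    }

    /** Producer side: blocks while the queue is full, which bounds memory use. */
    void offerChunk(byte[] chunk) throws InterruptedException {
        if (chunk.length > 0) {
            queue.put(chunk);
        }
    }

    /** Producer side: signals that no more data will arrive. */
    void finish() throws InterruptedException {
        queue.put(EOF);
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) < 0 ? -1 : one[0] & 0xFF;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        // Consumer side: wait for the next chunk once the current one is used up.
        while (!finished && (current == null || pos == current.length)) {
            try {
                byte[] next = queue.take();
                if (next == EOF) {
                    finished = true;
                } else {
                    current = next;
                    pos = 0;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting for data", e);
            }
        }
        if (finished) {
            return -1;
        }
        int n = Math.min(len, current.length - pos);
        System.arraycopy(current, pos, buf, off, n);
        pos += n;
        return n;
    }

    public static void main(String[] args) throws Exception {
        BlockingChunkStream stream = new BlockingChunkStream(1);
        Thread producer = new Thread(() -> {
            try {
                stream.offerChunk("ab".getBytes());
                stream.offerChunk("cde".getBytes());
                stream.finish();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[2];
        int n;
        while ((n = stream.read(buf, 0, buf.length)) != -1) {
            out.write(buf, 0, n);
        }
        producer.join();
        System.out.println(out.toString());
    }
}
```

Because the queue is bounded, a slow BE node applies back pressure to the client connection instead of letting data pile up on the FE node.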

Scheduling

This feature is estimated to take about 2 months:

  1. Implement the LOAD DATA syntax and support loading a server-local file into Doris. (3 weeks)
  2. Support loading a client-local file into Doris. (2 weeks)
  3. Support IGNORE number {LINES | ROWS} in stream load. (2 weeks)
  4. Support [REPLACE | IGNORE] in stream load. (2 weeks)