Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: 

JIRA or Github Issue: 

Released: <Doris Version>

Google Doc: <If the design in question is unclear or needs to be discussed and reviewed, a Google Doc can be used first to facilitate comments from others.>

Motivation

Currently, Stream Load is the only way Doris supports loading data files from a client into a Doris cluster.
However, some users still prefer the MySQL LOAD DATA statement for loading small amounts of data from the client.

Related Research

Some other MySQL-compatible database engines also support this feature:

Detailed Design

1. Syntax definition

We will follow the MySQL syntax, except for a few small expressions that are not supported.

LOAD DATA
    [LOCAL]
    INFILE 'file_name'
    INTO TABLE tbl_name
    [PARTITION (partition_name [, partition_name] ...)]
    [COLUMNS TERMINATED BY 'string']
    [LINES TERMINATED BY 'string']
    [IGNORE number {LINES | ROWS}]
    [(col_name_or_user_var [, col_name_or_user_var] ...)]
    [SET (col_name={expr | DEFAULT} [, col_name={expr | DEFAULT}] ...)]
    [PROPERTIES (key1 = value1 [, key2 = value2] ...)]

This syntax is very similar to Broker Load, but Broker Load requires a label definition and MySQL Load does not.
In addition, Broker Load is an asynchronous statement, whereas MySQL Load is a synchronous statement.
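For illustration, the sketch below issues a statement in this syntax from a Java client over JDBC. It assumes MySQL Connector/J is on the classpath and that the connection URL enables `allowLoadLocalInfile=true`, which Connector/J requires before it will send a client-side file. The file path, table, column names and the `timeout` property are hypothetical. Because MySQL Load is synchronous, `executeUpdate` should only return once the load has finished.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class MysqlLoadExample {
    // Example statement; file, table, column names and the property are hypothetical.
    static String exampleSql() {
        return "LOAD DATA LOCAL INFILE '/tmp/data.csv'\n"
             + "INTO TABLE demo_tbl\n"
             + "COLUMNS TERMINATED BY ','\n"
             + "LINES TERMINATED BY '\\n'\n"   // SQL text contains the escape '\n'
             + "IGNORE 1 LINES\n"
             + "(k1, k2, tmp)\n"
             + "SET (k3 = tmp + 1)\n"
             + "PROPERTIES (\"timeout\" = \"100\")";
    }

    // Runs the statement synchronously: the call blocks until the load has completed,
    // unlike Broker Load, which returns immediately and runs in the background.
    static int run(String url, String user, String password) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement()) {
            return stmt.executeUpdate(exampleSql());
        }
    }
}
```

A call would look like `MysqlLoadExample.run("jdbc:mysql://fe_host:9030/demo_db?allowLoadLocalInfile=true", "root", "")`, where `fe_host` and `demo_db` are placeholders and 9030 is the default FE query port.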


2. Implementation

Doris's StreamLoad can already transfer local files from the client or an FE node to the BE storage engine.
So we will not implement a new load execution path for MySQL LOAD DATA. Instead, we add an adapter at the syntax layer that converts a MySQL LOAD DATA statement into a StreamLoad request.
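A minimal sketch of the adapter's core step, assuming the statement has already been parsed: each clause is mapped to the Stream Load HTTP header that expresses the same thing (`partitions`, `column_separator`, `line_delimiter`, `columns`). The `LoadDataStmt` record and `toHeaders` method are hypothetical names, and passing PROPERTIES through as extra headers is an assumption rather than a settled part of this design. IGNORE is left out because Stream Load does not yet support a general line count (see IGNORE Lines).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LoadDataToStreamLoad {
    // Hypothetical parsed form of a LOAD DATA statement; null or empty means "clause absent".
    public record LoadDataStmt(List<String> partitions, String columnSeparator, String lineDelimiter,
                               List<String> columns, Map<String, String> setExprs,
                               Map<String, String> properties) {}

    // Maps each LOAD DATA clause to the corresponding Stream Load header.
    public static Map<String, String> toHeaders(LoadDataStmt s) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (!s.partitions().isEmpty()) {
            headers.put("partitions", String.join(",", s.partitions()));   // PARTITION (...)
        }
        if (s.columnSeparator() != null) {
            headers.put("column_separator", s.columnSeparator());         // COLUMNS TERMINATED BY
        }
        if (s.lineDelimiter() != null) {
            headers.put("line_delimiter", s.lineDelimiter());             // LINES TERMINATED BY
        }
        if (!s.columns().isEmpty() || !s.setExprs().isEmpty()) {
            // Stream Load takes the column list and SET mappings in one "columns" header,
            // e.g. "k1,k2,tmp,k3=tmp+1". Use a LinkedHashMap for setExprs to keep SET order.
            List<String> cols = new ArrayList<>(s.columns());
            s.setExprs().forEach((col, expr) -> cols.add(col + "=" + expr));
            headers.put("columns", String.join(",", cols));
        }
        headers.putAll(s.properties());  // assumption: PROPERTIES are forwarded as-is
        return headers;
    }
}
```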

The keyword LOCAL means the file is on the client node; otherwise, the file is on the server node.
In Doris, the server node is the FE node.

We do not distinguish between the master FE and follower FE nodes. Users must make sure the server-local file is on the correct FE node.

Server local file

In this mode, the local file is already on the FE node.

  1. The client executes the LOAD DATA statement.
  2. The FE node checks the local file and chooses a BE node to send the StreamLoad request to.
  3. The FE node reads the local file as a file stream and wraps it into the HTTP request body.
  4. The BE node receives the StreamLoad request and loads the stream data into the table.
  5. When the StreamLoad finishes, the FE responds to the client with a result such as Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
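Steps 2 and 3 can be sketched as follows, assuming the FE already has a list of alive BE addresses in `host:http_port` form. Random BE selection is an assumption (any load-balancing policy would work), and `ServerLocalLoad` and its methods are hypothetical names. The URL uses the Stream Load endpoint `/api/{db}/{table}/_stream_load`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class ServerLocalLoad {
    // Step 2: fail the statement early if the file is missing or unreadable on this FE node.
    public static void checkLocalFile(Path file) throws IOException {
        if (!Files.isRegularFile(file) || !Files.isReadable(file)) {
            throw new IOException("File not found or not readable on this FE: " + file);
        }
    }

    // Step 2: choose a BE node; a random choice is only an illustrative policy.
    public static String chooseBackend(List<String> aliveBackends) {
        if (aliveBackends.isEmpty()) {
            throw new IllegalStateException("No alive BE node available");
        }
        return aliveBackends.get(ThreadLocalRandom.current().nextInt(aliveBackends.size()));
    }

    // Step 3: Stream Load URL on the chosen BE ("host:http_port").
    public static String streamLoadUrl(String backend, String db, String table) {
        return "http://" + backend + "/api/" + db + "/" + table + "/_stream_load";
    }
}
```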

The POC code that wraps a file stream into an HTTP request:

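import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import okio.Okio;
import okio.Source;

public class StreamLoadPoc {

    // Wraps an InputStream as an OkHttp request body so the file is streamed, not buffered in memory.
    public static RequestBody createInputStreamRequestBody(final InputStream stream) {
        return new RequestBody() {
            @Override
            public MediaType contentType() {
                // MIME type of the body; a raw binary stream is used here as an example
                return MediaType.get("application/octet-stream");
            }

            @Override
            public long contentLength() throws IOException {
                // -1 means the body length is unknown, which enables HTTP chunked transfer.
                // RequestBody also returns -1 by default.
                return -1;
            }

            @Override
            public void writeTo(BufferedSink sink) throws IOException {
                try (Source source = Okio.source(stream)) {
                    sink.writeAll(source);
                }
            }
        };
    }

    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient.Builder().build();
        // Local file on the FE node to be loaded (POC path)
        RequestBody body = createInputStreamRequestBody(Files.newInputStream(Paths.get("/Users/bytedance/Downloads/def")));
        Request request = new Request.Builder()
                // BE Stream Load endpoint: /api/{db}/{table}/_stream_load
                .url("http://doris:8040/api/hzw/t2/_stream_load")
                .method("PUT", body)
                .addHeader("Expect", "100-continue")
                // "cm9vdDo=" is Base64 for "root:" (user root, empty password)
                .addHeader("Authorization", "Basic cm9vdDo=")
                .addHeader("Content-Type", "text/plain")
                .build();
        // Close the response to release the connection
        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.body().string());
        }
    }
}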


Client local file

Client mode differs from server mode because the file is on the client node: the FE must receive it from the client as a network stream and then wrap it into an HTTP request to the BE node.

The network protocol of MySQL LOAD DATA LOCAL works as follows:

  1. The client sends a COM_QUERY packet containing the LOAD DATA statement.
  2. The server responds with a packet containing 0xFB followed by the file name.
  3. The client sends the file content in one or more packets.
  4. After the file content, the client sends an empty packet.
  5. When the server finishes loading the data, it sends an OK packet to the client.
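The packet framing behind these steps can be sketched as below. Every MySQL packet starts with a 3-byte little-endian payload length and a 1-byte sequence id; the sketch builds the 0xFB request of step 2 and copies the payloads of steps 3 and 4 to an output stream until the empty packet arrives. It is a simplified illustration: sequence ids are not validated, and it assumes every data packet is shorter than the 16 MB (0xFFFFFF) maximum payload, so the multi-packet continuation rule never applies. `LocalInfilePackets` and its methods are hypothetical names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class LocalInfilePackets {
    // MySQL packet: 3-byte little-endian payload length, 1-byte sequence id, then the payload.
    public static byte[] packet(int seq, byte[] payload) {
        byte[] p = new byte[4 + payload.length];
        p[0] = (byte) payload.length;
        p[1] = (byte) (payload.length >>> 8);
        p[2] = (byte) (payload.length >>> 16);
        p[3] = (byte) seq;
        System.arraycopy(payload, 0, p, 4, payload.length);
        return p;
    }

    // Step 2: the LOCAL INFILE request is 0xFB followed by the file name.
    public static byte[] localInfileRequest(int seq, String fileName) {
        byte[] name = fileName.getBytes(StandardCharsets.UTF_8);
        byte[] payload = new byte[1 + name.length];
        payload[0] = (byte) 0xFB;
        System.arraycopy(name, 0, payload, 1, name.length);
        return packet(seq, payload);
    }

    // Steps 3-4: copy packet payloads to `out` until the empty packet; returns bytes copied.
    public static long copyFileContent(InputStream in, OutputStream out) throws IOException {
        byte[] header = new byte[4];
        long total = 0;
        while (true) {
            readFully(in, header, 4);
            int len = (header[0] & 0xFF) | ((header[1] & 0xFF) << 8) | ((header[2] & 0xFF) << 16);
            if (len == 0) {
                return total;  // empty packet marks the end of the file
            }
            byte[] payload = new byte[len];
            readFully(in, payload, len);
            out.write(payload);
            total += len;
        }
    }

    private static void readFully(InputStream in, byte[] buf, int len) throws IOException {
        int off = 0;
        while (off < len) {
            int n = in.read(buf, off, len - off);
            if (n < 0) {
                throw new EOFException("Connection closed in the middle of a packet");
            }
            off += n;
        }
    }
}
```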

Apart from the NetworkStream shown in the picture, all the steps are the same as in server mode.


To avoid using disk on the FE node, we use a producer-consumer blocking network stream: the FE receives bytes from the client and sends them on to the BE node as they arrive, without first writing the file to disk.
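One way to build such a producer-consumer blocking stream with only the JDK is a `PipedInputStream`/`PipedOutputStream` pair, sketched below as an assumption rather than the final implementation. A producer thread writes the payload of each client packet into the pipe and blocks whenever the fixed-size pipe buffer is full, while the consumer reads the other end. In the real flow the read end would be handed to `createInputStreamRequestBody` from the POC, so the FE holds at most one pipe buffer of file data at a time. `PipedTransfer` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.List;

class PipedTransfer {
    // Producer: a thread writes each chunk (e.g. one client packet payload) into the pipe,
    // blocking while the pipe buffer is full. Returns the read end for the consumer.
    public static InputStream startProducer(List<byte[]> chunks, int bufferSize) throws IOException {
        PipedInputStream readEnd = new PipedInputStream(bufferSize);
        PipedOutputStream writeEnd = new PipedOutputStream(readEnd);
        Thread producer = new Thread(() -> {
            try (OutputStream out = writeEnd) {  // closing signals end-of-stream to the reader
                for (byte[] chunk : chunks) {
                    out.write(chunk);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "mysql-load-producer");
        producer.start();
        return readEnd;
    }

    // Consumer: drains the pipe. In the real flow the HTTP client writing the
    // Stream Load request body to the BE would play this role instead.
    public static byte[] drain(InputStream in) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            sink.write(buf, 0, n);
        }
        return sink.toByteArray();
    }
}
```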

Authentication

Doris currently has two authentication mechanisms: MySQL clients authenticate through the MySQL protocol, and HTTP requests use "Basic auth".

So if the FE wants to send a Stream Load request to a BE, it still needs a user name and password to authenticate.

This is unacceptable because the user has already authenticated through the MySQL client. We call this the "Double Auth" issue.

To avoid this issue, I propose using the cluster token as a whitelist credential for Stream Load authentication on the BE.

The cluster token is unique, and only admin users who can log in to the FE machine can obtain it, so this approach is safe.
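A minimal sketch of the token check, assuming the FE puts the cluster token into a request header (the header name `token` is a hypothetical choice here) and the BE skips Basic auth only when the value matches its own copy of the token. `MessageDigest.isEqual` is used because it examines every byte, so the comparison time does not reveal how much of a guessed token is correct. `TokenAuth` and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;

class TokenAuth {
    // Hypothetical header that carries the cluster token from FE to BE.
    public static final String TOKEN_HEADER = "token";

    // FE side: attach the cluster token instead of the user's credentials.
    public static void attachToken(Map<String, String> headers, String clusterToken) {
        headers.put(TOKEN_HEADER, clusterToken);
    }

    // BE side: trust the request (no Basic auth needed) only if the token matches.
    public static boolean isTrustedFe(Map<String, String> headers, String clusterToken) {
        String presented = headers.get(TOKEN_HEADER);
        if (presented == null) {
            return false;  // no token: fall back to normal Basic auth
        }
        return MessageDigest.isEqual(presented.getBytes(StandardCharsets.UTF_8),
                                     clusterToken.getBytes(StandardCharsets.UTF_8));
    }
}
```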

IGNORE Lines

Doris currently supports skipping 1 or 2 header lines when reading CSV files, but only with the specific formats csv_with_names and csv_with_names_and_types.

This is not enough for the MySQL LOAD DATA syntax, which should support ignoring an arbitrary number of leading lines for CSV.
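A general IGNORE can be sketched as a stream wrapper that drops everything up to and including the N-th line delimiter before the data reaches the CSV reader. This sketch assumes the default `\n` line delimiter; honoring a custom LINES TERMINATED BY string would need multi-byte matching. `IgnoreLines.skipLines` is a hypothetical name.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

class IgnoreLines {
    // Consumes the first n lines ('\n'-terminated) and returns a stream positioned at the
    // first line to load. Always use the returned stream: the buffered wrapper may already
    // have read ahead from `raw`.
    public static InputStream skipLines(InputStream raw, int n) throws IOException {
        InputStream in = (raw instanceof BufferedInputStream) ? raw : new BufferedInputStream(raw);
        int skipped = 0;
        while (skipped < n) {
            int b = in.read();
            if (b == -1) {
                break;  // fewer than n lines: nothing left to load
            }
            if (b == '\n') {
                skipped++;
            }
        }
        return in;
    }
}
```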


Scheduling

This feature is expected to take about two months, in three steps:

  1. Implement the LOAD DATA syntax and support loading a server-local file into Doris.
  2. Avoid double authentication for Stream Load.
  3. Support IGNORE number {LINES | ROWS} in Stream Load.

Reference

  1. MySQL Syntax
  2. MySQL Network Protocol
  3. TiDB Network Protocol Implementation