...
Motivation
Doris currently supports only Stream Load for loading client-side data files into a Doris cluster.
However, some users still prefer the MySQL LOAD DATA SQL syntax for loading small amounts of data from the client.
Related Research
Several other MySQL-compatible database engines also support this feature.
Detailed Design
1. Syntax definition
We will follow the MySQL syntax, except for a few clauses that are not supported.
```sql
LOAD DATA
    [LOCAL]
    INFILE 'file_name'
    [REPLACE | IGNORE]
    INTO TABLE tbl_name
    [PARTITION (partition_name [, partition_name] ...)]
    [{FIELDS | COLUMNS}
        [TERMINATED BY 'string']
    ]
    [LINES
        [TERMINATED BY 'string']
    ]
    [IGNORE number {LINES | ROWS}]
    [(col_name_or_user_var
        [, col_name_or_user_var] ...)]
    [SET col_name={expr | DEFAULT}
        [, col_name={expr | DEFAULT}] ...]
```
This syntax is very similar to BrokerLoad, but BrokerLoad has a label definition, which MySQL LOAD DATA does not.
In addition, BrokerLoad is an asynchronous statement, whereas MySQL LOAD DATA is a synchronous statement.
2. Implementation
In Doris, StreamLoad already has the capability to transfer local files from the client or the FE node to the BE storage engine.
So we will not implement a new load execution path for MySQL LOAD DATA. Instead, we add an adapter at the syntax layer that converts a MySQL LOAD DATA statement into a StreamLoad request.
The keyword LOCAL means the file is on the client node; otherwise the file is on the server node.
In Doris, the server node is the FE node.
We do not distinguish between the master FE and follower FE nodes. The user must make sure the server local file is on the FE node that receives the statement.
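The adapter idea described above can be illustrated with a minimal sketch that maps the parsed LOAD DATA clauses onto Stream Load HTTP headers. The class `LoadDataDescSketch` and its fields are hypothetical names introduced only for this illustration. The header names `column_separator`, `line_delimiter`, `partitions`, and `columns` are the ones used by Stream Load; how `SET`, `IGNORE ... LINES`, and `REPLACE | IGNORE` are mapped is left out here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder for the clauses parsed from a LOAD DATA statement. */
class LoadDataDescSketch {
    List<String> partitions = Collections.emptyList(); // PARTITION (...)
    String fieldTerminator;                            // FIELDS TERMINATED BY
    String lineTerminator;                             // LINES TERMINATED BY
    List<String> columns = Collections.emptyList();    // (col_name, ...)

    /** Translates the parsed clauses into Stream Load request headers. */
    Map<String, String> toStreamLoadHeaders() {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Expect", "100-continue");
        if (fieldTerminator != null) {
            headers.put("column_separator", fieldTerminator);
        }
        if (lineTerminator != null) {
            headers.put("line_delimiter", lineTerminator);
        }
        if (!partitions.isEmpty()) {
            headers.put("partitions", String.join(",", partitions));
        }
        if (!columns.isEmpty()) {
            headers.put("columns", String.join(",", columns));
        }
        return headers;
    }

    public static void main(String[] args) {
        LoadDataDescSketch desc = new LoadDataDescSketch();
        desc.fieldTerminator = ",";
        desc.columns = Arrays.asList("k1", "v1");
        // Print the headers the adapter would attach to the StreamLoad request.
        desc.toStreamLoadHeaders().forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Clauses that are absent from the statement produce no header, so the Stream Load defaults apply in that case.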
Server local file
In this mode, the local file is already on the FE node.
- The client executes the LOAD DATA statement.
- The FE node checks the local file and chooses a BE node to send the StreamLoad request to.
- The FE node reads the local file as a file stream and wraps it into the HTTP request body.
- The BE node receives the StreamLoad request and loads the stream data into the table.
- After the StreamLoad finishes, respond to the client with Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
The POC code for turning a file stream into an HTTP request:
```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import okio.Okio;
import okio.Source;

public class StreamLoadPoc {
    public static RequestBody createInputStreamRequestBody(final InputStream stream) {
        return new RequestBody() {
            @Override
            public MediaType contentType() {
                // Set the body MIME type; a binary stream is used here as an example.
                return MediaType.get("application/octet-stream");
            }

            @Override
            public long contentLength() throws IOException {
                // Returning -1 means the body length is unknown, which enables HTTP chunked transfer.
                // RequestBody also returns -1 by default.
                return -1;
            }

            @Override
            public void writeTo(BufferedSink sink) throws IOException {
                // Stream the file into the request body; the source is closed afterwards.
                try (Source source = Okio.source(stream)) {
                    sink.writeAll(source);
                }
            }
        };
    }

    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient().newBuilder().build();
        RequestBody body = createInputStreamRequestBody(
                Files.newInputStream(Paths.get("/Users/bytedance/Downloads/def")));
        Request request = new Request.Builder()
                .url("http://doris:8040/api/hzw/t2/_stream_load")
                .method("PUT", body)
                .addHeader("Expect", "100-continue")
                .addHeader("Authorization", "Basic cm9vdDo=")
                .addHeader("Content-Type", "text/plain")
                .build();
        // Close the response when done so the connection is released.
        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.body().string());
        }
    }
}
```
Client local file
Client mode differs from server mode because the file is on the client node: it must first be transferred to the server as a network stream and then wrapped into an HTTP request to the BE node.
The MySQL network protocol for LOAD DATA works as follows:
- The client sends COM_QUERY to execute the LOAD DATA statement.
- The server responds with 0xFB + filename.
- The client sends all the bytes of the file, packet by packet.
- At the end of the file body transfer, the client sends an empty packet.
- When the server finishes loading the data, it sends an OK packet to the client.
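The packet exchange listed above can be sketched on the server side as follows. This is a minimal illustration, not the actual FE code: the class `LocalInfileProtocolSketch` and its method names are hypothetical. It relies only on the standard MySQL packet framing (a 3-byte little-endian payload length followed by a 1-byte sequence id) and on the empty packet that terminates the file content.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the server side of the LOCAL INFILE exchange. */
class LocalInfileProtocolSketch {
    /** Frames a payload as a MySQL packet: 3-byte little-endian length, 1-byte sequence id, payload. */
    static byte[] frame(int sequenceId, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(payload.length + 4);
        out.write(payload.length & 0xFF);
        out.write((payload.length >> 8) & 0xFF);
        out.write((payload.length >> 16) & 0xFF);
        out.write(sequenceId & 0xFF);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /** Builds the payload that asks the client for a file: 0xFB followed by the file name. */
    static byte[] localInfileRequest(String fileName) {
        byte[] name = fileName.getBytes(StandardCharsets.UTF_8);
        byte[] payload = new byte[name.length + 1];
        payload[0] = (byte) 0xFB;
        System.arraycopy(name, 0, payload, 1, name.length);
        return payload;
    }

    /** Copies file-content packets to the sink until the empty packet arrives; returns the bytes copied. */
    static long copyFileContent(InputStream client, OutputStream sink) throws IOException {
        DataInputStream in = new DataInputStream(client);
        byte[] header = new byte[4];
        long total = 0;
        while (true) {
            in.readFully(header);
            int length = (header[0] & 0xFF) | ((header[1] & 0xFF) << 8) | ((header[2] & 0xFF) << 16);
            // header[3] is the sequence id; this sketch does not validate it.
            if (length == 0) {
                return total; // the empty packet marks the end of the file
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            sink.write(payload);
            total += length;
        }
    }
}
```

In the real flow the sink would be the stream that feeds the StreamLoad request body rather than an in-memory buffer.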
Apart from the NetworkStream shown in the picture, all the steps are the same as in server mode.
To reduce disk usage on the FE node, we use a producer-consumer blocking network stream that receives bytes from the client and sends them on to the BE node.
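One possible shape for such a producer-consumer stream is a bounded queue of byte chunks exposed as an `InputStream`. The sketch below is an assumption about how this could look, not the design's actual class: `BlockingChunkStream`, `write`, and `finish` are hypothetical names. The thread reading MySQL packets acts as the producer, and the thread sending the StreamLoad request body acts as the consumer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded stream: a producer thread puts chunks, a consumer thread reads them. */
class BlockingChunkStream extends InputStream {
    private static final byte[] EOF = new byte[0]; // sentinel marking the end of the file
    private final BlockingQueue<byte[]> queue;
    private byte[] current;
    private int pos;
    private boolean finished;

    BlockingChunkStream(int maxChunks) {
        this.queue = new ArrayBlockingQueue<>(maxChunks);
    }

    /** Producer side: blocks while the queue is full, which bounds the memory held on the FE. */
    void write(byte[] chunk) throws InterruptedException {
        if (chunk.length > 0) {
            queue.put(chunk.clone());
        }
    }

    /** Producer side: signals that no more data will arrive. */
    void finish() throws InterruptedException {
        queue.put(EOF);
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) == -1 ? -1 : (one[0] & 0xFF);
    }

    /** Consumer side: blocks until a chunk is available or the end of the file is reached. */
    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (finished) {
            return -1;
        }
        if (current == null || pos == current.length) {
            try {
                current = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting for data", e);
            }
            pos = 0;
            if (current == EOF) {
                finished = true;
                return -1;
            }
        }
        int n = Math.min(len, current.length - pos);
        System.arraycopy(current, pos, buf, off, n);
        pos += n;
        return n;
    }
}
```

Because the queue is bounded, a slow BE applies back pressure to the client connection instead of forcing the FE to buffer the whole file in memory or on disk.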
Scheduling
This feature is expected to take about 2 months to finish:
- Implement the LOAD DATA syntax and make it load a server local file into Doris. (3 weeks)
- Support loading a client local file into Doris. (2 weeks)
- Support IGNORE number {LINES | ROWS} in stream load. (2 weeks)
- Support [REPLACE | IGNORE] in stream load. (2 weeks)