Status

Current state: Released

Discussion thread: 

JIRA or Github Issue: 

Released: 1.2

Google Doc: <If the design in question is unclear or needs to be discussed and reviewed, a Google Doc can be used first to facilitate comments from others.>

Motivation

Currently, there are various ScanNodes in Doris, corresponding to different data sources, such as OlapScanNode, EsScanNode, OdbcScanNode, and the recently added FileScanNode.

A data scanning task usually consists of One ScanNode + Several Scanners.

Scanner is responsible for the actual data source access and scanning, and multiple Scanners can be executed concurrently to improve scanning performance.

ScanNode is responsible for the creation and scheduling of the Scanner.

We can see that the difference between scanning tasks for different data sources lies mainly in how the Scanner accesses the data source, while the upper-level ScanNode has essentially the same Scanner generation and scheduling logic.


Here I list the general logic of these scanning tasks:

1. Scanner generation and scheduling

ScanNode generates several Scanners according to the data shards and the parallelism.

Multiple Scanners are executed in parallel; the scan results are stored in a blocks queue, and the ScanNode, as the consumer, takes data from the queue and returns it to the upper node.

This is a typical multi-producer-single-consumer model.

2. Runtime Filter

Runtime Filters are planned by the FE and applied by the ScanNode.

3. Predicate pushdown

The FE pushes some of the predicates down to the ScanNode, and each data source can then push some of those predicates further down to the data source itself for processing, according to its own capabilities.


Therefore, the purpose of this refactor is to:

Unify the general logic of scanning tasks, so that when we add a new data source we no longer need to write the same processing logic repeatedly, and all new data sources can benefit from the existing logic.

Related Research

None

Detailed Design

The new Scan framework mainly has the following classes:

1. VScanNode

The base class of all ScanNodes, mainly responsible for the following functions:

  1. As a standard ExecNode, implement standard interfaces such as init()/prepare()/open()/get_next()/close().
  2. Register, receive, and process RuntimeFilters.

In addition, VScanNode has the following interfaces, which the subclass for each data source can override as needed to implement the corresponding functions:

  • _init_profile()

Each ScanNode may need to add its own specific profiles and metrics.

  • _process_conjuncts()

Each ScanNode may have special handling of some predicate conditions.

  • _init_scanners()

Each ScanNode implements the generation logic of its respective Scanners.

  • _should_push_down_binary_predicate()/_should_push_down_in_predicate()/_should_push_down_bloom_filter()/_should_push_down_is_null_predicate()

At present, we support 4 types of predicate pushdown, and each data source can decide whether to handle the corresponding type of predicate according to its own capabilities. There are three possible return values (see the sketch after this list):

    • ACCEPTABLE: Fully accepted. The predicate will be removed from the ScanNode and passed to the Scanner for processing.
    • UNACCEPTABLE: Unacceptable. The predicate remains in the ScanNode and is not pushed down to the Scanner.
    • PARTIAL_ACCEPTABLE: Partially acceptable. The predicate is kept in the ScanNode but is also pushed down to the Scanner, since the Scanner cannot fully filter based on it.
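
To make these override points concrete, the following is a minimal C++ sketch of the hooks listed above. It is illustrative only: the Status and VExpr stand-ins, the exact signatures, and the NewOlapScanNode example are simplified assumptions rather than the actual declarations under be/src/vec/exec/scan/.

```cpp
// Simplified sketch -- not the actual Doris class declarations.
#include <memory>
#include <vector>

struct Status { static Status OK() { return {}; } };   // stand-in for doris::Status
struct VExpr {};                                        // stand-in for a predicate expression
class VScanner;                                         // produced by _init_scanners()

// The three pushdown decisions described above.
enum class PushDownType { ACCEPTABLE, UNACCEPTABLE, PARTIAL_ACCEPTABLE };

class VScanNode {
public:
    virtual ~VScanNode() = default;

protected:
    // Hooks that the subclass for each data source may override.
    virtual Status _init_profile() { return Status::OK(); }
    virtual Status _process_conjuncts() { return Status::OK(); }
    virtual Status _init_scanners(std::vector<std::shared_ptr<VScanner>>* scanners) = 0;

    virtual PushDownType _should_push_down_binary_predicate(const VExpr&) {
        return PushDownType::UNACCEPTABLE;   // conservative default: keep the predicate in the ScanNode
    }
    virtual PushDownType _should_push_down_in_predicate(const VExpr&) {
        return PushDownType::UNACCEPTABLE;
    }
    virtual PushDownType _should_push_down_bloom_filter() {
        return PushDownType::UNACCEPTABLE;
    }
    virtual PushDownType _should_push_down_is_null_predicate() {
        return PushDownType::UNACCEPTABLE;
    }
};

// A hypothetical OLAP subclass showing how one data source opts in.
class NewOlapScanNode : public VScanNode {
protected:
    Status _init_scanners(std::vector<std::shared_ptr<VScanner>>* scanners) override {
        // Create one scanner per data shard, bounded by the scan parallelism.
        (void)scanners;
        return Status::OK();
    }
    PushDownType _should_push_down_binary_predicate(const VExpr&) override {
        // The storage layer can fully evaluate binary predicates, so the predicate
        // is removed from the ScanNode and handed to the Scanner.
        return PushDownType::ACCEPTABLE;
    }
    PushDownType _should_push_down_bloom_filter() override {
        // A bloom filter can prune data but cannot filter exactly, so the predicate
        // is kept in the ScanNode and also pushed down to the Scanner.
        return PushDownType::PARTIAL_ACCEPTABLE;
    }
};
```

In this sketch, UNACCEPTABLE is the default so that a predicate stays in the ScanNode unless a data source explicitly opts in to handling it.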

2. VScanner

The base class of all Scanners, responsible for fetching data from the data source. Its main interface is get_block().

In this refactoring, we will unify the Scanner logic used by load jobs with the Scanner logic used by queries, so the logic of get_block() is divided into the following 4 steps:

1. _get_block_impl

The actual read from the data source. Usually, the VScanner subclass for each data source only needs to implement this method.

2. _filter_input_block

In a load job, it may be necessary to pre-filter the raw data through the preceding filters.

3. _convert_to_output_block

In a load job, it is necessary to convert the raw data into the column types of the target table.

4. _filter_output_block

The final result is filtered according to the predicate conditions.


It can be seen that a load job needs to perform all 4 steps above, while a query only needs step 1 and step 4 (see the sketch below).
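
A minimal sketch of how get_block() could compose these four steps is shown below. The Status and Block stand-ins and the _is_load flag are assumptions for illustration; the actual signatures in Doris may differ.

```cpp
// Simplified sketch of the four-step get_block() flow -- illustrative only.
struct Status { static Status OK() { return {}; } bool ok() const { return true; } };
struct Block {};   // stand-in for vectorized::Block

class VScanner {
public:
    virtual ~VScanner() = default;

    // Entry point called by the scan framework for every block.
    Status get_block(Block* block, bool* eof) {
        Block input;
        // 1. Read raw data from the data source (the only step most subclasses implement).
        Status st = _get_block_impl(&input, eof);
        if (!st.ok()) return st;

        if (_is_load) {
            // 2. Load only: pre-filter the raw rows with the preceding filters.
            _filter_input_block(&input);
            // 3. Load only: convert the raw columns to the target table's column types.
            _convert_to_output_block(input, block);
        } else {
            *block = input;
        }
        // 4. Filter the output block with the predicates kept in the Scanner.
        _filter_output_block(block);
        return Status::OK();
    }

protected:
    // Data-source-specific read; each data source's VScanner subclass overrides this.
    virtual Status _get_block_impl(Block* block, bool* eof) = 0;

    virtual void _filter_input_block(Block*) {}
    virtual void _convert_to_output_block(const Block& in, Block* out) { *out = in; }
    virtual void _filter_output_block(Block*) {}

    bool _is_load = false;   // true for load jobs, false for queries
};
```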

3. ScannerContext

This is the scheduling context for all Scanners generated by a ScanNode.

After a ScanNode generates all Scanners, it generates a ScannerContext that contains all the information needed to schedule these Scanners.

After that, the ScannerContext is submitted to the ScannerScheduler for scheduling.

ScannerContext also contains a blocks_queue, the producer-consumer queue between the ScanNode and its Scanners. Blocks fetched by the Scanners are added to blocks_queue, and the get_next() method of the ScanNode takes blocks from it.
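
A minimal sketch of the blocks_queue handoff is shown below; only the producer-consumer shape is illustrated, and the append_block()/get_block_from_queue() names are hypothetical.

```cpp
// Minimal sketch of the blocks_queue inside ScannerContext -- illustrative only.
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>

struct Block {};   // stand-in for vectorized::Block

class ScannerContext {
public:
    // Producer side: called by a scanner thread after it has filled a block.
    void append_block(std::unique_ptr<Block> block) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _blocks_queue.push_back(std::move(block));
        }
        _cv.notify_one();
    }

    // Consumer side: called from VScanNode::get_next(); waits until a block
    // is available or all scanners have finished.
    std::unique_ptr<Block> get_block_from_queue() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return !_blocks_queue.empty() || _all_scanners_done; });
        if (_blocks_queue.empty()) return nullptr;   // end of stream
        auto block = std::move(_blocks_queue.front());
        _blocks_queue.pop_front();
        return block;
    }

    // Called once every scanner belonging to this context has finished.
    void set_all_scanners_done() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _all_scanners_done = true;
        }
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<std::unique_ptr<Block>> _blocks_queue;
    bool _all_scanners_done = false;
};
```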

4. ScannerScheduler

Scanner's scheduler. This is a global class responsible for scheduling all Scanners of a BE node.

Through this global scheduler, we can plan the execution of all Scanners of the entire BE node.
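
A minimal sketch of such a BE-global scheduler is shown below: a fixed pool of worker threads that executes submitted scan tasks. The submit() signature and the use of std::function are assumptions for illustration; the real scheduler works on ScannerContexts and applies its own scheduling policy.

```cpp
// Minimal sketch of a BE-global scanner scheduler -- illustrative only.
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ScannerScheduler {
public:
    explicit ScannerScheduler(std::size_t num_threads) {
        for (std::size_t i = 0; i < num_threads; ++i) {
            _workers.emplace_back([this] { _work_loop(); });
        }
    }

    ~ScannerScheduler() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopped = true;
        }
        _cv.notify_all();
        for (auto& t : _workers) t.join();
    }

    // Submit one scan task. In the real framework a whole ScannerContext is
    // submitted and the scheduler decides which of its scanners to run next.
    void submit(std::function<void()> scan_task) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _tasks.push(std::move(scan_task));
        }
        _cv.notify_one();
    }

private:
    void _work_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this] { return _stopped || !_tasks.empty(); });
                if (_stopped && _tasks.empty()) return;
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            // Each task typically calls VScanner::get_block() and appends the
            // result to the owning ScannerContext's blocks_queue.
            task();
        }
    }

    std::vector<std::thread> _workers;
    std::queue<std::function<void()>> _tasks;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _stopped = false;
};
```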


The following diagram shows the relationship between these classes:


Scheduling

The goal is to migrate all ScanNodes to the new scan framework; the schedule is as follows:

  • OlapScanNode: Completed
  • FileScanNode: (end of September)
    FileScanNode will support both the previous FileScanNode's query capabilities for external tables (Hive, Iceberg, Hudi, etc.) and the BrokerScanNode's load capabilities. After that, the BrokerScanNode will be removed.
  • EsScanNode (October)
  • JdbcScanNode (October)
  • SchemaScanNode (October)


Most of the code is in the be/src/vec/exec/scan/ directory.

New Parquet Reader

This is a separate piece of work, but it is related to the refactoring of the scan node.

Doris has implemented the multi-catalog feature and supports reading external data sources such as Hive, Iceberg, and Hudi. These data sources are read through FileScanNode.

The current FileScanNode, when reading the Parquet format, uses the parquet-cpp implementation from Apache Arrow.

But this approach has the following problems:

1. Data must first be converted to the Arrow format and then to the Doris format, which adds an extra memory copy and type conversion.
2. parquet-cpp does not support new Parquet features such as Page Index and BloomFilter.
3. In-depth optimizations are not possible, such as predicate filtering on dictionary values for dictionary-encoded columns.

For the above reasons, we reimplemented a Parquet Reader to replace parquet-cpp.

The new Parquet Reader can support richer predicate pushdown, filter more pages in one pass, and reduce IO.
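
As a conceptual illustration (not the actual reader API), the sketch below shows how the Page Index and dictionary pages can be used to skip whole pages or column chunks before any decoding happens; all types and function names here are hypothetical.

```cpp
// Conceptual sketch of page-level predicate filtering -- not the actual reader API.
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct PageStats {
    int64_t min_value;   // per-page min from the Page Index
    int64_t max_value;   // per-page max from the Page Index
};

struct DictionaryPage {
    std::vector<std::string> values;   // distinct values of a dictionary-encoded column chunk
};

// With the Page Index, a page whose [min, max] range cannot satisfy an
// equality predicate is skipped without being read or decoded.
bool page_may_match(const PageStats& stats, int64_t pred_value) {
    return pred_value >= stats.min_value && pred_value <= stats.max_value;
}

// For dictionary-encoded columns, an equality predicate can be evaluated once
// against the dictionary; if the value is absent, the whole column chunk is skipped.
bool dictionary_may_match(const DictionaryPage& dict, const std::string& pred_value) {
    return std::find(dict.values.begin(), dict.values.end(), pred_value) != dict.values.end();
}
```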

This work is expected to be completed by the end of September.









