1 Motivation

The PipelineX execution engine is an experimental feature introduced in Doris 2.1.0. It is expected to address four major issues of the Doris pipeline engine:
1. In terms of execution concurrency, Doris is currently constrained by two factors: the concurrency parameter set by the FE and the number of buckets. This concurrency strategy prevents the execution engine from fully utilizing machine resources.
2. In terms of execution logic, Doris currently has some fixed additional overhead. For example, expressions common to all instances are initialized once per instance, because the instances are independent of each other.
3. In terms of scheduling logic, the current pipeline scheduler puts all blocked tasks into a blocking queue, and a dedicated blocking-queue scheduler polls that queue and moves executable tasks into the runnable queue. As a result, one CPU core is constantly occupied by scheduling rather than execution while queries run.
4. In terms of profiling, the pipeline engine currently cannot provide users with concise and clear metrics.

2 Goals

1. In terms of execution concurrency, pipelineX introduces a local exchange optimization to fully utilize CPU resources and to distribute data evenly across tasks, minimizing data skew. In addition, pipelineX is no longer constrained by the number of tablets.
2. In terms of execution logic, all pipeline tasks of the same pipeline share that pipeline's shared state, eliminating repeated initialization of items such as expressions and some const variables.
3. In terms of scheduling logic, the blocking conditions of all pipeline tasks are encapsulated in Dependency objects. A task enters the runnable queue when an external event (such as RPC completion) satisfies its dependency, which eliminates the overhead of a thread that polls blocked tasks.
4. In terms of profiling, pipelineX provides users with simple, easy-to-understand metrics.

3 Core Design

3.1 Abstraction

3.1.1 Inheritance Relationship

Figure 1: Operator inheritance hierarchy

Figure 2: LocalState inheritance hierarchy

3.1.2 Composition

Figure 3: Composition of the core classes

3.2 Execution Design

3.2.1 Aggregate Query Execution


Figure 4: Multi-BE execution model (aggregate query)


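  1. Execution threads (thread 1, thread 2) each execute their own pipeline task, and a pipeline task holds only some runtime state (the local state). Global information is held by the single pipeline object that the tasks share (the global state).

  2. On a single BE, data distribution is performed by local shuffle nodes, which keep data balanced across the pipeline tasks. The execution model on a single BE is as follows: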

Figure 5: Single-BE execution model (aggregate query)
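The split between per-task local state and pipeline-wide global state can be sketched as follows. This is a minimal illustration under stated assumptions, not the Doris implementation. `GlobalState`, `LocalState`, `Pipeline`, `PipelineTask` and `run_pipeline` are hypothetical names, and the `multiplier` field stands in for a prepared expression or const variable. The tasks run sequentially here for brevity, whereas the engine runs them on different execution threads.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical global state: built once per pipeline, read-only afterwards,
// and shared by every task of that pipeline.
struct GlobalState {
    std::int64_t multiplier = 1;  // stands in for a prepared expression / const variable
};

// Hypothetical local state: private runtime state owned by exactly one task.
struct LocalState {
    std::int64_t rows_processed = 0;
    std::int64_t partial_sum = 0;
};

// One pipeline object holds the global state for all of its tasks.
struct Pipeline {
    std::shared_ptr<const GlobalState> global;
};

// A task keeps only its local state plus a non-owning pointer to the pipeline.
struct PipelineTask {
    const Pipeline* pipeline = nullptr;  // the pipeline outlives its tasks
    LocalState local;

    void execute(const std::vector<std::int64_t>& rows) {
        assert(pipeline != nullptr);
        for (std::int64_t v : rows) {
            local.partial_sum += v * pipeline->global->multiplier;  // read shared state
            ++local.rows_processed;                                 // write local state
        }
    }
};

// Builds one pipeline, creates one task per input partition, runs the tasks
// and merges their partial results.
std::int64_t run_pipeline(const std::vector<std::vector<std::int64_t>>& partitions,
                          std::int64_t multiplier) {
    Pipeline pipeline{std::make_shared<GlobalState>(GlobalState{multiplier})};
    std::vector<PipelineTask> tasks(partitions.size());
    for (std::size_t i = 0; i < tasks.size(); ++i) {
        tasks[i].pipeline = &pipeline;
        tasks[i].execute(partitions[i]);
    }
    std::int64_t total = 0;
    for (const PipelineTask& task : tasks) total += task.local.partial_sum;
    return total;
}
```

Because the global state is immutable after construction, tasks can read it concurrently without locking. Only the small `LocalState` is duplicated per task.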

3.2.2 Join Query Execution

Figure 6: Single-BE execution model (join query)

3.2.3 Goals

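Local shuffle is introduced mainly to improve concurrency on a single node:

  1. It reduces data skew in some cases (see Further Reading 1 for the detailed design).

  2. Execution concurrency is no longer constrained by the number of tablets in the storage layer (see Further Reading 4 for the detailed design).

  3. Concurrency can be adjusted dynamically at runtime.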
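A local shuffle can be pictured as a redistribution step between the scan and the downstream tasks. Because of that step, the number of downstream tasks is chosen independently of how many tablets feed the scan. The sketch below is a hypothetical simplification, not Doris's LocalExchanger. Rows are plain integers, `local_round_robin_shuffle` is an illustrative name, and round-robin assignment is used as the balancing rule.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Rows = std::vector<int>;

// Hypothetical round-robin local shuffle: redistributes the rows produced by
// `sources` (e.g. one source per tablet) across `num_tasks` downstream tasks.
// The task count does not depend on how many sources there are.
std::vector<Rows> local_round_robin_shuffle(const std::vector<Rows>& sources,
                                            std::size_t num_tasks) {
    assert(num_tasks > 0);
    std::vector<Rows> tasks(num_tasks);
    std::size_t next = 0;  // task that receives the next row
    for (const Rows& source : sources) {
        for (int row : source) {
            tasks[next].push_back(row);
            next = (next + 1) % num_tasks;
        }
    }
    return tasks;
}
```

Consider two tablets holding 7 rows and 1 row, redistributed to four tasks. Each task receives exactly 2 rows. Without the shuffle, concurrency would be capped at two tasks processing 7 and 1 rows. Some downstream operators, such as a hash aggregation, need all rows with the same key in one task. In that case a hash-based partitioner would replace the round-robin counter.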

3.3 Execution Process


Figure 7: Comparison of the pipeline and pipelineX execution flows

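In the prepare phase, pipeline launches multiple threads concurrently to initialize the state of different instances. PipelineX instead reuses shared state: it splits step 3 of the pipeline execution flow into steps 3 and 5 of the pipelineX execution flow. The heavier global state is therefore initialized only once, and the lighter local state is initialized serially.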
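The difference in prepare cost can be illustrated by counting how often the heavy global state is built. In this hypothetical sketch, `prepare_per_instance` mimics the pipeline flow, in which every independent instance builds its own global state. `prepare_shared_global` mimics the pipelineX flow, in which the global state is built once and the local states are then initialized serially. The counters are illustrative, not Doris metrics, and the threads that the pipeline flow uses are omitted.

```cpp
#include <cassert>
#include <vector>

// Counts how many times each kind of state is initialized.
struct InitCounters {
    int global_inits = 0;  // heavy: expressions, const variables, ...
    int local_inits = 0;   // light: per-task runtime state
};

struct GlobalState { int num_prepared_exprs = 0; };
struct LocalState { const GlobalState* global = nullptr; };

// Hypothetical stand-in for the expensive preparation of shared state.
GlobalState init_global_state(InitCounters& counters) {
    ++counters.global_inits;
    return GlobalState{3};
}

LocalState init_local_state(const GlobalState& global, InitCounters& counters) {
    ++counters.local_inits;
    return LocalState{&global};
}

// Pipeline-style prepare: instances are independent, so each instance
// rebuilds its own copy of the global state alongside its local state.
InitCounters prepare_per_instance(int num_instances) {
    assert(num_instances > 0);
    InitCounters counters;
    std::vector<GlobalState> globals;
    globals.reserve(num_instances);  // keep pointers into `globals` valid
    std::vector<LocalState> locals;
    for (int i = 0; i < num_instances; ++i) {
        globals.push_back(init_global_state(counters));
        locals.push_back(init_local_state(globals.back(), counters));
    }
    return counters;
}

// PipelineX-style prepare: the global state is built once for the pipeline,
// then each task's lightweight local state is initialized serially.
InitCounters prepare_shared_global(int num_tasks) {
    assert(num_tasks > 0);
    InitCounters counters;
    const GlobalState global = init_global_state(counters);
    std::vector<LocalState> locals;
    for (int i = 0; i < num_tasks; ++i) {
        locals.push_back(init_local_state(global, counters));
    }
    return counters;
}
```

With 8 tasks, the per-instance flow performs 8 global initializations and the shared flow performs 1. Both perform 8 local initializations.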

3.3.1 Goals


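  1. Remove the overhead of pipeline launching multiple threads to initialize multiple instances concurrently.

  2. Remove the overhead of re-initializing global const variables in every instance.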

3.4 PipelineX Scheduler

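In pipeline scheduling, ready tasks wait in a ready queue to be scheduled, and blocked tasks wait in a blocking queue until their execution conditions are met. The current Doris pipeline additionally needs a CPU core to poll the blocking queue and move tasks whose conditions are satisfied into the ready queue. In pipelineX, all blocking conditions are encapsulated by dependencies, and a task's transition between blocked and ready is driven entirely by event notifications (see Further Reading 2 for the detailed design). For example, the arrival of RPC data satisfies the execution condition of the ExchangeSourceOperator, which then enters the ready queue.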

Figure 8: Comparison of the pipeline and pipelineX scheduling models
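The event-driven idea can be sketched in a single thread. A task that cannot run parks itself on the dependency it is waiting for. Whoever satisfies the condition, for example the callback that delivers exchange data, calls `set_ready()`, which moves the parked tasks straight into the runnable queue, so nothing polls blocked tasks. `Task`, `Scheduler`, `Dependency` and their methods are hypothetical names for this illustration, not the Doris classes from Further Reading 2.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// A schedulable unit of work; only a name is needed for this sketch.
struct Task {
    std::string name;
};

// Runnable queue that execution threads would pop from.
struct Scheduler {
    std::deque<Task*> runnable;
    void submit(Task* task) { runnable.push_back(task); }
};

// Hypothetical dependency: one blocking condition plus the tasks parked on it.
class Dependency {
public:
    explicit Dependency(Scheduler& scheduler) : scheduler_(scheduler) {}

    // Returns true if the task may run now; otherwise parks the task here
    // (no global blocking queue and no polling thread).
    bool try_acquire(Task* task) {
        if (ready_) return true;
        waiters_.push_back(task);
        return false;
    }

    // Called by the event source, e.g. an RPC completion callback.
    // Pushes every parked task straight into the runnable queue.
    void set_ready() {
        ready_ = true;
        for (Task* task : waiters_) scheduler_.submit(task);
        waiters_.clear();
    }

private:
    Scheduler& scheduler_;
    bool ready_ = false;
    std::vector<Task*> waiters_;
};

// Simulates an exchange source task waiting for remote data.
// Returns {runnable tasks before the event, runnable tasks after the event}.
std::pair<std::size_t, std::size_t> simulate_exchange_data_arrival() {
    Scheduler scheduler;
    Dependency data_dependency(scheduler);
    Task exchange_source{"EXCHANGE_OPERATOR"};

    const bool can_run = data_dependency.try_acquire(&exchange_source);
    assert(!can_run);  // no data yet: the task is parked on the dependency
    (void)can_run;

    const std::size_t before = scheduler.runnable.size();
    data_dependency.set_ready();  // event notification: RPC data has arrived
    const std::size_t after = scheduler.runnable.size();
    return {before, after};
}
```

In this design, the cost of waking a task is paid by the code that produces the event. That is why no core has to be reserved for polling.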

3.4.1 Goals


3.5 PipelineX Profile

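For operator profiles, pipelineX cleans up the metrics: unreasonable metrics are removed and necessary ones are added. In addition, the reworked scheduling model encapsulates every blocking point in pipelineX in a dependency, so the ready time of every dependency is added to the profile. The wait-for-dependency times show directly where time is spent.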
Scan operator:

OLAP_SCAN_OPERATOR (id=504):(Active: 17.606ms, % non-child: 0.00%)
    - WaitForDependencyTime: 0ns
        - WaitForData: 14.311ms
        - WaitForEos: 0ns
        - WaitForScannerDone: 0ns
    - WaitForPendingFinishDependency: 16.748ms

Exchange source operator:

EXCHANGE_OPERATOR (id=517):(Active: 575.846us, % non-child: 0.00%)
    - WaitForDependencyTime:
        - WaitForData: 56.122ms
    - WaitForPendingFinishDependency: 0ns
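Because every blocking point is a dependency, a per-dependency wait counter such as WaitForData only needs to record two things: when a task starts waiting on the dependency and when the dependency becomes ready. The sketch below uses explicit nanosecond timestamps instead of a real clock so the arithmetic is easy to follow. `DependencyTimer` and its methods are hypothetical, and the sample waits are chosen only so that their sum matches the WaitForData value in the scan profile. How Doris itself collects and nests these counters may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical per-dependency wait-time accounting, in nanoseconds.
class DependencyTimer {
public:
    // A task starts waiting on `dependency` at time `now_ns`.
    void block(const std::string& dependency, std::int64_t now_ns) {
        wait_start_ns_[dependency] = now_ns;
    }

    // `dependency` becomes ready at `now_ns`: add the elapsed wait to its total.
    void ready(const std::string& dependency, std::int64_t now_ns) {
        auto it = wait_start_ns_.find(dependency);
        if (it == wait_start_ns_.end()) return;  // the task was not waiting on it
        assert(now_ns >= it->second);
        total_wait_ns_[dependency] += now_ns - it->second;
        wait_start_ns_.erase(it);
    }

    // Total time spent waiting on `dependency`; 0 if it never blocked.
    std::int64_t wait_ns(const std::string& dependency) const {
        auto it = total_wait_ns_.find(dependency);
        return it == total_wait_ns_.end() ? 0 : it->second;
    }

private:
    std::map<std::string, std::int64_t> wait_start_ns_;
    std::map<std::string, std::int64_t> total_wait_ns_;
};

// Two separate waits for data: 10 ms, then 4.311 ms (14.311 ms in total).
DependencyTimer example_scan_waits() {
    DependencyTimer timer;
    timer.block("WaitForData", 0);
    timer.ready("WaitForData", 10'000'000);
    timer.block("WaitForData", 20'000'000);
    timer.ready("WaitForData", 24'311'000);
    return timer;
}
```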


4 User Interface

Three new SessionVariable knobs are added.


Whether to ignore the storage-layer data distribution. If turned on, execution concurrency is no longer restricted by the number of tablets. Please refer to Further Reading 4 for more details.

A new HTTP API: http://{host}:{web_server_port}/api/running_pipeline_tasks
Please refer to Further Reading 3 for more details.

Further Reading

  1. LocalExchanger Rules
  2. PipelineX Dependency Design
  3. PipelineX Debug Manual
  4. PipelineX Parallel Execution Design