Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: 

JIRA or Github Issue: 

Released: <Doris Version>

Design Doc in PDF: For the convenience of Chinese-speaking developers, a Chinese version of this design doc is also uploaded as a PDF.


1. Motivation

Doris's current execution engine uses the Volcano (pull) model, which has the following problems on a single multi-core machine:

  • In most scenarios, performance tuning requires setting the parallelism manually, which is very hard to get right in production environments.
  • On a single node, each instance of a query occupies one thread of the thread pool, which brings two additional problems:

    • Once the thread pool is full, the query engine enters a pseudo-deadlock and cannot respond to subsequent queries. There is also some probability of a real logical deadlock: for example, all threads are executing instances' probe tasks, which wait for build sides that cannot get a thread.

    • Blocking operators rely on the operating system's thread scheduling, and the thread-switching overhead is high (especially when Doris is co-deployed with other systems).

  • CPU resources are hard to manage: it is difficult to achieve finer-grained resource management and reasonable resource scheduling across concurrent queries.

    After a large query generates many instances, the thread pool fills up and small queries hardly get a chance to be scheduled, which increases their latency under mixed load.

2. How To Do

2.1 Core issues that need to be addressed

Asynchronization of blocking operations

Blocking operators pose two problems:

  • Thread switching: introduces additional context-switching overhead


  • Thread occupation: blocked threads occupy the limited thread resources of a single process

Blocking operations
  • Physical blocking: Includes disk I/O, network I/O, locks, and other operations that cause thread blocking


  • Logical blocking: Sort, HashJoin, HashAgg and other operators that require full materialization


CPU resource management

In mixed-load scenarios there is CPU contention: large CPU-intensive queries occupy CPU resources for a long time, and small queries can hardly get scheduled.

2.2 Pipeline's execution model

What is the Pipeline execution model

[Figure: Pipeline execution model]

As the figure above shows, unlike the traditional Volcano model, in the Pipeline model it is the data that drives the execution of the control flow. Specifically, operators exchange data by pushing it downstream. The operators of a query are merged and split into different pipeline tasks, the whole execution engine uses a thread pool with a fixed number of threads, and whether a pipeline gets scheduled is driven by its data.

For example:

[Figure: pipeline split of R join S join T]

For R join S join T, the execution plan is split into 3 pipelines, each of which can be scheduled on a different thread. The split points are the blocking Join Build steps: because of how a join works, the Probe phase must wait until the Build data is ready before it can execute. Whether a pipeline occupies thread resources depends on whether its upstream data is ready, so the scheduling of each pipeline is data-driven, and pipelines that are not ready must actively release their thread resources.
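The data-driven rule can be illustrated with a minimal C++ sketch of the R join S join T example, assuming S and T are the build sides. `Pipeline` and `run_data_driven` are hypothetical names, not Doris classes, and "running" a pipeline is reduced to marking it finished. A pipeline whose dependencies are unfinished is skipped rather than holding a thread.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical pipeline model: a name, the indices of the pipelines whose
// output (e.g. a finished hash table) it waits for, and a finished flag.
struct Pipeline {
    std::string name;
    std::vector<int> deps;
    bool finished = false;
};

// Repeatedly scans all pipelines and "runs" (here: marks finished) each one
// whose dependencies have all finished. A pipeline that is not ready is
// skipped instead of holding a thread. Returns the order in which they ran.
std::vector<std::string> run_data_driven(std::vector<Pipeline>& pipelines) {
    std::vector<std::string> order;
    bool progress = true;
    while (progress) {
        progress = false;
        for (auto& p : pipelines) {
            if (p.finished) continue;
            bool ready = true;
            for (int d : p.deps) {
                assert(d >= 0 && d < static_cast<int>(pipelines.size()));
                ready = ready && pipelines[d].finished;
            }
            if (!ready) continue;  // data not ready: leave the thread to others
            p.finished = true;
            order.push_back(p.name);
            progress = true;
        }
    }
    return order;
}
```

Even when the probe pipeline (scan R, probe S, probe T) is listed first, it is skipped until both build pipelines have finished, so the order comes out as build S, build T, probe.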



In short, the core idea of Pipeline is analogous to the time-sharing scheduler of an operating system, whose core questions are:

  • CPU cores are limited: how can far more processes than there are CPUs all make progress?
  • How do processes share CPU time slices, and when must a process cede the CPU to others?

Here, CPU resources correspond to the execution engine's threads, the scheduling strategy between processes corresponds to the Pipeline scheduling strategy, and process priority corresponds to query resource management. Putting these mappings together makes the essence of the Pipeline execution engine easier to understand.

[Figure: analogy between OS time-sharing scheduling and Pipeline scheduling]

2.3 How Pipeline solves the problem

  • Pipeline splitting for blocking operations: blocking logic is split into separate pipelines for scheduling, while the pipelines between blocking points are driven by data. This removes the problem of blocking operators occupying threads in the pull model, as well as the extra overhead of thread switching. It also enables concurrent computation both across different pipelines and within the same pipeline.


  • Pipeline resource management: with pipeline scheduling, thread resources are allocated along thread -> pipeline -> query. Different queries can be scheduled fairly according to the resources each has consumed, so threads are shared reasonably among queries under mixed load, and pipelines that occupy threads excessively must actively release them.
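Fair scheduling by consumed resources can be illustrated with a small C++ sketch under a simplifying assumption: each query's CPU time is tracked in a single counter, and the next thread slot goes to the query that has used the least. `FairQueryPicker` is a hypothetical class; in this design, fairness is actually realized through the multi-level feedback queue described later.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical per-query accounting: total CPU time (microseconds) that the
// query's pipeline tasks have consumed on worker threads so far.
class FairQueryPicker {
public:
    void add_query(const std::string& query_id) { _consumed_us.emplace(query_id, 0); }

    // Charge the time a task of `query_id` has just spent on a thread.
    void charge(const std::string& query_id, std::uint64_t us) { _consumed_us.at(query_id) += us; }

    // Pick the query that has used the least CPU so far, so that a large
    // query cannot monopolize the threads while small queries wait.
    std::string pick() const {
        assert(!_consumed_us.empty());
        auto best = _consumed_us.begin();
        for (auto it = _consumed_us.begin(); it != _consumed_us.end(); ++it) {
            if (it->second < best->second) best = it;
        }
        return best->first;
    }

private:
    std::map<std::string, std::uint64_t> _consumed_us;
};
```

As soon as a small query has consumed more than the large one, the large query is picked again, so neither can starve the other.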

3. How to Apply the Pipeline Model to Doris

Core changes in Doris:

Phase 1 of the transformation plan:

Framework of the overall execution engine:

[Figure: framework of the overall execution engine]

Two new modules are added to the original QueryEngine:

  • Pipeline scheduling module: responsible for scheduling and executing PipelineTasks

[Figure: Pipeline scheduling module]

  1. Entry point: BE.main calls the constructor of BlockedTaskScheduler.


  2. Start the Schedule Thread, which runs an endless loop that polls the Blocked Pipeline Queue and initializes and checks task states.


  3. Start multiple Work Threads, each running an endless loop that polls its own task queue and other cores' task queues for a Runnable Pipeline Task. After executing a task, the Work Thread does one of the following:

    • Time slice used up: put the Pipeline Task back into the task queue
    • Blocked: put the Pipeline Task back into the Blocked Pipeline Queue
    • Completed: call close on the Pipeline Task to release its resources
  • Pipeline execution module: responsible for translating the original QueryPlan into PipelineTasks, and for refactoring the execution plan and execution logic for Pipeline




[Figure: Pipeline execution module]

  1. Pipeline splitting at blocking operators: the original Fragment Instance is split into multiple Pipelines at the following blocking operators, and the basic unit of the scheduling module becomes the PipelineTask.

    • Join Build

    • Sort

    • Agg

    • Scan

    • Exchange

  2. Push-based transformation of ExecNode (to be determined, depending on implementation progress): the core point is refactoring the Operator interface from the original pull mode to push mode.

[Figure: push-based data flow between Operators]

This part of the transformation only concerns how Operators inside a Pipeline exchange data. Its core advantage is that a Pipeline Task can be interrupted between any two Operators to cede its thread time slice.
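The key property of the push model, interrupting a task between any two Operators, can be shown in a minimal C++ sketch, assuming each Operator is reduced to a function that transforms a block in place. `Block`, `PushOperator` and `ChainTask` are hypothetical stand-ins, not the Doris interfaces: because control returns to the task after every Operator, the task can stop there and later resume from the recorded position.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for vectorized::Block: a batch of ints.
using Block = std::vector<int>;

// Hypothetical push-style streaming operator: transforms a block in place.
struct PushOperator {
    std::function<void(Block&)> process;
};

// The task pushes its current block through the operator chain. After each
// operator it checks its remaining budget, so it can yield the thread between
// any two operators; `next_op` records where to resume.
struct ChainTask {
    std::vector<PushOperator> ops;
    Block current;
    std::size_t next_op = 0;

    // Runs at most `budget` operator steps. Returns true once the block has
    // passed through the whole chain, false if the task yielded midway.
    bool run(std::size_t budget) {
        while (next_op < ops.size()) {
            if (budget == 0) return false;  // yield between two operators
            ops[next_op].process(current);
            ++next_op;
            --budget;
        }
        return true;
    }
};
```

A pull-based `get_next()` call stack cannot be suspended like this without blocking the thread, which is why the interface is refactored to push mode.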

Phase 2 of the transformation plan (out of scope for this change):
  • Parallel execution within a single pipeline

  • User-level CPU isolation



Detailed Design

Pipeline's scheduling module:

[Figure: Work Threads executing PipelineTasks of different queries]

As shown in the figure above, the Pipeline engine starts a fixed number of Work Threads (usually equal to the number of CPU cores) to execute PipelineTasks. Different colors represent PipelineTasks of different queries; each PipelineTask can be scheduled on any execution thread, making full use of multi-core parallelism.

1. Task Queues
  • Blocking queue: PipelineTasks whose upstream data or execution conditions are not ready are placed in this queue. The polling thread checks them periodically, and a task that has become Ready moves to a scheduling queue.
  • Scheduling queue: bound to an execution thread, it holds the executable PipelineTasks, which are placed there by the polling thread. It is a multi-level feedback queue, whose design is described later.
2. Polling thread

A single global thread polls the readiness of PipelineTasks across all queries.

  • When a task in the blocking queue becomes Ready, the polling thread moves it from the blocking queue to a scheduling queue.
  • Each task records the id of the thread that last ran it, and is preferentially placed in that thread's scheduling queue.
Code:
   while (!_shutdown.load()) {
       {
           auto iter = local_blocked_tasks.begin();
           DateTimeValue now = DateTimeValue::local_time();
           while (iter != local_blocked_tasks.end()) {
               auto* task = *iter;
               auto state = task->get_state();
               if (state == PENDING_FINISH || task->fragment_context()->is_canceled()) {
                   // should cancel or should finish
                   if (task->is_pending_finish()) {
                       iter++;
                   } else {
                       // _make_task_run erases the task from local_blocked_tasks
                       // (advancing iter) and appends it to ready_tasks
                       _make_task_run(local_blocked_tasks, iter, ready_tasks);
                   }
               } else if (task->query_fragments_context()->is_timeout(now)) {
                   LOG(WARNING) << "Timeout, query_id="
                                << print_id(task->query_fragments_context()->query_id)
                                << ", instance_id="
                                << print_id(task->fragment_context()->get_fragment_id());

                   task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);

                   if (task->is_pending_finish()) {
                       iter++;
                   } else {
                       _make_task_run(local_blocked_tasks, iter, ready_tasks);
                   }
               } else if (state == BLOCKED) {
                   if (!task->is_blocking()) {
                       // the blocking condition is gone: the task can run again
                       task->set_state(RUNNABLE);
                       _make_task_run(local_blocked_tasks, iter, ready_tasks);
                   } else {
                       iter++;
                   }
               } else {
                   // TODO: DCHECK the state
                   _make_task_run(local_blocked_tasks, iter, ready_tasks);
               }
           }
       }
       // ... (moving ready_tasks into the scheduling queues is omitted in this excerpt)
   }
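The two polling rules can be sketched in simplified, single-threaded C++. `PolledTask` and `poll_once` are hypothetical; the `blocking` flag stands in for `is_blocking()`, and falling back to queue 0 for a task that has never run is an assumption not stated in this design.

```cpp
#include <cassert>
#include <deque>
#include <list>
#include <vector>

// Hypothetical, simplified task: only what the polling thread looks at.
struct PolledTask {
    int id;
    bool blocking;         // e.g. source not readable or sink not writable
    int previous_core_id;  // worker that ran it last, -1 if it never ran
};

// One polling round: every task that is no longer blocking leaves the
// blocked list and goes to the scheduling queue of the worker that ran it
// last (hypothetical fallback: queue 0 if there is no valid previous worker).
void poll_once(std::list<PolledTask>& blocked,
               std::vector<std::deque<PolledTask>>& worker_queues) {
    assert(!worker_queues.empty());
    for (auto it = blocked.begin(); it != blocked.end();) {
        if (it->blocking) {
            ++it;  // still waiting; check again in the next round
            continue;
        }
        int core = it->previous_core_id;
        if (core < 0 || core >= static_cast<int>(worker_queues.size())) core = 0;
        worker_queues[core].push_back(*it);
        it = blocked.erase(it);  // erase returns the next element
    }
}
```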
3. Execution threads

The execution engine starts a fixed number of execution threads to run query tasks; the number is user-configurable and defaults to the number of CPU cores.

  • If the scheduling queue bound to the thread has no Task, the thread performs work stealing and takes a Task from another thread's scheduling queue.
  • The thread records the execution time and the number of blocks processed by each PipelineTask, and puts the task back into the scheduling queue once it exceeds a fixed time slice or amount of data, so that large queries do not starve other queries.

If a PipelineTask becomes blocked during execution, it is put back into the blocking queue and handed to the polling thread for further processing.
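The time-slice rule and the three possible outcomes of one execution can be sketched as follows. `SliceQuota`, `execute_with_quota` and the per-block callback are hypothetical, and the quota values are left to the caller because the design does not fix them.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical quota: a task gives up its thread once it has run for
// `time_slice` or processed `max_blocks` blocks in one scheduling round.
struct SliceQuota {
    std::chrono::nanoseconds time_slice;
    int max_blocks;
};

enum class BlockOutcome { kMore, kBlocked, kEos };         // result of one block
enum class TaskOutcome { kRunnable, kBlocked, kFinished };  // where the task goes next

// `process_one_block` is a hypothetical callback that handles one block.
template <typename ProcessFn>
TaskOutcome execute_with_quota(const SliceQuota& quota, ProcessFn&& process_one_block) {
    assert(quota.max_blocks > 0);
    using Clock = std::chrono::steady_clock;
    const auto start = Clock::now();
    int blocks = 0;
    while (true) {
        const BlockOutcome r = process_one_block();
        if (r == BlockOutcome::kBlocked) return TaskOutcome::kBlocked;  // to the blocking queue
        if (r == BlockOutcome::kEos) return TaskOutcome::kFinished;     // close and release
        ++blocks;
        if (blocks >= quota.max_blocks || Clock::now() - start >= quota.time_slice) {
            return TaskOutcome::kRunnable;  // yield: back to the scheduling queue
        }
    }
}
```

Checking the quota after every block is what lets a long-running query hand its thread back without being preempted by the operating system.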

Code:
  while (*marker) {
      // Take a PipelineTask from this thread's own scheduling queue
      auto task = queue->try_take(index);
      if (!task) {
          task = queue->steal_take(index); // work stealing from other threads' queues
          if (!task) {
              // TODO: take() is a blocking method, rethink the logic
              task = queue->take(index);
              if (!task) {
                  continue;
              }
          }
      }

      // Execute the task, then record the core (worker index) it ran on
      bool eos = false;
      auto status = task->execute(&eos);
      task->set_previous_core_id(index);

      auto pipeline_state = task->get_state();
      switch (pipeline_state) {
      case BLOCKED:
          _blocked_task_scheduler->add_blocked_task(task); // put back into the blocking queue
          break;
      case RUNNABLE:
          queue->push_back(task, index); // put back into the scheduling queue
          break;
      default:
          DCHECK(false);
          break;
      }
  }
  LOG(INFO) << "Stop TaskScheduler worker " << index;
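The work-stealing step in the loop above (`try_take`, then `steal_take`) can be illustrated with a minimal, single-threaded C++ sketch. `StealingQueues` is hypothetical and has no locking; taking from the front of the thread's own queue and stealing from the back of a victim's queue is a common work-stealing convention, assumed here rather than taken from Doris.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Hypothetical per-worker queues of task ids (no locking: single-threaded sketch).
class StealingQueues {
public:
    explicit StealingQueues(std::size_t workers) : _queues(workers) { assert(workers > 0); }

    void push(std::size_t worker, int task) { _queues[worker].push_back(task); }

    // Worker `self` first takes from the front of its own queue; if that is
    // empty, it steals from the back of the other workers' queues in turn.
    std::optional<int> take(std::size_t self) {
        assert(self < _queues.size());
        if (!_queues[self].empty()) {
            int t = _queues[self].front();
            _queues[self].pop_front();
            return t;
        }
        for (std::size_t i = 1; i < _queues.size(); ++i) {
            auto& victim = _queues[(self + i) % _queues.size()];
            if (!victim.empty()) {
                int t = victim.back();
                victim.pop_back();
                return t;
            }
        }
        return std::nullopt;  // nothing runnable anywhere
    }

private:
    std::vector<std::deque<int>> _queues;
};
```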

Pipeline's Multi-Level Feedback Queue.

Multi-Level Feedback Queue: a process-scheduling algorithm proposed by Turing Award winner Corbató, used in BSD UNIX, Windows NT and later Windows operating systems.

[Figure: multi-level feedback queue]

Pipeline scheduling rules adapted from the multi-level feedback queue:

  • Pipeline tasks of a newly submitted query are placed in the highest-priority queue, level 1.
  • Each query has a quota for getting its tasks scheduled from each priority queue; once the quota is used up, the query's Pipeline tasks automatically sink to the next lower-priority queue.
  • Each priority queue is allocated a fixed share of scheduling time, so that large queries are not starved (the original algorithm instead periodically moves all tasks back to level 1, which could also be considered):

    level 1: 50% of CPU time
    level 2: 25% of CPU time
    level 3: 12.5% of CPU time
    ...

When a level's queue has no task, the execution thread automatically takes a PipelineTask from the next priority level.
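The quota-and-demotion rule can be sketched as follows, assuming a query's quota is measured in consumed CPU time and every level grants the same quota (both assumptions; the design does not specify the unit or the amounts). `LevelAssigner` and its parameters are hypothetical, and level 0 in the code corresponds to level 1 in the text.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical demotion policy: a query starts at the top level (0) and drops
// one level each time it uses up another `quota_us` of CPU time, until it
// reaches the lowest level.
class LevelAssigner {
public:
    LevelAssigner(int levels, std::uint64_t quota_us) : _levels(levels), _quota_us(quota_us) {
        assert(levels > 0 && quota_us > 0);
    }

    void charge(const std::string& query_id, std::uint64_t us) { _used_us[query_id] += us; }

    // Level for the next task of `query_id`; a new query gets level 0.
    int level_of(const std::string& query_id) const {
        auto it = _used_us.find(query_id);
        const std::uint64_t used = (it == _used_us.end()) ? 0 : it->second;
        const std::uint64_t level = used / _quota_us;  // one quota used up per level
        return static_cast<int>(
            std::min<std::uint64_t>(level, static_cast<std::uint64_t>(_levels - 1)));
    }

private:
    int _levels;
    std::uint64_t _quota_us;
    std::unordered_map<std::string, std::uint64_t> _used_us;
};
```

Short queries finish before exhausting the first quota and therefore stay in the highest-priority queue, while long-running queries sink.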

Code:
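// Each thread has its own private multi-level queue
class WorkTaskQueue {
public:
  // Initialize the multi-level feedback queue. The factor grows from the
  // lowest level (SUB_QUEUE_LEVEL - 1) to the highest (0); assuming
  // total_consume_time() divides a level's consumed time by this factor,
  // higher-priority levels receive a larger share of CPU time
  explicit WorkTaskQueue() : _closed(false) {
      double factor = 1;
      for (int i = SUB_QUEUE_LEVEL - 1; i >= 0; --i) {
          _sub_queues[i].set_factor_for_normal(factor);
          factor *= LEVEL_QUEUE_TIME_FACTOR;
      }
  }

  // Submit a task
  void push(PipelineTask* task) {
      size_t level = _compute_level(task); // compute which level the task belongs to
      std::unique_lock<std::mutex> lock(_work_size_mutex);
      _sub_queues[level].push_back(task);
      _total_task_size++;
      _wait_task.notify_one();
  }

  // Take a task; the caller must already hold _work_size_mutex
  PipelineTask* try_take_unprotected() {
      if (_total_task_size == 0 || _closed) {
          return nullptr;
      }
      // Pick the non-empty level with the smallest normalized consumed time
      double min_consume_time = 0;
      int idx = -1;
      for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
          if (!_sub_queues[i].empty()) {
              double consume_time = _sub_queues[i].total_consume_time(); // each level tracks its own consumed time
              if (idx == -1 || consume_time < min_consume_time) {
                  idx = i;
                  min_consume_time = consume_time;
              }
          }
      }
      DCHECK(idx != -1); // _total_task_size > 0, so some level is non-empty
      auto task = _sub_queues[idx].try_take();
      if (task) {
          _total_task_size--;
      }
      return task;
  }

  // ... (data members and _compute_level() omitted in this excerpt)
};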
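To see how the per-level factors translate into CPU shares, the following C++ simulation reuses the factor setup of the constructor above and assumes that `total_consume_time()` returns a level's consumed time divided by its factor (an assumption; the excerpt does not show it). `simulate_level_shares` is a hypothetical helper. With every level always busy and unit-length slices, 3 levels and a factor of 2 split the slices 4:2:1.

```cpp
#include <cassert>
#include <vector>

// Simulates the level selection, assuming every level always has work:
// each level's consumed time is divided by its factor, and the level with the
// smallest normalized time runs the next unit-length slice (ties go to the
// higher-priority level). Returns how many slices each level received.
std::vector<int> simulate_level_shares(int levels, double time_factor, int slices) {
    assert(levels > 0 && time_factor > 1.0);
    std::vector<double> factor(levels), consumed(levels, 0.0);
    double f = 1.0;
    for (int i = levels - 1; i >= 0; --i) {  // same setup as the constructor
        factor[i] = f;
        f *= time_factor;
    }
    std::vector<int> runs(levels, 0);
    for (int s = 0; s < slices; ++s) {
        int idx = 0;
        for (int i = 1; i < levels; ++i) {
            if (consumed[i] / factor[i] < consumed[idx] / factor[idx]) idx = i;
        }
        consumed[idx] += 1.0;  // charge one time slice to the chosen level
        ++runs[idx];
    }
    return runs;
}
```

With more levels, the shares of the top levels approach a halving split (50%, 25%, 12.5%, ...), which is the intent of the per-level time shares listed above.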

State transition of PipelineTask.

Pipeline execution state definition:

enum PipelineTaskState : uint8_t {
  NOT_READY = 0, // prepare() has not been called yet
  BLOCKED = 1,   // have some dependencies not finished or some conditions not met
  BLOCKED_FOR_DEPENDENCY = 2,
  BLOCKED_FOR_SOURCE = 3,
  BLOCKED_FOR_SINK = 4,
  RUNNABLE = 5, // can execute
  PENDING_FINISH = 6, // compute task is over, but still hold resource. like some scan and sink task
  FINISHED = 7,
  CANCELED = 8
};

PipelineTaskState is the scheduling state of a PipelineTask:

  • NOT_READY: the PipelineTask has not called prepare() and is not ready for execution.
  • BLOCKED: the PipelineTask is blocked, waiting for the polling thread to check it. The blocking reason can be one of:

    • BLOCKED_FOR_DEPENDENCY: the upstream pipeline has not finished running

    • BLOCKED_FOR_SOURCE: the SourceOperator's data is not readable (not ready)

    • BLOCKED_FOR_SINK: the SinkOperator is not writable

  • RUNNABLE: the PipelineTask is executable and waits to be scheduled by an execution thread.
  • PENDING_FINISH: the PipelineTask is about to finish; it waits for other related Pipeline tasks to finish and then calls close to release resources.
  • FINISHED: the PipelineTask has finished and waits to be released by the shared_ptr destructor.
  • CANCELED: the PipelineTask has been cancelled and waits to be released by the shared_ptr destructor.

State machine transition rules of a PipelineTask:

 
                |-----------------------------------------------------|
                |---|                 transfer 2   transfer 3       |   transfer 4
                    |-------> BLOCKED ------------|                   |---------------------------------------> CANCELED
            |------|                             |                   | transfer 5           transfer 6|
NOT_READY ---| transfer 0                         |-----> RUNNABLE ---|---------> PENDING_FINISH ------|
            |                                   |         ^       |                     transfer 7|
            |------------------------------------|         |--------|---------------------------------------> FINISHED
              transfer 1                                   transfer 9         transfer 8

* transfer 0 (NOT_READY -> BLOCKED): this pipeline task has some incomplete dependencies
* transfer 1 (NOT_READY -> RUNNABLE): this pipeline task has no incomplete dependencies
* transfer 2 (BLOCKED -> RUNNABLE): runnable condition for this pipeline task is met (e.g. get a new block from rpc)
* transfer 3 (RUNNABLE -> BLOCKED): runnable condition for this pipeline task is not met (e.g. sink operator sends a block by RPC and waits for a response)
* transfer 4 (RUNNABLE -> CANCELED): current fragment is cancelled
* transfer 5 (RUNNABLE -> PENDING_FINISH): this pipeline task completed but waits to release the resources held by itself
* transfer 6 (PENDING_FINISH -> CANCELED): current fragment is cancelled
* transfer 7 (PENDING_FINISH -> FINISHED): this pipeline task completed and the resources held by itself have already been released
* transfer 8 (RUNNABLE -> FINISHED): this pipeline task completed and no resource need to be released
* transfer 9 (RUNNABLE -> RUNNABLE): this pipeline task yields CPU and re-enters the runnable queue if it is runnable and has occupied CPU for a max time slice
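The transfer rules can be encoded as a validity check, for example to back a DCHECK whenever a task's state changes. The sketch below is hypothetical (Doris may check transitions differently) and assumes the BLOCKED_FOR_* sub-states follow the same rules as BLOCKED, since the transfer list only names BLOCKED.

```cpp
#include <cassert>
#include <cstdint>

// Same states as the PipelineTaskState definition above.
enum PipelineTaskState : std::uint8_t {
    NOT_READY = 0,
    BLOCKED = 1,
    BLOCKED_FOR_DEPENDENCY = 2,
    BLOCKED_FOR_SOURCE = 3,
    BLOCKED_FOR_SINK = 4,
    RUNNABLE = 5,
    PENDING_FINISH = 6,
    FINISHED = 7,
    CANCELED = 8
};

// Assumption: the BLOCKED_FOR_* sub-states behave like BLOCKED.
bool is_blocked_state(PipelineTaskState s) {
    return s == BLOCKED || s == BLOCKED_FOR_DEPENDENCY || s == BLOCKED_FOR_SOURCE ||
           s == BLOCKED_FOR_SINK;
}

// Returns true if `from -> to` is one of transfers 0-9 listed above.
bool is_valid_transfer(PipelineTaskState from, PipelineTaskState to) {
    if (from == NOT_READY) return is_blocked_state(to) || to == RUNNABLE;  // 0, 1
    if (is_blocked_state(from)) return to == RUNNABLE;                     // 2
    if (from == RUNNABLE) {
        return is_blocked_state(to) || to == CANCELED ||  // 3, 4
               to == PENDING_FINISH || to == FINISHED ||  // 5, 8
               to == RUNNABLE;                            // 9
    }
    if (from == PENDING_FINISH) return to == CANCELED || to == FINISHED;  // 6, 7
    return false;  // FINISHED and CANCELED are terminal
}
```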


Pipeline transformation of execution operators

Implementation of Operator

An Operator is the basic unit with which a Pipeline executes a query; it plays the role that ExecNode played in the original execution framework.

After abstraction this part is relatively simple: the core logic of an Operator can follow the execution logic of the corresponding ExecNode. The author abstracted part of the ExecNode interface to achieve code reuse.

  • Operator class

An abstract class: each ExecNode is abstracted into a corresponding Operator, e.g. UnionNode -> UnionOperator. The interface is basically the same as that of ExecNode.

class Operator {
public:
  explicit Operator(OperatorTemplate* operator_template);
  virtual ~Operator() = default;

  // Both the sink and the source need to know the cancel state
  // to do cancel work
  bool is_sink() const;

  bool is_source() const;

  // Should be called after the ExecNode is constructed
  virtual Status init(ExecNode* exec_node, RuntimeState* state = nullptr);

  // Only the result sink and the data stream sink need to implement this overload
  virtual Status init(const TDataSink& tsink) { return Status::OK(); }

  // Prepare the state of the Operator
  virtual Status prepare(RuntimeState* state);

  // Like ExecNode's open(), called when the pipeline task is scheduled for the
  // first time; must not block.
  // A pipeline should be opened only after its dependencies have finished,
  // e.g. for a -> c and b -> c, pipeline c calls open after pipelines a and b finish.
  // Currently each pipeline has only one task, so this mechanism is probably not
  // a performance bottleneck; if a pipeline runs multiple tasks in parallel,
  // the logic needs to be rethought.
  //
  // Each operator should call open_self() to prepare resources for data computation.
  // If an ExecNode is split into a sink and a source operator, open_self() should be called in the sink operator.
  virtual Status open(RuntimeState* state);

  // Release resources; must not block the thread.
  //
  // Each operator should call close_self() to release resources.
  // If an ExecNode is split into a sink and a source operator, close_self() should be called in the source operator.
  virtual Status close(RuntimeState* state);

  virtual bool can_read() { return false; } // for source

  virtual bool can_write() { return false; } // for sink

  // for pipeline
  virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
      std::stringstream error_msg;
      error_msg << " does not implement get_block";
      return Status::NotSupported(error_msg.str());
  }

  // ... (remaining members omitted in this excerpt)
};
  • Implementation of the OperatorBuilder class

    An abstract class (OperatorTemplate in the code below) that mainly converts an ExecNode into Operators. Multiple Operators can reuse its logic and shared resources, in preparation for later Pipeline parallelism.

class OperatorTemplate {
protected:
  const int32_t _id;
  const std::string _name;
  ExecNode* _related_exec_node;
  std::shared_ptr<RuntimeProfile> _runtime_profile;

  RuntimeState* _state = nullptr;
  bool _is_closed = false;
};
Structure of PipelineTask.

A Pipeline is simpler than the original execution tree of the Volcano model: it is not a multi-way tree but a chain, i.e. a data processing chain.

  • The first Operator of a Pipeline is the Source Operator: read-only.
  • The last Operator of a Pipeline is the Sink Operator: write-only.

The Operators in the middle of the chain are both readable and writable, compute in a streaming fashion, and involve no blocking operations.

Pipeline head dependency: e.g. Join Probe must wait for the preceding Join Build to prepare its data.

Pipeline tail dependency: after a Pipeline finishes executing, it must wake up the downstream PipelineTask that is waiting on it.

[Figure: structure of a PipelineTask]

Prerequisites for a PipelineTask to be schedulable:

  • The head dependency is complete, or there is none
  • Source Operator is readable
  • Sink Operator is writable

Once these conditions are met, the PipelineTask can enter the RUNNABLE state. When any Operator produces EOS, or the query is cancelled, the PipelineTask ends and its tail dependency is processed.
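The three schedulability conditions can be written as a single predicate. `SourceOp`, `SinkOp`, `FlagSource`, `FlagSink`, `Dependency` and `is_schedulable` are hypothetical stand-ins for the Operator interface's `can_read()`/`can_write()` and for a head dependency; they are not Doris classes.

```cpp
#include <cassert>

// Hypothetical minimal interfaces mirroring Operator::can_read()/can_write().
struct SourceOp {
    virtual ~SourceOp() = default;
    virtual bool can_read() = 0;
};
struct SinkOp {
    virtual ~SinkOp() = default;
    virtual bool can_write() = 0;
};

// Flag-driven implementations, for illustration only.
struct FlagSource : SourceOp {
    bool readable = false;
    bool can_read() override { return readable; }
};
struct FlagSink : SinkOp {
    bool writable = false;
    bool can_write() override { return writable; }
};

// Hypothetical head dependency, e.g. "the Join Build pipeline has finished".
struct Dependency {
    bool finished = false;
};

// RUNNABLE only if the head dependency (if any) is complete, the source is
// readable and the sink is writable.
bool is_schedulable(const Dependency* head_dep, SourceOp& source, SinkOp& sink) {
    if (head_dep != nullptr && !head_dep->finished) return false;
    return source.can_read() && sink.can_write();
}
```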

Pipeline splitting at blocking operators.

The original Fragment Instance is split into multiple Pipelines at the following blocking operators; the basic unit of the scheduling module is the PipelineTask.

  • Join Build

  • Sort

  • Agg

  • Scan

  • Exchange

    These operators need to be split into two Operators:


  • is_sink: the data-writing side, to which blocks are written, e.g. result sink and shuffle sink. can_write means that data can be written to it; e.g. for the exchange sink operator (created from the original VDataSteamWriter), can_write returns true while the blocks written by shuffle do not exceed the limit.


  • is_source: the data-output side, e.g. olap scan and shuffle. can_read means the source is executable; e.g. for the olap scan operator, can_read returns true when there is data, when the query is cancelled, or when there are no scan tasks left.



    For example, AggNode is split into AggSink and AggSource, which separates the blocking logic into different Pipelines. This work is implemented following the Operator structure described above.

    [Figure: AggNode split into AggSink and AggSource]

    The Sink and Source are chained together through the intermediate state held by the original AggNode (PreAggNode).
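A minimal C++ sketch of the AggSink/AggSource split, assuming a SUM aggregation over integer keys. `AggSharedState`, `AggSink` and `AggSource` here are hypothetical and much simpler than the Doris implementation: the sink materializes rows into a shared table, and the source becomes readable only after the sink has seen EOS, which is exactly the blocking point that separates the two Pipelines.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical state shared by the two halves of a split AggNode:
// group key -> running SUM, plus a flag set when the sink reaches EOS.
struct AggSharedState {
    std::map<int, long long> groups;
    bool sink_finished = false;
};

// Sink half: consumes (key, value) rows and materializes them into the table.
class AggSink {
public:
    explicit AggSink(std::shared_ptr<AggSharedState> s) : _state(std::move(s)) {}
    bool can_write() const { return !_state->sink_finished; }
    void push(const std::vector<std::pair<int, long long>>& rows) {
        assert(can_write());
        for (const auto& [key, value] : rows) _state->groups[key] += value;
    }
    void finish() { _state->sink_finished = true; }  // EOS: the source may now run

private:
    std::shared_ptr<AggSharedState> _state;
};

// Source half: readable only after the sink pipeline has finished, because
// the aggregation needs its full input before emitting any group.
class AggSource {
public:
    explicit AggSource(std::shared_ptr<AggSharedState> s) : _state(std::move(s)) {}
    bool can_read() const { return _state->sink_finished; }
    std::vector<std::pair<int, long long>> get_all() const {
        assert(can_read());
        return std::vector<std::pair<int, long long>>(_state->groups.begin(),
                                                      _state->groups.end());
    }

private:
    std::shared_ptr<AggSharedState> _state;
};
```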

4. POC Case

Principle: Small steps and local validation.

POC validation results on SSB-FLAT:

Configuration: a single 48-core machine.

Test method: 4 concurrent threads running SSB-FLAT queries.

Vary the number of instances per query, and observe the change in the number of threads and in execution time.

Fix the number of instances per query at 48, limit the number of threads, and observe the change in execution time.

[Figure: POC results on SSB-FLAT]


5. Pros and Cons

Advantages

  • Full utilization of multi-core computing power: different pipelines can compute concurrently to exploit multiple cores, and concurrency within a single pipeline will be developed as a feature of the second version of Pipeline. (The advantage is large in the POC environment, while the performance impact in production environments is small.)
  • Solves the thread-pool deadlock problem
  • Reduces the resource overhead caused by thread switching
  • CPU resource management

The Pipeline framework solves fair scheduling between large and small queries, and can also be used to address resource isolation between multiple users, although a more reasonable approach to the latter is physical resource isolation through storage-compute separation.

Disadvantages

Engineering complexity of the computational framework

Compared with the original Volcano model, the Pipeline model splits apart the entire control flow and data flow of the execution engine. This increases engineering complexity and makes problems harder to diagnose. Moreover, blocking between Pipelines turns into logical dependencies; a careless design can introduce logical deadlocks, and the resulting performance and deadlock problems are difficult to locate.

6. References

[1] Viktor Leis, Peter Boncz, Alfons Kemper, Thomas Neumann. Morsel-driven parallelism: a NUMA-aware query evaluation framework for the many-core age. SIGMOD 2014.

[2] Amir Shaikhha, Mohammad Dashti, Christoph Koch. Push versus pull-based loop fusion in query engines. Journal of Functional Programming, Cambridge University Press, 2018.

[3] Mark Raasveldt. Push-Based Execution in DuckDB (slides).
