

The distributed version of Quickstep is the next key milestone after the single-node version. It aims to be a high-performance, elastic query processing engine.

It consists of three components:

  • Conductor, which consists of the parser, the optimizer, Foreman, Catalog, and BlockLocator.
    • BlockLocator caches the location of each block, and is used for exchanging Quickstep blocks between StorageManagers.
  • Ensemble, which manages a cluster of Executors.
    • Each Executor consists of a StorageManager, a Shiftboss, and a group of Workers. Shiftboss manages the Workers through the in-memory TMB implementation.
  • DistributedCli, the command-line client.

It also uses the Transactional Message Bus (TMB) as the communication layer between the above components, and relies on HDFS (via libhdfs3) for persistent storage.
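As a rough illustration of the BlockLocator role described above, the following Python sketch keeps a map from block id to the set of Executors that currently hold the block. The class and method names are hypothetical and are not taken from the Quickstep source; the real component lives in Conductor and communicates over TMB.

```python
from collections import defaultdict


class BlockLocatorSketch:
    """Hypothetical in-memory map: block id -> ids of Executors holding it."""

    def __init__(self):
        self._locations = defaultdict(set)

    def add_location(self, block_id, executor_id):
        # Assumed to be called when a StorageManager reports a loaded block.
        self._locations[block_id].add(executor_id)

    def remove_location(self, block_id, executor_id):
        # Assumed to be called when a StorageManager reports an evicted block.
        holders = self._locations.get(block_id)
        if holders is None:
            return
        holders.discard(executor_id)
        if not holders:
            del self._locations[block_id]

    def locate(self, block_id):
        # Sorted list of Executors that can serve the block; an empty list
        # means the block has to be read from persistent storage.
        return sorted(self._locations.get(block_id, ()))
```

A StorageManager that needs a block would first ask for `locate(block_id)` and only fall back to HDFS when the list is empty.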

It supports multiple instances of DistributedCli connecting to Conductor, and queries from different clients run concurrently. However, queries from the same client run one at a time, in submission order. Note that Quickstep does not have transaction support yet.

A SELECT query is processed as follows:

  1. DistributedCli sends a query to Conductor.
  2. Conductor optimizes the query and generates a query execution plan. The plan consists of a number of operators, which generate a bag of WorkOrders; each WorkOrder corresponds to a Quickstep block.
  3. Foreman in Conductor manages the query execution, and reports the query completion to the main thread of Conductor.
    1. Foreman sends a serialized WorkOrder to an Executor (more precisely, to its Shiftboss), chosen by some policy (e.g., data locality or minimal load).
    2. Shiftboss in Executor deserializes the WorkOrder, and dispatches it to a Worker.
    3. Worker in Executor executes the WorkOrder, including a possible data exchange from a StorageManager peer.
    4. Worker in Executor notifies Shiftboss regarding the completion of the WorkOrder, and Shiftboss in Executor forwards it to Foreman.
    5. Repeat from sub-step 1 until the query completes.
  4. The main thread of Conductor replies to DistributedCli regarding the query completion.
  5. DistributedCli reads the query result, and notifies Conductor once it has finished reading.
  6. Conductor deletes the query result.
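The Executor selection in sub-step 1 of step 3 can be sketched as follows. This is a minimal Python illustration under stated assumptions, not Quickstep's actual Foreman logic: the function `choose_executor` and both of its input maps are hypothetical. It prefers an Executor that already caches the WorkOrder's block (data locality) and otherwise falls back to the least-loaded Executor (minimal load).

```python
def choose_executor(block_id, block_holders, pending_work_orders):
    """Pick an Executor for a WorkOrder over `block_id`.

    block_holders: dict mapping block id -> set of Executor ids caching it.
    pending_work_orders: dict mapping Executor id -> number of WorkOrders
        dispatched to it that have not completed yet.
    """
    if not pending_work_orders:
        raise ValueError("no Executors registered")

    # Data locality: restrict to registered Executors that hold the block.
    local = [e for e in block_holders.get(block_id, ())
             if e in pending_work_orders]
    candidates = local if local else list(pending_work_orders)

    # Minimal load: fewest pending WorkOrders; ties broken by Executor id.
    return min(candidates, key=lambda e: (pending_work_orders[e], e))
```

In this sketch locality always wins over load, so a hot block can overload a single Executor; a real policy would likely need to balance the two criteria.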

Currently, it has some issues:

  • Performance: TMB introduces significant overhead. We observe a 10x slowdown relative to the single-node version, even when all components run on the same machine.
  • Scalability: We could only set up a 6-node cluster on CloudLab (one node for Conductor and the TMB master, one for DistributedCli, and the rest for Ensemble). A 10-node deployment suffered repeated HDFS block transfer failures during bulk loading of the benchmark dataset.


End-to-End Performance Evaluation of the Network-Based TMB Implementation

The goal of this task is to evaluate whether TMB is suitable as a high-performance, scalable communication infrastructure for the distributed Quickstep. If not, we may need to replace it with another license-compatible messaging middleware.

The root cause of the performance issue mentioned above is the receive API implementation in the network-based TMB, which uses the sleep system call to block while waiting for messages. We need a better implementation, e.g., an event-driven one.

One alternative is to avoid client-side polling by refactoring the Receive RPC API into a server-side blocking implementation, instead of client-side streaming.
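To illustrate the difference between the two receive styles, the Python sketch below contrasts a sleep-based polling receive with a blocking receive built on a condition variable. It is a single-process analogy with hypothetical names (`Mailbox`, `receive_polling`, `receive_blocking`), not the TMB API; the network-based TMB works across processes via RPC.

```python
import threading
import time
from collections import deque


class Mailbox:
    """Hypothetical single-process message queue for one receiver."""

    def __init__(self):
        self._messages = deque()
        self._cond = threading.Condition()

    def send(self, message):
        with self._cond:
            self._messages.append(message)
            self._cond.notify()  # Wake one caller waiting in receive_blocking.

    def receive_polling(self, interval_s=0.01):
        # Sleep-based receive: a message that arrives just after a check
        # waits up to interval_s before it is noticed.
        while True:
            with self._cond:
                if self._messages:
                    return self._messages.popleft()
            time.sleep(interval_s)

    def receive_blocking(self):
        # Event-driven receive: the caller waits on the condition variable
        # and is woken as soon as send() enqueues a message.
        with self._cond:
            while not self._messages:
                self._cond.wait()
            return self._messages.popleft()
```

With polling, every hop in a WorkOrder round trip can pay up to one sleep interval; the blocking variant removes that fixed delay at the cost of holding a waiting thread per receiver.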

10-node-cluster Setup in CloudLab

Set up a 10-node cluster for the distributed Quickstep in CloudLab, and run the SSB benchmark.

End-to-End Performance Evaluation of the Data Exchange Operation

The data exchange operation (also called Shuffle in other contexts) affects the performance of distributed query execution. Evaluating it could help StorageManager decide, when it needs a block, whether to fetch the block from a remote peer or to load it from persistent storage.
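One way such measurements might feed into StorageManager is a simple cost comparison. The Python sketch below is hypothetical: the function name, the latency-plus-throughput cost model, and all parameters are assumptions for illustration, and realistic values would have to come from the evaluation itself.

```python
def choose_block_source(block_bytes,
                        peer_latency_s, peer_bytes_per_s,
                        storage_latency_s, storage_bytes_per_s,
                        peer_has_block=True):
    """Return "peer" or "storage" as the source for loading a block.

    Assumed cost model: fixed latency + block size / throughput.
    """
    if not peer_has_block:
        # No remote copy is cached, so persistent storage is the only option.
        return "storage"
    peer_cost = peer_latency_s + block_bytes / peer_bytes_per_s
    storage_cost = storage_latency_s + block_bytes / storage_bytes_per_s
    return "peer" if peer_cost < storage_cost else "storage"
```

For example, under these assumptions a 2 MB block is fetched from a peer when the peer link offers both lower latency and higher throughput than the persistent storage path, and loaded from storage otherwise.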

Check the Semantic Equivalence between the Single-Node and Distributed Versions

In the distributed version, some operators (e.g., delete, update) are not trivial to implement, because block caches make it hard to ensure data consistency.

Partition Support in (Distributed) Quickstep

Support Dynamic Scale-in / out in Ensemble and Elasticity Evaluation

Study Data Replication / Morph Impacts in the Distributed Query Execution


