What makes a modern database system? The three key modules are the query optimizer, the execution engine, and the storage engine. Among them, the execution engine is to the DBMS what the chef is to a restaurant. This article focuses on the execution engine of the Apache Doris data warehouse and explains the key to its high performance.
To illustrate the role of the execution engine, let's follow the execution process of an SQL statement:
- Upon receiving an SQL query, the query optimizer performs syntax/lexical analysis and generates the optimal execution plan based on the cost model and optimization rules.
- The execution engine then schedules the plan to the nodes, which operate on data in the underlying storage engine and then return the query results.
The execution engine performs operations like data reading, filtering, sorting, and aggregation. The efficiency of these steps determines query performance and resource utilization. That is why different execution models yield different query efficiency.
Volcano Model
The Volcano Model (originally known as the Iterator Model) predominates in analytical databases, followed by the Materialization Model and the Vectorized Model. In the Volcano Model, each operation is abstracted as an operator, so an entire SQL query is an operator tree. During query execution, the tree is traversed top-down by calling the next() interface, and data is pulled and processed from the bottom up. This is called a pull-based execution model.
The Volcano Model is flexible, scalable, and easy to implement and optimize. It underpinned Apache Doris before version 2.1.0. When a user initiates an SQL query, Doris parses the query, generates a distributed execution plan, and dispatches tasks to the nodes for execution. Each individual task is an instance. Take a simple query as an example:
select age, sex from employees where age > 30
In an instance, data flows between operators are propelled by the next() method. When the next() method of an operator is called, it first calls the next() of its child operator, obtains data from it, and then processes that data to produce output. next() is a synchronous method: the current operator is blocked if its child operator does not provide data for it. In this model, the next() method of the root operator must be called in a loop until all data is processed, which is when the instance finishes its computation.
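To make the pull-based flow concrete, here is a minimal C++ sketch of a Volcano-style operator tree for the query above. The type and member names (Operator, Scan, Filter) are illustrative assumptions, not the actual Doris interfaces.

```cpp
#include <cstddef>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using Row = std::vector<int>;  // row layout: {age, sex}

// Every operation is abstracted as an operator with a next() interface.
struct Operator {
    virtual ~Operator() = default;
    // Returns the next row, or std::nullopt once the input is exhausted.
    virtual std::optional<Row> next() = 0;
};

// Leaf operator: produces rows from an in-memory table.
struct Scan : Operator {
    std::vector<Row> rows;
    std::size_t pos = 0;
    explicit Scan(std::vector<Row> r) : rows(std::move(r)) {}
    std::optional<Row> next() override {
        if (pos >= rows.size()) return std::nullopt;
        return rows[pos++];
    }
};

// Filter: calls the next() of its child, then processes the pulled row.
struct Filter : Operator {
    std::unique_ptr<Operator> child;
    explicit Filter(std::unique_ptr<Operator> c) : child(std::move(c)) {}
    std::optional<Row> next() override {
        // Synchronous call: this operator is blocked until the child
        // either returns a row or signals end of data.
        while (auto row = child->next()) {
            if ((*row)[0] > 30) return row;  // where age > 30
        }
        return std::nullopt;
    }
};

int main() {
    // select age, sex from employees where age > 30
    auto root = std::make_unique<Filter>(std::make_unique<Scan>(
        std::vector<Row>{{25, 0}, {42, 1}, {37, 0}}));
    // The root's next() is called in a loop until all data is processed.
    while (auto row = root->next()) { /* emit the row */ }
}
```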
Such an execution mechanism faces several bottlenecks in single-node, multi-core use cases:
- Thread blocking: In a fixed-size thread pool, if an instance occupies a thread and is blocked, it can easily cause a deadlock when many instances request execution concurrently. This is especially the case when the current instance depends on other instances. Moreover, if a node is running more instances than the number of CPU cores it has, the system scheduling mechanism is heavily relied upon and a huge context-switching overhead can be produced. In a colocation scenario, this leads to an even larger thread-switching overhead.
- CPU contention: Threads might compete for CPU resources, so queries of different sizes and from different tenants might interfere with each other.
- Underutilization of multi-core computing capabilities: Execution concurrency relies heavily on data distribution. Specifically, the number of instances running on a node is limited by the number of data buckets on that node, so it is important to set an appropriate number of buckets. If you shard the data into too many buckets, that becomes a burden on the system and brings unnecessary overheads; if there are too few buckets, you cannot utilize your CPU cores to the fullest. However, in a production environment, it is not always easy to estimate the right number of buckets, hence performance loss.
Pipeline Execution Engine
Based on the known issues of the Volcano Model, we replaced it with the Pipeline Execution Engine in Apache Doris 2.0.0.
As the name suggests, the Pipeline Execution Engine breaks the execution plan down into pipeline tasks and schedules these pipeline tasks into a thread pool in a time-sharing manner. If a pipeline task is blocked, it is put on hold to release the thread it is occupying. Meanwhile, it supports various scheduling strategies, meaning that you can allocate CPU resources to different queries and tenants more flexibly.
Moreover, the Pipeline Execution Engine pools the data within data buckets, so the number of running instances is no longer capped by the number of buckets. This not only enhances Apache Doris' utilization of multi-core systems but also improves system performance and stability by avoiding frequent thread creation and deletion.
Example
This is the execution plan of a join query. It consists of two instances:
As illustrated, the Probe operation can only be executed after the hash table is built, while the Build operation relies on the computation results of the Exchange operator. Each of the two instances is divided into two pipeline tasks accordingly. These tasks are then scheduled into the "ready" queue of the thread pool. Following the specified strategies, the threads pick up tasks to process. In a pipeline task, after one data block is finished, if the relevant data is ready and the task's runtime stays within the maximum allowed duration, the thread continues to compute the next data block.
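Below is a simplified sketch of this time-sharing loop, assuming a worker thread that picks a task from the ready queue, computes data blocks until the task blocks or its time slice expires, then releases the thread. None of these type names come from the Doris codebase.

```cpp
#include <chrono>
#include <deque>
#include <functional>

enum class TaskState { kRunnable, kBlocked, kFinished };

struct PipelineTask {
    std::function<TaskState()> process_block;  // computes one data block
    TaskState state = TaskState::kRunnable;
};

// One worker thread's loop: tasks time-share the thread instead of owning it.
void run_worker(std::deque<PipelineTask*>& ready_queue,
                std::chrono::milliseconds time_slice) {
    using clock = std::chrono::steady_clock;
    while (!ready_queue.empty()) {
        PipelineTask* task = ready_queue.front();
        ready_queue.pop_front();
        const auto start = clock::now();
        // Keep computing blocks while data is ready and the slice lasts.
        while (task->state == TaskState::kRunnable &&
               clock::now() - start < time_slice) {
            task->state = task->process_block();
        }
        // A runnable task whose slice expired goes back to the ready queue;
        // a blocked task would be parked until its data arrives again.
        if (task->state == TaskState::kRunnable) ready_queue.push_back(task);
    }
}

int main() {
    int blocks_left = 5;
    PipelineTask t{[&] {
        return --blocks_left > 0 ? TaskState::kRunnable : TaskState::kFinished;
    }};
    std::deque<PipelineTask*> ready{&t};
    run_worker(ready, std::chrono::milliseconds(10));
}
```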
Design and Implementation
Avoid Thread Blocking
As mentioned earlier, the Volcano Model is confronted with several bottlenecks:
- If too many threads are blocked, the thread pool becomes saturated and unable to respond to subsequent queries.
- Thread scheduling is entirely managed by the operating system, without any user-level control or customization.
How does the Pipeline Execution Engine avoid such issues?
- We fix the size of the thread pool to match the CPU core count. Then we split all operators that are prone to blocking into pipeline tasks. For example, we use individual threads for disk I/O operations and RPC operations.
- We designed a user-space polling scheduler. It continuously checks the state of all executable pipeline tasks and assigns executable tasks to threads. With this in place, the operating system doesn't have to frequently switch threads, which means less overhead. It also allows customized scheduling strategies, such as assigning priorities to tasks (see the sketch after this list).
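A bare-bones illustration of such a user-space poller might look like the following; the shapes are assumed for illustration and are not the actual Doris scheduler.

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <vector>

struct PipelineTask {
    // Flipped by I/O or RPC completion in this toy model.
    std::atomic<bool> ready{false};
};

struct PollingScheduler {
    std::vector<PipelineTask*> blocked;
    std::deque<PipelineTask*> runnable;
    std::mutex mu;

    // Runs in a scheduler thread: checks blocked tasks and promotes the
    // executable ones, so worker threads never sit inside a blocked call
    // and the OS does not have to context-switch waiting threads.
    void poll_once() {
        std::lock_guard<std::mutex> lock(mu);
        for (auto it = blocked.begin(); it != blocked.end();) {
            if ((*it)->ready.load()) {
                runnable.push_back(*it);  // promote to the runnable queue
                it = blocked.erase(it);
            } else {
                ++it;
            }
        }
    }
};

int main() {
    PollingScheduler sched;
    PipelineTask task;
    sched.blocked.push_back(&task);
    task.ready = true;  // simulate an I/O completion
    sched.poll_once();  // task is now in sched.runnable
}
```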
Parallelization
Before version 2.0, Apache Doris required users to set a concurrency parameter for the execution engine (parallel_fragment_exec_instance_num), which does not change dynamically based on the workloads. It is therefore a burden for users to figure out an appropriate concurrency level that leads to optimal performance.
What is the industry's solution to this?
Presto's idea is to shuffle the data into a reasonable number of partitions during execution, which requires minimal concurrency control from users. DuckDB, on the other hand, introduces an extra synchronization mechanism instead of shuffling. We decided to follow Presto's track because the DuckDB solution inevitably involves the use of locks, which works against our goal of avoiding blocking.
Unlike Presto, Apache Doris doesn't need an extra Local Exchange mechanism to shard the data into an appropriate number of partitions. With its massively parallel processing (MPP) architecture, Doris already does so during shuffling. (In Presto's case, it re-partitions the data via Local Exchange for higher execution concurrency. For example, in hash aggregation, Doris further shards the data based on the aggregation key in order to fully utilize the CPU cores. This also downsizes the hash table that each execution thread has to build.)
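To make the aggregation-key sharding concrete, here is a toy C++ sketch (an assumed simplification, not Doris source) that partitions rows by hashing the aggregation key; each execution thread then only builds a hash table for its own partition.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// (key, value) pairs standing in for rows of an aggregation input.
using Row = std::pair<std::string, long>;

// Shard rows by hashing the aggregation key so that each execution thread
// builds a smaller hash table covering only its own share of the keys.
std::vector<std::vector<Row>> shard_by_key(const std::vector<Row>& input,
                                           std::size_t num_threads) {
    std::vector<std::vector<Row>> partitions(num_threads);
    std::hash<std::string> hasher;
    for (const Row& row : input) {
        partitions[hasher(row.first) % num_threads].push_back(row);
    }
    return partitions;
}

int main() {
    std::vector<Row> rows = {{"a", 1}, {"b", 2}, {"a", 3}};
    auto parts = shard_by_key(rows, 4);  // e.g. 4 CPU cores
}
```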
Based on the MPP architecture, we only needed two enhancements to achieve what we want in Doris:
- Increase the concurrency level during shuffling. For this, we only need the frontend (FE) to perceive the backend (BE) environment and then set a reasonable number of partitions.
- Implement concurrent execution after data reading at the scan layer. To do that, we needed a logical restructuring of the scan layer to decouple the threads from the number of data tablets. This is a pooling process: we pool the data read by the scanner threads, so it can be fetched by multiple pipeline tasks directly (see the sketch after this list).
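Here is a minimal sketch of that pooling idea, with scanner threads producing into a shared pool and pipeline tasks consuming from it; the class and method names are assumptions for illustration.

```cpp
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

using DataBlock = std::vector<int>;

// Scanner threads push blocks in; any pipeline task may pop blocks out,
// so execution threads are decoupled from the tablet count.
class ScanPool {
public:
    void push(DataBlock block) {  // producer side: scanner threads
        std::lock_guard<std::mutex> lock(mu_);
        blocks_.push(std::move(block));
    }
    // Consumer side: non-blocking, so an empty pool simply means the
    // pipeline task reports itself as blocked instead of holding a thread.
    std::optional<DataBlock> pop() {
        std::lock_guard<std::mutex> lock(mu_);
        if (blocks_.empty()) return std::nullopt;
        DataBlock block = std::move(blocks_.front());
        blocks_.pop();
        return block;
    }
private:
    std::mutex mu_;
    std::queue<DataBlock> blocks_;
};

int main() {
    ScanPool pool;
    pool.push({1, 2, 3});     // a scanner thread delivering one block
    auto block = pool.pop();  // any pipeline task can fetch it directly
}
```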
PipelineX
Introduced in Apache Doris 2.0.0, the Pipeline Execution Engine has been improving query performance and stability under hybrid workload scenarios (queries of different sizes and from different tenants). In version 2.1.0, we tackled the known issues and upgraded it from an experimental feature to a robust and reliable solution, which is what we call PipelineX.
PipelineX provides answers to the following issues that used to challenge the Pipeline Execution Engine:
- Limited execution concurrency
- High execution overhead
- High scheduling overhead
- Poor readability of operator profiles
Execution Concurrency
The Pipeline Execution Engine remains restricted by the static concurrency parameter at the FE and by the tablet count at the storage layer, making it unable to capitalize on the full computing resources. Plus, it is easily affected by data skew.
For example, suppose Table A contains 100 million rows but has only one tablet, which means it is not sharded enough. Let's see what happens when you perform an aggregation query on it:
SELECT COUNT(*) FROM A GROUP BY A.COL_1;
During query execution, the query plan is divided into two fragments. Each fragment, consisting of multiple operators, is dispatched by the frontend (FE) to a backend (BE). The BE starts threads to execute the fragments concurrently.
Now, let's focus on Fragment 0 for further elaboration. Because there is only one tablet, Fragment 0 can only be executed by one thread. That means aggregating 100 million rows with one single thread. If you have 16 CPU cores, ideally the system can allocate 8 threads to execute Fragment 0. In this case, there is a concurrency disparity of 8 to 1. That is how the number of tablets restricts execution concurrency and also why we introduced the Local Shuffle mechanism to remove that restriction in Apache Doris 2.1.0. This is how it works in PipelineX:
- The threads execute their own pipeline tasks, but each pipeline task only maintains its runtime state (called Local State), while the information shared across all pipeline tasks (called Global State) is managed by one pipeline object.
- On a single BE, the Local Shuffle mechanism is responsible for data distribution and data balancing across pipeline tasks.
Apart from decoupling execution concurrency from the tablet count, Local Shuffle can avoid performance loss due to data skew. Again, we will explain with the foregoing example.
This time, we shard Table A into two tablets instead of one, but the data is not evenly distributed. Tablet 1 and Tablet 3 hold 10 million and 90 million rows, respectively. The Pipeline Execution Engine and the PipelineX Execution Engine respond differently to such data skew:
- Pipeline Execution Engine: Thread 1 and Thread 2 execute Fragment 1 concurrently. The latter takes 9 times as long as the former because of the different data sizes they deal with.
- PipelineX Execution Engine: With Local Shuffle, the data is distributed evenly to the two threads, so they take almost equal time to finish (a toy sketch of this balancing follows the list).
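The sketch below models that balancing with round-robin block distribution; real Local Shuffle also supports other distribution schemes such as hash partitioning by key, so treat this only as the load-balancing intuition under assumed, simplified types.

```cpp
#include <cstddef>
#include <vector>

using DataBlock = std::vector<int>;

// Round-robin distribution: whichever tablet a block came from, blocks are
// spread evenly over the pipeline tasks, so a 10M/90M row split between two
// tablets no longer becomes a 1:9 workload split between two threads.
std::vector<std::vector<DataBlock>> local_shuffle(
        const std::vector<DataBlock>& blocks, std::size_t num_tasks) {
    std::vector<std::vector<DataBlock>> per_task(num_tasks);
    for (std::size_t i = 0; i < blocks.size(); ++i) {
        per_task[i % num_tasks].push_back(blocks[i]);
    }
    return per_task;
}

int main() {
    std::vector<DataBlock> blocks(10, DataBlock{1, 2, 3});
    auto balanced = local_shuffle(blocks, 2);  // 5 blocks per pipeline task
}
```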
Execution Overhead
Under the Pipeline Execution Engine, because the expressions of different instances are separate, each instance is initialized individually. However, since the initialization parameters of instances have a lot in common, we can reuse the shared states to reduce execution overhead. That is what PipelineX does: it initializes the Global State once, and the Local States sequentially.
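A small sketch of this reuse, assuming a Global State holding compiled expressions that all Local States point to; the names are illustrative, not the Doris API.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Shared, initialized once per pipeline (e.g. compiled expressions).
struct GlobalState {
    std::vector<std::string> compiled_exprs;
};

// Per-task runtime state; it references the Global State instead of
// re-initializing the same expressions for every instance.
struct LocalState {
    std::shared_ptr<const GlobalState> global;
    std::size_t rows_processed = 0;
};

int main() {
    auto global = std::make_shared<const GlobalState>(
        GlobalState{{"age > 30", "count(*)"}});
    // Eight pipeline tasks share one Global State; only the lightweight
    // Local States are initialized per task (sequentially).
    std::vector<LocalState> tasks(8, LocalState{global});
}
```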
Scheduling Overhead
In the Pipeline Execution Engine, blocked tasks are put into a blocked queue, where a dedicated thread polls and moves executable tasks over to the runnable queue. This dedicated scheduling thread consumes a CPU core and incurs overhead that can be particularly noticeable on systems with limited computing resources.
As a better solution, PipelineX encapsulates the blocking conditions as dependencies, and the task status (blocked or runnable) is changed by event notifications. Specifically, when RPC data arrives, the relevant task is considered ready by the ExchangeSourceOperator and is then moved to the runnable queue.
That means PipelineX implements event-driven scheduling. A query execution plan can be depicted as a DAG, where the pipeline tasks are abstracted as nodes and the dependencies as edges. Whether a pipeline task gets executed depends on whether all its dependencies have satisfied the requisite conditions.
For simplicity of illustration, the above DAG only shows the dependencies between the upstream and downstream pipeline tasks. In fact, all blocking conditions are abstracted as dependencies. The complete execution workflow of a pipeline task is as follows:
In event-driven execution, a pipeline task is only executed after all its dependencies satisfy the conditions; otherwise, it is added to the blocked queue. When an external event arrives, all blocked tasks are re-evaluated to see if they are runnable.
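The following is a minimal sketch of this dependency mechanism, assuming a ready flag per dependency that the event source flips; the actual Doris Dependency class is richer than this.

```cpp
#include <atomic>
#include <deque>
#include <memory>
#include <vector>

// A blocking condition encapsulated as a dependency with a ready flag.
struct Dependency {
    std::atomic<bool> ready{false};
};

struct PipelineTask {
    std::vector<std::shared_ptr<Dependency>> deps;
    // A task is runnable only when every dependency is satisfied.
    bool runnable() const {
        for (const auto& d : deps) {
            if (!d->ready.load()) return false;
        }
        return true;
    }
};

struct Scheduler {
    std::deque<PipelineTask*> runnable_queue;
    std::vector<PipelineTask*> blocked;

    // Called by the event source (e.g. when RPC data arrives): satisfy the
    // dependency, then re-evaluate blocked tasks instead of polling them.
    void on_event(Dependency& dep) {
        dep.ready = true;
        for (auto it = blocked.begin(); it != blocked.end();) {
            if ((*it)->runnable()) {
                runnable_queue.push_back(*it);
                it = blocked.erase(it);
            } else {
                ++it;
            }
        }
    }
};

int main() {
    auto dep = std::make_shared<Dependency>();
    PipelineTask task{{dep}};
    Scheduler sched;
    sched.blocked.push_back(&task);
    sched.on_event(*dep);  // the task moves to the runnable queue
}
```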
The event-driven design of PipelineX eliminates the need for a polling thread and thus the consequent performance loss under high cluster loads. Moreover, the encapsulation of dependencies allows a more flexible scheduling framework, making it easier to spill data to disks.
Operator Profile
PipelineX reorganizes the metrics in the operator profiles, adding new ones and retiring irrelevant ones. Moreover, with the dependencies encapsulated, we monitor how long the dependencies take to get ready via the WaitForDependency metric, so the profile can present a clear picture of the time spent in each step. Here are two examples:
- Scan Operator: The execution time of OLAP_SCAN_OPERATOR is 457.750ms. The time spent waiting for the scan dependency is 436.883ms.
OLAP_SCAN_OPERATOR (id=4. table name = Z03_DI_MID):
- ExecTime: 457.750ms
- WaitForDependency[OLAP_SCAN_OPERATOR_DEPENDENCY]Time: 436.883ms
- Exchange Source Operator: The execution time of EXCHANGE_OPERATOR is 86.691us. The time spent waiting for data from upstream is 409.256us.
EXCHANGE_OPERATOR (id=3):
- ExecTime: 86.691us
- WaitForDependencyTime: 0ns
- WaitForData0: 409.256us
What's Next?
From the Volcano Model to the Pipeline Execution Engine, Apache Doris 2.0.0 overcame the deadlocks under high cluster loads and vastly increased CPU utilization. Now, from the Pipeline Execution Engine to PipelineX, Apache Doris 2.1.0 is more production-friendly as it has ironed out the kinks in concurrency, overheads, and operator profiles.
What's next on our roadmap is to support spilling data to disk in PipelineX to further improve query speed and system reliability. We also plan to advance further in terms of automation, such as self-adaptive concurrency and automatic execution plan optimization, accompanied by NUMA technologies to reap better performance from hardware resources.
If you want to talk to the amazing Doris developers who lead these changes, you are more than welcome to join the Apache Doris community.