Last updated: 2020-08-15

This article discusses ClickHouse’s core technology: Processor and DAG Scheduler.

These concepts were not pioneered by ClickHouse; interested readers can check out Materialize's timely-dataflow. I also wrote a Golang prototype.

It all comes down to implementation details: it is precisely the excellent design of these modules that gives ClickHouse its overall high performance.

Pipeline Problem

In a traditional database system, a query is processed roughly as: Parse -> Plan -> Execute.

In the Plan phase, a pipeline-assembly step is often added (each transformer represents one data-processing operation): Source -> Transformer1 -> Transformer2 -> ... -> Sinker.

All the transformers are orchestrated into a pipeline and then handed to the executor for serial execution: each transformer in turn processes the dataset and passes its output downstream, until the data reaches the sinker.
The advantage of this model is simplicity; the disadvantage is low performance, because it cannot exploit CPU parallelism. This is the classic volcano-style model. For latency-sensitive OLTP it is sufficient, but for compute-intensive OLAP it is far from enough. A CPU that is not running at 100% is a crime!
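To make the contrast concrete, here is a minimal sketch of serial, pull-style execution. The types and names (Dataset, Transformer, executeSerially) are made up for illustration; this is not ClickHouse code:

#include <memory>
#include <vector>

struct Dataset { /* rows ... */ };

struct Transformer
{
    virtual ~Transformer() = default;
    virtual Dataset process(Dataset input) = 0;
};

/// The executor walks the pipeline one transformer at a time, on one thread.
Dataset executeSerially(Dataset data, const std::vector<std::unique_ptr<Transformer>> & pipeline)
{
    for (const auto & transformer : pipeline)
        data = transformer->process(std::move(data));   /// strictly one after another
    return data;                                        /// finally handed to the sinker
}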

In the example above, if transformer1 and transformer2 have no data dependency on each other, they could run in parallel:

This raises a few soul-searching questions:

  1. How to achieve flexible orchestration of transformers?
  2. How to achieve data synchronization between transformers?
  3. How to achieve parallel scheduling between transformers?

Processor and DAG Scheduler

1. Transformer Orchestration

ClickHouse implements a series of basic transformer modules (see src/Processors/Transforms), for example:

  • FilterTransform – WHERE condition filtering
  • SortingTransform – ORDER BY sorting
  • LimitByTransform – LIMIT trimming

When we execute:

SELECT * FROM t1 WHERE id=1 ORDER BY time DESC LIMIT 10

For ClickHouse’s QueryPipeline, it orchestrates and assembles like this:

QueryPipeline::addSimpleTransform(Source)
QueryPipeline::addSimpleTransform(FilterTransform)
QueryPipeline::addSimpleTransform(SortingTransform)
QueryPipeline::addSimpleTransform(LimitByTransform)
QueryPipeline::addSimpleTransform(Sinker)

This achieves transformer orchestration. But how does data synchronize during execution?

2. Transformer Data Synchronization

While QueryPipeline orchestrates the transformers, we also need to establish the lower-level DAG connectivity between them.

connect(Source.OutPort, FilterTransform.InPort)
connect(FilterTransform.OutPort, SortingTransform.InPort)
connect(SortingTransform.OutPort, LimitByTransform.InPort)
connect(LimitByTransform.OutPort, Sinker.InPort)

This establishes the data-flow relationships: one transformer's OutPort connects to another's InPort, just like plumbing pipes in the real world. Connections can even be three-way or multi-way, since a processor may have several input and output ports.
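As a toy illustration of the plumbing (not ClickHouse's actual Port implementation), connect() can be thought of as making an OutputPort and an InputPort share a common state through which Chunks are handed over:

#include <memory>
#include <optional>

struct Chunk { /* columns ... */ };

/// The shared "pipe segment" between one OutputPort and one InputPort.
struct PortState
{
    std::optional<Chunk> data;
};

struct OutputPort { std::shared_ptr<PortState> state; void push(Chunk c) { state->data = std::move(c); } };
struct InputPort  { std::shared_ptr<PortState> state; bool hasData() const { return state->data.has_value(); } };

/// connect() simply makes the two ports share the same state.
void connect(OutputPort & out, InputPort & in)
{
    auto shared = std::make_shared<PortState>();
    out.state = shared;
    in.state = shared;
}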

3. Transformer Execution Scheduling

Now the pipeline is assembled. But how does the water inside the pipes get processed and pressurized to flow?

ClickHouse defines a set of processor states, and the executor schedules the processors based on them.

enum class Status
{
    NeedData,       // Waiting for data to arrive on the input port
    PortFull,       // The output port is blocked; downstream has not consumed yet
    Finished,       // Finished, ready to exit
    Ready,          // Switch to the work() function for processing logic
    Async,          // Switch to the schedule() function for asynchronous processing
    Wait,           // Waiting for asynchronous processing to finish
    ExpandPipeline, // The pipeline needs to expand into more processors
};

When the source generates data, its status becomes PortFull, meaning the data is waiting to flow into another transformer's InPort. The executor then schedules the FilterTransform, whose prepare() (currently in the NeedData state) pulls the data in; its status becomes Ready, and it waits for the executor to schedule its work() method to perform the actual filtering. Everything relies on these states so that the executor can sense each processor, schedule it, and drive the state transitions until everything reaches Finished.
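A greatly simplified, single-threaded sketch of this state machine might look like the following. The real PipelineExecutor builds an execution graph and schedules processors across a thread pool, so treat this only as an illustration of the prepare()/work() interplay (it assumes the ClickHouse Processors headers are available):

/// Naive scheduler sketch: keep polling processors until no one makes progress.
void runNaively(std::vector<ProcessorPtr> & processors)
{
    bool progress = true;
    while (progress)
    {
        progress = false;
        for (auto & processor : processors)
        {
            switch (processor->prepare())   /// cheap: only moves data between ports
            {
                case IProcessor::Status::Ready:
                    processor->work();      /// heavy: the actual data processing
                    progress = true;
                    break;
                case IProcessor::Status::NeedData:
                case IProcessor::Status::PortFull:
                case IProcessor::Status::Finished:
                    break;                  /// nothing to do for this processor right now
                default:
                    break;                  /// Async / Wait / ExpandPipeline omitted here
            }
        }
    }
}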

Worth mentioning is the ExpandPipeline status: depending on the transformer's implementation, it lets one transformer split into several transformers that execute in parallel (the executor obtains the new processors by calling the processor's expandPipeline() method), achieving a fan-out effect.

Example

SELECT number + 1 FROM t1;

For a deeper understanding of ClickHouse's processor and scheduler mechanism, let's hand-build a native example that emulates this query:

  1. A Source producing {0, 1, 2, 3, 4}
  2. A MyAddTransformer that adds 1 to each number
  3. A Sinker that outputs the results

1. MySource

class MySource : public ISource
{
public:
    String getName() const override { return "MySource"; }

    explicit MySource(UInt64 end_)
        : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}}))
        , end(end_)
    {
    }

private:
    UInt64 end;
    bool done = false;

    Chunk generate() override
    {
        /// Returning an empty Chunk signals that this source is exhausted.
        if (done)
            return Chunk();

        /// Produce a single column "number" holding 0 .. end - 1 in one batch.
        MutableColumns columns;
        columns.emplace_back(ColumnUInt64::create());
        for (auto i = 0U; i < end; i++)
            columns[0]->insert(i);

        done = true;
        return Chunk(std::move(columns), end);
    }
};
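This source produces the whole dataset in a single Chunk. The ISource base class supplies prepare() and work(): roughly speaking, it keeps calling generate() and treats an empty Chunk as the end of the stream, which is why the done flag is enough to stop the source after one batch.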

2. MyAddTransformer

class MyAddTransformer : public IProcessor
{
public:
    String getName() const override { return "MyAddTransformer"; }

    MyAddTransformer()
        : IProcessor(
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})
        , input(inputs.front())
        , output(outputs.front())
    {
    }

    /// prepare() only moves Chunks between ports and reports the next state;
    /// the actual computation happens in work().
    Status prepare() override
    {
        /// Downstream no longer needs data: propagate the shutdown upstream.
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        /// Downstream has not consumed the previous Chunk yet.
        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Push the Chunk produced by the last work() call.
        if (has_process_data)
        {
            output.push(std::move(current_chunk));
            has_process_data = false;
        }

        /// Upstream is exhausted: finish our output as well.
        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        /// No input available yet: ask upstream for data.
        if (!input.hasData())
        {
            input.setNeeded();
            return Status::NeedData;
        }

        /// Pull the next Chunk and let the executor call work().
        current_chunk = input.pull(false);
        return Status::Ready;
    }

    void work() override
    {
        /// Add 1 to every value of the "number" column.
        auto num_rows = current_chunk.getNumRows();
        auto result_columns = current_chunk.cloneEmptyColumns();
        auto columns = current_chunk.detachColumns();
        for (auto i = 0U; i < num_rows; i++)
        {
            auto val = columns[0]->getUInt(i);
            result_columns[0]->insert(val + 1);
        }
        current_chunk.setColumns(std::move(result_columns), num_rows);
        has_process_data = true;
    }

    InputPort & getInputPort() { return input; }
    OutputPort & getOutputPort() { return output; }

protected:
    bool has_input = false;
    bool has_process_data = false;
    Chunk current_chunk;
    InputPort & input;
    OutputPort & output;
};
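Note the division of labor: prepare() is meant to be cheap and non-blocking, doing nothing but moving Chunks between ports and reporting the next state, while work() carries the actual computation. This is what lets the executor run many work() calls in parallel across processors while keeping the scheduling itself lightweight.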

3. MySink

class MySink : public ISink
{
public:
    String getName() const override { return "MySinker"; }

    MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { }

private:
    WriteBufferFromFileDescriptor out{STDOUT_FILENO};
    FormatSettings settings;

    /// Called for every Chunk that arrives on the input port.
    void consume(Chunk chunk) override
    {
        size_t rows = chunk.getNumRows();
        size_t columns = chunk.getNumColumns();

        for (size_t row_num = 0; row_num < rows; ++row_num)
        {
            /// Write each row as "prefix-<value1>\t<value2>...\n" to stdout.
            writeString("prefix-", out);
            for (size_t column_num = 0; column_num < columns; ++column_num)
            {
                if (column_num != 0)
                    writeChar('\t', out);
                getPort()
                    .getHeader()
                    .getByPosition(column_num)
                    .type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
            }
            writeChar('\n', out);
        }

        out.next();
    }
};
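Because MySink derives from ISink, it does not implement prepare() or work() itself; the base class pulls each Chunk from the input port and passes it to consume(), which here simply serializes every row to stdout with a prefix- label.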

4. DAG Scheduler

int main(int, char **)
{
    /// Build the three processors: source -> add-1 transformer -> sink.
    auto source0 = std::make_shared<MySource>(5);
    auto add0 = std::make_shared<MyAddTransformer>();
    auto sinker0 = std::make_shared<MySink>();

    /// Connect the ports so Chunks can flow through the DAG.
    connect(source0->getPort(), add0->getInputPort());
    connect(add0->getOutputPort(), sinker0->getPort());

    /// Hand the processors to the executor and run with a single thread.
    std::vector<ProcessorPtr> processors = {source0, add0, sinker0};
    PipelineExecutor executor(processors);
    executor.execute(1);
}
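If you link this program against the ClickHouse source tree and run it, the output should look roughly like this (the source produces 0..4, the transformer adds 1, and the sink writes one prefixed line per row):

prefix-1
prefix-2
prefix-3
prefix-4
prefix-5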

Summary

From a developer's perspective it is still fairly complex, because the state transitions have to be managed by the developer. However, the ClickHouse codebase has already done a lot of the groundwork, such as the ISource wrapper for sources, the ISink wrapper for sinks, and the basic ISimpleTransform, so developers can work with processors at a higher level and assemble the pipeline they want like building blocks.
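For instance, the add-1 transformer above could be written much more compactly on top of ISimpleTransform, which owns the prepare() state machine and only asks subclasses for the data transformation. The following is a sketch assuming the 2020-era interface (a constructor taking the input and output headers plus a skip_empty_chunks flag, and a transform(Chunk &) override); check the current headers in src/Processors before relying on it:

/// Sketch only: assumes ISimpleTransform(input_header, output_header, skip_empty_chunks)
/// and a transform(Chunk &) override; verify against the current ClickHouse headers.
class MySimpleAddTransform : public ISimpleTransform
{
public:
    explicit MySimpleAddTransform(const Block & header)
        : ISimpleTransform(header, header, false)
    {
    }

    String getName() const override { return "MySimpleAddTransform"; }

protected:
    void transform(Chunk & chunk) override
    {
        /// Same add-1 logic as MyAddTransformer, but without any port handling.
        auto num_rows = chunk.getNumRows();
        auto result = chunk.cloneEmptyColumns();
        auto columns = chunk.detachColumns();
        for (size_t i = 0; i < num_rows; ++i)
            result[0]->insert(columns[0]->getUInt(i) + 1);
        chunk.setColumns(std::move(result), num_rows);
    }
};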

ClickHouse's unit of data between transformers is the Chunk. A transformer processes the Chunks flowing in from the upstream OutPort and outputs the results to the downstream InPort; the graph-connected pipelines work in parallel and keep the CPU as busy as possible.

When a SQL statement is parsed into an AST, ClickHouse builds a QueryPlan from the AST, then builds a QueryPipeline from the QueryPlan, and finally the executor is responsible for scheduling and execution.
Recent ClickHouse releases enable QueryPipeline by default, and this code is still iterating rapidly.