The previous article, MySQL Protocol and Read Call Stack, introduced the call stack for a ClickHouse query statement. This article continues with the write call stack. Let’s go.
Write Request
Create table:
mysql> CREATE TABLE test(a UInt8, b UInt8, c UInt8) ENGINE=MergeTree() PARTITION BY (a, b) ORDER BY c;
Query OK, 0 rows affected (0.03 sec)
Insert data:
INSERT INTO test VALUES(1,1,1), (2,2,2);
Call Stack Analysis
1. Get Storage Engine OutputStream
DB::StorageMergeTree::write(std::__1::shared_ptr<DB::IAST> const&, DB::Context const&) StorageMergeTree.cpp:174
DB::PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(std::__1::shared_ptr<DB::IStorage> const&, DB::Context const&, std::__1::shared_ptr<DB::IAST> const&, bool) PushingToViewsBlockOutputStream.cpp:110
DB::InterpreterInsertQuery::execute() InterpreterInsertQuery.cpp:229
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:364
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:696
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
2. Assemble InputStream
How is (1,1,1), (2,2,2) assembled into an InputStream structure?
DB::InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(std::__1::shared_ptr<DB::IAST> const&, DB::ReadBuffer*,
DB::InterpreterInsertQuery::execute() InterpreterInsertQuery.cpp:300
DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) executeQuery.cpp:386
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:313
DB::MySQLHandler::run() MySQLHandler.cpp:150
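Then the input stream is created and wrapped in a NullAndDoCopyBlockInputStream together with the first OutputStream: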
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out_streams.at(0));
The Block is constructed when NullAndDoCopyBlockInputStream calls copyData, which reads from the input stream:
DB::ValuesBlockInputFormat::readRow(std::__1::vector<COW<DB::IColumn>::mutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::mutable_ptr<DB::IColumn> > >&, unsigned long) ValuesBlockInputFormat.cpp:93
DB::ValuesBlockInputFormat::generate() ValuesBlockInputFormat.cpp:55
DB::ISource::work() ISource.cpp:48
DB::InputStreamFromInputFormat::readImpl() InputStreamFromInputFormat.h:48
DB::IBlockInputStream::read() IBlockInputStream.cpp:57
DB::InputStreamFromASTInsertQuery::readImpl() InputStreamFromASTInsertQuery.h:31
DB::IBlockInputStream::read() IBlockInputStream.cpp:57
void DB::copyDataImpl<DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::atomic<bool>*)::$_0&, void (&)(DB::Block const&)>(DB::IBlockInputStream&, DB::IBlockOutputStream&, DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::atomic<bool>*)::$_0&, void (&)(DB::Block const&)) copyData.cpp:26
DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::atomic<bool>*) copyData.cpp:62
DB::NullAndDoCopyBlockInputStream::readImpl() NullAndDoCopyBlockInputStream.h:47
DB::IBlockInputStream::read() IBlockInputStream.cpp:57
void DB::copyDataImpl<std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&>(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&) copyData.cpp:26
DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&) copyData.cpp:73
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:785
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:313
DB::MySQLHandler::run() MySQLHandler.cpp:150
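To make the readRow step more concrete, here is a minimal sketch of turning row-oriented VALUES text into a columnar block. It is not ClickHouse code: ToyBlock and parseValues are hypothetical names, the parser only handles unsigned integers that fit UInt8, and the real ValuesBlockInputFormat handles many more types and syntax cases.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a ClickHouse Block: one vector per column.
struct ToyBlock {
    std::vector<std::vector<std::uint8_t>> columns;
    std::size_t rows() const { return columns.empty() ? 0 : columns[0].size(); }
};

// Parse text such as "(1,1,1), (2,2,2)" row by row, appending each value
// to the column at the same position (row-oriented text -> columnar block).
inline ToyBlock parseValues(const std::string& text, std::size_t num_columns) {
    assert(num_columns > 0);
    ToyBlock block;
    block.columns.resize(num_columns);
    std::size_t pos = 0;
    auto isDigitAt = [&](std::size_t i) {
        return i < text.size() && std::isdigit(static_cast<unsigned char>(text[i]));
    };
    auto skipSpace = [&] {
        while (pos < text.size() && std::isspace(static_cast<unsigned char>(text[pos]))) ++pos;
    };
    auto expect = [&](char c) {
        skipSpace();
        if (pos >= text.size() || text[pos] != c)
            throw std::runtime_error(std::string("expected '") + c + "'");
        ++pos;
    };
    skipSpace();
    while (pos < text.size()) {
        expect('(');
        for (std::size_t col = 0; col < num_columns; ++col) {
            skipSpace();
            if (!isDigitAt(pos)) throw std::runtime_error("expected a number");
            unsigned value = 0;
            while (isDigitAt(pos)) {
                value = value * 10 + static_cast<unsigned>(text[pos++] - '0');
                if (value > 255) throw std::runtime_error("value does not fit UInt8");
            }
            block.columns[col].push_back(static_cast<std::uint8_t>(value));
            if (col + 1 < num_columns) expect(',');
        }
        expect(')');
        skipSpace();
        if (pos < text.size()) { expect(','); skipSpace(); }  // separator between rows
    }
    return block;
}
```

Calling parseValues("(1,1,1), (2,2,2)", 3) yields three columns of two values each, which mirrors how each parsed row contributes one value to every column of the Block.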
3. Assemble OutputStream
DB::InterpreterInsertQuery::execute() InterpreterInsertQuery.cpp:107
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:364
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:696
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
Assembly order:
NullAndDoCopyBlockInputStream
CountingBlockOutputStream
AddingDefaultBlockOutputStream
SquashingBlockOutputStream
PushingToViewsBlockOutputStream
MergeTreeBlockOutputStream
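The nesting implied by the list above can be sketched as a chain of wrappers, each holding a pointer to the next stage. This is only an illustration under assumptions: ToyOutput, ToySink, ToyWrapper, and buildChain are hypothetical names, the wrappers do no real work, and the sketch assumes the list reads from the outermost stream to the innermost one, as the write call stack in step 4 suggests.

```cpp
#include <cassert>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified interface; the real IBlockOutputStream is richer.
struct ToyOutput {
    virtual ~ToyOutput() = default;
    virtual void write(int block) = 0;
};

// Innermost stage: stands in for the storage engine's output stream.
struct ToySink : ToyOutput {
    std::vector<std::string>& trace;
    explicit ToySink(std::vector<std::string>& t) : trace(t) {}
    void write(int) override { trace.push_back("MergeTree"); }
};

// A pass-through wrapper: records its name, then forwards to the next stage.
struct ToyWrapper : ToyOutput {
    std::string name;
    std::shared_ptr<ToyOutput> next;
    std::vector<std::string>& trace;
    ToyWrapper(std::string n, std::shared_ptr<ToyOutput> nx, std::vector<std::string>& t)
        : name(std::move(n)), next(std::move(nx)), trace(t) {
        assert(next);  // every wrapper needs a downstream stage
    }
    void write(int block) override {
        trace.push_back(name);
        next->write(block);
    }
};

// Build the chain inside-out, so the last wrapper added is the first to see data.
inline std::shared_ptr<ToyOutput> buildChain(std::vector<std::string>& trace) {
    std::shared_ptr<ToyOutput> out = std::make_shared<ToySink>(trace);
    for (const char* name : {"PushingToViews", "Squashing", "AddingDefault", "Counting"})
        out = std::make_shared<ToyWrapper>(name, out, trace);
    return out;
}
```

After buildChain(trace)->write(0), the trace lists the stages in the order the block passed through them: Counting, AddingDefault, Squashing, PushingToViews, MergeTree.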
4. Write to OutputStream
DB::MergeTreeBlockOutputStream::write(DB::Block const&) MergeTreeBlockOutputStream.cpp:17
DB::PushingToViewsBlockOutputStream::write(DB::Block const&) PushingToViewsBlockOutputStream.cpp:145
DB::SquashingBlockOutputStream::finalize() SquashingBlockOutputStream.cpp:30
DB::SquashingBlockOutputStream::writeSuffix() SquashingBlockOutputStream.cpp:50
DB::AddingDefaultBlockOutputStream::writeSuffix() AddingDefaultBlockOutputStream.cpp:25
DB::CountingBlockOutputStream::writeSuffix() CountingBlockOutputStream.h:37
DB::copyDataImpl<DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::atomic<bool>*)::<lambda()>&, void (&)(const DB::Block&)>(DB::IBlockInputStream &, DB::IBlockOutputStream &, <lambda()> &, void (&)(const DB::Block &)) copyData.cpp:52
DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::atomic<bool>*) copyData.cpp:138
DB::NullAndDoCopyBlockInputStream::readImpl() NullAndDoCopyBlockInputStream.h:57
DB::IBlockInputStream::read() IBlockInputStream.cpp:60
void DB::copyDataImpl<std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&>(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&) copyData.cpp:29
DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&) copyData.cpp:154
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:748
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
Through the copyData method, data flows through OutputStreams layer by layer until reaching MergeTreeBlockOutputStream.
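The stack above shows the storage write being triggered from SquashingBlockOutputStream::writeSuffix() rather than from write(). A rough sketch of that pattern follows, assuming a copy loop of the form prefix, read/write until the input is empty, suffix. All names here (ToyRows, ToyInput, ToyStorage, ToySquashing, toyCopyData) are hypothetical, and the sketch deliberately omits the size thresholds that would let a real squashing stage flush earlier.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a Block; an empty vector means "no more data".
using ToyRows = std::vector<int>;

struct ToyInput {
    std::vector<ToyRows> pending;
    std::size_t next = 0;
    ToyRows read() {
        assert(next <= pending.size());
        return next < pending.size() ? pending[next++] : ToyRows{};
    }
};

// Stands in for the storage engine's output stream.
struct ToyStorage {
    std::vector<ToyRows> written;
    void write(const ToyRows& rows) { written.push_back(rows); }
};

// Buffers small blocks and hands one merged block to storage on writeSuffix().
struct ToySquashing {
    ToyStorage& storage;
    ToyRows buffer;
    explicit ToySquashing(ToyStorage& s) : storage(s) {}
    void writePrefix() {}
    void write(const ToyRows& rows) { buffer.insert(buffer.end(), rows.begin(), rows.end()); }
    void writeSuffix() {
        if (!buffer.empty()) {
            storage.write(buffer);
            buffer.clear();
        }
    }
};

// The copy loop: prefix, then read/write until the input is exhausted, then suffix.
template <typename In, typename Out>
void toyCopyData(In& from, Out& to) {
    to.writePrefix();
    for (ToyRows block = from.read(); !block.empty(); block = from.read())
        to.write(block);
    to.writeSuffix();
}
```

With two small input blocks, toyCopyData leaves a single merged block in ToyStorage::written, and that block only appears once writeSuffix() runs, which matches the shape of the stack above.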
5. Return to Client
DB::MySQLOutputFormat::finalize() MySQLOutputFormat.cpp:62
DB::IOutputFormat::doWriteSuffix() IOutputFormat.h:78
DB::OutputStreamToOutputFormat::writeSuffix() OutputStreamToOutputFormat.cpp:18
DB::MaterializingBlockOutputStream::writeSuffix() MaterializingBlockOutputStream.h:22
void DB::copyDataImpl<std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&>(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&) copyData.cpp:52
DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::__1::function<bool ()> const&, std::__1::function<void (DB::Block const&)> const&) copyData.cpp:154
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<void (std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)>) executeQuery.cpp:748
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
Summary
INSERT INTO test VALUES(1,1,1), (2,2,2);
First, the kernel parses the SQL statement into an AST and uses the AST to select the matching Interpreter: InterpreterInsertQuery. Next, the Interpreter adds the corresponding OutputStreams in sequence. Data is then read from the InputStream and written to the OutputStream, passing through the streams layer by layer until it reaches the underlying storage engine. Finally, the result is written to the socket output and returned to the client.
ClickHouse’s OutputStream orchestration is relatively complex and lacks Pipeline-style scheduling and orchestration. However, because the pattern is relatively fixed, the flow is still quite clear for now.