Last updated: 2020-09-13

In MySQL, high availability and data safety are achieved with a master-replica setup, in which data is synchronized through the binlog.

In ClickHouse, we can use the ReplicatedMergeTree engine, which relies on ZooKeeper to coordinate data synchronization between replicas.

This article starts by setting up a multi-replica cluster, and then takes a quick look under the hood to get a taste of how replication works.

1. Cluster Setup

We set up a 2-replica test cluster. Due to limited resources, the two clickhouse-server replicas and a single zookeeper instance all run on the same physical machine, so the two replicas use different ports to avoid conflicts.

1.1 ZooKeeper

docker run -p 2181:2181 --name some-zookeeper --restart always -d zookeeper

1.2 Replica Cluster

replica-1 config.xml:

<zookeeper>
    <node index="1">
        <host>172.17.0.2</host>
        <port>2181</port>
    </node>
</zookeeper>

<remote_servers>
    <mycluster_1>
        <shard_1>
            <internal_replication>true</internal_replication>
            <replica>
                <host>s1</host>
                <port>9101</port>
            </replica>
            <replica>
                <host>s2</host>
                <port>9102</port>
            </replica>
        </shard_1>
    </mycluster_1>
</remote_servers>

<macros>
    <cluster>mycluster_1</cluster>
    <shard>1</shard>
    <replica>s1</replica>
</macros>

<tcp_port>9101</tcp_port>
<interserver_http_port>9009</interserver_http_port>
<path>/cluster/d1/datas/</path>

replica-2 config.xml:

<zookeeper>
    <node index="1">
        <host>172.17.0.2</host>
        <port>2181</port>
    </node>
</zookeeper>

<remote_servers>
    <mycluster_1>
        <shard_1>
            <internal_replication>true</internal_replication>
            <replica>
                <host>s1</host>
                <port>9101</port>
            </replica>
            <replica>
                <host>s2</host>
                <port>9102</port>
            </replica>
        </shard_1>
    </mycluster_1>
</remote_servers>

<macros>
    <cluster>mycluster_1</cluster>
    <shard>1</shard>
    <replica>s2</replica>
</macros>

<tcp_port>9102</tcp_port>
<interserver_http_port>9010</interserver_http_port>
<path>/cluster/d2/datas/</path>
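
After starting both clickhouse-server instances with these configs, it is worth confirming that the cluster definition and the macros were actually loaded. The queries below are a minimal sanity check, run from clickhouse-client on either replica and using the cluster name defined above:

SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'mycluster_1';

-- each replica should report its own <macros> values (cluster, shard, replica)
SELECT * FROM system.macros;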

1.3 Create Test Table

CREATE TABLE default.rtest1 ON CLUSTER 'mycluster_1'
(
    `id` Int64,
    `p` Int16
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated/test', '{replica}')
PARTITION BY p
ORDER BY id
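
If the ON CLUSTER DDL succeeds, the table is registered on both replicas and shows up in system.replicas on each server. A quick sanity check (a sketch based on the table name used above):

SELECT database, table, replica_name, is_leader, log_pointer, queue_size
FROM system.replicas
WHERE table = 'rtest1';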

1.4 Check ZooKeeper

docker exec -it some-zookeeper bash
./bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 17] ls /clickhouse/tables/replicated/test/replicas
[s1, s2]

Both replicas are registered in ZooKeeper.
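
The same znodes can also be inspected from ClickHouse itself through the system.zookeeper table; the query below is a sketch that reuses the zookeeper path from the table definition above:

SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/replicated/test/replicas';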

2. Synchronization Principle

If a write is executed on replica-1:

replica-1> INSERT INTO rtest1 VALUES (33, 33);

How is data synchronized to replica-2?

s1. replica-1> StorageReplicatedMergeTree::write --> ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
s2. replica-1> storage.writer.writeTempPart, write the new part to local disk
s3. replica-1> ReplicatedMergeTreeBlockOutputStream::commitPart
s4. replica-1> StorageReplicatedMergeTree::getCommitPartOp, commit a LogEntry to zookeeper; the information includes:
ReplicatedMergeTreeLogEntry {
type: GET_PART,
source_replica: replica-1,
new_part_name: part->name,
new_part_type: part->getType
}
s5. replica-1> zkutil::makeCreateRequest(zookeeper_path + "/log/log-0000000022"), publish the LogEntry under /log in zookeeper

s6. replica-2> StorageReplicatedMergeTree::queueUpdatingTask(), a scheduled pull task
s7. replica-2> ReplicatedMergeTreeQueue::pullLogsToQueue, pull new log entries
s8. replica-2> zookeeper->get(replica_path + "/log_pointer"), get this replica's synchronized position from zookeeper
s9. replica-2> zookeeper->getChildrenWatch(zookeeper_path + "/log"), get all LogEntry nodes from zookeeper
s10. replica-2> based on the sync position log_pointer, filter out the LogEntries that still need to be applied and write them to the queue
s11. replica-2> StorageReplicatedMergeTree::queueTask, consume tasks from the queue
s12. replica-2> StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry), dispatch on the LogEntry type
s13. replica-2> StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
s14. replica-2> StorageReplicatedMergeTree::fetchPart, download the part directory from replica-1 over its interserver_http_port
s15. replica-2> MergeTreeData::renameTempPartAndReplace, write the files locally and update the in-memory metadata
s16. replica-2> data synchronization is complete
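
After step s16 you can confirm the result on replica-2 with a couple of queries against the system tables. This is a sketch; it assumes the table created earlier in this article:

-- on replica-2: the part written on replica-1 should now be active locally
SELECT partition, name, rows
FROM system.parts
WHERE table = 'rtest1' AND active;

-- on replica-2: nothing should remain pending in the replication queue
SELECT count() AS pending
FROM system.replication_queue
WHERE table = 'rtest1';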

You can also enter the zookeeper docker container to directly view a LogEntry:

[zk: localhost:2181(CONNECTED) 85] get /clickhouse/tables/replicated/test/log/log-0000000022
format version: 4
create_time: 2020-09-13 16:39:05
source replica: s1
block_id: 33_2673203974107464807_7670041793554220344
get
33_2_2_0
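
The same LogEntry can also be read without leaving ClickHouse, via the system.zookeeper table (a sketch; the znode path is the one queried with zkCli above):

SELECT name, value
FROM system.zookeeper
WHERE path = '/clickhouse/tables/replicated/test/log'
ORDER BY name DESC
LIMIT 1;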

3. Summary

Taking a write as the example, this article analyzed, from the bottom up, how ClickHouse's ReplicatedMergeTree works. The logic is not complex.

Data synchronization between replicas relies on ZooKeeper (community members are currently working on etcd integration, pr#10376) for metadata coordination; it is essentially a publish/subscribe model. The actual part data is then downloaded from the corresponding replica over its interserver_http_port.

Replica synchronization is entirely based on files and directories. This brings a benefit: ClickHouse storage-compute separation becomes easy to implement. Multiple clickhouse-servers can mount the same data for computation at the same time, and each of these servers remains writable. Tiger Brother has already implemented a working prototype; for details, see the next article.

4. References

[1] StorageReplicatedMergeTree.cpp
[2] ReplicatedMergeTreeBlockOutputStream.cpp
[3] ReplicatedMergeTreeLogEntry.cpp
[4] ReplicatedMergeTreeQueue.cpp