Jun 01, 2021
We use databases constantly in day-to-day work. In a microservices architecture each service has its own database, so we often need to move data between services. For example, the data in service B's database may come from service A's database.
The naive approach is to handle it in code: whenever service A writes the relevant data, it calls an interface on service B, and service B writes that data into its own database. This looks simple, but there are many pitfalls. Adding lots of synchronous interface-call code to service A's logic increases the complexity of the project and makes it harder to maintain. Interface calls are also not a reliable transport: there is no retry mechanism, no record of the synchronization position, no clear way to handle failed calls, and a sudden burst of calls can cause problems. All of this must be considered and handled in business code, which is a lot of work. On reflection, we ruled this approach out.
Instead, we synchronize through the database's binlog.
This solution is independent of service A and has no code coupling with it. Data can be transferred directly over a TCP connection, which is better than interface calls.
It is a proven production approach, and there are many binlog synchronization middleware tools, so our focus was on which tool would best let us build a setup that is stable, meets our performance needs, and is easy to deploy.
After some research, we chose canal.
canal is an incremental subscription and consumption component for the MySQL binlog. It is already used in production, and it integrates conveniently with other commonly used middleware such as kafka and elasticsearch. There is also a Go client library, canal-go, which meets our needs on Go. Further details can be found on canal's GitHub home page.
OK, let's get started! We want to synchronize data changes from database A to database B. Following the canal wiki, we quickly got a canal-server service running with docker and wrote the canal-client logic directly with canal-go.
The flow works as follows:
1. canal-client uses canal-go to connect directly to canal-server.
2. canal-server and canal-client communicate over a socket; the transport protocol is TCP and the interaction protocol is Google Protocol Buffer 3.0.
3. Canal connects to database A and acts as a slave.
4. canal-client connects to Canal and subscribes to the binlog of the relevant database tables.
5. Canal sends dump requests to the database, receives the binlog, parses it, and sends the parsed data to canal-client.
6. canal-client receives the data and syncs it to the new database.
Protocol Buffer serialization is fast.
After deserialization we get the data for each row, which we collect into arrays of field names and field values. A simple code example:
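import (
	"github.com/golang/protobuf/proto"
	protocol "github.com/withlin/canal-go/protocol"
)

// Handler decodes one canal entry and collects each row's field names and values.
func Handler(entry protocol.Entry) error {
	rowChange := &protocol.RowChange{}
	if err := proto.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
		return err
	}
	eventType := rowChange.GetEventType()
	for _, rowData := range rowChange.GetRowDatas() { // iterate over every changed row
		var columns []*protocol.Column
		if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {
			columns = rowData.GetBeforeColumns() // all fields before the change
		} else if eventType == protocol.EventType_INSERT {
			columns = rowData.GetAfterColumns() // all fields after the change
		}
		var keys []string
		values := make([]string, 0, len(columns))
		for _, col := range columns {
			keys = append(keys, col.GetName())
			values = append(values, col.GetValue())
		}
		// ... (write keys/values to the new database; elided in the original)
	}
	return nil
}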
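To make the last step of the flow concrete, here is a minimal, self-contained sketch of how a canal-client might turn one decoded row (field names plus values) into a parameterized SQL statement for the target database. The `Row` type, the `BuildStatement` function, and the single-column primary key are hypothetical, not part of canal-go; for an UPDATE it assumes the post-change (after) columns are supplied, and real code would also need identifier quoting and type handling.

```go
package main

import (
	"fmt"
	"strings"
)

// Row is a hypothetical, simplified view of one decoded binlog row.
type Row struct {
	EventType string   // "INSERT", "UPDATE" or "DELETE"
	Table     string   // target table name
	Keys      []string // field names, in column order
	Values    []string // field values, aligned with Keys (after-image for UPDATE)
	PK        string   // primary-key field name; assumed to be one of Keys
}

// BuildStatement returns a parameterized SQL statement and its arguments.
// Identifiers are neither quoted nor validated; this is a sketch only.
func BuildStatement(r Row) (string, []any) {
	args := make([]any, 0, len(r.Values)+1)
	var pkVal any
	for i, v := range r.Values {
		args = append(args, v)
		if r.Keys[i] == r.PK {
			pkVal = v
		}
	}
	switch r.EventType {
	case "INSERT":
		marks := strings.TrimSuffix(strings.Repeat("?, ", len(r.Keys)), ", ")
		return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
			r.Table, strings.Join(r.Keys, ", "), marks), args
	case "UPDATE":
		sets := make([]string, len(r.Keys))
		for i, k := range r.Keys {
			sets[i] = k + " = ?"
		}
		return fmt.Sprintf("UPDATE %s SET %s WHERE %s = ?",
			r.Table, strings.Join(sets, ", "), r.PK), append(args, pkVal)
	case "DELETE":
		return fmt.Sprintf("DELETE FROM %s WHERE %s = ?", r.Table, r.PK), []any{pkVal}
	}
	return "", nil // other event types (e.g. DDL) are ignored in this sketch
}

func main() {
	q, args := BuildStatement(Row{EventType: "UPDATE", Table: "user",
		Keys: []string{"id", "name"}, Values: []string{"1", "bob"}, PK: "id"})
	fmt.Println(q, args) // UPDATE user SET id = ?, name = ? WHERE id = ? [1 bob 1]
}
```

Passing values as arguments rather than splicing them into the SQL string keeps the sync path safe against values that contain quotes.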
For high availability and performance, we run multiple canal-client instances that parse the data and synchronize it to the new database.
This raises an important question: how do we make sure the canal-client cluster consumes the binlog in order?
We use the binlog in row mode, where every write produces a binlog event. A simple example: record a is inserted and then immediately modified.
This sends two messages to canal-client. If, because of the network or other reasons, the update message is processed before the insert message, the record has not been inserted yet, so the update fails.
What can we do?
canal can be combined with message queues, and it supports several options, including kafka, rabbitmq, and rocketmq. Excellent.
We enforce message ordering at the message queue level.
We chose kafka, the industry benchmark among message queues.
UCloud offers kafka and rocketMQ message queue products that let you quickly and easily build a message queue system, which speeds up development and simplifies operations.
Here is how we set it up:
1. Select the kafka message queue product and request activation.
2. After activation, create a kafka cluster in the management console, choosing a hardware configuration that fits your needs.
3. A kafka + ZooKeeper cluster is up and running. Great!
The console also provides node management, Topic management, and Consumer Group management, so configurations can be changed directly in the console.
For monitoring, the metrics include kafka production and consumption QPS, cluster monitoring, and ZooKeeper monitoring, which together provide fairly complete monitoring indicators.
Pairing canal with kafka is also very easy:
vi /usr/local/canal/conf/canal.properties
# Options: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster addresses, e.g.: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# In flatMessage mode this value can be increased, but do not exceed the MQ maximum message size
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# In flatMessage mode, increase this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal's batch size, default 50K; because of kafka's maximum message size, do not exceed 1M (keep it under 900K)
canal.mq.canalBatchSize = 50
# Timeout for Canal to get data, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# Whether messages are emitted as flat JSON objects
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# Whether kafka message delivery uses transactions
canal.mq.transaction = false
# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable
Note this line in particular:
canal.mq.partitionHash=mydatabase.mytable
We configured partitionHash for the kafka output, and each of our Topics corresponds to one table. As a result, the data for a table is pushed only into one fixed partition and then delivered to a consumer, which processes it and synchronizes it to the new database. This solves the binlog ordering problem described earlier. Even if we deploy multiple kafka consumers as a cluster, the data for a given table is consumed from a single partition. This sacrifices parallel processing within a table, but in my view, given kafka's powerful processing architecture, our business is unlikely to create a bottleneck at the kafka nodes.
Our business goal is not real-time consistency: with some delay tolerated, the two databases are guaranteed to be eventually consistent.
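The effect of `partitionHash` can be illustrated with a small, self-contained model: each event is routed to a partition by hashing its table name, so all events for one table land in the same partition and keep their relative order. The FNV-1a hash, the `Event` type, and the `partitionFor`/`route` helpers are assumptions for illustration; canal's own hashing code may differ. Within a consumer group, Kafka assigns each partition to exactly one consumer, which is why the order holds even with several consumers.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Event is a hypothetical binlog event tagged with its "schema.table" routing key.
type Event struct {
	Table string
	Op    string
}

// partitionFor maps a routing key to one of n partitions (FNV-1a, an assumption).
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(n))
}

// route appends each event to the partition chosen by its table, so the
// relative order of one table's events is preserved inside that partition.
func route(events []Event, n int) [][]Event {
	partitions := make([][]Event, n)
	for _, e := range events {
		p := partitionFor(e.Table, n)
		partitions[p] = append(partitions[p], e)
	}
	return partitions
}

func main() {
	events := []Event{
		{"mydatabase.mytable", "INSERT a"},
		{"mydatabase.other", "INSERT x"},
		{"mydatabase.mytable", "UPDATE a"},
	}
	// 3 partitions, matching canal.mq.partitionsNum=3 in the configuration above
	for i, p := range route(events, 3) {
		fmt.Println("partition", i, p)
	}
}
```

Whatever partition "mydatabase.mytable" hashes to, its INSERT always precedes its UPDATE there; only events from different tables may be processed in parallel.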
The figure below shows the final synchronization architecture. Every service node runs as a cluster, all on UCloud UK8s, which ensures high availability of the service nodes.
canal is also deployed as a mutually backed-up cluster, but at any moment only one canal instance processes the binlog; the others are redundant standbys. When the active canal service goes down, one of the standbys switches to the working state.
Again, because the binlog must be read in order, only one canal instance can be working at a time.
We also use this architecture for cache invalidation. The caching pattern we use is Cache-Aside.
As with data synchronization, invalidating the cache everywhere data changes in business code makes that code more complex.
So, on top of the architecture above, we moved the complex cache-invalidation logic to the kafka-client side and handle it in one place, which decouples it from business code.
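A minimal sketch of the Cache-Aside pattern combined with binlog-driven invalidation: reads go to the cache first and fall back to the database, while business code writes only the database and a kafka consumer evicts the cache entry when the row-change event arrives. The `App` type, the in-memory maps standing in for the database and cache, and the "user:1" key scheme are all hypothetical.

```go
package main

import "fmt"

// App holds hypothetical in-memory stand-ins for the database and the cache.
type App struct {
	db    map[string]string
	cache map[string]string
	loads int // number of cache misses that were served from the database
}

// Get implements the Cache-Aside read path: try the cache, fall back to the
// database on a miss, then populate the cache.
func (a *App) Get(key string) (string, bool) {
	if v, ok := a.cache[key]; ok {
		return v, true
	}
	v, ok := a.db[key]
	if ok {
		a.loads++
		a.cache[key] = v
	}
	return v, ok
}

// RowChanged is what the kafka consumer would call for each binlog row event:
// it evicts the cached entry so the next read reloads fresh data.
func (a *App) RowChanged(key string) {
	delete(a.cache, key)
}

func main() {
	app := &App{db: map[string]string{"user:1": "alice"}, cache: map[string]string{}}
	v, _ := app.Get("user:1") // cache miss, loads "alice" from the database
	fmt.Println(v)            // alice

	app.db["user:1"] = "bob" // business code writes only the database
	app.RowChanged("user:1") // binlog consumer invalidates the cache
	v, _ = app.Get("user:1")
	fmt.Println(v, app.loads) // bob 2
}
```

Business code never touches the cache on writes; the only invalidation path is the consumer, which is the decoupling described above.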
That covers how we use canal + Kafka for database synchronization. I hope it helps.