
Use canal-Kafka for database synchronization


Jun 01, 2021 Article blog




Databases are part of our everyday work. In a microservices architecture each service has its own database, so we often need to move data between services. For example, the data in service B's database comes from service A's database.

The first solution:

In the code logic, whenever service A writes the relevant data, it calls an interface on service B, and service B then writes the data to the new database. This approach looks simple, but it has many pitfalls. Adding this kind of synchronization call throughout service A's code increases the complexity of the project and makes it harder to maintain later. Interface calls are also not a stable mechanism: there is no retry mechanism and no record of the synchronization position, and questions such as how to handle a failed call or a sudden burst of calls all have to be considered and handled in the business code. That is a lot of work, so after thinking it through we ruled this solution out.


The second solution:

Synchronize through the database binlog. This solution is independent of service A and has no code coupling with it. Data is transferred directly over a TCP connection, which is better than interface calls. It is a proven production solution, and there are many binlog synchronization middleware tools, so our focus was on which tool would best let us build a stable, sufficiently fast, and easy-to-deploy setup.

After some research, we chose canal. canal is an incremental subscription and consumption component for the MySQL binlog. It already has production use cases, it combines conveniently with other common middleware such as kafka and elasticsearch, and it has a Go client library, canal-go, which meets our needs on Go. Other details can be found on the canal GitHub home page.

Schematics

[Figure: synchronization schematic 1]

[Figure: synchronization schematic 2]

OK, let's get started! The goal is to synchronize data changes from the A database to the B database. Following the wiki, we quickly started a canal-server service with docker and wrote the canal-client logic directly with canal-go. canal-client uses canal-go to connect to canal-server. canal-server and canal-client communicate over a socket: the transport protocol is TCP and the interaction protocol is Google Protocol Buffer 3.0.

Workflow

  1. Canal connects to the A database, posing as a slave
  2. canal-client connects to Canal and subscribes to the corresponding database table
  3. Changes in the A database are written to the binlog; Canal sends a dump request to the database, fetches and parses the binlog, and sends the parsed data to canal-client
  4. canal-client receives the data and syncs it to the new database

Protocol Buffer serialization is fast. After deserialization, the data is available row by row, placed into an array structured by field name and field value. A simple code example:

func Handler(entry protocol.Entry) {
    var keys []string
    rowChange := &protocol.RowChange{}
    proto.Unmarshal(entry.GetStoreValue(), rowChange)
    if rowChange != ...
        ... EventType_DELETE || eventType == protocol.EventType_UPDATE ...
        ... EventType_INSERT ...
        ...
    }
}
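The handler above is heavily elided, so here is a minimal, self-contained sketch of the same idea: branching on the event type and producing a statement for the new database. It does not use canal-go; `RowEvent`, `EventType`, and `BuildSQL` are hypothetical names introduced only for illustration, and all values are treated as strings for simplicity.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// EventType mirrors the three row events named in the article's handler.
type EventType int

const (
	Insert EventType = iota
	Update
	Delete
)

// RowEvent is a hypothetical, simplified stand-in for one parsed binlog row.
type RowEvent struct {
	Type    EventType
	Table   string
	Key     string            // name of the primary-key column
	Columns map[string]string // column name -> value
}

// BuildSQL turns a row event into a parameterized statement and its arguments.
func BuildSQL(e RowEvent) (string, []string, error) {
	if _, ok := e.Columns[e.Key]; !ok {
		return "", nil, fmt.Errorf("event for %s has no value for key column %q", e.Table, e.Key)
	}
	// Sort column names so the generated statement is deterministic.
	names := make([]string, 0, len(e.Columns))
	for name := range e.Columns {
		names = append(names, name)
	}
	sort.Strings(names)

	switch e.Type {
	case Insert:
		marks := make([]string, len(names))
		args := make([]string, len(names))
		for i, name := range names {
			marks[i] = "?"
			args[i] = e.Columns[name]
		}
		query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
			e.Table, strings.Join(names, ", "), strings.Join(marks, ", "))
		return query, args, nil
	case Update:
		var sets, args []string
		for _, name := range names {
			if name == e.Key {
				continue
			}
			sets = append(sets, name+" = ?")
			args = append(args, e.Columns[name])
		}
		if len(sets) == 0 {
			return "", nil, fmt.Errorf("update for %s changes no columns", e.Table)
		}
		args = append(args, e.Columns[e.Key])
		query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = ?",
			e.Table, strings.Join(sets, ", "), e.Key)
		return query, args, nil
	case Delete:
		query := fmt.Sprintf("DELETE FROM %s WHERE %s = ?", e.Table, e.Key)
		return query, []string{e.Columns[e.Key]}, nil
	default:
		return "", nil, fmt.Errorf("unsupported event type %d", e.Type)
	}
}

func main() {
	events := []RowEvent{
		{Type: Insert, Table: "mytable", Key: "id", Columns: map[string]string{"id": "1", "name": "a"}},
		{Type: Update, Table: "mytable", Key: "id", Columns: map[string]string{"id": "1", "name": "b"}},
		{Type: Delete, Table: "mytable", Key: "id", Columns: map[string]string{"id": "1"}},
	}
	for _, e := range events {
		query, args, err := BuildSQL(e)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		fmt.Println(query, args)
	}
}
```

In a real client the statement would be executed against the new database; the sketch only prints it.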

The problem we ran into

For high availability and performance, we run multiple canal-client instances that parse the data and synchronize it to the new database. This raises an important question: how do we ensure that the canal-client cluster consumes the binlog in order?

The binlog we use is in row mode. Each write produces a binlog entry. A simple example: record a is inserted and then immediately modified. This sends two messages to canal-client. If, because of the network or for other reasons, the update message is processed before the insert message, the record does not exist yet, and the update operation ends up failing.
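The ordering problem can be reproduced with a small simulation. This is only a sketch: an in-memory map stands in for the new database, and `Op`, `Apply`, and `Replay` are hypothetical names, not part of canal or canal-go.

```go
package main

import (
	"errors"
	"fmt"
)

// Op is a hypothetical, simplified binlog message.
type Op struct {
	Kind  string // "insert" or "update"
	ID    int
	Value string
}

// ErrNoRow is returned when an update targets a row that does not exist yet.
var ErrNoRow = errors.New("row does not exist")

// Apply executes one operation against an in-memory stand-in for the target table.
func Apply(table map[int]string, op Op) error {
	switch op.Kind {
	case "insert":
		table[op.ID] = op.Value
		return nil
	case "update":
		if _, ok := table[op.ID]; !ok {
			return ErrNoRow
		}
		table[op.ID] = op.Value
		return nil
	default:
		return fmt.Errorf("unknown operation %q", op.Kind)
	}
}

// Replay applies ops in the given order and returns the final table and any failures.
func Replay(ops []Op) (map[int]string, []error) {
	table := make(map[int]string)
	var failures []error
	for _, op := range ops {
		if err := Apply(table, op); err != nil {
			failures = append(failures, err)
		}
	}
	return table, failures
}

func main() {
	insert := Op{Kind: "insert", ID: 1, Value: "v1"}
	update := Op{Kind: "update", ID: 1, Value: "v2"}

	inOrder, errs := Replay([]Op{insert, update})
	fmt.Println("in order:", inOrder[1], "failures:", len(errs))

	swapped, errs := Replay([]Op{update, insert})
	fmt.Println("swapped: ", swapped[1], "failures:", len(errs))
}
```

When the two messages are swapped, the update fails and the row keeps the stale inserted value, which is exactly the inconsistency described above.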

What do we do? canal can be combined with a message queue, and it supports several options: kafka, rabbitmq, and rocketmq. We therefore enforce message ordering at the message queue level.

Select the canal-kafka scheme

We chose kafka, the industry benchmark for message queues. UCloud offers kafka and rocketMQ message queue products that let you build a message queue system quickly and easily, which speeds up development and simplifies operations.

The steps are as follows:

1. Select the kafka message queue product and request activation

[Screenshot for step 1]

2. After activation, create a kafka cluster in the management interface and choose a hardware configuration that suits your needs

[Screenshot for step 2]

3. A kafka + ZooKeeper cluster is now set up

[Screenshot for step 3]

The console also includes node management, Topic management, and Consumer Group management, making it easy to modify configurations directly.

In terms of monitoring, the monitored data includes kafka production and consumption QPS, cluster monitoring, and ZooKeeper monitoring, which gives a fairly complete set of metrics.

[Screenshots: monitoring views]

Canal's kafka configuration

Configuring canal to work with kafka is also very easy. Edit the configuration file:

vi /usr/local/canal/conf/canal.properties

...

# Options: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster configuration: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# In flatMessage mode this value can be increased, but do not exceed the MQ message size limit
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# In flatMessage mode, increase this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal batch size, default 50K; because of kafka's maximum message size limit, do not exceed 1M (keep it under 900K)
canal.mq.canalBatchSize = 50
# Timeout for Canal get, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# Whether to use flat JSON format objects
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# Whether kafka message delivery uses transactions
canal.mq.transaction = false


# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable

Solve the problem of sequential consumption

Look at this line of the configuration:

canal.mq.partitionHash=mydatabase.mytable

We configured partitionHash for kafka, and each of our Topics corresponds to one table. The effect is that the data for a table is pushed only to one fixed partition, and from there to a consumer that processes it and synchronizes it to the new database. This solves the binlog ordering problem described earlier. Even if we deploy multiple kafka consumers as a cluster, the data for a given table is consumed from a single partition. This sacrifices parallel processing within a table, but in my view, given kafka's powerful processing architecture, our business is unlikely to hit a bottleneck at the kafka nodes. Moreover, our business goal is not real-time consistency: with some delay, the two databases are guaranteed to be eventually consistent.
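The idea behind hashing by table name can be sketched as follows. This is an illustration only: canal's actual hash function is not shown in this article and may differ, and `PartitionFor` is a hypothetical helper that uses FNV-1a from the Go standard library as a stand-in.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// PartitionFor maps a "schema.table" name to one of n partitions.
// The same name always yields the same partition, so all changes
// for one table stay in a single ordered stream.
func PartitionFor(table string, n int) int {
	if n <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(table))
	return int(h.Sum32() % uint32(n))
}

func main() {
	const partitions = 3 // same value as canal.mq.partitionsNum in the configuration above

	tables := []string{"mydatabase.mytable", "mydatabase.mytable", "mydatabase.other"}
	for _, t := range tables {
		fmt.Printf("%s -> partition %d\n", t, PartitionFor(t, partitions))
	}
}
```

Because kafka preserves order only within a partition, pinning a table to one partition is what gives the consumer an ordered view of that table's changes.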


The following image shows the final synchronization architecture. Every service node is deployed as a cluster, and everything runs on UCloud UK8s, which ensures high availability of the service nodes.

canal is also deployed as a cluster with failover, but at any given time only one canal instance is processing the binlog and the others are standby services. When the working canal service goes down, one of the standby services switches to the working state. Only one canal works at a time for the same reason as before: the binlog must be read in order.

[Figure: final synchronization architecture]

We also use this architecture to synchronize cache invalidation. The cache pattern we use is Cache-Aside. As before, invalidating the cache wherever data changes in the code would complicate the code. So, building on the architecture above, the complex logic that triggers cache invalidation is moved to the kafka-client side for unified processing, which achieves decoupling.
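A minimal sketch of consumer-side invalidation under Cache-Aside might look like this. The in-memory `Cache`, the `ChangeEvent` type, and the `table:id` key scheme are all assumptions made for illustration; a real deployment would read the events from kafka and delete keys in an external cache.

```go
package main

import "fmt"

// ChangeEvent is a hypothetical message describing one changed row.
type ChangeEvent struct {
	Table string
	ID    string
}

// Cache is an in-memory stand-in for an external cache.
type Cache struct {
	data map[string]string
}

func NewCache() *Cache { return &Cache{data: make(map[string]string)} }

func (c *Cache) Set(key, value string) { c.data[key] = value }

func (c *Cache) Get(key string) (string, bool) {
	v, ok := c.data[key]
	return v, ok
}

func (c *Cache) Delete(key string) { delete(c.data, key) }

// CacheKey builds the key under which a row is cached (assumed scheme).
func CacheKey(table, id string) string { return table + ":" + id }

// Invalidate deletes the cached entry for every changed row and returns
// how many entries were actually removed. Under Cache-Aside the next
// reader misses, loads the fresh row from the database, and repopulates.
func Invalidate(c *Cache, events []ChangeEvent) int {
	removed := 0
	for _, e := range events {
		key := CacheKey(e.Table, e.ID)
		if _, ok := c.Get(key); ok {
			c.Delete(key)
			removed++
		}
	}
	return removed
}

func main() {
	cache := NewCache()
	cache.Set(CacheKey("mytable", "1"), "old value")

	events := []ChangeEvent{
		{Table: "mytable", ID: "1"},
		{Table: "mytable", ID: "2"}, // not cached, nothing to remove
	}
	removed := Invalidate(cache, events)
	_, hit := cache.Get(CacheKey("mytable", "1"))
	fmt.Println("removed:", removed, "still cached:", hit)
}
```

Keeping this logic in the consumer means the services that write data never need to know which cache keys depend on a row.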

That covers database synchronization with canal + Kafka. I hope it helps.