
Middleware Deep Dive: Canal


Change the MySQL server configuration and restart

$ vi /etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

$ mysql -uroot
CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

$ sudo service mysqld start

Question: what is the purpose of creating the canal user? Could an existing account, such as root, be used instead?

Answer: some accounts lack the REPLICATION SLAVE and REPLICATION CLIENT privileges; if canal connects with such an account, it cannot fetch the binlog.

Here the canal user is granted all privileges, so canal can read the binlog, and clients can in turn consume it from canal.
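Incidentally, ALL PRIVILEGES is broader than strictly necessary; the canal quick-start suggests a narrower grant that is enough for reading the binlog, roughly:

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;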

Two concepts to keep straight: the canal server connects to MySQL, and clients connect to the canal server.

  • canal means the canal server: it reads MySQL's binlog, parses it and stores the result
  • client means whatever consumes the binlog from the canal server

Connect to the server from the local machine and verify that the binlog format is ROW:

$ mysql -h192.168.6.52 -ucanal -pcanal
mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

MySQL binlog

Before starting canal, let's first look at what the MySQL binlog is:

mysql> show binlog events;
+------------------+-------+-------------+-----------+-------------+---------------------------------------------------------------------+
| Log_name         | Pos   | Event_type  | Server_id | End_log_pos | Info                                                                |
+------------------+-------+-------------+-----------+-------------+---------------------------------------------------------------------+
| mysql-bin.000001 |     4 | Format_desc |         1 |         106 | Server ver: 5.1.73-log, Binlog ver: 4                               |
| mysql-bin.000001 |   106 | Query       |         1 |        1864 | use `mysql`; CREATE TABLE IF NOT EXISTS db ( Host char(60) ... )    |
| mysql-bin.000001 |  1864 | Query       |         1 |        3518 | use `mysql`; CREATE TABLE IF NOT EXISTS host ( Host char(60) ... )  |
+------------------+-------+-------------+-----------+-------------+---------------------------------------------------------------------+

How MySQL master/slave replication works:

  • the master records changes to its binary log;
  • the slave copies the master's binary log events to its relay log;
  • the slave replays the events in the relay log, applying the changes to its own data.

Binlog files named mysql-bin.xxx, together with an index file, are generated under the MySQL data directory:

[qihuang.zheng@dp0652 canal]$ ll /var/lib/mysql/
总用量 26228
drwx------ 2 mysql mysql     4096 10月 11 14:05 canal_test
-rw-rw---- 1 mysql mysql 10485760 9月  30 22:12 ibdata1
-rw-rw---- 1 mysql mysql  5242880 10月 11 09:57 ib_logfile0
-rw-rw---- 1 mysql mysql  5242880 10月 11 09:57 ib_logfile1
drwx------ 2 mysql mysql     4096 8月   2 11:01 mysql
-rw-rw---- 1 mysql mysql    18451 8月   2 11:01 mysql-bin.000001
-rw-rw---- 1 mysql mysql   929226 8月   2 11:01 mysql-bin.000002
-rw-rw---- 1 mysql mysql  4890698 9月  30 22:12 mysql-bin.000003
-rw-rw---- 1 mysql mysql      897 10月 11 14:06 mysql-bin.000004
-rw-rw---- 1 mysql mysql       76 10月 11 09:57 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 10月 11 09:57 mysql.sock

Every operation against MySQL is recorded in the binlog files as binary events. The operations below include creating a user, granting privileges, creating a database, creating a table, and inserting a row.

[qihuang.zheng@dp0652 canal]$ sudo strings /var/lib/mysql/mysql-bin.000004
5.1.73-log
CREATE USER canal IDENTIFIED BY 'canal'
root    localhost
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'
FLUSH PRIVILEGES
canal_test
create database canal_test                        ==> create the database
canal_test
create table test (
  uid int (4) primary key not null auto_increment,
  name varchar(10) not null)                      ==> create the table
canal_test
BEGIN                                             ==> insert a row; a transaction is written,
canal_test                                            but the actual statement is not printed
test
canal_test
COMMIT

canal & config

Deploy the canal server to 6.52 and start it. Check the canal log:

[qihuang.zheng@dp0652 canal]$ cat logs/canal/canal.log
2017-10-11 11:31:52.076 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-11 11:31:52.151 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11111]
2017-10-11 11:31:52.644 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Check the instance log:

[qihuang.zheng@dp0652 canal]$ cat logs/example/example.log
2017-10-11 11:31:52.435 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-11 11:31:52.444 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-11 11:31:52.587 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-11 11:31:52.599 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-11 11:31:52.679 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

There are several configuration files under the canal server's conf directory:

➜  canal.deployer-1.0.24 tree conf
conf
├── canal.properties
├── example
│   └── instance.properties
├── logback.xml
└── spring
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── local-instance.xml
    └── memory-instance.xml

First, look at the first four entries of the common section in canal.properties:

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=

canal.id is the identifier of the canal server; in a cluster, different canal servers must use different ids. Note that it is not the same thing as MySQL's server_id.

If ip is left empty it defaults to the local machine, here 192.168.6.52, and the port is 11111. The zk setting is used for a canal cluster.
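For a clustered deployment, every canal server would point at the same ZooKeeper ensemble and use a distinct canal.id. An illustrative fragment (the addresses are made up):

canal.id = 2
canal.ip =
canal.port = 11111
canal.zkServers = 192.168.6.52:2181,192.168.6.53:2181,192.168.6.54:2181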

Next, the destinations-related settings in canal.properties:

#################################################
#########       destinations        #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

canal.destinations = example can list multiple destinations, for example example1,example2; in that case two matching directories must be created, each containing its own instance.properties file.
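For example, a two-destination layout might look like this (the directory names are illustrative):

canal.destinations = example1,example2

conf/
├── canal.properties
├── example1
│   └── instance.properties
└── example2
    └── instance.properties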

Canal instances are managed globally through Spring; the file-instance.xml referenced here is what ultimately instantiates all destination instances:

<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">     <property name="ignoreResourceNotFound" value="true" />     <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->     <property name="locationNames">         <list>             <value>classpath:canal.properties</value>             <value>classpath:${canal.instance.destination:}/instance.properties</value>         </list>     </property> </bean> <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">     <property name="destination" value="${canal.instance.destination}" />     <property name="eventParser"><ref local="eventParser" /></property>     <property name="eventSink"><ref local="eventSink" /></property>     <property name="eventStore"><ref local="eventStore" /></property>     <property name="metaManager"><ref local="metaManager" /></property>     <property name="alarmHandler"><ref local="alarmHandler" /></property> </bean> 

For example, when canal.instance.destination equals example, the example/instance.properties configuration file is loaded.

The instance.properties under example does not need to be modified. A single canal server can run multiple canal instances.

#################################################
## mysql serverId; this slaveId must not collide with any server_id already used in the MySQL cluster
canal.instance.mysql.slaveId = 1234

# position info; this is the address of the MySQL master to connect to
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =
#################################################

simple client

On MySQL, create a database, create a table, insert a row, then update the row.

create database canal_test;
use canal_test;
create table test (
  uid int (4) primary key not null auto_increment,
  name varchar(10) not null);
insert into test (name) values('10');

Modify the connection settings in the client test example; example here corresponds to the name of the canal instance.

String destination = "example"; CanalConnector connector = CanalConnectors.newSingleConnector(     new InetSocketAddress("192.168.6.52", 11111), destination, "canal", "canal"); 

Note: if the connection fails, the client test example exits immediately and prints ## stop the canal client. If everything is normal, the process does not exit and keeps running.

The console output of SimpleCanalClientTest looks like this:

****************************************************
* Batch Id: [1] ,count : [2] , memsize : [263] , Time : 2017-10-11 14:06:06
* Start : [mysql-bin.000004:396:1507701897000(2017-10-11 14:04:57)]
* End : [mysql-bin.000004:491:1507701904000(2017-10-11 14:05:04)]
****************************************************

----------------> binlog[mysql-bin.000004:396] , name[canal_test,] , eventType : QUERY , executeTime : 1507701897000 , delay : 69710ms
 sql ----> create database canal_test

----------------> binlog[mysql-bin.000004:491] , name[canal_test,test] , eventType : CREATE , executeTime : 1507701904000 , delay : 62723ms
 sql ----> create table test (
  uid int (4) primary key not null auto_increment,
  name varchar(10) not null)

Insert a row (update is true for both uid and name):

****************************************************
* Batch Id: [2] ,count : [3] , memsize : [186] , Time : 2017-10-11 14:06:32
* Start : [mysql-bin.000004:659:1507701989000(2017-10-11 14:06:29)]
* End : [mysql-bin.000004:822:1507701989000(2017-10-11 14:06:29)]
****************************************************

================> binlog[mysql-bin.000004:659] , executeTime : 1507701989000 , delay : 3142ms
 BEGIN ----> Thread id: 11
----------------> binlog[mysql-bin.000004:785] , name[canal_test,test] , eventType : INSERT , executeTime : 1507701989000 , delay : 3154ms
uid : 1    type=int(4)    update=true
name : 10    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:822] , executeTime : 1507701989000 , delay : 3179ms

Update the row (only name has update=true):

****************************************************
* Batch Id: [3] ,count : [3] , memsize : [202] , Time : 2017-10-11 14:49:11
* Start : [mysql-bin.000004:897:1507704547000(2017-10-11 14:49:07)]
* End : [mysql-bin.000004:1076:1507704547000(2017-10-11 14:49:07)]
****************************************************

================> binlog[mysql-bin.000004:897] , executeTime : 1507704547000 , delay : 4048ms
 BEGIN ----> Thread id: 13
----------------> binlog[mysql-bin.000004:1023] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507704547000 , delay : 4059ms
uid : 1    type=int(4)
name : zqhxuyuan    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1076] , executeTime : 1507704547000 , delay : 4096ms

Besides example.log, the example instance directory under the canal installation also contains a meta.log:

[qihuang.zheng@dp0652 canal]$ cat logs/example/meta.log
2017-10-11 14:06:03.728 - clientId:1001 cursor:[mysql-bin.000004,396,1507701897000] address[/127.0.0.1:3306]
2017-10-11 14:06:04.589 - clientId:1001 cursor:[mysql-bin.000004,491,1507701904000] address[localhost/127.0.0.1:3306]
2017-10-11 14:06:29.589 - clientId:1001 cursor:[mysql-bin.000004,822,1507701989000] address[localhost/127.0.0.1:3306]
2017-10-11 14:49:08.589 - clientId:1001 cursor:[mysql-bin.000004,1076,1507704547000] address[localhost/127.0.0.1:3306]

canal client & server

Communication between the canal client and the canal server follows a client/server model: the client uses NIO, the server uses Netty.

After the canal server starts, it does not pull the binlog from MySQL as long as no canal client has connected.

try {
    connector.connect();
    connector.subscribe();
    while (running) {
        Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
        long batchId = message.getId();
        int size = message.getEntries().size();
        printSummary(message, batchId, size);
        printEntry(message.getEntries());
        connector.ack(batchId);          // confirm the batch after successful processing
        // connector.rollback(batchId); // on failure, roll back instead, so the batch is re-delivered
    }
} finally {
    connector.disconnect();
}

The canal client and canal server use an incremental subscribe/consume protocol; the flow is shown in the diagram below (C is the canal client, S is the canal server):

[Figure: incremental subscribe/consume flow between canal client and canal server]

When the canal client calls connect(), the packet type is PacketType.HANDSHAKE, followed by a CLIENTAUTHENTICATION packet. It then calls subscribe(), whose packet type is SUBSCRIPTION.

On the server side, Netty handles the RPC requests (CanalServerWithNetty):

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipelines = Channels.pipeline();
        pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
        // handle the client's HANDSHAKE request
        pipelines.addLast(HandshakeInitializationHandler.class.getName(),
            new HandshakeInitializationHandler(childGroups));
        // handle the client's CLIENTAUTHENTICATION request
        pipelines.addLast(ClientAuthenticationHandler.class.getName(),
            new ClientAuthenticationHandler(embeddedServer));

        // handle the client's session requests, including SUBSCRIPTION, GET, etc.
        SessionHandler sessionHandler = new SessionHandler(embeddedServer);
        pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
        return pipelines;
    }
});

After ClientAuthenticationHandler has performed authentication, it removes HandshakeInitializationHandler and ClientAuthenticationHandler from the pipeline.

Taking the case where the client sends GET and the server, after obtaining the binlog from MySQL, returns MESSAGES to the client, the RPC interaction between client and server works as follows.

The flow in SimpleCanalConnector for sending the GET request and reading the response:

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    int size = (batchSize <= 0) ? 1000 : batchSize;
    long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1 means no timeout control
    if (unit == null) unit = TimeUnit.MILLISECONDS;

    // the client sends the GET request
    writeWithHeader(Packet.newBuilder()
        .setType(PacketType.GET)
        .setBody(Get.newBuilder()
            .setAutoAck(false)
            .setDestination(clientIdentity.getDestination())
            .setClientId(String.valueOf(clientIdentity.getClientId()))
            .setFetchSize(size)
            .setTimeout(time)
            .setUnit(unit.ordinal())
            .build()
            .toByteString())
        .build()
        .toByteArray());
    // the client reads the GET result
    return receiveMessages();
}

private Message receiveMessages() throws IOException {
    // read the packet sent by the server
    Packet p = Packet.parseFrom(readNextPacket());
    switch (p.getType()) {
        case MESSAGES: {
            Messages messages = Messages.parseFrom(p.getBody());
            Message result = new Message(messages.getBatchId());
            for (ByteString byteString : messages.getMessagesList()) {
                result.addEntry(Entry.parseFrom(byteString));
            }
            return result;
        }
    }
}

The flow in the server-side SessionHandler for handling the client's GET request:

case GET:
    // read the packet sent by the client and wrap it as a Get object
    Get get = CanalPacket.Get.parseFrom(packet.getBody());
    // destination identifies the canal instance
    if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {
        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));
        Message message = null;
        if (get.getTimeout() == -1) { // still the initial value?
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
        } else {
            TimeUnit unit = convertTimeUnit(get.getUnit());
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
        }
        // set the packet type returned to the client to MESSAGES
        Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
        packetBuilder.setType(PacketType.MESSAGES);
        // build the Messages body
        Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
        messageBuilder.setBatchId(message.getId());
        if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
            for (Entry entry : message.getEntries()) {
                messageBuilder.addMessages(entry.toByteString());
            }
        }
        packetBuilder.setBody(messageBuilder.build().toByteString());
        // write the data back to the client
        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);
    }

An overview of the get/ack/rollback protocol:

  • Message getWithoutAck(int batchSize): batchSize specifies how many entries may be fetched in one call. Each call returns a Message containing:
    – batch id: a unique identifier for the batch
    – entries: the actual data objects, in the format defined by EntryProtocol.proto
  • void rollback(long batchId): as the name implies, rolls back the previous get so the data can be fetched again. It is submitted with the batchId obtained from get, to guard against mistakes.
  • void ack(long batchId): as the name implies, confirms that consumption succeeded and tells the server it may delete the data. It is submitted with the batchId obtained from get, to guard against mistakes.

The canal message structure defined by EntryProtocol.proto is as follows:

Entry
    Header
        logfileName [binlog file name]
        logfileOffset [binlog position]
        executeTime [timestamp of the change recorded in the binlog, second precision]
        schemaName
        tableName
        eventType [insert/update/delete type]
    entryType   [transaction header BEGIN / transaction footer END / row data ROWDATA]
    storeValue  [byte data, can be decoded; the corresponding type is RowChange]

RowChange
    isDdl       [whether this is a DDL change, e.g. create table / drop table]
    sql         [the actual DDL statement]
    rowDatas    [the changed rows of an insert/update/delete; can be multiple, since one binlog event may cover several changes, e.g. a batch statement]
        beforeColumns [array of Column, the fields before the change]
        afterColumns  [array of Column, the fields after the change]

Column
    index
    sqlType     [jdbc type]
    name        [column name]
    isKey       [whether it is part of the primary key]
    updated     [whether the value changed]
    isNull      [whether the value is null]
    value       [the actual content; note that it is a string]
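To make the structure concrete, a typical client-side decoding routine looks roughly like the following (a minimal sketch using the CanalEntry protocol classes; the EntryPrinter class name is ours):

import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import java.util.List;

public class EntryPrinter {
    // Decode each ROWDATA entry's storeValue into a RowChange and print its columns.
    public static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            // skip transaction begin/end markers; only ROWDATA carries a RowChange payload
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            String table = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
            System.out.println(table + " -> " + rowChange.getEventType());
            for (RowData rowData : rowChange.getRowDatasList()) {
                // INSERT fills only afterColumns, DELETE only beforeColumns, UPDATE fills both
                for (Column column : rowData.getAfterColumnsList()) {
                    System.out.println("  " + column.getName() + " = " + column.getValue()
                        + " (updated=" + column.getUpdated() + ")");
                }
            }
        }
    }
}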

For the other request types sent by the client, the server-side SessionHandler likewise calls the corresponding methods of CanalServerWithEmbedded:

case SUBSCRIPTION:
        Sub sub = Sub.parseFrom(packet.getBody());
        embeddedServer.subscribe(clientIdentity);
case GET:
        Get get = CanalPacket.Get.parseFrom(packet.getBody());
        message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
case CLIENTACK:
        ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
        embeddedServer.ack(clientIdentity, ack.getBatchId());
case CLIENTROLLBACK:
        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
        embeddedServer.rollback(clientIdentity); // roll back all batches

CanalServerWithEmbedded

A CanalServer contains multiple instances; its canalInstances field records the mapping from instance name to instance.

Because it is a Map, the same server cannot host two instances with the same name; for example, two instances both called example cannot run on one server.

public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
    private Map<String, CanalInstance> canalInstances;
    private CanalInstanceGenerator     canalInstanceGenerator;
}

The figure below shows one server with two instances, each client connected to one instance.

Each canal instance impersonates a MySQL slave, so every instance must have a distinct slaveId; in the figure the two instances use 1234 and 1235.

[Figure: one canal server hosting two instances, each consumed by its own client]
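In configuration terms, the setup in the figure corresponds to two instance.properties files that differ at least in their slaveId (an illustrative fragment, only the relevant keys shown):

# conf/example1/instance.properties
canal.instance.mysql.slaveId = 1234

# conf/example2/instance.properties
canal.instance.mysql.slaveId = 1235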

Note that each canal client corresponds to one instance; when a client starts, it specifies a destination, and that destination is the name of the instance.

That is why every request handled by CanalServerWithEmbedded carries a ClientIdentity: the destination read from the ClientIdentity identifies the corresponding CanalInstance.

Take the subscribe method of CanalServerWithEmbedded as an example:

public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    // ClientIdentity represents the canal client; it tells us which destination the client connects to.
    // CanalServerWithEmbedded keeps the destination->instance map, so the client's instance can be looked up.
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start(); // start the instance's meta manager
    }
    canalInstance.getMetaManager().subscribe(clientIdentity); // record the subscription in the meta manager
    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) {
        position = canalInstance.getEventStore().getFirstPosition(); // fall back to the first position in the store
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
        }
    }
    // notify the instance that the subscription relationship changed
    canalInstance.subscribeChange(clientIdentity);
}

Each CanalInstance is made up of four components: EventParser, EventSink, EventStore and MetaManager.

The server's main handling methods are get/ack/rollback; all three use the instance's internal components, mostly the EventStore and the MetaManager.

Before that, it helps to understand the EventStore: it is a RingBuffer with three cursors, Put, Get and Ack.

  • Put: after the canal server pulls data from MySQL, it puts it into memory and Put advances
  • Get: the consumer (canal client) consumes data from memory and Get advances
  • Ack: when the consumer finishes consuming, Ack advances, and data that has been put and subsequently acked is removed

These three operations relate to the instance components as shown below:

[Figure: how put/get/ack relate to the instance components]

The client can fetch the MySQL binlog from the canal server in several ways (through the get and getWithoutAck methods):

  • If timeout is null, the tryGet mode is used: return immediately with whatever is available
  • If timeout is not null:
    1. a timeout of 0 means a blocking get: no timeout is applied and the call returns only once a full batchSize of data is available
    2. a non-zero timeout means get with a timeout: if a full batchSize is still not available when the timeout expires, return however much there is
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize); // return immediately with whatever is available
    } else if (timeout <= 0) {
        return eventStore.get(start, batchSize); // block until a full batch is available
    } else {
        return eventStore.get(start, batchSize, timeout, unit); // block for at most the given timeout
    }
}

Note: the EventStore uses a ring buffer similar to the Disruptor's RingBuffer; the implementation class is MemoryEventStoreWithBuffer.

The difference between get and getWithoutAck is:

  • get acks the batch immediately
  • getWithoutAck does not ack; the client has to ack (or roll back) explicitly, as the sketch below shows
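In client code the two calls therefore lead to two consumption patterns. A minimal sketch (assuming connector is already connected and subscribed; process() stands in for the real handler):

// Pattern 1: get() - the batch is acknowledged as soon as it is returned,
// so a crash right after this call can lose the batch.
Message autoAcked = connector.get(batchSize);

// Pattern 2: getWithoutAck() - the batch is held by the server until the client
// acks it explicitly; on failure the client rolls back and the batch is re-delivered.
Message message = connector.getWithoutAck(batchSize);
try {
    process(message.getEntries()); // placeholder for the real handling logic
    connector.ack(message.getId());
} catch (Exception e) {
    connector.rollback(message.getId());
}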

EventStore

Take 10 entries as an example. Initially current=-1; the first element starts at next=0, end=9, and the loop walks all elements in [0,9].

The list elements are (A,B,C,D,E,F,G,H,I,J):

next   entries[next]   next-current-1   list element
0      entries[0]      0-(-1)-1=0       A
1      entries[1]      1-(-1)-1=1       B
2      entries[2]      2-(-1)-1=2       C
3      entries[3]      3-(-1)-1=3       D
...    ...             ...              ...
9      entries[9]      9-(-1)-1=9       J

After the first batch of 10 elements has been put, putSequence is set to end=9. Suppose a second batch of 5 elements (K,L,M,N,O) is then put:

Now current=9, the starting next=9+1=10 and end=9+5=14; after the put completes, putSequence is set to end=14.

next   entries[next]   next-current-1   list element
10     entries[10]     10-(9)-1=0       K
11     entries[11]     11-(9)-1=1       L
12     entries[12]     12-(9)-1=2       M
13     entries[13]     13-(9)-1=3       N
14     entries[14]     14-(9)-1=4       O

Assume here that the ring buffer holds at most 15 elements (in the source it has 16 * 1024 slots, about 16MB counted in memory). The two batches above produced exactly 15 elements, which fills the ring buffer completely.

If another put arrives now, the ring buffer is full and there is no free slot, so the put blocks until some data has been consumed and space is released.
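As an aside on how a slot is located: the sequence numbers grow without bound, and, judging from the indexMask field in the code below, they are mapped onto the fixed-size array with a power-of-two bitmask. A tiny illustrative snippet:

int bufferSize = 16 * 1024;                     // must be a power of two for the mask trick to work
int indexMask  = bufferSize - 1;
long sequence  = 16385L;                        // an ever-growing put/get/ack sequence number
int slot       = (int) (sequence & indexMask);  // wraps around: 16385 & 16383 = 1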

Below is the code that fills the ring buffer on put; the free-slot check (the checkFreeSlotAt method) is done in the various put methods.

public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {

    private static final long INIT_SQEUENCE = -1;
    private int               bufferSize    = 16 * 1024;
    private int               bufferMemUnit = 1024;  // unit used when measuring memsize, default 1KB
    private int               indexMask;
    private Event[]           entries;

    // the three cursors tracking put/get/ack
    private AtomicLong        putSequence   = new AtomicLong(INIT_SQEUENCE); // position of the last completed put
    private AtomicLong        getSequence   = new AtomicLong(INIT_SQEUENCE); // position of the last entry read by get
    private AtomicLong        ackSequence   = new AtomicLong(INIT_SQEUENCE); // position of the last acked entry

    // When the EventStore starts, a buffer of the given size is created; the Event array has 16*1024 slots.
    // Counted in entries, it holds about 16,000 events; counted in memory (with the 1KB unit), about 16MB.
    public void start() throws CanalStoreException {
        super.start();
        indexMask = bufferSize - 1;
        entries = new Event[bufferSize];
    }

    // After the EventParser has parsed events, they are placed into this in-memory buffer (the Event array)
    private void doPut(List<Event> data) {
        long current = putSequence.get(); // current position; initially -1, so the first element lands at -1+1=0
        long end = current + data.size(); // final position; if 10 entries are put, end = -1+10 = 9
        // write the data first, then advance the cursor; under high concurrency a get could otherwise
        // see the new putSequence and read stale entries from the ring buffer
        for (long next = current + 1; next <= end; next++) {
            entries[getIndex(next)] = data.get((int) (next - current - 1));
        }
        putSequence.set(end);
    }
}

Put produces data and Get consumes it, and Get can never overtake Put: if 10 entries have been put, get can read at most 10. In practice, to keep get fast, Put and Get do not have to stay equal.

Think of Put as the producer and Get as the consumer. The producer can be fast while the consumer takes its time; for example, 1000 entries may have been put while each get only handles 10.

Continuing the earlier example for the get flow: initially current=-1, two batches totalling 15 entries have been put, so maxAbleSequence=14, and assume the get batchSize is 10.

Initially next=current=-1 and end=-1. Based on startPosition, next is set to 0; end is then set to 9, so the loop covers the 10 elements [0,9] of the ring buffer.

private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
    LogPosition startPosition = (LogPosition) start;

    long current = getSequence.get();
    long maxAbleSequence = putSequence.get();
    long next = current;
    long end = current;
    // if startPosition is null this is the first get, so advance by one by default
    if (startPosition == null || !startPosition.getPostion().isIncluded()) { // after the first subscribe, the start position must be included to avoid losing the first record
        next = next + 1;
    }

    end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
    // extract the data and return it
    List<Event> entrys = new ArrayList<Event>();
    for (; next <= end; next++) {
        Event event = entries[getIndex(next)];
        if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) {
            // DDL isolation: return right away
            if (entrys.size() == 0) {
                entrys.add(event); // no DML event yet, so include just this DDL event
                end = next;        // move end to the current position
            } else {
                // there are already DML events, return them without this record, so step back one position
                end = next - 1;    // next-1 is certainly greater than current, no need to check
            }
            break;
        } else {
            entrys.add(event);
        }
    }
    // build the PositionRange, then advance getSequence to end
    getSequence.compareAndSet(current, end);
}

The ack operation is bounded by Get: if 15 entries were put and 10 were got, at most 10 can be acked. The purpose of ack is to release the entries in the buffer that have already been got.

public void ack(Position position) throws CanalStoreException {
    cleanUntil(position);
}

public void cleanUntil(Position position) throws CanalStoreException {
    long sequence = ackSequence.get();
    long maxSequence = getSequence.get();

    boolean hasMatch = false;
    long memsize = 0;
    for (long next = sequence + 1; next <= maxSequence; next++) {
        Event event = entries[getIndex(next)];
        memsize += calculateSize(event);
        boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
        if (match) { // found the matching position, advance the ack sequence
            hasMatch = true;

            if (batchMode.isMemSize()) {
                ackMemSize.addAndGet(memsize);
                // try to free the buffer memory: release everything before the acked position
                for (long index = sequence + 1; index < next; index++) {
                    entries[getIndex(index)] = null; // clear the slot
                }
            }

            ackSequence.compareAndSet(sequence, next);
        }
    }
}

The rollback implementation is much simpler: it just moves getSequence back to the ack position.

public void rollback() throws CanalStoreException {
    getSequence.set(ackSequence.get());
    getMemSize.set(ackMemSize.get());
}

The figure below shows several examples of operations on the RingBuffer:

[Figure: examples of put/get/ack/rollback operations on the RingBuffer]

EventParser

The EventStore stores the parsed binlog events, while the parsing component is responsible for pulling the binlog; its flow is more involved and requires interaction with the MetaManager.

For example, the position of each pull must be recorded so that the next pull can resume from the last position; the MetaManager therefore has to be stateful.
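If no cursor has been persisted yet, the start position can also be pinned explicitly in instance.properties. An illustrative fragment, reusing the binlog position seen in the earlier console output:

canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = mysql-bin.000004
canal.instance.master.position = 396
canal.instance.master.timestamp =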

The EventParser flow is as follows:

  1. The Connection retrieves the last successfully parsed position (on first start, it uses the initially specified position or the database's current binlog position)
  2. The Connection establishes the link and sends the BINLOG_DUMP command
  3. MySQL starts pushing the binary log
  4. The received binary log is decoded by the binlog parser, which also fills in some extra information
  5. The result is handed to the EventSink module for storage; this is a blocking call that returns only after the data has been stored
  6. After successful storage, the binlog position is recorded periodically

[Figure: EventParser flow]

The Connection mentioned above is MysqlConnection, which implements the ErosaConnection interface.

The EventParser implementation class is MysqlEventParser, which builds on AbstractEventParser.
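Putting step 1 into rough code: the sketch below is not the verbatim canal source, but it reflects how MysqlEventParser chooses where to start dumping (the field and method names follow the canal classes; the control flow is simplified):

// simplified sketch, not the actual implementation
protected EntryPosition findStartPosition(MysqlConnection connection) {
    LogPosition last = logPositionManager.getLatestIndexBy(destination);
    if (last != null) {
        return last.getPostion();          // resume from the last successfully parsed position
    }
    if (masterPosition != null && masterPosition.getJournalName() != null) {
        return masterPosition;             // position pinned in instance.properties
    }
    return findEndPosition(connection);    // otherwise start from "show master status"
}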

