1.开启mysql binlog日志 安装路径下的my.ini文件添加配置

log-bin=mysql-bin #开启日志

binlog-format=ROW #选择row模式

server_id=1 

开启日志需要重启mysql服务后生效

2.下载canal 地址:https://github.com/alibaba/canal/releases

修改配置conf/canal.properties 全局配置,设置自己的数据库

#################################################  
## mysql serverId  
canal.instance.mysql.slaveId = 1234  


# position info,需要改成自己的数据库信息  
canal.instance.master.address = 127.0.0.1:3306   
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =


#canal.instance.standby.address =   
#canal.instance.standby.journal.name =  
#canal.instance.standby.position =   
#canal.instance.standby.timestamp =   


# username/password,需要改成自己的数据库信息  
canal.instance.dbUsername = root
canal.instance.dbPassword = root
canal.instance.defaultDatabaseName = canneltest  
canal.instance.connectionCharset = UTF-8  


# table regex  
canal.instance.filter.regex = .*\\..* 


#################################################

配置slave,slave可以配置成订阅单表日志,也可以配置订阅多表,或所有的表的日志,也可以配置多个slave,只需修改
canal.instance.filter.regex 参数,我这里订阅了所有的表日志

conf\example路径下的


#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234


# position info
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 


#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 


# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canneltest
canal.instance.connectionCharset = UTF-8


# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =  




#################################################

然后用程序需要订阅解析日志,引入相应版本的jar,我这里是1.0.24


<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client
</artifactId> <version>1.0.24</version> </dependency>public static void main
(String[] args)throws InterruptedException { // 链接canal CanalConnector
connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1"
,11111), "example", "", ""); connector.connect(); // 开启订阅日志
connector.subscribe(); // 循环订阅 while (true) { try { // 每次读取 1000 条 Message
message = connector.getWithoutAck(1000); System.out.println(message); long
batchID = message.getId(); int size = message.getEntries().size(); if
(batchID == -1 || size == 0) { Thread.sleep(1000); // 没有数据 } else { System.out
.println("数据进入===>"+message); } connector.ack(batchID); } catch (Exception e) {
//TODO: handle exception } finally { Thread.sleep(1000); } } }
可以打印解析message生成sql,集成kafka提供生产,供订阅同步数据


友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信