canal需要自己编写客户端程序来推送从数据库中同步过来的数据。下面介绍如何编写canal客户端程序。

首先启动canal服务端

参考: canal入门(三):canal安装

构建maven
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</
artifactId> <version>1.0.12</version> </dependency>
示例代码
import java.net.InetSocketAddress; import java.util.List; import com.alibaba
.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client
.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils;
importcom.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal
.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry
.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com
.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter
.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol
.CanalEntry.RowData; public class SimpleCanalClientExample { public static void
main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors
.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111),
"example", "", "");① int batchSize = 1000;② int emptyCount = 0; try { connector
.connect(); connector.subscribe(".*\\..*"); ③ connector.rollback(); int
totalEmptyCount =120; while (emptyCount < totalEmptyCount) { Message message =
connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId();
int size = message.getEntries().size(); if (batchId == -1 || size == 0) {
emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread
.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; //
System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector
.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times,
exit"); } finally { connector.disconnect(); } } private static void
printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry
.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() ==
EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try {
rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error ,
data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] ,
name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry
.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry
.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage
.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData
.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList()); } else { System.out.println(
"-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out
.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } }
} } private static void printColumn(List<Column> columns) { for (Column column
: columns) { System.out.println(column.getName() + " : " + column.getValue() +
" update=" + column.getUpdated()); } } }
说明:
① 指定ip和端口号
② 指定一次抓取的条数
③ 指定要监控的表,这里会覆盖配置文件

启动之后会看到如下信息

empty count : 1
empty count : 2
empty count : 3
empty count : 4

然后试着修改数据库,再观察控制台打印是否变化。

注意:
测试时最好是指定某张测试表,即connector.subscribe(“testdb.test”);

更多:canal源码分析与应用专栏 <https://blog.csdn.net/column/details/26852.html>
——————————————————————————————————
作者:桃花惜春风
转载请标明出处,原文地址:
https://blog.csdn.net/xiaoyu_BD/article/details/82344653
<https://blog.csdn.net/xiaoyu_BD/article/details/82344653>
如果感觉本文对您有帮助,您的支持是我坚持写作最大的动力,谢谢!

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信