一、CanalRocketMQ介绍
CanalRocketMQ是一个用于将Canal的数据变化订阅转化成RocketMQ消息的工具。Canal是阿里巴巴开源的基于数据库增量日志解析,提供增量数据订阅和消费的组件。而RocketMQ是阿里巴巴开源的分布式消息系统。CanalRocketMQ能够将Canal解析出来的数据变化,转换成RocketMQ消息,进而通过RocketMQ实现数据同步,为数据处理提供可靠的基础设施。
二、CanalRocketMQ的配置与使用
CanalRocketMQ的配置与使用相对简单。使用时,需要在Canal和RocketMQ的基础上,增加CanalRocketMQ这一组件。
1.引入CanalRocketMQ
在项目中,需要引入CanalRocketMQ的依赖:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal-rocketmq</artifactId> <version>1.1.4</version> </dependency>
2.配置Canal,并开启Canal的RocketMQ适配器
Canal的配置需要在canal.properties中进行配置。需要将Canal的模式设置成RocketMQ,并配置RocketMQ的相关信息。
# 定义canal server模式为 RocketMQ canal.serverMode = rocketmq # 配置rocketmq的相关信息 canal.rocketmq.nameServer = 127.0.0.1:9876 canal.rocketmq.producerGroup = canal-producer-group
同时,还需要在Canal的启动脚本中,添加RocketMQ的适配器。启动脚本如下:
bin/startup.sh --canal.instance.rmq.topic=example \ --canal.instance.rmq.group=test \ --canal.instance.rmq.nameServer=127.0.0.1:9876 \ --canal.instance.filter.regex=.*\\..* \ --canal.adapter.rocketmq \ --canal.adapter.rocketmq.namesrvAddr=127.0.0.1:9876 \ --canal.adapter.rocketmq.producerGroup=canal-producer-group
3.配置RocketMQ
在RocketMQ的配置文件rocketmq.proerties中,需要进行如下配置:
messageTraceTopic=rmq_sys_TRACE_DATA_777 brokerClusterName=DefaultCluster autoCreateTopicEnable=true
4.代码示例
下面是一个简单的代码示例,说明如何使用CanalRocketMQ:
public class CanalRocketMQTest { private static final Logger LOGGER = LoggerFactory.getLogger(CanalRocketMQTest.class); @Test public void test() { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "root", "123456"); connector.connect(); connector.subscribe(".*\\..*"); MessageProducer messageProducer = RocketMQMessageProducer.getInstance(); messageProducer.start(); while (true) { Message message = connector.getWithoutAck(100); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId != -1 && size > 0) { for (CanalEntry.Entry entry : message.getEntries()) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { LOGGER.error("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); continue; } String tableName = entry.getHeader().getTableName(); if (rowChange.getEventType() == CanalEntry.EventType.DELETE) { for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { MapbeforeMap = buildMap(rowData.getBeforeColumnsList()); MessageData messageData = new MessageData(tableName, beforeMap); messageProducer.send(messageData); } } else if (rowChange.getEventType() == CanalEntry.EventType.INSERT || rowChange.getEventType() == CanalEntry.EventType.UPDATE) { for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { Map afterMap = buildMap(rowData.getAfterColumnsList()); MessageData messageData = new MessageData(tableName, afterMap); messageProducer.send(messageData); } } } } connector.ack(batchId); } else { LOGGER.info("No message received!"); } } } private Map buildMap(List columns) { Map map = new HashMap<>(); for (CanalEntry.Column column : columns) { map.put(column.getName(), column.getValue()); } return map; } }
三、CanalRocketMQ的优点
从使用CanalRocketMQ的角度来看,它具有以下几点优点:
1.无缝衔接
使用CanalRocketMQ可以实现Canal和RocketMQ的无缝衔接,从而可以实现数据同步。这种无缝衔接代表了不同组件之间的良好协作,让使用方更加便捷地接入数据同步系统。
2.多样性
CanalRocketMQ既能够使用Canal解析出来的数据,又能够通过RocketMQ发送消息,具有多种用途。例如,可以实现数据的备份,也可以实现多个系统的数据同步。
3.高性能
CanalRocketMQ在消费Canal的数据变化时,使用了多线程的方式进行处理,从而提高了消费的效率和速度。
四、总结
CanalRocketMQ作为Canal和RocketMQ的无缝衔接工具,具有多样性、高性能等优点。使用过程相对简单,配置也相对清晰明了,能够为数据同步提供可靠的基础设施。