一、RocketMQTopic与group订阅关系
RocketMQTopic可以被多个group订阅,每个group可以有多个consumer。当一个message被多个group订阅时,RocketMQ会采用一些策略来决定哪个group会消费这个message。具体的策略包括:
- 广播模式(Broadcasting)
- 集群模式(Clustering)
- 发布/订阅模式(Publish/Subscribe)
以Broadcasting为例,它会将一个message发送给所有的group,每个group都会独立地消费这个message。
// 创建一个Broadcasting topic 'example_topic' String topicName = "example_topic"; MQAdmin admin = new DefaultMQAdminImpl(); admin.createTopic(topicName,"test_group",4);
以上代码会建立一个名为'example_topic'的Broadcasting topic,其中"test_group"表示group的名称,4表示这个topic中有4个队列。
二、RocketMQTopic与broker对应关系
RocketMQTopic与broker之间的对应关系需要通过命名服务器(Naming Server)来完成。当一个broker上的某个topic订阅情况发生变化时,管理命名服务器的程序会接收到通知,然后会将新的订阅信息推送给订阅了此topic的consumer。
以下代码展示了如何创建一个与broker对应的topic,其中broker的地址需要在代码中指定。具体的brokder信息可以从命名服务器中获取。
// 指定broker的地址 String brokerIP = "127.0.0.1"; int brokerPort = 10911; String address = brokerIP + ":" + brokerPort; // 创建一个与broker对应的topic String topicName = "example_topic"; MQAdmin admin = new DefaultMQAdminImpl(); TopicPublishInfo topicPublishInfo = admin.fetchPublishMessageQueues(topicName); SendMessageRequest request = new SendMessageRequest(); request.setTopic(topicName); request.setBody("hello world".getBytes()); request.setQueueId(topicPublishInfo.getMessageQueueList().get(0).getQueueId()); request.setBrokerAddr(address); SendMessageResult result = producer.sendMessage(request);
三、RocketMQTopic的属性
RocketMQTopic有一些自己特有的属性,其中比较重要的包括topic名称、消息的过滤表达式、消息发送延迟时间等:
- topic名称:在RocketMQ中,每一个topic都有自己唯一的名称,其中包含字符长度不能超过127个字符;
- 消息的过滤表达式:为了避免consumer接收到一些不希望接收到的信息,可以通过消息的过滤表达式来限制consumer消费相应的消息;
- 消息发送延迟时间:RocketMQ支持消息发送时的延迟时间设置,这个延迟时间是从消息发送到broker开始计算的。
下面是一个在RocketMQ中创建topic时设置延迟时间的代码示例:
// 创建一个topic 'example_topic' String topicName = "example_topic"; MQAdmin admin = new DefaultMQAdminImpl(); admin.createTopic(topicName,"test_group",4); // 为topic加上延迟时间100ms String delayTimeLevel = "1"; admin.createAndUpdateTopicConfig(topicName, new HashMap() {{ put(DelayTimeLevelKey, delayTimeLevel); }});
四、RocketMQTopic的动态调整
RocketMQ支持对其topic进行动态调整,从而实现动态伸缩的需求。可以通过RocketMQ配置文件中的参数来配置动态调整的行为,默认情况下是开启自动调整的。在使用动态调整时需要注意以下几点:
- 使用动态调整功能需要对Broker资源进行评估,并承认调整时可能会发生消息丢失的风险;
- 动态调整只支持Borker,而不支持NameServer。
下面是一个使用动态调整的代码示例:
// 指定BrokerIP、上一个offset与当前offset String brokerIP = "127.0.0.1"; int brokerPort = 10911; long lastOffset = 1L; long currentOffset = 2L; // 创建一个开启了自动调整的DefaultMQAdminImpl DefaultMQAdminImpl mqAdmin = new DefaultMQAdminImpl(); mqAdmin.setAutoAdjustMixedCancellation(Boolean.TRUE); mqAdmin.setInstanceName(Long.toString(System.currentTimeMillis())); // 进行动态调整 mqAdmin.adjustMixedVhostBrokerConfig( new HashSet(), new HashSet () {{ add(new MixTransferConfig(brokerIP, "", "", lastOffset, currentOffset)); }}, new MixConnectConfig(brokerIP, 22000) );
五、RocketMQTopic与事务消息
事务消息是RocketMQ中的一种特殊类型的消息,它需要满足ACID的事务特性。在RocketMQ中,事务消息数据要经过producer、broker的两次确认才会真正地被发送出去。如果在发送过程中出现了任何异常情况,RocketMQ就会将这个消息标记为UNKNOW(不确定),需要由producer或consumer通过特定接口进行二次确认。
// 创建一个Transaction topic String topicName = "example_topic"; MQAdmin admin = new DefaultMQAdminImpl(); admin.createTopic(topicName,"test_group", 4, TopicType.TRANSACTION_TOPIC); // 发送一个事务消息 TransactionProducer producer = new TransactionProducer("test_producer_group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("transaction_producer_1"); producer.start(); TransactionSendResult result = producer.sendMessageInTransaction(new Message(topicName, "hello world".getBytes())); producer.shutdown();
总结
本篇文章从RocketMQTopic与group订阅关系、RocketMQTopic与broker对应关系、RocketMQTopic的属性、RocketMQTopic的动态调整以及RocketMQTopic与事务消息等多个方面对RocketMQTopic进行了详细的阐述。通过深入学习RocketMQTopic的相关知识,我们可以更好地了解RocketMQ的运行机制,从而更加高效地使用RocketMQ工具来完成我们的工作。