您的位置:

RocketMQTopic详解

RocketMQ是阿里巴巴开源的分布式消息队列,RocketMQTopic是RocketMQ中的一个核心概念。在本篇文章中,我们将从多个方面对RocketMQTopic进行详细的阐述。

一、RocketMQTopic与group订阅关系

RocketMQTopic可以被多个group订阅,每个group可以有多个consumer。当一个message被多个group订阅时,RocketMQ会采用一些策略来决定哪个group会消费这个message。具体的策略包括:

  • 广播模式(Broadcasting)
  • 集群模式(Clustering)
  • 发布/订阅模式(Publish/Subscribe)

以Broadcasting为例,它会将一个message发送给所有的group,每个group都会独立地消费这个message。

// 创建一个Broadcasting topic 'example_topic'
String topicName = "example_topic";
MQAdmin admin = new DefaultMQAdminImpl();
admin.createTopic(topicName,"test_group",4);

以上代码会建立一个名为'example_topic'的Broadcasting topic,其中"test_group"表示group的名称,4表示这个topic中有4个队列。

二、RocketMQTopic与broker对应关系

RocketMQTopic与broker之间的对应关系需要通过命名服务器(Naming Server)来完成。当一个broker上的某个topic订阅情况发生变化时,管理命名服务器的程序会接收到通知,然后会将新的订阅信息推送给订阅了此topic的consumer。

以下代码展示了如何创建一个与broker对应的topic,其中broker的地址需要在代码中指定。具体的brokder信息可以从命名服务器中获取。

// 指定broker的地址
String brokerIP = "127.0.0.1";
int brokerPort = 10911;
String address = brokerIP + ":" + brokerPort;

// 创建一个与broker对应的topic
String topicName = "example_topic";
MQAdmin admin = new DefaultMQAdminImpl();
TopicPublishInfo topicPublishInfo = admin.fetchPublishMessageQueues(topicName);
SendMessageRequest request = new SendMessageRequest();
request.setTopic(topicName);
request.setBody("hello world".getBytes());
request.setQueueId(topicPublishInfo.getMessageQueueList().get(0).getQueueId());
request.setBrokerAddr(address);
SendMessageResult result = producer.sendMessage(request);

三、RocketMQTopic的属性

RocketMQTopic有一些自己特有的属性,其中比较重要的包括topic名称、消息的过滤表达式、消息发送延迟时间等:

  • topic名称:在RocketMQ中,每一个topic都有自己唯一的名称,其中包含字符长度不能超过127个字符;
  • 消息的过滤表达式:为了避免consumer接收到一些不希望接收到的信息,可以通过消息的过滤表达式来限制consumer消费相应的消息;
  • 消息发送延迟时间:RocketMQ支持消息发送时的延迟时间设置,这个延迟时间是从消息发送到broker开始计算的。

下面是一个在RocketMQ中创建topic时设置延迟时间的代码示例:

// 创建一个topic 'example_topic'
String topicName = "example_topic";
MQAdmin admin = new DefaultMQAdminImpl();
admin.createTopic(topicName,"test_group",4);

// 为topic加上延迟时间100ms
String delayTimeLevel = "1";
admin.createAndUpdateTopicConfig(topicName, new HashMap() {{
   put(DelayTimeLevelKey, delayTimeLevel);
}});

  

四、RocketMQTopic的动态调整

RocketMQ支持对其topic进行动态调整,从而实现动态伸缩的需求。可以通过RocketMQ配置文件中的参数来配置动态调整的行为,默认情况下是开启自动调整的。在使用动态调整时需要注意以下几点:

  • 使用动态调整功能需要对Broker资源进行评估,并承认调整时可能会发生消息丢失的风险;
  • 动态调整只支持Borker,而不支持NameServer。

下面是一个使用动态调整的代码示例:

// 指定BrokerIP、上一个offset与当前offset
String brokerIP = "127.0.0.1";
int brokerPort = 10911;
long lastOffset = 1L;
long currentOffset = 2L;

// 创建一个开启了自动调整的DefaultMQAdminImpl
DefaultMQAdminImpl mqAdmin = new DefaultMQAdminImpl();
mqAdmin.setAutoAdjustMixedCancellation(Boolean.TRUE);
mqAdmin.setInstanceName(Long.toString(System.currentTimeMillis()));

// 进行动态调整
mqAdmin.adjustMixedVhostBrokerConfig(
   new HashSet(), 
   new HashSet
   () {{
       add(new MixTransferConfig(brokerIP, "", "", lastOffset, currentOffset));
   }}, 
   new MixConnectConfig(brokerIP, 22000)
);

   
  

五、RocketMQTopic与事务消息

事务消息是RocketMQ中的一种特殊类型的消息,它需要满足ACID的事务特性。在RocketMQ中,事务消息数据要经过producer、broker的两次确认才会真正地被发送出去。如果在发送过程中出现了任何异常情况,RocketMQ就会将这个消息标记为UNKNOW(不确定),需要由producer或consumer通过特定接口进行二次确认。

// 创建一个Transaction topic 
String topicName = "example_topic";
MQAdmin admin = new DefaultMQAdminImpl();
admin.createTopic(topicName,"test_group", 4, TopicType.TRANSACTION_TOPIC);

// 发送一个事务消息
TransactionProducer producer = new TransactionProducer("test_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("transaction_producer_1");
producer.start();
TransactionSendResult result = producer.sendMessageInTransaction(new Message(topicName, "hello world".getBytes()));
producer.shutdown();

总结

本篇文章从RocketMQTopic与group订阅关系、RocketMQTopic与broker对应关系、RocketMQTopic的属性、RocketMQTopic的动态调整以及RocketMQTopic与事务消息等多个方面对RocketMQTopic进行了详细的阐述。通过深入学习RocketMQTopic的相关知识,我们可以更好地了解RocketMQ的运行机制,从而更加高效地使用RocketMQ工具来完成我们的工作。