RocketMQ 创建 Topic 指南
一、RocketMQ 创建 Topic 命令
创建 Topic 是使用 RocketMQ 的基本操作之一,具体的操作可以通过控制台或者使用命令行完成。使用命令行创建 Topic,可以使用下面的命令:
./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}
其中,readQueueNums
表示消息队列数量,writeQueueNums
表示写入队列数量。这里需要注意的是:每个 Broker 目前默认最大创建的队列数是 1000 个,如果需要创建更多队列,请调整相关配置文件。
二、RocketMQ 创建 Topic 指定 Broker
在 RocketMQ 中,Topic 是以 broker 为单位进行管理的,所以可以通过指定 broker 来创建和管理 topic。指定 broker 需要指定其 IP 地址,具体的命令如下:
./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}
其中,clusterName
是指定的集群名称,readQueueNums
和 writeQueueNums
同上。
三、RocketMQ 创建 Topic 过程
RocketMQ 的 Topic 是由 NameServer 进行统一管理的,创建 Topic 需要执行以下操作:
- 在创建 Topic 的时候,需要向 NameServer 注册 Topic 信息。
- 创建 Topic 的时候,会在每个 Broker 上创建与 Topic 相关的队列。
- 指定写入队列数量,对应 Broker 上会创建与之相应的 flush 线程。
- 每个消息队列都会创建一个与之对应的消费队列,消费队列会与消费者之间建立连接。
四、RocketMQ 创建 Topic 立刻就能使用
创建 Topic 之后,需要在 Producer 和 Consumer 的代码中启用对应的 Topic,代码示例如下:
// Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"OrderID001",// key
("Hello RocketMQ ").getBytes());// body
SendResult sendResult = producer.send(msg);
//Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
五、RocketMQ 手动创建 Topic
在 RocketMQ 控制台中,可以手动创建和管理 Topic。在 RocketMQ Console 的左侧菜单中找到 Topics 一级菜单,在此处可以查看已经创建的 Topic,也可以手动创建 Topic。
六、RocketMQ 创建 Topic 命令 -o
在使用 mqadmin 创建 Topic 的时候,可以使用 -o
选项指定创建 Topic 配置文件的路径,以达到快速创建 Topic 的目的。具体操作如下:
./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file
七、RocketMQ 创建 Topic 网站配置
在 RocketMQ 的配置文件中,可以通过指定 topicMap
的方式进行 topic 的配置。在 ${ROCKET_HOME}/conf/broker.conf
文件中添加如下配置:
topicMaps=101:topicA;102:topicB
其中,每一个 topic 的创建方式和 mqadmin 指令相同。
八、RocketMQ 创建 Group
在创建 Topic 的时候,可以通过设置 Group 的值进行分组管理。同一 Group 中的 Consumer 会消费 Topic 的不同 Message,具体的使用方法可以参考下面的代码示例:
// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
九、RocketMQ 创建链接
使用 RocketMQ 进行消息传递需要创建链接,代码示例如下:
// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
总结
通过本篇文章,我们详细地了解了 RocketMQ 创建 Topic 的各个方面,包括使用命令行、指定 broker、创建过程、立刻使用、手动创建、创建命令等等,同时还介绍了创建 Group 和链接的方法。通过本文的学习,我们可以更加深入地理解 RocketMQ 的使用方法。