RocketMQ创建Topic详解

发布时间:2023-05-17

RocketMQ 创建 Topic 指南

一、RocketMQ 创建 Topic 命令

创建 Topic 是使用 RocketMQ 的基本操作之一,具体的操作可以通过控制台或者使用命令行完成。使用命令行创建 Topic,可以使用下面的命令:

./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}

其中,readQueueNums 表示消息队列数量,writeQueueNums 表示写入队列数量。这里需要注意的是:每个 Broker 目前默认最大创建的队列数是 1000 个,如果需要创建更多队列,请调整相关配置文件。

二、RocketMQ 创建 Topic 指定 Broker

在 RocketMQ 中,Topic 是以 broker 为单位进行管理的,所以可以通过指定 broker 来创建和管理 topic。指定 broker 需要指定其 IP 地址,具体的命令如下:

./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}

其中,clusterName 是指定的集群名称,readQueueNumswriteQueueNums 同上。

三、RocketMQ 创建 Topic 过程

RocketMQ 的 Topic 是由 NameServer 进行统一管理的,创建 Topic 需要执行以下操作:

  1. 在创建 Topic 的时候,需要向 NameServer 注册 Topic 信息。
  2. 创建 Topic 的时候,会在每个 Broker 上创建与 Topic 相关的队列。
  3. 指定写入队列数量,对应 Broker 上会创建与之相应的 flush 线程。
  4. 每个消息队列都会创建一个与之对应的消费队列,消费队列会与消费者之间建立连接。

四、RocketMQ 创建 Topic 立刻就能使用

创建 Topic 之后,需要在 Producer 和 Consumer 的代码中启用对应的 Topic,代码示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",// topic
        "TagA",// tag
        "OrderID001",// key
        ("Hello RocketMQ ").getBytes());// body
SendResult sendResult = producer.send(msg);
//Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

五、RocketMQ 手动创建 Topic

在 RocketMQ 控制台中,可以手动创建和管理 Topic。在 RocketMQ Console 的左侧菜单中找到 Topics 一级菜单,在此处可以查看已经创建的 Topic,也可以手动创建 Topic。

六、RocketMQ 创建 Topic 命令 -o

在使用 mqadmin 创建 Topic 的时候,可以使用 -o 选项指定创建 Topic 配置文件的路径,以达到快速创建 Topic 的目的。具体操作如下:

./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file

七、RocketMQ 创建 Topic 网站配置

在 RocketMQ 的配置文件中,可以通过指定 topicMap 的方式进行 topic 的配置。在 ${ROCKET_HOME}/conf/broker.conf 文件中添加如下配置:

topicMaps=101:topicA;102:topicB

其中,每一个 topic 的创建方式和 mqadmin 指令相同。

八、RocketMQ 创建 Group

在创建 Topic 的时候,可以通过设置 Group 的值进行分组管理。同一 Group 中的 Consumer 会消费 Topic 的不同 Message,具体的使用方法可以参考下面的代码示例:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

九、RocketMQ 创建链接

使用 RocketMQ 进行消息传递需要创建链接,代码示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

总结

通过本篇文章,我们详细地了解了 RocketMQ 创建 Topic 的各个方面,包括使用命令行、指定 broker、创建过程、立刻使用、手动创建、创建命令等等,同时还介绍了创建 Group 和链接的方法。通过本文的学习,我们可以更加深入地理解 RocketMQ 的使用方法。