您的位置:

RocketMQ创建Topic详解

一、RocketMQ创建Topic命令

创建topic是使用RocketMQ的基本操作之一,具体的操作可以通过控制台或者使用命令行完成。使用命令行创建topic,可以使用下面的命令:

./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}

其中,readQueueNums表示消息队列数量,writeQueueNums表示写入队列数量。这里需要注意的是:每个Broker目前默认最大创建的队列数是1000个,如果需要创建更多队列,请调整相关配置文件。

二、RocketMQ创建Topic指定Broker

在RocketMQ中,topic是以broker为单位进行管理的,所以可以通过指定broker来创建和管理topic。指定broker需要指定其IP地址,具体的命令如下:

./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}

其中,clusterName是指定的集群名称,readQueueNums和writeQueueNums同上。

三、RocketMQ创建Topic过程

RocketMQ的Topic是由NameServer进行统一管理的,创建Topic需要执行以下操作:

  1. 在创建Topic的时候,需要向NameServer注册Topic信息。
  2. 创建Topic的时候,会在每个Broker上创建与Topic相关的队列。
  3. 指定写入队列数量,对应Broker上会创建与之相应的flush线程。
  4. 每个消息队列都会创建一个与之对应的消费队列,消费队列会与消费者之间建立连接。

四、RocketMQ创建Topic立刻就能使用

创建Topic之后,需要在Producer和Consumer的代码中启用对应的Topic,代码示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",// topic
        "TagA",// tag
        "OrderID001",// key
        ("Hello RocketMQ ").getBytes());// body
SendResult sendResult = producer.send(msg);

//Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

  

五、RocketMQ手动创建Topic

在RocketMQ控制台中,可以手动创建和管理Topic。在RocketMQ Console的左侧菜单中找到Topics一级菜单,在此处可以查看已经创建的Topic,也可以手动创建Topic。

六、RocketMQ创建Topic命令 -o

在使用mqadmin创建Topic的时候,可以使用-o选项指定创建Topic配置文件的路径,以达到快速创建Topic的目的。具体操作如下:

./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file

七、RocketMQ创建Topic网站配置

在RocketMQ的配置文件中,可以通过指定topicMap的方式进行topic的配置。在${ROCKET_HOME}/conf/broker.conf文件中添加如下配置:

topicMaps=101:topicA;102:topicB

其中,每一个topic的创建方式和mqadmin指令相同。

八、RocketMQ创建Group

在创建Topic的时候,可以通过设置Group的值进行分组管理。同一Group中的Consumer会消费Topic的不同Message,具体的使用方法可以参考下面的代码示例:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

  

九、RocketMQ创建链接

使用RocketMQ进行消息传递需要创建链接,代码示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

  

总结

通过本篇文章,我们详细地了解了RocketMQ创建Topic的各个方面,包括使用命令线、指定broker、创建过程、立刻使用、手动创建、创建命令等等,同时还介绍了创建Group和链接的方法。通过本文的学习,我们可以更加深入地理解RocketMQ的使用方法。