一、RocketMQ创建Topic命令
创建topic是使用RocketMQ的基本操作之一,具体的操作可以通过控制台或者使用命令行完成。使用命令行创建topic,可以使用下面的命令:
./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}
其中,readQueueNums表示消息队列数量,writeQueueNums表示写入队列数量。这里需要注意的是:每个Broker目前默认最大创建的队列数是1000个,如果需要创建更多队列,请调整相关配置文件。
二、RocketMQ创建Topic指定Broker
在RocketMQ中,topic是以broker为单位进行管理的,所以可以通过指定broker来创建和管理topic。指定broker需要指定其IP地址,具体的命令如下:
./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}
其中,clusterName是指定的集群名称,readQueueNums和writeQueueNums同上。
三、RocketMQ创建Topic过程
RocketMQ的Topic是由NameServer进行统一管理的,创建Topic需要执行以下操作:
- 在创建Topic的时候,需要向NameServer注册Topic信息。
- 创建Topic的时候,会在每个Broker上创建与Topic相关的队列。
- 指定写入队列数量,对应Broker上会创建与之相应的flush线程。
- 每个消息队列都会创建一个与之对应的消费队列,消费队列会与消费者之间建立连接。
四、RocketMQ创建Topic立刻就能使用
创建Topic之后,需要在Producer和Consumer的代码中启用对应的Topic,代码示例如下:
// Producer DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest",// topic "TagA",// tag "OrderID001",// key ("Hello RocketMQ ").getBytes());// body SendResult sendResult = producer.send(msg); //Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
五、RocketMQ手动创建Topic
在RocketMQ控制台中,可以手动创建和管理Topic。在RocketMQ Console的左侧菜单中找到Topics一级菜单,在此处可以查看已经创建的Topic,也可以手动创建Topic。
六、RocketMQ创建Topic命令 -o
在使用mqadmin创建Topic的时候,可以使用-o选项指定创建Topic配置文件的路径,以达到快速创建Topic的目的。具体操作如下:
./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file
七、RocketMQ创建Topic网站配置
在RocketMQ的配置文件中,可以通过指定topicMap的方式进行topic的配置。在${ROCKET_HOME}/conf/broker.conf文件中添加如下配置:
topicMaps=101:topicA;102:topicB
其中,每一个topic的创建方式和mqadmin指令相同。
八、RocketMQ创建Group
在创建Topic的时候,可以通过设置Group的值进行分组管理。同一Group中的Consumer会消费Topic的不同Message,具体的使用方法可以参考下面的代码示例:
// Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes()); SendResult sendResult = producer.send(msg); // Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
九、RocketMQ创建链接
使用RocketMQ进行消息传递需要创建链接,代码示例如下:
// Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes()); SendResult sendResult = producer.send(msg); // Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
总结
通过本篇文章,我们详细地了解了RocketMQ创建Topic的各个方面,包括使用命令线、指定broker、创建过程、立刻使用、手动创建、创建命令等等,同时还介绍了创建Group和链接的方法。通过本文的学习,我们可以更加深入地理解RocketMQ的使用方法。