您的位置:

RocketMQ Windows 开发指南

一、RocketMQ 简介

RocketMQ是阿里巴巴团队开发的一款开源的分布式消息中间件。它具有低延迟、高吞吐量、高可靠性等特点,广泛应用于电商、金融、物流等领域中。

RocketMQ基于发布订阅模式,支持顺序消息、事务消息、定时消息等特性,可横向扩展,提高系统的可用性和可靠性。

本文将主要介绍 RocketMQ 在 Windows 环境下的开发和部署。

二、RocketMQ Windows 安装

1. 下载压缩包

wget http://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip

2. 解压缩

unzip rocketmq-all-4.3.2-bin-release.zip

3. 配置环境变量

在系统环境变量中添加以下两个变量:

ROCKETMQ_HOME = 解压缩后的目录路径,例如:C:\rocketmq-all-4.3.2-bin-release

PATH = %ROCKETMQ_HOME%\bin;

4. 测试是否安装成功

mqnamesrv
mqbroker

以上两个命令分别启动了 RocketMQ 的namesrv和broker服务,如果能够正常启动则表示安装成功。

三、RocketMQ Windows 常用命令

1. 启动 namesrv 服务

mqnamesrv

2. 启动 broker 服务

mqbroker -n localhost:9876

3. 查看 broker 状态

mqadmin topicStatus -t TopicTest -n localhost:9876

4. 创建 topic

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicTest

5. 发送消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message msg = new Message("TopicTest", "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.send(msg);
        producer.shutdown();
    }
}

6. 消费消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List
    msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Receive message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer started.");
    }
}
   

四、RocketMQ Windows 开发注意事项

1. RocketMQ 的 Producer 和 Consumer 都需要启动namesrv服务,否则无法正常连接 broker。

2. 在 Windows 下,RocketMQ 的shell脚本无法使用,需要使用命令行工具或者Java API进行操作。

3. Producer 和 Consumer 的线程池配置需要根据实际需求进行调整,避免线程池开销过大影响系统性能。

五、总结

RocketMQ是一款非常优秀的分布式消息中间件,支持多种消息类型和特性,适用于各类场景。在 Windows 环境下,我们可以使用Java API进行开发、部署和测试,如果需要使用Shell命令,则需要使用Cygwin等工具。