一、RocketMQ 简介
RocketMQ是阿里巴巴团队开发的一款开源的分布式消息中间件。它具有低延迟、高吞吐量、高可靠性等特点,广泛应用于电商、金融、物流等领域中。
RocketMQ基于发布订阅模式,支持顺序消息、事务消息、定时消息等特性,可横向扩展,提高系统的可用性和可靠性。
本文将主要介绍 RocketMQ 在 Windows 环境下的开发和部署。
二、RocketMQ Windows 安装
1. 下载压缩包
wget http://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
2. 解压缩
unzip rocketmq-all-4.3.2-bin-release.zip
3. 配置环境变量
在系统环境变量中添加以下两个变量:
ROCKETMQ_HOME = 解压缩后的目录路径,例如:C:\rocketmq-all-4.3.2-bin-release
PATH = %ROCKETMQ_HOME%\bin;
4. 测试是否安装成功
mqnamesrv
mqbroker
以上两个命令分别启动了 RocketMQ 的namesrv和broker服务,如果能够正常启动则表示安装成功。
三、RocketMQ Windows 常用命令
1. 启动 namesrv 服务
mqnamesrv
2. 启动 broker 服务
mqbroker -n localhost:9876
3. 查看 broker 状态
mqadmin topicStatus -t TopicTest -n localhost:9876
4. 创建 topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicTest
5. 发送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg);
producer.shutdown();
}
}
6. 消费消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
}
}
四、RocketMQ Windows 开发注意事项
1. RocketMQ 的 Producer 和 Consumer 都需要启动namesrv服务,否则无法正常连接 broker。
2. 在 Windows 下,RocketMQ 的shell脚本无法使用,需要使用命令行工具或者Java API进行操作。
3. Producer 和 Consumer 的线程池配置需要根据实际需求进行调整,避免线程池开销过大影响系统性能。
五、总结
RocketMQ是一款非常优秀的分布式消息中间件,支持多种消息类型和特性,适用于各类场景。在 Windows 环境下,我们可以使用Java API进行开发、部署和测试,如果需要使用Shell命令,则需要使用Cygwin等工具。