一、简介
RocketMQ是一个分布式消息传递解决方案,具有高性能、高可靠、高可扩展性和分布式特性。本篇文章将从多个方面对RocketMQ的源码进行解析,帮助更好的理解RocketMQ的内部实现。本篇文章主要分为以下几个部分:
- Broker启动流程
- 消息发送流程
- 消息存储流程
- 消息消费流程
- 高可用机制
二、Broker启动流程
Broker是RocketMQ的核心组件,负责储存和转发消息。下面是Broker启动流程的简要过程:
- Broker启动类是BrokerStartup,首先解析配置文件,生成BrokerConfig对象。
- BrokerConfig对象主要包括了Broker的ID、名称等基本信息,以及Topic的配置,如消息最大长度、刷盘方式等。
- 接着,启动Netty服务监听Producer和Consumer的连接请求,并处理 NameServer 请求。
- Broker启动后,先从持久化存储中加载Topic和消息数据到内存,然后启动Consumer拉取线程、commit线程,还有定时调度线程等。
- 最后,等待NameServer的心跳请求。
完整的Broker启动流程代码示例如下:
public static void main(String[] args) { try { // 解析配置文件,生成BrokerConfig对象 final BrokerConfig brokerConfig = new BrokerConfig(); // ... // 启动Netty服务监听Producer和Consumer的连接请求,并处理 NameServer 请求 final RemotingServer remotingServer = new NettyRemotingServer(brokerConfig.getNettyServerConfig()); final BrokerController brokerController = new BrokerController(brokerConfig, remotingServer); // 从持久化存储中加载Topic和消息数据到内存 brokerController.initialize(); // 启动Consumer拉取线程、commit线程,还有定时调度线程等 brokerController.start(); // 等待NameServer的心跳请求 remotingServer.start(); } catch (Throwable e) { // ... } }
三、消息发送流程
消息发送是RocketMQ的一项核心功能。下面是消息发送的流程简介:
- Producer启动后初始化MQClientInstance,然后创建MQProducerInner对象。
- 发送消息时,先从本地缓存中获取TopicPublishInfo,如果没有就从NameServer获取。
- 选择Topic路由,得到投递的Queue,做负载均衡。
- 由MQClientInstance创建一个拉取Broker地址的任务PullNameServerTask。
- 高可用机制选择一个Broker发送消息。
- 发送消息到Broker。
完整的消息发送流程的代码示例如下:
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("topic", "tag", "key", "Hello World".getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); producer.shutdown(); }
四、消息存储流程
消息存储是RocketMQ最核心的一环。下面是消息存储的流程简介:
- 消息写入前,先获取一个内存映射文件,如果没有就创建。
- 将消息写入内存映射文件,写入后根据CommitLog文件刷盘策略决定是否同步刷盘。
- 消息写入后,将消息存入Index文件,Index相当于一个消息ID和文件偏移量的映射。
- 消息一旦从内存中刷入磁盘,就可以被Replication模块复制到其他机器。
完整的消息存储流程的代码示例如下:
// 代码来自CommitLog.java public void putMessage(MessageExtBrokerInner msg) { // ... // 先获取一个内存映射文件,如果没有就创建 MappedFile mappedFile = this.getMappedFile(matched); // 将消息写入内存映射文件 mappedFile.appendMessage(msg); // 写入后根据CommitLog文件刷盘策略决定是否同步刷盘 this.handleScheduleMessageService(mappedFile.getFileSize(), this.commitDataLeastPages); // 将消息存入Index文件 this.putMessagePositionInfo(msg, mappedFile.getFileFromOffset(), mappedFile.getFileSize()); // 消息一旦从内存中刷入磁盘,就可以被Replication模块复制到其他机器 this.dataVersion.incrementAndGet(); }
五、消息消费流程
消息消费是RocketMQ的重要组成部分。下面是消息消费的简要过程:
- 消费者启动后会向NameServer请求Topic路由信息,得到所要消费的消息队列。
- Consumer从Broker拉取消息,如果有消息就开始消费。
- 消费消息前,先从Index文件中获取消息的文件偏移量。
- 从CommitLog中读取消息并返回。
- 消息消费成功后,提交偏移量。
完整的消息消费流程的代码示例如下:
// 代码来自DefaultMQPushConsumerImpl.java @Override public void pullMessage(final PullRequest pullRequest) { // 拉取消息 PullResult pullResult = this.pullMessageService.pullMessage(pullRequest); switch (pullResult.getPullStatus()) { // 获取到消息 case FOUND: this.processQueue.putMessage(pullResult.getMsgFoundList()); this.executePullRequestLater(pullRequest.getPullFromThisOffset() + pullResult.getNextBeginOffset()); break; // 等待下一次 case NO_NEW_MSG: this.executePullRequestLater(pullRequest.getPullFromThisOffset()); break; // 没有变化,再次拉取消息 case NO_MATCHED_MSG: this.executePullRequestLater(pullRequest.getPullFromThisOffset()); break; // 拉取被暂停 case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.getProcessQueue().setDropped(true); break; default: break; } } public void handleMessage(final MessageExt msgs, final ConsumeQueue cq, final ConsumeQueueExt.CqExtUnit cqExt) { // 处理消息前,先从Index文件中获取消息的文件偏移量 final long offsetPy = cq.getOffsetInQueueByTime(msgs.getStoreTimestamp()); final long offsetDiff = cqExt.getOffsetPy() - offsetPy; int tagsCode = 0; // tags处理 if (msgs.getTags() != null && msgs.getTags().length() > 0) { tagsCode = MessageExtBrokerInner.tagsString2tagsCode(msgs.getTags()); } this.hook.consumeMessageBefore(msgs, this.context); // 从CommitLog中读取消息并返回 final ConsumeReturnType returnType = this.consumeMessageService.consumeMessage( msgs, this.defaultMQPushConsumer.getMessageModel(), this.processQueue, this.consumeOrderly, tagsCode, this.properties, this.consumeMessageFilter, this.brokerSuspendMaxTimeMillis, this.consumerGroup, this.maxReconsumeTimes, this.context, offsetPy); // 消息消费成功后,提交偏移量 this.processConsumeResult(msgs, returnType, this.context, consumeRequest.getStats()); this.hook.consumeMessageAfter(msgs, this.context); }
六、高可用机制
RocketMQ通过复制和双写等高可用机制,实现分布式消息存储的高可靠性。
- Master-Slave复制:Master节点负责消息写入和消费,Slaver节点只负责消息复制,并在Master故障时接管Master工作。
- 双写机制:消费者消费消息时,会将消息从Master和Slave双写节点同时消费,以保证消息不会丢失。
七、总结
本篇文章详细介绍了RocketMQ的源码解析,从Broker启动流程、消息发送流程、消息存储流程、消息消费流程、高可用机制等多个方面进行了详细的阐述,对于深入理解RocketMQ的内部实现有一定的帮助。