一、简介
RocketMQ是一个分布式消息传递解决方案,具有高性能、高可靠、高可扩展性和分布式特性。本篇文章将从多个方面对RocketMQ的源码进行解析,帮助更好的理解RocketMQ的内部实现。本篇文章主要分为以下几个部分:
- Broker启动流程
- 消息发送流程
- 消息存储流程
- 消息消费流程
- 高可用机制
二、Broker启动流程
Broker是RocketMQ的核心组件,负责储存和转发消息。下面是Broker启动流程的简要过程:
- Broker启动类是BrokerStartup,首先解析配置文件,生成BrokerConfig对象。
- BrokerConfig对象主要包括了Broker的ID、名称等基本信息,以及Topic的配置,如消息最大长度、刷盘方式等。
- 接着,启动Netty服务监听Producer和Consumer的连接请求,并处理 NameServer 请求。
- Broker启动后,先从持久化存储中加载Topic和消息数据到内存,然后启动Consumer拉取线程、commit线程,还有定时调度线程等。
- 最后,等待NameServer的心跳请求。 完整的Broker启动流程代码示例如下:
public static void main(String[] args) {
try {
// 解析配置文件,生成BrokerConfig对象
final BrokerConfig brokerConfig = new BrokerConfig();
// ...
// 启动Netty服务监听Producer和Consumer的连接请求,并处理 NameServer 请求
final RemotingServer remotingServer = new NettyRemotingServer(brokerConfig.getNettyServerConfig());
final BrokerController brokerController = new BrokerController(brokerConfig, remotingServer);
// 从持久化存储中加载Topic和消息数据到内存
brokerController.initialize();
// 启动Consumer拉取线程、commit线程,还有定时调度线程等
brokerController.start();
// 等待NameServer的心跳请求
remotingServer.start();
} catch (Throwable e) {
// ...
}
}
三、消息发送流程
消息发送是RocketMQ的一项核心功能。下面是消息发送的流程简介:
- Producer启动后初始化MQClientInstance,然后创建MQProducerInner对象。
- 发送消息时,先从本地缓存中获取TopicPublishInfo,如果没有就从NameServer获取。
- 选择Topic路由,得到投递的Queue,做负载均衡。
- 由MQClientInstance创建一个拉取Broker地址的任务PullNameServerTask。
- 高可用机制选择一个Broker发送消息。
- 发送消息到Broker。 完整的消息发送流程的代码示例如下:
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("topic", "tag", "key", "Hello World".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
producer.shutdown();
}
四、消息存储流程
消息存储是RocketMQ最核心的一环。下面是消息存储的流程简介:
- 消息写入前,先获取一个内存映射文件,如果没有就创建。
- 将消息写入内存映射文件,写入后根据CommitLog文件刷盘策略决定是否同步刷盘。
- 消息写入后,将消息存入Index文件,Index相当于一个消息ID和文件偏移量的映射。
- 消息一旦从内存中刷入磁盘,就可以被Replication模块复制到其他机器。 完整的消息存储流程的代码示例如下:
// 代码来自CommitLog.java
public void putMessage(MessageExtBrokerInner msg) {
// ...
// 先获取一个内存映射文件,如果没有就创建
MappedFile mappedFile = this.getMappedFile(matched);
// 将消息写入内存映射文件
mappedFile.appendMessage(msg);
// 写入后根据CommitLog文件刷盘策略决定是否同步刷盘
this.handleScheduleMessageService(mappedFile.getFileSize(), this.commitDataLeastPages);
// 将消息存入Index文件
this.putMessagePositionInfo(msg, mappedFile.getFileFromOffset(), mappedFile.getFileSize());
// 消息一旦从内存中刷入磁盘,就可以被Replication模块复制到其他机器
this.dataVersion.incrementAndGet();
}
五、消息消费流程
消息消费是RocketMQ的重要组成部分。下面是消息消费的简要过程:
- 消费者启动后会向NameServer请求Topic路由信息,得到所要消费的消息队列。
- Consumer从Broker拉取消息,如果有消息就开始消费。
- 消费消息前,先从Index文件中获取消息的文件偏移量。
- 从CommitLog中读取消息并返回。
- 消息消费成功后,提交偏移量。 完整的消息消费流程的代码示例如下:
// 代码来自DefaultMQPushConsumerImpl.java
@Override
public void pullMessage(final PullRequest pullRequest) {
// 拉取消息
PullResult pullResult = this.pullMessageService.pullMessage(pullRequest);
switch (pullResult.getPullStatus()) {
// 获取到消息
case FOUND:
this.processQueue.putMessage(pullResult.getMsgFoundList());
this.executePullRequestLater(pullRequest.getPullFromThisOffset() + pullResult.getNextBeginOffset());
break;
// 等待下一次
case NO_NEW_MSG:
this.executePullRequestLater(pullRequest.getPullFromThisOffset());
break;
// 没有变化,再次拉取消息
case NO_MATCHED_MSG:
this.executePullRequestLater(pullRequest.getPullFromThisOffset());
break;
// 拉取被暂停
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
pullRequest.getProcessQueue().setDropped(true);
break;
default:
break;
}
}
public void handleMessage(final MessageExt msgs,
final ConsumeQueue cq,
final ConsumeQueueExt.CqExtUnit cqExt) {
// 处理消息前,先从Index文件中获取消息的文件偏移量
final long offsetPy = cq.getOffsetInQueueByTime(msgs.getStoreTimestamp());
final long offsetDiff = cqExt.getOffsetPy() - offsetPy;
int tagsCode = 0;
// tags处理
if (msgs.getTags() != null && msgs.getTags().length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(msgs.getTags());
}
this.hook.consumeMessageBefore(msgs, this.context);
// 从CommitLog中读取消息并返回
final ConsumeReturnType returnType = this.consumeMessageService.consumeMessage(
msgs, this.defaultMQPushConsumer.getMessageModel(), this.processQueue, this.consumeOrderly,
tagsCode, this.properties, this.consumeMessageFilter, this.brokerSuspendMaxTimeMillis,
this.consumerGroup, this.maxReconsumeTimes, this.context, offsetPy);
// 消息消费成功后,提交偏移量
this.processConsumeResult(msgs, returnType, this.context, consumeRequest.getStats());
this.hook.consumeMessageAfter(msgs, this.context);
}
六、高可用机制
RocketMQ通过复制和双写等高可用机制,实现分布式消息存储的高可靠性。
- Master-Slave复制:Master节点负责消息写入和消费,Slaver节点只负责消息复制,并在Master故障时接管Master工作。
- 双写机制:消费者消费消息时,会将消息从Master和Slave双写节点同时消费,以保证消息不会丢失。
七、总结
本篇文章详细介绍了RocketMQ的源码解析,从Broker启动流程、消息发送流程、消息存储流程、消息消费流程、高可用机制等多个方面进行了详细的阐述,对于深入理解RocketMQ的内部实现有一定的帮助。