您的位置:

RocketMQ源码解析

一、简介

RocketMQ是一个分布式消息传递解决方案,具有高性能、高可靠、高可扩展性和分布式特性。本篇文章将从多个方面对RocketMQ的源码进行解析,帮助更好的理解RocketMQ的内部实现。本篇文章主要分为以下几个部分:

  • Broker启动流程
  • 消息发送流程
  • 消息存储流程
  • 消息消费流程
  • 高可用机制

二、Broker启动流程

Broker是RocketMQ的核心组件,负责储存和转发消息。下面是Broker启动流程的简要过程:

  1. Broker启动类是BrokerStartup,首先解析配置文件,生成BrokerConfig对象。
  2. BrokerConfig对象主要包括了Broker的ID、名称等基本信息,以及Topic的配置,如消息最大长度、刷盘方式等。
  3. 接着,启动Netty服务监听Producer和Consumer的连接请求,并处理 NameServer 请求。
  4. Broker启动后,先从持久化存储中加载Topic和消息数据到内存,然后启动Consumer拉取线程、commit线程,还有定时调度线程等。
  5. 最后,等待NameServer的心跳请求。

完整的Broker启动流程代码示例如下:

public static void main(String[] args) {
    try {
        // 解析配置文件,生成BrokerConfig对象
        final BrokerConfig brokerConfig = new BrokerConfig();
        // ...
        // 启动Netty服务监听Producer和Consumer的连接请求,并处理 NameServer 请求
        final RemotingServer remotingServer = new NettyRemotingServer(brokerConfig.getNettyServerConfig());
        final BrokerController brokerController = new BrokerController(brokerConfig, remotingServer);
        // 从持久化存储中加载Topic和消息数据到内存
        brokerController.initialize();
        // 启动Consumer拉取线程、commit线程,还有定时调度线程等
        brokerController.start();
        // 等待NameServer的心跳请求
        remotingServer.start();
    } catch (Throwable e) {
        // ...
    }
}

三、消息发送流程

消息发送是RocketMQ的一项核心功能。下面是消息发送的流程简介:

  1. Producer启动后初始化MQClientInstance,然后创建MQProducerInner对象。
  2. 发送消息时,先从本地缓存中获取TopicPublishInfo,如果没有就从NameServer获取。
  3. 选择Topic路由,得到投递的Queue,做负载均衡。
  4. 由MQClientInstance创建一个拉取Broker地址的任务PullNameServerTask。
  5. 高可用机制选择一个Broker发送消息。
  6. 发送消息到Broker。

完整的消息发送流程的代码示例如下:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();

    Message msg = new Message("topic", "tag", "key", "Hello World".getBytes());
    SendResult sendResult = producer.send(msg);
    System.out.println(sendResult);

    producer.shutdown();
}

四、消息存储流程

消息存储是RocketMQ最核心的一环。下面是消息存储的流程简介:

  1. 消息写入前,先获取一个内存映射文件,如果没有就创建。
  2. 将消息写入内存映射文件,写入后根据CommitLog文件刷盘策略决定是否同步刷盘。
  3. 消息写入后,将消息存入Index文件,Index相当于一个消息ID和文件偏移量的映射。
  4. 消息一旦从内存中刷入磁盘,就可以被Replication模块复制到其他机器。

完整的消息存储流程的代码示例如下:

// 代码来自CommitLog.java
public void putMessage(MessageExtBrokerInner msg) {
    // ...
    // 先获取一个内存映射文件,如果没有就创建
    MappedFile mappedFile = this.getMappedFile(matched);
    // 将消息写入内存映射文件
    mappedFile.appendMessage(msg);
    // 写入后根据CommitLog文件刷盘策略决定是否同步刷盘
    this.handleScheduleMessageService(mappedFile.getFileSize(), this.commitDataLeastPages);

    // 将消息存入Index文件
    this.putMessagePositionInfo(msg, mappedFile.getFileFromOffset(), mappedFile.getFileSize());

    // 消息一旦从内存中刷入磁盘,就可以被Replication模块复制到其他机器
    this.dataVersion.incrementAndGet();
}

五、消息消费流程

消息消费是RocketMQ的重要组成部分。下面是消息消费的简要过程:

  1. 消费者启动后会向NameServer请求Topic路由信息,得到所要消费的消息队列。
  2. Consumer从Broker拉取消息,如果有消息就开始消费。
  3. 消费消息前,先从Index文件中获取消息的文件偏移量。
  4. 从CommitLog中读取消息并返回。
  5. 消息消费成功后,提交偏移量。

完整的消息消费流程的代码示例如下:

// 代码来自DefaultMQPushConsumerImpl.java
@Override
public void pullMessage(final PullRequest pullRequest) {
    // 拉取消息
    PullResult pullResult = this.pullMessageService.pullMessage(pullRequest);

    switch (pullResult.getPullStatus()) {
        // 获取到消息
        case FOUND:
            this.processQueue.putMessage(pullResult.getMsgFoundList());
            this.executePullRequestLater(pullRequest.getPullFromThisOffset() + pullResult.getNextBeginOffset());
            break;
        // 等待下一次
        case NO_NEW_MSG:
            this.executePullRequestLater(pullRequest.getPullFromThisOffset());
            break;
        // 没有变化,再次拉取消息
        case NO_MATCHED_MSG:
            this.executePullRequestLater(pullRequest.getPullFromThisOffset());
            break;
        // 拉取被暂停
        case OFFSET_ILLEGAL:
            log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
            pullRequest.getProcessQueue().setDropped(true);
            break;
        default:
            break;
    }
}

public void handleMessage(final MessageExt msgs,
                          final ConsumeQueue cq,
                          final ConsumeQueueExt.CqExtUnit cqExt) {
    // 处理消息前,先从Index文件中获取消息的文件偏移量
    final long offsetPy = cq.getOffsetInQueueByTime(msgs.getStoreTimestamp());
    final long offsetDiff = cqExt.getOffsetPy() - offsetPy;

    int tagsCode = 0;
    // tags处理
    if (msgs.getTags() != null && msgs.getTags().length() > 0) {
        tagsCode = MessageExtBrokerInner.tagsString2tagsCode(msgs.getTags());
    }

    this.hook.consumeMessageBefore(msgs, this.context);

    // 从CommitLog中读取消息并返回
    final ConsumeReturnType returnType = this.consumeMessageService.consumeMessage(
        msgs, this.defaultMQPushConsumer.getMessageModel(), this.processQueue, this.consumeOrderly,
        tagsCode, this.properties, this.consumeMessageFilter, this.brokerSuspendMaxTimeMillis,
        this.consumerGroup, this.maxReconsumeTimes, this.context, offsetPy);
    // 消息消费成功后,提交偏移量
    this.processConsumeResult(msgs, returnType, this.context, consumeRequest.getStats());

    this.hook.consumeMessageAfter(msgs, this.context);
}

六、高可用机制

RocketMQ通过复制和双写等高可用机制,实现分布式消息存储的高可靠性。

  1. Master-Slave复制:Master节点负责消息写入和消费,Slaver节点只负责消息复制,并在Master故障时接管Master工作。
  2. 双写机制:消费者消费消息时,会将消息从Master和Slave双写节点同时消费,以保证消息不会丢失。

七、总结

本篇文章详细介绍了RocketMQ的源码解析,从Broker启动流程、消息发送流程、消息存储流程、消息消费流程、高可用机制等多个方面进行了详细的阐述,对于深入理解RocketMQ的内部实现有一定的帮助。