您的位置:

RocketMQ事务消息原理

一、RocketMQ概述

RocketMQ是一个高可用、高吞吐量、高性能的分布式消息队列系统,消息队列的分布式部署使得其可以满足一些异步处理的需求,如大数据量的日志分析,海量的数据传输等。

RocketMQ的特点:

  1. 高可用性:支持主从切换,防止单点故障
  2. 高吞吐量:通过横向扩展,保证业务高速运转
  3. 高性能:通过搭建物理网络,保证了传输速度
  4. 一致性:支持多种消息类型,保证消息的不同阶段能够保持一致

二、RocketMQ的事务消息原理

RocketMQ的事务消息是保证消息可靠传输的一种机制。当我们需要一个消息在多个阶段的整个流程中,保证消息可靠性时,利用RocketMQ的事务消息机制会是一种不错的选择。

在RocketMQ发送一个事务消息时,会将消息状态保存在Half Message中。在Half Message发送给消息消费端时,消费端会进行确认,确认之后,消息会从Half Message中删除,同时添加进消息存储中。如果消费端没有确认,那么消息的状态将一直处于Half Message状态,不会被其他消费端接收。

与普通消息不同,事务消息还需要实现两个关键接口:事务半消息发送和事务半消息确认。

三、RocketMQ事务消息的实现

1. 事务半消息发送

    public abstract class TransactionMQProducer extends DefaultMQProducer {
    
        public TransactionSendResult sendMessageInTransaction(Message message, LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
            
            // 发送半消息
            SendResult sendResult = this.send(message, new LocalTransactionMessageChecker(localTransactionExecuter), arg);
            
            // 返回结果
            return new TransactionSendResult(sendResult.getSendStatus(), sendResult.getMessageQueue(), sendResult.getMsgId(), sendResult.getMessage());
        }
    }

以上是一个事务半消息发送的实现,借助RocketMQ的事务消息接口,我们可以通过LocalTransactionMessageChecker来进行事务消息的半消息发送。半消息发送完成之后,我们需要等待消息消费端确认。事务消息在发送时,会带上业务系统自定义的参数 arg,这个参数用于在 commit 或 rollback 后,让业务系统通知 RocketMQ 相应的提交或回滚操作。

2.事务半消息确认

    public interface TransactionListener {
        LocalTransactionState executeLocalTransaction(Message msg, Object arg);
        LocalTransactionState checkLocalTransaction(MessageExt msg);
    }
    
    public abstract class TransactionMQListener implements MessageListenerOrderly {
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            MessageExt msg = msgs.get(0);
            try {
                TransactionListener transactionListener = getTransactionListener();
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
                LocalTransactionState localTransactionState = transactionListener.executeLocalTransaction(msg, null);
                if (LocalTransactionState.COMMIT_MESSAGE.equals(localTransactionState)) {
                    // 如果是提交状态,则调用 commitTransaction 方法
                    commitTransaction(msg);
                } else if (LocalTransactionState.ROLLBACK_MESSAGE.equals(localTransactionState)) {
                    // 如果是回滚状态,则调用 rollbackTransaction 方法
                    rollbackTransaction(msg);
                } else if (LocalTransactionState.UNKNOW.equals(localTransactionState)) {
                    log.warn("unknown local transaction status, message:{}", msg);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            } catch (Throwable e) {: 
                log.warn("executeLocalTransaction Exception", e);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

  

以上是一个事务半消息确认的实现。事务消息被认为是提交的消息,需要调用LocalTransactionListener的executeLocalTransaction方法进行事务确认。如果事务消息未被确认,则需要调用 rollbackTransaction方法进行事务回滚。

四、RocketMQ事务消息的应用场景

步骤:

  1. 生产者发送prepare消息到RocketMQ,RocketMQ会将消息状态保存在Half Message中,返回Producer本地事务状态
  2. 生产者执行本地Transaction,也就是开始执行正式的业务操作,比如向数据库中插入数据
  3. 如果本地Transaction执行成功,则向RocketMQ发送COMMIT消息,这里需要注意,在返回COMMIT前,RocketMQ不会将消息提交到消费端
  4. 如果本地Transaction出现异常,则向RocketMQ发送ROLLBACK消息,这里需要注意,在返回ROLLBACK前,RocketMQ不会将消息提交到消费端
  5. 消费者正常消费消息,完成消息消费

事务型消息广泛应用于分布式事务场景中,可以解决原先分布式系统中不可避免的一系列事务问题。由于事务型消息具有较高的可靠性和数据一致性,因此在一些对数据准确性要求高的应用场景中得到了广泛的应用。

五、总结

RocketMQ的事务消息机制在分布式事务的场景下能够发挥出它的优越性。它通过事务半消息发送和事务半消息确认两个关键接口实现消息的事务性处理,保证了消息的可靠性和数据一致性,使其在一些对数据准确性要求较高的应用场景中得到了广泛的应用。