您的位置:

RocketMQTemplate.convertAndSend详解

RocketMQTemplate.convertAndSend是Spring RocketMQ的消息发送模板。它提供了多种消息发送的方法,支持发送同步消息、异步消息、单向消息和顺序消息等。RocketMQTemplate.convertAndSend方法具有极高的灵活性,可以让开发者自主选择序列化方式、topic、tag等相关参数。下面将从多个方面详细介绍该方法。

一、方法参数与返回值

RocketMQTemplate.convertAndSend方法是Spring RocketMQ的消息发送模板,它的参数中包含以下几个关键参数:
  • topic:消息主题
  • message:消息内容,可以为任何Java对象
  • MessagePostProcessor:消息处理器,用于处理消息的元数据如消息头、消息tag等
发送消息成功后,该方法的返回值是一个org.springframework.messaging.Message对象。可以通过该对象获取发送的消息的元数据,如消息ID、消息发送时间等信息。

二、消息内容序列化

RocketMQTemplate.convertAndSend方法会将Java对象序列化成字节数组,以便RocketMQ可以进行传输和持久化。Spring RocketMQ支持多种消息序列化方式,包括JDK默认序列化、JSON序列化、Protobuf序列化等。默认情况下,Spring RocketMQ使用JDK默认序列化方式。 如果想要更换序列化方式,可以使用org.springframework.messaging.converter.MessageConverter接口进行定制。例如,使用JSON序列化方式:
@Configuration
public class RocketMQConfig {

    @Bean
    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper objectMapper) {
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setObjectMapper(objectMapper);

        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setMessageConverter(messageConverter);
        rocketMQTemplate.setProducer(mqProducer);

        return rocketMQTemplate;
    }
}

三、消息处理器MessagePostProcessor

RocketMQTemplate.convertAndSend方法之所以设计MessagePostProcessor参数,是为了让开发者能够对消息的元数据进行处理。例如,可以将消息中的tag设置为业务相关的信息,方便后续处理、查询等操作。 以下是一个MessagePostProcessor的示例代码:
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws JMSException {
        message.getMessageProperties().setUserProperty("tag", "important");
        return message;
    }
};

rocketMQTemplate.convertAndSend("myTopic", "hello world", messagePostProcessor);

四、同步、异步和单向消息发送

RocketMQTemplate.convertAndSend方法支持同步、异步和单向消息发送。其中,同步消息会等待MQ返回响应结果,返回消息的Message对象,是可以从MQ Broker中正常获得的完整信息。异步消息不会等待MQ返回,通过传入一个SendCallback回调函数,仅在发送完成或发送失败时被回调。单向消息则是最简单、也是性能最好的一种消息发送模式,方法只负责把消息发送给MQ Broker,不关心消息是否发送成功或失败。下面是这三种发送方式的示例代码:
// 同步消息发送
Message syncMessage = rocketMQTemplate.syncSend("myTopic", "hello world");
System.out.println(syncMessage.getPayload());

// 异步消息发送
rocketMQTemplate.asyncSend("myTopic", "hello world", new SendCallback() {
    @Override
    public void onSuccess(SendResult
    sendResult) {
        System.out.println("send success: " + sendResult);
    }

    @Override
    public void onException(Throwable e) {
        System.out.println("send fail" + e.getMessage());
    }
});

// 单向消息发送
rocketMQTemplate.sendOneWay("myTopic", "hello world");

   
  

五、顺序消息发送

对于需要保证消息顺序的业务场景,RocketMQ支持顺序消息的发送。Spring RocketMQ同样提供了顺序消息发送的支持。下面是一个发送顺序消息的示例:
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
    @Override
    public MessageQueue select(List list, Message message, Object o) {
        String orderId = (String) o;
        int index = Math.abs(orderId.hashCode()) % list.size();
        return list.get(index);
    }
});

rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("hello world").build(), "order-001");
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("hello MQ").build(), "order-002");
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("rocketmq").build(), "order-003");


  
以上就是对RocketMQTemplate.convertAndSend方法的详细介绍。通过该方法,开发者可以轻松完成RocketMQ的消息发送,同时也可以根据业务需求选择相关的参数和发送方式,提高整个应用的效率和可维护性。