RocketMQTemplate.convertAndSend是Spring RocketMQ的消息发送模板。它提供了多种消息发送的方法,支持发送同步消息、异步消息、单向消息和顺序消息等。RocketMQTemplate.convertAndSend方法具有极高的灵活性,可以让开发者自主选择序列化方式、topic、tag等相关参数。下面将从多个方面详细介绍该方法。
一、方法参数与返回值
RocketMQTemplate.convertAndSend方法是Spring RocketMQ的消息发送模板,它的参数中包含以下几个关键参数:
- topic:消息主题
- message:消息内容,可以为任何Java对象
- MessagePostProcessor:消息处理器,用于处理消息的元数据如消息头、消息tag等
发送消息成功后,该方法的返回值是一个org.springframework.messaging.Message对象。可以通过该对象获取发送的消息的元数据,如消息ID、消息发送时间等信息。
二、消息内容序列化
RocketMQTemplate.convertAndSend方法会将Java对象序列化成字节数组,以便RocketMQ可以进行传输和持久化。Spring RocketMQ支持多种消息序列化方式,包括JDK默认序列化、JSON序列化、Protobuf序列化等。默认情况下,Spring RocketMQ使用JDK默认序列化方式。 如果想要更换序列化方式,可以使用org.springframework.messaging.converter.MessageConverter接口进行定制。例如,使用JSON序列化方式:
@Configuration
public class RocketMQConfig {
@Bean
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper objectMapper) {
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setObjectMapper(objectMapper);
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setMessageConverter(messageConverter);
rocketMQTemplate.setProducer(mqProducer);
return rocketMQTemplate;
}
}
三、消息处理器MessagePostProcessor
RocketMQTemplate.convertAndSend方法之所以设计MessagePostProcessor参数,是为了让开发者能够对消息的元数据进行处理。例如,可以将消息中的tag设置为业务相关的信息,方便后续处理、查询等操作。 以下是一个MessagePostProcessor的示例代码:
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.getMessageProperties().setUserProperty("tag", "important");
return message;
}
};
rocketMQTemplate.convertAndSend("myTopic", "hello world", messagePostProcessor);
四、同步、异步和单向消息发送
RocketMQTemplate.convertAndSend方法支持同步、异步和单向消息发送。其中,同步消息会等待MQ返回响应结果,返回消息的Message对象,是可以从MQ Broker中正常获得的完整信息。异步消息不会等待MQ返回,通过传入一个SendCallback回调函数,仅在发送完成或发送失败时被回调。单向消息则是最简单、也是性能最好的一种消息发送模式,方法只负责把消息发送给MQ Broker,不关心消息是否发送成功或失败。下面是这三种发送方式的示例代码:
// 同步消息发送
Message syncMessage = rocketMQTemplate.syncSend("myTopic", "hello world");
System.out.println(syncMessage.getPayload());
// 异步消息发送
rocketMQTemplate.asyncSend("myTopic", "hello world", new SendCallback() {
@Override
public void onSuccess(SendResult
sendResult) {
System.out.println("send success: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("send fail" + e.getMessage());
}
});
// 单向消息发送
rocketMQTemplate.sendOneWay("myTopic", "hello world");
五、顺序消息发送
对于需要保证消息顺序的业务场景,RocketMQ支持顺序消息的发送。Spring RocketMQ同样提供了顺序消息发送的支持。下面是一个发送顺序消息的示例:
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List list, Message message, Object o) {
String orderId = (String) o;
int index = Math.abs(orderId.hashCode()) % list.size();
return list.get(index);
}
});
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("hello world").build(), "order-001");
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("hello MQ").build(), "order-002");
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("rocketmq").build(), "order-003");
以上就是对RocketMQTemplate.convertAndSend方法的详细介绍。通过该方法,开发者可以轻松完成RocketMQ的消息发送,同时也可以根据业务需求选择相关的参数和发送方式,提高整个应用的效率和可维护性。