一、什么是Spring Cloud RocketMQ
Spring Cloud RocketMQ是Spring Cloud家族中的一个微服务框架,它集成了RocketMQ、Spring Cloud Stream以及Spring Boot等技术,为开发人员提供了一个完整的、基于消息驱动的分布式系统解决方案。 Spring Cloud RocketMQ支持消息的生产与消费,通过消息中间件RocketMQ实现跨服务的消息传递和通信。使用Spring Cloud Stream统一封装了与消息中间件的交互细节,使得开发者聚焦于业务逻辑的实现上。而Spring Boot的集成又为开发者带来了便捷、灵活和高效的开发体验。
二、Spring Cloud RocketMQ基本应用
1、引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
<version>3.0.3</version>
</dependency>
需要注意的是,Spring Cloud Stream Binder RocketMQ的版本与RocketMQ的版本是相关联的。因此,在使用之前需要先确认版本的兼容性。
2、定义生产者
@Component
@EnableBinding(Source.class)
public class RocketMQProducer {
@Autowired
private Source source;
public void send(String messageContent) {
Message message = MessageBuilder.withPayload(messageContent.getBytes()).build();
source.output().send(message);
}
}
生产者的实现很简单,只需要通过@EnableBinding
注解绑定Source,并使用MessageBuilder
创建消息并发送即可。
3、定义消费者
@Component
@EnableBinding(Sink.class)
public class RocketMQConsumer {
@StreamListener(Sink.INPUT)
public void receive(Message<?> message) {
System.out.println("Received Message: " + new String((byte[]) message.getPayload()));
}
}
消费者同样很简单,只需要使用@EnableBinding
注解绑定Sink,并使用@StreamListener
注解接收消息即可。
三、Spring Cloud RocketMQ进阶应用
1、消息确认机制
在实际应用中,消息往往一次性发送给多个消费者,出现消息处理失败等情况时需要有一种确认机制保证消息不会丢失。Spring Cloud RocketMQ提供了两种不同的确认机制,即自动确认机制和手动确认机制。
@Configuration
@EnableBinding(MyCustomChannel.class)
public class RocketMQConfirmConfiguration {
@Autowired
private MyCustomChannel myCustomChannel;
@Bean
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer defaultMQProducer) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(defaultMQProducer);
rocketMQTemplate.setSendMsgTimeout(3000);
rocketMQTemplate.setCheckThreadPoolExecutor(Executors.newFixedThreadPool(50));
return rocketMQTemplate;
}
@Bean
public DefaultMQProducer defaultMQProducer() throws Exception {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setProducerGroup("default_group");
defaultMQProducer.setNamesrvAddr("localhost:9876");
defaultMQProducer.setInstanceName("default");
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(1);
defaultMQProducer.start();
return defaultMQProducer;
}
@Bean
public RocketMQTransactionListener rocketMQTransactionListener() {
return new RocketMQTransactionListenerImpl();
}
@Bean
public TransactionMQProducer transactionMQProducer(DefaultMQProducer defaultMQProducer,
RocketMQTransactionListener rocketMQTransactionListener) throws Exception {
TransactionMQProducer transactionMQProducer = new TransactionMQProducer("trans_group");
transactionMQProducer.setNamesrvAddr("localhost:9876");
transactionMQProducer.setProducerGroup("trans_group");
transactionMQProducer.setTransactionListener(rocketMQTransactionListener);
transactionMQProducer.setExecutorService(new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
}));
transactionMQProducer.start();
return transactionMQProducer;
}
@Bean
public RocketMQLocalTransactionCheckerImpl rocketMQLocalTransactionChecker() {
return new RocketMQLocalTransactionCheckerImpl();
}
@Bean
public RocketMQLocalTransactionStateConverterImpl rocketMQLocalTransactionStateConverter() {
return new RocketMQLocalTransactionStateConverterImpl();
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate() {
return new JmsMessagingTemplate(myCustomChannel.output());
}
}
上面是确认机制的配置代码,我们需要在配置类中定义Producer,根据需求选择自动确认或手动确认机制,设置相关属性并启动Producer。具体代码实现过程如下:
自动确认机制
rocketMQTemplate.convertAndSend("myTopic", "Hello, World!");
使用RocketMQTemplate
的convertAndSend()
方法即可发送消息,默认使用自动确认机制。
手动确认机制
rocketMQTemplate.sendMessageInTransaction("trans_topic", message, null);
使用RocketMQTemplate
的sendMessageInTransaction()
方法即可启用手动确认机制。
2、消息分组机制
消息分组功能允许将一组Topic归类为一个Group,方便在服务端进行管理,同时也方便客户端使用,并且还能提高消息发送和接收的效率。具体实现方式如下:
@Configuration
public class RocketMQConfig {
@Bean(name = "rocketMQMessageSender")
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer("TestGroup");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
配置文件中指定名称为testGroup
的DefaultMQProducer
。
3、消息过滤机制
Spring Cloud RocketMQ支持使用SQL92定义消息过滤规则,让客户端可以按照指定的规则筛选出满足条件的消息。具体实现方式如下:
3.1、消息消费者
@Component
@EnableBinding(value = { Sink.class })
public class RocketMQConsumerFilter {
@Autowired
private Sink sink;
@StreamListener(target = Sink.INPUT, condition = "headers['tag'] == 'tagA'")
public void handleA(String message) {
System.out.println("received message A: " + message);
}
@StreamListener(target = Sink.INPUT, condition = "headers['tag'] == 'tagB'")
public void handleB(String message) {
System.out.println("received message B: " + message);
}
}
这里定义了两个消费者,每个消费者根据tag值进行过滤。当tag值为tagA
时,调用handleA()
方法,当tag值为tagB
时,调用handleB()
方法。
3.2、消息生产者
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTagMessage(String message, String tag) {
Message<String> msg = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, tag).build();
SendResult sendResult = rocketMQTemplate.syncSend("test-tag-topic", msg);
System.out.printf("Send tag message: %s; send result: %s \n", message, sendResult);
}
这里使用MessageBuilder
设置消息的tag值,tags
属性可以用于过滤和重复消费。
四、总结
本文详细阐述了Spring Cloud RocketMQ的基本应用和进阶应用,包括消息确认机制、消息分组机制和消息过滤机制。Spring Cloud RocketMQ集成了RocketMQ、Spring Cloud Stream以及Spring Boot等技术,为开发者提供了一个完整的、基于消息驱动的分布式系统解决方案。通过本文的介绍,相信读者已经掌握了Spring Cloud RocketMQ的基础知识和高级特性,能够在实际项目中应用。