一、RocketMQ死信队列概述
RocketMQ是一个高性能、可靠性、可扩展性 broker 致力于处理大量数据,包括流式数据和批量数据,并且能够实现在线扩容。RocketMQ支持死信队列(Dead Letter Queue,DLQ),可以将无法被使用者消费的消息转发到指定的 topic 或者 queue 中去,死信队列就是为了解决这些问题而生的。
对于一些无法被用户正常消费的消息,RocketMQ会根据一些设定规则,最终将这些问题消息发送到RocketMQ死信队列中,从而引导一系列的流程操作。
二、RocketMQ死信队列要自己配
RocketMQ死信队列需要手动配制,这个最好在消费者端配制,如果在消费者配置消费者类为顺序消息消费者,需要在组消费者中增加一个成员来消费死信消息。以下是使用Java SDK 实现创建消费者及相关配制的示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic_name", "*");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setInstanceName("consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeTimeout(60);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
// business logic
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
// 死信队列配制
String groupName = "your_group_name";
String topic = "your_topic_name";
String origTopic = "your_topic_name";
String subExpression = "your_sub_expression";
int maxRetryTimes = 5;
long retryInterval = 60L;
String dlqTopic = MixAll.getRetryTopic(groupName);
String dlqRealTopic = String.format("%s%s", dlqTopic, System.currentTimeMillis());
String dlqSubExpression = String.format("%s&&%s", subExpression, MessageConst.PROPERTY_RETRY_TOPIC + "==" + dlqTopic);
CreateTopicKey topicKey = new CreateTopicKey(dlqTopic, org.apache.rocketmq.common.protocol.RequestCode.UPDATE_AND_CREATE_TOPIC);
TopicConfig topicConfig = new TopicConfig(dlqTopic);
groupConfig.getTopicConfigTable().put(dlqTopic, topicConfig);
table.put(topicKey, topicConfig.buildTopicSetting());
producer.send(new Message(origTopic, dlqSubExpression, "", new byte[] {1, 2, 3}), new SendCallback() {
public void onSuccess(SendResult sendResult) {
// do something
}
public void onException(Throwable e) {
// do something
}
});
三、RocketMQ死信队列保证消息被消费
RocketMQ生产者将消息发送到 broker 后,broker会将消息持久化到磁盘,保证消息不会丢失。RocketMQ消费者一旦收到消息后,就会自动提交 offset,表明将此条消息消费完整,所以在 broker处于重启等一些情况下,已经消费成功但未来得及提交 offset 的消息会再次被 broker 发送给消费者消费。
RocketMQ的重复删除机制有两种,第一种是使用默认的消费重试次数机制,重试这个机制在一定时间内会不重不漏地把消息发送给消费者,保障消息不丢失。如果运行消费者的服务器出现宕机,这个时间间隔可能会被限制。第二种是使用死信队列。
四、RocketMQ死信队列消费
RocketMQ的死信队列机制为保障消息不丢失提供了一种方式,分别从定义、实现、应用场景几个方面介绍死信队列的消费机制。
定义:
public enum ConsumeStatus {
SUCCESS,
FAIL,
EXCEPTION,
;
}
实现:
public interface MessageListener {
ConsumeStatus consumeMessage(MessageExt messageExt);
}
应用场景:
比如一个新用户提交了订单,但是由于某些原因,该订单的数据中缺少了一些必须的字段。这部分订单消息就可以通过 RocketMQ发往死信队列,等待相关的人员来处理。又比如当 RocketMQ 消费端出现异常,RocketMQ的死信队列可以帮助消费者重新消费这些过期或异常的消息,确保消息不丢失。
五、RocketMQ死信队列处理
RocketMQ消费者接收到消息后,可以判断一下消费是否成功。如果消费失败,则可以根据消息中的关联信息尝试进行重试、或者将消息发送到死信队列。
业务处理时如果出现异常,可以在消费端将消息发送到RocketMQ死信队列中,业务处理时,调用方法:reconsumeLater();
后的传入为设置该消息再次消费的时间,此时消息会被放入到 RocketMQ 的 “重试队列”,若多次发生了消费失败,则最终会被发送到 DLQ 中。
以下是使用Java SDK在消费者端将消息发送到RocketMQ死信队列的示例:
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
// 处理业务逻辑
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 如果消费失败,将该消息告知 broker,等待 broker 根据系统设置进行后续操作,如重新消费或进入死信队列
}
六、RocketMQ死信队列的使用场景
RocketMQ死信队列的具体使用场景如下:
1.消息延迟消费
我们可以将一些带有延迟消费的消息通过 RocketMQ定时储存在 broker 中,等到指定时间后再由 RocketMQ 发送给消费方进行消费。如果在指定时间内消费成功,则 broker 会将消息标记为已消费。否则,就会自动发送到死信队列。
2.异常消息重新消费
考虑到消费者在消息处理过程中,可能会出现脏数据、数据异常或者处理失败的情况,在消息重试了几次之后,依然不能被正常地消费,那么就可以将这些消费失败、过期数据等通过设置重试机制和死信队列机制来优雅地处理。RocketMQ死信队列就是可以让这些消费无法完成的消息正常被消费的解决方案。
3.流量负载与限流
可以通过死信队列来实现简单的负载均衡,将某些 topic 的消息发送到这个队列,这样的话就可以避免在消息消费处理失败过程中对 consumer 造成过于严重的负载压力,从而降低服务的风险,提高整个架构的稳定性。
七、RabbitMQ死信队列使用场景
RabbitMQ同样支持死信队列,但它与RocketMQ实现方式并不相同。RabbitMQ的 DLQ 需要为每个需要死信队列的 queue 配置一个单独的 exchange,而配置与代码示例如下:
//在生产端声明死信exchange
channel.exchangeDeclare("dlx.exchange", "topic", true);
//声明队列,并设置队列的 x-dead-letter-exchange 和 x-dead-letter-routing-key
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
arguments.put("x-dead-letter-routing-key", "dlqKey");
channel.queueDeclare("test.queue", true, false, false, arguments);
以上示例中death_exchange就是死信exchange,必须先绑定一个死信的exchange,然后在设置原队列的死信队列绑定到它上面,这样就完成了死信的设置。
八、RocketMQ重试队列
生产者在向 RocketMQ发送消息时,如果由于一些异常导致发送失败的话,RocketMQ 会自动尝试重试,RocketMQ 会将消息发送到“重试主题”(Retry Topic)中。重试主题是与原主题(Original Topic)对应的,在RocketMQ中指定了一次重试的时间一般在 15s 和 300s 之间。
RocketMQ的重试次数是可以自己控制的,可以根据实际情况自己设置重试次数。
九、RocketMQ消息队列
RocketMQ是一个分布式、多线程、高吞吐量的消息队列系统,拥有极高的性能以及鲁棒性,因此可以适用于各种消息队列相关的场景。消息队列可以帮助我们快速的支持异步消息通知、削峰填谷、订单处理、用户行为记录、数据同步、大数据处理等复杂的业务场景。
消息队列的用途主要有两个方向:
1. 在进行业务逻辑的设计时,可以采用消息队列将业务逻辑进行拆解和解耦,实现高内聚、低耦合的设计理念。这样可以避免代码量的过多臃肿,使逻辑处理可读性更好。我们将业务逻辑拆解为一个个简单的处理单元,每个处理单元之间通过消息队列进行沟通交流,实现松耦合、高内聚的性质。
2. 在互联网大数据时代,消息队列可以进行商业化的运用,将消息队列用于推送广告、推送信息等,实现信息流向管控和推送。如果使用 RocketMQ 等高性能、可扩展性的系统,则可以更好的解决各种问题。
十、RocketMQ查看消息队列
对于消息队列我们也可以设置监控,这样我们就可以知道消息队列在生产和消费过程中的情况,并且可以对问题进行快速的排查。
RocketMQ提供了许多接口来监控各种运维指标,通过这些接口我们可以快速了解集群状态,同时还可以进行横向扩展和纵向扩展,来满足我们不断变换的业务需求。
我们可以通过 RocketMQ 官网提供的 Console 控制台工具实现消息队列的查看、监控等等功能。
参考文献
- 阿里巴巴 IT 技术圈。RocketMQ的死信队列深入浅出。https://yq.aliyun.com/articles/718931
- 阿里巴巴 IT 技术圈。RocketMQ主题队列深入浅出。https://yq.aliyun.com/articles/718892