一、RabbitMQ的延迟队列概述
RabbitMQ是一个开源的消息队列中间件,被广泛应用于长连接数据推送、数据异步处理、系统解耦等场景中。它使用 Erlang 语言编写,具有快速、高可靠性、可扩展性强、灵活性等特点。而RabbitMQ的延迟队列,是指在特定的时间后将消息投递到队列中,而非立即投递,解决了一些实时性要求不高的业务场景中出现的问题。下面将会从多个方面详细阐述RabbitMQ延迟队列的相关知识。
二、RabbitMQ延迟队列的特点
- 可以满足某些任务需要延迟处理的需求;
- 延迟队列会在一定时间内将消息重新投递到队列中,可更好的保证消息被消费;
- 可以避免消息长时间占用队列资源的情况,提高队列的吞吐量。
三、RabbitMQ延迟队列的具体应用场景
- 定时任务处理:如对于需要在固定时间点执行的任务,可以在指定时间将任务放入延迟队列中进行处理;
- 消息分发延迟处理:如对于消息的处理需要在特定时间后才能进行,可以将消息放入延迟队列中;
- 退款业务处理:如在业务退款中,需要等待一段时间才能确认退款是否成功,这时候可以将订单放入延迟队列中进行处理;
- 预留资源处理:如对于服务端处理资源需要有所保障,可以使用延迟队列在拒绝服务时进行控制;
- 抢购场景处理:如对于抢购场景下的订单处理,需要在一定时间后对于没有付款的订单进行自动取消,这时候可以使用延迟队列。
四、RabbitMQ延迟队列的创建和实现
对于创建RabbitMQ延迟队列,我们可以通过RabbitMQ再三延迟插件来实现,具体步骤如下:
1.下载插件
git clone https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.git
2.编译插件
make
3.将插件复制到RabbitMQ插件目录
cp ./dist/$(basename -s .ez ./dist/*.ez) /usr/lib/rabbitmq/lib/rabbitmq_server-<server_version>/plugins/
4.启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
systemctl enable rabbitmq-server.service
5.创建延迟队列
a)创建一个exchange
#Name: delay_exchange
#Type: x-delayed-message
#Argument: {'x-delayed-type', 'direct'}
b)创建一个delay_queue队列
五、RabbitMQ延迟队列的代码实现
以下是一份基于Java语言实现的RabbitMQ延迟队列示例代码:
public class DelayedSender {
private static final String EXCHANGE_NAME = "delay_exchange";
private static final String QUEUE_NAME = "delay_queue";
private static final String ROUTING_KEY = "delay_routing";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//创建延迟Exchange
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
//创建延迟队列
Map<String, Object> queueArgs = new HashMap<>(2);
queueArgs.put("x-dead-letter-exchange", EXCHANGE_NAME);
queueArgs.put("x-dead-letter-routing-key", ROUTING_KEY);
channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
// 将队列绑定到延迟Exchange,指定routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//发送延迟信息
Date sendTime = new Date(System.currentTimeMillis() + 10000);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration(String.valueOf(sendTime.getTime() - System.currentTimeMillis()));
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, "这是一条延迟的消息".getBytes());
System.out.println("消息已发送, 时间:" + sendTime);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
六、RabbitMQ延迟队列的注意事项
在使用RabbitMQ延迟队列时需要注意以下几点:
- 对于未处理的消息,必要时需要进行重新投递;
- 延迟队列尽量不要过期,否则会造成不必要的消息;
- 不同的应用场景下,需要结合具体业务场景来设定合理的延迟时间。