RabbitMQ延迟队列详解

发布时间:2023-05-24

一、RabbitMQ的延迟队列概述

RabbitMQ是一个开源的消息队列中间件,被广泛应用于长连接数据推送、数据异步处理、系统解耦等场景中。它使用 Erlang 语言编写,具有快速、高可靠性、可扩展性强、灵活性等特点。而RabbitMQ的延迟队列,是指在特定的时间后将消息投递到队列中,而非立即投递,解决了一些实时性要求不高的业务场景中出现的问题。下面将会从多个方面详细阐述RabbitMQ延迟队列的相关知识。

二、RabbitMQ延迟队列的特点

  1. 可以满足某些任务需要延迟处理的需求;
  2. 延迟队列会在一定时间内将消息重新投递到队列中,可更好的保证消息被消费;
  3. 可以避免消息长时间占用队列资源的情况,提高队列的吞吐量。

三、RabbitMQ延迟队列的具体应用场景

  1. 定时任务处理:如对于需要在固定时间点执行的任务,可以在指定时间将任务放入延迟队列中进行处理;
  2. 消息分发延迟处理:如对于消息的处理需要在特定时间后才能进行,可以将消息放入延迟队列中;
  3. 退款业务处理:如在业务退款中,需要等待一段时间才能确认退款是否成功,这时候可以将订单放入延迟队列中进行处理;
  4. 预留资源处理:如对于服务端处理资源需要有所保障,可以使用延迟队列在拒绝服务时进行控制;
  5. 抢购场景处理:如对于抢购场景下的订单处理,需要在一定时间后对于没有付款的订单进行自动取消,这时候可以使用延迟队列。

四、RabbitMQ延迟队列的创建和实现

对于创建RabbitMQ延迟队列,我们可以通过RabbitMQ再三延迟插件来实现,具体步骤如下:

1.下载插件
  git clone https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.git
2.编译插件
  make
3.将插件复制到RabbitMQ插件目录
  cp ./dist/$(basename -s .ez ./dist/*.ez) /usr/lib/rabbitmq/lib/rabbitmq_server-<server_version>/plugins/
4.启用插件
  rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  systemctl enable rabbitmq-server.service
5.创建延迟队列
  a)创建一个exchange
      #Name: delay_exchange
      #Type: x-delayed-message
      #Argument: {'x-delayed-type', 'direct'}
  b)创建一个delay_queue队列

五、RabbitMQ延迟队列的代码实现

以下是一份基于Java语言实现的RabbitMQ延迟队列示例代码:

public class DelayedSender {
  private static final String EXCHANGE_NAME = "delay_exchange";
  private static final String QUEUE_NAME = "delay_queue";
  private static final String ROUTING_KEY = "delay_routing";
  public static void main(String[] args) {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
           Channel channel = connection.createChannel()) {
          //创建延迟Exchange
          Map<String, Object> arguments = new HashMap<>(2);
          arguments.put("x-delayed-type", "direct");
          channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
          //创建延迟队列
          Map<String, Object> queueArgs = new HashMap<>(2);
          queueArgs.put("x-dead-letter-exchange", EXCHANGE_NAME);
          queueArgs.put("x-dead-letter-routing-key", ROUTING_KEY);
          channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
          // 将队列绑定到延迟Exchange,指定routingKey
          channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
          //发送延迟信息
          Date sendTime = new Date(System.currentTimeMillis() + 10000);
          AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
          builder.expiration(String.valueOf(sendTime.getTime() - System.currentTimeMillis()));
          AMQP.BasicProperties properties = builder.build();
          channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, "这是一条延迟的消息".getBytes());
          System.out.println("消息已发送, 时间:" + sendTime);
      } catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
  }
}

六、RabbitMQ延迟队列的注意事项

在使用RabbitMQ延迟队列时需要注意以下几点:

  1. 对于未处理的消息,必要时需要进行重新投递;
  2. 延迟队列尽量不要过期,否则会造成不必要的消息;
  3. 不同的应用场景下,需要结合具体业务场景来设定合理的延迟时间。