您的位置:

Java DelayQueue:实现延迟任务的线程安全队列

一、DelayQueue的概述

Java的DelayQueue 是一个阻塞队列队列,主要用来实现对延迟任务的调度,也就是在指定的时间之后才能够取出任务来执行。该队列中保存的元素都必须实现了Delayed接口。

在实际应用中,DelayQueue被广泛地使用于缓存系统中,例如缓存的对象需要在过期时间到达后才能进行删除。通过使用DelayQueue队列,不必在添加缓存对象时设置过期时间,只需要在缓存对象失效后将其放入DelayQueue中,等待其到期后再执行对其的删除操作。

二、DelayQueue的基本使用

下面是一个基本使用的例子:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {

    static class DelayedElement implements Delayed {
        private long delayTime;     // 存放延迟时间
        private String element;    // 存放实际的任务元素

        DelayedElement(long delay, String element) {
            this.delayTime = System.currentTimeMillis() + delay;
            this.element = element;
        }

        // 获取剩余时间
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 按剩余时间进行排序
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        // 执行任务
        public void exec() {
            System.out.println("执行任务:" + element);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue queue = new DelayQueue<>();

        queue.add(new DelayedElement(2000, "任务1"));
        queue.add(new DelayedElement(5000, "任务2"));
        queue.add(new DelayedElement(3000, "任务3"));
        queue.add(new DelayedElement(4000, "任务4"));

        while (!queue.isEmpty()) {
            DelayedElement element = queue.take();
            element.exec();
        }
    }
}

  

在上述代码中,我们创建了一个DelayQueue队列,并向其中添加了四个基于Delayed接口的对象,设置它们的延迟时间分别为2秒、5秒、3秒和4秒。

接下来,我们使用while循环,不断从队列中取出任务,若队列为空,则一直休眠直到队列中有元素出现。当任务取出后,调用exec()方法执行任务。

三、DelayQueue的特性说明

1. 延迟任务排序

对于DelayQueue中保存的元素,队列会按照剩余延迟时间进行排序,以保证最先到期的任务最先被执行。当我们使用poll()、take()等方法从队列中获取元素时,都会取出剩余时间最小的元素,如果延迟时间相同,则取出放入到队列中时间最长的元素。

2. 线程安全性

DelayQueue为线程安全的队列,可以被多个线程安全地操作。我们可以通过add()、offer()、put()、take()等方法对DelayQueue进行操作,这些方法都被设计为同步方法,具有互斥的特点。

3. DelayQueue其他方法

除了基本使用中的offer()、poll()、peek()、remove()、take()外,DelayQueue还提供了一些其他方法:

  • drainTo():从队列中取出多个元素,并将它们放入给定集合中
  • drainTo():从队列中取出所有元素,并将它们放入给定集合中
  • remainingCapacity():返回队列剩余容量

四、DelayQueue的使用实例:订单超时取消

一个典型的使用场景是:我们在订单系统中需要对订单过期时间进行检查,到了过期时间即可取消订单。我们可以创建如下的订单类:

public class Order implements Delayed {
    private String orderId;
    private long expireTime;

    public Order(String orderId, long expireTime) {
        this.orderId = orderId;
        this.expireTime = expireTime;
    }

    // 计算剩余过期时间
    @Override
    public long getDelay(TimeUnit unit) {
        return expireTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
}

在这个Order类中,我们需要实现Delayed接口,并覆盖getDelay()方法,计算剩余过期时间,以便进行排序。我们可以为它添加如下方法:

public class Order implements Delayed {
    // 省略getDelay方法和compareTo方法
 
    // 取消订单
    public void cancel() {
        System.out.println("订单[" + orderId + "]已自动取消!");
    }
}

接下来我们通过如下的代码创建DelayQueue,向其中添加订单并检测过期时间:

public class OrderCheckTask implements Runnable {
    private DelayQueue queue;

    public OrderCheckTask(DelayQueue
    queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // 检测订单过期并取消过期订单
            while (true) {
                Order order = queue.take();
                order.cancel();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 订单下单,加入队列
    public boolean order(Order order) {
        return queue.add(order);
    }
}

public class AppMain {
    
    public static void main(String[] args) throws InterruptedException {
        // 创建DelayQueue队列
        DelayQueue
     queue = new DelayQueue<>();
        // 启动订单检测任务
        new Thread(new OrderCheckTask(queue)).start();

        // 添加订单
        queue.add(new Order("A20191000", System.currentTimeMillis() + 5000));
        queue.add(new Order("A20191001", System.currentTimeMillis() + 4000));
        queue.add(new Order("A20191002", System.currentTimeMillis() + 6000));
        queue.add(new Order("A20191003", System.currentTimeMillis() + 3000));

        // 等待5秒后取消订单
        Thread.sleep(5000);
        queue.add(new Order("A20191000", System.currentTimeMillis()));

        // 等待15秒输出队列大小
        Thread.sleep(15000);
        System.out.println("队列大小:" + queue.size());
    }
}

    
   
  

在上述代码中,我们首先创建了DelayQueue队列和一个订单检测任务。在添加订单时,我们指定延迟时间并加入DelayQueue队列中。运行程序后,会等待5秒后,通过订单ID删除订单,等待15秒后输出DelayQueue队列中还剩余多少个元素。

五、DelayQueue的总结

DelayQueue主要是用来实现对延迟任务的调度,按剩余时间排序,线程安全。我们在订单过期检测、缓存删除等场景中都可以使用它。

DelayQueue的实现使用简单,但需要我们实现Delayed接口中的getDelay()方法和compareTo()方法,并在不同的场景下调整它们的排序方式。使用好DelayQueue,将会大大提高程序处理效率。