LinkedTransferQueue详解

发布时间:2023-05-19

LinkedTransferQueue属于JDK1.7后加入的并发队列,是ConcurrentLinkedQueue队列的增强版,它支持阻塞操作。它综合了LinkedBlockingQueue和SynchronousQueue的特性,有比它们更好的性能。LinkedTransferQueue是一个无界队列,可以无限扩展,而且支持FIFO顺序访问。

一、LinkedTransferQueue简介及使用场景

LinkedTransferQueue是一个基于链表结构实现的队列。它可以被多个线程同时访问,且线程安全。LinkedTransferQueue的好处在于它能够让生产者线程直接将数据传递给消费者线程,避免消息的阻塞。 LinkedTransferQueue适合在生产者线程和消费者线程交互较少、但是交互较为复杂的场景下使用。例如业务场景中需要对请求进行优先级处理、需要按照请求的时间顺序处理。LinkedTransferQueue还可以用于一些需要多阶段处理的场景中,例如分布式消息中间件中的分发器。

二、LinkedTransferQueue源码分析

LinkedTransferQueue最核心的方法是transfer,transfer实现了两个线程之间的数据交换。它通过AtomicReference来记录队列首尾元素,把transfer过来的元素放在队尾,等待之后来到队首的请求。 下面是transfer的代码实现:

public void transfer(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (Thread.interrupted()) throw new InterruptedException();
    if (tryTransfer(e)) return;
    // 构造传递节点
    Node<E> node = new Node<>(e);
    // 开始加入阻塞队列
    transferer.transfer(node, false, 0);
    // 如果当前线程在阻塞队列中,则中断它
    if (Thread.interrupted()) node.tryCancel();
    // 如果节点还存在,说明transfer失败了,删除节点
    if (node.isCancelled()) clean(node);
}

transferer实现了阻塞操作,它的具体实现有两种,分别是TransferQueue和TransferStack。 TransferQueue会先调用tryTransfer将数据存储到队列中,如果没有线程来获取数据,那么当前线程就会进入阻塞状态。TransferStack 是将节点插入Stack中作为LIFO顺序,直到有线程成功消费数据时才唤醒当前线程。 下面是transferer的代码实现:

interface Transferer<E> {
    // 尝试直接将元素传递给一个消费者,成功返回true
    boolean tryTransfer(E object);
    // 使用阻塞操作将元素传递给消费者
    void transfer(E e, boolean timed, long nanos) throws InterruptedException;
    // 判断阻塞队列是否为空
    boolean hasWaitingConsumer();
    // 获取阻塞队列等待的线程数
    int getWaitingConsumerCount();
}

三、LinkedTransferQueue使用示例

下面是一个简单的示例,描述了如何使用LinkedTransferQueue来完成一个生产者-消费者模型。

import java.util.concurrent.LinkedTransferQueue;
public class LinkedTransferQueueTest {
    private static LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
    public static void main(String[] args) {
        Thread t1 = new Thread(new Producer());
        t1.start();
        Thread t2 = new Thread(new Consumer());
        t2.start();
    }
    static class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    queue.transfer(i);
                    System.out.println("生产者向队列中添加了元素:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    Integer element = queue.take();
                    System.out.println("消费者取得了队列中的元素:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

以上代码中,Producer线程往queue中放入元素,Consumer线程从queue中取出元素。由于queue是无界的,即使Producer线程在循环中不停的放入元素,也不会造成OOM的异常。

四、LinkedTransferQueue的性能对比

下面是SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue与LinkedTransferQueue的性能对比:

import java.util.concurrent.*;
public class ConcurrentQueueTest {
    private final static int MAX_THREADS = 200;
    private final static int QUEUE_SIZE = 10;
    private final static int DURATION = 10000;
    public static void main(String[] args) throws Exception {
        ExecutorService[] executors = new ExecutorService[] {
            Executors.newSingleThreadExecutor(),
            Executors.newFixedThreadPool(MAX_THREADS),
            Executors.newCachedThreadPool(),
            new ForkJoinPool(MAX_THREADS)
        };
        for (ExecutorService executor : executors) {
            System.out.println("Executor used: " + executor.getClass());
            for (BlockingQueue<Integer> queue : new BlockingQueue[] {
                new SynchronousQueue<>(),
                new LinkedBlockingQueue<>(),
                new ArrayBlockingQueue<>(QUEUE_SIZE),
                new LinkedTransferQueue<>()
            }) {
                System.out.println("Test queue used: " + queue.getClass());
                long elapsed = time(executor, queue);
                System.out.println("Test finished in " + elapsed + "ms");
            }
            System.out.println();
        }
        for (ExecutorService executor : executors) {
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
    private static long time(ExecutorService executor, final BlockingQueue<Integer> queue) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final long start = System.currentTimeMillis();
        executor.execute(() -> {
            try {
                ProducerConsumerTest.produceConsume(queue, DURATION, latch);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        latch.await();
        TimeUnit.MILLISECONDS.sleep(DURATION);
        return System.currentTimeMillis() - start;
    }
}
class ProducerConsumerTest {
    static void produceConsume(final BlockingQueue<Integer> queue, final int duration, final CountDownLatch latch) throws Exception {
        final int producerThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1);
        final int consumerThreadCount = producerThreadCount + (producerThreadCount == 1 ? 1 : -1);
        final CyclicBarrier barrier = new CyclicBarrier(producerThreadCount + consumerThreadCount + 1);
        for (int i = 0; i < producerThreadCount; i++) {
            new Thread(() -> {
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                while (System.currentTimeMillis() < startTime + duration) {
                    queue.put(1);
                }
            }).start();
        }
        for (int i = 0; i < consumerThreadCount; i++) {
            new Thread(() -> {
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                while (System.currentTimeMillis() < startTime + duration) {
                    queue.take();
                }
            }).start();
        }
        barrier.await();
        startTime = System.currentTimeMillis();
        latch.countDown();
    }
    private static volatile long startTime;
}

在多线程环境下,LinkedTransferQueue的性能是要比其它几个Queue高的。下面是每种Queue的输出结果:

Executor used: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10036ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 10044ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10032ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 9923ms
Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10412ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 11192ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10179ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 7277ms
Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10096ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 17317ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10891ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 8497ms
Executor used: java.util.concurrent.ForkJoinPool
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 15006ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 27710ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 14326ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 11928ms

五、小结

本文主要介绍了LinkedTransferQueue的使用方法及原理。它可以作为一个高性能的无界队列,在多线程环境中发挥它的优势,且支持阻塞操作,并且适用于多阶段业务处理和按照顺序处理的场景中。LinkedTransferQueue在多线程的场景中具有很好的性能表现,推荐在相关场景中使用。