LinkedTransferQueue属于JDK1.7后加入的并发队列,是ConcurrentLinkedQueue队列的增强版,它支持阻塞操作。它综合了LinkedBlockingQueue和SynchronousQueue的特性,有比它们更好的性能。LinkedTransferQueue是一个无界队列,可以无限扩展,而且支持FIFO顺序访问。
一、LinkedTransferQueue简介及使用场景
LinkedTransferQueue是一个基于链表结构实现的队列。它可以被多个线程同时访问,且线程安全。LinkedTransferQueue的好处在于它能够让生产者线程直接将数据传递给消费者线程,避免消息的阻塞。 LinkedTransferQueue适合在生产者线程和消费者线程交互较少、但是交互较为复杂的场景下使用。例如业务场景中需要对请求进行优先级处理、需要按照请求的时间顺序处理。LinkedTransferQueue还可以用于一些需要多阶段处理的场景中,例如分布式消息中间件中的分发器。
二、LinkedTransferQueue源码分析
LinkedTransferQueue最核心的方法是transfer,transfer实现了两个线程之间的数据交换。它通过AtomicReference来记录队列首尾元素,把transfer过来的元素放在队尾,等待之后来到队首的请求。 下面是transfer的代码实现:
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (Thread.interrupted()) throw new InterruptedException();
if (tryTransfer(e)) return;
// 构造传递节点
Node<E> node = new Node<>(e);
// 开始加入阻塞队列
transferer.transfer(node, false, 0);
// 如果当前线程在阻塞队列中,则中断它
if (Thread.interrupted()) node.tryCancel();
// 如果节点还存在,说明transfer失败了,删除节点
if (node.isCancelled()) clean(node);
}
transferer实现了阻塞操作,它的具体实现有两种,分别是TransferQueue和TransferStack。 TransferQueue会先调用tryTransfer将数据存储到队列中,如果没有线程来获取数据,那么当前线程就会进入阻塞状态。TransferStack 是将节点插入Stack中作为LIFO顺序,直到有线程成功消费数据时才唤醒当前线程。 下面是transferer的代码实现:
interface Transferer<E> {
// 尝试直接将元素传递给一个消费者,成功返回true
boolean tryTransfer(E object);
// 使用阻塞操作将元素传递给消费者
void transfer(E e, boolean timed, long nanos) throws InterruptedException;
// 判断阻塞队列是否为空
boolean hasWaitingConsumer();
// 获取阻塞队列等待的线程数
int getWaitingConsumerCount();
}
三、LinkedTransferQueue使用示例
下面是一个简单的示例,描述了如何使用LinkedTransferQueue来完成一个生产者-消费者模型。
import java.util.concurrent.LinkedTransferQueue;
public class LinkedTransferQueueTest {
private static LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
public static void main(String[] args) {
Thread t1 = new Thread(new Producer());
t1.start();
Thread t2 = new Thread(new Consumer());
t2.start();
}
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
queue.transfer(i);
System.out.println("生产者向队列中添加了元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Integer element = queue.take();
System.out.println("消费者取得了队列中的元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
以上代码中,Producer线程往queue中放入元素,Consumer线程从queue中取出元素。由于queue是无界的,即使Producer线程在循环中不停的放入元素,也不会造成OOM的异常。
四、LinkedTransferQueue的性能对比
下面是SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue与LinkedTransferQueue的性能对比:
import java.util.concurrent.*;
public class ConcurrentQueueTest {
private final static int MAX_THREADS = 200;
private final static int QUEUE_SIZE = 10;
private final static int DURATION = 10000;
public static void main(String[] args) throws Exception {
ExecutorService[] executors = new ExecutorService[] {
Executors.newSingleThreadExecutor(),
Executors.newFixedThreadPool(MAX_THREADS),
Executors.newCachedThreadPool(),
new ForkJoinPool(MAX_THREADS)
};
for (ExecutorService executor : executors) {
System.out.println("Executor used: " + executor.getClass());
for (BlockingQueue<Integer> queue : new BlockingQueue[] {
new SynchronousQueue<>(),
new LinkedBlockingQueue<>(),
new ArrayBlockingQueue<>(QUEUE_SIZE),
new LinkedTransferQueue<>()
}) {
System.out.println("Test queue used: " + queue.getClass());
long elapsed = time(executor, queue);
System.out.println("Test finished in " + elapsed + "ms");
}
System.out.println();
}
for (ExecutorService executor : executors) {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}
private static long time(ExecutorService executor, final BlockingQueue<Integer> queue) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final long start = System.currentTimeMillis();
executor.execute(() -> {
try {
ProducerConsumerTest.produceConsume(queue, DURATION, latch);
} catch (Exception e) {
e.printStackTrace();
}
});
latch.await();
TimeUnit.MILLISECONDS.sleep(DURATION);
return System.currentTimeMillis() - start;
}
}
class ProducerConsumerTest {
static void produceConsume(final BlockingQueue<Integer> queue, final int duration, final CountDownLatch latch) throws Exception {
final int producerThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1);
final int consumerThreadCount = producerThreadCount + (producerThreadCount == 1 ? 1 : -1);
final CyclicBarrier barrier = new CyclicBarrier(producerThreadCount + consumerThreadCount + 1);
for (int i = 0; i < producerThreadCount; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.put(1);
}
}).start();
}
for (int i = 0; i < consumerThreadCount; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.take();
}
}).start();
}
barrier.await();
startTime = System.currentTimeMillis();
latch.countDown();
}
private static volatile long startTime;
}
在多线程环境下,LinkedTransferQueue的性能是要比其它几个Queue高的。下面是每种Queue的输出结果:
Executor used: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10036ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 10044ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10032ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 9923ms
Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10412ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 11192ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10179ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 7277ms
Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10096ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 17317ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10891ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 8497ms
Executor used: java.util.concurrent.ForkJoinPool
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 15006ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 27710ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 14326ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 11928ms
五、小结
本文主要介绍了LinkedTransferQueue的使用方法及原理。它可以作为一个高性能的无界队列,在多线程环境中发挥它的优势,且支持阻塞操作,并且适用于多阶段业务处理和按照顺序处理的场景中。LinkedTransferQueue在多线程的场景中具有很好的性能表现,推荐在相关场景中使用。