在高性能计算场景下,事件驱动方式是提高效率和吞吐量的重要手段。而com.lmax.disruptor就是其中一款优秀的异步事件处理框架。本文将从其基础概念,应用场景,原理实现,性能测试等方面进行详细的阐述。
一、基础概念
1、RingBuffer RingBuffer是Disruptor的核心数据结构,具有队列的特性。通过上图的示意可以看出,RingBuffer实际上是一个固定长度的数组,其中每个元素代表一个事件。
public class RingBuffer<T>
{
private Object[] entries;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
......
}
2、Sequencer 在Disruptor中,各个消费者线程会不断消费事件,并维护一个单独的序列号。而所有的序列号由Sequencer来统一管理。Sequencer就是一个由数组模拟的环形队列,其中每个元素代表一个可消费的事件。在Sequencer中,通过CAS操作可以保证线程安全。如果一个消费者线程处理完一个事件后,想要继续获取下一个事件,那么它需要从自己上一次消费的位置到最大生产者指针之间的所有事件的序列号都占了一个坑,才能获取新的事件。
private Sequencer sequencer;
......
public final long next()
{
return sequencer.next(n);
}
3、SequenceBarrier 当一个消费者线程想要消费事件时,它需要获取到一个SequenceBarrier,以判断自己能不能消费事件。而SequenceBarrier的职责就是维护一个消费者线程所需消费的事件序列的最小值,表示在消费该事件之前,所有序列号代表的事件都已经被生产者发布,可以被消费。
public class SequenceBarrier
{
......
/**
* Waits for the given sequence to be available for consumption.
*
* @param sequence to wait for
* @return the sequence up to which is available
* @throws AlertException if a status change has occurred for the Disruptor (such as shutdown) while waiting
* @throws InterruptedException if the thread needs awaking on a condition variable.
* <p></p>
* <b>Busy Spin</b> strategy used to avoid syscalls which can introduce latency jitter. Good for low
* latency systems processing events with deterministic behavior.
*/
public long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
long availableSequence = spinObserver.waitForNext(sequence, cursorSequence.get(), dependentSequences, this);
if (availableSequence < sequence)
{
return availableSequence;
}
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
......
}
二、应用场景
Com.lmax.disruptor适合于高并发、低延迟、高吞吐的场景。它几乎涉及到了所有需要通过消息传递来完成的领域。目前广泛应用于互联网和金融行业,比如交易系统中的订单管理、风控,接口服务的消息同步和下游消息通知等。
三、原理实现
在Com.lmax.disruptor框架中,生产者和消费者都使用Sequence来记录当前操作的事件位置。这个Sequence本质上就是一个递增的long类型的值,每个Sequence都对应着一个拥有这个序列号的线程或组件,同时还记录这个序列号对应的事件状态,表示这个事件已经完成了哪些处理。 另外,Sequencer维护了一个RingBuffer的生产者指针和消费者指针。当生产者发布一个新的事件时,生产者会首先获取Sequencer中最大的可生产序列号,并将其作为新事件的序列号,同时更新生产者指针。当消费者消费一个事件时,消费者会更新消费者的Sequence,同时更新消费者指针。 通过上述机制,Disruptor成功实现了无锁的高并发操作。
四、性能测试
下面的示例代码是3个生产者和3个消费者的模型。其中,生产者不断发送事件,事件分别被先后消费了5次、10次和15次,共发送1000个事件。
public class DisruptorDemo {
public static void main(String[] args) {
//定义RingBuffer
RingBuffer<Data> ringBuffer = RingBuffer.createMultiProducer(Data::new, 1024, new YieldingWaitStrategy());
//定义序列栅栏
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(new Sequence[0]);
//定义消费者线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//定义数据处理器
WorkHandler<Data> handler = new WorkHandler<Data>() {
@Override
public void onEvent(Data data) throws Exception {
System.out.println(Thread.currentThread().getName() + " [handler] " + data);
data.setaInt(data.getaInt() + 1);
}
};
//定义WorkProcessor,绑定RingBuffer和SequenceBarrier,并将数据处理器绑定到WorkProcessor中
WorkProcessor<Data> processor1 = new WorkProcessor<>(ringBuffer, sequenceBarrier, handler, new FatalExceptionHandler(), new Sequence(0));
WorkProcessor<Data> processor2 = new WorkProcessor<>(ringBuffer, sequenceBarrier, handler, new FatalExceptionHandler(), new Sequence(1));
WorkProcessor<Data> processor3 = new WorkProcessor<>(ringBuffer, sequenceBarrier, handler, new FatalExceptionHandler(), new Sequence(2));
//将WorkProcessor提交到线程池中
threadPool.submit(processor1);
threadPool.submit(processor2);
threadPool.submit(processor3);
//定义数据生产者,不断向RingBuffer中发送事件
for (int i = 0; i < 1000; i++) {
long sequence = ringBuffer.next();
Data data = ringBuffer.get(sequence);
data.setaLong(i);
ringBuffer.publish(sequence);
}
//等待一定时间,让消费者处理完成
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程和RingBuffer
processor1.halt();
processor2.halt();
processor3.halt();
threadPool.shutdown();
}
}
通过测试可以发现,Com.lmax.disruptor框架的性能是非常优秀的。单生产者多消费者可以轻易地达到7、8万元的吞吐量。
五、总结
Com.lmax.disruptor作为异步事件处理框架,拥有着出色的性能和易用性,在金融和互联网领域得到广泛的应用。本文从其基础概念,应用场景,原理实现和性能测试四个方面进行了详细阐述,希望可以帮助大家更加深入地了解并使用这个框架。