Disruptor—3.核心源码实现分析
大纲1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
1.Disruptor的生产者源码分析
(1)通过Sequence序号发布消息
(2)通过Translator事件转换器发布消息
(1)通过Sequence序号发布消息
生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。
//注意:这里使用的版本是3.4.4
//单生产者单消费者的使用示例
public class Main {
public static void main(String[] args) {
//参数准备
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 4;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//参数一:eventFactory,消息(Event)工厂对象
//参数二:ringBufferSize,容器的长度
//参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
//参数四:ProducerType,单生产者还是多生产者
//参数五:waitStrategy,等待策略
//1.实例化Disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
//2.添加Event处理器,用于处理事件
//也就是构建Disruptor与消费者的一个关联关系
disruptor.handleEventsWith(new OrderEventHandler());
//3.启动Disruptor
disruptor.start();
//4.获取实际存储数据的容器: RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i < 5; i++) {
bb.putLong(0, i);
//向容器中投递数据
producer.sendData(bb);
}
disruptor.shutdown();
executor.shutdown();
}
}
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
long sequence = ringBuffer.next();
try {
//2.根据这个序号, 找到具体的"OrderEvent"元素
//注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
OrderEvent event = ringBuffer.get(sequence);
//3.进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4.提交发布操作
ringBuffer.publish(sequence);
}
}
}
public class OrderEventHandler implements EventHandler<OrderEvent> {
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(1000);
System.err.println("消费者: " + event.getValue());
}
}//多生产者多消费者的使用示例
public class Main {
public static void main(String[] args) throws InterruptedException {
//1.创建RingBuffer
RingBuffer<Order> ringBuffer = RingBuffer.create(
ProducerType.MULTI,//多生产者
new EventFactory<Order>() {
public Order newInstance() {
return new Order();
}
},
1024 * 1024,
new YieldingWaitStrategy()
);
//2.通过ringBuffer创建一个屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
Consumer[] consumers = new Consumer;
for (int i = 0; i < consumers.length; i++) {
consumers = new Consumer("C" + i);
}
//4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
WorkerPool<Order> workerPool = new WorkerPool<Order>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
consumers
);
//5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//6.启动workerPool
workerPool.start(Executors.newFixedThreadPool(5));
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
final Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
public void run() {
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}
}).start();
}
Thread.sleep(2000);
System.err.println("----------线程创建完毕,开始生产数据----------");
latch.countDown();
Thread.sleep(10000);
System.err.println("任务总数:" + consumers.getCount());
}
}
public class Producer {
private RingBuffer<Order> ringBuffer;
public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(String uuid) {
//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
long sequence = ringBuffer.next();
try {
//2.根据这个序号, 找到具体的"Order"元素
//注意:此时获取的Order对象是一个没有被赋值的"空对象"
Order order = ringBuffer.get(sequence);
//3.进行实际的赋值处理
order.setId(uuid);
} finally {
//4.提交发布操作
ringBuffer.publish(sequence);
}
}
}
public class Consumer implements WorkHandler<Order> {
private static AtomicInteger count = new AtomicInteger(0);
private String consumerId;
private Random random = new Random();
public Consumer(String consumerId) {
this.consumerId = consumerId;
}
public void onEvent(Order event) throws Exception {
Thread.sleep(1 * random.nextInt(5));
System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7;}abstract class RingBufferFields extends RingBufferPad { ... private static final Unsafe UNSAFE = Util.getUnsafe(); private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; RingBufferFields(EventFactory eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object; //内存预加载 fill(eventFactory); } private void fill(EventFactory eventFactory) { for (int i = 0; i < bufferSize; i++) { entries = eventFactory.newInstance(); } } protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask)
页:
[1]