材部 发表于 2025-6-3 00:24:32

Disruptor—3.核心源码实现分析

大纲
1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
 
1.Disruptor的生产者源码分析
(1)通过Sequence序号发布消息
(2)通过Translator事件转换器发布消息
 
(1)通过Sequence序号发布消息
生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。
//注意:这里使用的版本是3.4.4
//单生产者单消费者的使用示例
public class Main {
    public static void main(String[] args) {
      //参数准备
      OrderEventFactory orderEventFactory = new OrderEventFactory();
      int ringBufferSize = 4;
      ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

      //参数一:eventFactory,消息(Event)工厂对象
      //参数二:ringBufferSize,容器的长度
      //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
      //参数四:ProducerType,单生产者还是多生产者
      //参数五:waitStrategy,等待策略
      //1.实例化Disruptor对象
      Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
      );

      //2.添加Event处理器,用于处理事件
      //也就是构建Disruptor与消费者的一个关联关系
      disruptor.handleEventsWith(new OrderEventHandler());

      //3.启动Disruptor
      disruptor.start();

      //4.获取实际存储数据的容器: RingBuffer
      RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
      OrderEventProducer producer = new OrderEventProducer(ringBuffer);
      ByteBuffer bb = ByteBuffer.allocate(8);
      for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //向容器中投递数据
            producer.sendData(bb);
      }
      disruptor.shutdown();
      executor.shutdown();
    }
}

public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;
   
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
      this.ringBuffer = ringBuffer;
    }
   
    public void sendData(ByteBuffer data) {
      //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
      long sequence = ringBuffer.next();
      try {
            //2.根据这个序号, 找到具体的"OrderEvent"元素
            //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
            OrderEvent event = ringBuffer.get(sequence);
            //3.进行实际的赋值处理
            event.setValue(data.getLong(0));
      } finally {
            //4.提交发布操作
            ringBuffer.publish(sequence);
      }
    }
}

public class OrderEventHandler implements EventHandler<OrderEvent> {
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
      Thread.sleep(1000);
      System.err.println("消费者: " + event.getValue());
    }
}//多生产者多消费者的使用示例
public class Main {
    public static void main(String[] args) throws InterruptedException {
      //1.创建RingBuffer
      RingBuffer<Order> ringBuffer = RingBuffer.create(
            ProducerType.MULTI,//多生产者
            new EventFactory<Order>() {
                public Order newInstance() {
                  return new Order();
                }
            },
            1024 * 1024,
            new YieldingWaitStrategy()
      );

      //2.通过ringBuffer创建一个屏障
      SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

      //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
      Consumer[] consumers = new Consumer;
      for (int i = 0; i < consumers.length; i++) {
            consumers = new Consumer("C" + i);
      }

      //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
      WorkerPool<Order> workerPool = new WorkerPool<Order>(
            ringBuffer,
            sequenceBarrier,
            new EventExceptionHandler(),
            consumers
      );

      //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
      ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

      //6.启动workerPool
      workerPool.start(Executors.newFixedThreadPool(5));

      final CountDownLatch latch = new CountDownLatch(1);
      for (int i = 0; i < 100; i++) {
            final Producer producer = new Producer(ringBuffer);
            new Thread(new Runnable() {
                public void run() {
                  try {
                        latch.await();
                  } catch (Exception e) {
                        e.printStackTrace();
                  }
                  for (int j = 0; j < 100; j++) {
                        producer.sendData(UUID.randomUUID().toString());
                  }
                }
            }).start();
      }

      Thread.sleep(2000);
      System.err.println("----------线程创建完毕,开始生产数据----------");
      latch.countDown();
      Thread.sleep(10000);
      System.err.println("任务总数:" + consumers.getCount());
    }
}

public class Producer {
    private RingBuffer<Order> ringBuffer;
   
    public Producer(RingBuffer<Order> ringBuffer) {
      this.ringBuffer = ringBuffer;
    }

    public void sendData(String uuid) {
      //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
      long sequence = ringBuffer.next();
      try {
            //2.根据这个序号, 找到具体的"Order"元素
            //注意:此时获取的Order对象是一个没有被赋值的"空对象"
            Order order = ringBuffer.get(sequence);
            //3.进行实际的赋值处理
            order.setId(uuid);
      } finally {
            //4.提交发布操作
            ringBuffer.publish(sequence);
      }
    }
}

public class Consumer implements WorkHandler<Order> {
    private static AtomicInteger count = new AtomicInteger(0);
    private String consumerId;
    private Random random = new Random();

    public Consumer(String consumerId) {
      this.consumerId = consumerId;
    }

    public void onEvent(Order event) throws Exception {
      Thread.sleep(1 * random.nextInt(5));
      System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
      count.incrementAndGet();
    }

    public int getCount() {
      return count.get();
    }
}其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。
abstract class RingBufferPad {    protected long p1, p2, p3, p4, p5, p6, p7;}abstract class RingBufferFields extends RingBufferPad {    ...    private static final Unsafe UNSAFE = Util.getUnsafe();    private final long indexMask;    //环形数组存储事件消息    private final Object[] entries;    protected final int bufferSize;    //RingBuffer的sequencer属性代表了当前线程对应的生产者    protected final Sequencer sequencer;      RingBufferFields(EventFactory eventFactory, Sequencer sequencer) {      this.sequencer = sequencer;      this.bufferSize = sequencer.getBufferSize();      if (bufferSize < 1) {            throw new IllegalArgumentException("bufferSize must not be less than 1");      }      if (Integer.bitCount(bufferSize) != 1) {            throw new IllegalArgumentException("bufferSize must be a power of 2");      }      this.indexMask = bufferSize - 1;      //初始化数组      this.entries = new Object;      //内存预加载      fill(eventFactory);    }      private void fill(EventFactory eventFactory) {      for (int i = 0; i < bufferSize; i++) {            entries = eventFactory.newInstance();      }    }      protected final E elementAt(long sequence) {      return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask)
页: [1]
查看完整版本: Disruptor—3.核心源码实现分析