How Disruptor implements a high-performance queue

Many newcomers are unclear on how Disruptor implements a high-performance queue. This article starts from a small example and then reads through the relevant source code step by step.

Disruptor example

import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy}

object DisruptorTest {

  val disruptor = {
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }

    val threadFactory = new ThreadFactory(){
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, 
                        new BlockingWaitStrategy())

    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    
    disruptor
  }
  
  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

Reading the source

Disruptor initialization

Start with the Disruptor constructor:

public Disruptor(final EventFactory<T> eventFactory, 
  final int ringBufferSize, 
  final ThreadFactory threadFactory, 
  final ProducerType producerType,
  final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), 
        new BasicExecutor(threadFactory));
}

Next, look at RingBuffer.create. It ends up calling fill, which pre-populates every slot of the ringBuffer with a default instance from eventFactory.newInstance():

public static <E> RingBuffer<E> create(ProducerType producerType, 
  EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, 
    WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
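
The two constructor checks and the fill step can be illustrated in isolation. The following is a minimal sketch, not the library class: RingIndexSketch, slotFor and the Supplier-based factory are hypothetical names, and the BUFFER_PAD padding is left out. It assumes that indexMask is used to turn a sequence into an array index, which is the usual reason for requiring a power-of-2 size.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the ring buffer's slot handling; not the library class.
final class RingIndexSketch<E> {
    private final Object[] entries;
    private final int indexMask;

    RingIndexSketch(Supplier<E> factory, int bufferSize) {
        // Same validation idea as RingBufferFields: exactly one bit set means a power of 2.
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        // Preallocate every slot once; later publishes mutate these objects in place.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    // For a power-of-2 size, (sequence & mask) equals (sequence % size) for non-negative sequences.
    int slotFor(long sequence) {
        return (int) (sequence & indexMask);
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[slotFor(sequence)];
    }

    public static void main(String[] args) {
        RingIndexSketch<StringBuilder> ring = new RingIndexSketch<>(StringBuilder::new, 4);
        System.out.println(ring.slotFor(5));            // sequence 5 wraps around to slot 1
        System.out.println(ring.get(1) == ring.get(5)); // both sequences share one preallocated object
    }
}
```

Because the slots are allocated once and reused, publishing an event never allocates a new object; it only overwrites the fields of the object already sitting in the slot.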

Consuming events

Start with disruptor.start(), the entry point for event consumption:

private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-processing chain:

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

Back in disruptor.start(), consumerInfo.start(executor) is called with executor = new BasicExecutor(threadFactory). BasicExecutor creates a new thread on every execute call, but the number of consumers in consumerRepository is bounded, so one thread per consumer is not a problem.

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

@Override
public void start(final java.util.concurrent.Executor executor){
    // EventProcessor extends Runnable, so it can be handed to an Executor directly.
    // Here executor is the BasicExecutor created in the Disruptor constructor.
    executor.execute(eventprocessor);
}
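
The thread-per-task behaviour described above can be sketched as follows. This is an illustration only: ThreadPerTaskExecutor, threadsCreated and runTasks are hypothetical names, and the real BasicExecutor may differ in its details. The point is simply that execute does no pooling, so the thread count equals the number of processors started.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread-per-task executor, in the spirit of what the article says about BasicExecutor.
final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory factory;
    final AtomicInteger threadsCreated = new AtomicInteger();

    ThreadPerTaskExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        // No pooling: every submitted task gets its own thread.
        Thread thread = factory.newThread(command);
        threadsCreated.incrementAndGet();
        thread.start();
    }

    // Submits `tasks` short tasks, waits for them, and reports how many threads were created.
    static int runTasks(int tasks) {
        ThreadPerTaskExecutor executor = new ThreadPerTaskExecutor(Thread::new);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown); // stands in for a long-running event processor
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.threadsCreated.get();
    }

    public static void main(String[] args) {
        // Two tasks, like the two handlers in the demo (TestHandler and ThenHandler).
        System.out.println("threads created: " + runTasks(2));
    }
}
```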

public final class BatchEventProcessor<T> implements EventProcessor {
  @Override
  public void run() {
      if (running.compareAndSet(IDLE, RUNNING)) {
          sequenceBarrier.clearAlert();

          notifyStart();
          try {
              if (running.get() == RUNNING) {
                  processEvents();
              }
          } finally {
              notifyShutdown();
              running.set(IDLE);
          }
      } else {
          if (running.get() == RUNNING) {
              throw new IllegalStateException("Thread is already running");
          } else {
              earlyExit();
          }
      }
  }
}

private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true) {
        try {
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            while (nextSequence <= availableSequence) {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            sequence.set(availableSequence);
        } catch (final TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (final AlertException ex) {
            if (running.get() != RUNNING) {
                break;
            }
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
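
The inner while loop of processEvents can be isolated to show how one waitFor result turns into a batch. The sketch below is single-threaded and uses hypothetical names (BatchLoopSketch, Handler, drain); the ring is a plain String array rather than a RingBuffer.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchLoopSketch {
    // Hypothetical handler shape mirroring EventHandler.onEvent.
    interface Handler {
        void onEvent(String event, long sequence, boolean endOfBatch);
    }

    // Handles every sequence in [nextSequence, availableSequence] and returns the new nextSequence.
    static long drain(String[] ring, long nextSequence, long availableSequence, Handler handler) {
        while (nextSequence <= availableSequence) {
            String event = ring[(int) (nextSequence & (ring.length - 1))];
            // endOfBatch is true only for the last event of this batch.
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return nextSequence;
    }

    static List<String> demo() {
        String[] ring = {"e0", "e1", "e2", "e3"};
        List<String> log = new ArrayList<>();
        // The consumer wants sequence 0 and the barrier reports that everything up to 2 is available.
        long next = drain(ring, 0, 2,
                (event, seq, end) -> log.add(event + "@" + seq + (end ? " endOfBatch" : "")));
        log.add("next=" + next);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

A handler can use endOfBatch to defer expensive work, such as a flush, until the last event of a batch.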

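executor.execute, that is BasicExecutor.execute(eventProcessor), runs the processor asynchronously on a new thread. In other words it invokes BatchEventProcessor.run, which in turn drives the eventHandler.

Here is the question: if each one runs asynchronously, how do multiple eventHandlers process events in order?

Let's walk through the logic of processEvents:

  1. Read BatchEventProcessor.sequence and add 1 to get nextSequence.

  2. Call sequenceBarrier.waitFor, which delegates to WaitStrategy.waitFor, to obtain availableSequence.

  3. Look at the implementation of BlockingWaitStrategy.waitFor: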

     public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, 
        SequenceBarrier barrier)
        throws AlertException, InterruptedException {
        long availableSequence;
        if (cursorSequence.get() < sequence) {
            lock.lock();
            try {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally {
                lock.unlock();
            }
        }
    
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
    
        return availableSequence;
    }

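    If cursorSequence (the ring buffer's published index) is less than sequence (the index the batchEventProcessor wants next), the batchEventProcessor blocks and waits. Otherwise it spins until dependentSequence has reached sequence and returns that value as availableSequence. The batchEventProcessor then handles every event up to and including availableSequence in one batch and updates its own sequence.

  4. dependentSequence is initialized in the ProcessingSequenceBarrier constructor: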

    final class ProcessingSequenceBarrier implements SequenceBarrier {
        private final WaitStrategy waitStrategy;
        private final Sequence dependentSequence;
        private volatile boolean alerted = false;
        private final Sequence cursorSequence;
        private final Sequencer sequencer;
    
        ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
            final Sequence cursorSequence, final Sequence[] dependentSequences) {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) {
                dependentSequence = cursorSequence;
            } else {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }
    }

    ProcessingSequenceBarrier is created in Disruptor.createEventProcessors by the line final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences). In the code paths covered here, createEventProcessors is reached only through Disruptor.handleEventsWith and EventHandlerGroup.handleEventsWith.

    public class Disruptor<T> {
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return createEventProcessors(new Sequence[0], handlers);
        }
    
        EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
            final EventHandler<? super T>[] eventHandlers) {
            checkNotStarted();
    
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    
            for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
                final EventHandler<? super T> eventHandler = eventHandlers[i];
    
                final BatchEventProcessor<T> batchEventProcessor = 
                    new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
    
                if (exceptionHandler != null) {
                    batchEventProcessor.setExceptionHandler(exceptionHandler);
                }
    
                consumerRepository.add(batchEventProcessor, eventHandler, barrier);
                processorSequences[i] = batchEventProcessor.getSequence();
            }
    
            updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    
            return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
        }
    }
    
    public class EventHandlerGroup<T> {
        private final Disruptor<T> disruptor;
        private final ConsumerRepository<T> consumerRepository;
        private final Sequence[] sequences;
    
        EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
            final Sequence[] sequences) {
            this.disruptor = disruptor;
            this.consumerRepository = consumerRepository;
            this.sequences = Arrays.copyOf(sequences, sequences.length);
        }
    
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return disruptor.createEventProcessors(sequences, handlers);
        }
    
        public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
            return handleEventsWith(handlers);
        }
    }

    EventHandlerGroup copies the array of sequence references taken from its batchEventProcessors. In the demo, disruptor.handleEventsWith(TestHandler).then(ThenHandler) uses then to hand TestHandler's sequence to ThenHandler's barrier. ThenHandler therefore depends on TestHandler and handles an event only after TestHandler has handled it.
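
The dependency mechanism in steps 1 to 4 can be reproduced with nothing but AtomicLong counters. The sketch below makes several assumptions: ChainSketch, consumer and run are hypothetical names, waiting is a simple yield loop rather than BlockingWaitStrategy, and each consumer has exactly one upstream sequence, whereas the library wraps several in a FixedSequenceGroup. Consumer A is gated by the publisher's cursor and consumer B by A's sequence, which is the same relationship that then sets up between TestHandler and ThenHandler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

final class ChainSketch {
    static final int EVENTS = 8;

    // A consumer tracks its own sequence and may only advance up to its upstream sequence:
    // the cursor for the first handler, the previous handler's sequence for a chained one.
    static Thread consumer(String name, AtomicLong upstream, AtomicLong own, List<String> log) {
        return new Thread(() -> {
            long next = own.get() + 1;
            while (next < EVENTS) {
                long available = upstream.get();
                if (available < next) {
                    Thread.yield(); // nothing new yet; the library would use a WaitStrategy here
                    continue;
                }
                for (; next <= available; next++) {
                    log.add(name + ":" + next);
                }
                own.set(available); // make progress visible only after the batch is handled
            }
        });
    }

    static List<String> run() {
        AtomicLong cursor = new AtomicLong(-1);
        AtomicLong seqA = new AtomicLong(-1);
        AtomicLong seqB = new AtomicLong(-1);
        List<String> log = new CopyOnWriteArrayList<>();
        Thread a = consumer("A", cursor, seqA, log);
        Thread b = consumer("B", seqA, seqB, log); // B is gated by A, not by the cursor
        b.start();
        a.start();
        for (long s = 0; s < EVENTS; s++) {
            cursor.set(s); // "publish" sequence s
        }
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The interleaving of A and B lines varies from run to run, but for any given sequence the A entry is always logged before the B entry, because B cannot see a sequence until A has written it to seqA.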

Producing events

Next, look at disruptor.publishEvent(translator, i), which puts data into the ringBuffer:

public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}

public E get(long sequence) {
    return elementAt(sequence);
}

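get(sequence) uses the sequence (the ring buffer index) to fetch the preallocated object from the ring buffer array. The translator overwrites that object's fields in place, so the slot now holds the new values. publish then advances the cursor to the new sequence, and waitStrategy.signalAllWhenBlocking() wakes up any consumers that are blocked waiting for events so they can continue consuming.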

protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
@Override
public void publish(long sequence) {
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}
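
Putting the producer side together with the blocking wait shown earlier gives a very small single-producer, single-consumer queue. This is a sketch under stated assumptions, not the library's implementation: PublishSketch, Slot, take and demoSum are hypothetical names, and there is no wrap-around protection, which the real sequencer.next() provides by gating on consumer sequences. The demo therefore publishes fewer events than the ring has slots.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class PublishSketch {
    static final class Slot { int id = -1; }

    private final Slot[] ring;
    private final AtomicLong cursor = new AtomicLong(-1);
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private long nextToClaim = 0; // touched by the single producer thread only

    PublishSketch(int powerOfTwoSize) {
        ring = new Slot[powerOfTwoSize];
        for (int i = 0; i < ring.length; i++) {
            ring[i] = new Slot(); // preallocate, like fill()
        }
    }

    // Producer: claim a sequence, mutate the slot in place, then publish even if the write throws.
    void publish(int value) {
        long sequence = nextToClaim++;
        try {
            ring[(int) (sequence & (ring.length - 1))].id = value;
        } finally {
            cursor.set(sequence);
            lock.lock();
            try {
                published.signalAll(); // wake consumers blocked in take()
            } finally {
                lock.unlock();
            }
        }
    }

    // Consumer: block until the cursor has reached the wanted sequence, then read the slot.
    int take(long sequence) throws InterruptedException {
        if (cursor.get() < sequence) {
            lock.lock();
            try {
                while (cursor.get() < sequence) {
                    published.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return ring[(int) (sequence & (ring.length - 1))].id;
    }

    static int demoSum() {
        PublishSketch queue = new PublishSketch(8);
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 4; i++) {
                queue.publish(i * 10);
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (long s = 0; s < 4; s++) {
                sum += queue.take(s);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demoSum()); // 10 + 20 + 30 + 40
    }
}
```

The consumer rechecks the cursor while holding the lock before it awaits, and the producer signals while holding the same lock, so a publish that lands between the first check and the await is not missed.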

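Summary

With the flow traced end to end, the key points are:

  1. RingBuffer preallocates every slot with eventFactory.newInstance(), and bufferSize must be a power of 2.

  2. Each EventHandler is wrapped in a BatchEventProcessor that runs on its own thread and handles every event up to availableSequence in one batch.

  3. Ordering between handlers comes from sequences: a handler added with then waits on the sequence of the handler before it.

  4. The producer overwrites the preallocated event in place, then publish advances the cursor and wakes blocked consumers.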

Article title: How Disruptor implements a high-performance queue
Link: http://cqcxhl.cn/article/ggpecc.html