`
357029540
  • 浏览: 726029 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

Disruptor的简单介绍及使用

阅读更多

本文介绍的是版本是3.3.6的,参考了以下文章

 

http://11246272.blog.51cto.com/11236272/1745472

http://www.php.cn/java-article-370582.html

http://zhangfengzhe.blog.51cto.com/8855103/1885830

http://ifeve.com/concurrentlinkedqueue/

http://ifeve.com/disruptor/

http://wh0426.iteye.com/blog/221202

https://my.oschina.net/OutOfMemory/blog/793275

 

Disruptor是什么?

Disruptor是一个高性能的异步处理框架,一个“生产者-消费者”模型,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”,并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后在交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。

环形缓冲区(轮胎):RingBuffer

RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,包含一个指向下一个槽点的序号,可以在线程间传递数据,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:



 

数组

这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。

序号

RingBuffer中元素拥有序号的概念,并且序号是一直增长的,比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。 由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。

无锁的机制

在生产者/消费者模式下,disruptor号称“无锁并行框架”,下面我们来具体分析下:

一个生产者 + 一个消费者

生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

一个生产者 + 多个消费者

多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

多个生产者 + N个消费者

很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。

Event

在Disruptor框架中,生产者生产的数据叫做Event。

核心对象

RingBuffer:环形的一个数据结构,对象初始化时,会使用事件Event进行填充。Buffer的大小必须是2的幂次方,方便移位操作。

  1. Event:无指定具体接口,用户自己实现,可以携带任何业务数据。

  2. EventFactory:产生事件Event的工厂,由用户自己实现。

  3. EventTranslator:事件发布的回调接口,由用户实现,负责将业务参数设置到事件中。

  4. Sequencer:序列产生器,也是协调生产者和消费者及实现高并发的核心。有MultiProducerSequencer 和 SingleProducerSequencer两个实现类。

  5. SequenceBarrier:拥有RingBuffer的发布事件Sequence引用和消费者依赖的Sequence引用。决定消费者消费可消费的Sequence。

  6. EventHandler:事件的处理者,由用户自己实现。

  7. EventProcessor:事件的处理器,单独在一个线程中运行。

  8. WorkHandler:事件的处理者,由用户自己实现。

  9. WorkProcessor:事件的处理器,单独在一个线程中运行。

  10. WorkerPool:一组WorkProcessor的处理。

  11. WaitStrategy:在消费者比生产者快时,消费者处理器的等待策略。

等待策略:

消费者在缓存中没有可以消费的事件时,采取的等待策略:

1.BlockingWaitStrategy:默认等待策略。和BlockingQueue的实现很类似,通过使用锁和条件(Condition)进行线程阻塞的方式,等待生产者唤醒(线程同步和唤醒)。此策略对于线程切换来说,最节约CPU资源,但在高并发场景下性能有限。

2.BusySpinWaitStrategy:死循环策略。消费者线程会尽最大可能监控缓冲区的变化,会占用所有CPU资源,线程一直自旋等待,比较耗CPU。

3.LiteBlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,比BlockingWaitStrategy要轻,某些情况下可以减少阻塞的次数。

4.PhasedBackoffWaitStrategy:根据指定的时间段参数和指定的等待策略决定采用哪种等待策略。5.SleepingWaitStrategy:CPU友好型策略。会在循环中不断等待数据。可通过参数设置,首先进行自旋等待,若不成功,则使用Thread.yield()让出CPU,并使用LockSupport.parkNanos(1)进行线程睡眠,通过线程调度器重新调度;或一直自旋等待,所以,此策略数据处理数据可能会有较高的延迟,适合用于对延迟不敏感的场景,优点是对生产者线程影响小,典型应用场景是异步日志。

6.TimeoutBlockingWaitStrategy:通过参数设置阻塞时间,如果超时则抛出异常。

7.YieldingWaitStrategy:低延时策略。消费者线程会不断循环监控RingBuffer的变化,在循环内部使用Thread.yield()让出CPU给其他线程,通过线程调度器重新调度。

Disruptor框架基本构成

1.MyEvent:自定义实体对象,充当“生产者-消费者”模型中的数据。

2.MyEventFactory:实现EventFactory的接口,用于生产数据。

3.MyEventProducerWithTranslator:将数据存储到自定义对象中并发布,通过在自定义类中新建EventTranslator类实现。

4.MyEventHandler:自定义消费者,通过EventHandler接口实现。

与ConcurrentLinkedQueue的比较

相同点

1.都是使用了无锁算法的CAS(Compare And Swap/Set)的实现方式

不同点

1.在多线程的情况下Disruptor是不存在竞争的,每个对象都使用自己的序号;ConcurrentLinkedQueue因为是队列的方式,可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点,所以比较耗时。

2.Disruptor是一个环形的缓冲区,是一个数组,它不会清除已经存在的数据,它只会更新一个可用的序列号,然后向该序列号中写入数据,因为是每一个对象拥有自己的序列号,因此不存在写冲突,而ConcurrentLinkedQueue是一个无界的队列,它会根据指针来判断数据是否已经使用过,使用过则会更新head节点的标记,因此在耗时的时间上Disurptor比ConcurrentLinkedQueue少。

3.Disruptor解决了伪共享的问题。

更多的关于Disruptor的介绍可以参考 http://ifeve.com/disruptor/ 虽然这个上面的介绍版本有点旧了,但是原理基本都是一样的。

Disurptor的java测试实现

1.导入需要的jar包

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.20</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

    </dependencies>

 

2.自定义实体对象

package com.demo.disruptor.dto;

import com.alibaba.fastjson.JSONObject;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 自定义事件对象
 */
public class LogEvent {

    private long logId;
    private String content;
    private Date date;

    public long getLogId() {
        return logId;
    }

    public void setLogId(long logId) {
        this.logId = logId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}

 3.自定义factory类:

package com.demo.disruptor.factory;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventFactory;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 事件生成工厂,用来初始化预分配事件对象,即根据RingBuffer大小创建的实体对象
 */
public class LogEventFactory implements EventFactory<LogEvent> {
    public LogEvent newInstance() {
        System.out.println("新建LogEvent数据.....");
        return new LogEvent();
    }
}

 4.新建生产者类

4.1 非Translator生产者类

package com.demo.disruptor.producer;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.RingBuffer;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 自定义生产者
 */
public class LogEventProducer {

    private RingBuffer<LogEvent> ringBuffer;

    public LogEventProducer(RingBuffer<LogEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long logId, String content, Date date){
        //RingBuffer类似一个队列,获取下一个空闲的序号
        long seq = ringBuffer.next();
        LogEvent logEvent = ringBuffer.get(seq);
        logEvent.setLogId(logId);
        logEvent.setContent(content);
        logEvent.setDate(date);
        //发布事件
        ringBuffer.publish(seq);
    }
}

 

package com.demo.disruptor.producer;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.RingBuffer;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 自定义生产者
 */
public class LogEventProducer2 {

    private RingBuffer<LogEvent> ringBuffer;

    public LogEventProducer2(RingBuffer<LogEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long logId, String content, Date date){
        //RingBuffer类似一个队列,获取下一个空闲的序号
        long seq = ringBuffer.next();
        LogEvent logEvent = ringBuffer.get(seq);
        logEvent.setLogId(logId);
        logEvent.setContent(content);
        logEvent.setDate(date);
        //发布事件
        ringBuffer.publish(seq);
    }
}

 LogEventProducer3的类和生产者2也一样

4.2 Translator生产者类

package com.demo.disruptor.producer;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 使用translator方式到事件生产者发布事件,通常使用该方法
 */
public class LogEventProducerWithTranslator {

    private EventTranslatorVararg eventTranslatorVararg = new EventTranslatorVararg<LogEvent>() {
        public void translateTo(LogEvent logEvent, long l, Object... objects) {
            logEvent.setLogId((Long) objects[0]);
            logEvent.setContent((String)objects[1]);
            logEvent.setDate((Date)objects[2]);
        }
    };

    private RingBuffer<LogEvent> ringBuffer;

    public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long logId, String content, Date date){
        ringBuffer.publishEvent(eventTranslatorVararg,logId,content,date);
    }
}

 5.新建消费者类

package com.demo.disruptor.consumer;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventHandler;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 自定义消费者
 */
public class LogEventConsumer implements EventHandler<LogEvent> {
    public void onEvent(LogEvent logEvent, long l, boolean b) throws Exception {
        System.out.println("消费者1-seq:" + l + ",bool:" + b + ",logEvent:" + logEvent.toString());
    }
}

 

package com.demo.disruptor.consumer;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventHandler;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description 自定义消费者
 */
public class LogEventConsumer2 implements EventHandler<LogEvent> {
    public void onEvent(LogEvent logEvent, long l, boolean b) throws Exception {
        System.out.println("消费者2-seq:" + l + ",bool:" + b + ",logEvent:" + logEvent.toString());
    }
}

 剩下的LogEventConsumer3、LogEventConsumer4、LogEventConsumer5的类同LogEventConsumer2类一样。

6.新建启动类

package com.demo.disruptor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description
 */
@SpringBootApplication
public class App {

    public static void main(String [] args){
        SpringApplication.run(App.class,args);
    }

}

 7.新建测试类测试Disruptor

package com.demo.disruptor.logEvent;

import com.demo.disruptor.consumer.*;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.factory.LogEventFactory;
import com.demo.disruptor.producer.LogEventProducer;
import com.demo.disruptor.producer.LogEventProducer2;
import com.demo.disruptor.producer.LogEventProducer3;
import com.demo.disruptor.producer.LogEventProducerWithTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description
 */
@SpringBootTest(classes = LogEventMain.class)
@RunWith(SpringRunner.class)
public class LogEventMain {

    /**
     * 单个生产者和消费者的模式
     * @throws InterruptedException
     */
    @Test
    public void producer() throws InterruptedException {
        LogEventFactory logEventFactory = new LogEventFactory();
        //用于生成RingBuffer大小,其大小必须是2的n次方
        int ringBufferSize = 8;
        //定义Disruptor初始化信息
        Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        //定义处理事件的消费者
        disruptor.handleEventsWith(new LogEventConsumer());
        //定义事件的开始
        disruptor.start();

        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        //进行事件的发布
        LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
        for(int i = 0; i < 10; i++){
            logEventProducer.onData(i, "c" + i, new Date());
        }
        Thread.sleep(1000);
        //关闭Disruptor
        disruptor.shutdown();
    }

    /**
     * 使用EventTranslatorVararg的单个生产者和消费者模式
     * @throws InterruptedException
     */
    @Test
    public void producerWithTranslator() throws InterruptedException {
        LogEventFactory logEventFactory = new LogEventFactory();
        //用于生成RingBuffer大小,其大小必须是2的n次方
        int ringBufferSize = 8;
        //定义Disruptor初始化信息
        Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        //定义处理事件的消费者
        disruptor.handleEventsWith(new LogEventConsumer());
        //定义事件的开始
        disruptor.start();

        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        //进行事件的发布
        LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
        for(int i = 0; i < 10; i++){
            producerWithTranslator.onData(i, "c" + i, new Date());
        }
        Thread.sleep(1000);
        //关闭Disruptor
        disruptor.shutdown();
    }

    /**
     * 一个生产者,3个消费者,其中前面2个消费者完成后第3个消费者才可以消费
     * 也即使说当前面2个消费者把所有的RingBuffer占领完成,同时都消费完成后才会有第3个消费者的消费
     * 当发布的事件数量大于RingBuffer的大小的时候,在第3个消费者消费完RingBuffer大小的时候前面2个消费者才能继续消费,序号递增的
     * @throws InterruptedException
     */
    @Test
    public void multiConsumer() throws InterruptedException {
        LogEventFactory logEventFactory = new LogEventFactory();
        //用于生成RingBuffer大小,其大小必须是2的n次方
        int ringBufferSize = 8;
        //定义Disruptor初始化信息
        Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());

        //设置多个消费者
        EventHandlerGroup<LogEvent> eventEventHandlerGroup = disruptor.handleEventsWith(new LogEventConsumer(),new LogEventConsumer2());
        eventEventHandlerGroup.then(new LogEventConsumer3());
        //启动事件的开始
        disruptor.start();
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        //进行事件的发布
        LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
        for(int i = 0; i < 10; i++){
            producerWithTranslator.onData(i, "c" + i, new Date());
        }
        Thread.sleep(1000);
        //关闭Disruptor
        disruptor.shutdown();
    }

    /**
     * 一个生产者,多个消费者,有2条支线,其中消费者1和消费者3在同一条支线上,
     * 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者
     * 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4
     * 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5
     * 消费者5消费完RingBuffer大小后又按照上面的顺序来消费
     * 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来
     * @throws InterruptedException
     */
    @Test
    public void multiConsumers() throws InterruptedException {
        LogEventFactory logEventFactory = new LogEventFactory();
        //用于生成RingBuffer大小,其大小必须是2的n次方
        int ringBufferSize = 8;
        //定义Disruptor初始化信息
        Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        LogEventConsumer consumer1 = new LogEventConsumer();
        LogEventConsumer2 consumer2 = new LogEventConsumer2();
        LogEventConsumer3 consumer3 = new LogEventConsumer3();
        LogEventConsumer4 consumer4 = new LogEventConsumer4();
        LogEventConsumer5 consumer5 = new LogEventConsumer5();
        //同时执行消费者1和消费者2
        disruptor.handleEventsWith(consumer1,consumer2);
        //消费者1后面执行消费者3
        disruptor.after(consumer1).handleEventsWith(consumer3);
        //消费者后面执行消费者4
        disruptor.after(consumer2).handleEventsWith(consumer4);
        //消费者3和消费者3执行完后执行消费者5
        disruptor.after(consumer3,consumer4).handleEventsWith(consumer5);
        //定义事件的开始
        disruptor.start();

        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        //进行事件的发布
        LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
        for(int i = 0; i < 10; i++){
            logEventProducer.onData(i, "c" + i, new Date());
        }
        Thread.sleep(1000);
        //关闭Disruptor
        disruptor.shutdown();
    }

    /**
     * 多个生产者,多个消费者,有2条消费者支线,其中消费者1和消费者3在同一条支线上,
     * 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者
     * 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4
     * 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5
     * 消费者5消费完RingBuffer大小后又按照上面的顺序来消费
     * 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来
     * 生产者只是多生产了数据
     * @throws InterruptedException
     */
    @Test
    public void multiProcedureConsumers() throws InterruptedException {
        LogEventFactory logEventFactory = new LogEventFactory();
        //用于生成RingBuffer大小,其大小必须是2的n次方
        int ringBufferSize = 8;
        //定义Disruptor初始化信息
        Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
        LogEventConsumer consumer1 = new LogEventConsumer();
        LogEventConsumer2 consumer2 = new LogEventConsumer2();
        LogEventConsumer3 consumer3 = new LogEventConsumer3();
        LogEventConsumer4 consumer4 = new LogEventConsumer4();
        LogEventConsumer5 consumer5 = new LogEventConsumer5();
        //同时执行消费者1和消费者2
        disruptor.handleEventsWith(consumer1,consumer2);
        //消费者1后面执行消费者3
        disruptor.after(consumer1).handleEventsWith(consumer3);
        //消费者后面执行消费者4
        disruptor.after(consumer2).handleEventsWith(consumer4);
        //消费者3和消费者3执行完后执行消费者5
        disruptor.after(consumer3,consumer4).handleEventsWith(consumer5);
        //定义事件的开始
        disruptor.start();

        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        //进行事件的发布
        LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
        LogEventProducer2 logEventProducer2 = new LogEventProducer2(ringBuffer);
        LogEventProducer3 logEventProducer3 = new LogEventProducer3(ringBuffer);
        for(int i = 0; i < 10; i++){
            logEventProducer.onData(i, "1-c" + i, new Date());
            logEventProducer2.onData(i, "2-c" + i, new Date());
            logEventProducer3.onData(i, "3-c" + i, new Date());
        }
        Thread.sleep(1000);
        //关闭Disruptor
        disruptor.shutdown();
    }

}

 8.单个线程的ArrayBlockingQueue和Disruptor的性能测试对比

我们这里分别用500W,1000W,5000W的数据量来做单个生产者和消费者的测试,分别进行了10次测试,然后取他们的平均值做对比。

 

package com.demo.disruptor.test;

import com.demo.disruptor.dto.LogEvent;

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
public class BlockingQueueTest {
    public static int eventNum = 50000000;
//5000000:974,932,943,946,993,1073,1044,1018,1027,971  992
    //10000000:1845,1851,2433,2041,1789,1911,1953,2105,1862,1896   1969
    //50000000:9828,9595,9377,9273,9020,9450,9873,9994,8882,9695  9499
    public static void main(String[] args) {
        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<LogEvent>(65536);
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (i < eventNum) {
                    LogEvent logEvent = new LogEvent();
                    logEvent.setLogId(i);
                    logEvent.setContent("c" + i);
                    logEvent.setDate(new Date());
                    try {
                        queue.put(logEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                int k = 0;
                while (k < eventNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    k++;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("BlockingQueue 花费时间:" + (endTime - startTime) + "ms");
            }
        }).start();

    }
}
 
package com.demo.disruptor.test;

import com.demo.disruptor.consumer.LogEventConsumer;
import com.demo.disruptor.consumer.LogEventConsumer2;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.factory.LogEventFactory;
import com.demo.disruptor.producer.LogEventProducer;
import com.demo.disruptor.producer.LogEventProducerWithTranslator;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
public class DisruptorTest {

    //5000000:542,499,550,547,605,502,743,505,657,608     576
    //10000000:1252,1048,1031,1075,1022,1207,1056,1494,1118,1258   1156
    //50000000:5489,5125,5265,5609,5201,5482,4982,4891,5351,5758  5315
    public static void main(String [] args){
        LogEventFactory factory = new LogEventFactory();
        int ringBufferSize = 65536;
        final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory,
                ringBufferSize, DaemonThreadFactory.INSTANCE,
                ProducerType.SINGLE, new BusySpinWaitStrategy());

        LogEventConsumer consumer = new LogEventConsumer();
        disruptor.handleEventsWith(consumer);
        disruptor.start();
        /*new Thread(new Runnable() {
            @Override
            public void run() {
                RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
                //进行事件的发布
                LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
                for(int i = 0; i < BlockingQueueTest.eventNum; i++){
                    logEventProducer.onData(i, "c" + i, new Date());
                }
            }
        }).start();*/

        new Thread(new Runnable() {
            @Override
            public void run() {
                RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
                //进行事件的发布
                LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
                for(int i = 0; i < BlockingQueueTest.eventNum; i++){
                    producerWithTranslator.onData(i, "c" + i, new Date());
                }
            }
        }).start();
        //disruptor.shutdown();
    }
}
 测试结果的对比为(都是按照ms统计):

 

数量 Disruptor ArrayBlockingQueue 性能比较
500W 576 992 1.7
1000W 1156 1969 1.7
5000W 5315 9499 1.8
测试的结果是Disruptor的性能是ArrayBlockingQueue的1.7倍左右(电脑的配置是win10,64位,i5-6500的cpu,8g的内存,jdk是1.8),但是官方提供的数据是在5倍左右:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results ,可能也与自己的电脑有关,还有就是一些参数上面的选择,如等待策略,但是不管怎样测试的性能说明了在生产者-消费者的模式下Disruptor的性能比ArrayBlockingQueue更好。
  • 大小: 15.5 KB
分享到:
评论

相关推荐

    Java工具:高性能并发工具Disruptor简单使用

    Java工具:高性能并发工具Disruptor简单使用

    Disruptor3.x Disruptor使用方式

    Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...

    disruptor案例加简单说明

    简单讲解disruptor并附上demo

    Netty 使用Disruptor机制的处理源代码

    使用Netty, Disruptor处理实时外汇报价

    Disruptor demo

    Disruptor简单使用。完成多线程间并行、等待、先后执行等功能。

    Disruptor专题简单案例资料

    Disruptor专题简单案例资料 https://phoenix.blog.csdn.net/article/details/131264151

    Disruptor 入门 - v1.0

    可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者...

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    disruptor-3.3.7-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    Disruptor示例

    Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程...

    Disruptor资料合集

    Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。其实Disruptor与其说是一个框架...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    springboot整合Disruptor并发编程框架,适合高并发场景需求的开发者

    Disruptor C++版(仅支持单生产者)

    Disruptor C++版,本人已在windows下成功使用,参照例子使用即可。

    disruptor-3.2.0.jar

    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载

    disruptor框架案例.rar

    Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...

    LMAX-Disruptor框架jar包

    Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。

Global site tag (gtag.js) - Google Analytics