作为比较接近正式使用的方式,我用一个生产者多个消费者来进行了Disurptor的不重复消费的性能测试,在这里我主要是介绍下我在测试过程中使用的代码以及出现的情况做下说明,这些情况有可能是我自己的代码原因引起的,在此也给自己留一个记录,如果看到的同学提出异议的麻烦给我说一下,关于Disruptor的其他介绍可以参考http://357029540.iteye.com/blog/2395677,这是我参考别人的文章写的。
使用maven的方式引入需要的jar
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.20</version>
</dependency>
</dependencies>
在这里我使用的是WorkHandler接口来实现的消费者类,消费者的代码如下:
package com.demo.disruptor.consumer;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.test.BlockingQueueTest;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* @author liaoyubo
* @version 1.0 2017/10/9
* @description 自定义消费者
*/
public class LogEventWorkHandlerConsumer implements WorkHandler<LogEvent> {
private long startTime;
public LogEventWorkHandlerConsumer() {
this.startTime = System.currentTimeMillis();
}
@Override
public void onEvent(LogEvent logEvent) throws Exception {
//全部转化为大写,用于耗时测试
logEvent.setContent(logEvent.getContent().toUpperCase());
//判断是否已经有了开始时间
if(logEvent.getStartTime() == null || logEvent.getStartTime() == 0){
logEvent.setStartTime(startTime);
}else {
startTime = logEvent.getStartTime();
}
//判断是否已经到最后
if (logEvent.getLogId() +1 == BlockingQueueTest.eventNum) {
long endTime = System.currentTimeMillis();
System.out.println(" costTime1 = " + (endTime - startTime) + "ms");
}
//System.out.println("消费者1-seq logEvent:" + logEvent.toString());
}
}
LogEventWorkHandlerConsumer2和LogEventWorkHandlerConsumer3同LogEventWorkHandlerConsumer也一样。
生产者类代码如下:
package com.demo.disruptor.producer;
import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
import java.util.Date;
/**
* @author liaoyubo
* @version 1.0 2017/10/9
* @description 使用translator方式到事件生产者发布事件,通常使用该方法
*/
public class LogEventProducerWithTranslator {
private EventTranslatorVararg eventTranslatorVararg = new EventTranslatorVararg<LogEvent>() {
public void translateTo(LogEvent logEvent, long l, Object... objects) {
logEvent.setLogId((Long) objects[0]);
logEvent.setContent((String)objects[1]);
logEvent.setDate((Date)objects[2]);
}
};
private RingBuffer<LogEvent> ringBuffer;
public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long logId, String content, Date date){
//System.out.println("生产者:" + logId);
ringBuffer.publishEvent(eventTranslatorVararg,logId,content,date);
}
}
创建的实体对象:
package com.demo.disruptor.dto;
import com.alibaba.fastjson.JSONObject;
import java.util.Date;
/**
* @author liaoyubo
* @version 1.0 2017/10/9
* @description 自定义事件对象
*/
public class LogEvent {
private long logId;
private String content;
private Date date;
private Long startTime;
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
@Override
public String toString() {
return JSONObject.toJSONString(this);
}
}
一个用于定义测试次数的类:
package com.demo.disruptor.test;
/**
* @author liaoyubo
* @version 1.0 2017/10/11
* @description
*/
public class BlockingQueueTest {
public static int eventNum = 5000000;
}
用于定义异常的代码:
package com.demo.disruptor.exception;
import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.ExceptionHandler;
/**
* @author liaoyubo
* @version 1.0 2017/10/12
* @description
*/
public class LogEventExceptionHandler implements ExceptionHandler<LogEvent> {
@Override
public void handleEventException(Throwable throwable, long l, LogEvent logEvent) {
System.out.println("handleEventException....");
}
@Override
public void handleOnStartException(Throwable throwable) {
System.out.println("handleOnStartException....");
}
@Override
public void handleOnShutdownException(Throwable throwable) {
System.out.println("handleOnShutdownException....");
}
}
下面是用于测试耗时的主程序:
package com.demo.disruptor.test;
import com.demo.disruptor.consumer.*;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.exception.LogEventExceptionHandler;
import com.demo.disruptor.factory.LogEventFactory;
import com.demo.disruptor.producer.LogEventProducer;
import com.demo.disruptor.producer.LogEventProducerWithTranslator;
import com.lmax.disruptor.*;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.Date;
import java.util.concurrent.*;
/**
* @author liaoyubo
* @version 1.0 2017/10/11
* @description
*/
public class DisruptorTest {
//5000000:554,627,545,602,550,578,675,626,587,692 604
//10000000:1657,1471,1234,1231,1302,1083,1186,1064,1044,1073 1235
//50000000:5017,5255,5048,5009,5410,4609,5979,5184,5060,4771 5134
public static void main(String [] args) throws TimeoutException, InterruptedException, InsufficientCapacityException {
LogEventFactory factory = new LogEventFactory();
//ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
int ringBufferSize = 65536;
final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory,ringBufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy());
EventHandlerGroup<LogEvent> eventHandlerGroup = disruptor.handleEventsWithWorkerPool(new LogEventWorkHandlerConsumer(),new LogEventWorkHandlerConsumer2(),new LogEventWorkHandlerConsumer3());
final RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
disruptor.setDefaultExceptionHandler(new LogEventExceptionHandler());
disruptor.start();
/*new Thread(new Runnable() {
@Override
public void run() {
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
for(int i = 0; i < BlockingQueueTest.eventNum; i++){
logEventProducer.onData(i, "c" + i, new Date());
}
}
}).start();*/
//启用单独线程进行发布
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//进行事件的发布
LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
for(int i = 0; i < BlockingQueueTest.eventNum; i++){
producerWithTranslator.onData(i, "c" + i, new Date());
}
}
});
thread.start();
while (true){
long sequence = ringBuffer.getMinimumGatingSequence();
//System.out.println(sequence);
if(sequence + 1 == BlockingQueueTest.eventNum){
break;
}
}
//检查现在运行的线程
//Thread.currentThread().getThreadGroup().list();
//从代码上理解应该只是把消费者线程关闭了
disruptor.halt();
//Thread.currentThread().getThreadGroup().list();
//在关闭disruptor后如果发起新线程,那么新线程不会关闭,因为需要消费者消防数据导致处于阻塞状态,线程会一直挂起
/*Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
//进行事件的发布
LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
for(int i = 0; i < BlockingQueueTest.eventNum; i++){
producerWithTranslator.onData(i, "d" + i, new Date());
}
}
});
thread1.setName("thread1");
thread1.start();
while (true){
long sequence = ringBuffer.getMinimumGatingSequence();
System.out.println(sequence);
if(sequence + 1 == BlockingQueueTest.eventNum){
break;
}
}
Thread.currentThread().getThreadGroup().list();*/
System.out.println(eventHandlerGroup.asSequenceBarrier().getCursor());
}
}
这个地方因为添加了关闭程序,所以性能上面比没有添加关闭操作及判断要慢一些,上面的测试数据是没有关闭操作的测试数据。
下面是BlockingQueue的代码:
/**
* @author liaoyubo
* @version 1.0 2017/10/11
* @description
*/
public class BlockingQueueMultiTest {
//5000000:1195,1189,1147,1181,1135,1174,1216,1133,1145,1093 1161
//10000000:2633,1972,2417,2330,2255,2429,2354,2178,2235,2402 2321
//50000000:11841,11902,13048,12262,9769,10531,12721,12229,12283,13595 12018
public static void main(String[] args) throws InterruptedException {
final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<LogEvent>(65536);
//System.out.println("开始时间:" + startTime);
new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (i < BlockingQueueTest.eventNum) {
LogEvent logEvent = new LogEvent();
logEvent.setLogId(i);
logEvent.setContent("c" + i);
logEvent.setDate(new Date());
try {
queue.put(logEvent);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}).start();
// 创建缓冲池
final ExecutorService executorService = Executors.newCachedThreadPool();
final long startTime = System.currentTimeMillis();
executorService.execute(new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
LogEvent logEvent = queue.take();
logEvent.setContent(logEvent.getContent().toUpperCase());
//System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("BlockingQueue1 花费时间:" + (endTime-startTime) + "ms");
}
}));
executorService.execute(new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
LogEvent logEvent = queue.take();
logEvent.setContent(logEvent.getContent().toUpperCase());
//System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("BlockingQueue2 花费时间:" + (endTime-startTime) + "ms");
}
}));
executorService.execute(new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
LogEvent logEvent = queue.take();
logEvent.setContent(logEvent.getContent().toUpperCase());
//System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("BlockingQueue3 花费时间:" + (endTime-startTime) + "ms");
}
}));
}
}
上面的Disruptor和BlockingQueue分别使用了3个线程来消费生产的数据,分别用500W,1000W,5000W的生产数据来取运行10次的平均值来查看结果:
数量 |
Disruptor |
BlockingQueue |
性能对比 |
500W |
604 |
1161 |
1.9 |
1000W |
1235 |
2321 |
1.9 |
5000W |
5134 |
12018 |
2.3 |
通过以上的测试对比说明在单生产者-多消费者的模式下Disruptor的性能还是比ArrayBlockingQueue好,这也可能和我使用的代码有关(电脑的配置是win10,64位,i5-6500的cpu,8g的内存,jdk是1.8).
相关推荐
主要介绍了java并发学习之BlockingQueue实现生产者消费者详解,具有一定参考价值,需要的朋友可以了解下。
java 多线程 生产者消费者模式,多个生产者对多个消费者,使用jdk 线程池及 BlockingQueue实现,解决了待生产的任务生产完成后,正常终止所有线程,避免线程(特别是消费者线程)因阻塞而无限等待的情况。源码中还简单...
BlockingQueue java 的工具类,初次要用于消费者,生产者的同步问题。
主要介绍了Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析,涉及queue,BlockingQueue等有关内容,具有一定参考价值,需要的朋友可以参考。
主要介绍了Java多线程 BlockingQueue实现生产者消费者模型详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
这个demo主要讲解了BlockingQueue的使用希望可以帮户需要的同学.
源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue...
线程----BlockingQueue 的介绍说明
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文简要介绍下BlockingQueue...
类似java BlockingQueue,C++写的,支持Windows与Linux。
简单实现BlockingQueue,BlockingQueue源码详解
我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。 ...
我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。 ...
定义全局线程池,将用户的请求放入自定义队列中,排队等候线程调用,等待超时则自动取消该任务,实现超时可取消的异步任务
BlockingQueue支持两个附加操作的Queue:1)当Queue为空时,获取元素线程被阻塞直到Queue变为非空;...BlockingQueue不允许元素为null,如果入队一个null元素,会抛NullPointerException。常用于生产者消费者模式。
BlockingQueue是一种特殊的Queue,若BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态直到BlocingkQueue进了新货才会被唤醒,下面是用BlockingQueue来实现Producer和Consumer的例子
本篇文章小编为大家介绍,基于java中BlockingQueue的使用介绍。需要的朋友参考下
14-阻塞队列BlockingQueue实战及其原理分析二.pdf
Java 多线程与并发(16_26)-JUC集合_ BlockingQueue详解