精华内容
下载资源
问答
  • Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
  • disruptor3 我决定对整个版本的Disruptor都放置一个beta标签感到无聊,所以我决定将Disruptor 3.0.0发行到全世界。 此版本的最大挑战是清理代码并提出更好的算法来处理多个生产者。 如果我很幸运,可以更快。 在...

    disruptor3

    我决定对整个版本的Disruptor都放置一个beta标签感到无聊,所以我决定将Disruptor 3.0.0发行到全世界。 此版本的最大挑战是清理代码并提出更好的算法来处理多个生产者。 如果我很幸运,可以更快。 在发布此版本时,我最初走了几个阴暗的小巷,但与2.x版本的区别不大,但是又带来了一些好处。 我本来想实施一些功能测试并改善文档,但是我可能会一直等到那些方面,直到我100%满意为止。

    改进了对多个生产者的支持

    Disruptor的2.x版本真正令人讨厌的问题之一是,它附带了ClaimStrategy的2种实现。 一种是用于线程多于CPU的情况,另一种用于线程少的情况。 这有点难看。 现在,第3版只有一个MultiProducerSequencer(稍后会在Sequencers / ClaimStrategies上提供更多信息),在两种情况下都可以正常工作。 对于无竞争的情况,我们以前的实现大约是18M ops / sec v 15M ops / sec *,但速度较慢,但​​是随着线程数量的增加和CPU数量的增加,扩展性会更好。 去年,我在TechMesh和JAX London上做了一些关于新实现的讨论。 如果您对算法的细节感兴趣,可以观看有关JAX London演讲YouTube视频

    没有更多的索赔策略

    内部设计和API的主要变化之一是,我摆脱了ClaimStrategy接口。 通过对多生产者用例进行的一些更改,它变得很麻烦。 因此,我删除了它,而是提供了覆盖2个重要用例的Sequencer,SingleProducerSequencer和MultiProducerSequencer的2种实现。 很抱歉,我们不再允许用户在其中插入自己的实现,但是结果使代码变得更加简洁,简单和快捷。

    单生产者案例更快

    从一个生产者到一个消费者基准测试*,第3版的运行速度约为2.2亿每秒,而第2版的运行速度约为8000万每秒。这主要是总体清理(例如删除ClaimStrategies)和一些小优化的结果。

    改进的用于RingBuffer的EventTranslator API

    RingBuffer现在支持的EventTranslator API现在具有用于传递给EventTranslator实现的1、2、3和Varargs参数的变体。 这意味着不必通过匿名内部类或通过EventTranslator实现中的字段推送值来完成对EventTranslator的输入。 它还使将EventTranslators编写为易于测试的独立单元变得更加容易。 代码隔离FTW!

    定序器的批量生产者接口

    我不是我们现有的批量生产者接口的忠实拥护者,最初我打算将其转储。 经过一番思考,我将其部分重新添加了。即,它仅在Sequencer上可用。 如果我可以提出一个干净安全的API来公开该功能,则可以稍后将其添加到RingBuffer中。 出于兴趣,我添加了使用批处理接口的原始定序器吞吐量测试。 该测试没有实际作用,只是在线程之间发出信号。 但是,我证明了Disruptor不能做任何有用的事情,它以10批为单位的速度超过4000000000000 ops / sec。 无论如何,这都不是一个有用的基准,但是对于吹牛的权利来说却是一个很好的选择!

    特别提及

    有一些人帮助将Disruptor从版本2状态升级到版本3:
    • Jason Koch:亲自做了大量JavaDoc。 自合并他的请求请求以来,我一直在努力使其保持最新状态。
    • Dalibor Novak和Danny Yates:Github和Gradle迁移。
    • Adrian Sutton:有关DSL的进一步工作。
    • 马丁·汤普森(Martin Thompson):对于他的许多有用想法,尤其是围绕支持多个生产者的算法。
    *所有测试都是在3.40 GHz – YMMV的Intel Core i7-3770(Ivy Bridge)上完成的。

    参考:来自Bad Concurrency博客的JCG合作伙伴 Michael Barker 发布了Disruptor 3.0.0

    翻译自: https://www.javacodegeeks.com/2013/04/release-of-disruptor-3-0-0.html

    disruptor3

    展开全文
  • Spring整合Disruptor3

    2019-08-04 18:07:14
    1.Disruptor介绍 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。 可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地...

    1.Disruptor介绍

    从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。
    可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。
    我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。
     Disruptor 在实现上述功能的同时,提供了很多很好的特性:

     1.同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);

     2.预分配用于存储事件内容的内存空间;

     3.针对极高的性能目标而实现的极度优化和无锁的设计;

    简而言之,当你需要在两个独立的处理过程之间交换数据时,就可以使用 Disruptor 。当然使用队列也可以,只不过 Disruptor 的性能更好。

    2.实战


    本文先不具体去阐述Disruptor的工作具体原理,只是简单地将Spring与其整合。具体步骤如下:

    1.在pom文件中引入disruptor

    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.4.2</version>
    </dependency>

    2.创建事件

    @Data
    public class FeedComeEvent {
        private int consumerId;
    }

    3.创建消息工厂

    public class FeedComeEventFactory implements EventFactory {
        @Override
        public Object newInstance() {
            return new FeedComeEvent();
        }
    }

    4.创建消费者

    @Component
    public class FeedComeEventHandler implements EventHandler<FeedComeEvent>,WorkHandler<FeedComeEvent> {
    
        @autowired
        FeedMongoDao feedMongoDao;
    
        @Override
        public void onEvent(FeedComeEvent feedComeEvent, long l, boolean b) throws Exception {
             this.onEvent(feedComeEvent);
        }
    
        @Override
        public void onEvent(FeedComeEvent feedComeEvent) throws Exception {
            feedMongoDao.log(feedComeEvent);
        }
    }

    5.自定义异常

    @Log4j2
    public class FeedComeEventHandlerException implements ExceptionHandler {
        @Override
        public void handleEventException(Throwable throwable, long sequence, Object event) {
            throwable.fillInStackTrace();
            log.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), throwable.getMessage());
        }
    
        @Override
        public void handleOnStartException(Throwable throwable) {
            log.error("start disruptor error ==[{}]!", throwable.getMessage());
        }
    
        @Override
        public void handleOnShutdownException(Throwable throwable) {
            log.error("shutdown disruptor error ==[{}]!", throwable.getMessage());
        }
    }

    6.定义Disruptor服务

    @Service
    public class FeedComeServiceImpl implements IFeedComeService, DisposableBean,InitializingBean {
        private Disruptor<FeedComeEvent> disruptor;
        private static final int RING_BUFFER_SIZE = 8;
    
        @Autowired
        private FeedComeEventHandler feedComeEventHandler;
    
        @Override
        public void destroy() throws Exception {
            disruptor.shutdown();
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            disruptor = new Disruptor<FeedComeEvent>(new FeedComeEventFactory(),RING_BUFFER_SIZE, Executors.defaultThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
            disruptor.setDefaultExceptionHandler(new FeedComeEventHandlerException());
            disruptor.handleEventsWith(feedComeEventHandler);
            disruptor.start();
        }
    
    
        @Override
        public void feedCome(int consumerId) {
            RingBuffer<FeedComeEvent> ringBuffer = disruptor.getRingBuffer();
            ringBuffer.publishEvent(new EventTranslatorOneArg<FeedComeEvent,  Integer>() {
                @Override
                public void translateTo(FeedComeEvent event, long sequence, Integer consumerId) {
                    event.setConsumerId(consumerId);
                }
            }, consumerId);  
         
        }
    }

    7.使用Disruptor服务

      @GetMapping("feedcome")
      @ResponseBody
      public String feedCome(int consumerId) {
        feedComeService.feedCome(consumerId);
      }

    3.注意事项

    在享受Disruptor框架提供的强大特性的同时,也要注意Disruptor框架带来的副作用。

    根据笔者的线上应用经验,有以下几个事项需要注意:

    1.仔细定义Disruptor服务相关参数: 在定义Disruptor服务的同时,需要基于部署机器的配置情况仔细确定队列长度和应用哪种消费者等待策略

    2.充分的测试:Disruptor服务是一个高内存和高CPU使用的服务。因此,在系统使用Disruptor服务的时候,需要密切注意CPU和内存的使用情况,防止CPU被打爆和内存占用过高

    展开全文
  • disruptor3.x 简单例子

    2013-05-03 14:53:49
    这个是最新的 disruptor3的例子....来自官方代码稍微简化后的     package io.grass.core.collect; import static com.lmax.disruptor.RingBuffer.createSingleProducer; import java.util.concurrent....

    这个是最新的 disruptor3的例子....来自官方代码稍微简化后的

     

     

    package io.grass.core.collect;
    
    import static com.lmax.disruptor.RingBuffer.createSingleProducer;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.lmax.disruptor.BatchEventProcessor;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.util.PaddedLong;
    
    /**
     * 简单测试
     * 
     * @author zuoge85
     * 
     */
    public class DisruptorBaseTest {
    	protected static final Logger log = LoggerFactory.getLogger(DisruptorBaseTest.class);
    	
    	private static final int THREAD_NUMS = 1;
    	private static final int BUFFER_SIZE = 1024 * 8;
    	private static final long NUMS = 1000_000_00L;
    
    	public static void main(String[] args) throws InterruptedException {
    		RingBuffer<MessageEvent> ringBuffer = createSingleProducer(
    				MessageEvent.EVENT_FACTORY, BUFFER_SIZE,
    				new YieldingWaitStrategy());
    		ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMS);
    		SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    
    		MessageMutationEventHandler[] handlers = new MessageMutationEventHandler[THREAD_NUMS];
    		BatchEventProcessor<?>[] batchEventProcessors = new BatchEventProcessor[THREAD_NUMS];
    
    		for (int i = 0; i < THREAD_NUMS; i++) {
    			handlers[i] = new MessageMutationEventHandler();
    			batchEventProcessors[i] = new BatchEventProcessor<MessageEvent>(
    					ringBuffer, sequenceBarrier, handlers[i]);
    			ringBuffer
    					.addGatingSequences(batchEventProcessors[i].getSequence());
    		}
    
    		CountDownLatch latch = new CountDownLatch(THREAD_NUMS);
    		for (int i = 0; i < THREAD_NUMS; i++) {
    			long n =  batchEventProcessors[i].getSequence().get() + NUMS;
    			System.out.println(n +"    "  +NUMS+"  "+batchEventProcessors[i].getSequence().get() );
    			handlers[i].reset(latch, n);
    			executors.submit(batchEventProcessors[i]);
    		}
    		long start = System.currentTimeMillis();
    
    		for (long i = 0; i < NUMS; i++) {
    			long sequence = ringBuffer.next();
    			ringBuffer.get(sequence).setValue(i);
    			ringBuffer.publish(sequence);
    		}
    
    		latch.await();
    		long opsPerSecond = (NUMS * 1000L)
    				/ (System.currentTimeMillis() - start);
    
    		for (int i = 0; i < THREAD_NUMS; i++) {
    			batchEventProcessors[i].halt();
    			if ((NUMS - 1) == handlers[i].getValue()) {
    
    			} else {
    				log.error("error");
    			}
    		}
    		executors.shutdown();
    		log.info(String.format("Run %d, Disruptor=%,d ops/sec%n", 1, opsPerSecond));
    	}
    
    	public static final class MessageMutationEventHandler implements
    			EventHandler<MessageEvent> {
    		private final PaddedLong value = new PaddedLong();
    		private long count;
    		private CountDownLatch latch;
    
    		public MessageMutationEventHandler() {
    			
    		}
    
    		public long getValue() {
    			return value.get();
    		}
    
    		public void reset(final CountDownLatch latch, final long expectedCount) {
    			value.set(0L);
    			this.latch = latch;
    			count = expectedCount;
    		}
    
    		@Override
    		public void onEvent(final MessageEvent event, final long sequence,
    				final boolean endOfBatch) throws Exception {
    			//log.info("onEvent:{}",event.getValue());
    			value.set(event.getValue());
    			if (count == sequence) {
    				latch.countDown();
    			}
    		}
    	}
    
    	public static final class MessageEvent {
    		private long value;
    
    		public long getValue() {
    			return value;
    		}
    
    		public void setValue(final long value) {
    			this.value = value;
    		}
    
    		public final static EventFactory<MessageEvent> EVENT_FACTORY = new EventFactory<MessageEvent>() {
    			public MessageEvent newInstance() {
    				return new MessageEvent();
    			}
    		};
    	}
    }
    

     

     

    展开全文
  • Disruptor3.x 的简单封装

    2018-01-12 14:18:28
    disruptor = new Disruptor(EVENT_FACTORY , bufferSize , threadFactory, ProducerType.SINGLE,YIELDING_WAIT); } @Override public void start() { disruptor.handleEventsWith(handler); disruptor.start...



    package com.www.pay.business;
    
    /**
     * ------------------------------
     * 事件对象
     * ------------------------------
     * @author wdm  @date 2018年1月12日
     * @version 1.0
     */
    public class Event{ 
    	
    	private EventData  data;
    
    	public Event(){}
        
        public Event(EventData data){
        	this.data=data;
        }
    
    	public EventData getData() {
    		return data;
    	}
    
    	public void setData(EventData data) {
    		this.data = data;
    	}
    } 

    package com.www.pay.business;
    
    import com.cdoframework.cdolib.data.cdo.CDO;
    
    /**
     * ------------------------------
     * 事件数据
     * ------------------------------
     * @author wdm  @date 2018年1月12日
     * @version 1.0
     */
    public class EventData {
    	
    	/**
    	 * 这个数据类型根据业务自定义
    	 * 可以定义为任意数据类型
    	 */
    	public CDO data;
    	public EventData(CDO data){
    		this.data=data;
    	}
    	
    }
    

    package com.www.pay.business;
    
    import org.apache.log4j.Logger;
    
    import com.lmax.disruptor.EventHandler;
    
    /**
     * ------------------------------
     * 交易事件处理
     * ------------------------------
     * @author wdm  @date 2018年1月12日
     * @version 1.0
     */
    public class DisruptorEventHandler implements EventHandler<Event> {
    	private static Logger logger=Logger.getLogger(DisruptorEventHandler.class);
    	
    	private String name;
    	
    	public DisruptorEventHandler(String name){
    		this.name=name;
    	}
    	
        @Override 
        public void onEvent(Event event, long l, boolean b){ 
        	try {
        		System.out.println("收到消息:"+name);
        		System.out.println(name+"----------"+event.getData().data); 
        		Thread.sleep(1);
    		} catch (Exception e) {
    			logger.error("处理交易数据异常:"+e.getMessage(),e);
    		}
        } 
    } 

    package com.www.pay.business;
    
    public interface IEventPublisher {
    	
    	public void start();
    	
    	public void publish(EventData data);
    	
    }
    

    package com.www.pay.business;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.WaitStrategy;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    /**
     * ------------------------------
     * Disruptor发布对象
     * ------------------------------
     * @author wdm  @date 2018年1月12日
     * @version 1.0
     */
    public class DisruptorPublisher implements IEventPublisher {
    	
    	//等待策略
        private static final WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        //事件工厂
        private static final EventFactory<Event> EVENT_FACTORY = new  EventFactory<Event>() { 
            @Override 
            public Event newInstance() { 
                return new Event(); 
            } 
        } ;
        private static final EventTranslatorOneArg<Event, EventData> TRANSLATOR = new EventTranslatorOneArg<Event, EventData>() { 
        	public void translateTo(Event event, long sequence, EventData data) { 
        		event.setData(data);
        	} 
        };
        
    	
    
        private Disruptor<Event> disruptor;
        private EventHandler<? super Event>[] handler;
        private RingBuffer<Event> ringbuffer;    
        private ThreadFactory threadFactory;
    
        public DisruptorPublisher(int bufferSize, EventHandler<? super Event>[] handler) {
            this.handler = handler;
            threadFactory = Executors.defaultThreadFactory();
            disruptor = new Disruptor<Event>(EVENT_FACTORY , bufferSize ,  threadFactory, ProducerType.SINGLE,YIELDING_WAIT);
        }
    
        @Override
        public void start() {
            disruptor.handleEventsWith(handler);
            disruptor.start();
            ringbuffer = disruptor.getRingBuffer();
        }
    
        @Override
        public void publish(EventData data) {
    //        long sequence = ringbuffer.next();
    //        try {
    //            Event evt = ringbuffer.get(sequence);
    //            evt.setTrade(data);
    //        } finally {
    //            ringbuffer.publish(sequence);
    //        }
        		ringbuffer.publishEvent(TRANSLATOR, data); 
        }
    }

    /**
    	maven坐标
    	<dependency>
    	    <groupId>com.lmax</groupId>
    	    <artifactId>disruptor</artifactId>
    	    <version>3.3.7</version>
    	</dependency>
    */
    public class TestDisruptor {
    
        public static void main(String[] args) {
        	EventHandler<Event>[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler("处理器1"),new DisruptorEventHandler("处理器2")};
        	DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers);
        	dp.start();
        	
        	for (int i = 0; true; i++) {
        		CDO cdo=new CDO();
        		cdo.setStringValue("strSN", i+"-"+DataUtil.uuid());
        		EventData data=new EventData(cdo);
        		dp.publish(data);
    		}
        	
    	}
    	
    }

    说明:

    1.在发布事件时是两阶段的:

    第一步获取一个事件位置序列号,第二步发布事件(这一步必须放finally中防止业务异常导致RingBuffer中位置错乱)

    使用最新版的推荐使用:ringbuffer.publishEvent(TRANSLATOR, data); 这种方式




    展开全文
  • Disruptor 3种EventProcessor实现类

    千次阅读 2014-07-10 05:15:26
    BatchEventProcessor 一个消费者处理对象 表示一个消费者.  //获取下一个消费下标位置  long nextSequence = sequence.get() + 1L;  //一直检查获取可以消费的产品位置. while (true) ... {
  • Disruptor

    2019-04-08 18:48:42
    1:是一个高性能的内存并发框架, 2:由于为了提高不同系统传递数据的低...3Disruptor是经过研究和测试的结果,在cpu级别的缓存丢失和需要kernel去仲裁的锁操作都是非常昂贵的。所以,Disruptor是无锁的。 4: ...
  • Disruptor系列3Disruptor样例实战

    千次阅读 2018-05-21 22:10:57
    章节回顾: - Disruptor系列1:初识Disruptor - Disruptor系列2:Disruptor...如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor系...
  • disruptor

    2013-12-13 00:36:50
    一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据。而每台服务器(8核12G)上CPU占用不到100%,load不超过5。...3,尽量避免大量GC 缓冲 vs 性能瓶颈 提高硬盘写入I
  • Disruptor3

    2018-11-29 18:28:13
    7. 发布消息Disruptor#publishEvent=》RingBuffer#publishEvent public void publishEvent(final EventTranslator<T> eventTranslator) { ringBuffer.publishEvent(eventTranslator); } public void ...
  • Disruptor简介

    2018-04-08 16:44:50
    一.介绍Disruptor是java的并发框架,实现了无锁的队列,应用场景是“生产者-消费者”或“发布-订阅”模型的应用场合。可以拿JDK的BlockQueue来和...为事件提前分配内存3.可选择无锁二.重要概念Disruptor里面有一...
  • 下面以一个简单的例子来看看Disruptor的用法:生产者发送一个long型的消息,消费者接收消息并打印出来。 首先,我们定义一个Event: public class LongEvent { private long value; public void set(long value) ...

空空如也

空空如也

1 2 3 4 5 ... 15
收藏数 298
精华内容 119
关键字:

disruptor3