为您推荐:
精华内容
最热下载
问答
  • Disruptor很好的解决了这些问题。 一、Disruptor究竟是什么 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disrup

    前言

    作为一位开发者,我们经常会接触到“线程”一词,线程意味着并发,但是并发编程是比较困难的。在并发编程中,我们比较关心的就是线程安全问题,解决线程安全问题常用的方法是加锁,可以是乐观锁或者悲观锁,但是我们知道锁技术是很慢的,而且加锁的过程中还很容易出现死锁的现象。Disruptor很好的解决了这些问题。

    一、Disruptor究竟是什么

    Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。

    二、Disruptor为什么这么快

    1.使用无锁算法来实现并发:

    首先,Disruptor根本就不用锁。在需要确保操作是线程安全的地方,Disruptor使用CAS(Compare And Swap/Set)操作。这是一个CPU级别的指令,它的工作方式有点像乐观锁——CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很明显,有其它操作先改变了这个值。CAS操作比锁消耗资源少的多,因为它们不牵涉操作系统,它们直接在CPU上操作。

    2.让消息可以通过被多个消费者并行处理

    一般我们在使用队列的时候,队列中的消息只会被一个消费者使用,但是在 Disruptor 中,同一个消息可以被多个消费者同时处理,多个消费者之间是并行的。

    3.减少垃圾回收:

    Disruptor为了在低延迟的系统中减少进行内存分配,减少垃圾回收所带来的停顿时间,Disruptor使用 RingBuffer 来达成这个目标,在 RingBuffer 中提前创建好对象,后续通过反复利用这些对象来避免垃圾回收,这个实现是线程安全的。

    4.缓存行填充

    这块比较复杂,大家自行百度,这里就不再赘述。。。。

    三、Disruptor实战

    maven依赖

    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     *基于Disruptor的Executor,比Java自带的Executor速度快2-3倍.
     * @author syp
     */
    public class DisruptorExecutor implements Executor {
    
        /**
         * 线程名称
         */
        private static final String DEFAULT_EXECUTOR_NAME = "disruptor-executor";
        /**
         * Disruptor的ringBuffer缓存大小,必须是2的幂
         */
        private static final int BUFFER_SIZE = 65536;
        /**
         * 实际执行task的executor
         */
        private final ExecutorService executor;
    
        private final Disruptor<TaskEvent> disruptor;
    
        public DisruptorExecutor() {
            this(DEFAULT_EXECUTOR_NAME);
        }
    
        /**
         * 构造函数.
         *
         * @param name 名称.
         */
        public DisruptorExecutor(String name) {
            this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
            this.disruptor = new Disruptor<>(TaskEvent::new, BUFFER_SIZE, executor, ProducerType.MULTI,
                    new BlockingWaitStrategy());
        }
    
        /**
         * 启动DisruptorExecutor.
         */
        public void startUp() {
            disruptor.handleExceptionsWith(new LogExceptionHandler());
            disruptor.handleEventsWith((event, sequence, endOfBatch) -> event.getTask().run());
            disruptor.start();
        }
    
        /**
         * 停止任务调度(阻塞直到所有提交任务完成).
         *
         * @return 结果.
         */
        public boolean awaitAndShutdown() {
    
            return awaitAndShutdown(Integer.MAX_VALUE, TimeUnit.SECONDS);
        }
    
        /**
         * 停止任务调度(阻塞直到所有提交任务完成).
         *
         * @param timeout  . 超时时间.
         * @param timeUnit . 时间单位.
         * @return 结果.
         */
        public boolean awaitAndShutdown(long timeout, TimeUnit timeUnit) {
            shutdown();
    
            try {
    
                return executor.awaitTermination(timeout, timeUnit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
    
            return false;
        }
    
        /**
         * 停止任务调度.
         */
        void shutdown() {
            disruptor.shutdown();
            executor.shutdown();
        }
    
        /**
         * 强制停止任务调度(正在执行的任务将被停止,未执行的任务将被丢弃).
         */
        void halt() {
            executor.shutdownNow();
            disruptor.halt();
        }
    
        /**
         * 执行任务.
         *
         * @param task . 任务.
         */
        @Override
        public void execute(Runnable task) {
            disruptor.getRingBuffer().publishEvent((event, sequence, buffer) -> event.setTask(task), task);
        }
    }
    

    在这里插入图片描述

    更多内容请关注微信公众号
    在这里插入图片描述

    展开全文
    jjs15259655776 2020-09-06 23:16:53
  • } } 2、集成Disruptor 1)、common @Data public class TranslatorDataWrapper { private TranslatorData translatorData; private ChannelHandlerContext channelHandlerContext; } @Data public abstract class ...

    1、Netty实现服务端与客户端数据传输

    1)、依赖

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.12.Final</version>
            </dependency>
            <!-- 序列化框架marshalling -->
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling</artifactId>
                <version>1.3.0.CR9</version>
            </dependency>
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-serial</artifactId>
                <version>1.3.0.CR9</version>
            </dependency>
            <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.3.2</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.20</version>
            </dependency>
    

    2)、common

    @Data
    public class TranslatorData implements Serializable {
        private String id;
    
        private String name;
    
        private String message;
    }
    
    /**
     * Marshalling工厂
     */
    public final class MarshallingCodeCFactory {
    
        /**
         * 创建Jboss Marshalling解码器MarshallingDecoder
         *
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            // 创建了MarshallingConfiguration对象,配置了版本号为5
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            // 根据marshallerFactory和configuration创建provider
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
            return new MarshallingDecoder(provider, 1024 * 1024 * 1);
        }
    
        /**
         * 创建Jboss Marshalling编码器MarshallingEncoder
         *
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            // 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
            return new MarshallingEncoder(provider);
        }
    }
    

    3)、服务端

    public class NettyServer {
        public NettyServer() {
            // 创建两个工作线程组:一个用于接收网络请求的,另一个用于实际处理业务的
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            try {
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        // 缓存区动态调配(自适应)
                        .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                        // 缓冲区池化操作
                        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                                sc.pipeline().addLast(new ServerHandler());
                            }
                        });
                // 绑定端口,同步等等请求连接
                ChannelFuture cf = serverBootstrap.bind(8765).sync();
                System.out.println("Server Startup...");
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 优雅停机
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                System.out.println("Sever ShutDown...");
            }
        }
    }
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            TranslatorData request = (TranslatorData) msg;
            System.out.println("Server端:id=" + request.getId()
                    + ", name=" + request.getName()
                    + ", message=" + request.getMessage());
            TranslatorData response = new TranslatorData();
            response.setId("resp:" + request.getId());
            response.setName("resp:" + request.getName());
            response.setMessage("resp:" + request.getMessage());
            ctx.writeAndFlush(response);
        }
    }
    
    @SpringBootApplication
    public class NettyServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyServerApplication.class, args);
            new NettyServer();
        }
    
    }
    

    4)、客户端

    public class NettyClient {
        public static final String HOST = "127.0.0.1";
    
        public static final int PORT = 8765;
    
        private EventLoopGroup group = new NioEventLoopGroup();
    
        private Channel channel;
    
        private ChannelFuture cf;
    
        public NettyClient() {
            this.connect(HOST, PORT);
        }
    
        private void connect(String host, int port) {
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        // 缓存区动态调配(自适应)
                        .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                        // 缓冲区池化操作
                        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                                sc.pipeline().addLast(new ClientHandler());
                            }
                        });
                this.cf = bootstrap.connect(host, port).sync();
                System.out.println("Client connected...");
                this.channel = cf.channel();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void sendData() {
            for (int i = 0; i < 10; i++) {
                TranslatorData request = new TranslatorData();
                request.setId("" + i);
                request.setName("请求消息名称" + i);
                request.setMessage("请求消息内容" + i);
                this.channel.writeAndFlush(request);
            }
        }
    
        public void close() throws Exception {
            cf.channel().closeFuture().sync();
            // 优雅停机
            group.shutdownGracefully();
            System.out.println("Sever ShutDown...");
        }
    }
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                TranslatorData response = (TranslatorData) msg;
                System.out.println("Client端:id=" + response.getId()
                        + ", name=" + response.getName()
                        + ", message=" + response.getMessage());
            } finally {
                // 释放缓存
                ReferenceCountUtil.release(msg);
            }
        }
    }
    
    @SpringBootApplication
    public class NettyClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyClientApplication.class, args);
            // 建立连接并发送消息
            new NettyClient().sendData();
        }
    
    }
    

    2、集成Disruptor

    在这里插入图片描述

    1)、common

    @Data
    public class TranslatorDataWrapper {
        private TranslatorData translatorData;
    
        private ChannelHandlerContext channelHandlerContext;
    }
    
    @Data
    public abstract class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {
        protected String consumerId;
    
        public MessageConsumer(String consumerId) {
            this.consumerId = consumerId;
        }
    }
    
    public class MessageProducer {
        private String producerId;
        private RingBuffer<TranslatorDataWrapper> ringBuffer;
    
        public MessageProducer(String producerId,
                               RingBuffer<TranslatorDataWrapper> ringBuffer) {
            this.producerId = producerId;
            this.ringBuffer = ringBuffer;
        }
    
        public void sendData(TranslatorData data, ChannelHandlerContext ctx) {
            long sequence = ringBuffer.next();
            try {
                TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
                wrapper.setTranslatorData(data);
                wrapper.setChannelHandlerContext(ctx);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
    
    public class RingBufferWorkerPoolFactory {
        private static class SingletonHolder {
            static final RingBufferWorkerPoolFactory INSTANCE = new RingBufferWorkerPoolFactory();
        }
    
        private RingBufferWorkerPoolFactory() {
    
        }
    
        public static RingBufferWorkerPoolFactory getInstance() {
            return SingletonHolder.INSTANCE;
        }
    
        private static Map<String, MessageProducer> producers = new ConcurrentHashMap<>();
    
        private static Map<String, MessageConsumer> consumers = new ConcurrentHashMap<>();
    
        private RingBuffer<TranslatorDataWrapper> ringBuffer;
    
        private SequenceBarrier sequenceBarrier;
    
        private WorkerPool<TranslatorDataWrapper> workerPool;
    
        public void initAndStart(ProducerType producerType,
                                 int bufferSize,
                                 WaitStrategy waitStrategy,
                                 MessageConsumer[] messageConsumers) {
            // 1.构建ringBuffer对象
            this.ringBuffer = RingBuffer.create(producerType,
                    new EventFactory<TranslatorDataWrapper>() {
                        @Override
                        public TranslatorDataWrapper newInstance() {
                            return new TranslatorDataWrapper();
                        }
                    },
                    bufferSize,
                    waitStrategy);
            // 2.设置序号栅栏
            this.sequenceBarrier = ringBuffer.newBarrier();
            // 3.设置工作池
            this.workerPool = new WorkerPool<>(
                    this.ringBuffer,
                    this.sequenceBarrier,
                    new EventExceptionHandler(),
                    messageConsumers);
            // 4.把所构建的消费者置入池中
            for (MessageConsumer consumer : messageConsumers) {
                consumers.put(consumer.getConsumerId(), consumer);
            }
            // 5.添加sequences
            this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
            // 6.启动工作池
            this.workerPool.start(
                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2)
            );
        }
    
        public MessageProducer getMessageProducer(String producerId) {
            MessageProducer messageProducer = producers.get(producerId);
            if (null == messageProducer) {
                messageProducer = new MessageProducer(producerId, this.ringBuffer);
                producers.put(producerId, messageProducer);
            }
            return messageProducer;
        }
    
        static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {
    
            @Override
            public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
    
            }
    
            @Override
            public void handleOnStartException(Throwable ex) {
    
            }
    
            @Override
            public void handleOnShutdownException(Throwable ex) {
    
            }
        }
    }
    

    2)、服务端

    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            TranslatorData request = (TranslatorData) msg;
            String producerId = "code:sessionId:001";
            MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
            messageProducer.sendData(request, ctx);
        }
    }
    
    public class MessageConsumerImplForServer extends MessageConsumer {
        public MessageConsumerImplForServer(String consumerId) {
            super(consumerId);
        }
    
        @Override
        public void onEvent(TranslatorDataWrapper event) throws Exception {
            TranslatorData translatorData = event.getTranslatorData();
            System.out.println("Server端:id=" + translatorData.getId()
                    + ", name=" + translatorData.getName()
                    + ", message=" + translatorData.getMessage());
            TranslatorData response = new TranslatorData();
            response.setId("resp:" + translatorData.getId());
            response.setName("resp:" + translatorData.getName());
            response.setMessage("resp:" + translatorData.getMessage());
            event.getChannelHandlerContext().writeAndFlush(response);
        }
    }
    
    @SpringBootApplication
    public class NettyServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyServerApplication.class, args);
            MessageConsumer[] consumers = new MessageConsumer[4];
            for (int i = 0; i < consumers.length; ++i) {
                consumers[i] = new MessageConsumerImplForServer("code:serverId:" + i);
            }
            RingBufferWorkerPoolFactory.getInstance().initAndStart(
                    ProducerType.MULTI,
                    1024 * 1024,
                    new BlockingWaitStrategy(),
                    consumers);
            new NettyServer();
        }
    
    }
    

    3)、客户端

    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            TranslatorData response = (TranslatorData) msg;
            String producerId = "code:sessionId:002";
            MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
            messageProducer.sendData(response, ctx);
        }
    }
    
    public class MessageConsumerImplForClient extends MessageConsumer {
        public MessageConsumerImplForClient(String consumerId) {
            super(consumerId);
        }
    
        @Override
        public void onEvent(TranslatorDataWrapper event) throws Exception {
            TranslatorData translatorData = event.getTranslatorData();
            try {
                System.out.println("Client端:id=" + translatorData.getId()
                        + ", name=" + translatorData.getName()
                        + ", message=" + translatorData.getMessage());
            } finally {
                // 释放缓存
                ReferenceCountUtil.release(translatorData);
            }
        }
    }
    
    @SpringBootApplication
    public class NettyClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyClientApplication.class, args);
            MessageConsumer[] consumers = new MessageConsumer[4];
            for (int i = 0; i < consumers.length; ++i) {
                consumers[i] = new MessageConsumerImplForClient("code:clientId:" + i);
            }
            RingBufferWorkerPoolFactory.getInstance().initAndStart(
                    ProducerType.MULTI,
                    1024 * 1024,
                    new BlockingWaitStrategy(),
                    consumers);
            // 建立连接并发送消息
            new NettyClient().sendData();
        }
    
    }
    

    推荐资料

    高性能Java并发框架disruptor源码解析与实战

    展开全文
    qq_40378034 2021-09-20 17:26:21
  • 本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor...

    章节回顾:
    - Disruptor系列1:初识Disruptor
    - Disruptor系列2:Disruptor原理剖析

    本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor系列2:Disruptor原理剖析。通过本章节,希望让大家对如何使用Disruptor有个初步认识,看看它能够解决哪些情况。

    具体而言,它可以解决如下方面:
    - 并行计算实现;
    - 串行依次执行;
    - 菱形方式执行;
    - 链式并行计算。
    并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。

    本样例是一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。

    事件类

    public class LongEvent {
        private Long number;
    
        public Long getNumber() {
            return number;
        }
    
        public void setNumber(Long number) {
            this.number = number;
        }
    }

    事件工厂类

    public class LongEventFactory implements EventFactory<LongEvent> {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }

    事件转换类

    public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {
        @Override
        public void translateTo(LongEvent event, long sequence, Long arg0) {
            event.setNumber(arg0);
        }
    }

    C1-1消费者类

    该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandlerWorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类,当然,也没啥问题。

    public class C11EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number += 10;
            System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number += 10;
            System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);
        }
    }

    C1-2消费者类

    该消费者类执行将数值乘以10的操作。

    public class C12EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number *= 10;
            System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number *= 10;
            System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);
        }
    }

    c2-1消费者类

    该消费者类负责将数值+20.

    public class C21EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number += 20;
            System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number += 20;
            System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);
        }
    }

    C2-2消费者类

    该消费者类负责将数值*20

    public class C22EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number *= 20;
            System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number *= 20;
            System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);
        }
    }

    主函数

    public class Main {
        public static void main(String[] args) {
            int bufferSize = 1024*1024;//环形队列长度,必须是2的N次方
            EventFactory<LongEvent> eventFactory = new LongEventFactory();
            /**
             * 定义Disruptor,基于单生产者,阻塞策略
             */
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());
            /////////////////////////////////////////////////////////////////////
            XXX(disruptor);//这里是调用各种不同方法的地方.
            /////////////////////////////////////////////////////////////////////
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            /**
             * 输入10
             */
            ringBuffer.publishEvent(new LongEventTranslator(),10L);
            ringBuffer.publishEvent(new LongEventTranslator(),100L);
        }
    }

    并行计算实现

    这里写图片描述
    并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。

    /**
      * 并行计算实现,c1,c2互相不依赖
      * <br/>
      * p --> c11
      *   --> c21
      */
     public static void parallel(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler(),new C21EventHandler());
         disruptor.start();
     }

    串行计算,依次执行

    这里写图片描述

    /**
      * 串行依次执行
      * <br/>
      * p --> c11 --> c21
      * @param disruptor
      */
     public static void serial(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
         disruptor.start();
     }

    菱形方式执行

    这里写图片描述

    /**
      * 菱形方式执行
      * <br/>
      *   --> c11
      * p          --> c21
      *   --> c12
      * @param disruptor
      */
     public static void diamond(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
         disruptor.start();
     }

    链式并行计算

    这里写图片描述

    /**
      * 链式并行计算
      * <br/>
      *   --> c11 --> c12
      * p
      *   --> c21 --> c22
      * @param disruptor
      */
     public static void chain(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
         disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
         disruptor.start();
     }

    上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
    - 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
    - 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
    - 最后,实例化多个事件消费类。

    并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例

    这里写图片描述

    /**
      * 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例
      * <br/>
      * p --> c11
      *   --> c21
      */
     public static void parallelWithPool(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
         disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
         disruptor.start();
     }

    串行依次执行,同时C11,C21分别有2个实例

    这里写图片描述

    /**
      * 串行依次执行,同时C11,C21分别有2个实例
       * <br/>
       * p --> c11 --> c21
       * @param disruptor
       */
      public static void serialWithPool(Disruptor<LongEvent> disruptor){
          disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
          disruptor.start();
      }
    展开全文
    twypx 2018-05-21 22:10:57
  • 第1章 课程开篇 第2章 并发编程框架中心讲解 ...第6章 Netty整合并发编程框架Disruptor实战百万长链接服务构建 第7章 分布式一致ID生成效劳架构规划 第8章 课程总结 coding.rar下载地址:百度网盘 ...

    第1章 课程开篇

    第2章 并发编程框架中心讲解

    第3章 并发编程框架高档特性讲解

    第4章 并发编程深入学习与面试精讲

    第5章 并发编程框架底层源码深度分析

    第6章 Netty整合并发编程框架Disruptor实战百万长链接服务构建

    第7章 分布式一致ID生成效劳架构规划

    第8章 课程总结

    coding.rar


    下载地址:百度网盘

    展开全文
    lele11123 2019-05-06 11:18:49
  • weixin_33971130 2013-02-19 11:17:00
  • qq_20949471 2022-01-16 00:11:56

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,122
精华内容 448
热门标签
关键字:

disruptor实战