精华内容
下载资源
问答
  • Netty高并发性能优化

    万次阅读 2018-09-28 17:24:26
    Netty果然效率很,不用做太多努力就能达到一个比较的tps。但使用过程中也碰到了一些问题,个人觉得都是比较经典而在网上又不太容易查找到相关资料的问题,所以在此总结一下。 1.Context Switch过 压测时用...

    最近在写一个后台中间件的原型,主要是做消息的分发和透传。因为要用Java实现,所以网络通信框架的第一选择当然就是Netty了,使用的是Netty 4版本。Netty果然效率很高,不用做太多努力就能达到一个比较高的tps。但使用过程中也碰到了一些问题,个人觉得都是比较经典而在网上又不太容易查找到相关资料的问题,所以在此总结一下。

    1.Context Switch过高

    压测时用nmon监控内核,发现Context Switch高达30w+。这明显不正常,但JVM能有什么导致Context Switch。参考之前整理过的恐龙书《Operating System Concept》的读书笔记《进程调度》和Wiki上的Context Switch介绍,进程/线程发生上下文切换的原因有:

    • I/O等待:在多任务系统中,进程主动发起I/O请求,但I/O设备还没有准备好,所以会发生I/O阻塞,进程进入Wait状态。
    • 时间片耗尽:在多任务分时系统中,内核分配给进程的时间片已经耗尽了,进程进入Ready状态,等待内核重新分配时间片后的执行机会。
    • 硬件中断:在抢占式的多任务分时系统中,I/O设备可以在任意时刻发生中断,CPU会停下当前正在执行的进程去处理中断,因此进程进入Ready状态。

    根据分析,重点就放在第一个和第二个因素上。

    进程与线程的上下文切换

    之前的读书笔记里总结的是进程的上下文切换原因,那线程的上下文切换又有什么不同呢?在StackOverflow上果然找到了提问thread context switch vs process context switch

    “The main distinction between a thread switch and a process switch is that during a thread switch, the virtual memory space remains the same, while it does not during a process switch. Both types involve handing control over to the operating system kernel to perform the context switch. The process of switching in and out of the OS kernel along with the cost of switching out the registers is the largest fixed cost of performing a context switch. 
    A more fuzzy cost is that a context switch messes with the processors cacheing mechanisms. Basically, when you context switch, all of the memory addresses that the processor “remembers” in it’s cache effectively become useless. The one big distinction here is that when you change virtual memory spaces, the processor’s Translation Lookaside Buffer (TLB) or equivalent gets flushed making memory accesses much more expensive for a while. This does not happen during a thread switch.”

    通过排名第一的大牛的解答了解到,进程和线程的上下文切换都涉及进出系统内核和寄存器的保存和还原,这是它们的最大开销。但与进程的上下文切换相比,线程还是要轻量一些,最大的区别是线程上下文切换时虚拟内存地址保持不变,所以像TLB等CPU缓存不会失效。但要注意的是另一份提问What is the overhead of a context-switch?的中提到了:Intel和AMD在2008年引入的技术可能会使TLB不失效。感兴趣的话请自行研究吧。

    1.1 非阻塞I/O

    针对第一个因素I/O等待,最直接的解决办法就是使用非阻塞I/O操作。在Netty中,就是服务端和客户端都使用NIO。

    这里在说一下如何主动的向Netty的Channel写入数据,因为网络上搜到的资料都是千篇一律:服务端就是接到请求后在Handler中写入返回数据,而客户端的例子竟然也都是在Handler里Channel Active之后发送数据。因为要做消息透传,而且是向下游系统发消息时是异步非阻塞的,网上那种例子根本没法用,所以在这里说一下我的方法吧。

    关于服务端,在接收到请求后,在channelRead0()中通过ctx.channel()得到Channel,然后就通过ThreadLocal变量或其他方法,只要能把这个Channel保存住就行。当需要返回响应数据时就主动向持有的Channel写数据。具体请参照后面第4节。

    关于客户端也是同理,在启动客户端之后要拿到Channel,当要主动发送数据时就向Channel中写入。

    复制代码

    EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(...);
                    }
                });
    
            try {
                ChannelFuture future = b.connect().sync();
                this.channel = future.channel();
            }
            catch (InterruptedException e) {
                throw new IllegalStateException("Error when start netty client: addr=[" + addr + "]", e);
            }

    复制代码

     

    1.2 减少线程数

    线程太多的话每个线程得到的时间片就少,CPU要让各个线程都有机会执行就要切换,切换就要不断保存和还原线程的上下文现场。于是检查Netty的I/O worker的EventLoopGroup。之前在《Netty 4源码解析:服务端启动》中曾经分析过,EventLoopGroup默认的线程数是CPU核数的二倍。所以手动配置NioEventLoopGroup的线程数,减少一些I/O线程。

    复制代码

    private void doStartNettyServer(int port) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup(4);
            try {
                ServerBootstrap b = new ServerBootstrap()
                        .group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(port)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(...);
                            }
                        });
    
                // Bind and start to accept incoming connections.
                ChannelFuture f = b.bind(port).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

    复制代码

      

    此外因为还用了Akka作为业务线程池,所以还看了下如何修改Akka的默认配置。方法是新建一个叫做application.conf的配置文件,我们创建ActorSystem时会自动加载这个配置文件,下面的配置文件中定制了一个dispatcher:

    复制代码

    my-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 2
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 1.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 16
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 100
    }

    复制代码

     

     

    简单来说,最关键的几个配置项是:

    • parallelism-factor:决定线程池的大小(竟然不是parallelism-max)。
    • throughput:决定coroutine的切换频率,1是最为频繁也最为公平的设置。

    因为本篇主要是介绍Netty的,所以具体含义就详细介绍了,请参考官方文档中对Dispatcher和Mailbox的介绍。创建特定Dispatcher的Akka很简单,以下是创建类型化Actor时指定Dispatcher的方法。

    复制代码

    TypedActor.get(system).typedActorOf(
                new TypedProps<MyActorImpl>(
                        MyActor.class,
                        new Creator<MyActorImpl>() {
                            @Override
                            public MyActorImpl create() throws Exception {
                                return new MyActorImpl(XXX);
                            }
                        }
                ).withDispatcher("my-dispatcher")
        );

    复制代码

     

    1.3 去业务线程池

    尽管上面做了种种改进配置,用jstack查看线程配置确实生效了,但Context Switch的状况并没有好转。于是干脆去掉Akka实现的业务线程池,彻底减少线程上下文的切换。发现CS从30w+一下子降到了16w!费了好大力气在万能的StackOverflow上查到了一篇文章,其中一句话点醒了我:

    And if the recommendation is not to block in the event loop, then this can be done in an application thread. But that would imply an extra context switch. This extra context switch may not be acceptable to latency sensitive applaications.

    有了线索就赶紧去查Netty源码,发现的确像调用channel.write()操作不是在当前线程上执行。Netty内部统一使用executor.inEventLoop()判断当前线程是否是EventLoopGroup的线程,否则会包装好Task交给内部线程池执行:

    复制代码

    private void write(Object msg, boolean flush, ChannelPromise promise) {
    
            AbstractChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeWrite(msg, promise);
                if (flush) {
                    next.invokeFlush();
                }
            } else {
                int size = channel.estimatorHandle().size(msg);
                if (size > 0) {
                    ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                    // Check for null as it may be set to null if the channel is closed already
                    if (buffer != null) {
                        buffer.incrementPendingOutboundBytes(size);
                    }
                }
                Runnable task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, msg, size, promise);
                }  else {
                    task = WriteTask.newInstance(next, msg, size, promise);
                }
                safeExecute(executor, task, promise, msg);
            }
        }

    复制代码

     

     

    业务线程池原来是把双刃剑。虽然将任务交给业务线程池异步执行降低了Netty的I/O线程的占用时间、减轻了压力,但同时业务线程池增加了线程上下文切换的次数。通过上述这些优化手段,终于将压测时的CS从每秒30w+降到了8w左右,效果还是挺明显的!


    2.系统调用开销

    系统调用一般会涉及到从User Space到Kernel Space的模态转换(Mode Transition或Mode Switch)。这种转换也是有一定开销的。

    Mode Switch vs. Context Switch

    StackOverflow上果然什么问题都有。前面介绍过了线程的上下文切换,那它与内核态和用户态的切换是什么关系?模态切换算是CS的一种吗?Does there have to be a mode switch for something to qualify as a context switch?回答了这个问题:

    “A mode switch happens inside one process. A context switch involves more than one process (or thread). Context switch happens only in kernel mode. If context switching happens between two user mode processes, first cpu has to change to kernel mode, perform context switch, return back to user mode and so on. So there has to be a mode switch associated with a context switch. But a context switch doesn’t imply a mode switch (could be done by the hardware alone). A mode switch does not require a context switch either.”

    Context Switch必须在内核中完成,原理简单说就是主动触发一个软中断(类似被动被硬件触发的硬中断),所以一般Context Switch都会伴随Mode Switch。然而有些硬件也可以直接完成(不是很懂了),有些CPU甚至没有我们常说Ring 0 ~ 3的特权级概念。而Mode Switch则与Context Switch更是无关了,按照Wiki上的说法硬要扯上关系的话也只能说有的系统里可能在Mode Switch中发生Context Switch。

    Netty涉及的系统调用最多的就是网络通信操作了,所以为了降低系统调用的频度,最直接的方法就是缓冲输出内容,达到一定的数据大小、写入次数或时间间隔时才flush缓冲区

    对于缓冲区大小不足,写入速度过快等问题,Netty提供了writeBufferLowWaterMark和writeBufferHighWaterMark选项,当缓冲区达到一定大小时则不能写入,避免被撑爆。感觉跟Netty提供的Traffic Shaping流量整形功能有点像呢。具体还未深入研究,感兴趣的同学可以自行学习一下。


    3.Zero Copy实现

    《Netty权威指南(第二版)》中专门有一节介绍Netty的Zero Copy,但针对的是Netty内部的零拷贝功能。我们这里想谈的是如何在应用代码中实现Zero Copy,最典型的应用场景就是消息透传。因为透传不需要完整解析消息,只需要知道消息要转发给下游哪个系统就足够了。所以透传时,我们可以只解析出部分消息,消息整体还原封不动地放在Direct Buffer里,最后直接将它写入到连接下游系统的Channel中。所以应用层的Zero Copy实现就分为两部分:Direct Buffer配置和Buffer的零拷贝传递。

    3.1 内存池

    使用Netty带来的又一个好处就是内存管理。只需一行简单的配置,就能获得到内存池带来的好处。在底层,Netty实现了一个Java版的Jemalloc内存管理库(还记得Redis自带的那个吗),为我们做完了所有“脏活累活”!

    复制代码

    ServerBootstrap b = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(port)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(...);
                    }
                });

    复制代码

     

     

    3.2 应用层的Zero Copy

    默认情况下,Netty会自动释放ByteBuf。也就是说当我们覆写的channelRead0()返回时,ByteBuf就结束了它的使命,被Netty自动释放掉(如果是池化的就可会被放回到内存池中)。

    复制代码

    public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

    复制代码

     

     

    因为Netty是用引用计数的方式来判断是否回收的,所以要想继续使用ByteBuf而不让Netty释放的话,就要增加它的引用计数。只要我们在ChannelPipeline中的任意一个Handler中调用ByteBuf.retain()将引用计数加1,Netty就不会释放掉它了。我们在连接下游的客户端的Encoder中发送消息成功后再释放掉,这样就达到了零拷贝透传的效果:

    复制代码

    public class RespEncoder extends MessageToByteEncoder<Resp> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception {
            // Raw in Msg is retained ByteBuf
            out.writeBytes(msg.getRaw(), 0, msg.getRaw().readerIndex());
            msg.getRaw().release();
        }
    
    }

    复制代码

     

     


    4.并发下的状态处理

    前面第1.1节介绍的异步写入持有的Channel和第2节介绍的根据一定规则flush缓冲区等等,都涉及到状态的保存。如果要并发访问这些状态的话,就要提防并发的race condition问题,避免更新冲突、丢失等等。

    4.1 Channel保存

    在Netty服务端的Handler里如何持有Channel呢?我是这样做的,在channelActive()或第一次进入channelRead0()时创建一个Session对象持有Channel。因为之前在《Netty 4源码解析:请求处理》中曾经分析过Netty 4的线程模型:多个客户端可能会对应一个EventLoop线程,但对于一个客户端来说只能对应一个EventLoop线程。每个客户端都对应自己的Handler实例,并且一直使用到连接断开

    复制代码

    public class FrontendHandler extends SimpleChannelInboundHandler<Msg> {
    
        private Session session;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            session = factory.createSession(ctx.channel());
            super.channelActive(ctx);
        }
    
        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, Msg msg) throws Exception {
            session.handleRequest(msg);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            session = null;
            super.channelInactive(ctx);
        }
    
    }

    复制代码

     

     

    4.2 Decoder状态

    因为网络粘包拆包等因素,Decoder不可避免的要保存一些解析过程的中间状态。因为Netty对于每个客户端的生命周期内会一直使用同一个Decoder实例,所以解析完成后一定要重置中间状态,避免后续解析错误。

    复制代码

    public class RespDecoder extends ReplayingDecoder {
    
        public MsgDecoder() {
            doCleanUp();
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                throws Exception {
            if (doParseMsg(in)) {
                doSendToHandler(out);
                doCleanUp();
            }
        }
    }

    复制代码

     

     


    5.总结

    5.1 多变的Netty

    总结之前先吐槽一下,令人又爱又恨的Netty更新速度。从Netty 3到Netty 4,API发生了一次“大地震”,好多网上的示例程序都是基于Netty 3,所以学习Netty 4时发现好多例子都跑不起来了。除了API,Netty内部的线程模型等等变化就更不用说了。本以为用上了Netty 4就可以安心了,结果Netty 5的线程模型又-变-了!看看官方文档里的说法吧,升级的话又要注意了。

    Even more flexible thread model

    In Netty 4.x each EventLoop is tightly coupled with a fixed thread that executes all I/O events of its registered Channels and any tasks submitted to it. Starting with version 5.0 an EventLoop does no longer use threads directly but instead makes use of an Executor abstraction. That is, it takes an Executor object as a parameter in its constructor and instead of polling for I/O events in an endless loop each iteration is now a task that is submitted to this Executor. Netty 4.x would simply spawn its own threads and completely ignore the fact that it’s part of a larger system. Starting with Netty 5.0, developers can run Netty and the rest of the system in the same thread pool and potentially improve performance by applying better scheduling strategies and through less scheduling overhead (due to fewer threads). It shall be mentioned, that this change does not in any way affect the way ChannelHandlers are developed. From a developer’s point of view, the only thing that changes is that it’s no longer guaranteed that a ChannelHandler will always be executed by the same thread. It is, however, guaranteed that it will never be executed by two or more threads at the same time. Furthermore, Netty will also take care of any memory visibility issues that might occur. So there’s no need to worry about thread-safety and volatile variables within a ChannelHandler.

    根据官方文档的说法,Netty不再保证特定的Handler实例在运行时一定对应一个线程,所以,在Handler中用ThreadLocal的话就是比较危险的写法了!

    5.2 高并发编程技巧

    经过上面的种种琢磨和努力,tps终于从几千达到了5w左右,学到了很多之前不懂的网络编程和性能优化的知识,还是很有成就感的!总结一下,高并发中间件的优化策略有:

    • 线程数控制:高并发下如果线程较多时,Context Switch会非常明显,超过CPU核心数的线程不会带来任何好处。不是特别耗时的操作的话,业务线程池也是有害无益的。Netty 5为我们提供了指定底层线程池的机会,这样能更好的控制整个中间件的线程数和调度策略。
    • 非阻塞I/O操作:要想线程少还多做事,避免阻塞是一定要做的。
    • 减少系统调用:虽然Mode Switch比Context Switch的开销要小得多,但我们还是要尽量减少频繁的syscall。
    • 数据零拷贝:从内核空间的Direct Buffer拷贝到用户空间,每次透传都拷贝的话累积起来是个不小的开销。
    • 共享状态保护:中间件内部的并发处理也是决定性能的关键。
    展开全文
  • Netty 高并发 (长文)

    2019-01-09 21:21:00
    目录 Netty+Zookeeper 亿级 高并发实战 (长文) 写在前面 1. 高并发IM架构与部分实现 1.1. 高并发的学习和应用价值 1.1.1. 高并发IM实战的价值 1.1.2. 高并发IM的应用场景 ...

    Netty+Zookeeper 亿级 高并发实战 (长文)

    疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -30【 博客园 总入口


    @

    写在前面

    ​ 大家好,我是作者尼恩。目前和几个小伙伴一起,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战

    下面结合Netty + Zookeeper,介绍一下亿级流量的IM的架构和部分实现。

    1. 高并发IM架构与部分实现

    1.1. 高并发的学习和应用价值

    1.1.1. 高并发IM实战的价值

    为什么要开始一个高并发IM的实战呢?

    首先,实战完成一个分布式、高并发的IM系统,具有相当的技术挑战性。这一点,对于从事传统的企业级WEB开发的兄弟来说,相当于进入了一片全新的天地。企业级WEB,QPS峰值可能在1000以内,甚至在100以内,没有多少技术挑战性和含金量,属于重复性的CRUD的体力活。

    而一个分布式、高并发的IM系系统,面临的QPS峰值可能在十万、百万、千万,甚至上亿级别。如果此纵深的层次化递进的高并发需求,直接无极限的考验着系统的性能。需要不断的从通讯的协议、到系统的架构进行优化,对技术能力是一种非常极致的考验和训练。

    其次,不同的QPS峰值规模的IM系统,所处的用户需求环境是不一样的。这就造成了不同用户规模的IM系统,都具有一定的实际需求和市场需要,不一定需要所有的系统,都需要上亿级的高并发。但是,作为一个顶级的架构师,就应该具备全栈式的架构能力。需要能适应不同的用户规模的、差异化的技术场景,提供和架构出和对应的场景相互匹配的高并发IM系统。也就是说,IM系统综合性相对较强,相关的技术需要覆盖到满足各种不同场景的网络传输、分布式协调、分布式缓存、服务化架构等等。

    来具体看看高并发IM的应用场景吧。

    1.1.2. 高并发IM的应用场景

    一切高实时性通讯、消息推送的场景,都需要高并发 IM 。

    随着移动互联网、AI的飞速发展,高性能高并发IM(即时通讯),有着非常广泛的应用场景。

    典型的应用场景如下:私信、聊天、大规模推送、视频会议、弹幕、抽奖、互动游戏、基于位置的应用(Uber、滴滴司机位置)、在线教育、智能家居等。

    在这里插入图片描述

    尤其是对于APP开发的小伙伴们来说,即时通讯,已经成为大多数APP标配。移动互联网时代,推送(Push)服务成为App应用不可或缺的重要组成部分,推送服务可以提升用户的活跃度和留存率。我们的手机每天接收到各种各样的广告和提示消息等大多数都是通过推送服务实现的。

    随着5G时代物联网的发展,未来所有接入物联网的智能设备,都将是IM系统的客户端,这就意味着推送服务未来会面临海量的设备和终端接入。为了支持这些千万级、亿级终端,一定是需要强悍的后台系统。

    有这么多的应用场景,对于想成长为JAVA高手的小伙伴们,高并发IM 都绕不开一个话题。

    对于想在后台有所成就的小伙伴们来说,高并发IM实战,更是在终极BOSS PK之前的一场不可或缺的打怪练手。

    总之,真刀真枪的完成一个高并发IM的实战,既可以积累到非常全面的高并发经验,又可以获得更多的挑战机会。

    1.2. 高并发IM的架构

    1.2.1. 支撑亿级流量的IM整体架构

    整体的架构如下图:

    在这里插入图片描述

    主要的集群介绍如下:

    (1)Netty 服务集群

    主要用来负责维持和客户端的TCP连接

    (2)连接器集群

    负责 Netty Server 集群的管理,包括注册、路由、负载均衡。集群IP注册和节点ID分配。主要在基于Zookeeper集群提供底层服务,来完成。

    (3)缓存集群

    负责用户、用户绑定关系、用户群组关系、用户远程会话等等数据的缓存。缓存临时数据、加快读速度。

    (4)DB持久层集群

    存在用户、群组、离线消息等等

    (5)消息队列集群

    用户状态广播,群组消息广播等等。

    并没有完全涉及全部的集群介绍。只是介绍其中的部分核心功能。 如果全部的功能感兴趣,请关注疯狂创客圈的亿级流量实战学习项目。

    理论上,以上集群具备完全的扩展能力,进行合理的横向扩展和局部的优化,支撑亿级流量,没有任何问题。

    为什么这么说呢

    单体的Netty服务器,远远不止支持10万并发,在CPU 、内存还不错的情况下,如果配置得当,甚至能撑到100万级别。所以,通过合理的高并发架构,能够让系统动态扩展到成百上千的Netty节点,支撑亿级流量,是没有任何问题的。

    单体的Netty服务器,如何支撑100万高并发,请查询疯狂创客圈社群的文章《Netty 100万级高并发服务器配置》

    1.2.2. IM通讯协议介绍

    IM通讯协议,属于数据交换协议。IM系统的客户端和服务器节点之间,需要按照同一种数据交换协议,进行数据的交换。

    数据交换协议的功能,简单的说:就是规定网络中的字节流数据,如何与应用程序需要的结构化数据相互转换。

    数据交换协议主要的工作分为两步:结构化数据到二进制数据的序列化和反序列化。

    数据交换协议按序列化类型:分为文本协议和二进制协议。

    常见的文本协议包括XML、JSON。文本协议序列化之后,可读性好,便于调试,方便扩展。但文本协议的缺点在于解析效率一般,有很多的冗余数据,这一点主要体现在XML格式上。

    常见的二进制协议包括PrototolBuff、Thrift,这些协议都自带了数据压缩,编解码效率高,同时兼具扩展性。二进制协议的优势很明显,但是劣势也非常的突出。和文本协议相反,序列化之后的二进制协议报文数据,基本上没有什么可读性,很显然,这点不利于大家开发和调试。

    因此,在协议的选择上,对于并发度不高的IM系统,建议使用文本协议,比如JSON。对于并发度非常之高,QPS在千万级、亿级的通讯系统,尽量选择二进制的协议。

    据说,微信所使用的数据交换协议,就是 PrototolBuff二进制协议。

    1.2.3. 长连接和短连接

    什么是长连接呢?

    客户端client向server发起连接,server接受client连接,双方建立连接。Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。

    大家知道,TCP协议的连接过程是比较繁琐的,建立连接是需要三次握手的,而释放则需要4次握手,所以说每个连接的建立都是需要资源消耗和时间消耗的。

    在高并发的IM系统中,客户端和服务器之间,需要大量的发送通讯的消息,如果每次发送消息,都去建立连接,客户端的和服务器的连接建立和断开的开销是非常巨大的。所以,IM消息的发送,肯定是需要长连接。

    什么是短连接呢?

    客户端client向server发起连接,server接受client连接,在三次握手之后,双方建立连接。Client与server完成一次读写,发送数据包并得到返回的结果之后,通过客户端和服务端的四次握手进行关闭断开。

    短连接适用于数据请求频度较低的场景。比如网站的浏览和普通的web请求。短连接的优点是:管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。

    在高并发的IM系统中,客户端和服务器之间,除了消息的通讯外,还需要用户的登录与认证、好友的更新与获取等等一些低频的请求,这些都使用短连接来实现。

    综上所述,在这个高并发IM系统中,存在两个类的服务器。一类短连接服务器和一个长连接服务器。

    短连接服务器也叫Web服务服务器,主要是功能是实现用户的登录鉴权和拉取好友、群组、数据档案等相对低频的请求操作。

    长连接服务器也叫IM即时通讯服务器,主要作用就是用来和客户端建立并维持长连接,实现消息的传递和即时的转发。并且,分布式网络非常复杂,长连接管理是重中之重,需要考虑到连接保活、连接检测、自动重连等方方面面的工作。

    短连接Web服务器和长连接IM服务器之间,是相互配合的。在分布式集群的环境下,用户首先通过短连接登录Web服务器。Web服务器在完成用户的账号/密码验证,返回uid和token时,还需要通过一定策略,获取目标IM服务器的IP地址和端口号列表,返回给客户端。客户端开始连接IM服务器,连接成功后,发送鉴权请求,鉴权成功则授权的长连接正式建立。

    如果用户规模庞大,无论是短连接Web服务器,还是长连接IM服务器,都需要进行横向的扩展,都需要扩展到上十台、百台、甚至上千台机器。只有这样,才能有良好性能,提高良好的用户体验。因此,需要引入一个新的角色,短连接网关(WebGate)。

    WebGate短连接网关的职责,首先是代理大量的Web服务器,从而无感知的实现短连接的高并发。在客户端登录时和进行其他短连接时,不直接连接Web服务器,而是连接Web网关。围绕Web网关和Web高并发的相关技术,目前非常成熟,可以使用SpringCloud 或者 Dubbo 等分布式Web技术,也很容易扩展。

    除此之外,大量的IM服务器,又如何协同和管理呢?

    基于Zookeeper或者其他的分布式协调中间件,可以非常方便、轻松的实现一个IM服务器集群的管理,包括而且不限于命名服务、服务注册、服务发现、负载均衡等管理。

    当用户登录成功的时候,WebGate短连接网关可以通过负载均衡技术,从Zookeeper集群中,找出一个可用的IM服务器的地址,返回给用户,让用户来建立长连接。

    1.2.4. 技术选型

    (1)核心:

    Netty4.x + spring4.x + zookeeper 3.x + redis 3.x + rocketMQ 3.x

    (2)短连接服务:spring cloud

    基于restful 短连接的分布式微服务架构, 完成用户在线管理、单点登录系统。

    (3)长连接服务:Netty

    Netty就不用太多介绍了。

    (4)消息队列:

    rocketMQ 高速队列。整流作用。

    (5)底层数据库:mysql+mongodb

    mysql做业务还是很方便的,用来存储结构化数据,如用户数据。

    mongodb 很重要,用来存储非结构化离线消息。

    (6)协议 Protobuf + JSON

    Protobuf 是最高效的IM二进制协议,用于长连接。

    JSON 是最紧凑的文本协议,用于短连接。

    文本协议 Gson + fastjson。 Gson 谷歌的东西,fastjson 淘宝的东西,两者互补,结合使用。

    1.3. 分布式IM命名服务

    前面提到,一个高并发系统会有很多的节点组成,而且,节点的数量是不断动态变化的。

    在一个即时消息通讯系统中,从0到1到N,用户量可能会越来越多,或者说由于某些活动影响,会不断的出现流量洪峰。这时需要动态加入大量的节点。另外,由于机器或者网络的原因,一些节点主动的离开的集群。如何为大量的动态节点命名呢?最好的办法是使用分布式命名服务,按照一定的规则,为动态上线和下线的工作节点命名。

    疯狂创客圈的高并发IM实战学习项目,基于Zookeeper构建分布式命名服务,为每一个IM工作服务器节点动态命名。

    1.3.1. IM节点的POJO类

    首先定义一个POJO类,保存IM worker节点的基础信息如Netty 服务IP、Netty 服务端口,以及Netty的服务连接数。

    具体如下:

    /**
    
     * create by 尼恩 @ 疯狂创客圈
    
     **/
    
    @Data
    
    public class ImNode implements Comparable<ImNode> {
    
     
    
    ​    //worker 的Id,由Zookeeper负责生成
    
    ​    private long id;
    
     
    
    ​    //Netty 服务 的连接数
    
    ​    private AtomicInteger balance; 
    
     
    
    ​    //Netty 服务 IP
    
    ​    private String host;
    
     
    
    ​    //Netty 服务 端口
    
    ​    private String port;
    
     
    
    ​    //...
    
     
    
    }

    这个POJO类的IP、端口、balance负载,和每一个节点的Netty服务器相关。而id属性,则由利用Zookeeper的中Znode子节点能顺序编号的性质,由Zookeeper生成。

    1.3.2. IM节点的ImWorker类

    命名服务的思路是:所有的工作节点,都在Zookeeper的同一个的父节点下,创建顺序节点。然后从返回的临时路径上,取得属于自己的那个后缀的编号。

    主要的代码如下:

    package com.crazymakercircle.imServer.distributed;
    
    import com.crazymakercircle.imServer.server.ServerUtils;
    import com.crazymakercircle.util.ObjectUtil;
    import com.crazymakercircle.zk.ZKclient;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    
    
    public class ImWorker {
    
        //Zk curator 客户端
        private CuratorFramework client = null;
    
        //保存当前Znode节点的路径,创建后返回
        private String pathRegistered = null;
    
        private ImNode node = ImNode.getLocalInstance();
    
        private static ImWorker singleInstance = null;
    
        //取得单例
        public static ImWorker getInst() {
    
            if (null == singleInstance) {
    
                singleInstance = new ImWorker();
                singleInstance.client =
                        ZKclient.instance.getClient();
                singleInstance.init();
            }
            return singleInstance;
        }
    
        private ImWorker() {
    
        }
    
        // 在zookeeper中创建临时节点
        public void init() {
    
            createParentIfNeeded(ServerUtils.MANAGE_PATH);
    
            // 创建一个 ZNode 节点
            // 节点的 payload 为当前worker 实例
    
            try {
                byte[] payload = ObjectUtil.Object2JsonBytes(node);
    
                pathRegistered = client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .forPath(ServerUtils.pathPrefix, payload);
    
                //为node 设置id
                node.setId(getId());
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    
        public long getId() {
            String sid = null;
            if (null == pathRegistered) {
                throw new RuntimeException("节点注册失败");
            }
            int index = pathRegistered.lastIndexOf(ServerUtils.pathPrefix);
            if (index >= 0) {
                index += ServerUtils.pathPrefix.length();
                sid = index <= pathRegistered.length() ? pathRegistered.substring(index) : null;
            }
    
            if (null == sid) {
                throw new RuntimeException("节点ID生成失败");
            }
    
            return Long.parseLong(sid);
    
        }
    
    
        public boolean incBalance() {
            if (null == node) {
                throw new RuntimeException("还没有设置Node 节点");
            }
            // 增加负载:增加负载,并写回zookeeper
            while (true) {
                try {
                    node.getBalance().getAndIncrement();
                    byte[] payload = ObjectUtil.Object2JsonBytes(this);
                    client.setData().forPath(pathRegistered, payload);
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }
    
        }
    
        public boolean decrBalance() {
            if (null == node) {
                throw new RuntimeException("还没有设置Node 节点");
            }
            // 增加负载:增加负载,并写回zookeeper
            while (true) {
                try {
                    int i = node.getBalance().decrementAndGet();
                    if (i < 0) {
                        node.getBalance().set(0);
                    }
                    byte[] payload = ObjectUtil.Object2JsonBytes(this);
                    client.setData().forPath(pathRegistered, payload);
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }
    
        }
    
        private void createParentIfNeeded(String managePath) {
    
            try {
                Stat stat = client.checkExists().forPath(managePath);
                if (null == stat) {
                    client.create()
                            .creatingParentsIfNeeded()
                            .withProtection()
                            .withMode(CreateMode.PERSISTENT)
                            .forPath(managePath);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    
    }

    注意,这里有三个Znode相关的路径:

    (1)MANAGE_PATH

    (2)pathPrefix

    (3)pathRegistered

    第一个MANAGE_PATH是一个常量。为所有临时工作Worker节点的父亲节点的路径,在创建Worker节点之前,首先要检查一下,父亲Znode节点是否存在,否则的话,先创建父亲节点。父亲节点的创建方式是:持久化节点,而不是临时节点。

    检查和创建父亲节点的代码如下:

    
        private void createParentIfNeeded(String managePath) {
    
            try {
                Stat stat = client.checkExists().forPath(managePath);
                if (null == stat) {
                    client.create()
                            .creatingParentsIfNeeded()
                            .withProtection()
                            .withMode(CreateMode.PERSISTENT)
                            .forPath(managePath);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    

    第二路径pathPrefix是所有临时节点的前缀。例子的值“/im/Workers/”,是在工作路径后,加上一个“/”分割符。也可是在工作路径的后面,加上“/”分割符和其他的前缀字符,如:“/im/Workers/id-”,“/im/Workers/seq-”等等。

    第三路径pathRegistered是临时节点的创建成功之后,返回的完整的路径。比如:/im/Workers/0000000000,/im/Workers/0000000001 等等。后边的编号是顺序的。

    创建节点成功后,截取后边的数字,放在POJO对象中,供后边使用:

    
    //为node 设置id
    
    node.setId(getId());

    1.4. 即时通讯消息的路由和转发

    如果连接在不同的Netty工作站点的客户端之间,需要相互进行消息的发送,那么,就需要在不同的Worker节点之间进行路由和转发。

    Worker节点路由是指,根据消息需要转发的目标用户,找到用户的连接所在的Worker节点。由于节点和节点之间,都有可能需要相互转发,所以,节点之间的关系是一种网状结构。每一个节点,都需要具备路由的能力。

    1.4.1. IM路由器WorkerRouter

    为每一个Worker节点增加一个IM路由器类,叫做WorkerRouter 。为了能够转发到所有的节点,需要一是要订阅到集群中所有的在线Netty服务器,并且保存起来,二是要其他的Netty服务器建立一个长连接,用于转发消息。

    WorkerRouter 核心代码,节选如下:

    
    /**
    
    \* create by 尼恩 @ 疯狂创客圈
    
    **/
    
    @Slf4j
    
    public class WorkerRouter {
    
     
    
    ​    //Zk客户端
    
    ​    private CuratorFramework client = null;
    
     
    
    ​    //单例模式
    
    ​    private static WorkerRouter singleInstance = null;
    
    ​    //监听路径
    
    ​    private static final String path =
    
    ​              "/im/Workers";
    
     
    
    ​    //节点的容器
    
    ​    private ConcurrentHashMap<Long, WorkerReSender> workerMap =
    
    ​            new ConcurrentHashMap<>();
    
     
    
     
    
    public static WorkerRouter getInst() {
    
    ​       if (null == singleInstance) {
    
    ​               singleInstance = new WorkerRouter();
    
    ​               singleInstance.client = ZKclient.instance.getClient();
    
    ​               singleInstance.init();
    
    ​       }
    
    ​       return singleInstance;
    
    }
    
     
    
    private void init() {
    
    try {
    
     
    
    ​       //订阅节点的增加和删除事件
    
    ​       TreeCache treeCache = new TreeCache(client, path);
    
    ​       TreeCacheListener l = new TreeCacheListener() {
    
    ​       @Override
    
    ​           public void childEvent(CuratorFramework client,
    
    ​                            TreeCacheEvent event) throws Exception 
    
    ​           {
    
    ​               ChildData data = event.getData();
    
    ​               if (data != null) {
    
    ​                   switch (event.getType()) {
    
    ​                       case NODE_REMOVED:
    
    ​                           processNodeRemoved(data);
    
     
    
    ​                           break;
    
     
    
    ​                       case NODE_ADDED:
    
    ​                           processNodeAdded(data);
    
     
    
    ​                           break;
    
    ​                       default:
    
    ​                           break;
    
    ​                   }
    
    ​               } 
    
    ​           }
    
    ​       };
    
     
    
    ​       treeCache.getListenable().addListener(l);
    
    ​       treeCache.start();
    
    ​       } catch (Exception e) {
    
    ​               e.printStackTrace();
    
    ​       }
    
     
    
    ​       //...
    
    }
    

    在上一小节中,我们已经知道,一个节点上线时,首先要通过命名服务,加入到Netty 集群中。上面的代码中,WorkerRouter 路由器使用curator的TreeCache 缓存,订阅了节点的NODE_ADDED节点新增消息。当一个新的Netty节点加入是,通过processNodeAdded(data) 方法, 在本地保存一份节点的POJO信息,并且建立一个消息中转的Netty客户连接。

    处理节点新增的方法 processNodeAdded(data)比较重要,代码如下:

    
    private void processNodeAdded(ChildData data) {
    
    ​   log.info("[TreeCache]节点更新端口, path={}, data={}",
    
    ​                   data.getPath(), data.getData());
    
    ​   byte[] payload = data.getData();
    
    ​   String path = data.getPath();
    
    ​   ImNode imNode =
    
    ​                   ObjectUtil.JsonBytes2Object(payload, ImNode.class);
    
    ​   long id = getId(path);
    
    ​   imNode.setId(id);
    
     
    
    ​   WorkerReSender reSender = workerMap.get(imNode.getId());
    
    ​   //重复收到注册的事件
    
    ​   if (null != reSender && reSender.getNode().equals(imNode)) {
    
    ​           return;
    
    ​   }
    
    ​   
    
    ​   //服务器重新上线
    
    ​   if (null != reSender) {
    
    ​           //关闭老的连接
    
    ​           reSender.stopConnecting();
    
    ​   }
    
    ​   
    
    ​   //创建一个消息转发器
    
    ​   reSender = new WorkerReSender(imNode);
    
    ​   //建立转发的连接
    
    ​   reSender.doConnect();
    
    ​   workerMap.put(id, reSender);
    
    }

    router路由器有一个容器成员workerMap,用于封装和保存所有的在线节点。当一个节点新增时,router取到新增的Znode路径和负载。Znode路径中有新节点的ID,Znode的payload负载中,有新节点的Netty服务的IP和端口,这个三个信息共同构成新节点的POJO信息 —— ImNode节点信息。 router在检查完和确定本地不存在该节点的转发器后,新增一个转发器 WorkerReSender,将新节点的转发器,保存在自己的容器中。

    这里有一个问题,为什么在router路由器中,不简单、直接、干脆的保存新节点的POJO信息呢?

    因为router路由器的主要作用,除了路由节点,还需要方便的进行消息的转发,所以,router路由器保存的是转发器 WorkerReSender,而新增的远程Netty节点的POJO信息,封装在转发器中。

    1.4.2. IM转发器WorkerReSender

    IM转发器,封装了远程节点的IP、端口、以及ID消息,具体是在ImNode类型的成员中。另外,IM转发器还维持一个到远程节点的长连接。也就是说,它是一个Netty的NIO客户端,维护了一个到远程节点的Netty Channel 通道成员,通过这个通道,将消息转发给远程的节点。

    IM转发器的核心代码,如下:

    package com.crazymakercircle.imServer.distributed;
    
    import com.crazymakercircle.im.common.bean.User;
    import com.crazymakercircle.im.common.codec.ProtobufEncoder;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.concurrent.GenericFutureListener;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Slf4j
    @Data
    public class WorkerReSender {
        //连接远程节点的Netty 通道
        private Channel channel;
    
        //连接远程节点的POJO信息
        private ImNode remoteNode;
        /**
         * 连接标记
         */
        private boolean connectFlag = false;
    
        GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) ->
        {
            log.info(": 分布式连接已经断开……", remoteNode.toString());
            channel = null;
            connectFlag = false;
            WorkerRouter.getInst().removeWorkerById(remoteNode);
    
        };
    
        private GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) ->
        {
            final EventLoop eventLoop
                    = f.channel().eventLoop();
            if (!f.isSuccess()) {
                log.info("连接失败!在10s之后准备尝试重连!");
                eventLoop.schedule(
                        () -> WorkerReSender.this.doConnect(),
                        10,
                        TimeUnit.SECONDS);
    
                connectFlag = false;
            } else {
                connectFlag = true;
    
                log.info("分布式IM节点连接成功:", remoteNode.toString());
                channel = f.channel();
                channel.closeFuture().addListener(closeListener);
    
            }
        };
    
    
        private Bootstrap b;
        private EventLoopGroup g;
    
        public WorkerReSender(ImNode n) {
            this.remoteNode = n;
    
            /**
             * 客户端的是Bootstrap,服务端的则是 ServerBootstrap。
             * 都是AbstractBootstrap的子类。
             **/
    
            b = new Bootstrap();
            /**
             * 通过nio方式来接收连接和处理连接
             */
    
            g = new NioEventLoopGroup();
    
    
        }
    
       // 连接和重连
        public void doConnect() {
    
            // 服务器ip地址
            String host = remoteNode.getHost();
            // 服务器端口
            int port = Integer.parseInt(remoteNode.getPort());
    
            try {
                if (b != null && b.group() == null) {
                    b.group(g);
                    b.channel(NioSocketChannel.class);
                    b.option(ChannelOption.SO_KEEPALIVE, true);
                    b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                    b.remoteAddress(host, port);
    
                    // 设置通道初始化
                    b.handler(
                            new ChannelInitializer<SocketChannel>() {
                                public void initChannel(SocketChannel ch) {
                                    ch.pipeline().addLast(new ProtobufEncoder());
                                }
                            }
                    );
                    log.info(new Date() + "开始连接分布式节点", remoteNode.toString());
    
                    ChannelFuture f = b.connect();
                    f.addListener(connectedListener);
    
    
                    // 阻塞
                    // f.channel().closeFuture().sync();
                } else if (b.group() != null) {
                    log.info(new Date() + "再一次开始连接分布式节点", remoteNode.toString());
                    ChannelFuture f = b.connect();
                    f.addListener(connectedListener);
                }
            } catch (Exception e) {
                log.info("客户端连接失败!" + e.getMessage());
            }
    
        }
    
        public void stopConnecting() {
            g.shutdownGracefully();
            connectFlag = false;
        }
    
        public void writeAndFlush(Object pkg) {
            if (connectFlag == false) {
                log.error("分布式节点未连接:", remoteNode.toString());
                return;
            }
            channel.writeAndFlush(pkg);
        }
    
    
    }
    

    IM转发器中,主体是与Netty相关的代码,比较简单。至少,IM转发器比Netty服务器的代码,简单太多了。

    转发器有一个消息转发的方法,直接通过Netty channel通道,将消息发送到远程节点。

        public void writeAndFlush(Object pkg) {
            if (connectFlag == false) {
                log.error("分布式节点未连接:", remoteNode.toString());
                return;
            }
            channel.writeAndFlush(pkg);
        }
    
    

    1.5. Worker集群的负载均衡

    理论上来说,负载均衡是一种手段,用来把对某种资源的访问分摊给不同的服务器,从而减轻单点的压力。

    在高并发的IM系统中,负载均衡就是需要将IM长连接分摊到不同的Netty服务器,防止单个Netty服务器负载过大,而导致其不可用。

    前面讲到,当用户登录成功的时候,短连接网关WebGate需要返回给用户一个可用的Netty服务器的地址,让用户来建立Netty长连接。而每台Netty工作服务器在启动时,都会去zookeeper的“/im/Workers”节点下注册临时节点。

    因此,短连接网关WebGate可以在用户登录成功之后,去“/im/Workers”节点下取得所有可用的Netty服务器列表,并通过一定的负载均衡算法计算得出一台Netty工作服务器,并且返回给客户端。

    1.5.1. ImLoadBalance 负载均衡器

    短连接网关WebGate 获得Netty服务器的地址,通过查询Zookeeper集群来实现。定义一个负载均衡器,ImLoadBalance类 ,将计算最佳服务器的算法,放在负载均衡器中。

    ImLoadBalance类 的核心代码,如下:

    package com.crazymakercircle.Balance;
    
    import com.crazymakercircle.ObjectUtil;
    import com.crazymakercircle.util.ImNode;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Data
    @Slf4j
    public class ImLoadBalance {
    
        //Zk客户端
        private  CuratorFramework client = null;
    
        //工作节点的路径
        private String mangerPath =  "/im/Workers";
    
        public ImLoadBalance() {
        }
    
        public ImLoadBalance(CuratorFramework client, String mangerPath)
        {
            this.client = client;
            this.mangerPath = mangerPath;
        }
    
    
        public static ImLoadBalance instance() {
        }
    
        public ImNode getBestWorker()
        {
            List<ImNode> workers =getWorkers();
            ImNode best= balance(workers);
    
            return best;
        }
    
        protected ImNode balance(List<ImNode> items) {
            if (items.size() > 0) {
                // 根据balance值由小到大排序
                Collections.sort(items);
    
                // 返回balance值最小的那个
                return items.get(0);
            } else {
                return null;
            }
        }
    
    
      ///....
    
    }

    短连接网关WebGate 会调用getBestWorker()方法,取得最佳的IM服务器。而在这个方法中,有两个很重要的方法。 一个是取得所有的IM服务器列表,注意是带负载的。二个是通过负载信息,计算最小负载的服务器。

    所有的IM服务器列表的代码如下:

     /**
         * 从zookeeper中拿到所有IM节点
         */
        protected List<ImNode> getWorkers() {
    
            List<ImNode> workers = new ArrayList<ImNode>();
    
            List<String> children = null;
            try {
                children = client.getChildren().forPath(mangerPath);
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
    
            for (String child : children) {
                log.info("child:", child);
                byte[] payload = null;
                try {
                    payload = client.getData().forPath(child);
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (null == payload) {
                    continue;
                }
                ImNode worker = ObjectUtil.
                        JsonBytes2Object(payload, ImNode.class);
                workers.add(worker);
            }
            return workers;
    
        }
    

    代码中,首先取得 "/im/Workers" 目录下所有的临时节点,使用的是curator的getChildren 获取子节点方法。然后,通过getData方法,取得每一个子节点的二进制负载。最后,将负载信息转成成 POJO ImNode 对象。

    取到了工作节点的POJO 列表之后,通过一个简单的算法,计算出balance值最小的ImNode对象。

    取得最小负载的 balance 方法的代码如下:

    protected  ImNode  balance (List<ImNode> items) {
    
    ​       if (items.size() > 0) {
    
    ​               // 根据balance由小到大排序
    
    ​               Collections.sort(items);
    
    ​               // 返回balance值最小的那个
    
    ​               return items.get(0);
    
    ​       } else {
    
    ​               return null;
    
    ​       }
    
    }

    1.5.2. 与WebGate的整合

    在用户登录的Http API 方法中,调用ImLoadBalance类的getBestWorker()方法,取得最佳的IM服务器信息,返回给登录的客户端。

    核心代码如下:

    @EnableAutoConfiguration
    @RestController
    @RequestMapping(value = "/user",
    produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public class UserAction extends BaseController
    {
    
    ​    @Resource
    ​    private UserService userService;
    ​    @RequestMapping(value = "/login/{username}/{password}")
    ​    public String loginAction(
    ​            @PathVariable("username") String username,
    ​            @PathVariable("password") String password)
    
    ​    {
    
    ​        User user = new User();
    ​        user.setUserName(username);
    ​        user.setPassWord(password);
    ​        User loginUser = userService.login(user);
    ​        ImNode best=ImLoadBalance.instance().getBestWorker();
    
    LoginBack back =new LoginBack();
    ​        back.setImNode(best);
    ​        back.setUser(loginUser);
    ​        back.setToken(loginUser.getUserId().toString());
    ​        String r = super.getJsonResult(back);
    ​        return r;
    ​    }
      //....
    
    }

    1.7. 分布式Session

    什么是会话?

    为了方便客户端的开发,管理与服务器的连接,这里引入一个非常重要的中间角色——Session (会话)。有点儿像Web开发中的Tomcat的服务器 Session,但是又有很大的不同。

    1.7.1. SessionLocal本地会话

    客户端的本地会话概念图,如下图所示:

    在这里插入图片描述

    客户端会话有两个很重的成员,一个是user,代表了拥有会话的用户。一个是channel,代表了连接的通道。两个成员的作用是:

    (1)user成员 —— 通过它可以获得当前的用户信息

    (2)channel成员 —— 通过它可以发送Netty消息

    Session需要和 channel 相互绑定,为什么呢?原因有两点:

    (1)消息发送的时候, 需要从Session 写入 Channel ,这相当于正向的绑定;

    (2)收到消息的时候,消息是从Channel 过来的,所以可以直接找到 绑定的Session ,这相当于反向的绑定。

    Session和 channel 相互绑定的代码如下:

    //正向绑定
    
    ClientSession session = new (channel);
    
    //反向绑定
    
    channel.attr(ClientSession.SESSION).set(session);

    正向绑定,是直接通过ClientSession构造函数完成。反向绑定是通过channel 自身的所具备的容器能力完成。Netty的Channel类型实现了AttributeMap接口 ,它相当于一个 Map容器。 反向的绑定,利用了channel 的这个特点。

    总的来说,会话Session 左手用户实例,右手服务器的channel连接通道,可以说是左拥右抱,是开发中经常使用到的类。

    1.7.2. SessionDistrubuted分布式会话

    在分布式环境下,本地的Session只能绑定本地的用户和通道,够不着其他Netty节点上的用户和通道。

    如何解决这个难题呢? 一个简单的思路是:制作一个本地Session的副本,保存在分布式缓存Redis中。对于其他的Netty节点来说,可以取到这份Redis副本,从而进行消息的路由和转发。

    基于redis进行分布式的Session 缓存,与本地Session的内容不一样,不需要保存用户的具体实例,也不需要保存用户的Netty Channel通道。只需要能够根据它找到对于的Netty服务器节点即可。

    我们将这个Session,命名为 SessionDistrubuted。代码如下:

    /**
    
    \* create by 尼恩 @ 疯狂创客圈
    
    **/
    
    @Data
    
    public class SessionDistrubuted implements ServerSession {
    
    //用户ID
    
    private String userId;
    //Netty 服务器ID
    private long nodeId;
    //sessionId
    private String sessionId;
    
    public SessionDistrubuted(
    
    ​       String sessionId, String userId, long nodeId) {
    
    ​   this.sessionId = sessionId;
    
    ​   this.userId = userId;
    
    ​   this.nodeId = nodeId;
    
    }
    
     
    
    //...
    
    }

    如何判断这个Session是否有效呢? 可以根据其nodeId,在本地路由器WorkerRouter中查找对应的消息转发器,如果没有找到,说明该Netty服务节点是不可以连接的。于是,该Session为无效。

    判断Session是否有效的代码如下:

    @Override
    
    public boolean isValid() {
    
    ​   WorkerReSender sender = WorkerRouter.getInst()
    
                        .getRedirectSender(nodeId);
    
        if (null == sender) {
    
                return false;
    
        }
    
        return true;
    
    }

    只要该Session为有效。就可以通过它,转发消息到目的nodeId对应的Netty 服务器。

    代码如下:

    @Override
    
    public void writeAndFlush(Object pkg) {
    
    ​   WorkerReSender sender = WorkerRouter
    
    ​                   .getInst().getRedirectSender(nodeId);
    
     
    
    ​   sender.writeAndFlush(pkg);
    
    }

    在分布式环境下,结合本地Session和远程Session,发送消息也就变得非常之简单。如果在本地找到了目标的Session,就直接通过其Channel发送消息到客户端。反之,就通过远程Session,将消息转发到客户端所在的Netty服务器,由该服务器发送到目标客户端。

    1.8. 分布式的在线用户统计

    顾名思义,计数器是用来计数的。在分布式环境中,常规的计数器是不能使用的,在此介绍基本zookeeper实现的分布式计数器。利用ZooKeeper可以实现一个集群共享的计数器,只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。

    1.8.1. Curator的分布式计数器

    Curator有两个计数器, 一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

    这里使用DistributedAtomicLong来实现高并发IM系统中的在线用户统计。

    代码如下:

    /**
    
     * create by 尼恩 @ 疯狂创客圈
    
     **/
    
    public class OnlineCounter {
    ​    private static final int QTY = 5;
    
    ​    private static final String PATH = "/im/OnlineCounter";
    ​    //Zk客户端
    
    ​    private CuratorFramework client = null;
    ​    //单例模式
    
    ​    private static OnlineCounter singleInstance = null;
    ​    DistributedAtomicLong onlines = null;
    
    ​    public static OnlineCounter getInst() {
    
    ​        if (null == singleInstance) {
    ​            singleInstance = new OnlineCounter();
    ​            singleInstance.client = ZKclient.instance.getClient();
    ​            singleInstance.init();
    
    ​        }
    ​        return singleInstance;
    ​    }
    ​    private void init() {
    ​        //分布式计数器,失败时重试10,每次间隔30毫秒
    ​        onlines = new DistributedAtomicLong( client, 
    ​               PATH, new RetryNTimes(10, 30));
        }
    ​    public boolean increment() {
            boolean result = false;
    
    ​        AtomicValue<Long> val = null;
    
    ​        try {
    
    ​            val = onlines.increment();
    ​            result = val.succeeded();
    ​            System.out.println("old cnt: " + val.preValue()
    ​                    + "   new cnt : " + val.postValue()
    ​                    + "  result:" + val.succeeded());
            } catch (Exception e) {
    ​            e.printStackTrace();
    ​        }
    ​        return result;
    ​    }
    
     ​    public boolean decrement() {
    ​        boolean result = false;
    ​        AtomicValue<Long> val = null;
    ​        try {
    ​            val = onlines.decrement();
    ​            result = val.succeeded();
    ​            System.out.println("old cnt: " + val.preValue()
    ​                    + "   new cnt : " + val.postValue()
    
    ​                    + "  result:" + val.succeeded());
    
    ​        } catch (Exception e) {
    
    ​            e.printStackTrace();
    ​        }
    ​        return result;
    
    ​    }
    }

    1.8.2. 用户上线和下线统计

    当用户上线的时候,使用increase方法,分布式的增加一次数量:

    
    /**
    
    \* 增加本地session
    
    */
    public void addSession(String sessionId, SessionLocal s) {
    
    ​   localMap.put(sessionId, s);
    
    ​   String uid = s.getUser().getUid();
    ​   //增加用户数
    
    ​   OnlineCounter.getInst().increment();
    
    ​   //...
    
    }

    当用户下线的时候,使用decrease方法,分布式的减少一次数量:

    
    /**
    
    \* 删除本地session
    
    */
    public void removeLocalSession(String sessionId) {
    
    ​   if (!localMap.containsKey(sessionId)) {
                return;
        }
        localMap.remove(sessionId);
    ​   //减少用户数
    
    ​   OnlineCounter.getInst().decrement();
    ​   //...
    }

    写在最后

    目前和几个小伙伴一起,组织了一个高并发的实战社群【疯狂创客圈】,完成整个项目的完整的架构和开发实战,欢迎参与。


    疯狂创客圈 亿级流量 高并发IM 学习实战

    • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

    • Netty 源码、原理、JAVA NIO 原理
    • Java 面试题 一网打尽
    • 疯狂创客圈 【 博客园 总入口 】


    转载于:https://www.cnblogs.com/crazymakercircle/p/10246934.html

    展开全文
  • netty 高并发实战

    万次阅读 2016-01-23 21:56:13
    很多时候开发人员常犯的错误是过度设计和提前优化,"When in doubt, use brute force", 有时简单的方案才是最好的方案。 架构设计需要考虑到服务器的业务逻辑和预计的用户量,比如同样的IM服务器设计,最高并发...

    分布式IM架构


    id="embed_dom" name="embed_dom" style="border:1px solid #000;display:block;width:800px; height:600px;" src="https://www.processon.com/embed/56ab6119e4b0ed3b6490f436" frameborder="0">

    IM服务器开发,从功能抽象的角度看可能非常简单,可以认为是管理大量的客户端连接和在不同的客户端之间传递消息,但具体到实现细节就比较复杂了。打个不恰当的比喻,OS的功能抽象也非常简单,无非是进程间的调度和硬件资源的管理,但要是自己去实现一个,一般人也就只能呵呵了。

    由于IM服务器里面的内容比较多,这个可以是一个系列的内容,所以这里只介绍服务器的架构以及为什么选择这样的架构

    我们的设计原则是保持简单,可以做一定的扩容,无单点故障。很多时候开发人员常犯的错误是过度设计和提前优化,"When in doubt, use brute force", 有时简单的方案才是最好的方案。

    架构设计需要考虑到服务器的业务逻辑和预计的用户量,比如同样的IM服务器设计,最高并发在线人数百万和千万的肯定有很大的不同, 如果按千万级别来设计一个只有百万级别的系统,增加的复杂度和工作量是非常可观的.

    目前我们的IM服务器架构设计的单机并发连接10万用户,总并发用户量可以达百万级,对于这样规模的服务器后台,可以采用很简单的架构来处理。有一个DispatchServer来分配客户端到一个消息服务器,消息服务器之间的数据交互通过一个中心的RouteServer来转发,和数据持久化层之间的交互由DBProxy服务器来处理.

    由于IM消息服务器和客户端之间交互非常频繁,但处理单个数据包的逻辑比较简单,没有IO或CPU密集型的操作,所以消息服务器采用单线程来处理,这样比较简单,没有死锁,竞争条件,出现问题非常好定位。

    分离出一个DBProxy服务器的的理由是,由于要操作DB和Redis,一个请求的回复会比较耗时,但是交互非常简单,一个请求对应一个回复,与消息服务器之间的长连接一般很稳定,不太需要处理太多的断连,所以这个可以用多线程的来处理大的IO等待。而且由于DBProxy服务器之间不需要交互,这样可以随着业务量的增加,添加很多实例来分散每个DBProxy的压力。

    DispatchServer和RouteServer的逻辑都非常简单,而且可以启动多个实例,这样就不存在单点故障。两个DispatchServer或两个RouteServer之间的状态同步是通过消息服务器来实现的,比如用户上下线时,需要把这个状态通知所有的DispatchServer和RouteServer。

    消息服务器支持TCP长连接和HTTP长轮询两种接入方式,目前的消息服务器承担了接入和业务逻辑处理两种角色,一般业界的做法是把这两种功能分解开,有一个接入服务器来处理客户端的接入,然后客户端的请求被分配到不同的业务逻辑处理服务器来处理,比如登陆服务器,状态通知服务器,好友管理服务器等。但这样运维起来比较麻烦,对于百万量级的IM来说,这样有点过度设计的感觉,所以我们把这些功能融合在一个消息服务器里面实现,但这样的缺点是,更新一个业务功能时,需要把重启消息服务器,这样客户端会有一个重新连接服务器的过程。以后可以调研是不是可以把业务逻辑写成动态加载库,这样修改业务逻辑时,只要Reload动态库就可以了

    搭建这些只是开了一个头,以后可以做很多有挑战性的技术,比如用分布式的消息存储方案代替现在的MySQL,用分布式的小文件系统存储系统代替现在依赖蘑菇街主站的图片存储方案,用一些Unsupervised Learning的算法对用户消息进行分析,来获取一些用户的profile信息。


    linux 内核参数修改


    “Cannot assign requested address.”是由于linux分配的客户端连接端口用尽,无法建立socket连接所致,虽然socket正常关闭,但是端口不是立即释放,而是处于TIME_WAIT状态,默认等待60s后才释放。

    vi /etc/sysctl.conf
    
    #fs.file-max:表示文件句柄的最大数量。文件句柄表示在Linux系统中可以打开的文件数量。
    fs.file-max = 1048576
    
    #增加可用端口:
    
    net.ipv4.ip_local_port_range = 1024 65535
    net.ipv4.tcp_mem = 786432 2097152 3145728
    net.ipv4.tcp_rmem = 4096 4096 16777216
    net.ipv4.tcp_wmem = 4096 4096 16777216
    
    #调低端口释放后的等待时间,默认为60s,修改为15~30s
    net.ipv4.tcp_fin_timeout=20
    
    #修改tcp/ip协议配置, 通过配置/proc/sys/net/ipv4/tcp_tw_resue, 默认为0,修改为1,释放TIME_WAIT端口给新连接使用
    net.ipv4.tcp_tw_reuse = 1
    
    #修改tcp/ip协议配置,快速回收socket资源,默认为0,修改为1
    
    #net.ipv4.tcp_timestamps开启,tw_recycle才会生效
    
    net.ipv4.tcp_timestamps=1
    
    net.ipv4.tcp_tw_recycle = 1
    
    
    通过vi /etc/security/limits.conf 添加如下配置参数:修改之后保存,注销当前用户,重新登录,通过
    
    ulimit -a 查看修改的状态是否生效。
    
    *  soft  nofile  1000000
    *  hard  nofile  1000000
    
    通过 ulimit -a 查看最大句柄数
    
    ulimit -n 10000000
    
    改完后,执行命令“sysctl -p”使参数生效,不需要reboot。
    
    
    
    


    netty设置

    ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class) // (3)
                 .childHandler(new SimpleChatServerInitializer())  //(4)
                 .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .option(ChannelOption.SO_KEEPALIVE, true)
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.SO_RCVBUF, 10 * 1024)
                 .option(ChannelOption.SO_SNDBUF, 10 * 1024)
                 .option(EpollChannelOption.SO_REUSEPORT, true)
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
                System.out.println("SimpleChatServer 启动了");


    JVM监控

    1. 通过jstatd启动RMI服务
            配置java安全访问,将如下的代码存为文件 jstatd.all.policy,放到JAVA_HOME/bin中,其内容如下,
            grant codebase "file:${java.home}/../lib/tools.jar" {

                   permission java.security.AllPermission;

              };
                
              执行命令jstatd -J-Djava.security.policy=jstatd.all.policy -J-Djava.rmi.server.hostname=192.168.1.8 &

               (192.168.1.8  为你服务器的ip地址,&表示用守护线程的方式运行)
              jstatd命令详解 :http://hzl7652.iteye.com/blog/1183182 
             
              打开jvisualvm, 右键Remort,选择 "Add Remort Host...",在弹出框中输入你的远端IP,比如192.168.1.8. 连接成功.


    1.远程主机

    (1)修改JMX服务的配置文件:
      在JDK的根目录/jre/lib/management中,将jmxremote.password.template另存为jmxremote.password。
    用文件编辑软件按编辑jmxremote.password去掉
      # monitorRole QED
      # controlRole R&D
      前面的#注释,保存。
      如果当前系统属于AIX、Linux或者Solaris系统还需要更改jmxremote.access和jmxremote.password的权限
    为只读写,命令如下
      chmod 600 jmxremote.access jmxremote.password


    (2)修改JVM的启动配置信息:

     

    Windows系统
    set JAVA_OPTS=-Dcom.sun.management.jmxremote.port=<port> -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=<hostname>
    -Dcom.sun.management.jmxremote.ssl=false

     

    AIX、Linux或者Solaris
    export JAVA_OPTS="-Dcom.sun.management.jmxremote.port=<port> -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=<hostname>  
    -Dcom.sun.management.jmxremote.ssl=false"

    例如:
    set JAVA_OPTS=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=192.168.1.24
    -Dcom.sun.management.jmxremote.ssl=false

     

    配置的说明如下:
    -Dcom.sun.management.jmxremote.port                           远程主机端口号的
    -Dcom.sun.management.jmxremote.ssl=false                   是否使用SSL连接
    -Dcom.sun.management.jmxremote.authenticate=false   是否开启远程服务权限
    -Djava.rmi.server.hostname                                              远程主机名,使用IP地址


    展开全文
  • 11 高并发性能调优 11.1 单机百万连接调优 11.1.1 如何模拟百万连接 使用两台虚拟机,一台服务端一台客户端 如何模拟呢? 情况1:一台Client一台Server,只能有6w连接,因为Client只剩下大概6w个端口可以去连接...

    11 高并发性能调优

    11.1 单机百万连接调优

    11.1.1 如何模拟百万连接

    使用两台虚拟机,一台服务端一台客户端

    如何模拟呢?

    情况1:一台Client一台Server,只能有6w连接,因为Client只剩下大概6w个端口可以去连接服务端
    在这里插入图片描述
    优化1:可以使用多个Client去连接服务端,要开启十几台虚拟机,麻烦

    优化2:服务端启动8000~8100端口,且因为TCP是一个四元组概念:源ip、源端口、目标ip、目标端口;
    这样Client同一个端口可以连接Server不同端口,会被当成不同的连接,这样就有600w连接,所以此处使用这种方法:
    在这里插入图片描述

    11.1.2 突破局部文件句柄限制

    默认情况下,单个进程打开的句柄数非常有限,而一个TCP连接对应一个句柄,这样导致了一个服务端建立的连接数收到了限制,所以我们要突破句柄总量,即一个进程能够打开的最大文件数

    首先使用ulimit -n,看看Linux一个进程能够打开的最大文件数(Linux中,一个TCP连接对应一个文件数),可以看到为1024

    然后使用sudo vim /etc/security/limits.conf,添加如下两行:

    #当前用户、限制、打开最大文件数、能够打开的最大文件数
    * hard nofile 1000000
    * soft nofile 1000000
    

    重启即可

    11.1.3 突破全局文件句柄限制

    即Linux下所有进程能够打开的文件数

    查看全局文件句柄:cat /proc/sys/fs/file-max

    修改:sudo vi /etc/sysctl.conf

    fs.file-max=1000000
    

    sudo sysctl -p生效,然后重启即可

    启动服务端客户端,全部连接成功后,如果发现没有到100w,那么使用htop查看服务端性能,可以发现CPU到100%了,所以无法继续运算了,不过也差不多了,可以通过加强机器的性能

    11.2 Netty应用级别性能调优:响应时间从秒级到毫秒级的优化

    11.2.1 瓶颈

    在服务端的ChannelHandler的channelRead0方法里面进行的业务逻辑如果耗时过多,会使响应时间越来越长,因为处理每个客户端线程都要在这里进行等待,这时候是秒级别

    11.2.2 优化

    1.将耗时的业务操作放到单独的业务的线程池去执行,有结果后再给到客户端

    这时候是毫秒级别,当然还会像之前一样,响应时间会持续地往上升到一个稳定值:1000qps里面有10个是 > 100ms的,对应到业务线程池的线程,可能很多线程会超过100ms,因此可以将线程数调大来降低这个比例(因为qps不会变,即客户端请求个数一定),这样<100ms的会更多

    增大多少呢?可以慢慢试,或者使用”最佳线程数“公式

    2.Netty提供了一种方式:可以把handler放到专门的线程池去处理,不用由NioEventLoop的线程处理

    添加handler的时候再传入一个线程池(EventLoopGroup)即可

    这种方式会将handler的所有方法都放到线程池去处理,如果分配内存,则在业务线程池里面分配,做不到内存的共享;第一种方式则可以自由选择要放到线程池的代码,可以将内存分配放在主线程里面,业务逻辑才放到业务线程池里,可以做到内存共享;
    不过第一种实现简单,对业务代码没有侵入

    展开全文
  • Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发性能、可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出...
  • Netty开发中间件:高并发性能优化

    万次阅读 多人点赞 2015-10-08 20:52:23
    Netty开发中间件:高并发性能优化最近在写一个后台中间件的原型,主要是做消息的分发和透传。因为要用Java实现,所以网络通信框架的第一选择当然就是Netty了,使用的是Netty 4版本。Netty果然效率很高,不用做太多...
  • 高并发Netty之Linux内核参数优化 局部文件句柄限制(单个进程最大文件打开数) ulimit -n :查看系统最大fd文件数 一个进程最大打开的文件数 fd 不同系统有不同的默认值 root身份编辑 vim /etc/security/limits....
  • Netty开发中间件:高并发性能优化 最近在写一个后台中间件的原型,主要是做消息的分发和透传。因为要用Java实现,所以网络通信框架的第一选择当然就是Netty了,使用的是Netty 4版本。Netty果然效率很高,不用做...
  • 用spring官网的简洁spring boot demo,整合netty实现高并发websocket,并引入slf4g+lombok,采用maven形式;直接导入运行,有测试页面也有实现代码及详细注释,主要参考了望星辰大海的博客,,欢迎下载学习~~(想...
  • Netty ChannelHandler 性能优化 1. 前言 本节我们主要来继续讲解 ChannelHandler 的其它特性,主要讲解如何去进行 ChannelHandler 业务链表的常见性能优化。 2. 优化途径 通常情况下为了提高自定义业务 ...
  • 一丶 Netty基础入门Netty是一个性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取...
  • Netty物联网高并发系统第一季

    千次阅读 2017-06-29 14:06:09
    第1集netty物联网介绍 第2集netty服务器编写 第3集netty客户端与服务器通信 第4集编码解码 第5集netty服务器架构上 第6集netty服务器架构下 ...Netty物联网高并发系统第二季 第9集Netty连接池讲解,第
  • Netty开发中间件:高并发性能优化 最近在写一个后台中间件的原型,主要是做消息的分发和透传。因为要用Java实现,所以网络通信框架的第一选择当然就是Netty了,使用的是Netty 4版本。Netty果然效率很高,不用做太...
  • Netty是一个性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 23,423
精华内容 9,369
关键字:

netty高并发优化