精华内容
下载资源
问答
  • netty websocket

    2013-03-01 16:20:53
    netty websocket
  • nettyWebSocket

    2018-06-19 18:02:52
    Spring+Netty+WebSocket实例,通用性还行,贴近生产,注释写的全,容易理解
  • nettyWebsocket websocket in netty 基于websocket实现的简单聊天服务器
  • nettywebsocket netty权威指南WebSocket章节源码
  • netty websocket源码

    2016-01-20 09:51:05
    NettyWebsocket具体案例,主要介绍如何使用Netty开发websocket
  • Netty Websocket使用

    2020-09-21 14:49:00
    Netty Websocket协议开发 Netty基于HTTP协议栈开发了WebSocket协议栈,利用NettyWebSocket协议栈可以方便开发WebSocket客户端和服务端 Netty Websocket服务端 1Netty Websocket服务端启动类 package ...

    Netty Websocket协议开发

      Netty基于HTTP协议栈开发了WebSocket 协议栈,利用Netty的WebSocket 协议栈可以方便开发WebSocket 客户端和服务端

    Netty Websocket 服务端

    1 Netty Websocket 服务端启动类

    package com.webSocket;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    public class WebSocketServer {
    
        public void bind(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
    
                                //websocket协议本身是基于Http协议的,所以需要Http解码器
                                 ch.pipeline().addLast("http-codec",new HttpServerCodec());
                                //以块的方式来写的处理器
                                 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
                                //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                                //netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
                                ch.pipeline().addLast(new HttpObjectAggregator(1024*1024*1024));
                                ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws",null,true,65535));
                               // ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                                ch.pipeline().addLast(new WebSocketServerHandler());
    
                            }
                        });
    
                ChannelFuture sync = bootstrap.bind(port).sync();
                sync.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new WebSocketServer().bind(9090);
        }
    }
    

    2 Netty Websocket 服务端启动处理类

     

    package com.webSocket;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.websocketx.*;
    
    public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
    
        private WebSocketServerHandshaker handshaker;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            if (msg instanceof FullHttpRequest) {
                //以http请求形式接入,但是走的是websocket
               handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) {
                //处理websocket客户端的消息
               handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }
    
    
    
        public void channelReadComplete(ChannelHandlerContext context) throws  Exception
        {
    
            context.flush();
        }
        /*
         http
         */
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
    
    
            WebSocketServerHandshakerFactory webFactory=new WebSocketServerHandshakerFactory("ws://127.0.0.1:9090",null,false);
    
                    if(req instanceof  CloseWebSocketFrame)
                    {
                        handshaker.close(ctx.channel(),(CloseWebSocketFrame) req.retain());
                        if(handshaker==null)
                        {
                            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    
                        }
                        else
                        {
                             handshaker.handshake(ctx.channel(),req);
                        }
                    }
    
        }
    
        /*
         webSocket
         */
        private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    
            if (frame instanceof CloseWebSocketFrame)
            {
                handshaker.close(ctx.channel(),(CloseWebSocketFrame)frame.retain());
                return;
            }
    
            if(frame instanceof TextWebSocketFrame)
            {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            if(!(frame instanceof  TextWebSocketFrame))
            {
    
            }
    
            String request=((TextWebSocketFrame) frame).text();
            ctx.channel().write(request+"欢迎Netty WebSocket");
        }
    
    }
    

    Netty Websocket 客户端

    1 Netty Websocket 客户端启动类

     

    package com.webSocket;
    
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import java.net.URI;
    
    public class WebSocketClients {
    
    
    
    
        public static void main(String[] args) throws Exception {
            //netty基本操作,线程组
            EventLoopGroup group = new NioEventLoopGroup();
            //netty基本操作,启动类
            Bootstrap boot = new Bootstrap();
            boot.option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .group(group)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelHandler[]{new HttpClientCodec(),
                                    new HttpObjectAggregator(1024*1024*10)});
                            //websocket协议本身是基于Http协议的,所以需要Http解码器
                            ch.pipeline().addLast("http-codec",new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
                         //   ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                            ch.pipeline().addLast("WebSocketClientHandler",new WebSocketClientHandler());
    
                        }
                    });
    
            //websocke连接的地址,/hello是因为在服务端的websockethandler设置的
            URI websocketURI = new URI("ws://localhost:9090/ws");
            HttpHeaders httpHeaders = new DefaultHttpHeaders();
            //进行握手
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
    
            //客户端与服务端连接的通道,final修饰表示只会有一个
            final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
            WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("WebSocketClientHandler");
    
            handler.setHandshaker(handshaker);
            handshaker.handshake(channel);
            //阻塞等待是否握手成功
            handler.handshakeFuture().sync();
            System.out.println("握手成功");
            //给服务端发送的内容,如果客户端与服务端连接成功后,可以多次掉用这个方法发送消息
            sengMessage(channel);
    
    
        }
    public static void sengMessage(Channel channel){
            //发送的内容,是一个文本格式的内容
            String putMessage="你好,我是客户端";
            TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
            channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("消息发送成功,发送的消息是:"+putMessage);
                    } else {
                        System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
                    }
                }
            });
        }
    
    }
    

    2 Netty Websocket 客户端启动处理类

     

    package com.webSocket;
    
    import io.netty.channel.*;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.util.CharsetUtil;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
        //握手的状态信息
        WebSocketClientHandshaker handshaker;
    
    
        //netty自带的异步处理
        ChannelPromise handshakeFuture;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete());
            Channel ch = ctx.channel();
            FullHttpResponse response;
            //进行握手操作
            if (!this.handshaker.isHandshakeComplete()) {
                try {
                    response = (FullHttpResponse)msg;
                    //握手协议返回,设置结束握手
                    this.handshaker.finishHandshake(ch, response);
                    //设置成功
                    this.handshakeFuture.setSuccess();
                    System.out.println("服务端的消息"+response.headers());
                } catch (WebSocketHandshakeException var7) {
                    FullHttpResponse res = (FullHttpResponse)msg;
                    String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                    this.handshakeFuture.setFailure(new Exception(errorMsg));
                }
            } else if (msg instanceof FullHttpResponse) {
                response = (FullHttpResponse)msg;
                throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
            } else {
                //接收服务端的消息
                WebSocketFrame frame = (WebSocketFrame)msg;
                //文本信息
                if (frame instanceof TextWebSocketFrame) {
                    TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                    System.out.println("客户端接收的消息是:"+textFrame.text());
                }
                //二进制信息
                if (frame instanceof BinaryWebSocketFrame) {
                    BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
                    System.out.println("BinaryWebSocketFrame");
                }
                //ping信息
                if (frame instanceof PongWebSocketFrame) {
                    System.out.println("WebSocket Client  pong");
                }
                //关闭消息
                if (frame instanceof CloseWebSocketFrame) {
                    System.out.println("receive close frame");
                    ch.close();
                }
    
            }
        }
    
        /**
         * Handler活跃状态,表示连接成功
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("与服务端连接成功");
        }
    
        /**
         * 非活跃状态,没有连接远程主机的时候。
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("主机关闭");
        }
    
        /**
         * 异常处理
         * @param ctx
         * @param cause
         * @throws Exception
         */
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("连接异常:"+cause.getMessage());
            ctx.close();
        }
    
        public void handlerAdded(ChannelHandlerContext ctx) {
            this.handshakeFuture = ctx.newPromise();
        }
    
        public WebSocketClientHandshaker getHandshaker() {
            return handshaker;
        }
    
        public void setHandshaker(WebSocketClientHandshaker handshaker) {
            this.handshaker = handshaker;
        }
    
        public ChannelPromise getHandshakeFuture() {
            return handshakeFuture;
        }
    
        public void setHandshakeFuture(ChannelPromise handshakeFuture) {
            this.handshakeFuture = handshakeFuture;
        }
    
        public ChannelFuture handshakeFuture() {
            return this.handshakeFuture;
        }
    
    }
    

     

    结果:

    当前握手的状态false
    服务端的消息DefaultHttpHeaders[upgrade: websocket, connection: upgrade, sec-websocket-accept: MVirCoc2+74v5fT6LiEzbDXl7nM=, content-length: 0]
    握手成功
    消息发送成功,发送的消息是:你好,我是客户端
    当前握手的状态true
    WebSocket Client  pong

     

    展开全文
  • Netty websocket

    千次阅读 2019-05-31 11:23:55
    我们将通过编写示例应用程序来探索Netty对它们的支持。 在第12章中,您将学习如何使用WebSocket实现双向数据传输,方法是构建一个聊天室服务器,其中多个浏览器客户端可以实时通信。 您还将看到如何通过检测客户端...

    Network protocols

    WebSocket是一种高级网络协议,旨在提高Web应用程序的性能和响应能力。 我们将通过编写示例应用程序来探索Netty对它们的支持。

    在第12章中,您将学习如何使用WebSocket实现双向数据传输,方法是构建一个聊天室服务器,其中多个浏览器客户端可以实时通信。 您还将看到如何通过检测客户端是否支持它,从应用程序中的HTTP切换到WebSocket协议。

    我们将在第13章中总结第3部分,研究Netty对用户数据报协议(UDP)的支持。在这里,您将构建一个广播服务器和监视器客户端,可以适应许多实际用途。

    本章讲介绍:

    • real-time web 的概念
    • WebSocket 协议
    • 使用Netty 创建一个基于 WebSocket 的聊天室服务端程序

    如果您关注网络技术的最新发展,您很可能会遇到实时网络短语,如果您有工程领域的实时应用程序经验,您可能会对这个术语的含义持怀疑态度。

    因此,我们首先要澄清的是,这不是所谓的硬实时服务质量(QoS),其中保证了在指定时间间隔内交付计算结果。 仅仅HTTP的请求/响应设计使得这个问题非常严重,因为过去设计的方法都没有提供令人满意的解决方案。

    虽然已经有一些关于正式定义定时Web服务语义的学术讨论,但普遍接受的定义似乎并未出现。 所以现在我们将接受来自维基百科的以下非权威性描述:

    The real-time web is a network web using technologies and practices that
    enable users to receive information as soon as it is published by its
    authors, rather than requiring that they or their software check a source
    periodically for updates.
    实时网络是一种使用技术和实践的网络网络,使用户能够在作者发布信息后立即接收信息,而不是要求他们或他们的软件定期检查信息源以进行更新。

    简而言之,一个成熟的实时网络可能不会即将到来,但其背后的想法正在推动对几乎即时访问信息的不断增长的期望。 我们将在本章中讨论的WebSocket协议是朝着这个方向的良好支持的步骤。

    12.1 Introducing WebSocket

    WebSocket协议是从头开始设计的,旨在为Web上的双向数据传输问题提供实用的解决方案,允许客户端和服务器随时传输消息,从而要求它们异步处理消息接收。 (最新的浏览器支持WebSocket作为HTML5的客户端API。)

    Netty对WebSocket的支持包括所有正在使用的主要实现,因此在您的下一个应用程序中采用它非常简单。 与Netty一样,您可以完全使用协议,而无需担心其内部实现细节。 我们将通过创建基于WebSocket的实时聊天应用程序来证明这一点。

    12.2 Our example WebSocket application

    我们的示例应用程序将通过使用WebSocket协议实现基于浏览器的聊天应用程序来演示实时功能,例如您可能在Facebook的文本消息功能中遇到过。 我们将通过允许多个用户同时相互通信来进一步发展。
    图12.1说明了应用程序逻辑:

    1. 一个客户端发送一条消息。
    2. 这条消息广播到所有已经建立连接的其他客户端。

    这就是您期望聊天室工作的方式:每个人都可以与其他人交谈。 在我们的示例中,我们将仅实现服务器端,客户端是通过网页访问聊天室的浏览器。 正如您将在接下来的几页中看到的那样,WebSocket使编写此服务器变得简单。

    Figure 12.1 WebSocket application logic

    12.3 Adding WebSocket support

    称为升级握手的机制用于从标准HTTP或HTTPS协议切换到WebSocket。 因此,使用WebSocket的应用程序将始终以HTTP / S开头,然后执行升级。 当恰好发生这种情况时,应用程序是特定的; 它可能是在启动时或者在请求特定URL时。

    我们的应用程序采用以下约定:如果请求的URL以/ ws结尾,我们将协议升级到WebSocket。 否则,服务器将使用基本HTTP / S. 连接升级后,所有数据都将使用WebSocket传输。 图12.2说明了服务器逻辑,它一如Netty,将由一组ChannelHandler实现。 在我们解释用于处理HTTP和WebSocket协议的技术时,我们将在下一节中对它们进行描述。

    12.3.1 Handling HTTP requests

    首先,我们将实现处理HTTP请求的组件。 此组件将提供访问聊天室的页面,并显示已连接客户端发送的消息。 代码清单12.1包含了这个HttpRequestHandler的代码,它为SimpleHttpRequest消息扩展了SimpleChannelInboundHandler。 请注意channelRead0() 的实现如何转发URI / ws的任何请求。

    Figure 12.2

    // Extends SimpleChannelInboundHandler to handle FullHttpRequest messages
    public class HttpRequestHandler
        extends SimpleChannelInboundHandler<FullHttpRequest> {
        private final String wsUri;
        private static final File INDEX;
        
        static {
            URL location = HttpRequestHandler.class
                             .getProtectionDomain()
                             .getCodeSource().getLocation();
            try {
                String path = location.toURI() + "index.html";
                path = !path.contains("file:") ? path : path.substring(5);
                INDEX = new File(path);
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Unable to locate index.html", e);
            }
         }
    
         public HttpRequestHandler(String wsUri) {
             this.wsUri = wsUri;
         }
      
         @Override
         public void channelRead0(ChannelHandlerContext ctx,
             FullHttpRequest request) throws Exception {
             // If a WebSocket upgrade is requested, increments the reference count(retain) and passes it to the next ChannelInboundHandler
             if(wsUri.equalsIgnoreCase(request.getUri())) {
                 ctx.fireChannelRead(request.retain());
             } else {
               // Handlers 100 Continue requests in conformity with HTTP 1.1
               if(HttpHeaders.is100ContinueExpected(request)) {
                   send100Continue(ctx);
               }
               // Reads index.html
               RandomAccessFile file = new RandomAccessFile(INDEX, "r");
               HttpResponse response = new DefaultHttpResponse(
                   request.getProtocolVersion(), HttpResponseStatus.OK);
               response.headers().set(
                   HttpHeaders.Names.CONTENT_TYPE,
                   "text/plain; charset=UTF-8");
               boolean keepAlive = HttpHeaders.isKeepAlive(request);
               // If keepalive is requested, adds the required headers
               if(keepAlive) {                                        
                  response.headers().set(
                      HttpHeaders.Names.CONTENT_LENGTH, file.length());
                  response.headers().set(HttpHeaders.Names.CONNECTION,
                      HttpHeaders.Values.KEEP_ALIVE);
               }
    
               // Writes the HttpResponse to the client
               ctx.write(response);
               // Writes index.html to the client
               if (ctx.pipeline().get(SslHandler.class) == null) {
                   ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
               } else {
                  ctx.write(new ChunkedNioFile(file.getChannel()));
               }
               // Writes and flushes the LastHttpContent to the client
               ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
               if(!keepAlive) {
                  future.addListener(ChannelFutureListener.CLOSE);
                }
            }
        }
    
        private static void send100Continue(ChannelHandlerContext ctx) {
           FullHttpResponse response = new DefaultFullHttpResponse{
               HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
               ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
                                           
    

    如果HTTP请求引用URI / ws,则HttpRequestHandler在FullHttpRequest上调用 retain() 并通过调用 fireChannelRead(msg) 将其转发到下一个 ChannelInboundHandler . 需要调用retain(),因为在 channelRead() 完成后,它将调用 FullHttpRequest 上的 release() 来释放其资源。(请参阅第6章中对SimpleChannelInboundHandler的讨论。)

    如果客户端发送HTTP 1.1标头Expect:100-continue,则HttpRequestHandler发送100 Continue响应。 在设置标头后,HttpRequestHandler将HttpResponse d写回客户端。 这不是FullHttpResponse,因为它只是响应的第一部分。 此外,此处不调 writeAndFlush()。
    这是在最后完成的。

    如果既不需要加密也不需要压缩,则可以通过将index.html e的内容存储在DefaultFileRegion中来实现最高效率。 这将利用零拷贝来执行传输。 因此,您需要检查ChannelPipeline中是否存在SslHandler。 或者,您使用ChunkedNioFile。

    HttpRequestHandler写一个LastHttpContent来标记响应的结束。 如果未请求keepalive,则HttpRequestHandler将ChannelFutureListener添加到上次写入的 ChannelFuture 并关闭连接。 这是您调用 writeAndFlush() 来刷新所有以前写入的消息的地方。

    这代表聊天服务器的第一部分,它管理纯HTTP请求和响应。 接下来我们将处理WebSocket帧,它们传输实际的聊天消息。

    WebSocket Frames WebSockets以帧的形式传输数据,每个帧都代表消息的一部分。 完整的消息可能包含许多帧。

    12.3.2 Handling WebSocket frames

    由IETF发布的WebSocket RFC定义了六个帧; Netty为每个人提供POJO实施。 表12.1列出了帧类型并描述了它们的用法。

    Frame type描述
    BinaryWebSocketFrame包含 binary data
    TextWebSocketFrame包含 text data
    ContinuationWebSocketFrame包含 text 或者 binary 数据,他属于前一个 BinaryWebSocketFrame 或者 TextWebSocketFrame
    CloseWebSocketFrame表示CLOSE请求,包含关闭状态代码和短语
    PingWebSocketFrame请求传输PongWebSocketFrame
    PongWebSocketFrame作为响应发送给一个 PingWebSocketFrame

    Table 12.1 WebSocketFrame types

    我们的聊天程序将会使用下面这些 frame 类型:

    • CloseWebSocketFrame
    • PingWebSocketFrame
    • PongWebSocketFrame
    • TextWebSocketFrame

    TextWebSocketFrame是我们实际需要处理的唯一一个。 根据WebSocket RFC,Netty提供了一个WebSocketServerProtocolHandler来管理其他的。

    下面的清单显示了TextWebSocketFrames的ChannelInboundHandler,它还将跟踪其ChannelGroup中的所有活动WebSocket连接。

    // Extends SimpleChannelInboundHandler and handle TextWebSocketFramemessages
    pyblic class TextWebSocketFrameHandler
        extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        private final ChannelGroup group;
      
        public TextWebSocketFrameHandler(ChannelGroup group) {
            this.group = group;
        }
    
        // Overrides userEventTriggered() to handle custome events
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx,
            Object evt) throws Exception {
            if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)     
            {
            ctx.pipeline().remove(HttpRequestHandler.class);
            // Notifies all connected WebSocket clients that new Client has connected
            group.writeAndFlush(new TextWebSocketFrame(
                "Client" + ctx.channel() + " joined"));
            group.add(ctx.channel());
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
            TextWebSocketFrame msg) throws Exception {
            // Increments the reference count of the message and writes it to all connected clients in the ChannelGroup
            group.writeAndFlush(msg.retain());
        }
    }          
                
    

    TextWebSocketFrameHandler只有很少的职责。 当与新客户端的WebSocket握手成功完成时,它通过写入ChannelGroup中的所有Channel来通知所有连接的客户端,然后将新Channel添加到ChannelGroup.

    如果收到TextWebSocketFrame,它会调用retain()并使用writeAndFlush()将其传输到ChannelGroup,以便所有连接的WebSocket Channel都接收它。

    和以前一样,调用 retain() 是必需的,因为当 channelRead0() 返回时TextWebSocketFrame 的引用计数将减少。 由于所有操作都是异步的,因此writeAndFlush() 可能会在以后完成,并且它不能访问已变为无效的引用。

    由于Netty在内部处理大部分剩余功能,因此现在唯一要做的就是为每个创建的新Channel初始化ChannelPipeline。 为此,我们需要一个ChannelInitializer。

    12.3.3 Initializing the ChannelPipeline

    如您所知,要在 ChannelPipeline 中安装 ChannelHandler,您需要扩展 ChannelInitializer并实现 initChannel() 。 以下清单显示了生成的ChatServerInitializer的代码。

    // Extends ChannelInitializer
    public class ChatServerIntializer extends ChannelIntializer<Channel> {
        private final ChannelGroup group;
    
        public ChatServerIntializer(ChannelGroup group) {
            this.group = group;
        }
    
        // Adds all needed ChannelHandlers to the ChannelPipeline
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast(new ChunkedWriteHandler());
            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
            pipeline.addLast(new HttpRequestHandler("/ws"));
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            pipeline.addLast(new TextWebSocketFrameHandler(group));
        }
    }            
    

    对 initChannel() 的调用通过安装所有必需的 ChannelHandler 来设置新注册 Channel 的ChannelPipeline。 表12.2总结了这些内容及其各自的职责。

    ChannelHandlerResponsibility
    HttpServerCodec将字节解码为HttpRequest,HttpContent,和LastHttpContent。 编码HttpRequest,HttpContent和LastHttpContent到字节。
    ChunkedWriteHandler写入文件的内容。
    HttpObjectAggregator将HttpMessage及其后续HttpContent聚合到单个FullHttpRequest或FullHttpResponse中(取决于它是否用于处理请求或响应)。 安装此选项后,管道中的下一个ChannelHandler将仅接收完整的HTTP请求。
    HttpRequestHandler处理FullHttpRequest(未发送到 /ws URI)。
    WebSocketServerProtocolHandler根据WebSocket规范的要求,处理WebSocket升级握手,PingWebSocketFrames,PongWebSocketFrames和CloseWebSocketFrames。
    TextWebSocketFrameHandler处理TextWebSocketFrames和握手完成事件

    Table 12.2 ChannelHandlers

    Table 12.2 ChannelHandlers for the WebSocket chat server
    Netty 的 WebSocketServerProtocolHandler 处理所有强制 WebSocket 帧类型和升级握手本身。 如果握手成功,则将所需的ChannelHandler添加到管道中,并删除不再需要的那些。

    Figure 12.3 ChannelPipeline before WebSocket upgrade

    升级前的管道状态如图12.3所示。 这表示ChatServerInitializer初始化后的ChannelPipeline。

    升级完成后,WebSocketServerProtocolHandler将替换带有WebSocketFrameDecoder的HttpRequestDecoder和带有WebSocketFrameEncoder的HttpResponseEncoder。 为了最大限度地提高性能,它将删除WebSocket连接不需要的任何ChannelHandler。 这些将包括图12.3中所示的HttpObjectAggregator和HttpRequestHandler。

    图12.4显示了这些操作完成后的ChannelPipeline。 请注意,Netty目前支持四种版本的WebSocket协议,每种版本都有自己的实现类。 根据客户端(此处为浏览器)支持的内容,自动执行正确版本的WebSocketFrameDecoder和WebSocketFrameEncoder的选择。

    Figure 12.4 ChannelPipeline after WebSocket upgrade

    12.3.4 Bootstrapping

    图片的最后一部分是引导服务器并安装ChatServerInitializer的代码。 这将由ChatServer类处理,如此处所示。

    public class ChatServer {
        // Creates DefaultChannelGroup that will hold all connected WebSocket channels
        private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        private final EventLoopGroup group = new NioEventLoopGroup();
        private Channel channel;
    
        public ChannelFuture start(InetSocketAddress address) {
            // Bootstraps the server
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
            ChannelFuture future = bootstrap.bind(address);
            future.syncUninterruptibly();
            channel = future.channel();
            return future;
       }
    
    
       // Creates the ChatServerInitializer
       protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
           return new ChatServerInitializer(group);
       }
    
       // Handles server shutdown and releases all resources
       public void destroy() {
           if(channel != null) {
               channel.close();
           }
           channelGroup.close();
           group.shutdownGracefully();
       }
      
       public static void main(String[] args) throws Exception {
           if (args.length != 1) {
               System.err.println("Please give port as argument.");
               System.exit(1);
           }
           int port = Integer.praseInt(args[0]);
           final ChatServer endpoint = new ChatServer();
           ChannelFuture future = endpoint.start( new InetSocketAddress(port));
           Runtime.getRuntime().addShutdownHook(new Thread() {
               @Override
               public void run() {
                   endpoint.destroy();
               }
           });
           future.channel().closeFuture().syncUninterruptibly();
       }
     }                                   
    

    这完成了应用程序本身。 现在让我们来测试吧。

    12.4 Testing the application

    chapter12目录中的示例代码包含构建和运行服务器所需的一切。 (如果您尚未设置包括Apache Maven在内的开发环境,请参阅第2章中的说明。)

    我们将使用以下Maven命令来构建和启动服务器:

    mvn -PChatServer clean package exec:exec
    

    项目文件pom.xml配置为在端口9999上启动服务器。要使用其他端口,您可以编辑文件中的值或使用System属性覆盖它:

    mvn -PChatServer -Dport=1111 clean package exec:exec
    

    以下清单显示了命令的主要输出(已删除非必要行)。

    Listing 12.5 Compile and start the ChatServer

    Figure 12.5 WebSocket ChatServer demonstration

    您可以通过将浏览器指向http:// localhost:9999来访问该应用程序。 图12.5显示了Chrome浏览器中的UI。

    该图显示了两个连接的客户端。 第一个是使用顶部的界面连接。 第二个客户端通过底部的Chrome浏览器命令行连接。 您会注意到两个客户端都发送了消息,并且每条消息都显示在两个客户端上。

    这是一个非常简单的演示,说明WebSocket如何在浏览器中实现实时通信。

    12.4.1 What about encryption?

    在现实生活中,您很快就会被要求为此服务器添加加密。 使用Netty,只需将SslHandler添加到ChannelPipeline并进行配置即可。 以下清单显示了如何通过扩展ChatServerInitializer来创建SecureChatServerInitializer来完成此操作。

    // Adding encryption to the ChannelPipeline
    // Extends ChatServerInitializer to add encryption
    public class SecureChatServerInitializer extends ChatServerInitializer {
        private final SslContext context;
    
        public SecureChatServerInitializer(ChannelGroup group,
            SslContext context) {
            super(group);
            this.context = context;
       }
    
       @Override
       protected void initChannel(Channel ch) throws Exception {
           // Calls the parent's initChannel()
           super.initChannel(ch);
           SSLEngine engine = context.newEngine(ch.alloc());
           // Adds the SslHandler to the ChannelPipeline
           ch.pipeline().addFirst(new SslHandler(engine));   
      }
    }     
    

    最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在管道中安装SslHandler。 这给了我们这里显示的SecureChatServer。

    // Adding encryption to the ChatServer
    // SecureChatServer extends ChatServer to support encryption
    public class SecureChatServer extends ChatServer {
        private final SslContext context;
    
        public SecureChatServer(SslContext context) {
            this.context = context;
        }
    
        @Override
        protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
            // Returns the previously created SecureChatServerInitializer to enable encryption
            return new SecureChatServerInitializer(group, context);
       }
    
       public static void main(String[] args) throws Exception {
           if(args.length != 1) {
               System.err.println("Please give port as argument");
               System.exit(1);
           }
           int port = Integer.parseInt(args[0]);
           SelfSignedCertificate cert = new SelfSignedCertificate();
           SslContext context = SslContext.newServerContext(
           cert.certificate(), cert.privateKey());
    
           final SecureChatServer endpoint = new SecureChatServer(context);
           ChannelFuture future = endpoint.start(new InetSocketAddress(port));
           Runtime.getRuntime().addShutdownHook(new Thread() {
               @Override
               public void run() {
                   endpoint.destroy();
               }
           });
           future.channel().closeFuture().syncUninterruptibly();
        }
     }                                  
    

    这就是为所有通信启用SSL / TLS加密所需的全部内容。 和以前一样,您可以使用Apache Maven来运行应用程序。 它还将检索任何所需的依赖项。

    Listing 12.8 Starting the SecureChatServer
    现在,您可以从其HTTPS URL访问SecureChatServer:https://localhost:9999

    12.5 Summary

    在本章中,您学习了如何使用Netty的WebSocket实现来管理Web应用程序中的实时数据。 我们介绍了支持的数据类型,并讨论了您可能遇到的限制。 虽然在所有情况下都可能无法使用WebSocket,但应该清楚它代表了一个重要的进步网络技术。

    展开全文
  • netty webSocket

    千次阅读 2019-04-21 23:29:28
    1、导包 pom.xml主要加入 <!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <vers...
    • 1、导包
      pom.xml主要加入
        <!--netty-->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.27.Final</version>
            </dependency>
    
    • 2、建立服务端
      WebSocketServer.java
    package com.rw.article.chat.websocket;
    
    import com.rw.article.chat.action.ApiController;
    import com.rw.article.chat.websocket.handler.BinaryWebSocketFrameHandler;
    import com.rw.article.chat.websocket.handler.TextWebSocketHandler;
    import com.rw.article.common.constant.Constants;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author Zhou Zhong Qing
     * @Title: ${file_name}
     * @Package ${package_name}
     * @Description: WebSocket
     * @date 2019/4/16 17:26
     */
    public class WebSocketServer {
        private Logger log = LoggerFactory.getLogger(this.getClass()); // 日志对象
    
        public void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workGroup)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new LoggingHandler(LogLevel.TRACE))
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline()
                                        .addLast(new LoggingHandler(LogLevel.TRACE))
                                        // HttpRequestDecoder和HttpResponseEncoder的一个组合,针对http协议进行编解码
                                        .addLast(new HttpServerCodec())
                                        // 分块向客户端写数据,防止发送大文件时导致内存溢出, channel.write(new ChunkedFile(new File("bigFile.mkv")))
                                        .addLast(new ChunkedWriteHandler())
                                        // 将HttpMessage和HttpContents聚合到一个完成的 FullHttpRequest或FullHttpResponse中,具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应
                                        // 需要放到HttpServerCodec这个处理器后面
                                        .addLast(new HttpObjectAggregator(10240))
                                        // webSocket 数据压缩扩展,当添加这个的时候WebSocketServerProtocolHandler的第三个参数需要设置成true
                                        .addLast(new WebSocketServerCompressionHandler())
                                        // 自定义处理器 - 处理 web socket 文本消息
                                        .addLast(new TextWebSocketHandler())
                                        // 自定义处理器 - 处理 web socket 二进制消息
                                        .addLast(new BinaryWebSocketFrameHandler())
                                        // 服务器端向外暴露的 web socket 端点,当客户端传递比较大的对象时,maxFrameSize参数的值需要调大
                                        .addLast(new WebSocketServerProtocolHandler(Constants.DEFAULT_WEB_SOCKET_LINK, null, true, 10485760));
    
                            }
                        });
                ChannelFuture channelFuture = bootstrap.bind(8092).sync();
                log.info("webSocket server listen on port : [{}]", 8092);
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    
    • 2、写handle
      • TextWebSocketHandler.java 文本消息的处理类
     package com.rw.article.chat.websocket.handler;
    
    import com.alibaba.fastjson.JSON;
    import com.fasterxml.jackson.databind.util.BeanUtil;
    import com.rw.article.chat.entity.vo.Message;
    import com.rw.article.chat.queue.DelayOrderQueueManager;
    import com.rw.article.chat.queue.DelayOrderWorker;
    import com.rw.article.chat.websocket.OnlineContainer;
    import com.rw.article.chat.websocket.protocol.IMsgCode;
    import com.rw.article.chat.websocket.protocol.ProcessorContainer;
    import com.rw.article.common.configuration.GenericConfiguration;
    import com.rw.article.common.constant.Constants;
    import com.rw.article.common.spring.BeansUtils;
    import com.rw.article.common.type.MessageSendType;
    import com.rw.article.common.type.MessageType;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeanUtils;
    
    import java.net.InetSocketAddress;
    import java.time.LocalTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Zhou Zhong Qing
     * @Title: ${file_name}
     * @Package ${package_name}
     * @Description: 文本消息处理
     * @date 2019/4/16 17:29
     */
    public class TextWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        private Logger log = LoggerFactory.getLogger(this.getClass()); // 日志对象
    
        private OnlineContainer onlineContainer;
    
    
        private BeansUtils beansUtils;
    
        public TextWebSocketHandler() {
            onlineContainer = BeansUtils.getBean(OnlineContainer.class);
        }
    
        /*
        经过测试,在 ws 的 uri 后面不能传递参数,不然在 netty 实现 websocket 协议握手的时候会出现断开连接的情况。
       针对这种情况在 websocketHandler 之前做了一层 地址过滤,然后重写
       request 的 uri,并传入下一个管道中,基本上解决了这个问题。
        * */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (null != msg && msg instanceof FullHttpRequest) {
    
                FullHttpRequest request = (FullHttpRequest) msg;
                // log.info("调用 channelRead request.uri() [ {} ]", request.uri());
                String uri = request.uri();
                // log.info("Origin [ {} ] [ {} ]", request.headers().get("Origin"), request.headers().get("Host"));
                String origin = request.headers().get("Origin");
                if (null == origin) {
                    log.info("origin 为空 ");
                    ctx.close();
                } else {
                    if (null != uri && uri.contains(Constants.DEFAULT_WEB_SOCKET_LINK) && uri.contains("?")) {
                        String[] uriArray = uri.split("\\?");
                        if (null != uriArray && uriArray.length > 1) {
                            String[] paramsArray = uriArray[1].split("=");
                            if (null != paramsArray && paramsArray.length > 1) {
                                onlineContainer.putAll(paramsArray[1], ctx);          
                            }
                        }
                        request.setUri(Constants.DEFAULT_WEB_SOCKET_LINK);
                    }
                } else {
                    log.info("不允许 [ {} ] 连接 强制断开", origin);
                    ctx.close();
                }
    
            }
            super.channelRead(ctx, msg);
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
            log.info("接收到客户端的消息:[{}]", msg.text());
            // 如果是向客户端发送文本消息,则需要发送 TextWebSocketFrame 消息
            InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
            String ip = inetSocketAddress.getHostName();
            String txtMsg = "[" + ip + "][" + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + "] ==> " + msg.text();
            //TODO 这是发给自己
            ctx.channel().writeAndFlush(new TextWebSocketFrame(txtMsg));
       
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //移除map
            onlineContainer.removeAll(ctx.channel().id().asLongText());
            ctx.close();
            log.error("服务器发生了异常: [ {} ]", cause);
        }
    
    
       
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 添加
            //log.info(" 客户端加入 [ {} ]", ctx.channel().id().asLongText());
            super.channelActive(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // 移除
            //log.info(" 离线 [ {} ] ", ctx.channel().id().asLongText());
    
    
            super.channelInactive(ctx);
            //移除map
            String key = onlineContainer.removeAll(ctx.channel().id().asLongText());
            ctx.close();
        }
    
    }
    
    • BinaryWebSocketFrameHandler.java 二进制消息的处理类 例如发送图片
    package com.rw.article.chat.websocket.handler;
    
    import com.alibaba.fastjson.JSON;
    import com.rw.article.chat.service.IFileUploadService;
    import com.rw.article.chat.service.impl.AliyunOSSClientServiceImpl;
    import com.rw.article.chat.websocket.OnlineContainer;
    import com.rw.article.common.configuration.GenericConfiguration;
    import com.rw.article.common.constant.Constants;
    import com.rw.article.common.spring.BeansUtils;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufInputStream;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    import org.apache.tomcat.util.http.fileupload.IOUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import sun.misc.BASE64Decoder;
    
    import java.io.*;
    import java.sql.Blob;
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    
    /**
     * @author Zhou Zhong Qing
     * @Title: ${file_name}
     * @Package ${package_name}
     * @Description: 二进制消息处理
     * @date 2019/4/16 17:30
     */
    public class BinaryWebSocketFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
        private Logger log = LoggerFactory.getLogger(this.getClass()); // 日志对象
     
     
        public BinaryWebSocketFrameHandler() {
          
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         
            super.channelRead(ctx, msg);
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
            log.info("服务器接收到二进制消息. [{}]",msg.toString());
            ByteBuf content = msg.content();
            content.markReaderIndex();
            int flag = content.readInt();
            log.info("标志位:[{}]", flag);
            content.resetReaderIndex();
    
            ByteBuf byteBuf = Unpooled.directBuffer(msg.content().capacity());
            byteBuf.writeBytes(msg.content());
    
            //转成byte
            byte [] bytes = new byte[msg.content().capacity()];
            byteBuf.readBytes(bytes);
            //byte转ByteBuf
            ByteBuf byteBuf2 = Unpooled.directBuffer(bytes.length);
            byteBuf2.writeBytes(bytes);
    
          
            log.info("JSON.toJSONString(byteBuf) [ {} ]",JSON.toJSONString(byteBuf));
              //TODO 这是发给自己
            ctx.writeAndFlush(new BinaryWebSocketFrame(byteBuf));
    
    
        }
    
    
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 添加
            log.info(" 客户端加入 [ {} ]", ctx.channel().id().asLongText());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // 移除
            log.info(" 离线 [ {} ] ", ctx.channel().id().asLongText());
        }
    }
    
    
    • OnlineContainer.java 是用来做存储在线用户的,这个对象存在spring中,通过ApplicationContext获取到,没用spring反正建个单例对象都行。
     package com.rw.article.chat.websocket;
    
    import io.netty.channel.ChannelHandlerContext;
    import lombok.val;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.management.relation.Relation;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @author Zhou Zhong Qing
     * @Title: ${file_name}
     * @Package ${package_name}
     * @Description: 存储在线ws用户的容器
     * @date 2019/4/16 20:50
     */
    @Component
    public class OnlineContainer {
    
        private Logger log = LoggerFactory.getLogger(this.getClass()); // 日志对象
        /**
         * <session,ChannelHandlerContext>
         **/
        private Map<String, ChannelHandlerContext> onlineUserMap = new ConcurrentHashMap<>();
    
        /**
         * <userId,sessionId>
         **/
        private Map<String, String> userMap = new ConcurrentHashMap<>();
    
    
     
    
        public Map<String, String> getUserMap() {
            return userMap;
        }
    
        public void setUserMap(Map<String, String> userMap) {
            this.userMap = userMap;
        }
    
        public Map<String, ChannelHandlerContext> getOnlineUserMap() {
            return onlineUserMap;
        }
    
        public void setOnlineUserMap(Map<String, ChannelHandlerContext> onlineUserMap) {
            this.onlineUserMap = onlineUserMap;
        }
    
    
     
    
        /***
         * 根据userId得到通道
         * */
        public ChannelHandlerContext getChannelHandlerContextByUserId(String userId) {
            return onlineUserMap.getOrDefault(userMap.getOrDefault(userId, ""), null);
        }
    
        /***
         * 添加session信息
         * */
        public void putAll(String userId, ChannelHandlerContext ctx) {
            userMap.put(userId, ctx.channel().id().asLongText());
            onlineUserMap.put(ctx.channel().id().asLongText(), ctx);
            log.info("用户 [ {} ] 上线", userId);
        }
    
    
        /***
         * 删除session信息
         * */
        public String removeAll(String sessionId) {
            //如果存在则删除
            String key = null;
            if (userMap.containsValue(sessionId)) {
    
                for (Map.Entry<String, String> entry : userMap.entrySet()) {
                    if (null != entry.getValue() && entry.getValue().equals(sessionId)) {
                        key = entry.getKey();
                        break;
                    }
                }
                if (null != key) {
                    log.info("用户 [ {} ] 离线 ", key);
                    userMap.remove(key);
                }
                onlineUserMap.remove(sessionId);
            }
            return key;
        }
    }
    
    
    • BeansUtils.java 主要用来拿spring容器中bean的实例
     package com.rw.article.common.spring;
    
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.lang.Nullable;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @author Zhou Zhong Qing
     * @Title: ${file_name}
     * @Package ${package_name}
     * @Description: spring util
     * @date 2018/8/1 17:08
     */
    @Component
    public class BeansUtils  implements ApplicationContextAware {
    
        private static ApplicationContext context;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            BeansUtils.context = applicationContext;
        }
    
        public static   <T> T getBean(Class<T> bean) {
            return context.getBean(bean);
        }
        public  static  <T> T getBean(String var1, @Nullable Class<T> var2){
            return context.getBean(var1, var2);
        }
    
        public static   ApplicationContext getContext() {
            return context;
        }
    
    }
    
    
    • Constants.java
     /**
     * @author Zhou Zhong Qing
     * @Title: ${file_name}
     * @Package ${package_name}
     * @Description: 常量及公共方法
     * @date 2018/8/117:03
     */
    public class Constants {
    
        /** 移除聊天关系 **/
        public static final String REMOVE_CUSTOMER_RELATION = "REMOVE_CUSTOMER_RELATION";
    
        public static final String DEFAULT_SUCCESS_STRING = "success";
    
    
        public static final String DEFAULT_ZERO = "0";
    
        /** 默认客服前缀**/
        public static final String DEFAULT_CUSTOMER_PREFIX = "tb_";
    
        /** 默认用户前缀**/
        public static final String DEFAULT_USER_PREFIX = "user_";
    
    
        public static final String DEFAULT_WEB_SOCKET_LINK = "/ws";
        }
    
    • 3、页面测试这是一个测试websocket的页面
      • chat.html
    <!DOCTYPE html>
    <html  lang="zh-CN" xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8">
        <title>web socket 测试</title>
    </head>
    <body>
    
    <div style="width: 600px;height: 400px;">
        <p>服务器输出:</p>
        <div style="border: 1px solid #CCC;height: 300px;overflow: scroll" id="server-msg-container">
    
        </div>
        <p>
            <textarea id="inp-msg" style="height: 50px;width: 500px"></textarea><input type="button" value="发送" id="send"><br/>
            选择图片: <input type="file" id="send-pic">
        </p>
    </div>
    
    <script type="application/javascript">
        //?userId=tb_1
        var ws = new WebSocket("ws://localhost:8092/ws?userId=tb_1");
        ws.onopen = function (ev) {
    
        };
        ws.onmessage = function (ev) {
            console.info("onmessage", ev);
            var inpMsg = document.getElementById("server-msg-container");
            if (typeof  ev.data === "string") {
                inpMsg.innerHTML += ev.data + "<br/>";
            } else {
                var result = ev.data;
                var flagReader = new FileReader();
                flagReader.readAsArrayBuffer(result.slice(0, 4));
                flagReader.onload = function (flag) {
                    console.log(new DataView(flag.target.result).getInt32(0))
                    if (new DataView(flag.target.result).getInt32(0) === 20) {
                        var imageReader = new FileReader();
                        imageReader.readAsDataURL(result.slice(4));
                        imageReader.onload = function (img) {
                            var imgHtml = "<img src='" + img.target.result + "' style='width: 100px;height: 100px;'>";
                            inpMsg.innerHTML += imgHtml.replace("data:application/octet-stream;", "data:image/png;") + "<br />";
                        }
                    } else {
                        alert("后端返回的是非图片类型数据,无法显示。");
                    }
                }
            }
        };
        ws.onerror = function () {
            var inpMsg = document.getElementById("server-msg-container");
            inpMsg.innerHTML += "发生异常" + "<br/>";
        };
        ws.onclose = function () {
            var inpMsg = document.getElementById("server-msg-container");
            inpMsg.innerHTML += "webSocket 关闭" + "<br/>";
        };
    
        // 发送文字消息
        document.getElementById("send").addEventListener("click", function () {
            var data = {};
            var text = document.getElementById("inp-msg").value;
            data.text = text;
            data.toUserId = "user_1";
            data.fromUserId = "tb_1";
            ws.send(JSON.stringify(data));
        }, false);
    
        // 发送图片
        document.querySelector('#send-pic').addEventListener('change', function (ev) {
            var files = this.files;
            if (files && files.length) {
                var file = files[0];
                var fileType = file.type;
                // 表示传递的是 非图片
                var dataType = 20;
                if (!/^image/.test(fileType)) {
                    // 表示传递的是 图片
                    dataType = 10;
                    return;
                }
                var fileReader = new FileReader();
    
                //base64
                fileReader.readAsDataURL(file);
                fileReader.onload = function (e) {
                    console.log(this.result);
                    var data = {};
                    data.text = this.result;
                    data.toUserId = "user_1";
                    data.fromUserId = "tb_1";
                    ws.send(JSON.stringify(data));
    
                }
                //Blob对象方式
                /*fileReader.readAsArrayBuffer(file);
                fileReader.onload = function (e) {
                    // 获取到文件对象
                    var result = e.target.result;
    				console.log("result : " +result);
                    // 创建一个 4个 字节的数组缓冲区
                    var arrayBuffer = new ArrayBuffer(4);
                    var dataView = new DataView(arrayBuffer);
                    // 从第0个字节开始,写一个 int 类型的数据(dataType),占4个字节
                    dataView.setInt32(0, dataType);
                    // 组装成 blob 对象
                    var blob = new Blob([arrayBuffer, result]);
    				var objectUrl = URL.createObjectURL(blob);
    				console.log("objectUrl : " + objectUrl);
                    // 发送到 webSocket 服务器端
                    ws.send(blob);
                }*/
            }
        }, false);
    </script>
    
    </body>
    </html>
    
    • 发送图片的功能上面两种方式一种是发base64的字符串,一种是Blob对象方式。
    • 目前我对netty的还不是很熟悉,如果有错误的地方还望指正。
    展开全文
  • 局域网聊天工具基于springboot+netty websocket,可发送图片表情包。
  • 简易版netty websocket通讯demo 聊天
  • 基于Netty5.0的高级案例NettyWebsocket,详情请参见博文:http://blog.csdn.net/l1028386804/article/details/55026558
  • 这是一个java web项目集成了netty websocket的完整代码。java web项目作为服务器端和客户端进行数据通信。但是常常存在提示Max frame length of 65536 has been exceeded问题。初始化握手对象时指定了...
  • netty-netty websocket协议实现 摘自<netty权威指南> http协议弊端 http 协议采用半双工协议。半双工通信指数据可以在客户端和服务器端上传输,但不能同时传输 http 消息冗长繁琐 WebSocket协议 websocket ...

    netty-netty websocket协议实现

    摘自<netty权威指南>

    http协议弊端

    • http 协议采用半双工协议。半双工通信指数据可以在客户端和服务器端上传输,但不能同时传输
    • http 消息冗长繁琐

    WebSocket协议

    websocket 特点:

    • 单一的TCP连接,采用全双工模式通信
    • 对代理,防火墙和路由器透明
    • 无头部信息、cookie 和身份认证
    • 无安全开销
    • 通过 ping/pong保持链路激活
    • 服务器可以主动传递消息给客户端,不再需要客户端轮询

    netty实现websocket协议

    服务器端:

    public class WebsocketServer {
    
    	private int port;
    
    	public WebsocketServer(int port) {
    		this.port = port;
    	}
    
    
    	public static void main(String[] args) throws InterruptedException {
    
    		int port = 8080;
    		new WebsocketServer(port).start();
    	}
    
    	private void start() throws InterruptedException {
    
    		EventLoopGroup boss = new NioEventLoopGroup();
    		EventLoopGroup work = new NioEventLoopGroup();
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(boss, work)
    					.channel(NioServerSocketChannel.class)
    					.option(ChannelOption.SO_BACKLOG, 100)
    					.childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						protected void initChannel(SocketChannel ch) throws Exception {
    
    							ch.pipeline().addLast("http-decoder", new HttpServerCodec());
    							ch.pipeline().addLast("http-aggregate", new HttpObjectAggregator(65536));
    							ch.pipeline().addLast("http-chunk",new ChunkedWriteHandler());
    							ch.pipeline().addLast("", new WebSocketServerHandler());
    
    
    
    						}
    					});
    			ChannelFuture future = b.bind(port);
    			future.addListener(new ChannelFutureListener() {
    				@Override
    				public void operationComplete(ChannelFuture future) throws Exception {
    					if (future.isSuccess()){
    						System.out.println("server start success...");
    					}else {
    						System.out.println("server start failed...");
    					}
    				}
    			});
    			future.channel().closeFuture().sync();
    		} finally {
    
    			boss.shutdownGracefully();
    			work.shutdownGracefully();
    		}
    
    	}
    }
    
    

    WebSocketServerHandler :

    public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    
    	private WebSocketServerHandshaker handshaker;
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, 
                                        Object msg) throws Exception {
    
    		if (msg instanceof FullHttpRequest) {
    			handleHttpRequest(ctx, (FullHttpRequest) msg);
    
    		} else if (msg instanceof WebSocketFrame) {
    
    			handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    		}
    	}
    
    	private void handleWebSocketFrame(ChannelHandlerContext ctx, 
                                                    WebSocketFrame frame) {
    
    		if (frame instanceof CloseWebSocketFrame) {
    			handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    			return;
    		}
    
    		if (frame instanceof PingWebSocketFrame) {
    			ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    			return;
    		}
    
    
    		if (!(frame instanceof TextWebSocketFrame)) {
    			throw new UnsupportedOperationException(
    					String.format("%s frame types not supported", 
                        frame.getClass().getName()));
    		}
    
    		String text = ((TextWebSocketFrame) frame).text();
    
    		System.out.println(String.format("%s recv %s", ctx.channel(), text));
    		ctx.channel().write(new TextWebSocketFrame(text
    				+ ",欢迎使用websocket服务,现在时刻:" + new Date().toString()));
    
    	}
    
    	private void handleHttpRequest(ChannelHandlerContext ctx,
                                                         FullHttpRequest request) {
    		if (!request.decoderResult().isSuccess() ||
    				(!"websocket".equals(request.headers().get("Upgrade")))) {
    			sendHttpResponse(ctx, request, new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
    		}
    
    		WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
    				"ws://localhost:8080/websocket", null, false
    		);
    
    		handshaker = factory.newHandshaker(request);
    		if (handshaker == null) {
    
    			WebSocketServerHandshakerFactory
                            .sendUnsupportedVersionResponse(ctx.channel());
    		} else {
    			handshaker.handshake(ctx.channel(), request);
    
    		}
    	}
    
    	private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request,
    	                              FullHttpResponse response) {
    
    		if (response.status().code() != 200) {
    			ByteBuf byteBuf = Unpooled.copiedBuffer(response.status().toString(),
    					CharsetUtil.UTF_8);
    			response.content().writeBytes(byteBuf);
    			byteBuf.release();
    			response.headers().set(HttpHeaderNames.CONTENT_LENGTH,
    					response.content().readableBytes());
    		}
    
    		ChannelFuture future = ctx.writeAndFlush(response);
    		if (!HttpUtil.isKeepAlive(request) || response.status().code() != 200) {
    			future.addListener(ChannelFutureListener.CLOSE);
    		}
    	}
    
    
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) 
                                                        throws Exception {
    		ctx.flush();
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, 
                                        Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.close();
    	}
    }
    
    
    

    客户端:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>时间服务器</title>
    </head>
    <body>
    
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
    
        if (window.WebSocket){
            socket = new WebSocket("ws://localhost:8080/websocket");
            socket.onmessage = function (event) {
                var resp = document.getElementById("responseText");
                resp.value = '';
                resp.value = event.data;
            }
    
            socket.onopen = function (event) {
                var txt = document.getElementById("responseText");
                txt.value = "打开WebSocket正常,浏览器支持WebSocket!";
            }
            
            socket.onclose = function (event) {
                var resp = document.getElementById("responseText");
                resp.value = '';
                resp.value = event.data;
            }
        }else {
            alert("抱歉,您的浏览器不支持WebSocket协议");
        }
    
    
        function sendMsg(message) {
            if (!window.WebSocket){
                return;
            }
    
            if (socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else {
                alert("WebSocket连接没有成功");
            }
            
        }
    </script>
    
    
    <form onsubmit="false">
        <input type="text" name="message" value="netty definitive guide">
        <br>
        <br>
        <input type="button" value="send websocket request" 
                            onclick="sendMsg(this.form.message.value)">
        <hr color="red">
        <h3>服务器响应消息</h3>
        <textarea id="responseText" style="width:500px;height: 300px;"></textarea>
    </form>
    </body>
    </html>
    
    

    运行结果:

    server start success...
    [id: 0x0a496855, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:53948]
    									        recv netty definitive guide
    
    展开全文
  • netty WebSocket后面加参数

    千次阅读 2019-04-21 23:48:25
    依赖于Tomcat的webSocket地址后面是可以随便跟参数的,但是发现netty WebSocket却不能加参数,代码如下: WebSocketServer.java package com.rw.article.chat.websocket; import ...
  • netty websocket配置wss

    千次阅读 2019-06-10 14:14:27
    测试的时候使用netty websocket正常在正事环境报错 The page at 'https://xxxxxx/h5/#/chatIM' was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://192.168.0.50:9090//...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 12,056
精华内容 4,822
关键字:

nettywebsocket