精华内容
下载资源
问答
  • 2021-06-05 19:15:50

    一般我们用的最多的就是http请求,但是频繁的请求可能对服务造成的压力很大,所以今天谈谈websocket长连接,一句话:简单

    1、什么是长连接?

    A:一次请求连接,终身使用,就可以长久的保持信息的交互,除非服务挂了

    2、QT里面是如何使用websocket进行长连接的

    ①首先在pro文件里面添加QT += websockets

    ②#include

    ③初始化

    _pdataRecvWS = new QWebSocket();

    connect(_pdataRecvWS,SIGNAL(disconnected()),this,SLOT(onDisconnected()),Qt::AutoConnection);

    connect(_pdataRecvWS,SIGNAL(textMessageReceived(QString)),this,SLOT(onTextReceived(QString)),Qt::AutoConnection);

    connect(_pdataRecvWS,SIGNAL(connected()),this,SLOT(onConnected()),Qt::AutoConnection);

    connect(_ptimer,SIGNAL(timeout()),this,SLOT(reconnect()),Qt::AutoConnection);

    _pdataRecvWS->open(QUrl("ws://localhost:8080"));

    ④对应的槽函数

    //断开连接会触发这个槽函数

    void MainWindow::onDisconnected()

    {

    _ptimer->start(3000);

    ui->textEdit->append("websocket is disconnected");

    }

    //连接成功会触发这个槽函数

    void MainWindow::onConnected()

    {

    _ptimer->stop();

    ui->textEdit->append("connect successful");

    }

    //收到服务发来的消息会触发这个槽函数

    void MainWindow::onTextReceived(QString msg)

    {

    ui->textEdit->append("textReceiveString" + msg);

    }

    //断开连接会启动定时器,触发这个槽函数重新连接

    void MainWindow::reconnect()

    {

    ui->textEdit->append("websocket reconnected");

    _pdataRecvWS->abort();

    _pdataRecvWS->open(QUrl("ws://localhost:8080"));

    }

    http中长连接和websocket的长连接的区别

    一.什么是http协议 HTTP是一个应用层协议,无状态的,端口号为80.主要的版本有1.0/1.1/2.0.   HTTP/1.* 一次请求-响应,建立一个连接,用完关闭: HTTP/1.1 串行化 ...

    微信小程序中如何使用WebSocket实现长连接(含完整源码)

    本文由腾讯云技术团队原创,感谢作者的分享. 1.前言   微信小程序提供了一套在微信上运行小程序的解决方案,有比较完整的框架.组件以及 API,在这个平台上面的想象空间很大.腾讯云研究了一番之后,发现 ...

    Qt::浅谈信号槽连接,参数在多线程中的使用

    Qt的信号槽有五种连接方式定义在enum Qt::ConnectionType,下面简单介绍 Qt::AutoConnection:自动判断连接方式,如果信号发送对象和执行槽对象在同一线程,那么等于Q ...

    Websocket如何建立连接

    前面提到,WebSocket复用了HTTP的握手通道.具体指的是,客户端通过HTTP请求与WebSocket服务端协商升级协议.协议升级完成后,后续的数据交换则遵照WebSocket的协议. 1.客户 ...

    QT中16进制字符串转汉字

    最经在研究AT指令接受短信,短信是unicode编码,接受后需要根据系统的编码方案进行相关的转码比如接受到了一串字符4F60597D,它是“你好”的unicode编码,一个unicode编码占两个字节 ...

    WebSocket与Tcp连接

    最近做了一个项目,客户端为WebSocket页面,服务器端为Tcp控制台 .将代码贴出来,供需要的参考. 1.服务器端代码 其中服务器的Session使用了第三方插件,为TCP连接. 2.客户端代码如 ...

    【土旦】Vue+WebSocket 实现长连接

    1.websocket 连接代码 created() { this.initWebsocket() }, methods: { // 初始化websocket initWebsocket() { le ...

    spring websocket自动断开连接再创建引发的问题解决方案

    问题:由于 web session 超时时间为 30 分钟,如用户在 web session 规定时间内没有退出系统,但由于其它原因 用户却断开的 websocket 的连接,如果用户还要聊天或是其它 ...

    关于QT内部16进制、十进制、QByteArray,QString

    QT里面的数据转化成十六进制比较麻烦,其他的int或者byte等型都有专门的函数,而十六进制没有特定的函数去转化,这我在具体的项目中已经解决(参考网上大神)->小项目程序 QT里面虽然有什么QS ...

    随机推荐

    Bus Hound 的使用方法

    背景: 最近在研究USB相关协议,需要对USB数据进行抓取分析,Bus Hound是个非常赞的工具,在此将其使用方法记录下来,以备下次快速上手使用. 正文: 主界面如下: 首先关注菜单栏三个选项: C ...

    浅析C++的内存管理

    在C++中,内存分成5个区,他们分别是堆.栈.自由存储区.全局/ 静态存储区和常量存储区. 栈,就是那些由编译器在需要的时候分配,在不需要的时候自动清楚的变量的存储区.里面的变量通常是局部变量.函数参 ...

    vs 点击就设置项目为默认启动项

    装好系统没注意 做项目的时候解决方案比较多 发现点击哪个项目哪个项目就成了默认启动项目 这个开始没觉出来 最后发现挺烦人的 想想难道是我装vs装的么 我于是卸载了全新安装了 还是一个吊样 无意间试了下 ...

    HTML高级标签(2)————窗体分帧(1)————分帧演示

    我们能够简单的编写一个多帧的窗体,而且能够随意的划分窗体区域.

    更多相关内容
  • WebSocket消息推送

    2020-09-07 20:00:16
    二、WebSocket简介与消息推送 三、WebSocket客户端 四、WebSocket服务器端 五、测试运行 六、小结与消息推送框架 6.1、开源Java消息推送框架 Pushlet 6.2、开源DotNet消息推送框架SignalR 七、代码下载 ...

    目录

    B/S结构的软件项目中有时客户端需要实时的获得服务器消息,但默认HTTP协议只支持请求响应模式,这样做可以简化Web服务器,减少服务器的负担,加快响应速度,因为服务器不需要与客户端长时间建立一个通信链接,但不容易直接完成实时的消息推送功能,如聊天室、后台信息提示、实时更新数据等功能,但通过polling、Long polling、长连接、Flash Socket以及HTML5中定义的WebSocket能完成该功能需要。

    一、Socket简介

    Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求。Socket的英文原义是“孔”或“插座”,作为UNIX的进程通信机制。Socket可以实现应用程序间网络通信。

    Socket可以使用TCP/IP协议或UDP协议。

    TCP/IP协议

    TCP/IP协议是目前应用最为广泛的协议,是构成Internet国际互联网协议的最为基础的协议,由TCP和IP协议组成:
    TCP协议:面向连接的、可靠的、基于字节流的传输层通信协议,负责数据的可靠性传输的问题。

    IP协议:用于报文交换网络的一种面向数据的协议,主要负责给每台网络设备一个网络地址,保证数据传输到正确的目的地。

    UDP协议

    UDP特点:无连接、不可靠、基于报文的传输层协议,优点是发送后不用管,速度比TCP快。

    二、WebSocket简介与消息推送

    B/S架构的系统多使用HTTP协议,HTTP协议的特点:

    1 无状态协议
    2 用于通过 Internet 发送请求消息和响应消息
    3 使用端口接收和发送消息,默认为80端口
    底层通信还是使用Socket完成。

    HTTP协议决定了服务器与客户端之间的连接方式,无法直接实现消息推送(F5已坏),一些变相的解决办法:

    双向通信与消息推送

    轮询:客户端定时向服务器发送Ajax请求,服务器接到请求后马上返回响应信息并关闭连接。 优点:后端程序编写比较容易。 缺点:请求中有大半是无用,浪费带宽和服务器资源。 实例:适于小型应用。

    长轮询:客户端向服务器发送Ajax请求,服务器接到请求后hold住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。 优点:在无消息的情况下不会频繁的请求,耗费资小。 缺点:服务器hold连接会消耗资源,返回数据顺序无保证,难于管理维护。 Comet异步的ashx,实例:WebQQ、Hi网页版、Facebook IM。

    长连接:在页面里嵌入一个隐蔵iframe,将这个隐蔵iframe的src属性设为对一个长连接的请求或是采用xhr请求,服务器端就能源源不断地往客户端输入数据。 优点:消息即时到达,不发无用请求;管理起来也相对便。 缺点:服务器维护一个长连接会增加开销。 实例:Gmail聊天

    Flash Socket:在页面中内嵌入一个使用了Socket类的 Flash 程序JavaScript通过调用此Flash程序提供的Socket接口与服务器端的Socket接口进行通信,JavaScript在收到服务器端传送的信息后控制页面的显示。 优点:实现真正的即时通信,而不是伪即时。 缺点:客户端必须安装Flash插件;非HTTP协议,无法自动穿越防火墙。 实例:网络互动游戏。

    Websocket:
    WebSocket是HTML5开始提供的一种浏览器与服务器间进行全双工通讯的网络技术。依靠这种技术可以实现客户端和服务器端的长连接,双向实时通信。
    特点:
    事件驱动
    异步
    使用ws或者wss协议的客户端socket

    能够实现真正意义上的推送功能

    缺点:

    少部分浏览器不支持,浏览器支持的程度与方式有区别。

    三、WebSocket客户端

    websocket允许通过JavaScript建立与远程服务器的连接,从而实现客户端与服务器间双向的通信。在websocket中有两个方法:  
        1、send() 向远程服务器发送数据
        2、close() 关闭该websocket链接
      websocket同时还定义了几个监听函数    
        1、onopen 当网络连接建立时触发该事件
        2、onerror 当网络发生错误时触发该事件
        3、onclose 当websocket被关闭时触发该事件
        4、onmessage 当websocket接收到服务器发来的消息的时触发的事件,也是通信中最重要的一个监听事件。msg.data
      websocket还定义了一个readyState属性,这个属性可以返回websocket所处的状态:
        1、CONNECTING(0) websocket正尝试与服务器建立连接
        2、OPEN(1) websocket与服务器已经建立连接
        3、CLOSING(2) websocket正在关闭与服务器的连接
        4、CLOSED(3) websocket已经关闭了与服务器的连接

      websocket的url开头是ws,如果需要ssl加密可以使用wss,当我们调用websocket的构造方法构建一个websocket对象(new WebSocket(url))的之后,就可以进行即时通信了。

    <!DOCTYPE html>
    <html>
    
        <head>
            <meta name="viewport" content="width=device-width" />
            <title>WebSocket 客户端</title>
        </head>
    
        <body>
            <div>
                <input type="button" id="btnConnection" value="连接" />
                <input type="button" id="btnClose" value="关闭" />
                <input type="button" id="btnSend" value="发送" />
            </div>
            <script src="js/jquery-1.11.1.min.js" type="text/javascript" charset="utf-8"></script>
            <script type="text/javascript">
                var socket;
                if(typeof(WebSocket) == "undefined") {
                    alert("您的浏览器不支持WebSocket");
                    return;
                }
    
                $("#btnConnection").click(function() {
                    //实现化WebSocket对象,指定要连接的服务器地址与端口
                    socket = new WebSocket("ws://192.168.1.2:8888");
                    //打开事件
                    socket.onopen = function() {
                        alert("Socket 已打开");
                        //socket.send("这是来自客户端的消息" + location.href + new Date());
                    };
                    //获得消息事件
                    socket.onmessage = function(msg) {
                        alert(msg.data);
                    };
                    //关闭事件
                    socket.onclose = function() {
                        alert("Socket已关闭");
                    };
                    //发生了错误事件
                    socket.onerror = function() {
                        alert("发生了错误");
                    }
                });
                
                //发送消息
                $("#btnSend").click(function() {
                    socket.send("这是来自客户端的消息" + location.href + new Date());
                });
                
                //关闭
                $("#btnClose").click(function() {
                    socket.close();
                });
            </script>
        </body>
    
    </html>

    四、WebSocket服务器端

    JSR356定义了WebSocket的规范,Tomcat7中实现了该标准。JSR356 的 WebSocket 规范使用 javax.websocket.*的 API,可以将一个普通 Java 对象(POJO)使用 @ServerEndpoint 注释作为 WebSocket 服务器的端点。

    @ServerEndpoint("/push")
     public class EchoEndpoint {
    
     @OnOpen
     public void onOpen(Session session) throws IOException {
     //以下代码省略...
     }
     
     @OnMessage
     public String onMessage(String message) {
     //以下代码省略...
     }
    
     @Message(maxMessageSize=6)
     public void receiveMessage(String s) {
     //以下代码省略...
     } 
    
     @OnError
     public void onError(Throwable t) {
     //以下代码省略...
     }
     
     @OnClose
     public void onClose(Session session, CloseReason reason) {
     //以下代码省略...
     } 
     
     }

    上面简洁代码即建立了一个WebSocket的服务端,@ServerEndpoint("/push")的annotation注释端点表示将WebSocket服务端运行在ws://[Server端IP或域名]:[Server端口]/项目/push的访问端点,客户端浏览器已经可以对WebSocket客户端API发起HTTP长连接了。
    使用ServerEndpoint注释的类必须有一个公共的无参数构造函数,@onMessage注解的Java方法用于接收传入的WebSocket信息,这个信息可以是文本格式,也可以是二进制格式。
    OnOpen在这个端点一个新的连接建立时被调用。参数提供了连接的另一端的更多细节。Session表明两个WebSocket端点对话连接的另一端,可以理解为类似HTTPSession的概念。
    OnClose在连接被终止时调用。参数closeReason可封装更多细节,如为什么一个WebSocket连接关闭。
    更高级的定制如@Message注释,MaxMessageSize属性可以被用来定义消息字节最大限制,在示例程序中,如果超过6个字节的信息被接收,就报告错误和连接关闭。

    package action;
    
    import javax.websocket.CloseReason;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    
    //ws://127.0.0.1:8087/Demo1/ws/张三
    @ServerEndpoint("/ws/{user}")
    public class WSServer {
        private String currentUser;
        
        //连接打开时执行
        @OnOpen
        public void onOpen(@PathParam("user") String user, Session session) {
            currentUser = user;
            System.out.println("Connected ... " + session.getId());
        }
    
        //收到消息时执行
        @OnMessage
        public String onMessage(String message, Session session) {
            System.out.println(currentUser + ":" + message);
            return currentUser + ":" + message;
        }
    
        //连接关闭时执行
        @OnClose
        public void onClose(Session session, CloseReason closeReason) {
            System.out.println(String.format("Session %s closed because of %s", session.getId(), closeReason));
        }
    
        //连接错误时执行
        @OnError
        public void onError(Throwable t) {
            t.printStackTrace();
        }
    }

    url中的字符张三是的路径参数,响应请求的方法将自动映射。

    五、测试运行

    六、小结与消息推送框架

     Socket在应用程序间通信被广泛使用,如果需要兼容低版本的浏览器,建议使用反向ajax或长链接实现;如果纯移动端或不需考虑非现代浏览器则可以直接使用websocket。Flash实现推送消息的方法不建议使用,因为依赖插件且手机端支持不好。关于反向ajax也有一些封装好的插件如“Pushlet”

    6.1、开源Java消息推送框架 Pushlet

    Pushlet 是一个开源的 Comet 框架,Pushlet 使用了观察者模型:客户端发送请求,订阅感兴趣的事件;服务器端为每个客户端分配一个会话 ID 作为标记,事件源会把新产生的事件以多播的方式发送到订阅者的事件队列里。

    源码地址:https://github.com/wjw465150/Pushlet

    Pushlet是一种comet实现:在Servlet机制下,数据从server端的Java对象直接推送(push)到(动态)HTML页面,而无需任何Javaapplet或者插件的帮助。它使server端可以周期性地更新client的web页面,这与传统的request/response方式相悖。浏览器client为兼容JavaScript1.4版本以上的浏览器(如InternetExplorer、FireFox),并使用JavaScript/DynamicHTML特性。而底层实现使用一个servlet通过Http连接到JavaScript所在的浏览器,并将数据推送到后者。

    6.2、开源DotNet消息推送框架SignalR

    SignalR是一个ASP .NET下的类库,可以在ASP .NET的Web项目中实现实时通信。在Web网页与服务器端间建立Socket连接,当WebSockets可用时(即浏览器支持Html5)SignalR使用WebSockets,当不支持时SignalR将使用长轮询来保证达到相同效果。

    官网:http://signalr.net/

    源码:https://github.com/SignalR/SignalR

     

    七、代码下载

    7.1、Java实现的服务器端代码与客户端代码下载

    点击下载服务器端代码

    点击下载客户端代码

    7.2、DotNet服务器端手动连接实现代码下载

    点击下载DotNet服务器端手动连接实现代码

    7.3、DotNet下使用SuperWebSocket三方库实现代码下载

    点击下载DotNet下使用SuperWebSocket三方库实现代码

    展开全文
  • 本文介绍了一简易的实时消息推送系统。 需求分析 后台同时对接了网页,微信公众号,iOS以及Android客户端。在某些特定场景下,比如一用户接收到其他用户的提问,我们就需要向这用户推送一条消息。用户或者在...

    本文介绍了一个简易的实时消息推送系统。

    需求分析

    后台同时对接了网页,微信公众号,iOS以及Android客户端。在某些特定场景下,比如一个用户接收到其他用户的提问,我们就需要向这个用户推送一条消息。用户或者在手机上收到了一条弹窗通知,或者在网页上看到了消息图标显示小红点。

    公众号消息推送使用客服接口推送消息。

    移动端的消息推送使用国内某些知名的推送平台。在用户从APP登录的时候,APP会主动向推送平台设置自己的ID。后台将消息发送到推送平台时指明这个ID即可。

    网页的消息推送一般常见的实现方法有轮询,长连接,WebSocket等等。关于三者的区别这里不加以讨论,总之我们使用的是WebSocket。

    消息传递的基本流程

    后台服务器在某些情况下生成了一条消息, 首先将消息保存到本地数据库,这样客户端可以调用API显示消息列表。随后消息被放入任务队列,任务队列将消息通过推送平台发送至APP,通过微信公众号后台发送至用户微信客户端。

    为了将消息通过WebSocket发送至在线的用户手中,我们先将消息发布到Redis。订阅了Redis的Node收到消息,将消息通过WebSocket传递至与之连接的浏览器。

    一个大致的消息流如下图所示:

    Redis的发布订阅机制

    所谓的Publish/Subscribe,可以让发布者将消息发布至某一个channel,所有订阅了这个channel的订阅者就可以立即收到这个消息。在Redis的发布订阅机制里面,一个消息可以被发布至多个channel,订阅者也可以同时订阅多个channel的消息。

    为了订阅一个名为message-channel的消息,我们可以在Redis命令行下执行

    127.0.0.1:6379> subscribe message-channel
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "message-channel"
    3) (integer) 1
    

    为了向message-channel发布消息,我们打开一个新的命令行窗口,运行

    127.0.0.1:6379> publish message-channel "This is a message"
    (integer) 1
    127.0.0.1:6379>
    

    消息发布之后,原本订阅的那个终端就可以收到消息了:

    Python发布消息·Node订阅消息

    向Redis布的消息的数据类型必须为byte,如果我们需要传递复杂的数据结构,就需要将数据dump为json格式。

    import json
    from redis import StrictRedis
    
    client = StrictRedis()
    data = {
        "uid": "user id",
        "message": {
            "content": "You have a message",
            "message_id": "message id"
        }
    }
    
    client.publish("message-channel", json.dumps(data))
    

    订阅message-channel的Node在获取消息之后,将消息体解析,获取到里面的user id,根据这个id决定消息发送的对象。如果此时用户不在线,消息就不会被发出。

    var redis = require('redis');
    var redisListener = redis.createClient();
    
    redisListener.subscribe(config.get('redis_message_channel'));
    redisListener.on('message', function(channel, data){
      console.log('get a redis message', channel, data);
      var data = JSON.parse(data);
      io.sockets.in(data.uid).emit('message', data.message);
    })
    

    创建WebSocket的服务

    本文对Socket.IO以及WebSocket没有加以严格的区分,但严格的来说,Socket.IO并非完全是WebSocket。Socket.IO是一个封装了WebSocket协议的库,隐藏了底层协议的细节,提供比较高层次的功能。它首先尝试创建一个长连接,在可能的情况之下尝试将连接升级到更加轻量级的WebSocket。此外它还提供了一些更加高级的功能,比如断线检测,断线重连等。

    Socket.IO的服务需要使用它自带的client去连接服务,浏览器默认的WebSocket对象是不能用的。

    如下代码可以创建一个简单的Socket.IO服务

    var app = require('express')();
    var server = require('http').createServer(app);
    var io = require('socket.io')(server);
    io.on('connection', function(){
    	
    });
    server.listen(3000);
    

    此时socket会运行一个自带的http服务,你可以打开http://127.0.0.1:3000/, 开启调试工具并执行如下代码

    // 加载socket.io 这个库
    var script = document.createElement('script');
    script.src = 'http://127.0.0.1:3000/socket.io/socket.io.js';
    document.body.append(script);
    
    // 连接到服务器,将任何收到的消息log到console
    var socket = io.connect();
    socket.on('message', function(data){
        console.log(data);
    })
    

    WebSocket的权限验证

    对于每一个WebSocket连接,你需要验证连接人的身份,验证后才能够向这个连接发送消息。

    一般的HTTP请求协议可以通过验证cookie,或者在HTTP头部放置token达到验证的目的。WebSocket也可以用类似的方法,不同之处在于WebSocket只需要在连接建立时验证一次即可。注意此时WebSocket服务以及后端的HTTP服务必须在同一个域下,不然后端服务的cookie不会被传递给WebSocket服务。一个可行的做法是使用nginx同时反向代理后端的HTTP以及WebSocket。

    后端的Node服务接收到连接请求之后,将cookie转发给Web服务做验证。转发给Web做验证的原因在于WebSocket常用于高并发的场景,应该避免Node服务直接请求数据库

    比如我们使用cookie验证用户,那么我们可以这样:

    var cookie = require('cookie');
    
    io.on('connection', function(){
        var cookies = cookie.parse(socket.handshake.headers.cookie || '');
        // 将cookie通过http协议发送至后端服务器验证。
        var uid = validateCookie(cookies);
        if(uid){
            // 加入一个房间,房间号即为用户id
            socket.join(uid);
        }
        else{
            socket.disconnect()
        }
    });
    

    如果验证失败,主动关闭连接,或者通知客户端关闭。如果验证成功,我们可以让这个连接监听加入一个专门的room,为了简单起见我们直接使用户的id。这样,从Redis获取的消息体里面也有用户id,我们可以据此将指定的消息送入指定用户的浏览器里面。

    可能存在的问题

    按照我们的需求,任何一个消息最终只会被分发给一个用户。而Socket.IO的设计初衷则是基于聊天室的。它认为一个消息有可能会被分发给一个聊天室里的所有用户。在这个矛盾之下,你会发现这个系统的水平扩展并不是很方便

    目前这个系统的消息会被发布到单一的Redis channel,并且只有一个Node进程在处理所有的连接。考虑连接过多,单一进程无法处理的情况,为了扩展,一般的做法无非是:

    1. 增加单一机器上面的Node进程数。
    2. 增加多台物理机器,每台物理机器运行多个Node服务。

    上面的(1比较容易实现,简单的来说,每一个用户的WebSocket会被随机分配到任何一个Node进程。所有Node进程订阅同一个Redis channel。这样一个消息会被所有Node查看到,然后可能会被其中一个进程传递给自己正在连接的用户。

    在(1这个方法里,消息如果仅仅被相关的Node进程捕获就好了,毕竟最终只会有一个Node进程处理这个消息。但退一步讲,哪怕消息被传递给了所有Node进程也应该不会有太大性能的问题。考虑实现(2的机制,难道对于一个消息也要将它发布多多台物理机器的多个Node进程上面去?

    一个可行的扩展方法是基于uid做一致性hash,客户端的连接按照uid被hash到指定的Node进程。Node进程按照同样的算法处理指定uid的消息,当然这个已经超出了本文的讨论范围。

    出处:https://ifconfiger.com/articles/push-message-with-redis-and-websocket

    展开全文
  • SpringCloud ZUUL集群 + Nginx + Redis 实现Websocket向客户端推送消息简介Nginx配置Zuul websocket配置Redis配置及websocket配置前端代码 简介 本文主要是针对分布式场景下的使用websocket的一解决方案。很遗憾的...

    SpringCloud ZUUL集群 + Nginx + Redis 实现Websocket向客户端推送消息

    简介

    本文主要是针对分布式场景下的使用websocket的一个解决方案。很遗憾的是,websocketsession是不支持序列化操作,所以也就不可能存在redis中。

    我们知道在单节点中我们只需要把websocketsession存储在Map中就OK,每次发送通知都从map中根据clientID获取对应的websocket的session进行消息通知。但是在分布式多节点的系统中,每个节点的websocketsession是存在当前节点的内存中的,当A服务向A客户端推送消息时,B服务并不知道,此时B客户端就会无动于衷。所以存在websocketsession共享的问题,本文通过redis订阅广播的消息实现多节点服务同时向客户端推送消息。

    Nginx配置

    Nginx配置:

    1. Windows Nginx安装:
      官网下载链接: link.

      选择Windows版本下载并解压选择Windows版本下载并解压
      双击nginx.exe即可启动nginx双击nginx.exe即可启动nginx
      解压之后把前端文件放在nginx服务器上(nginx文件夹下)

    2. 在启动Nginx之前要配置nginx.conf,配置文件在conf文件夹下

    #user  nobody;
    worker_processes  1;
    
    error_log  logs/error.log;
    error_log  logs/error.log  notice;
    error_log  logs/error.log  info;
    
    #pid        logs/nginx.pid;
    
    
    events {
        worker_connections  1024;
    }
    http {
    	# 开启nginx对websocket的支持
        map $http_upgrade $connection_upgrade {
    		default upgrade;
    		'' close;
    	}
    	
        include       mime.types;
        default_type  application/octet-stream;
    
        #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
        #                  '$status $body_bytes_sent "$http_referer" '
        #                  '"$http_user_agent" "$http_x_forwarded_for"';
    
        #access_log  logs/access.log  main;
    
        sendfile        on;
        #tcp_nopush     on;
    
        #keepalive_timeout  0;
        keepalive_timeout  65;
        #gzip  on;
    	#配置上游服务器网关端口集群
    	#upstream  backServer{
    	    # 127.0.0.1:81 为网关地址  weight 为权重,值越大,访问到该台网关的几率越大
    	    #server 127.0.0.1:8999 weight=1;  
    	    #server 127.0.0.1:82 weight=1;
    	#}
    	#配置上游服务器 集群,默认轮询机制
        upstream backServer{
    		#每个请求按访问ip的hash结果分配,这样每个访客固定访问一个后端服务器,可以解决session的问题.可查看参考Nginx的Upstream5种分配的方式
    		ip_hash; 
            server 127.0.0.1:8999; # 网关地址
            server 127.0.0.1:8777; # 网关地址
            # 补充: backup表示从服务器或者叫备用服务器  只有当主服务器(8182端口)都不能访问时才会访问此(83端口)备用服务器 当主服务器恢复正常后 则访问主服务器
            #server 127.0.0.1:83 backup;
        }
        server {
        	# 监听的请求地址及端口号
            listen       8200;
            server_name  localhost; 
         
            #charset koi8-r;
    
            #access_log  logs/host.access.log  main;
    
            location / {
                root   C:/nginx/nginx-1.21.1/front/;
                index  login.html;
            }
    		# /aaa 代表请求地址中包含/aaa的会被分发到网关地址 
    		location /aaa {
    			  proxy_pass http://backServer;
    			  proxy_redirect default;
    			  # 开启跨域支持
    			  add_header Access-Control-Allow-Origin *;
    			  add_header Access-Control-Allow-Methods *;
    			  add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';
    			  proxy_set_header Host $host:$server_port;
    			  proxy_set_header X-Real-IP $remote_addr;
    		}
    		# /websocket-env-data 拦截并分发到websocket的地址
    		location /websocket-env-data {
    			  proxy_pass http://backServer;
    			  proxy_redirect default;
    			  add_header Access-Control-Allow-Origin *;
    			  add_header Access-Control-Allow-Methods *;
    			  add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';
    			  proxy_set_header Host $host:$server_port;
    			  proxy_set_header X-Real-IP $remote_addr;
    			  proxy_http_version 1.1;
    			  # 开启nginx对websocket的支持,会将http请求转为websocket请求
    			  proxy_set_header Upgrade $http_upgrade;
    		    proxy_set_header Connection $connection_upgrade;
    		}
    		
            #error_page  404              /404.html;
    
            # redirect server error pages to the static page /50x.html
            #
            error_page   500 502 503 504  /50x.html;
            location = /login.html {
                root   front;
            }
    
            # proxy the PHP scripts to Apache listening on 127.0.0.1:80
            #
            #location ~ \.php$ {
            #    proxy_pass   http://127.0.0.1;
            #}
    
            # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
            #
            #location ~ \.php$ {
            #    root           html;
            #    fastcgi_pass   127.0.0.1:9000;
            #    fastcgi_index  index.php;
            #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
            #    include        fastcgi_params;
            #}
    
            # deny access to .htaccess files, if Apache's document root
            # concurs with nginx's one
            #
            #location ~ /\.ht {
            #    deny  all;
            #}
        }
    
    
        # another virtual host using mix of IP-, name-, and port-based configuration
        #
        #server {
        #    listen       8000;
        #    listen       somename:8080;
        #    server_name  somename  alias  another.alias;
    
        #    location / {
        #        root   html;
        #        index  index.html index.htm;
        #    }
        #}
    
    
        # HTTPS server
        #
        #server {
        #    listen       443 ssl;
        #    server_name  localhost;
    
        #    ssl_certificate      cert.pem;
        #    ssl_certificate_key  cert.key;
    
        #    ssl_session_cache    shared:SSL:1m;
        #    ssl_session_timeout  5m;
    
        #    ssl_ciphers  HIGH:!aNULL:!MD5;
        #    ssl_prefer_server_ciphers  on;
    
        #    location / {
        #        root   html;
        #        index  index.html index.htm;
        #    }
        #}
    
    }
    
        
    

    Zuul websocket配置

    1. zuul集群此处不再详细配置,具体参考网上教程
    2. MyRequestInterceptor.java
    import feign.RequestInterceptor;
    import feign.RequestTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    import org.springframework.web.context.request.RequestContextHolder;
    
    import javax.servlet.http.HttpServletRequest;
    
    @Configuration
    @Component
    public class MyRequestInterceptor implements RequestInterceptor {
    
        @Autowired
        HttpServletRequest request;
    
        @Override
        public void apply(RequestTemplate requestTemplate) {
    //        System.out.println("MyRequestInterceptor apply begin.");
            try {
                String sessionId = RequestContextHolder.currentRequestAttributes().getSessionId();
                if (null != sessionId) {
                    requestTemplate.header("Cookie", "SESSION=" + sessionId);
                }
            } catch (Exception e) {
                e.printStackTrace();
    //            System.out.println("MyRequestInterceptor exception: "+ e);
            }
        }
    }
    
    

    SessionConfig.java

    
    import jdk.nashorn.internal.runtime.GlobalConstants;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.session.data.redis.RedisFlushMode;
    import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
    @Configuration
    // 开启Spring对HttpSession和Redis的支持
    @EnableRedisHttpSession(maxInactiveIntervalInSeconds = 86400*30, redisFlushMode = RedisFlushMode.IMMEDIATE)
    public class SessionConfig {
        //  maxInactiveIntervalInSeconds: 设置 Session 失效时间,使用 Redis Session 之后,原 Boot 的 server.session.timeout 属性不再生效
    }
    

    WebSocketFilter.java

    
    import com.netflix.zuul.ZuulFilter;
    import com.netflix.zuul.context.RequestContext;
    import org.springframework.stereotype.Component;
    
    import javax.servlet.http.HttpServletRequest;
    
    /**
     * 功能简述:
     * ZUUL开启websocket支持
     *
     * @date 2021/8/20
     * @since 1.0.0
     */
    @Component
    public class WebSocketFilter extends ZuulFilter {
    
        @Override
        public String filterType() {
            return "pre";
        }
    
        @Override
        public int filterOrder() {
            return 0;
    
        }
    
        @Override
        public boolean shouldFilter() {
            return true;
    
        }
    
        @Override
        public Object run() {
            RequestContext context = RequestContext.getCurrentContext();
            HttpServletRequest request = context.getRequest();
            System.out.println("request1:"+request.getHeader("host").toString());
            String upgradeHeader = request.getHeader("Upgrade");
            if (null == upgradeHeader) {
                upgradeHeader = request.getHeader("upgrade");
            }
            if (null != upgradeHeader && "websocket".equalsIgnoreCase(upgradeHeader)) {
                context.addZuulRequestHeader("connection", "Upgrade");
            }
    //        System.out.println("request2:"+request.toString());
            return null;
        }
    }
    
    

    在ZuulApplication开启Websocket过滤

    	/**
         * WebSocket过滤器
         *
         * @return 自定义访问过滤器
         */
        @Bean
        public WebSocketFilter webSocketFilter() {
            return new WebSocketFilter();
        }
    

    在这里插入图片描述
    在zuul的配置文件中加入

    ## @FeignClient(value = "服务名r") 设置可以有多个类存在相同的FeignClient 中的value值
    spring.main.allow-bean-definition-overriding=true
    

    在pom.xml中加入相关jar包

    <!-- https://mvnrepository.com/artifact/org.springframework.session/spring-session-data-redis -->
        <dependency>
          <groupId>org.springframework.session</groupId>
          <artifactId>spring-session-data-redis</artifactId>
          <version>2.3.0.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-redis -->
        <dependency>
          <groupId>org.springframework.data</groupId>
          <artifactId>spring-data-redis</artifactId>
          <version>2.3.0.RELEASE</version>
        </dependency>
    

    Redis配置及websocket配置

    在这里插入图片描述

    RedisConfig

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.*;
    import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    @Configuration
    public class RedisConfig {
        // 注入 RedisConnectionFactory
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
    
        @Bean
        public RedisTemplate<String, Object> functionDomainRedisTemplate() {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
            return redisTemplate;
        }
    
        /**
         * 设置数据存入 redis 的序列化方式
         * @param redisTemplate
         * @param factory
         */
        private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.setHashKeySerializer(new StringRedisSerializer());
            redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
            redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
            redisTemplate.setConnectionFactory(factory);
        }
    
        /**
         * 实例化 HashOperations 对象,可以使用 Hash 类型操作
         * @param redisTemplate
         * @return
         */
        @Bean
        public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
            return redisTemplate.opsForHash();
        }
    
        /**
         * 实例化 ValueOperations 对象,可以使用 String 操作
         * @param redisTemplate
         * @return
         */
        @Bean
        public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
            return redisTemplate.opsForValue();
        }
    
        /**
         * 实例化 ListOperations 对象,可以使用 List 操作
         * @param redisTemplate
         * @return
         */
        @Bean
        public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
            return redisTemplate.opsForList();
        }
    
        /**
         * 实例化 SetOperations 对象,可以使用 Set 操作
         * @param redisTemplate
         * @return
         */
        @Bean
        public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
            return redisTemplate.opsForSet();
        }
    
        /**
         * 实例化 ZSetOperations 对象,可以使用 ZSet 操作
         * @param redisTemplate
         * @return
         */
        @Bean
        public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
            return redisTemplate.opsForZSet();
        }
    }
    

    RedisMsg

    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.CloseStatus;
    import org.springframework.web.socket.WebSocketMessage;
    import org.springframework.web.socket.WebSocketSession;
    
    @Component
    public interface RedisMsg {
    	//新增socket
    	void afterConnectionEstablished(WebSocketSession session) throws Exception;
    
    	//接收socket信息
    	void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception;
    
    	void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
    
    	void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception;
    
    	boolean supportsPartialMessages();
    
    	/**
    	 * 接收redis广播的订阅信息
    	 * @param message
    	 */
    	public void receiveMessage(String message);
    }
    

    RedisPublishConfig

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    
    @Configuration
    @Component
    public class RedisPublishConfig {
    	/*@Autowired
    	private StaticProperties staticProperties;*/
    	/**
    	 * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
    	 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
    	 * 
    	 * @param connectionFactory
    	 * @param listenerAdapter
    	 * @return
    	 */
    	@Bean
    	// 相当于xml中的bean
    	RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    											MessageListenerAdapter listenerAdapter) {
    		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    		container.setConnectionFactory(connectionFactory);
    		// 订阅了一个叫chat 的通道
    		container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
    		// 这个container 可以添加多个 messageListener
    		return container;
    	}
     
    	/**
    	 * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
    	 * 
    	 * @param receiver
    	 * @return
    	 */
    	@Bean
    	MessageListenerAdapter listenerAdapter(RedisMsg receiver) {
    		// 这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
    		// 也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
    		return new MessageListenerAdapter(receiver, "receiveMessage");
    	}
     
    }
    

    SpringUtilsCopy WebsocketHandler不支持自动注入

    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.NoSuchBeanDefinitionException;
    import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.stereotype.Component;
     
    @Component
    public class SpringUtilsCopy implements BeanFactoryPostProcessor {
     
        private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境
     
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            SpringUtilsCopy.beanFactory = beanFactory;
        }
     
        public static ConfigurableListableBeanFactory getBeanFactory() {
            return beanFactory;
        }
     
        /**
         * 获取对象
         *
         * @param name
         * @return Object 一个以所给名字注册的bean的实例
         * @throws org.springframework.beans.BeansException
         *
         */
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String name) throws BeansException {
            return (T) getBeanFactory().getBean(name);
        }
     
        /**
         * 获取类型为requiredType的对象
         *
         * @param clz
         * @return
         * @throws org.springframework.beans.BeansException
         *
         */
        public static <T> T getBean(Class<T> clz) throws BeansException {
            T result = (T) getBeanFactory().getBean(clz);
            return result;
        }
     
        /**
         * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
         *
         * @param name
         * @return boolean
         */
        public static boolean containsBean(String name) {
            return getBeanFactory().containsBean(name);
        }
     
        /**
         * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
         *
         * @param name
         * @return boolean
         * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
         *
         */
        public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
            return getBeanFactory().isSingleton(name);
        }
     
        /**
         * @param name
         * @return Class 注册对象的类型
         * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
         *
         */
        public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
            return getBeanFactory().getType(name);
        }
     
        /**
         * 如果给定的bean名字在bean定义中有别名,则返回这些别名
         *
         * @param name
         * @return
         * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
         *
         */
        public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
            return getBeanFactory().getAliases(name);
        }
    }
    

    WebSocketClient

    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    /**
     * 功能简述:
     * websocket
     *
     * @date 2021/8/24
     * @since 1.0.0
     */
    @Component
    @Slf4j
    public class WebSocketClient {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        /**
         * 向登录指定用户的所有客户端推送消息
         * @param messageContent 消息内容
         * @param userName 推送用户
         */
        public void pushInfo(String messageContent,String userName){
            // TODO Auto-generated method stub
            JSONObject result = new JSONObject();
            if(StringUtil.isBlank(messageContent)) {
                result.put("result", "error");
            }else {
                try {
                    //发送失败广播出去,让其他节点发送
                    //广播消息到各个订阅者
                    JSONObject message = new JSONObject();
                    message.put("userName", userName);
                    message.put("message", messageContent);
                    // 通过redis的订阅发布消息,所有订阅的用户均可以收到消息
                    redisTemplate.convertAndSend("chat",message.toString());
    
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("推送给客户端失败");
                }
                result.put("result", "success");
            }
            return ;
        }
    
        /**
         * 向所有客户端推送消息
         * @param messageContent
         */
        public void pushInfoToAll(String messageContent){
            // TODO Auto-generated method stub
            JSONObject result = new JSONObject();
            if(StringUtil.isBlank(messageContent)) {
                result.put("result", "error");
            }else {
                try {
                    //发送失败广播出去,让其他节点发送
                    //广播消息到各个订阅者
                    JSONObject message = new JSONObject();
                    message.put("userName", "");
                    message.put("message", messageContent);
                    // 通过redis的订阅发布消息,所有订阅的用户均可以收到消息
                    redisTemplate.convertAndSend("chat",message.toString());
    
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("推送给客户端失败");
                }
                result.put("result", "success");
            }
            return ;
        }
    
        /**
         * 向多个指定用户推送消息
         * @param messageContent
         */
        public void pushInfoToUsers(String messageContent, List<SysUser> userList){
            // Auto-generated method stub
            JSONObject result = new JSONObject();
            if(StringUtil.isBlank(messageContent)) {
                result.put("result", "error");
            }else {
                try {
                    userList.stream().forEach(p ->{
                        //广播消息到各个订阅者
                        JSONObject message = new JSONObject();
                        message.put("userName", p.getLoginName());
                        message.put("message", messageContent);
                        // 通过redis的订阅发布消息,所有订阅的用户均可以收到消息
                        redisTemplate.convertAndSend("chat",message.toString());
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("推送给客户端失败");
                }
                result.put("result", "success");
            }
            return ;
        }
    }
    

    WebSocketConfig

    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    
    /***
     * WebSocketConfig
     */
    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {
    
    
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // webSocket通道
            // 指定处理器和路径
            registry.addHandler(new CTIHandler(), "/websocket-env-data/{userId}")
                    // 指定自定义拦截器
                    .addInterceptors(new WebSocketInterceptor())
                    // 允许跨域
                    .setAllowedOrigins("*");
            // sockJs通道
            registry.addHandler(new CTIHandler(), "/sock-js")
                    .addInterceptors(new WebSocketInterceptor())
                    .setAllowedOrigins("*")
                    // 开启sockJs支持
                    .withSockJS();
    
            registry.addHandler(new CTIHandler(), "/websocket-env-data1")
                    // 指定自定义拦截器
                    .addInterceptors(new WebSocketInterceptor())
                    // 允许跨域
                    .setAllowedOrigins("*");
        }
    
    }
    

    WebSocketInterceptor

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
    import java.util.Map;
    
    @Slf4j
    public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
    
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse seHttpResponse,
                                       WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
    //		HttpServletRequest request = ((ServletServerHttpRequest) serverHttpRequest).getServletRequest();
            String[] aa = serverHttpRequest.getURI().toString().split("/");
            String userName = aa[aa.length-1];
            attributes.put("userName", userName);
            log.info("握手之前");
            //从request里面获取对象,存放attributes
            return super.beforeHandshake(serverHttpRequest, seHttpResponse, wsHandler, attributes);
        }
    
        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                                   Exception ex) {
            log.info("握手之后");
            super.afterHandshake(request, response, wsHandler, ex);
        }
    
    }
    

    CTIHandler

    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.socket.*;
    
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Service
    @Slf4j
    public class CTIHandler implements WebSocketHandler,RedisMsg{
    
        /**
         * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
         */
        private static ConcurrentHashMap<String, WebSocketSession> socketMap = new ConcurrentHashMap<String, WebSocketSession>();
        //新增socket
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            log.info("websocket连接成功");
            //获取用户信息
            String userName = (String) session.getAttributes().get("userName");
    //        log.info("获取当前"+socketMap.get(userName));
            if(socketMap.get(userName)==null) {
                socketMap.put(userName,session);
                sendMessageToUser(userName, new TextMessage("链接建立成功"));
                //并且通过redis发布和订阅广播给其他的的机器,或者通过消息队列
            }
    //        log.info("链接成功");
        }
    
        //接收socket信息
        @Override
        public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
    //        log.info("收到信息"+webSocketMessage.toString());
            String userName = (String) webSocketSession.getAttributes().get("userName");
            synchronized (webSocketSession){
                webSocketSession.sendMessage(new TextMessage("aaa"));
            }
            sendMessageToUser(userName, new TextMessage("我收到你的信息了"));
        }
    
        /**
         * 发送信息给指定用户 (所有登录该账号的客户端)
         * @param clientId 指定用户
         * @param message
         * @return
         */
        public boolean sendMessageToUser(String clientId, TextMessage message) {
            socketMap.forEach((key, value) -> {
                boolean flag = true;
                if(StringUtil.isNotBlank(clientId)){
                    // 向指定用户发送消息
                    if(key.contains(clientId+"websocket")){
                        WebSocketSession session = value;
                        if(session==null) {
                            flag = false;
                        }
    //                log.info("进入发送消息");
                        if (!session.isOpen()) {
                            flag = false;
                        }
                        try {
                            if(flag){
    //                        log.info("正在发送消息");
                                synchronized (session){
                                    session.sendMessage(message);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }else{
                    // 如果指定用户为空,则向所有客户端发送消息
                    WebSocketSession session = value;
                    if(session==null) {
                        flag = false;
                    }
    //                log.info("进入发送消息");
                    if (!session.isOpen()) {
                        flag = false;
                    }
                    try {
                        if(flag){
    //                        log.info("正在发送消息");
                            synchronized (session){
                                session.sendMessage(message);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            return true;
        }
    
    
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            if (session.isOpen()) {
                session.close();
            }
            log.info("连接出错");
        }
    
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            //获取用户信息
            String userName = (String) session.getAttributes().get("userName");
            if(socketMap.get(userName)!=null) {
                socketMap.remove(userName);
                //并且通过redis发布和订阅广播给其他的的机器,或者通过消息队列
            }
            log.info("连接已关闭:" + status);
        }
    
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
        /**
         * 接受订阅信息
         */
        @Override
        public void receiveMessage(String message) {
            // TODO Auto-generated method stub
            JSONObject sendMsg = JSONObject.parseObject(message.substring(message.indexOf("{")));
            String clientId = sendMsg.getString("userName");
            TextMessage receiveMessage = new TextMessage(sendMsg.getString("message"));
            // 获取当前内存中clientId 的session,根据 clientId (即userName)推送消息
            boolean flag = sendMessageToUser(clientId, receiveMessage);
            if(flag) {
                log.info("推送消息("+sendMsg.getString("message")+")成功!!!");
            }
        }
    }
    

    调用WebSocketClient的pushInfo方法即可实现推送

    前端代码

    let callback = null;
    let ws = null;
    let close = null;
    let lockReconnect = false; //避免重复连接
    let stop = false;
    let closeCallback = null;
    let errorCallback = null
    
    function getIp() {
      var ip
      if (httpUrl) {
        return ip = httpUrl.split(":")[1]
      }
    }
    
    
    function initWebSocket() {
      if (typeof WebSocket == "undefined") {
        console.log("当前浏览器 Not support websocket");
      } else {
        // 用户名  token
        var loginName = JSON.parse(localStorage.getItem('userInfo')).loginName;
        var userToken = localStorage.getItem('token');
        var userFlag = loginName +  "websocket" + userToken;
        var wsUrl = "ws:" + getIp() + ":8080/websocket-env-data/"+userFlag;
        console.log("websocket URL:"+wsUrl);
        if (window.soketFlag == null) {
          ws = new WebSocket(wsUrl);
          window.soketFlag = ws;
        } else {
          ws = window.soketFlag;
        }
    
        ws.onopen = function () {
          // $message.success("WebSocket连接成功")
          heartCheck.reset().start(); //传递信息
          console.log("WebSocket连接成功");
        };
        ws.onerror = function () {
          console.log("WebSocket连接失败1");
          // setTimeout(function () {
          //   window.location.reload()
          // }, 2000);
          reconnect(wsUrl);
          // $message.success("WebSocket连接失败");
          console.log("WebSocket连接失败2");
          if (typeof errorCallback === "function") {
            errorCallback("WebSocket连接失败");
          }
        };
        ws.onclose = function () {
          reconnect(wsUrl);
          // $message.success("WebSocket关闭");
          console.log("WebSocket关闭");
          // setTimeout(function () {
          //   window.location.reload()
          // }, 2000);
          if (typeof closeCallback === "function") {
            closeCallback("WebSocket关闭");
          }
        };
        close = ws.onclose;
        ws.onmessage = function (e) {
          // console.log("心跳开始");
          heartCheck.reset().start();
          if (typeof callback === "function") {
            callback(e.data);
          }
        };
      }
    }
    
    function setStop() {
      stop = true;
    }
    
    //websocket重连
    function reconnect(url) {
      if (stop) {
        return;
      }
      if (lockReconnect) {
        return;
      }
      lockReconnect = true;
      setTimeout(function () {
        console.log("重连中");
        initWebSocket();
        lockReconnect = false;
      }, 2000);
    }
    
    // 心跳检测
    //websocket心跳检测
    var heartCheck = {
      timeout: 1000 * 25,
      timeoutObj: null,
      // serverTimeoutObj: null,
      reset: function () {
        clearTimeout(this.timeoutObj);
        // clearTimeout(this.serverTimeoutObj);
        return this;
      },
      start: function () {
        var self = this;
        this.timeoutObj = setTimeout(function () {
          //这里发送一个心跳,后端收到后,返回一个心跳消息,
          //onmessage拿到返回的心跳就说明连接正常
          ws.send("HeartBeat");
          // console.log("心跳开始");
    
          // self.serverTimeoutObj = setTimeout(function() {
          //   //如果超过一定时间还没重置,说明后端主动断开了
          //   console.log("关闭服务");
          //   wsReconnect(); //重新连接
          //   // ws.close(); //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
          // }, self.timeout);
        }, this.timeout);
      }
    };
    
    // send
    function websock(sendData) {
      if (ws.readyState === ws.OPEN) {
        // 若是开启状态
        // ws.send(sendData);
      } else if (ws.readyState === ws.CONNECTING) {
        // 若是正在开启状态 则等待1s后重新调用
        setTimeout(function () {
          websock(sendData);
        }, 1000);
      } else if (ws.readyState === ws.CLOSED) {
        setTimeout(function () {
          initWebSocket();
          websock(sendData);
        }, 1000);
      } else {
        // 若是未开启状态 则等待1s重新调用
        setTimeout(function () {
          websock(sendData);
        }, 1000);
      }
    }
    
    // bing  onmessage
    function bingWebsockMsg(call) {
      callback = call;
    }
    
    function wsOnCloseMsg(call) {
      closeCallback = call;
    
    }
    
    function wsOnErrorMsg(call) {
      errorCallback = call
    }
    
    // close
    function closeWs() {
      close();
    }
    
    
    // #index.html
    function divShow(e) {
      // console.log(e)
      if (e == "消息发布") {
        wsFlag = true
        // 调用相关方法
        isHasMsg2()
      } 
    }
    
    展开全文
  • Django实现WebSocket消息推送和聊天室

    千次阅读 多人点赞 2019-04-11 22:08:21
    Django使用WebSocket实现消息推送和聊天室,主要技术使用dwebsocket
  • 众所周知,Web 应用的交互过程通常是客户端通过浏览器发出一请求,服务器端接收请求后进行处理并返回结果给客户端,客端浏览器将信息呈现。但是对于实时性要求较高、海量并发的应用,比如金融证券的实时信息,web...
  • 近年来随着 Web 前端的快速发展,浏览器新特性层出不穷,越来越的应用可以在浏览器端或通过浏览器渲染引擎实现,Web 应用的即时通信方式 WebSocket 得到了广泛的应用。 WebSocket 是一种在单个 TCP 连接上进行全...
  • WebSocket消息推送和聊天功能实现

    千次阅读 2019-07-13 23:46:37
    WebSocket消息推送SpringBoot集成WebSocket实现消息推送和聊天Demogradle引入依赖测试用的Controller两测试页面WebSocket的Endpoint配置类业务逻辑类 WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全...
  • 近年来随着 Web 前端的快速发展,浏览器新特性层出不穷,越来越的应用可以在浏览器端或通过浏览器渲染引擎实现,Web 应用的即时通信方式 WebSocket 得到了广泛的应用。 WebSocket 是一种在单个 TCP 连接上进行全...
  • 又到了月更时间了,本来想再把Websocket拖一拖看看能不能写其它内容的,但烂大街的内容不想写,觉得烂大街的内容学习知识梳理写到自己的静态博客上就行了,拖到现在实在想不到有什么好的其它内容可以写,只好接上月...
  • vue websocket实现消息推送和语音提醒功能

    千次阅读 热门讨论 2020-12-24 21:28:24
    vue + webSocket 实时任务信息通知 WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。 它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的...
  • 近年来随着 Web 前端的快速发展,浏览器新特性层出不穷,越来越的应用可以在浏览器端通过浏览器渲染引擎实现,Web 应用的即时通信方式 WebSocket 也因此得到了广泛的应用。 WebSocket 是一种在单个 TCP 连接上进行...
  • WebSocket实现房间聊天室

    千次阅读 2018-06-21 14:49:51
    WebSocket API中,浏览器和服务器只需要做一握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。浏览器通过 JavaScript 向服务器发出建立 WebSocket 连接的请求,连接...
  • WebSocket协议提供标准化方法,可通过单个TCP连接在客户端和服务器之间建立全双工双向通信通道,交互始于一HTTP请求,该请求使用标头 Upgrade 进行升级。 Upgrade: websocket Connection: Upgrade 4.1.1. ...
  • 长、短连接4. 单向请求短轮询 polling长轮询 long pollingWebsocketSSE Server-Sent Events 短轮询 长轮询 Websocket Server-Sent Events 通信协议 http http 握手:http --> websocket http 触发...
  • 文章目录一、错误信息二、错误可能性分析及处理2-1 使用 select() 支持并发连接数有限2-1-1 基于当前脚本的错误分析2-1-2 JMeter 下 WsbSocket 长连接数测试 - 排除系统原因2-1-3 Locust 的单机进程使用(分布式...
  • 转自: ... 目录 ...短连接 ...长连接 ...http的长连接 ...什么时候用长连接,短连接?...TCP/IP是协议组,可分为三层次:网络层、传输层和应用层。 在网络层有IP协议、ICMP协议、ARP协议、RARP协议和BOOTP协议。...
  • websocket 并发_1230万并发WebSocket

    千次阅读 2020-09-08 23:30:46
    websocket 并发One thing about WebSockets is that you need a lot of resources on the client's side to generate high enough load for the server to actually eat up all the CPU resources. 关于WebSockets的...
  • 您将为每用户分配存储桶。 我们根据您的漏洞的大小(您的服务器平均需要多少时间来处理单个websocket请求,例如保存已发送的消息)来确定存储桶的大小(单个用户可以在固定时间内发送的流量)由用户进入数据库)。...
  • WebSocket集群解决方案

    千次阅读 2022-04-28 10:32:17
    最近做项目时遇到了需要用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。 期间我经过了几天的研究,总结出了几实现分布式WebSocket集群的办法,从zuul到spring cloud ...
  • 前言 你还在羡慕别人成熟的推送系统么?...通过这标题点进来就默认你对websocket有了基本的了解,如果你不知道,这里有很好的答案websocket是什么。 如果你不想听我bb,完整项目传送门(给star支持一下…3...
  • 那就是:如果我们的项目是分布式环境,登录的用户被Nginx的反向代理分配多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢...
  • websocket连接状态码

    千次阅读 2019-12-12 19:17:23
    最近在做websocket,需要用到这些,查资料记录下(官网:...返回一 unsigned short 类型的数字, 表示服务端发送的关闭码, 以下为已分配的状态码. 状态码 ?名称 ?描述 0–999 ...
  • WebSocket 支持

    2021-03-10 01:37:19
    WebSocket 支持简介Tomcat 支持由 RFC 6455 所定义的 WebSocket。应用开发Tomcat 实现由 JSR-356 定义的 Java WebSocket 1.1 API。关于 WebSocket API 的使用方法,可查看相关范例,既需要查看客户端 HTML 代码,也...
  • WebSocket详解

    千次阅读 2020-11-08 22:40:28
    WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间...
  • 场景描 资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群) 应用发布限制条件:由于场景需要,...只要网关配置高,能handle多个应用 需求:用户登录应用,需
  • 接收第一帧数据时正常的,后面再次接受解析数据帧时,发现解析的消息是异常、缺失的,导致服务端不能正确接收消息。查了相关资料,发现tcp再传输数据时,发送消息并非一包一包发送,存在粘包、拆包的情况。粘包、...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 10,882
精华内容 4,352
关键字:

websocket多个连接分配消息