精华内容
下载资源
问答
  • Phoenix + WebSocket分布式部署实验 前言 对于无状态的web服务,要做分布式部署相对比较简单,很多时候只要架一个反向代理就行。但是对于有状态的web服务,尤其是包含WebSocket成分的web应用,要做分布式部署一直是...

    Phoenix + WebSocket分布式部署实验

    前言

    对于无状态的web服务,要做分布式部署相对比较简单,很多时候只要架一个反向代理就行。但是对于有状态的web服务,尤其是包含WebSocket成分的web应用,要做分布式部署一直是一个麻烦。传统做法是搞一个中间层,例如Redis之类的做pubsub,但是这样做就不得不改动源码。同时,系统复杂度也随之增加,运维的成本也相应提高。Erlang/Elixir这种“面向并发编程”的语言在这方面会不会高人一筹?Phoenix框架是否已经支持WebSocket的横向扩展了呢?下面我们就来做个实验。

    资源

    你可以去https://gitee.com/aetherus/gossipy下载本文涉及的源码。

    目标

    不添加其他服务(如Redis、RabbitMQ等),不改动项目源码,仅通过添加/修改配置文件来达到WebSocket服务的横向扩展。

    实验器材

    • Ubuntu 16.04或其衍生发行版(我用的是Elementary OS Loki)
    • Docker
    • Docker compose
    • Elixir开发/运行环境
    • 一个最基本的Phoenix聊天室,不含数据库,不含assets,不含brunch。

    安装发布工具

    Elixir社区目前比较推荐的发布工具是Distillery(蒸馏器),这次实验就用它。

    安装只需要在项目根目录的mix.exs里添加如下内容就行

    defp deps do
      [
        {:distillery, "~> 1.5", runtime: false}   #<--- 这一行
      ]
    end
    

    这里的runtime: false表示distillery不会在web应用中用到,只在发布的时候用一下。
    添加完后只需mix deps.get一下就行。

    发布配置

    首先让distillery生成一些最基本的发布配置:

    $ mix release.init
    

    你会看到项目根目录下多了个rel目录,里面只有一个空的plugins目录和一个config.exs文件。这个文件的配置用来发布到单台服务器已经足够了,但是要做集群还是不太够,因为我们要让各台服务器上的Phoenix应用能连起来相互通信。为此,我们需要给每个运行的实例一个名称(namesname)。

    为了达到这个目的,我们需要一个vm.args文件。这个文件记录了Erlang启动虚拟机时所需的命令行参数。但是这个文件长啥样?我们现release一个,让它自动生成一个vm.args文件再说。

    $ MIX_ENV=prod mix release --env=prod
    

    这里的MIX_ENV=prod是指“用Phoenix的prod环境的配置来运行发布任务”,这样做可以使项目的编译得到优化,比如去除debug信息等。而--env=prod指的是“按rel/config.exs文件里:prod环境的配置去构建发布版”。这个prod和Phoenix的prod的意义完全不同,所以两个都不能少。

    既然说到了rel/config.exs里定义的环境,就先看看它长什么样吧。

    Path.join(["rel", "plugins", "*.exs"])
    |> Path.wildcard()
    |> Enum.map(&Code.eval_file(&1))
    
    use Mix.Releases.Config,
        default_release: :default,
        default_environment: Mix.env()
    
    environment :dev do
      set dev_mode: true
      set include_erts: false
      set cookie: :"<&9.`Eg/{6}.dwYyDOj>R6R]2IAK;5*~%JN(bKuIVEkr^0>jH;_iBy27k)4J1z=m"
    end
    
    environment :prod do
      set include_erts: true
      set include_src: false
      set cookie: :">S>1F/:xp$A~o[7UFp[@MgYVHJlShbJ.=~lI426<9VA,&RKs<RyUH8&kCn;F}zTQ"
    end
    
    release :gossipy do
      set version: current_version(:gossipy)
      set applications: [
        :runtime_tools
      ]
    end
    

    这就是一个完整的rel/config.exs文件内容(去掉了注释)。我们可以看到里面有个environment :prod块,还有一个environment :dev块,这两个块定义了两种不同的构建策略。这里比较重要的是set include_erts: true|false这一项。erts是“Erlang RunTime System”的缩写,也就是整个Erlang运行环境。如果把这一项设置成true,则打出来的包里包含整个Erlang运行环境,于是你的目标服务器上就可以不用装Erlang和Elixir了。

    上述命令运行完后,会生成_build/prod/rel目录及其下面所有的文件。在这里面找到vm.args文件(具体位置忘了),把它复制到项目根目录下的rel目录里,稍事修改:

    # 删除下面这一行
    # -name gossipy@127.0.0.1
    # 加入下面这一行
    -sname gossipy
    

    namesname的区别不多说了。因为到时候我们要部署到docker上去,用IP或全限定域名不方便,所以就用主机名。

    改完vm.args之后,我们要让distillery认识这个改动过的vm.args。我们在rel/config.exs里加上一行:

    environment :prod do
      ...
      set vm_args: "rel/vm.args"
    end
    

    除了这些,Distillery还要求在项目的config/prod.exs里加一些东西:

    config :gossipy, GossipyWeb.Endpoint,
      ...
      check_origin: false,
      server: true,
      root: ".",
      version: Application.spec(:gossipy, :vsn)
    
    • check_origin: false只是做实验的时候图一时方便,正式上产品的时候千万不要加这一行。
    • server: true的意思是这是一个web server,所以要用Cowboy去启动,而不是直接从Application启动。
    • root: "."表示静态文件(CSS,JS之类)的根在哪儿。因为我们这次没有静态文件,所以不配也OK。
    • version是发布的版本号。它的值通过 Application.spec(:gossipy, :vsn)获取,也就是mix.exs里那个版本号。

    另外,我们需要在这个配置文件里列出所有的分布式节点:

    config :kernel,
      sync_nodes_optional: [:"gossipy@ws1", :"gossipy@ws2"],
      sync_nodes_timeout: 10000
    

    sync_nodes_optional是指“如果在sync_nodes_timeout指定的时间范围内没有连上指定的节点,则忽略那个节点”。与之相对的还有一个sync_nodes_mandatory选项。

    最后,为了不让服务器因超时而主动切断WebSocket,我们需要在lib/gossipy_web/channels/user_socket.ex里加上一个配置,让WebSocket永不超时:

    transport :websocket, Phoenix.Transports.WebSocket,
      timeout: :infinity  # <---这里,别忘了在上一行结尾加个逗号
    

    所有配置都准备好后,先清除掉上次构建的发布版,再重新构建一次:

    $ MIX_ENV=prod mix release.clean
    $ MIX_ENV=prod mix release --env=prod
    

    然后就可以准备部署了

    创建Docker镜像

    既然是部署到Docker,就要先创建一份Dockerfile,内容如下:

    FROM ubuntu:xenial
    
    EXPOSE 4000
    
    ENV PORT=4000
    
    RUN mkdir -p /www/gossipy && \
        apt-get update && \
        apt-get install -y libssl-dev
    
    ADD ./_build/prod/rel/gossipy/releases/0.0.1/gossipy.tar.gz /www/gossipy
    
    WORKDIR /www/gossipy
    
    ENTRYPOINT ./bin/gossipy foreground
    

    因为发布包内的Erlang运行环境要求服务器的OS和Distillery运行时的OS尽可能一样,所以这里就用Ubuntu 16.04的服务器版。端口设为4000(你喜欢其他端口号也OK)。由于WebSocket需要crypto.so,所以先装一下libssl-dev,否则应用起不来。把打包出来的tar包扔进镜像(docker会替你自动解压),当docker启动的时候把这个服务启动起来就是了。

    为了能简化命令行命令,再建一个docker-compose.yml

    version: '3.2'
    
    services:
      ws1:
        build: .
        hostname: ws1
        ports:
          - 4001:4000
    
      ws2:
        build: .
        hostname: ws2
        ports:
          - 4002:4000
    

    我定义了两个节点,分别将宿主的4001和4002端口NAT到了docker容器的4000端口。另外,这里显式声明了每个节点的主机名(hostname),方便和Phoenix应用对接。

    一切OK后,docker-compose up

    然后你就可以想办法搞两个WebSocket客户端(如果你不知道怎么搞的话,可以参考附录1),分别连接宿主服务器的4001和4002端口,加入同一个房间,然后你就能看见它们能对话了!

    额外实验1. 杀节点

    先杀掉ws2那个容器(端口4002)

    $ docker-compose kill ws2
    

    结果当然是连在ws2上的WebSocket连接全部断开,而ws1上的连接依然正常工作。ws2的连接中断很正常。在实际项目中,我们不会把一个web服务分在多个端口号上,而是公用一个源(协议 + 域名 + 端口),这样只要客户端实现了合理的重连机制,很快就能和别的活着的服务器建立连接。

    然后我们再把ws2重新启动起来

    $ docker-compose start ws2
    

    重新建立和ws2的连接后,两台服务器上的连接又能正常通信了。

    额外实验2. 添加节点

    这次试的是在不重启现有服务器集群的前提下,向集群中添加服务器。

    为此,我们先在docker-compose.yml中添加一个服务

      ws3:
        build: .                   
        hostname: ws3              
        ports:                     
          - 4003:4000
    

    然后修改一下config/prod.exs,把新的节点加进去

    config :kernel,
      sync_nodes_optional: [:"gossipy@ws1", :"gossipy@ws2", :"gossipy@ws3"],  #<---- 注意新加ws3
      sync_nodes_timeout: 10000
    

    重新发布一下,并启动ws3容器

    $ MIX_ENV=prod mix release.clean
    $ MIX_ENV=prod mix release --env=prod
    $ docker-compose up --build ws3
    

    用浏览器测试相当成功!新加的节点马上就连上老节点, 老节点也立刻就认识新节点了。

    结论

    正如所料,Phoenix可以在不改动一行代码的情况下做到WebSocket的集群化。这就是Erlang/Elixir的特色之一——Location Transparency(位置透明)给我们带来的好处。单机运行代码和分布式运行代码完全一样!只是要用好这个位置透明,在没人手把手教你的情况下,你会尝试错误好几次。

    附录1. 测试用HTML

    <!doctype html>
    <html> 
      <head>
        <meta charset="utf-8">
        <title>Phoenix Channel Demo</title>     
      </head>
    
      <body> 
        <pre id="messages"></pre>  
    
        <input id="shout-content"> 
      
        <script>
          window.onload = function () {   
            var wsPort = window.location.search.match(/\bport=(\d+)\b/)[1];
            var messageBox = document.getElementById('messages');
            var ws = new WebSocket('ws://localhost:' + wsPort + '/socket/websocket');
      
            ws.onopen = function () {       
              ws.send(JSON.stringify({        
                topic: 'room:1',
                event: 'phx_join',
                payload: {},
                ref: 0
              }));
            };
    
            ws.onmessage = function (event) {
              var data = JSON.parse(event.data);
              if (data.event !== 'shout') return;
              messageBox.innerHTML += data.payload.message + "\n"; 
            }
    
            document.getElementById('shout-content').onkeyup = function (event) {
              if (event.which !== 13) return; 
              if (!event.target.value) return;
              ws.send(JSON.stringify({        
                topic: "room:1",
                event: "shout",
                payload: {message: event.target.value},
                ref: 0
              }));
              event.target.value = '';
            };
          }
        </script>
      </body>
    </html>
    

    你可以用任何手段host它,使得浏览器能通过HTTP访问到它(用file://协议不行)。例如,你可以把它存入文件ws.html,然后用python -m SimpleHTTPServer来启动一个简易HTTP服务(默认端口号8000),然后用浏览器访问http://localhost:8000/ws.html?port=4001。这里的port参数指定连接到哪个WebSocket端口。

    转载于:https://my.oschina.net/u/3390582/blog/1631370

    展开全文
  • WebSocket分布式思考

    2020-09-12 13:58:29
    机缘巧合,看了一篇websocket分布式部署的文章,以及向公司的同事请教了下关于现在公司的自研im的架构,特整理一下。 一、ws服务 1、架构图 基本流程为:用ws协议连接本服务,得到一个clientId,由客户端上报...

        以前用workman搭建websocket集群服务时,也曾考虑过他的中心服务器是单机的,虽然业务可以交给business服务器,但是对于regesiter服务始终还是存在单点故障的危险。机缘巧合,看了一篇websocket的分布式部署的文章,以及向公司的同事请教了下关于现在公司的自研im的架构,特整理一下。

    一、ws服务

    1、架构图

    基本流程为:用ws协议连接本服务,得到一个clientId,由客户端上报这个clinetId给服务端,服务端拿到这个clientId之后,可以给这个客户端发送信息,绑定这个客户端都分组,给分组发送消息。

    2、时序图

    核心点:

        1、redis映射了每个客户端所对应的ws服务器的ip地址和端口。

       2、当服务器需要发送消息时,需要判断这个client_id是不是属于本服务器:用reids的映射去验证。当不属于本服务器的client时,则通过rpc告知相应的ws服务器,让它去处理这个消息。

       3、群发消息时,通过队列来处理。公司目前用的kafka,保证了有序性。

      

    二、消息模型

    IM离不开消息模型,在以前的工作中使用的是传统的推模式。

    传统模式大体如下:对于在线的用户,消息会直接实时同步到在线的接收方,如:手机端和pc端。而对于离线的用户或者消息有两种方式:一种是专门的离线库储存,未读和离线的消息;另一种是保存一个标记点,记录离线或者是未读的数量。无论哪一种,这种方式都是在同步后再储存消息的模式,存在着丢消息、消息不同步,也不支持消息漫游等功能。如果强行一致,那数据库的压力则会非常大。

    另外一种模型是Timeline模型。

    TimeLine模型

    image

    Timeline可以简单理解为是一个消息队列,但这个消息队列有如下特性:

    • 每个消息拥有一个顺序ID(SeqId),在队列后面的消息的SeqId一定比前面的消息的SeqId大,也就是保证SeqId一定是增长的,但是不要求严格递增。
    • 新的消息永远在尾部添加,保证新的消息的SeqId永远比已经存在队列中的消息都大。
    • 可根据SeqId随机定位到具体的某条消息进行读取,也可以任意读取某个给定范围内的所有消息。

    有了这些特性后,消息的同步可以拿Timeline来很简单的实现。图中的例子中,消息发送方是A,消息接收方是B,同时B存在多个接收端,分别是B1、B2和B3。A向B发送消息,消息需要同步到B的多个端,待同步的消息通过一个Timeline来进行交换。A向B发送的所有消息,都会保存在这个Timeline中,B的每个接收端都是独立的从这个Timeline中拉取消息。每个接收端同步完毕后,都会在本地记录下最新同步到的消息的SeqId,即最新的一个位点,作为下次消息同步的起始位点。服务端不会保存各个端的同步状态,各个端均可以在任意时间从任意点开始拉取消息。

    消息漫游也是基于Timeline,和消息同步唯一的区别是,消息漫游要求服务端能够对Timeline内的所有数据进行持久化。

    参考:https://github.com/woodylan/go-websocket

     

    展开全文
  • ufire-springcloud-platform 学习微服-基于一致性哈希算法实现websocket分布式扩展的尝试。
  • 用户A两个地方登录连接到两个websocketServer节点,用户B连接到2节点。消息生产者发布消息的时候应为map格式receive=userid,msg="msg....",将消息推送给对应的userid。 1.该项目为springboot项目,先引入w...

    websocket实现同一账户多点登录、websocket服务多节点部署推送方案。已在多个实际项目中运行。

                                                                             简单架构图

    假设用户A两个地方登录连接到两个websocketServer服务节点1和2,用户B连接到2节点。

    websocketServer将websocket session保存在各自的Map<String,Session>中,key为userid,value为websocket Session。节点1保存了用户A的websocket session,节点2保存了用户A、B的websocket session。

    消息生产者发布消息的时候为json格式,如:[{"receive"="userid_a","msg"="您有1个未读消息"},{"receive"="userid_b","msg"="您有3个未读消息"}],将消息发到redis的一个Channel,如showNewestMsg。

    websocketServer中订阅redis的channel=showNewestMsg,收到消息后根据消息中receive冲map中找到对应的websocket session,发消息给客户端。

    接下来看核心代码

    1.该项目为springboot项目,先引入jar包,由于是从实际项目中抽出来写的记录,可能还缺jar请自行导入。

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!--websocket-->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    
    <!-- redis -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
    <!-- 工具类 -->
    <dependency>
    	<groupId>cn.hutool</groupId>
    	<artifactId>hutool-all</artifactId>
    	<version>5.3.6</version>
    </dependency>
    
    <dependency>
    	<groupId>net.sf.json-lib</groupId>
    	<artifactId>json-lib</artifactId>
    	<version>2.4</version>
    	<classifier>jdk15</classifier>
    </dependency>

    2.websocket配置

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * spring websocket组件初始化
     * @author csf
     * 
     */
    //war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务
    @Configuration
    public class WebSocketConfig
    {
        @Bean
        public ServerEndpointExporter serverEndpointExporter()
        {
            return new ServerEndpointExporter();
        }
    }

    注意:war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务。

    3.websocket服务端实现

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import com.kingengine.plug.service.MessageService;
    
    import cn.hutool.core.util.StrUtil;
    import net.sf.json.JSONArray;
    import net.sf.json.JSONObject;
    
    /**
     * WebSocket服务类
     * @author csf
     * @date 2020年8月10日
     */
    @ServerEndpoint("/websocket/{custId}")
    @Component
    public class WebSocketServer
    {
        @Resource
        private MessageService messageService;
        
        Logger log = LoggerFactory.getLogger(this.getClass());
        
        // 当前在线连接数
        private static int onlineCount = 0;
        
        // 存放每个用户对应的WebSocket连接对象,key为custId_HHmmss,确保一个登录用户只建立一个连接
        private static Map<String, Session> webSocketSessionMap = new ConcurrentHashMap<String, Session>();
        
        // 与某个客户端的连接会话,需要通过它来给客户端发送数据
        private Session session;
        
        // 接收用户id
        private String custId = "";
        
        private static WebSocketServer webSocketServer;
        
        // 通过@PostConstruct实现初始化bean之前进行的操作
        @PostConstruct
        public void init()
        {
            // 初使化时将已静态化的webSocketServer实例化
            webSocketServer = this;
            webSocketServer.messageService = this.messageService;
        }
        
        /**
         * 连接建立成功调用的方法
         * @param session 连接会话,由框架创建
         * @param custId 用户id, 为处理用户多点登录都能收到消息,需传该格式custId_HHmmss
         * @author csf
         * @date 2020年8月10日
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("custId") String custId)
        {
            if (!webSocketSessionMap.containsKey(custId))
            {
                this.session = session;
                webSocketSessionMap.put(custId, session);
                addOnlineCount(); // 在线数加1
                log.info("有新连接[{}]接入,当前websocket连接数为:{}", custId, getOnlineCount());
            }
            
            this.custId = custId;
            try
            {
                // 第一次建立连接,推送消息给客户端,只会执行一次。后续的新消息由com.kingengine.plug.redis.RedisReceiver接收到redis订阅消息推送
                // 获取未读消息数
                // 由于前端传进来的custId是有时间后缀的,查询时需要去掉后缀。
                String qryCustId = custId.split("_")[0];
                JSONObject unreadMsg = webSocketServer.messageService.getUnreadCount(qryCustId);
                
                // 获取最新消息
                /*  JSONObject newMsg = webSocketServer.messageService.getNewestMsg(qryCustId);
                // 发送消息
                JSONArray msgArr = new JSONArray();
                if (newMsg!=null)
                {
                    msgArr.add(newMsg);
                }*/
                JSONArray msgArr = new JSONArray();
                msgArr.add(unreadMsg);
                sendMessage(custId, msgArr.toString());
            }
            catch (Exception e)
            {
                log.error("客户端连接websocket服务异常");
                e.printStackTrace();
            }
        }
        
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose(@PathParam("custId") String sessionKey)
        {
            if (webSocketSessionMap.containsKey(sessionKey))
            {
                try
                {
                    webSocketSessionMap.get(sessionKey).close();
                    webSocketSessionMap.remove(sessionKey);
                }
                catch (IOException e)
                {
                    log.error("连接[{}]关闭失败。", sessionKey);
                    e.printStackTrace();
                }
                subOnlineCount();
                log.info("连接[{}]关闭,当前websocket连接数:{}", sessionKey, onlineCount);
            }
        }
        
        /**
         * 接收客户端发送的消息
         * @param message 客户端发送过来的消息
         * @param session websocket会话
         */
        @OnMessage
        public void onMessage(String message, Session session)
        {
            log.info("收到来自客户端" + custId + "的信息:" + message);
        }
        
        /**
         * 连接错误时触发
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error)
        {
            try
            {
                session.close();
            }
            catch (IOException e)
            {
                log.error("发生错误,连接[{}]关闭失败。");
                e.printStackTrace();
            }
            // log.error("websocket发生错误");
            // error.printStackTrace();
        }
        
        /**
         * 给指定的客户端推送消息,可单发和群发
         * @param sessionKeys 发送消息给目标客户端sessionKey,多个逗号“,”隔开1234,2345...
         * @param message
         * @throws IOException
         * @author csf
         * @date 2020年8月11日
         */
        public void sendMessage(String sessionKeys, String message)
        {
            if (StrUtil.isNotBlank(sessionKeys))
            {
                String[] sessionKeyArr = sessionKeys.split(",");
                for (String key : sessionKeyArr)
                {
                    try
                    {
                        // 可能存在一个账号多点登录
                        List<Session> sessionList = getLikeByMap(webSocketSessionMap, key);
                        for (Session session : sessionList)
                        {
                            session.getBasicRemote().sendText(message);
                        }
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                        continue;// 某个客户端发送异常,不影响其他客户端发送
                    }
                }
            }
            else
            {
                log.info("sessionKeys为空,没有目标客户端");
            }
        }
        
        /**
         * 给当前客户端推送消息,首次建立连接时调用
         */
        public void sendMessage(String message)
            throws IOException
        {
            this.session.getBasicRemote().sendText(message);
        }
        
        /**
         * 检查webSocket连接是否在线
         * @param sesstionKey webSocketMap中维护的key
         * @return 是否在线
         */
        public static boolean checkOnline(String sesstionKey)
        {
            if (webSocketSessionMap.containsKey(sesstionKey))
            {
                return true;
            }
            else
            {
                return false;
            }
        }
        
        /**
         * 获取包含key的所有map值
         * @param map
         * @param keyLike
         * @return
         * @author csf
         * @date 2020年8月13日
         */
        private List<Session> getLikeByMap(Map<String, Session> map, String keyLike)
        {
            List<Session> list = new ArrayList<>();
            for (String key : map.keySet())
            {
                if (key.contains(keyLike))
                {
                    list.add(map.get(key));
                }
            }
            return list;
        }
        
        public static synchronized int getOnlineCount()
        {
            return onlineCount;
        }
        
        public static synchronized void addOnlineCount()
        {
            WebSocketServer.onlineCount++;
        }
        
        public static synchronized void subOnlineCount()
        {
            WebSocketServer.onlineCount--;
        }
    }
    

    4.redis消息订阅配置

    
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    
    @Configuration
    @EnableCaching
    public class RedisCacheConfig
    {
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter)
        {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            // 可以添加多个 messageListener,配置不同的交换机
            container.addMessageListener(listenerAdapter, new PatternTopic("showNewestMsg"));// 订阅最新消息频道
            return container;
        }
        
        @Bean
        MessageListenerAdapter listenerAdapter(RedisReceiver receiver)
        {
            // 消息监听适配器
            return new MessageListenerAdapter(receiver, "onMessage");
        }
        
        @Bean
        StringRedisTemplate template(RedisConnectionFactory connectionFactory)
        {
            return new StringRedisTemplate(connectionFactory);
        }
    }

    5.redis配置,直接放在springboot项目application.properties或application.yml中

    # 数据库索引(默认为0)
    spring.redis.database=0  
    spring.redis.host=192.168.1.100
    spring.redis.port=6379
    spring.redis.password=123456
    # 连接池最大连接数(使用负值表示没有限制)
    spring.redis.pool.max-active=8  
    # 连接池最大阻塞等待时间(使用负值表示没有限制)
    spring.redis.pool.max-wait=-1  
    # 连接池中的最大空闲连接
    spring.redis.pool.max-idle=8  
    # 连接池中的最小空闲连接
    spring.redis.pool.min-idle=0  
    # 连接超时时间(毫秒)
    spring.redis.timeout=5000

    6.接收消息生产者发布的消息,推送给对应的客户端

    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.stereotype.Component;
    
    import com.kingengine.plug.websocket.WebSocketServer;
    
    import cn.hutool.core.codec.Base64;
    import cn.hutool.core.util.StrUtil;
    import net.sf.json.JSONArray;
    import net.sf.json.JSONObject;
    
    /**
     * 消息监听对象,接收订阅消息
     * @author csf
     * @date 2020年8月13日
     */
    @Component
    public class RedisReceiver implements MessageListener
    {
        Logger log = LoggerFactory.getLogger(this.getClass());
        
        @Autowired
        WebSocketServer webSocketServer;
        
        /**
         * 处理接收到的订阅消息
         */
        @Override
        public void onMessage(Message message, byte[] pattern)
        {
            String channel = new String(message.getChannel());// 订阅的频道名称
            String msg = "";
            try
            {
                msg = new String(message.getBody(), "GBK");//注意与发布消息编码一致,否则会乱码
                if (StrUtil.isNotBlank(msg)){
                    if ("showNewestMsg".endsWith(channel))// 最新消息
                    {
                        JSONObject json = JSONObject.fromObject(msg);
                        webSocketServer.sendMessage(json.get("receive"),json.get("msg"));
                    }else{
                        //TODO 其他订阅的消息处理
                    }
                   
                }else{
                    log.info("消息内容为空,不处理。");
                }
            }
            catch (Exception e)
            {
                log.error("处理消息异常:"+e.toString())
                e.printStackTrace();
            }
        }
    }
    

    7.消息发布测试

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import net.sf.json.JSONObject;
    
    @RequestMapping("redis")
    @RestController
    public class RedisTestController
    {
        @Autowired
        StringRedisTemplate template;
        
        /**
         * 发布消息测试
         *@param userid
         * @param msg
         * @return
         */
        @PostMapping("sendMessage")
        public String sendMessage(String userid,String msg)
        {
            try
            {
                String newMessge=new String(msg.getBytes("GBK"),"GBK");
                Map<String,String> map = new HashMap<String, String>();
                map.put("receive", userid);
                map.put("msg", newMessge);
                template.convertAndSend("showNewestMsg",         
              JSONObject.fromObject(map).toString());
            }
            catch (UnsupportedEncodingException e)
            {
                e.printStackTrace();
            }
            return "消息发布成功!";
        }
    }

    8.客户端代码

    <!DOCTYPE html>
    
    <html>
    
    <head>
        <title>WebSocket测试</title>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    </head>
    
    <body>
    <div>
        来自服务端消息:
        <p id="message"></p>
    </div>
    </body>
    
    
    <script src="http://apps.bdimg.com/libs/jquery/1.6.4/jquery.min.js"></script>
    
    <script>
        let webSocketClient;
        if (window.WebSocket)
        {
           let custid="132456_" + Math.random();//该参数会作为websocketServer中存储session的key,要保证唯一。
            webSocketClient = new WebSocket("ws://127.0.0.1:8082/bootapp/websocket/" + custid);
    
            //连通之后的回调事件
            webSocketClient.onopen = function () {
                webSocketClient.send("这里是地球,收到请回答。。。");
                //	webSocket.send('{"type":"1","data":"121"}');
            };
    
            //接收后台服务端的消息
            webSocketClient.onmessage = function (evt) {
                console.log("数据已接收:" + evt.data);
                showMessage("未读消息:" + evt.data);
            };
    
            //连接关闭的回调事件
            webSocketClient.onclose = function () {
                alert("连接已关闭...");
            };
    
        }else{
            alert("浏览器不支持websocket");
        }
    
        function showMessage(message) {
            $("#message").html(message);
        }
    </script>
    </html>

    核心代码至此。

    展开全文
  • ufire-springcloud-platform 基于一致性hash算法实现websocket分布式集群的尝试,提供模拟宕机演示解决单点故障demo,实现websocket服务的扩容。基于jenkins +github hook+docker-compose 实现自动化持续部署...
       ufire-springcloud-platform 基于一致性hash算法实现websocket分布式集群的尝试,提供模拟宕机演示解决单点故障demo,实现websocket服务的扩容。基于jenkins +github hook+docker-compose 实现自动化持续部署(https://github.com/ufiredong)。
    

    demo

    logs

    jenkins user:test password: test

    转载分布式WebSocket集群解决方案

    技术栈

    nacos 服务发现与注册
    redis pub sub 
    redis sentinel
    springboot
    thmyleaf
    springcloud getway
    consistent hash 一致性哈希算法
    websocket 协议
    rabbitmq
    docker-java api
    docker-compose
    jenkins
    nginx
    

    在这里插入图片描述

    Demo使用方法

    1、指定user发送信息,在输入框中输入 @user1:hello 点击send 即可发送。
    2、宕机模拟 stop当前服务容器即可。
    3、扩容模拟 新增一个websocket容器 由于容器启动需要时间 可能需要几十秒左右,需要重置的用户将在收到通知之后5s内重新连接。
    4、由于ECS服务器内存问题,最多只能支持5台websocket容器同时运行。
    5、代码目前有些地方还尚未进行优化,有时候会出现bug,如果时间长未看到效果,建议stop所有服务,重新从1台websocket容器开始模拟。
    

    在这里插入图片描述
    在这里插入图片描述

    为什么websocket的session不能被序列化,不能被共享

        如果我们要搭建一个http服务的集群,我们可以对httpSession进行序列化存到redis中,然后同步到其他服务节点。因为
     http服务是无状态的,即使http 1.1以后有了长连接的概念keep alive时间是短暂的,说明http连接不是持久化的,而我们的
     websocket是tcp持久化连接,这是一个长连接,websocket在成功建立连接后服务端存在2个socket对象,客户端存在1个,
     服务端2个一个负责监听(监听某个端口,将http协议升级为websocket协议),一个socket负责与客户端的socket建立1对1
     的连接关系 http服务在进行完一次request请求之后除了服务端负责监听的socket还存在,负责通信的2个socket随着request
     请求的结束而被释放。http服务的session并不是维护socket连接的,它只是标识与服务的对应关系。而websocket的session
     是用于维护长连接中socket对应关系的,是持久化的,是真实存在的。因为socket通信必须是1对1的。所以这种情况下session不
     能被共享,结合TCP/IP协议会更好理解一点。
    

    统一入口 nginx

        nginx 这里作用于服务代理,由于我的 ufire-springcloud-platform 项目整个都在docker容器网络内,目前只有 getway
     暴露9888端口,这里由nginx代理getway这样的话,docker微服网络内的服务只能由getway去实现负载路由,不易被入侵,将
     来我们直接在getway实现鉴权就可以了。
    

    网关 getway

        主要负载http message信息发送和ws建立连接。
    我们新建 ConsistencyChooseRule重写choose 方法传入 HashRingConfig 对象,通过 HashRingConfig的getServer(String userId)
    方法获取userId作为key该路由到节点,HashRingConfig对象里维护着一个HashRingEntity对象。
    具体属性如下:
           public class HashRingEntity {
               // 服务节点hash集合
               private SortedMap<Integer, String> serverMap;
               // 当前在线用户hash集合
               private SortedMap<Integer, String> userMap;
               // 上次服务节点hash集合
               private SortedMap<Integer, String> lastTimeServerMap;
           }
           
        我们需要在getway网关维护一个hash环,当服务节点新增(扩容)或者删除(宕机)及时更新hash环,这里我们通过redis的
     消息订阅去实现,nacos注册中心检测到up或者down事件之后会推送消息到getway,此时getway本地增加虚拟节点缓存更新hash环。
    

    websocket服务扩容

        既然实现hash一致性,我们在新增websocket服务容器的时候之后,肯定会影响以后连接的路由映射,
        假设websocket CacheB上线了,该服务器的ip地址刚好被映射到key1和cacheA之间。那么key1对应的用户每次要发消息时都
    跑去CacheB发送消息,结果明显是发送不了消息,因为CacheB没有key1对应的session。
    此时我们有两种解决方案。
    方案A简单,动作大:
        nacos监听到节点UP事件之后,根据现有集群信息,更新哈希环。并且断开所有session连接,让客户端重新连接,此时客户
    端会连接到更新后的哈希环节点,以此避免消息无法送达的情况。
    方案B复杂,动作小:
        我们先看看没有虚拟节点的情况,假设CacheC和CacheA之间上线了服务器CacheB。所有映射在CacheC到CacheB的用户发消息
    时都会去CacheB里面找session发消息。也就是说CacheB一但上线,便会影响到CacheC到CacheB之间的用户发送消息。所以我们
    只需要将CacheA断开CacheC到CacheB的用户所对应的session,让客户端重连。
        我们采取的是方案B,这里我利用了rabbitmq topic routingKey 实现的,具体思路如下:
    我们在新增重启一个websocket服务的时候,自动注册到注册中心之余,自动声明一个queue,reset-queue+ip地址。ip地址是随
    机的,每个queue和routingKey形成对应关系,这样当我们新上线一个websocket服务的时候,就会更新hashRing 网关是消息生
    产者他会通过计算得出哪些user需要重置,会得到一个重置list.这时候我们将需要重新连接的user消息推送到它当前所在的queue
    这样我们作为消费端的websocket服务就会监听到要消费的消息,知道哪些user需要重新连接,然后通知webosocket客户端执行
    ws.close()关闭掉这条连接,因为有重试机制的存在,web客户端会重新连接到它应当路由到的节点。
    

    遇到的问题

        一开始,我想利用nacos的上下线功能实现模拟宕机,但是nacos的下线功能,仅仅是把当前服务从websocket微服list中
      remove掉了,nacos的健康机制显示它还是健康,如果下线的服务中sessionPool不为空存在长链接,并不会断掉,服务的下线只
      会影响以后链接的路由动向,为了模拟宕机只能kill掉那个服务的进程,docker提供的docker-java api可以实现容器编排管理
      功能.我们直接stop掉当前服务就相当于服务宕机了,客户端的重连机制会重新连接到可用的服务,解决单点故障的问题,实现
      服务的高可用。
    
    展开全文
  • Swoole:WebSocket 分布式部署

    千次阅读 2019-08-08 12:06:17
    //分布式解决方案 //在该服务中 不存在 则把消息 推送到 队列中,其他服务订阅消息,收到消息,执行请求(检查是否在自己的服务器上,在则执行相应的推送) } echo "Message: {$frame->data}\n"; //心跳 回复 ...
  • websocket分布式共享session解决方法

    千次阅读 2018-10-29 14:47:59
    项目开发是基于cloud分布式开发模式,需要实时向客户端推送消费的kafka内容,由于websocket的session不可序列化,故不能够存储到redis等缓存当中。为解决其分布式多服务器共享session,想到以下解决方案。 1,...
  • 简述:websocket的基础概念就不再讲了 直接上干货 一、加入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</...
  • 部署时可以把客户端对应的唯一队列写在启动脚本那里(下面是脚本伪代码): 比如:localQueue=websocket.redirect.a,当部署另外一台服务时:localQueue=websocket.redirect.b,以此类推:localQueue=websocket.redirect...
  • 但是分布式存在websocket session共享问题,于是考虑radis存储session,但是遇到websocket session不支持序列化,无法存储。 一番搜索后有了以下几个方案 1. 使用spring session自定义session. 2. 既然
  • 前言 你还在羡慕别人成熟的推送系统么?...通过这个标题点进来就默认你对websocket有了基本的了解,如果你不知道,这里有个很好的答案websocket是什么。 如果你不想听我bb,完整项目传送门(给个star支持一下…3...
  • 背景我们部署单点websocket服务时很简单就可以实现,但是现实中的业务往往对性能要求非常高,因此单点服务并不能满足我们的需求,所以我们需要横向的扩展服务,来满足性能要求。举个简单的例子,我们建立一个简单的...
  • WebSocket分布式中简单的使用思路 如今WebSocket是公认解决轮询的良方,但是在实际运用中存在一些不可避免的问题。就拿当今开发最火的SpringCloud分布式微服务来说,WebSocket就存在一个不可避免的问题:下面是一...
  • 基于 springboot websocket 的群聊实现 仓库地址 https://github.com/yemingfeng/jchat-server 功能列表 分布式 同一帐号多设备登录 群聊 多设备 简单鉴权 心跳检查 依赖 maven jdk11 redis redis 配置 redis ...
  • gin websocket 简单分布式实现

    千次阅读 2018-03-18 15:55:09
    配合客户端重连机制,很好实现分布式长连接IM和PUSH扩展。 // 业务操作调用写rds func SendMsg2Rds(uid int64 , msg *WsMsg) { // list 或者 pubsub 结构 } // 业务操作调用读rds func ...
  • [WebSocket入门]手把手搭建WebSocket多人在线聊天室(SpringBoot+WebSocket) 本文内容摘要: 为何要改造为分布式集群 如何改造为分布式集群 用户在聊天室集群如何发消息 用户在聊天室集群如何接收消息 补充知识...
  • 本文主要是针对分布式场景下的使用websocket的一个解决方案。我们以下面的图来说明下业务使用场景。  针对如图的情况,很多人第一时间想到的是websocket的session共享,这是大多数的第一反应。很遗憾的是,...
  • gin websocket 简单分布式实现2018-09-182018年03月18日 15:55:09 阅读数:829main.goimport ( "github.com/gin-gonic/gin")func main() { ... // 连接ws会先发Get,正常返回101 r.GET("/ws", func(c *gin.Context) ...
  • 客户端与服务端之间采用http/https+websocket的方式进行交互。 网络框架使用Gin框架,日志使用logrus。 ETCD:主要负责集群管理,包括集群节点注册、服务发现。 Redis:主要负责集群数据缓存。 Nginx:主要负责集群节点...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 566
精华内容 226
关键字:

websocket分布式