精华内容
下载资源
问答
  • WebSocket集群

    2019-08-26 13:01:45
    [WebSocket入门]手把手搭建WebSocket多人在线聊天室(SpringBoot+WebSocket) 本文内容摘要: 为何要改造为分布式集群 如何改造为分布式集群 用户在聊天室集群如何发消息 用户在聊天室集群如何接收消息 ...

    前言

    书接上文,我们开始对我们的小小聊天室进行集群化改造。

    上文地址:

    [WebSocket入门]手把手搭建WebSocket多人在线聊天室(SpringBoot+WebSocket)

    本文内容摘要:

    • 为何要改造为分布式集群
    • 如何改造为分布式集群
      • 用户在聊天室集群如何发消息
      • 用户在聊天室集群如何接收消息
    • 补充知识点:STOMP 简介
    • 功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布
    • 功能二:集群集群用户上下线通知——Redis订阅发布
    • 功能三:集群用户信息维护——Redis集合
    • WebSocket集群还有哪些可能性

    本文源码:(妈妈再也不用担心我无法复现文章代码啦)

    github.com/qqxx6661/sp…

    如果您觉得这个教程对您有用,请关注我的技术公众号:Rude3Knife,不定时更新技术点滴。

    正文

    WebSocket集群/分布式改造:实现多人在线聊天室

    为何要改造为分布式集群

    分布式就是为了解决单点故障问题,想象一下,如果一个服务器承载了1000个大佬同时聊天,服务器突然挂了,1000个大佬瞬间全部掉线,大概明天你就被大佬们吊起来打了。

    当聊天室改为集群后,就算服务器A挂了,服务器B上聊天的大佬们还可以愉快的聊天,并且在前端还能通过代码,让连接A的大佬们快速重连至存活的服务器B,继续和大家愉快的聊天,岂不美哉!

    总结一下:实现了分布式WebSocket后,我们可以将流量负载均衡到不同的服务器上并提供一种通信机制让各个服务器能进行消息同步(不然用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都没法收到)。

    如何改造为分布式集群

    当我们要实现分布式的时候,我们则需要在各个机器上共享这些信息,所以我们需要一个Publish/Subscribe的中间件。我们现在使用Redis作为我们的解决方案。

    1. 用户在聊天室集群如何发消息

    假设我们的聊天室集群有服务器A和B,用户Alice连接在A上,Bob连接在B上、

    Alice向聊天室的服务器A发送消息,A服务器必须要将收到的消息转发到Redis,才能保证聊天室集群的所有服务器(也就是A和B)能够拿到消息。否则,只有Alice在的服务器A能够读到消息,用户Bob在的服务器B并不能收到消息,A和B也就无法聊天了。

    2. 用户在聊天室集群如何接收消息

    说完了发送消息,那么如何保证Alice发的消息,其他所有人都能收到呢,前面我们知道了Alice发送的消息已经被传到了Redis的频道,那么所有服务器都必须订阅这个Redis频道,然后把这个频道的消息转发到自己的用户那里,这样自己服务器所管辖的用户就能收到消息。

    补充知识点:STOMP 简介

    上期我们搭建了个websocket聊天室demo,并且使用了STOMP协议,但是我并没有介绍到底什么是STOMP协议,同学们会有疑惑,这里对于STOMP有很好地总结:

    当直接使用WebSocket时(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。

    就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。

    与HTTP请求和响应类似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,如下就是发送数据的一个STOMP帧:

    >>> SEND
    transaction:tx-0
    destination:/app/marco
    content-length:20
    
    {"message":"Marco!"}
    复制代码

    好了,介绍完了概念,让我们开始动手改造!

    功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布

    如果你不熟悉Redis的sub/pub(订阅/发布)功能,请看这里进行简单了解它的用法,很简单:

    redisbook.readthedocs.io/en/latest/f…

    在我们上篇文章的Demo基础上,我们进行集群改造。上一篇文章的源码见下方:

    github.com/qqxx6661/sp…

    1. 添加Redis依赖pom

    <!-- redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    复制代码

    2. application.properties新增redis配置

    当然首先要确保你安装了Redis,windows下安装redis比较麻烦,你可以搜索redis-for-windows下载安装。

    # redis 连接配置
    spring.redis.database=0
    spring.redis.host=127.0.0.1
    spring.redis.password=
    spring.redis.port=6379
    spring.redis.ssl=false
    # 空闲连接最大数
    spring.redis.jedis.pool.max-idle=10
    # 获取连接最大等待时间(s)
    spring.redis.jedis.pool.max-wait=60000
    复制代码

    3. 在application.properties添加频道名定义

    # Redis定义
    redis.channel.msgToAll = websocket.msgToAll
    复制代码

    4. 新建redis/RedisListenerBean

    package cn.monitor4all.springbootwebsocketdemo.redis;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    
    import java.net.Inet4Address;
    import java.net.InetAddress;
    
    /**
     * Redis订阅频道属性类
     * @author yangzhendong01
     */
    @Component
    public class RedisListenerBean {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);
    
        @Value("${server.port}")
        private String serverPort;
    
        @Value("${redis.channel.msgToAll}")
        private String msgToAll;
    
        /**
         * redis消息监听器容器
         * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
         * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            // 监听msgToAll
            container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
            LOGGER.info("Subscribed Redis channel: " + msgToAll);
            return container;
        }
    }
    复制代码

    可以看到,我们在代码里监听了redis频道msgToAll,这个是在application.properties定义的,当然如果你懒得定义,这里可以写死。

    5. 聊天室集群:发消息改造

    我们单机聊天室的发送消息Controller是这样的:

    @MessageMapping("/chat.sendMessage")
    @SendTo("/topic/public")
        public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
            return chatMessage;
    复制代码

    我们前端发给我们消息后,直接给/topic/public转发这个消息,让其他用户收到。

    在集群中,我们需要把消息转发给Redis,并且不转发给前端,而是让服务端监听Redis消息,在进行消息发送。

    将Controller改为:

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
        
    @MessageMapping("/chat.sendMessage")
        public void sendMessage(@Payload ChatMessage chatMessage) {
            try {
                redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    复制代码

    你会发现我们在代码中使用了JsonUtil将实体类ChatMessage转为了Json发送给了Redis,这个Json工具类需要使用到FaskJson依赖:

    1. pom添加FastJson依赖
    <!-- json -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    复制代码
    1. 添加Json解析工具类JsonUtil,提供对象转Json,Json转对象的能力
    package cn.monitor4all.springbootwebsocketdemo.util;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * JSON 转换
     */
    public final class JsonUtil {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);
    
        /**
         * 把Java对象转换成json字符串
         *
         * @param object 待转化为JSON字符串的Java对象
         * @return json 串 or null
         */
        public static String parseObjToJson(Object object) {
            String string = null;
            try {
                string = JSONObject.toJSONString(object);
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
            }
            return string;
        }
    
        /**
         * 将Json字符串信息转换成对应的Java对象
         *
         * @param json json字符串对象
         * @param c    对应的类型
         */
        public static <T> T parseJsonToObj(String json, Class<T> c) {
            try {
                JSONObject jsonObject = JSON.parseObject(json);
                return JSON.toJavaObject(jsonObject, c);
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
            }
            return null;
        }
    }
    复制代码

    这样,我们接收到用户发送消息的请求时,就将消息转发给了redis的频道websocket.msgToAll

    6. 聊天室集群:接收消息改造

    单机的聊天室,我们接收消息是通过Controller直接把消息转发到所有人的频道上,这样就能在所有人的聊天框显示。

    在集群中,我们需要服务器把消息从Redis中拿出来,并且推送到自己管的用户那边,我们在Service层实现消息的推送。

    • 在处理消息之后发送消息: 正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。 如果 @MessageMapping注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可以使用@SendTo 重写消息目的地; 如果 @SubscribeMapping注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。
    • 在应用的任意地方发送消息: spring-websocket 定义了一个 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以实现自由的向任意目的地发送消息,并且订阅此目的地的所有用户都能收到消息。

    我们在service实现发送,需要使用上述第二种方法。

    新建类service/ChatService:

    package cn.monitor4all.springbootwebsocketdemo.service;
    
    import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.messaging.simp.SimpMessageSendingOperations;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ChatService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);
    
        @Autowired
        private SimpMessageSendingOperations simpMessageSendingOperations;
    
        public void sendMsg(@Payload ChatMessage chatMessage) {
            LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
            simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
        }
    
    }
    复制代码

    我们在哪里调用这个service呢,我们需要在监听到消息后调用,所以我们就要有下面的Redis监听消息处理专用类

    新建类redis/RedisListenerHandle:

    package cn.monitor4all.springbootwebsocketdemo.redis;
    
    import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
    import cn.monitor4all.springbootwebsocketdemo.service.ChatService;
    import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    
    /**
     * Redis订阅频道处理类
     * @author yangzhendong01
     */
    @Component
    public class RedisListenerHandle extends MessageListenerAdapter {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);
    
        @Value("${redis.channel.msgToAll}")
        private String msgToAll;
    
        @Value("${server.port}")
        private String serverPort;
    
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
    
        @Autowired
        private ChatService chatService;
    
        /**
         * 收到监听消息
         * @param message
         * @param bytes
         */
        @Override
        public void onMessage(Message message, byte[] bytes) {
            byte[] body = message.getBody();
            byte[] channel = message.getChannel();
            String rawMsg;
            String topic;
            try {
                rawMsg = redisTemplate.getStringSerializer().deserialize(body);
                topic = redisTemplate.getStringSerializer().deserialize(channel);
                LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                return;
            }
    
    
            if (msgToAll.equals(topic)) {
                LOGGER.info("Send message to all users:" + rawMsg);
                ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
                // 发送消息给所有在线Cid
                chatService.sendMsg(chatMessage);
            } else {
                LOGGER.warn("No further operation with this topic!");
            }
        }
    }
    复制代码

    7. 看看效果

    这样,我们的改造就基本完成了!我们看一下效果

    我们将服务器运行在8080上,然后打开localhost:8080,起名Alice进入聊天室

    随后,我们在application.properties中将端口server.port=8081

    再次运行程序(别忘了开启IDEA的“允许启动多个并行服务”设置,不然会覆盖掉你的8080服务,如下图),在8081启动一个聊天室,起名Bob进入聊天室。

     

     

     

    如下两图,我们已经可以在不同端口的两个聊天室,互相聊天了!(注意看url)

     

     

     

     

     

     

    在互相发送消息是,我们还可以使用命令行监听下Redis的频道websocket.msgToAll,可以看到双方传送的消息。如下图:

     

     

     

    我们还可以打开Chrome的F12控制台,查看前端的控制台发送消息的log,如下图:

     

     

     

    大功告成了吗?

    功能实现了,但是并不完美!你会发现,Bob的加入并没有提醒Bob进入了聊天室(在单机版是有的),这是因为我们在“加入聊天室”的代码还没有修改,在加入时,只有Bob的服务器B里的其他用户知道Bob加入了聊天室。我们还能再进一步!

    功能二/功能三:集群用户上下线通知,集群用户信息存储

    我们需要弥补上面的不足,将用户上线下线的广播发送到所有服务器上。

    此外,我还希望以后能够查询集群中所有的在线用户,我们在redis中添加一个set,来保存用户名,这样就可以随时得到在线用户的数量和名称。

    1. 在application.properties添加频道名定义

    # Redis定义
    redis.channel.userStatus = websocket.userStatus
    redis.set.onlineUsers = websocket.onlineUsers
    复制代码

    我们增加两个定义

    • 第一个是新增redis频道websocket.userStatus用来广播用户上下线消息

    • 第二个是redis的set,用来保存在线用户信息

    2. 在RedisListenerBean添加新频道监听

    container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));
    复制代码

    3. 在ChatService中添加

    public void alertUserStatus(@Payload ChatMessage chatMessage) {
            LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString());
            simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
        }
    复制代码

    在service中我们向本服务器的用户广播消息,用户上线或者下线的消息都通过这里传达。

    4. 修改ChatController中的addUser方法

    @MessageMapping("/chat.addUser")
        public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
    
            LOGGER.info("User added in Chatroom:" + chatMessage.getSender());
            try {
                headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
                redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
                redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    复制代码

    我们修改了addUser方法,在这里往redis中广播用户上线的消息,并把用户名username写入redis的set中(websocket.onlineUsers)

    5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法

    @EventListener
        public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
    
            StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
    
            String username = (String) headerAccessor.getSessionAttributes().get("username");
    
            if(username != null) {
                LOGGER.info("User Disconnected : " + username);
                ChatMessage chatMessage = new ChatMessage();
                chatMessage.setType(ChatMessage.MessageType.LEAVE);
                chatMessage.setSender(username);
                try {
                    redisTemplate.opsForSet().remove(onlineUsers, username);
                    redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
    
            }
        }
    复制代码

    在用户关闭网页时,websocket会调用该方法,我们在这里需要把用户从redis的在线用户set里删除,并且向集群发送广播,说明该用户退出聊天室。

    6. 修改Redis监听类RedisListenerHandle

     else if (userStatus.equals(topic)) {
                ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
                if (chatMessage != null) {
                    chatService.alertUserStatus(chatMessage);
                }
    复制代码

    在监听类中我们接受了来自userStatus频道的消息,并调用service

    7. 看看效果

     

     

     

     

     

     

    此外,我们还可以在Reids中查询到用户信息:

     

     

     

    WebSocket集群还有哪些可能性

    有了这两篇文章的基础, 我们当然还能实现以下的功能:

    • 某用户A单独私信给某用户B,或者私信给某用户群(用户B和C)
    • 系统提供外部调用接口,给指定用户/用户群发送消息,实现消息推送
    • 系统提供外部接口,实时获取用户数据(人数/用户信息)

    感兴趣的同学可以自己试试看。

    参考文献

    深入浅出Websocket(二)分布式Websocket集群

    juejin.im/post/5ab846…

    Spring消息之STOMP:

    www.cnblogs.com/jmcui/p/899…

    总结

    我们在本文中把单机版的聊天室改为了分布式聊天室,大大提高了聊天室可用性。


    作者:qqxx6661
    链接:https://juejin.im/post/5d624b56f265da03c23edf94
    来源:掘金
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    展开全文
  • websocket集群

    2020-09-11 15:10:45
    https://www.cnblogs.com/zwcry/p/9723447.html
    展开全文
  • 本文对websocket集群的方案进行讨论: 实现websocket集群。 通过webscoket实现前端实时接收服务推送的信息的功能; 将指定的消息推送到指定的用户 webscoket集群方案 集群方案分析 在上个博文Spring Boot...

    概述

    本文对websocket集群的方案进行讨论:

    1. 在websocket集群中,后端准确将指定的消息推送到指定的用户,前端实时接收服务推送的消息
    2. 对websocket集群的方案进行讨论,并确定最佳方案

    webscoket集群方案

    集群方案分析

    这里写图片描述

    在上个博文Spring Boot系列20 Spring Websocket实现向指定的用户发送消息中实现向指定用户发送消息的功能,但是我们将提供websocket服务的服务进行集群(如上图)则存在如下问题:

    上图中,用户A通过websocket注册到服务A,服务A通过STOMP协议订阅RabbitMQ上的消息,同理用户B。如果用户A连接到服务A上,那么在位于服务B上的MQ模块即使使用SimpMessagingTemplate实例向用户A发送消息,此消息也无法到达用户A,原因是因为服务B上没有服务A的注册信息,无法准确的推送消息.只有在服务A上的MQ模块使用SimpMessagingTemplate实例向这个用户发送消息,消息才会到达用户A

    针对这个问题下文我们通过3个方案解决这个问题,并详细分析每个方案的有缺点。

    webSocket集群方案一

    概述
    不管消息的接收者连接在哪个服务上,每个服务A/B都接收消息,对相同的消息都使用SimpMessagingTemplate实例进行推送,保证总有一个消息会被用户收到。
    这里写图片描述

    详细流程如下
    1. 用户A/B分别通过ws连接服务A/B, 然后服务A/B通过stomp协议接入RabbitMQ
    2. 消息发送者将消息发送到RabbitMQ的交换机上,使用扇形交换机。这样保证同一个消息可以同时被服务A/B接收
    3. 两个服务上的MQ模块接收对应消息后,不管对应的用户是否是通过自己连接到RabbitMQ,直接使用SimpMessagingTemplate实例向消息中指定的用户推送消息
    4. 用户A/B接收到对应的消息

    优点
    1. 实现比较简单

    不足
    1. 消息生产者发送消息的RabbitMQ交换机必须是广播功能,如扇形交换机
    2. 为了保证消息顺利到达用户,相同的消息必须在两个服务A/B上执行相同的操作。这样如果服务越多,则重复的发送消息越多
    3. 如果用户不在线,无论发送多少消息用户都不能收到

    webSocket集群方案二

    概述
    使用redis缓存用户的websocket连接信息,记录用户登录到哪个服务上,当有消息过来时,将消息推送到用户登录的服务,然后服务都使用SimpMessagingTemplate实例进行推送

    这里写图片描述

    在方案一的基础上增加如下功能:
    1. 服务A/B上增加MQ模块,服务A/B上MQ模块会连接到RabbitMQ,分别订阅队列A/B
    2. 服务A/B增加WS模块,当websocket连接过来时,将此用户的连接信息存储到redis上,系统记住每个用户登录的到哪个服务
    3. 消息生产者将消息推送到交换机,不直接推送到服务A/B
    4. 增加新的模块dispatch,此模块接收到消息,然后从redis中读取要消息要推送到用户连接到那个服务器上,然后将消息发送到用户连接服务对应的队列中。如果消息要发送给用户B,则dispatch模块会将消息发送到队列B
    5. 服务A/B的MQ模块接收到消息后,使用SimpMessagingTemplate实例向指定用户推送消息

    优点
    1. 此方案克服上一个方案不足的地方

    缺点
    1. 实现复杂
    2. 发送MQ消息的次数增加1倍

    webSocket集群方案三

    概述
    不使用SimpMessagingTemplate,使用RabbitMQ的客户端API直接向用户在RabbitMQ上订阅的队列发送消息

    发现用户通过浏览器登录websocket并注册RabbitMQ时,此时这个连接会在RabbitMQ建立一个队列,队列的名称类似stomp-subscription-*,此队列绑定到默认交换机amq.topic,路由键为”web订阅队列名称+’-user’+websocket sessionId”(这里是demo-userpjplggbl,demo是stomp weboscket连接的队列名称,pjplggbl登录websocket登录时的websocket sessionId值),图片如下:
    这里写图片描述

    根据这个,设计如下架构:
    这里写图片描述

    在方案一的基础进行如下修改,新的架构图流程如下:
    1. 服务A增加WS模块,当websocket连接过来时,将此用户的连接信息(主要是websocket sesionId值)存储redis中
    2. 消息生产者发送消息到的交换机,这些服务不直接推送服务A/B
    3. 增加新的模块dispatch,此模块接收推送过来的信息,并从redis中读取消息接收用户对应的websocket sesionId值,然后根据上面的规则计算出用户对应的路由键,然后将消息发送到用户订阅的队列上
    4. 前端接收消息

    优点:
    1. 即克服第一个方案不足的地方,又比第二个方案简单

    结论

    方案三是最好的方案,下一篇文章,我们会介绍如何在代码中实现方案三

    展开全文
  • WebSocket系列:爱奇艺号 WebSocket集群推送网关

    前言

    经过前面的系列实战文章,相信大家现在都知道如何实现一个简单的websocket功能。
    但是在真实场景中,项目的用户数、访问量级达到一定的级别后,单个节点的websocket始终会出现性能瓶颈。那么websocket是否也能向我们的微服务的节点一样,实现横向扩张,通过增加节点来分摊请求负载呢?
    本节通过介绍爱奇艺号 WebSocket推送网关的设计和实践,相信大家会有所收获。


    一、业务场景

    • 用户评论。实时的将评论消息推送到浏览器。

    • 实名认证。合同签署前需要对用户进行实名认证,用户扫描二维码后进入第三方的认证页面,认证完成后异步通知浏览器认证的状态。

    • 活体识别。类似实名认证,当活体识别完成后,异步将结果通知浏览器。

    二、存在的问题

    • 首先,WebSocket 技术栈不统一,既有基于 Netty 实现的,也有基于 Web 容器实现的,给开发和维护带来困难;
    • 其次,WebSocket 实现分散在在各个工程中,与业务系统强耦合,如果有其他业务需要集成 WebSocket,面临着重复开发的窘境,浪费成本、效率低下;
    • 第三,WebSocket 是有状态协议的,客户端连接服务器时只和集群中一个节点连接,数据传输过程中也只与这一节点通信。

    三、集群的核心问题

    WebSocket 集群需要解决会话共享的问题。如果只采用单节点部署,虽然可以避免这一问题,但无法水平扩展支撑更高负载,有单点的风险;

    四、长连接网关的设计与实现

    为了解决以上问题,我们实现了统一的 WebSocket 长连接网关,具备如下特点:

    1. 集中实现长连接管理和推送能力。统一技术栈,将长连接作为基础能力沉淀,便于功能迭代和升级维护。

    2. 与业务解耦。将业务逻辑与长连接通信分离,使业务系统不再关心通信细节,也避免了重复开发,浪费研发成本。

    3. 使用简单。提供 HTTP 推送通道,方便各种开发语言的接入。业务系统只需要简单的调用,就可以实现数据推送,提升研发效率。

    4. 分布式架构。实现多节点的集群,支持水平扩展应对业务增长带来的挑战;节点宕机不影响服务整体可用性,保证高可靠。

    5. 多端消息同步。允许用户使用多个浏览器或标签页同时登陆在线,保证消息同步发送。

    6. 多维度监控与报警。自定义监控指标与现有微服务监控系统打通,出现问题时可及时报警,保证服务的稳定性。

    五、技术选型

    在众多的 WebSocket 实现中,从性能、扩展性、社区支持等方面考虑,最终选择了 Netty。Netty 是一个高性能、事件驱动、异步非阻塞的网络通信框架,在许多知名的开源软件中被广泛使用。

    WebSocket 是有状态的,无法像直接 HTTP 以集群方式实现负载均衡,长连接建立后即与服务端某个节点保持着会话,因此集群下想要得知会话属于哪个节点,有两种方案,一种是使用类似微服务的注册中心来维护全局的会话映射关系,一种是使用事件广播由各节点自行判断是否持有会话,两种方案对比如表 1 所示。
    在这里插入图片描述
    综合考虑实现成本与集群规模,选择了轻量级的事件广播方案。实现广播可以选择基于 RocketMQ 的消息广播、基于 Redis 的 Publish/Subscribe、基于 ZooKeeper 的通知等方案,其优缺点对比如表 2 所示。从吞吐量、实时性、持久化、实现难易等方面考虑,最终选择了 RocketMQ。
    在这里插入图片描述

    六、系统架构

    网关的整体架构如图 1 所示。
    WebSocket 长连接网关架构
    网关的整体流程如下:

    1. 客户端与网关任一节点握手建立起长连接,节点将其加入到内存维护的长连接队列。客户端定时向服务端发送心跳消息,如果超过设定的时间仍没有收到心跳,则认为客户端与服务端的长连接已断开,服务端会关闭连接,清理内存中的会话。

    2. 当业务系统需要向客户端推送数据时,通过网关提供的 HTTP 接口将数据发向网关。

    3. 网关在接收到推送请求后,将消息写入 RocketMQ。

    4. 网关作为消费者,以广播模式消费消息,所有节点都会接收到消息。

    5. 节点接收到消息后判断推送的消息目标是否在自己内存中维护的长连接队列里,如果存在则通过长连接推送数据,否则直接忽略。

    网关以多节点方式构成集群,每节点负责一部分长连接,可实现负载均衡,当面对海量连接时,也可以通过增加节点的方式分担压力,实现水平扩展。同时,当节点出现宕机时,客户端会尝试重新与其他节点握手建立长连接,保证服务整体的可用性。

    七、会话管理

    长连接建立起来后,会话维护在各节点的内存中。SessionManager 组件负责管理会话,内部使用了哈希表维护了 UID 与 UserSession 的关系;UserSession 代表用户维度的会话,一个用户可能会同时建立多个长连接,因此 UserSession 内部同样使用了一个哈希表维护 Channel 与 ChannelSession 的关系。为了避免用户无限制的创建长连接,UserSession 在内部的 ChannelSession 超过一定数量后,会将最早建立的 ChannelSession 关闭,减少服务器资源占用。SessionManager、UserSession、ChannelSession 的关系如图 2 所示。

    SessionManager 组件

    总结

    本文主要是通过介绍爱奇艺号 WebSocket推送网关的设计与实践,说明了WebSocket集群的设计中需要考虑的问题,对比了各个实现方案的优缺点,相信对大家在实现WebSocket集群过程中有一定帮助。

    参考:构建通用WebSocket推送网关的设计与实践

    展开全文
  • springboot websocket集群

    2020-05-12 21:46:14
    websocket集群在websocket基础上修改 原理:由于集群有多个websocket应用,两个用户有可能不在同一个应用中,发送消息时用户会接收不到消息。所以将用户发送的消息发布到redis消息主题中,所有应用都连接到同一个...
  • 上一篇文章Spring Boot系列21 Spring Websocket实现websocket集群方案讨论里详细介绍了WebSocket集群的有三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案 工程 工程名称: 本文在Spring Boot...
  • websocket集群化方案 基于tornado的websocket加上rabbitmq,来实现websocket的集群化处理
  • WebSocket简介和spring boot集成简单消息代理 Spring Boot 集成 websocket,使用RabbitMQ做为消息代理 ...Spring Websocket实现向指定的用户发送消息 ...Spring Websocket实现websocket集群方案的De...
  • websocket集群搭建方案

    千次阅读 2019-01-14 17:03:46
    1.前言 我们都知道http协议,http属于短链接,属于一个request对应一个response,但是而且请求后就必须有响应... 假设有个业务想要连接不断,并且更加自由,那么就得用websocket协议。本编文章讲述搭建websocket集群...
  • 期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。 以下是我的场景描述 ...
  • 期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。以下是我的场景描述...
  • 主要介绍了springboot websocket集群(stomp协议)连接时候传递参数,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 分布式WebSocket集群解决方案

    千次阅读 2018-12-08 15:14:50
    问题起因 最近做项目时遇到了需要多用户...期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分...
  • 使用RabbitMQ解决WebSocket集群问题 当前一个项目在做在线客服的功能,用到了WebSocket技术,由于服务是分布式服务,所以涉及到了WebSocket的集群问题。 假如我们部署了三个节点A、B、C,存在一个用户甲,甲用户...
  • 基于Netty实现websocket集群部署实现方案 每天多学一点点~ 话不多说,这就开始吧… 文章目录基于Netty实现websocket集群部署实现方案1.前言2. 整体思路3. 代码demo4. 测试5.结语 1.前言 最近公司在做saas平台,其中...
  • Nginx安装与使用+WebSocket集群实现及断开重连-部署
  • 关于websocket做一次全面的总结。...实现WebSocket集群的2种方式 用redis的订阅/推送功能实现的。(推荐) 用redis存客户端连接对应的服务器的IP+端口,再用http去调用对应服务器的接口。 用redi.
  • 深入浅出Websocket(二)分布式Websocket集群 深入浅出Websocket(三)分频道的Websocket(分析socket.io源码以及ws-wrapper) 正文 这个是我在造的玩具的一个简单架构图。将实时通信部分给抽离出来作...
  • Spring Boot+NettySocketIo+RabbitMQ实现websocket集群 效果 连接到不同的服务器的两个客户端可以互发消息 websocket的连接Client是不能序列化的,所以不能使用session共享那样的方法来做websocket的集群 netty...
  • websocket集群1

    2020-09-11 17:51:57
    https://blog.csdn.net/sinat_31982655/article/details/99222024

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 481
精华内容 192
关键字:

websocket集群