精华内容
下载资源
问答
  • 分布式websocket实战

    2020-05-06 09:22:02
    分布式websocket实战 分布式系统websocket实战 近期spring cloud项目中用到websocket,记录一下踩坑之路。 一、websocket简介 1 . 什么是WebSocket? WebSocket是Html5提供的一种能在单个TCP连接上进行全双工通讯的...

    分布式websocket实战

    分布式系统websocket实战

    近期spring cloud项目中用到websocket,记录一下踩坑之路。

    一、websocket简介

    1 . 什么是WebSocket?

    WebSocket是Html5提供的一种能在单个TCP连接上进行全双工通讯的协议

    2. 为什么要使用WebSocket?

    使用WebSocket最大的好处就是客户端能主动向服务端发送消息,服务端也能主动向客户端发送消息,能最大程度的保证信息交换的实时性。
    WebSocket只需要建立一次连接,客户端和服务端只需要交换一次请求头和响应头就可以无数次交换信息

    3.WebSocket在分布式集群环境中的问题和解决思路

    单机情况下,当websocket需要给用户推送消息时,由于用户已经与websocket服务建立连接,消息推送能够成功。
    集群环境下,可能会遇到这样问题:给用户页面推送消息的websocket服务未必是与该用户建立websocket连接的服务

    解决方案思路1: 考虑websocket是否可以在多台机器上共享,实现数据共享,是否可以将websocketsession序列化后存储到redis里面?
    在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信。但是 ws的session无法序列化到redis, 因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。

    解决方案思路2确保和用户建立连接的websocket服务就是接收到消息的服务。将连接的服务端的ip存到redis里,gateway根据参数指定转发对应的服务器上, 还是能做到点对点,只要所有websocket工程都不会宕机。如果指定服务器宕机了,消息还是会发送失败。

    解决方案思路3:只要确保对于业务模块发送的消息,所有的websocket服务都能收到消息,只要做到了这一点,与用户建立连接websocket自然也能接收到消息。(而且,这种方式相对单台服务收到消息还有一个在处理多点登陆场景下的优势。对于允许多点登录的系统,同一用户可以在多处进行登录,同一用户与多个服务拥有多个websocket连接,这就要求我们保证多台用户消费同一台业务模块的消息。)
    按照此思路,引用消息队列,那么每一个websocket消息,我们在集群的每个节点上都进行推送,订阅了该消息的连接,不管有多少个,最终都能收到这个消息。

    二、websocket实现用户下线通知实战

    1 . 设计思路

    项目中用户token储存在redis当中,并设置了过期时间,监听过期key,通过websocket通知到前端即可实现。

    1)监听redis中token有效期是否到期
    2)token到期后,利用spring cloud stream rabbit发送通知(生产者)
    3)mq接收对应的通知消息,websocket向客户端发送消息(消费者)
    4) 前端使用sockjs接收消息,并定向通知

    2. redis监听

    1)开启事件
    redis 对事件的监听默认是关闭的,因为这会消耗性能
    修改redis.config 文件
    在这里插入图片描述
    2)redis监听规则

    事件是用  __keyspace@DB__:KeyPattern 或者  __keyevent@DB__:OpsType 的格式来发布消息的。
    DB表示在第几个库;KeyPattern则是表示需要监控的键模式(可以用通配符,如:__key*__:*);OpsType则表示操作类型。因此,如果想要订阅特殊的Key上的事件,应该是订阅keyspace。
    比如说,对 0 号数据库的键 mykey 执行 DEL 命令时, 系统将分发两条消息, 相当于执行以下两个 PUBLISH 命令:
    PUBLISH __keyspace@0__:sampleKey del
    PUBLISH __keyevent@0__:del sampleKey
    订阅第一个频道 __keyspace@0__:mykey 可以接收 0 号数据库中所有修改键 mykey 的事件, 而订阅第二个频道 __keyevent@0__:del 则可以接收 0 号数据库中所有执行 del 命令的键。
    

    3)监听器配置

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    
    /**
     * redis监听器
     */
    @Configuration
    public class RedisSubListenerConfig {
        // 初始化监听器
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                MessageListenerAdapter listenerAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            // 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息
            // __keyevent@*__:expired 订阅Redis的所有数据库的键值失效事件
            container.addMessageListener(listenerAdapter, new PatternTopic("__keyevent@*__:expired"));
            return container;
        }
    
        // 利用反射来创建监听到消息之后的执行方法
        @Bean
        MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
            return new MessageListenerAdapter(redisReceiver, "receiveMessage");
        }
    }
    
    

    4)监听后执行方法

    /**
     * redis监听事件接收处理器
     */
    @Component
    public class RedisReceiver {
        @Autowired
        private IUserProducerService userProducerService;
    
        // 收到通道的消息之后执行的方法
        public void receiveMessage(Object message) {
           
        }
    }
    

    5)重复监听问题
    集群环境存在重复监听的问题,可利用redis的getset 命令方法进行解决

    思路是:在过期回调事件中利用getset设置 [ key(当前监听到的过期key)+".lock"作为新的key ], 字符串"1"作为value,当某一个工程触发回调事件时,由于时第一次进入,此时 getset方法返回null(),由于redis是单线程,所以其他工厂虽然也到了这个方法这里,但是此时getset时返回的是我们设置的value值,所以通过判断,如果“1”.equals(返回的value)直接return掉,不往下执行,如果 ! “1”.equals(返回的value)则往下执行;当下面的步骤都执行完成了,再从redis删除掉这条数据,因为每一次过期回调都会利用getset产生一条数据,以免数据量多大造成积压。

    	@Override
        public <T> T getAndSet(final String key, T value, long second) {
            T oldValue = null;
            try {
                // 设置指定 key 的值,并返回 key 的旧值
                // 返回给定 key 的旧值。 当 key 没有旧值时,即 key 不存在时,返回 null 。
                //当 key 存在但不是字符串类型时,返回一个错误。
                oldValue = (T) redisTemplate.opsForValue().getAndSet(key, value);
    
                // 充当锁时加上过期时间,避免数据锁积压
                if(second > 0){
                    redisTemplate.expire(key, second, TimeUnit.SECONDS);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return oldValue;
    
        }
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Component;
    
    import com.zjrc.common.config.redis.IRedisService;
    import com.zjrc.common.constant.CommonConstants;
    import com.zjrc.websocket.mq.producer.IUserProducerService;
    
    import cn.hutool.core.util.ObjectUtil;
    
    /**
     * redis监听事件接收处理器
     */
    @Component
    public class RedisReceiver {
        @Resource
        private IUserProducerService userProducerService;
        @Resource
        private IRedisService redisService;
    
        // 收到通道的消息之后执行的方法
        public void receiveMessage(Object message) {
            if (ObjectUtil.isNull(message)) {
                return;
            }
    
            String key = message.toString(); // 获取到过期的key
            // 只处理token的key
            if (key.indexOf(CommonConstants.CacheKey.REDIS_LOGIN_ACCESS_TOKEN) < 0) {
                return;
            }
    
            // 插入一条数据,以充当锁使用
            String oldLock = redisService.getAndSet(key + ".lock", "1", 10);
            if ("1".equals(oldLock)) {
                return;
            }
    
            // 取用户号码当做用户标识
            String mobile = key.replace(CommonConstants.CacheKey.REDIS_LOGIN_ACCESS_TOKEN, "");
    
            // 生产登陆超时消息
            userProducerService.userLoginTimeout(mobile);
    
            // 锁用完之后就删除,避免数据锁积压
            redisService.del(key + ".lock");
    
        }
    }
    

    3. spring cloud stream rabbit消息队列

    1)简介
    Spring Cloud Stream是一个构建消息驱动微服务的框架,遵循“智能端点和哑管道”的原则。端点之间的通信由消息中间件(如RabbitMQ或Apache Kafka)驱动。服务通过这些端点或信道发布事件来进行通信。
    应用程序通过input(相当于consumer)、**output(相当于producer)**来与Spring Cloud Stream中Binder交互,而Binder负责与消息中间件交互;因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互。
    2)Maven依赖
    添加Spring Cloud Stream与RabbitMQ消息中间件的依赖

    		<!--spring cloud stream-rabbit begin-->
    		<dependency>
    			<groupId>org.springframework.cloud</groupId>
    			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    		</dependency>
            <!--spring cloud stream-rabbit end-->
    

    3)配置文件
    生产和消费如果不是在同一个服务上,只要配置对应的生产或者消费的信息,目前测试项目放到一起了。

    spring:
      cloud:
        stream:
          bindings:
            userInput: #input和output 名称需要与代码中@Input和@Output设置的通道保持一致(坑呀!)
              destination: test_rabbit #Exchange名称,input与output需保持一致
              binder: local_rabbit 
            userOutput:
              destination: test_rabbit 
              binder: local_rabbit
          binders:
            local_rabbit: 
              type: rabbit
              environment: #配置rabbimq连接环境
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
    

    启动之后可以在rabbitmq可视化界面Exchange中看到定义的test_rabbit
    在这里插入图片描述

    4)生产者
    定义生产的通道

    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     * 用户发送通道(生产者)
     */
    public interface UserSendChannel {
        String USER_OUTPUT = "userOutput";
    
        @Output(UserSendChannel.USER_OUTPUT)
        MessageChannel userOutput();
    }
    

    生产消息并将消息发送给消费者

    在这里插入代码片import javax.annotation.Resource;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Service;
    
    import com.google.gson.JsonObject;
    import com.zjrc.websocket.mq.channel.UserSendChannel;
    
    @Service
    @EnableBinding(UserSendChannel.class)
    public class UserProducerServiceIml implements IUserProducerService {
    
        @Resource
        @Output(UserSendChannel.USER_OUTPUT)
        private MessageChannel channel;
    
        /**
         * 登陆超时提示(生产者)
         *
         * @param userTags 用户标识
         */
        @Override
        public void userLoginTimeout(String userTags) {
            // 组装消息
            JsonObject jsonObject = new JsonObject();
            jsonObject.addProperty("mobile", userTags);
            jsonObject.addProperty("message", "该用户token已过期,请重新登录");
    
            // 发送消息给消费者
            channel.send(MessageBuilder.withPayload(jsonObject.toString()).build());
            System.out.println("消息发送成功" + jsonObject.toString());
        }
    
    }
    

    5)消费者
    消费通道

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * 用户接收通道(消费者)
     */
    public interface UserReceiverChannel {
        String USER_INPUT = "userInput";
    
        @Input(UserReceiverChannel.USER_INPUT)
        SubscribableChannel userInput();
    
    }
    

    接收消息

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    import com.zjrc.websocket.mq.channel.UserReceiverChannel;
    
    /**
     * 用户消息消费者
     */
    @Component
    @EnableBinding(UserReceiverChannel.class)
    public class UserConsumer {
        /**
         * 接收消息
         * 
         * @param message
         */
        @StreamListener(UserReceiverChannel.USER_INPUT)
        public void receive(Message<String> message) {
            System.out.println(message.getPayload());
        }
    }
    
    

    最终在rabbit界面Queues中查看到这个队列的生成和消费情况
    在这里插入图片描述

    4. websocket

    1)Maven依赖

    		<!--websocket begin-->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-websocket</artifactId>
    		</dependency>
    		<!--websocket end-->
    

    2)WebSocket配置文件

    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
    
    /**
     * websocket配置
     */
    @Configuration
    @EnableWebSocketMessageBroker // 注解开启STOMP协议来传输基于代理的消息,此时控制器支持使用
    public class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {
    
        /**
         * 将"/mq"路径注册为STOMP端点 PS:端点的作用——客户端在订阅或发布消息到目的地址前,要连接该端点。
         * 
         * @param registry
         */
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/mq") // 开启mq端点
                    .setAllowedOrigins("*") // 允许跨域访问
                    .withSockJS(); // 使用sockJS
        }
    
        /**
         * 配置了一个简单的消息代理,如果不重载,默认情况下回自动配置一个简单的内存消息代理,用来处理以"/topic"为前缀的消息。这里重载configureMessageBroker()方法,
         * 消息代理将会处理前缀为"/topic"和"/queue"的消息。
         * 
         * @param registry
         */
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            // 应用程序以/app为前缀,代理目的地以/topic、/user为前缀
            registry.enableSimpleBroker("/topic", "/user"); // 向客户端发消息topic用来广播,user用来实现p2p
            registry.setApplicationDestinationPrefixes("/app"); // 客户端向服务器端发送时的主题上面需要加"/app"作为前缀
            registry.setUserDestinationPrefix("/user"); // 给指定用户发送一对一的主题前缀是"/user"
        }
    
    }
    

    3)sockjs
    前端通过sockjs来进行websock连接

    <template>
      <div>
        <button @click="send">发消息</button>
      </div>
    </template>
    
    <script>
    import SockJS from "sockjs-client";
    import Stomp from "stompjs";
    export default {
      data() {
        return {
          stompClient: "",
          timer: ""
        };
      },
      methods: {
        initWebSocket() {
          this.connection();
          let that = this;
          // 断开重连机制,尝试发送消息,捕获异常发生时重连
          this.timer = setInterval(() => {
            try {
              that.stompClient.send("test");
            } catch (err) {
              console.log("断线了: " + err);
              that.connection();
            }
          }, 5000);
        },
        connection() {
          // 建立连接对象
          let socket = new SockJS("http://localhost:8000/websocket/mq");
          // 获取STOMP子协议的客户端对象
          this.stompClient = Stomp.over(socket);
          // 定义客户端的认证信息,按需求配置
          let headers = {
            Authorization: ""
          };
          // 向服务器发起websocket连接
          this.stompClient.connect(
            headers,
            () => {
              //     this.stompClient.subscribe('/topic/userOnlineStatus', (msg) => { // 订阅服务端提供的某个topic
              //     console.log('广播成功')
              //   console.log(msg);  // msg.body存放的是服务端发送给我们的信息
              // },headers);
    
              var userId = 1;
              this.stompClient.subscribe(
                "/user/" + userId + "/userOnlineStatus",
                msg => {
                  // 订阅服务端提供的某个topic
                  console.log("一对一发送成功");
                  console.log(msg); // msg.body存放的是服务端发送给我们的信息
                },
                headers
              );
    
              this.stompClient.send(
                "/app/chat.addUser",
                headers,
                JSON.stringify({ sender: "", chatType: "JOIN" })
              ); //用户加入接口
            },
            err => {
              // 连接发生错误时的处理函数
              console.log("失败");
              console.log(err);
            }
          );
        }, //连接 后台
        send() {
          this.stompClient.send(
            "/subscribe",
            {},
            JSON.stringify({ name: 1111111 })
          );
        },
        disconnect() {
          if (this.stompClient) {
            this.stompClient.disconnect();
          }
        } // 断开连接
      },
      mounted() {
        this.initWebSocket();
      },
      beforeDestroy: function() {
        // 页面离开时断开连接,清除定时器
        this.disconnect();
        clearInterval(this.timer);
      }
    };
    </script>
    
    <style></style>
    
    

    SockJS客户端开始时会发送一个GET类型的"/info"请求从服务器去获取基本信息, 这个请求之后SockJS必须决定使用哪种传输,可能是WebSocket,如果不是的话,在大部分浏览器中会使用HTTP Streaming或者HTTP长轮询。
    由于项目所有请求走的都是网关gateway,因此在网关这层必须要特殊处理这个"/info" 请求

    4)网关配置
    网关路由配置需要加上这两个配置,注意顺序不能调换。

    spring:
      cloud:
        gateway:
          routes:
            - id: websocket_sockjs_route 
              uri: lb://zjce-service-websocket
              predicates:
                - Path=/websocket/mq/info**  # 将对应请求路径的请求转发到 zjce-service-websocket服务
            - id: zjce-service-websocket
              uri: lb:ws://zjce-service-websocket
              predicates:
                - Path=/websocket/** # 将对应请求路径的请求转换成ws协议到zjce-service-websocket服务
    

    gateway内置 WebsocketRoutingFilter Websocket 路由网关过滤器。

    private void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
    		// Check the Upgrade
    		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    		String scheme = requestUrl.getScheme().toLowerCase();
    		String upgrade = exchange.getRequest().getHeaders().getUpgrade();
    		// change the scheme if the socket client send a "http" or "https"
    		if ("WebSocket".equalsIgnoreCase(upgrade) && ("http".equals(scheme) || "https".equals(scheme))) {
    			String wsScheme = convertHttpToWs(scheme);
    			URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();
    			exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
    			if (log.isTraceEnabled()) {
    				log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
    			}
    		}
    	}
    

    查看源码可以看到,匹配到WebSocket请求后,将http或https协议升级为ws或wss协议

    配置成功后可以看到前端info请求成功,并返回websocket:true
    在这里插入图片描述
    在这里插入图片描述

    5)发送websocket消息
    结合前面的消息消费者,将接受到的消息通过websocket发送到

    import javax.annotation.Resource;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.zjrc.websocket.mq.channel.UserReceiverChannel;
    
    /**
     * 用户消息消费者
     */
    @Component
    @EnableBinding(UserReceiverChannel.class)
    public class UserConsumer {
        @Resource
        private SimpMessagingTemplate template;
    
        // 广播消息
        private final static String DESTINATION = "/topic/userOnlineStatus";
    
        // 点对点消息
        private final static String USER_DESTINATION = "/userOnlineStatus";
    
        /**
         * 接收消息并向客户端指定用户发送消息
         * 
         * @param message
         */
        @StreamListener(UserReceiverChannel.USER_INPUT)
        public void receive(Message<String> message) {
            System.out.println(message.getPayload());
    
            // 接收转换消息
            String dataStr = message.getPayload();
            JSONObject data = JSON.parseObject(dataStr);
            String mobile = (String) data.get("mobile"); // 客户端对应的用户
            String messageStr = (String) data.get("message"); // 客户端接收的消息
    
            // 向客户端广播消息
            template.convertAndSend(DESTINATION, messageStr);
    
            // 向客户端指定用户发送消息
            template.convertAndSendToUser(mobile, USER_DESTINATION, messageStr);
    
            System.out.println("消费成功" + messageStr);
        }
    }
    

    6)最终效果
    前端console打印后台发送的消息
    在这里插入图片描述

    展开全文
  • 分布式webSocket session无法共享问题

    分布式webSocket session无法共享问题

    问题:
    最近碰到一个麻烦的事情,就是在使用webSocket推送消息时候,发现session存放在各个节点,各节点之间无法获取session。而且经过研究发现:WebSocket与http协议一样都是基于TCP的,所以他们都是可靠的协议,调用的WebSocket的send函数在实现中最终都是通过TCP的系统接口进行传输的。WebSocket和Http协议一样都属于应用层的协议,WebSocket在建立握手连接时,数据是通过http协议传输的,但是在建立连接之后,真正的数据传输阶段是不需要参与的
    分析
    也就是说目前问题:1、wehsocketSession无法序列化,即无法存放在redis里面达到共享
    2、就算能在别的节点创建session对象,也无法推送消息,因为webSocket是基于tcp的协议,http,https在传输数据上是不会参与的,也就是说与哪个节点建立的连接必须那个节点推送消息
    解决思路
    1 使用订阅发布服务工具,各个节点即为订阅方也为发布方,一个节点发布,其余节点收到消息,检测自己是否存在改session,存在通过收到的消息推送数据。
    2记录session关键信息(节点ip,端口,用户数据)到缓存里面(推荐redis),通过查找session关键信息,定位到哪个节点,通过http请求改节点,达到推送效果(涉及http请求,可能响应回慢)

    ps:本来想使用方法一,通过redis订阅发布服务实现消息推送,但是项目节点很多,如果一个节点发布,其余都要查找,担心增加服务器压力,所以本人选择方法二,但是方法一我会在最下面做简单介绍
    websocket服务部署:
    redsi 存放关键信息,ip,端口

    @ServerEndpoint("/signWebsocket/{mobile}")
    @Component
    public class SignWebSocket {
    
        private static final Logger LOG = LoggerFactory.getLogger(SignWebSocket.class);
    
        private static SignWebSocket instance = new SignWebSocket();
    
        //保存客户端发起的唯一标识与长连接
        private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
    
        public static final String KEY_SOCKET_SESSION = "socket_session_";
    
        private static int KEY_SOCKET_TIME = 3000;
    
        private RedisCache redisCache;
    
        public SignWebSocket() {
            ApplicationContext act = ApplicationContextRegister.getApplicationContext();
            this.redisCache = act.getBean(RedisCache.class);
        }
    
        public static ConcurrentHashMap<String, Session> getSessionMap() {
            return sessionMap;
        }
    
        public static SignWebSocket getInstance() {
            return instance;
        }
    
        /**
         * 连接建立成功调用的方法
         *
         * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        @OnOpen
        public void onOpen(@PathParam("mobile") String mobile, Session session) {
    
            String path= AllConstant.getInstance().getLocalAddr()+":"+AllConstant.getInstance().getLocalPort();
            try {
                if (sessionMap.containsKey(mobile)) {
                    Session session_old = sessionMap.get(mobile);
                    session_old.close();
                    sessionMap.remove(mobile);
                }
            } catch (IOException e) {
                LOG.error(e.getMessage(),e);
            }
            sessionMap.put(mobile, session);
            this.redisCache.set(KEY_SOCKET_SESSION + mobile, path, 3000);
        }
    
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose(@PathParam("mobile") String mobile, Session session) {
            try {
                if (sessionMap.containsKey(mobile)) {
            //        System.out.println("调用onClose:"+mobile);
                    session = sessionMap.get(mobile);
                    session.close();
                    sessionMap.remove(mobile);
                }
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    
        /**
         * 收到客户端消息后调用的方法
         *
         * @param message 客户端发送过来的消息
         * @param session 可选的参数
         */
        @OnMessage
        public void onMessage(@PathParam("mobile") String mobile, String message, Session session) {
            try {
            } catch (Exception e) {
                LOG.error(e.getMessage(),e);
            }
        }
    
        /**
         * 发生错误时调用
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(@PathParam("mobile") String mobile, Session session, Throwable error) {
            try {
                if (sessionMap.containsKey(mobile)) {
                    sessionMap.remove(mobile);
                }
                session.close();
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    
        /**
         * mobile ,向对应的客户端推送消息
         */
        public void pushMessage(String mobile, int coin, int flow) {
            try {
                Session session;
                System.out.println("sessionMap ========={}"+sessionMap);
                if (sessionMap.containsKey(mobile)) {
                    JSONObject user = new JSONObject();
                    user.put("coin", coin);
                    user.put("flow", flow);
                    session = sessionMap.get(mobile);
                    if (null != session) {
                        session.getBasicRemote().sendText(user.toString());
                    }
                }
    
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    }
    

    http监听请求,推送消息

        @RequestMapping( value = {"/socket"},method = {RequestMethod.GET})
        @ResponseBody
        public JSONObject socket(String phone,Integer coin,Integer flow , HttpServletRequest request){
    
            JSONObject obj =new JSONObject();
            logger.error("开始推送");
            System.out.println("socket phone:"+phone+" coin:"+coin+" flow:"+flow);
            try{
                SignWebSocket.getInstance().pushMessage(phone,coin,flow);
            }catch (Exception e){
                e.printStackTrace();
            }
            obj.put("phone",phone);
            obj.put("coin",coin);
            obj.put("flow",flow);
            return obj;
        }
    

    方法二:
    使用redis订阅发布服务,每个节点都是发布方,其他节点都是订阅方,当接收到发布的消息时,检测webSocket sessionMap里面session,推送消息。

    展开全文
  • dSock是分布式WebSocket代理(在Go中,使用Redis)。 客户端可以进行身份​​验证和连接,并且您可以将文本/二进制消息作为API发送。 特征 每个用户和身份验证有多个客户端 dSock可以向某个用户(由用户ID和可选的...
  • 某马程序员-JavaEE 57期-day10-RocketMQ集群、分布式WebSocket实现以及地图找房功能实现
  • 分布式WebSocket集群解决方案

    千次阅读 2018-12-08 15:14:50
    问题起因 最近做项目时遇到了需要多用户...期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分...
        

    问题起因

    最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。

    期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。

    以下是我的场景描述

    • 资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)
    • 应用发布限制条件:由于场景需要,应用场所需要ssl认证的域名才能发布。因此ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)连接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/xxx),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用
    • 需求:用户登录应用,需要与服务器建立wss连接,不同角色之间可以单发消息,也可以群发消息
    • 集群中的应用服务类型:每个集群实例都负责http无状态请求服务与ws长连接服务

    系统架构图

    clipboard.png

    在我的实现里,每个应用服务器都负责http and ws请求,其实也可以将ws请求建立的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws请求的方式更为方便。下文会有解释

    本文涉及的技术栈

    • Eureka 服务发现与注册
    • Redis Session共享
    • Redis 消息订阅
    • Spring Boot
    • Zuul 网关
    • Spring Cloud Gateway 网关
    • Spring WebSocket 处理长连接
    • Ribbon 负载均衡
    • Netty 多协议NIO网络通信框架
    • Consistent Hash 一致性哈希算法

    相信能走到这一步的人都了解过我上面列举的技术栈了,如果还没有,可以先去网上找找入门教程了解一下。下面的内容都与上述技术相关,题主默认大家都了解过了...
    这里是描述一致性Hash算法最易懂的文章传送门

    技术可行性分析

    下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案

    WebSocketSession与HttpSession
    在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信:

    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
       System.out.println("服务器接收到的消息: "+ message );
       //send message to client
       session.sendMessage(new TextMessage("message"));
    }

    那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的
    有的人可能会想:我可不可以将sessin关键信息缓存到redis,集群中的服务器从redis拿取session关键信息然后重新构建websocket session...我只想说这种方法如果有人能试出来,请告诉我一声...

    以上便是websocket session与http session共享的区别,总的来说就是http session共享已经有解决方案了,而且很简单,只要引入相关依赖:spring-session-data-redisspring-boot-starter-redis,大家可以从网上找个demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底层实现的方式,我们无法做到真正的websocket session共享。

    解决方案的演变

    Netty与Spring WebSocket

    刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并没有websocket session这样的概念,与其类似的是channel,每一个客户端连接都代表一个channel。前端的ws请求通过netty监听的端口,走websocket协议进行ws握手连接之后,通过一些列的handler(责链模式)进行消息处理。与websocket session类似地,服务端在连接建立后有一个channel,我们可以通过channel进行与客户端的通信

       /**
        * TODO 根据服务器传进来的id,分配到不同的group
        */
       private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    
       @Override
       protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
           //retain增加引用计数,防止接下来的调用引用失效
           System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text());
           //将消息发送给group里面的所有channel,也就是发送消息给客户端
           GROUP.writeAndFlush(msg.retain());
       }

    那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现方式的优缺点

    • 使用netty实现websocket

      玩过netty的人都知道netty是的线程模型是nio模型,并发量非常高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采用了netty。如果我们单独使用netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:
      1.与系统的其他应用集成不方便,在rpc调用的时候,无法享受springcloud里feign服务调用的便利性
      2.业务逻辑可能要重复实现
      3.使用netty可能需要重复造轮子
      4.怎么连接上服务注册中心,也是一件麻烦的事情
      5.restful服务与ws服务需要分开实现,如果在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发相信很多人都习惯了。

    • 使用spring websocket实现ws服务

      spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务非常方便,做法非常简单
      第一步:添加依赖

      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-websocket</artifactId>
      </dependency>

      第二步:添加配置类

      @Configuration
      public class WebSocketConfig implements WebSocketConfigurer {
      @Override
      public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
          registry.addHandler(myHandler(), "/")
              .setAllowedOrigins("*");
      }
      
      @Bean
       public WebSocketHandler myHandler() {
           return new MessageHandler();
       }
      }

      第三步:实现消息监听类

      @Component
      @SuppressWarnings("unchecked")
      public class MessageHandler extends TextWebSocketHandler {
         private List<WebSocketSession> clients = new ArrayList<>();
      
         @Override
         public void afterConnectionEstablished(WebSocketSession session) {
             clients.add(session);
             System.out.println("uri :" + session.getUri());
             System.out.println("连接建立: " + session.getId());
             System.out.println("current seesion: " + clients.size());
         }
      
         @Override
         public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
             clients.remove(session);
             System.out.println("断开连接: " + session.getId());
         }
      
         @Override
         protected void handleTextMessage(WebSocketSession session, TextMessage message) {
             String payload = message.getPayload();
             Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
             System.out.println("接受到的数据" + map);
             clients.forEach(s -> {
                 try {
                     System.out.println("发送消息给: " + session.getId());
                     s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             });
         }
      }

      从这个demo中,使用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采用了spring websocket实现ws服务。
      因此我的应用服务架构是这样子的:一个应用既负责restful服务,也负责ws服务。没有将ws服务模块拆分是因为拆分出去要使用feign来进行服务调用。第一本人比较懒惰,第二拆分与不拆分相差在多了一层服务间的io调用,所以就没有这么做了。

    从zuul技术转型到spring cloud gateway

    要实现websocket集群,我们必不可免地得从zuul转型到spring cloud gateway。原因如下:

    zuul1.0版本不支持websocket转发,zuul 2.0开始支持websocket,zuul2.0几个月前开源了,但是2.0版本没有被spring boot集成,而且文档不健全。因此转型是必须的,同时转型也很容易实现。
    
    在gateway中,为了实现ssl认证和动态路由负载均衡,yml文件中以下的某些配置是必须的,在这里提前避免大家采坑
    server:
      port: 443
      ssl:
        enabled: true
        key-store: classpath:xxx.jks
        key-store-password: xxxx
        key-store-type: JKS
        key-alias: alias
    spring:
      application:
        name: api-gateway
      cloud:
        gateway:
          httpclient:
            ssl:
              handshake-timeout-millis: 10000
              close-notify-flush-timeout-millis: 3000
              close-notify-read-timeout-millis: 0
              useInsecureTrustManager: true
          discovery:
            locator:
              enabled: true
              lower-case-service-id: true
          routes:
          - id: dc
            uri: lb://dc
            predicates:
            - Path=/dc/**
          - id: wecheck
            uri: lb://wecheck
            predicates:
            - Path=/wecheck/**

    如果要愉快地玩https卸载,我们还需要配置一个filter,否则请求网关时会出现错误not an SSL/TLS record

    @Component
    public class HttpsToHttpFilter implements GlobalFilter, Ordered {
      private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
      @Override
      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
          URI originalUri = exchange.getRequest().getURI();
          ServerHttpRequest request = exchange.getRequest();
          ServerHttpRequest.Builder mutate = request.mutate();
          String forwardedUri = request.getURI().toString();
          if (forwardedUri != null && forwardedUri.startsWith("https")) {
              try {
                  URI mutatedUri = new URI("http",
                          originalUri.getUserInfo(),
                          originalUri.getHost(),
                          originalUri.getPort(),
                          originalUri.getPath(),
                          originalUri.getQuery(),
                          originalUri.getFragment());
                  mutate.uri(mutatedUri);
              } catch (Exception e) {
                  throw new IllegalStateException(e.getMessage(), e);
              }
          }
          ServerHttpRequest build = mutate.build();
          ServerWebExchange webExchange = exchange.mutate().request(build).build();
          return chain.filter(webExchange);
      }
    
      @Override
      public int getOrder() {
          return HTTPS_TO_HTTP_FILTER_ORDER;
      }

    }

    这样子我们就可以使用gateway来卸载https请求了,到目前为止,我们的基本框架已经搭建完毕,网关既可以转发https请求,也可以转发wss请求。接下来就是用户多对多之间session互通的通讯解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。

    session广播

    这是最简单的websocket集群通讯解决方案。场景如下:
    教师A想要群发消息给他的学生们

    1. 教师的消息请求发给网关,内容包含{我是教师A,我想把xxx消息发送我的学生们}
    2. 网关接收到消息,获取集群所有ip地址,逐个调用教师的请求
    3. 集群中的每台服务器获取请求,根据教师A的信息查找本地有没有与学生关联的session,有则调用sendMessage方法,没有则忽略请求

    clipboard.png

    session广播实现很简单,但是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,相当于浪费了一次循环遍历的计算力,该方案在并发需求不高的情况下可以优先考虑,实现很容易。

    spring cloud中获取服务集群中每台服务器信息的方法如下
    @Resource
    private EurekaClient eurekaClient;
    
    Application app = eurekaClient.getApplication("service-name");
    //instanceInfo包括了一台服务器ip,port等消息
    InstanceInfo instanceInfo = app.getInstances().get(0);
    System.out.println("ip address: " + instanceInfo.getIPAddr());
    服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系

    一致性哈希算法实现(本文的要点)

    这种方法是本人认为最优雅的实现方案,理解这种方案需要一定的时间,如果你耐心看下去,相信你一定会有所收获。再强调一次,不了解一致性哈希算法的同学请先看这里,现先假设哈希环是顺时针查找的。

    首先,想要将一致性哈希算法的思想应用到我们的websocket集群,我们需要解决以下新问题:

    1. 集群节点DOWN,会影响到哈希环映射到状态是DOWN的节点。
    2. 集群节点UP,会影响到旧key映射不到对应的节点。
    3. 哈希环读写共享。
    在集群中,总会出现服务UP/DOWN的问题。

    针对节点DOWN的问题分析如下:

    一个服务器DOWN的时候,其拥有的websocket session会自动关闭连接,并且前端会收到通知。此时会影响到哈希环的映射错误。我们只需要当监听到服务器DOWN的时候,删除哈希环上面对应的实际结点和虚结点,避免让网关转发到状态是DOWN的服务器上。
    实现方法:在eureka治理中心监听集群服务DOWN事件,并及时更新哈希环。

    针对节点UP的问题分析如下:

    现假设集群中有服务CacheB上线了,该服务器的ip地址刚好被映射到key1和cacheA之间。那么key1对应的用户每次要发消息时都跑去CacheB发送消息,结果明显是发送不了消息,因为CacheB没有key1对应的session。

    clipboard.png

    此时我们有两种解决方案。
    方案A简单,动作大:
    eureka监听到节点UP事件之后,根据现有集群信息,更新哈希环。并且断开所有session连接,让客户端重新连接,此时客户端会连接到更新后的哈希环节点,以此避免消息无法送达的情况。
    方案B复杂,动作小:
    我们先看看没有虚拟节点的情况,假设CacheCCacheA之间上线了服务器CacheB。所有映射在CacheCCacheB的用户发消息时都会去CacheB里面找session发消息。也就是说CacheB一但上线,便会影响到CacheCCacheB之间的用户发送消息。所以我们只需要将CacheA断开CacheCCacheB的用户所对应的session,让客户端重连。

    clipboard.png

    接下来是有虚拟节点的情况,假设浅色的节点是虚拟节点。我们用长括号来代表某段区域映射的结果属于某个Cache。首先是C节点未上线的情况。图大家应该都懂吧,所有B的虚拟节点都会指向真实的B节点,所以所有B节点逆时针那一部分都会映射到B(因为我们规定哈希环顺时针查找)。

    clipboard.png

    接下来是C节点上线的情况,可以看到某些区域被C占领了。

    clipboard.png

    由以上情况我们可以知道:节点上线,会有许多对应虚拟节点也同时上线,因此我们需要将多段范围key对应的session断开连接(上图红色的部分)。具体算法有点复杂,实现的方式因人而异,大家可以尝试一下自己实现算法。

    哈希环应该放在哪里?

    1. gateway本地创建并维护哈希环。当ws请求进来的时候,本地获取哈希环并获取映射服务器信息,转发ws请求。这种方法看上去不错,但实际上是不太可取的,回想一下上面服务器DOWN的时候只能通过eureka监听,那么eureka监听到DOWN事件之后,需要通过io来通知gateway删除对应节点吗?显然太麻烦了,将eureka的职责分散到gateway,不建议这么做。
    2. eureka创建,并放到redis共享读写。这个方案可行,当eureka监听到服务DOWN的时候,修改哈希环并推送到redis上。为了请求响应时间尽量地短,我们不可以让gateway每次转发ws请求的时候都去redis取一次哈希环。哈希环修改的概率的确很低,gateway只需要应用redis的消息订阅模式,订阅哈希环修改事件便可以解决此问题。
    至此我们的spring websocket集群已经搭建的差不多了,最重要的地方还是一致性哈希算法。现在有最后一个技术瓶颈,网关如何根据ws请求转发到指定的集群服务器上?答案在负载均衡。spring cloud gateway或zuul都默认集成了ribbon作为负载均衡,我们只需要根据建立ws请求时客户端发来的user id,重写ribbon负载均衡算法,根据user id进行hash,并在哈希环上寻找ip,并将ws请求转发到该ip便完事了。流程如下图所示:

    clipboard.png

    接下来用户沟通的时候,只需要根据id进行hash,在哈希环上获取对应ip,便可以知道与该用户建立ws连接时的session存在哪台服务器上了!

    spring cloud Finchley.RELEASE 版本中ribbon未完善的地方

    题主在实际操作的时候发现了ribbon两个不完善的地方......

    1. 根据网上找的方法,继承AbstractLoadBalancerRule重写负载均衡策略之后,多个不同应用的请求变得混乱。假如eureka上有两个service A和B,重写负载均衡策略之后,请求A或B的服务,最终只会映射到其中一个服务上。非常奇怪!可能spring cloud gateway官网需要给出一个正确的重写负载均衡策略的demo。
    2. 一致性哈希算法需要一个key,类似user id,根据key进行hash之后在哈希环上搜索并返回ip。但是ribbon没有完善choose函数的key参数,直接写死了default

    clipboard.png

    难道这样子我们就没有办法了吗?其实还有一个可行并且暂时可替代的办法!
    如下图所示,客户端发送一个普通的http请求(包含id参数)给网关,网关根据id进行hash,在哈希环中寻找ip地址,将ip地址返回给客户端,客户端再根据该ip地址进行ws请求。

    clipboard.png

    由于ribbon未完善key的处理,我们暂时无法在ribbon上实现一致性哈希算法。只能间接地通过客户端发起两次请求(一次http,一次ws)的方式来实现一致性哈希。希望不久之后ribbon能更新这个缺陷!让我们的websocket集群实现得更优雅一点。

    后记

    以上便是我这几天探索的结果。期间遇到了许多问题,并逐一解决难题,列出两个websocket集群解决方案。第一个是session广播,第二个是一致性哈希。这两种方案针对不同场景各有优缺点,本文并未用到ActiveMQ,Karfa等消息队列实现消息推送,只是想通过自己的想法,不依靠消息队列来简单地实现多用户之间的长连接通讯。希望能为大家提供一条不同于寻常的思路。

    展开全文
  • 期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。以下是我的场景描述...

    问题起因

    最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。

    期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。

    以下是我的场景描述

    资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)

    应用发布限制条件:由于场景需要,应用场所需要ssl认证的域名才能发布。因此ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)连接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/xxx),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用

    需求:用户登录应用,需要与服务器建立wss连接,不同角色之间可以单发消息,也可以群发消息

    集群中的应用服务类型:每个集群实例都负责http无状态请求服务与ws长连接服务

    系统架构图

    在我的实现里,每个应用服务器都负责http and ws请求,其实也可以将ws请求建立的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws请求的方式更为方便。下文会有解释

    本文涉及的技术栈

    Eureka 服务发现与注册

    Redis Session共享

    Redis 消息订阅

    Spring Boot

    Zuul 网关

    Spring Cloud Gateway 网关

    Spring WebSocket 处理长连接

    Ribbon 负载均衡

    Netty 多协议NIO网络通信框架

    Consistent Hash 一致性哈希算法

    相信能走到这一步的人都了解过我上面列举的技术栈了,如果还没有,可以先去网上找找入门教程了解一下。下面的内容都与上述技术相关,题主默认大家都了解过了...

    这里是描述一致性Hash算法最易懂的文章传送门

    技术可行性分析

    下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案

    WebSocketSession与HttpSession

    在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信:

    protected void handleTextMessage(WebSocketSession session, TextMessage message) {

    System.out.println("服务器接收到的消息: "+ message );

    //send message to client

    session.sendMessage(new TextMessage("message"));

    }

    那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的。

    有的人可能会想:我可不可以将sessin关键信息缓存到redis,集群中的服务器从redis拿取session关键信息然后重新构建websocket session...我只想说这种方法如果有人能试出来,请告诉我一声...

    以上便是websocket session与http session共享的区别,总的来说就是http session共享已经有解决方案了,而且很简单,只要引入相关依赖:spring-session-data-redis和spring-boot-starter-redis,大家可以从网上找个demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底层实现的方式,我们无法做到真正的websocket session共享。

    解决方案的演变

    Netty与Spring WebSocket

    刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并没有websocket session这样的概念,与其类似的是channel,每一个客户端连接都代表一个channel。前端的ws请求通过netty监听的端口,走websocket协议进行ws握手连接之后,通过一些列的handler(责链模式)进行消息处理。与websocket session类似地,服务端在连接建立后有一个channel,我们可以通过channel进行与客户端的通信

    /**

    * TODO 根据服务器传进来的id,分配到不同的group

    */

    private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

    //retain增加引用计数,防止接下来的调用引用失效

    System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text());

    //将消息发送给group里面的所有channel,也就是发送消息给客户端

    GROUP.writeAndFlush(msg.retain());

    }

    那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现方式的优缺点

    使用netty实现websocket

    玩过netty的人都知道netty是的线程模型是nio模型,并发量非常高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采用了netty。如果我们单独使用netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:

    1.与系统的其他应用集成不方便,在rpc调用的时候,无法享受springcloud里feign服务调用的便利性

    2.业务逻辑可能要重复实现

    3.使用netty可能需要重复造轮子

    4.怎么连接上服务注册中心,也是一件麻烦的事情

    5.restful服务与ws服务需要分开实现,如果在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发相信很多人都习惯了。

    使用spring websocket实现ws服务

    spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务非常方便,做法非常简单

    第一步:添加依赖

    org.springframework.boot

    spring-boot-starter-websocket

    第二步:添加配置类

    @Configuration

    public class WebSocketConfig implements WebSocketConfigurer {

    @Override

    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

    registry.addHandler(myHandler(), "/")

    .setAllowedOrigins("*");

    }

    @Bean

    public WebSocketHandler myHandler() {

    return new MessageHandler();

    }

    }

    第三步:实现消息监听类

    @Component

    @SuppressWarnings("unchecked")

    public class MessageHandler extends TextWebSocketHandler {

    private List clients = new ArrayList<>();

    @Override

    public void afterConnectionEstablished(WebSocketSession session) {

    clients.add(session);

    System.out.println("uri :" + session.getUri());

    System.out.println("连接建立: " + session.getId());

    System.out.println("current seesion: " + clients.size());

    }

    @Override

    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {

    clients.remove(session);

    System.out.println("断开连接: " + session.getId());

    }

    @Override

    protected void handleTextMessage(WebSocketSession session, TextMessage message) {

    String payload = message.getPayload();

    Map map = JSONObject.parseObject(payload, HashMap.class);

    System.out.println("接受到的数据" + map);

    clients.forEach(s -> {

    try {

    System.out.println("发送消息给: " + session.getId());

    s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));

    } catch (Exception e) {

    e.printStackTrace();

    }

    });

    }

    }

    从这个demo中,使用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采用了spring websocket实现ws服务。

    因此我的应用服务架构是这样子的:一个应用既负责restful服务,也负责ws服务。没有将ws服务模块拆分是因为拆分出去要使用feign来进行服务调用。第一本人比较懒惰,第二拆分与不拆分相差在多了一层服务间的io调用,所以就没有这么做了。

    从zuul技术转型到spring cloud gateway

    要实现websocket集群,我们必不可免地得从zuul转型到spring cloud gateway。原因如下:

    zuul1.0版本不支持websocket转发,zuul 2.0开始支持websocket,zuul2.0几个月前开源了,但是2.0版本没有被spring boot集成,而且文档不健全。因此转型是必须的,同时转型也很容易实现。

    在gateway中,为了实现ssl认证和动态路由负载均衡,yml文件中以下的某些配置是必须的,在这里提前避免大家采坑

    server:

    port: 443

    ssl:

    enabled: true

    key-store: classpath:xxx.jks

    key-store-password: xxxx

    key-store-type: JKS

    key-alias: alias

    spring:

    application:

    name: api-gateway

    cloud:

    gateway:

    httpclient:

    ssl:

    handshake-timeout-millis: 10000

    close-notify-flush-timeout-millis: 3000

    close-notify-read-timeout-millis: 0

    useInsecureTrustManager: true

    discovery:

    locator:

    enabled: true

    lower-case-service-id: true

    routes:

    - id: dc

    uri: lb://dc

    predicates:

    - Path=/dc/**

    - id: wecheck

    uri: lb://wecheck

    predicates:

    - Path=/wecheck/**

    如果要愉快地玩https卸载,我们还需要配置一个filter,否则请求网关时会出现错误not an SSL/TLS record

    @Component

    public class HttpsToHttpFilter implements GlobalFilter, Ordered {

    private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;

    @Override

    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    URI originalUri = exchange.getRequest().getURI();

    ServerHttpRequest request = exchange.getRequest();

    ServerHttpRequest.Builder mutate = request.mutate();

    String forwardedUri = request.getURI().toString();

    if (forwardedUri != null && forwardedUri.startsWith("https")) {

    try {

    URI mutatedUri = new URI("http",

    originalUri.getUserInfo(),

    originalUri.getHost(),

    originalUri.getPort(),

    originalUri.getPath(),

    originalUri.getQuery(),

    originalUri.getFragment());

    mutate.uri(mutatedUri);

    } catch (Exception e) {

    throw new IllegalStateException(e.getMessage(), e);

    }

    }

    ServerHttpRequest build = mutate.build();

    ServerWebExchange webExchange = exchange.mutate().request(build).build();

    return chain.filter(webExchange);

    }

    @Override

    public int getOrder() {

    return HTTPS_TO_HTTP_FILTER_ORDER;

    }

    }

    这样子我们就可以使用gateway来卸载https请求了,到目前为止,我们的基本框架已经搭建完毕,网关既可以转发https请求,也可以转发wss请求。接下来就是用户多对多之间session互通的通讯解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。

    session广播

    这是最简单的websocket集群通讯解决方案。场景如下:

    教师A想要群发消息给他的学生们

    教师的消息请求发给网关,内容包含{我是教师A,我想把xxx消息发送我的学生们}

    网关接收到消息,获取集群所有ip地址,逐个调用教师的请求

    集群中的每台服务器获取请求,根据教师A的信息查找本地有没有与学生关联的session,有则调用sendMessage方法,没有则忽略请求

    session广播实现很简单,但是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,相当于浪费了一次循环遍历的计算力,该方案在并发需求不高的情况下可以优先考虑,实现很容易。

    spring cloud中获取服务集群中每台服务器信息的方法如下

    @Resource

    private EurekaClient eurekaClient;

    Application app = eurekaClient.getApplication("service-name");

    //instanceInfo包括了一台服务器ip,port等消息

    InstanceInfo instanceInfo = app.getInstances().get(0);

    System.out.println("ip address: " + instanceInfo.getIPAddr());

    服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系

    一致性哈希算法实现(本文的要点)

    这种方法是本人认为最优雅的实现方案,理解这种方案需要一定的时间,如果你耐心看下去,相信你一定会有所收获。再强调一次,不了解一致性哈希算法的同学请先看这里,现先假设哈希环是顺时针查找的。

    首先,想要将一致性哈希算法的思想应用到我们的websocket集群,我们需要解决以下新问题:

    集群节点DOWN,会影响到哈希环映射到状态是DOWN的节点。

    集群节点UP,会影响到旧key映射不到对应的节点。

    哈希环读写共享。

    在集群中,总会出现服务UP/DOWN的问题。

    针对节点DOWN的问题分析如下:

    一个服务器DOWN的时候,其拥有的websocket session会自动关闭连接,并且前端会收到通知。此时会影响到哈希环的映射错误。我们只需要当监听到服务器DOWN的时候,删除哈希环上面对应的实际结点和虚结点,避免让网关转发到状态是DOWN的服务器上。

    实现方法:在eureka治理中心监听集群服务DOWN事件,并及时更新哈希环。

    针对节点UP的问题分析如下:

    现假设集群中有服务CacheB上线了,该服务器的ip地址刚好被映射到key1和cacheA之间。那么key1对应的用户每次要发消息时都跑去CacheB发送消息,结果明显是发送不了消息,因为CacheB没有key1对应的session。

    此时我们有两种解决方案。

    方案A简单,动作大:

    eureka监听到节点UP事件之后,根据现有集群信息,更新哈希环。并且断开所有session连接,让客户端重新连接,此时客户端会连接到更新后的哈希环节点,以此避免消息无法送达的情况。

    方案B复杂,动作小:

    我们先看看没有虚拟节点的情况,假设CacheC和CacheA之间上线了服务器CacheB。所有映射在CacheC到CacheB的用户发消息时都会去CacheB里面找session发消息。也就是说CacheB一但上线,便会影响到CacheC到CacheB之间的用户发送消息。所以我们只需要将CacheA断开CacheC到CacheB的用户所对应的session,让客户端重连。

    接下来是有虚拟节点的情况,假设浅色的节点是虚拟节点。我们用长括号来代表某段区域映射的结果属于某个Cache。首先是C节点未上线的情况。图大家应该都懂吧,所有B的虚拟节点都会指向真实的B节点,所以所有B节点逆时针那一部分都会映射到B(因为我们规定哈希环顺时针查找)。

    接下来是C节点上线的情况,可以看到某些区域被C占领了。

    由以上情况我们可以知道:节点上线,会有许多对应虚拟节点也同时上线,因此我们需要将多段范围key对应的session断开连接(上图红色的部分)。具体算法有点复杂,实现的方式因人而异,大家可以尝试一下自己实现算法。

    哈希环应该放在哪里?

    gateway本地创建并维护哈希环。当ws请求进来的时候,本地获取哈希环并获取映射服务器信息,转发ws请求。这种方法看上去不错,但实际上是不太可取的,回想一下上面服务器DOWN的时候只能通过eureka监听,那么eureka监听到DOWN事件之后,需要通过io来通知gateway删除对应节点吗?显然太麻烦了,将eureka的职责分散到gateway,不建议这么做。

    eureka创建,并放到redis共享读写。这个方案可行,当eureka监听到服务DOWN的时候,修改哈希环并推送到redis上。为了请求响应时间尽量地短,我们不可以让gateway每次转发ws请求的时候都去redis取一次哈希环。哈希环修改的概率的确很低,gateway只需要应用redis的消息订阅模式,订阅哈希环修改事件便可以解决此问题。

    至此我们的spring websocket集群已经搭建的差不多了,最重要的地方还是一致性哈希算法。现在有最后一个技术瓶颈,网关如何根据ws请求转发到指定的集群服务器上?答案在负载均衡。spring cloud gateway或zuul都默认集成了ribbon作为负载均衡,我们只需要根据建立ws请求时客户端发来的user id,重写ribbon负载均衡算法,根据user id进行hash,并在哈希环上寻找ip,并将ws请求转发到该ip便完事了。流程如下图所示:

    接下来用户沟通的时候,只需要根据id进行hash,在哈希环上获取对应ip,便可以知道与该用户建立ws连接时的session存在哪台服务器上了!

    spring cloud Finchley.RELEASE 版本中ribbon未完善的地方

    题主在实际操作的时候发现了ribbon两个不完善的地方......

    根据网上找的方法,继承AbstractLoadBalancerRule重写负载均衡策略之后,多个不同应用的请求变得混乱。假如eureka上有两个service A和B,重写负载均衡策略之后,请求A或B的服务,最终只会映射到其中一个服务上。非常奇怪!可能spring cloud gateway官网需要给出一个正确的重写负载均衡策略的demo。

    一致性哈希算法需要一个key,类似user id,根据key进行hash之后在哈希环上搜索并返回ip。但是ribbon没有完善choose函数的key参数,直接写死了default!

    难道这样子我们就没有办法了吗?其实还有一个可行并且暂时可替代的办法!

    如下图所示,客户端发送一个普通的http请求(包含id参数)给网关,网关根据id进行hash,在哈希环中寻找ip地址,将ip地址返回给客户端,客户端再根据该ip地址进行ws请求。

    由于ribbon未完善key的处理,我们暂时无法在ribbon上实现一致性哈希算法。只能间接地通过客户端发起两次请求(一次http,一次ws)的方式来实现一致性哈希。希望不久之后ribbon能更新这个缺陷!让我们的websocket集群实现得更优雅一点。

    后记

    以上便是我这几天探索的结果。期间遇到了许多问题,并逐一解决难题,列出两个websocket集群解决方案。第一个是session广播,第二个是一致性哈希。这两种方案针对不同场景各有优缺点,本文并未用到ActiveMQ,Karfa等消息队列实现消息推送,只是想通过自己的想法,不依靠消息队列来简单地实现多用户之间的长连接通讯。希望能为大家提供一条不同于寻常的思路。

    展开全文
  • 分布式WebSocket-下篇

    2021-02-01 19:17:26
    分布式WebSocket落地-生产验证 上篇文章主要讲述了单点Socket,以及它的使用场景。本章主要会从多个维度来探讨单机Socket存在问题以及解决方案。 上篇文章从功能层面实现了双向传输,但是带来了难受问题如下: 我们...
  • 期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。 以下是我的场景描述 ...
  • 基于及其具有高级抽象和HTTP API的分布式WebSocket服务器。 API文档 如何使用 服务器 更改/config文件夹中的/config ,然后部署此应用程序。 客户 客户端可以使用socket.io client与服务器进行通信。 const Socket...
  • 分布式WebSocket实现(通过RabbitMQ) 1、实现思路 1、WebSocket接收用户或者接口传过来的数据时,统一发送到RabbitMQ 2、每个服务器监听RabbitMQ数据并获取数据,通过判断数据中persons是否为空来判断是单发还是...
  • #去中心化的分布式websocket通信架构演进 概念: cluster:代表一个集群,集群中有多个节点,集群是去中心化的,字面上理解就是无中心节点,与任何节点连接websocket来通信都是等价的 node: 代表一个节点,是集群...
  • 深入浅出Websocket(二)分布式Websocket集群 深入浅出Websocket(三)分频道的Websocket(分析socket.io源码以及ws-wrapper) 正文 这个是我在造的玩具的一个简单架构图。将实时通信部分给抽离出来作...
  • 基于RocketMQ实现分布式WebSocket通信

    千次阅读 2020-04-04 10:22:58
    分布式WebSocket方案 实现分布式消息发送 导入RocketMQ相关依赖 添加RocketMQ配置 修改MessageHandler类 序列化MessageID 执行启动类 实现分布式消息接收 修改MessageHandler类 启动测试,发送消息 ...
  • 文章目录1. pom2. Yml3. MQ配置类4. Websocket配置类5. 服务端6. 控制器,发送消息7. 消息实体类8. 生产者9. 消费者10....本片以使用为主,概念请移动百度 ...分布式WebSocket一般可以通过以下两种方案来实现: 将消息...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 566
精华内容 226
关键字:

分布式websocket