精华内容
下载资源
问答
  • Websocket Session 共享解决(方案一) 既然Websocket Session不能直接序列化然后存储,而且如果用户请求的时候,开始与集群中的A节点创建链接,就算把这个Session 拿到B节点去再给用户Push消息,应该也是不通(没...

    Websocket Session 共享解决(方案一)

    既然Websocket Session 不能直接序列化然后存储,而且如果用户请求的时候,开始与集群中的A 节点创建链接,就算把这个  Session  拿到B 节点去再给用户Push 消息,应该也是不通(没测试)的。

    所以我的解决方案采用订阅式队列去触发  Websocket  反推的方式来实现看上去是  Session共享  的,思路如下图:

    Websocket Session  共享 思路图解释:

    模块一:这里没有什么好说明的,因为每一个用户只创建以一个  Session  ,当用户再次链接到第二个节点的时候,第一个节点的  Session  就会关闭和销毁。而每个节点采用静态(static )的成员变量去存储(Map)当前节点的所有Websocket Session 。

    模块二:因为每个节点存储的是每个节点的所有Websocket Session ,那么只需要判断下当前节点是否存在当前用户的Websocket Session ,存在则反推消息。

    模块三:当有消息需要通过  Websocket  推送给对应的用户的时候,而又不知道这个用户在哪个节点,那么通过任务调度模块,把消息以订阅MQ 的方式放到  MQ  中,这样每个节点都可以收到消息,从而参照 “模块二”判断当前用户是否存在当前节点。然后推送。

     

    Websocket Session 共享解决(方案二)

    其实我们之所以要做共享,其实就是因为在集群环境中,在HA/Nginx等方案中,做轮询请求来减轻节点压力,这样导致用户可能分配在多个节点,那么如果我们采用IP等的Hash值规则,或者其他可以参考的规则。解决一个用户只会分配到一个节点上,那么就不存在Session共享了。这样的中间件好多。如我们的“伞神”提出的 采用阿里的 Tengine 的session sticky 组件” 就可以解决。其他的也很多。

    展开全文
  • Websocket session closing

    2020-12-06 01:54:34
    <code>java.lang.IllegalStateException: Message will not be sent because the WebSocket session has been closed</code></p> <p>Here is the complete stack trace: <pre><code> 2020-08-29 15:54:25.257 DEBUG ...
  • Error in websocket session

    2020-12-09 02:50:54
    2019-02-28 09:33:58.211 ERROR 17084 --- [nio-8090-exec-1] graphql.servlet.GraphQLWebsocketServlet : Error in websocket session: 4f8 java.io.EOFException: null at org.apache.tomcat.util.net....
  • 分布式webSocket session无法共享问题

    分布式webSocket session无法共享问题

    问题:
    最近碰到一个麻烦的事情,就是在使用webSocket推送消息时候,发现session存放在各个节点,各节点之间无法获取session。而且经过研究发现:WebSocket与http协议一样都是基于TCP的,所以他们都是可靠的协议,调用的WebSocket的send函数在实现中最终都是通过TCP的系统接口进行传输的。WebSocket和Http协议一样都属于应用层的协议,WebSocket在建立握手连接时,数据是通过http协议传输的,但是在建立连接之后,真正的数据传输阶段是不需要参与的
    分析
    也就是说目前问题:1、wehsocketSession无法序列化,即无法存放在redis里面达到共享
    2、就算能在别的节点创建session对象,也无法推送消息,因为webSocket是基于tcp的协议,http,https在传输数据上是不会参与的,也就是说与哪个节点建立的连接必须那个节点推送消息
    解决思路
    1 使用订阅发布服务工具,各个节点即为订阅方也为发布方,一个节点发布,其余节点收到消息,检测自己是否存在改session,存在通过收到的消息推送数据。
    2记录session关键信息(节点ip,端口,用户数据)到缓存里面(推荐redis),通过查找session关键信息,定位到哪个节点,通过http请求改节点,达到推送效果(涉及http请求,可能响应回慢)

    ps:本来想使用方法一,通过redis订阅发布服务实现消息推送,但是项目节点很多,如果一个节点发布,其余都要查找,担心增加服务器压力,所以本人选择方法二,但是方法一我会在最下面做简单介绍
    websocket服务部署:
    redsi 存放关键信息,ip,端口

    @ServerEndpoint("/signWebsocket/{mobile}")
    @Component
    public class SignWebSocket {
    
        private static final Logger LOG = LoggerFactory.getLogger(SignWebSocket.class);
    
        private static SignWebSocket instance = new SignWebSocket();
    
        //保存客户端发起的唯一标识与长连接
        private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
    
        public static final String KEY_SOCKET_SESSION = "socket_session_";
    
        private static int KEY_SOCKET_TIME = 3000;
    
        private RedisCache redisCache;
    
        public SignWebSocket() {
            ApplicationContext act = ApplicationContextRegister.getApplicationContext();
            this.redisCache = act.getBean(RedisCache.class);
        }
    
        public static ConcurrentHashMap<String, Session> getSessionMap() {
            return sessionMap;
        }
    
        public static SignWebSocket getInstance() {
            return instance;
        }
    
        /**
         * 连接建立成功调用的方法
         *
         * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        @OnOpen
        public void onOpen(@PathParam("mobile") String mobile, Session session) {
    
            String path= AllConstant.getInstance().getLocalAddr()+":"+AllConstant.getInstance().getLocalPort();
            try {
                if (sessionMap.containsKey(mobile)) {
                    Session session_old = sessionMap.get(mobile);
                    session_old.close();
                    sessionMap.remove(mobile);
                }
            } catch (IOException e) {
                LOG.error(e.getMessage(),e);
            }
            sessionMap.put(mobile, session);
            this.redisCache.set(KEY_SOCKET_SESSION + mobile, path, 3000);
        }
    
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose(@PathParam("mobile") String mobile, Session session) {
            try {
                if (sessionMap.containsKey(mobile)) {
            //        System.out.println("调用onClose:"+mobile);
                    session = sessionMap.get(mobile);
                    session.close();
                    sessionMap.remove(mobile);
                }
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    
        /**
         * 收到客户端消息后调用的方法
         *
         * @param message 客户端发送过来的消息
         * @param session 可选的参数
         */
        @OnMessage
        public void onMessage(@PathParam("mobile") String mobile, String message, Session session) {
            try {
            } catch (Exception e) {
                LOG.error(e.getMessage(),e);
            }
        }
    
        /**
         * 发生错误时调用
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(@PathParam("mobile") String mobile, Session session, Throwable error) {
            try {
                if (sessionMap.containsKey(mobile)) {
                    sessionMap.remove(mobile);
                }
                session.close();
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    
        /**
         * mobile ,向对应的客户端推送消息
         */
        public void pushMessage(String mobile, int coin, int flow) {
            try {
                Session session;
                System.out.println("sessionMap ========={}"+sessionMap);
                if (sessionMap.containsKey(mobile)) {
                    JSONObject user = new JSONObject();
                    user.put("coin", coin);
                    user.put("flow", flow);
                    session = sessionMap.get(mobile);
                    if (null != session) {
                        session.getBasicRemote().sendText(user.toString());
                    }
                }
    
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    }
    

    http监听请求,推送消息

        @RequestMapping( value = {"/socket"},method = {RequestMethod.GET})
        @ResponseBody
        public JSONObject socket(String phone,Integer coin,Integer flow , HttpServletRequest request){
    
            JSONObject obj =new JSONObject();
            logger.error("开始推送");
            System.out.println("socket phone:"+phone+" coin:"+coin+" flow:"+flow);
            try{
                SignWebSocket.getInstance().pushMessage(phone,coin,flow);
            }catch (Exception e){
                e.printStackTrace();
            }
            obj.put("phone",phone);
            obj.put("coin",coin);
            obj.put("flow",flow);
            return obj;
        }
    

    方法二:
    使用redis订阅发布服务,每个节点都是发布方,其他节点都是订阅方,当接收到发布的消息时,检测webSocket sessionMap里面session,推送消息。

    展开全文
  • 上一篇文章中写了使用httpSessionId存储Websocket Session,下面在SpringBoot中在Contrller类获取前端请求的HttpSession,从而获得对应的websocket Session。 Controller类 import org.springframework.beans....

    上一篇文章中写了使用httpSessionId存储Websocket Session,下面在SpringBoot中在Contrller类获取前端请求的HttpSession,从而获得对应的websocket Session。

    Controller类
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import javax.servlet.http.HttpSession;
    
    
    @Controller
    @RequestMapping("/api/v0")
    public class MenuController {
    
        @ResponseBody
        @PostMapping("/test")
        public ResponseEntity<Object> test(){
    
                ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                if(requestAttributes != null){
                    HttpServletResponse response = requestAttributes.getResponse();
                    HttpServletRequest request = requestAttributes.getRequest();
                    HttpSession httpSession = request.getSession();
                    System.out.println("SessionId:"+httpSession.getId());
                }
            return new ResponseEntity<>(null, HttpStatus.OK);
        }
    
    }
    

    在使用VUE+Springboot开发过程中,出现了一个SessionId不同的问题,导致获取到的sessionId无法找到对应的websocket Session。

    这时候需要前后端进行一些配置。

    前端封装axios文件中,对axios进行配置
    axios.js
    
    axios.defaults.withCredentials = true;//allowCredentials配置为true表示携带cookies信息,此时同一页面的httpsession不会改变
    后端Springboot跨域配置
    
    Webconfiguration.java
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.HttpRequest;
    import org.springframework.web.servlet.config.annotation.CorsRegistry;
    import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
    import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
    import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
    
    import javax.servlet.ServletRequest;
    
    @Configuration
    public class WebConfiguration implements WebMvcConfigurer {
    
        @Override
        public void addCorsMappings(CorsRegistry registry){
            registry.addMapping("/**")
                    .allowedOrigins("*")
                    .allowedMethods("GET","HEAD","POST","PUT","DELETE","OPTIONS")
                    .allowCredentials(true)//与前端保持一致
                    .maxAge(3600);
        }
    }
    response设置
    
    Filter.java
    
    import org.springframework.stereotype.Component;
    import org.springframework.web.filter.OncePerRequestFilter;
    
    import javax.servlet.FilterChain;
    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    
    
    @Component
    public class Filter extends OncePerRequestFilter {
        @Override
        protected void doFilterInternal(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
                                        FilterChain filterChain) throws ServletException, IOException {
            httpServletResponse.addHeader("X-Frame-Options", "DENY");
            httpServletResponse.addHeader("Cache-Control", "no-cache, no-store, must-revalidate, max-age=0");
            httpServletResponse.addHeader("Cache-Control", "no-cache='set-cookie'");
            httpServletResponse.addHeader("Pragma", "no-cache");
            httpServletResponse.setHeader("Access-Control-Allow-Origin",httpServletRequest.getHeader("origin"));//allowCredentials配置为true时,不能配置为“*”
            httpServletResponse.setHeader("Access-Control-Allow-Credentials","true");
            httpServletResponse.setHeader("Access-Control-Allow-Methods","OPTIONS,GET,PUT,POST,DELETE");
            httpServletResponse.setHeader("Access-Control-Allow-Max-Age","3600");
            filterChain.doFilter(httpServletRequest, httpServletResponse);
    
        }
    }

     

    展开全文
  • <p>when a Websocket Session is closed, but instead of closing the WebSocketClient, a new websocket is opened, the annotated Websocket Handler is kept in the memory. <p>Please find the attached java ...
  • 关于WebSocketSession传输JSON

    千次阅读 2019-05-31 17:38:10
    由于约定需要向前端传输JSON,但是之前调用的一直是WebSocketSession自带的sendMessage()方法。方法签名如下: void sendMessage(WebSocketMessage<?> var1) throws IOException; 所以在sendMessage()中是...

        由于约定需要向前端传输JSON,但是之前调用的一直是WebSocketSession自带的sendMessage()方法。方法签名如下:

    void sendMessage(WebSocketMessage<?> var1) throws IOException;

        所以在sendMessage()中是不能直接发送JSON的,必须是发送实现了WebSocketMessage接口的类型的对象,接口定义如下:

    package org.springframework.web.socket;
    
    public interface WebSocketMessage<T> {
        T getPayload();
    
        int getPayloadLength();
    
        boolean isLast();
    }
    

        TextMessage类继承了抽象类AbstractWebSocketSocketMessage(实现了WebSocketMessage接口)

        

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.springframework.web.socket;
    
    import java.nio.charset.Charset;
    
    public final class TextMessage extends AbstractWebSocketMessage<String> {
        private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
        private final byte[] bytes;
    
        public TextMessage(CharSequence payload) {
            super(payload.toString(), true);
            this.bytes = null;
        }
    
        public TextMessage(byte[] payload) {
            super(new String(payload, UTF8_CHARSET));
            this.bytes = payload;
        }
    
        public TextMessage(CharSequence payload, boolean isLast) {
            super(payload.toString(), isLast);
            this.bytes = null;
        }
    
        public int getPayloadLength() {
            return this.asBytes().length;
        }
    
        public byte[] asBytes() {
            return this.bytes != null ? this.bytes : ((String)this.getPayload()).getBytes(UTF8_CHARSET);
        }
    
        protected String toStringPayload() {
            String payload = (String)this.getPayload();
            return payload.length() > 10 ? payload.substring(0, 10) + ".." : payload;
        }
    }
    

        所以TextMessage通常只用来传输字符串或者byte[],这时候考虑了对json序列化,然后考虑到效率,觉得不如直接用toString()来的好,所以在有需要传输json的场景则直接创建一个JSONObject 然后转换字符串传输就好。

    展开全文
  • 重构 - WebSocket Session共享

    千次阅读 2018-07-18 15:16:30
    流程是首先前端跟后端应用新建一个连接,并携带当前登录的用户ID,此时WebSocket会创建一个WebsocketSession来唯一绑定该连接,我们会在后端用Map建立用户ID与Session的映射关系: Map&lt;String userI...
  • 流程是首先前端跟后端应用新建一个连接,并携带当前登录的用户ID,此时WebSocket会创建一个WebsocketSession来唯一绑定该连接,我们会在后端用Map建立用户ID与Session的映射关系: Map<String userI...
  • websocket session操作

    2021-01-12 16:29:01
    请问如何在websocket的action中设置session登录状态,文档有提到在open action中可以获取验证session,但是我测试过在index设置好sesion,open中的session一直时undefined,获取的key(cookie)与在浏览器生成的不...
  • <div><p>The Jetty WebSocket Session.suspend() API is unimplemented in Jetty 10.0.x</p><p>该提问来源于开源项目:eclipse/jetty.project</p></div>
  • <div><p>Hi, I was able to find a specific websocket session and send message to the client with following code <pre><code> Iodine::Websocket.each do |ws| if ws.params[:cpid] == cpid ws....
  • <div><p>There are 3 variants of io.micronaut.websocket.WebSocketSession.send(): - send() - sendAsync() - sendSync() <p>I was trying to implement sending messages from the server to a WebSocket client ...
  • websocket服务器在面对并发量很大时压力会很大,而且session储存在Map中,内存压力也会很大。于是考虑分布式。 但是分布式存在websocket session共享问题,于是考虑radis存储session,但是遇到websocket session不...
  • <div><p>Within a WebSocket filter, what API is available to terminate the client connection? <pre><code> .map(|ws: warp::ws::Ws| { ws.on_upgrade(|websocket| async { // ? } </code></pre>该提问来源于...
  • 本文主要是针对分布式场景下的使用websocket的一...很遗憾的是,websocketsession是不支持序列化操作,所以也就不可能存在redis中。那么我们有什么其他的方式解决呢。  我们知道在单节点中我们只需要吧websockets...
  • java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session at org.apache.tomcat.websocket.WsSession.checkState...
  • tomcat 7.0.68 websocket session 共享
  • and various other timeouts rely on WebsocketSession's session_time property to determine when to execute/fire. This property is in turn based on Python's time.time() which does not guarantee ...
  • 大家都知道集群中会有websocketsession不一致的问题,会造成消息推送不成功,用redis发布订阅模式解决websocket session问题其实算不上session共享,因为websocketsession没法序列化存到redis,只能换一种方式实现...
  • t close websocket session of backend when client close it, and when the backend continue to send message to client, the gateway will show error: <pre><code> 2018-03-30 16:41:53.432 ERROR 9255 --- ...
  • 大家都知道集群中会有websocketsession不一致的问题,会造成消息推送不成功,用redis发布订阅模式解决websocket session问题其实算不上session共享,因为websocketsession没法序列化存到redis,只能换一种方式实现...
  • 在使用springboot的websocket功能stomp时,不能直接获取websocketsession,下面提供一个方法。 stomp配置可参考之前文章配置stomp 添加WebSocketHandler public class CustomSubProtocolWebSocketHandler ...
  • 由于websocket session不能序列化,所以不能存储在redis中。故在分布式系统中,可以通过把websocketsession存储在服务器本地map,然后把消息发布到redis指定的频道上,每个服务器节点都订阅该频道,这样的话,...
  • websocket session共享

    千次阅读 2018-09-30 00:20:04
    用户a通过服务器进入房间room,用户b也通过房间进入room,用户之间是通过session来通话的,所以session直接存储在集合中就可以了。 因为session存储在一台服务器的集合中,所以每次发送消息的时候,直接发给房间内的...
  • Currently <code>WebSocketSession.close()</code> can be called concurrently and there is no atomic state change to guard against this. <p>The problem is that calling <code>close()</code> may call ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,630
精华内容 1,052
关键字:

websocketsession