精华内容
下载资源
问答
  • WebSocket的Controller中的onMessage方法中,message的交互经常需要用到路由,类于swoole-framework中的runMvc,在swoft中,是否有直接可用的功能?谢谢。</p><p>该提问来源于开源...
  • 问题描述:使用Tomcat自带的WebSocket实现,实现了我自己服务器端程序,但是启动服务器时,发生这样错误: No payload parameter present on the method [onMessage];查阅官方文档无果。 请教:这里payload...
  • 就在这个时候,房主再次选择答案进行再次onMessage交互。此时问题就出现了,前端发了消息(确定发了),但是后端需要等待很久(20S左右)才能收到。 经反复检查,发现假设:房主点击开始比赛时用是1号线程,他选择...

    业务场景:一个房间N个人,1为房主,房主进入房间之后,点击开始比赛后,发送一条消息至后端,后端拿到消息后给全部人发送第一道题,然后进入sleep20秒的休眠。

    就在这个时候,房主再次选择答案进行再次onMessage交互。此时问题就出现了,前端发了消息(确定发了),但是后端需要等待很久(20S左右)才能收到。

    在这里插入图片描述

    经反复检查,发现假设:房主点击开始比赛时用的是1号线程,他选择答案时,居然也是1号线程,但是此时1号线程处于sleep阶段。所以无法处理消息,导致消息延迟。

    在这里插入图片描述

    为什么会这样?该怎么解决呢?期待大佬的回复~

    展开全文
  • 背景我项目中,有一个需求是监听文件系统变化,然后进行其他操作,一旦在监听器中直接处理业务逻辑,就很容易引发很多bug(其实主要是早期设计不好,同步代码和锁还有莫名其妙东西满天飞,业务逻辑都堆在...

    背景

    我的项目中,有一个需求是监听文件系统的变化,然后进行其他的操作,一旦在监听器中直接处理业务逻辑,就很容易引发很多bug(其实主要是早期设计不好,同步代码和锁还有莫名其妙的东西满天飞,业务逻辑都堆在一起了),比较难以修改和使用,最终我决定采用反应器模式来重写这部分逻辑。

    项目主要用到了JNotify和lombok这两个东西,前者是监听文件变化的,后者就不多说了,我想应该都知道。

    概述

    什么是反应器模式呢?反应器基本上有这样几个部分:

    1. 分发器:分发一个事件到处理器中进行处理,一般来说是一个线程,他可以对事件的类型之类的进行判断,然后分配给合适的处理器,在典型的reactor模式中,这都是在同一个线程内进行的,分发器的主体应该含有一个事件循环,读取容器中存在的事件然后分发和处理他们。
    2. 监听器或连接器或者类似的东西:收集事件,然后将他们放入分发器的容器中,稍后这些事件就会被分发器发现并且分发到处理器中。
    3. 处理器:具体进行某种具体的业务处理。

    那么对于我的项目来说,首先应该收集文件事件到队列中,然后在分发器出队事件放入处理器,整个过程相比直接在listener中处理具体逻辑的做法,这是更加有序而可控的,对于这样一个业务来说,稳定性十分重要。

    大致结构

    我使用了一个单独的Thread作为分发器的线程,内部的run方法放置事件循环,然后同时继承listener,监听事件,看起来就像是这样:

    @Component
    public class ReactorDispatcher implements JNotifyListener,Runnable {
    
       private CycleBarrier monitor = new CycleBarrier(2);
    
       private Deque<FSEvent> queue = new ArrayDeque<>(); 
    
       private int watcherId;
    
       private Thread thread;
    
       private boolean state = true;
    
       // 这里使用了lombok
       // data注解会自动为下面的字段添加Getter和Setter
       // allArgsConstructor会给这个类添加一个默认的含有全部字段的构造方法
       @Data
       @AllArgsConstructor
       private class FSEvent {
          private String path;
          private String name;
          private int type;
       }
    
      // 初始化方法,spring完成装配后会首先执行他。
       @PostConstruct
       public void initialize() {
          try {
            // 使用JNotify开始监听文件系统
            watcherId = JNotify.addWatcher(new File("files").getAbsolutePath(),
                  JNotify.FILE_CREATED|JNotify.FILE_DELETED);
            // 创建线程,其实是不是也可以作为守护线程处理呢,
            // 不过守护线程的特点我不熟悉,就还是用来普通的方法。
            thread = new Thread(this);
            thread.setName("reactor - fs listener");
            thread.start();
          }catch(Exception e) {
          }
       }
    
      // 销毁方法,清理用。
       @PreDestory
       public void destory() {
         try {
           JNotify.removeWatcher(watcherId);
           state = false;
         } catch(Exception e) {
         }
       }
    
       /**
        * 这几种方法是监听器的方法,他会在文件发生变化的时候被回调。
        * 通过这些方法可以将监听到的内容变为事件对象,然后存入队列。
        */
       public void fileCreated(int type,String path,String name) {
          FSEvent event = new FSEvent(JNotify.FILE_CREATED,path,name);
          this.emit(event);
       }
    
       /**
        * 这几种方法是监听器的方法,他会在文件发生变化的时候被回调。
        * 通过这些方法可以将监听到的内容变为事件对象,然后存入队列。
        */
       public void fileRemoved(int type,String path,String name) {
         FSEvent event = new FSEvent(JNofiy.FILE_DELETED,path,name);
         this.emit(event);
       }
    
      // 省略不必要的监听方法,其实JNotify还有两个监听方法,这里就不在列举了。
      
      // 业务处理
      private void resolveEvent(FSEvent event) {
        // 在这里进行业务处理,ReactorDispatcher还会进行批量注入,将实现某个接口的实例作为list
        // 注入进来(但是这段代码里面我没写,因为这个是需要根据实际情况设计的)即通过接
        // 口注入一组业务逻辑的处理器
        // 然后在这里根据type进行分发,虽然标准的reactor是单线程的,但是我的业务处理很多
        // 时候是需要消耗相当的时间,所以我在这里用了线程池,所有的event都是异步处置,根据不同的
        // 场景,时间消耗较低的可以直接单线程。当然以nio为基础的话,这里完全可以单线程的。
        // 不过nio处理相对复杂一点,容易出现回调地狱这种问题,因此在没有好的处理方式之前,
        // 还不如直接线程池异步处理,只要小心锁的问题就是了。
      }
    
      public void emit(FSEvent event) {
        if(queue.isEmpty()) {
           quque.addFirst(event);
           try {
             monitor.await();
           } catch(Exception e) {
           }
        } else {
          quque.addFirst(event);
        }
      }
    
      public void run() {
        while(state) {
          try{
            // 阻塞线程,直到Event出现。
            monitor.await();
            if(queue.isEmpty()) {
              continue;
            }
            // removeLast 在没有下一个的时候会抛出空指针异常,需要catch一下,不然线程有
            // 异常后会直接退出,连报错都没有。之前遇到线程莫名自动退出的问题,就是因为run
            // 里面出现了异常。
            while(!quque.isEmpty()) {
              FSEvent event = queue.removeLast();
              this.resolveEvent(event);
            }
            mointor.reset();
          }catch(Exception e) {
          }
        }
      } 
     
    }

    可以看到,相比直接在listener进行处理,收集事件然后统一进行分配,这样更加容易控制,而且详细的逻辑可以不用和这个监听和分发的结构混在一起,整体结构更加清晰明确。

    当然和web的reactor模式相比,我编写的有比较明显的差别,这种差别还是来自具体的应用场景,主要的思路还是差不多的。

    77804c2ce3c63f139aabaccdc74f69ed.png

    大概的思路就像上图这样。


    如果分发器线程没有事件,一直在循环,什么都不做,是会消耗cpu资源的,所以增加了一个CycleBarrier,这个东西的作用,是在await的次数达到指定次数后,会释放锁,让线程继续执行,这里设置为2,当分发器线程没有event的时候,会进行一个await,这个时候分发器被锁住,线程阻塞,一直到出现event,在event出现后,CycleBarrier的await达到2,锁被释放,分发器可以继续执行。

    其实反应器模式应该就是在无事件的时候阻塞的,之前我没有想好该怎么处理,后来读某个源码的时候看到了CountDownLatch和CycleBarrier,就突然想到可以用在这里。

    2020 - 11 - 18 修改。

    展开全文
  • 有问题写法 @ClientEndpoint @Component public class SmkCenterConsumer { @Autowired private SmkCenterDataRepository repository; @OnMessage public void onMessage(String message) { ...

    有问题的写法

    @ClientEndpoint
    @Component
    public class SmkCenterConsumer {
    
        @Autowired
        private SmkCenterDataRepository repository;
        
         @OnMessage
        public void onMessage(String message) {
            repository.save(data);
        }
    }
    

    报错:

    [ ERROR] [2020-03-11 11:41:36] org.apache.tomcat.websocket.pojo.PojoEndpointBase [175] - No error handling configured for [SmkCenterConsumer] and the following error occurred
    java.lang.NullPointerException: null
    	at SmkCenterConsumer.onMessage(SmkCenterConsumer.java:45)
    

    正确的写法

    @ClientEndpoint
    @Component
    public class SmkCenterConsumer {
    	private static SmkCenterDataRepository repository;
    
        @Autowired
        public void setRepository(SmkCenterDataRepository repository) {
            SmkCenterConsumer.repository = repository;
        }
             @OnMessage
        public void onMessage(String message) {
            repository.save(data);
        }
    }
    

    背景知识

    在spring boot中引入

    compile('org.springframework.boot:spring-boot-starter-websocket')
    

    实际上涉及到下面三个层次的知识

    java websocket API(JSR-356)

    开发 WebSocket 的Java API 集合,比如javax.websocket.Endpoint

    spring websocket抽象

    实际上spring对websocket进行了一些api的抽象

    官方文档参考:websocket

    比如:org.springframework.web.socket.WebSocketHandler

    javadoc

    A handler for WebSocket messages and lifecycle events.

    spring无非就是把javax.websocket.Endpoint在onMessage的时候将相应的数据进行读取,传递到handleMessage()这个方法中,通过模板方法模式来重载

    spring boot整合websocket

    springboot使用编程方式javax.websocket.server.ServerContainer来部署websocket endpoint

    参考:spring boot中websocket endpoint是如何初始化及启动的

    原因

    通过jstack,查看线程,可以看到,每一个@ClientEndPoint实际上对应一个线程WebSocketClient-AsyncIO-digit(数字)

    org.apache.tomcat.websocket.AsyncChannelGroupUtil

    /**
     * This is a utility class that enables multiple {@link WsWebSocketContainer}
     * instances to share a single {@link AsynchronousChannelGroup} while ensuring
     * that the group is destroyed when no longer required.
     */
    public class AsyncChannelGroupUtil {
        
        // 使用线程池
        ExecutorService executorService = new ThreadPoolExecutor(
                        0,
                        Integer.MAX_VALUE,
                        Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                        new SynchronousQueue<Runnable>(),
                        new AsyncIOThreadFactory());
        
        ......
        
        // 启动线程
        @Override
        public Thread run() {
            Thread t = new Thread(r);
            t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet());
            t.setContextClassLoader(this.getClass().getClassLoader());
            t.setDaemon(true);
            return t;
        }
    }
    

    可以看到,这个线程是tomcat启动的

    在这个tomcat启动的线程中如何使用spring容器提供的@Autowired的单例bean呢?

    如果不是static,这个repository就是null

    在这个线程中也没有办法从spring容器中取到这个bean,所以只能把这个bean设置为static,这样这个单例bean就脱离了spring容器的限制,可以在所有线程中使用了

    另一个思路:实现BeanFactoryAware,这样可以通过注入的BeanFactory拿到这个bean,应该也是可行的

    展开全文
  • webSocket onmessage事件

    2017-01-18 07:41:56
    ws.onmessage = function(evt){ var em = $("#message em"); if(evt.data != 0){ em.css("display","block"); em.text(evt.data); } else { em.css("display","none")...
  • WebSocket是一种在单个TCP连接上进行全双工通信协议。 使得客户端和服务器之间数据交换变得更加简单,允许服务端主动向客户端推送数据。 在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接...

    github项目地址

    1. 什么是WebSocket?

    WebSocket是一种在单个TCP连接上进行全双工通信的协议。
    使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
    在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

    2. 优点(参考维基)

    WebSocket与HTTP对比图

    5fe59256a9092b7dfe1cfa3ddb3bcd9b.png

    客户端例子

    const ws = new WebSocket('ws://localhost:8888');
    
        ws.onopen = () => {
            console.log('WebSocket onopen');
        }
    
        ws.onmessage = e => {
            console.log('WebSocket onmessage');
            console.log('WebSocket message received:', e);
            console.log('WebSocket data received:', e.data);
        }
    
        ws.onclose = e => {
            console.log("WebSocket onclose");
        };
    1. WebSocket.onopen: 连接成功后调用
    2. WebSocket.onmessage: 当接收到服务器消息时调用
    3. WebSocket.onclose: 连接关闭后调用

    服务端例子(koa)

    const Koa = require('koa');
      const WebSocket = require('ws');
    
      const app = new Koa();
      const ws = new WebSocket.Server({port: 8888});
    
      ws.on('connection', ws => {
          console.log('server connection');
    
          ws.on('message', msg => {
            console.log('server receive msg:', msg);
          });
    
          ws.send('Information from the server');
      });
    
      app.listen(3000);

    运行结果

    客户端

    a1e278aa84557839d7f23a8066e0c246.png

    服务端

    8a3ed2073ec64d7938da9cc18f659c51.png

    名词解释

    1. 握手: 一般创建WebSocket链接, 需要通过浏览器发出请求,服务器做出回应, 这个过程称为“握手”

    参考链接

    WebSocket协议:5分钟从入门到精通
    MDN

    展开全文
  • 作者:长东转载自:https://www.cnblogs.com/lingbing/p/6089331.htmlHTML5给Web浏览器带来了全双工TCP连接websocket标准服务器能力。换句话说,浏览器能够与服务器建立连接,通过已建立通信信道来发送和接收...
  • = null && client.WebSocketContext.WebSocket.State == WebSocketState.Open) { // client.WebSocketContext.WebSocket.State client.Send(message); } /// /// 异常 /// public ...
  • Websocket

    2020-07-28 10:24:32
    1、功能:服务器主动向客户端发送消息 2、服务器端:继承AbstractWebSocketHandler, 实现handleTextMessage方法,从客户端端发送... 客户端:注册websocket的onmessage消息,能接收到服务器端发送的消息。 ...
  • Vue和Websocket最佳解决方案 在Vue的实例的选项中定义套接字... // 在Vue文件中定义 sockets options 接收websocket 的onmessage data ( ) { return { name : '' } } sockets : { config ( data ) { console
  • * @Desc 处理WEBSOCKET的请求 */ public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger.getLogger(WebsocketServerHandler....
  • WebSocket 教程 - 阮一峰网络日志​www.ruanyifeng.com鉴于致学先致史原则1.为什么需要WebSocket?初次接触 WebSocket 人,都会问同样问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么...
  • websocket的使用

    2019-06-21 11:32:00
    import java.io.IOException; import java.net.http.WebSocket; import java.util.concurrent.CopyOnWriteArraySet; ...import javax.websocket.OnClose;...import javax.websocket.OnMessage; import java...
  • 多个 socket.send方法不要并行放在一起,会线程冲突
  • WebSocket

    2021-01-22 22:24:53
    二、简介三、客户端简单示例四、客户端 API4.1 WebSocket 构造函数4.2 webSocket.readyState4.3 webSocket.onopen4.4 webSocket.onclose4.5 webSocket.onmessage4.6 webSocket.send()4.7 webSocket.buf
  • 在实践中,会发现在websocket的onmessage方法内处理数据时,从websocket得到的数据无法在其他函数内调用,其他变量和函数也无法在websocket里使用,这是因为数据的作用域不同使得无法访问。 我们可以进行如下修改 ...
  • 如题,websocket已经成功连接,readystate状态码是1(已建立连接可通讯意思),后台向前台推送数据其实是可以接收到,但是只能靠我手动刷新页面,他才会调用一次onmessage回调方法,没有办法实时接收到数据,...
  • <p>websocket游览器端是写在主页面的,...window.parent.webSocket获取到webSocketwebsocket.onmessage方法获取服务端推送消息,这时候打开一个新的tab窗口查看...这时候上一个页面的onmessage就失效了</p>
  • 我现在的业务中需要用到websocket,实时向前端发送消息。现在我已经写好了注解板的websocket,但是在service层不知道怎么调用websocket的onMessage方法 向前台发送消息?求大神指点, 最好 写下伪代码。
  • Springboot: MyWebsocket.javapackage ...import javax.websocket.OnClose;import javax.websocket.OnError;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.webso...
  • WebSocket的一个小例子

    千次阅读 2016-06-20 16:09:33
    后台代码: import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; ...import javax.websocket.OnError...import javax.websocket.OnMessage; import javax.
  • WebSocket是前后端交互长连接,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正双向平等对话,属于服务器推送技术一种。项目中,我们经常会使用WebSocket和服务器建立持久连接。...
  • websocket在Vue里调用

    千次阅读 2018-10-30 08:52:10
    //初始化websocket initWebsocket(wbUrl){ this.websocket = new WebSocket(wbUrl); this.websocket.onopen = this.websocketOpen;... this.websocket.onmessage = this.websocketMessage; this.websocket.onerro...
  • JSR356 标准的WebSocket

    2020-12-24 10:40:08
    package test; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List;...import javax.websocket.OnClose;...import javax.websocket.OnMessage; imp
  • 以下代码看似没有什么问题! package websocket; import java.io.IOException; import javax.websocket.OnClose;...import javax.websocket.OnMessage; import javax.websocket.OnOpen; import
  • 使用中发现一个问题,在WebSocket的onMessage中,我想调用dao层将消息存入数据库,一开始使用spring自动注入。 @Autowired private MsgDao msgDao; 发现这样是空指针,注入失败的。 查了相关的文档之后解决了...
  • #### 如下所示,在vue中我共用一个websocket连接,一次发送多条指令,那么,此时onmessage会得到四个返回结果集,如何去区分这四个结果集对应是哪条指令呢,求解惑~ ``` ws.send(JSON.stringify(this.$Command...

空空如也

空空如也

1 2 3 4 5 ... 16
收藏数 306
精华内容 122
关键字:

websocket的onmessage