精华内容
下载资源
问答
  • websocket 推送消息

    2020-10-19 14:48:10
    websocket java推送消息实例 ** 后端类 msg类 package com.glaway.mro.util; import java.util.Date; /** @author : admin @DESC : WebSocket消息模型 */ public class Msg { // 推送人ID private String from...

    **

    websocket java推送消息实例

    **
    后端类

    msg类

    package com.glaway.mro.util;

    import java.util.Date;

    /**

    • @author : admin

    • @DESC :

      WebSocket消息模型


      */
      public class Msg {

      // 推送人ID
      private String fromUid;

      // 定点推送人ID
      private String toUid;

      // 定点推送单位ID
      private String toOrgId;

      // 消息体
      private String data;

      // 推送时间
      private Date createDate = new Date();

      // 消息状态
      private Integer flag;

      public Msg() {

      }

      public Msg(String fromUid, String toUid, String toOrgId, String data, Date createDate, Integer flag) {
      this.fromUid = fromUid;
      this.toUid = toUid;
      this.toOrgId = toOrgId;
      this.data = data;
      this.createDate = createDate;
      this.flag = flag;
      }

      public String getFromUid() {
      return fromUid;
      }

      public void setFromUid(String fromUid) {
      this.fromUid = fromUid;
      }

      public String getToUid() {
      return toUid;
      }

      public void setToUid(String toUid) {
      this.toUid = toUid;
      }

      public String getToOrgId() {
      return toOrgId;
      }

      public void setToOrgId(String toOrgId) {
      this.toOrgId = toOrgId;
      }

      public String getData() {
      return data;
      }

      public void setData(String data) {
      this.data = data;
      }

      public Date getCreateDate() {
      return createDate;
      }

      public void setCreateDate(Date createDate) {
      this.createDate = createDate;
      }

      public Integer getFlag() {
      return flag;
      }

      public void setFlag(Integer flag) {
      this.flag = flag;
      }

      @Override
      public String toString() {
      return “Msg{” +
      “fromUid=’” + fromUid + ‘’’ +
      “, toUid=’” + toUid + ‘’’ +
      “, toOrgId=’” + toOrgId + ‘’’ +
      “, data=’” + data + ‘’’ +
      “, createDate=” + createDate +
      “, flag=” + flag +
      ‘}’;
      }
      }

    WSServer 类

    package com.glaway.mro.util;

    import com.alibaba.fastjson.JSON;
    import org.apache.commons.lang.StringUtils;
    import org.apache.log4j.Logger;

    import javax.servlet.http.HttpSession;
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /**

    • @author : admin

    • @DESC :

      注解{@link ServerEndpoint}声明websocket 服务端


      */
      @ServerEndpoint(value = “/chat”, configurator = HttpSessionConfigurator.class)
      public class WSServer {

      static private Logger logger = Logger.getLogger(WSServer.class);

      // 在线人数 线程安全
      private static int onlineCount = 0;

      // 连接集合 userId => server 键值对 线程安全
      static public final ConcurrentMap<String, WSServer> map = new ConcurrentHashMap<>();

      // 与某个客户端的连接会话,需要通过它来给客户端发送数据
      private Session session;

      // 当前会话的httpsession
      private HttpSession httpSession;

      /**

      • @param session websocket连接sesson

      • @param config {@link com.github.websocket.configuration.HttpSessionConfigurator}

      • @DESC

        注解{@link OnOpen} 声明客户端连接进入的方法


        */
        @OnOpen
        public void onOpen(Session session, EndpointConfig config) {

        // 得到httpSession
        this.httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
        httpSession.setAttribute(“user”,session.getQueryString());
        // 获取session对象 SObject(这个就是java web登入后的保存的session对象,此处为用户信息,包含了userId)
        String user = (String) this.httpSession.getAttribute(“user”);

        this.session = session;
        // System.out.println(user+"-------"+this.httpSession.getId());

        //针对一个用户只能有一个链接
        if(map.get(user)!=null){
        // 移除连接
        map.remove(user);
        // 连接数-1
        subOnlineCount();
        }

        // 将连接session对象存入map
        map.put(user, this);

        // 连接数+1
        addOnlineCount();

        logger.info(“有新的连接,当前连接数为:” + getOnlineCount());
        }

      /**

      • {@link OnClose} 关闭连接

      */
      @OnClose
      public void onClose() {

       /**
        * 获取当前连接信息 {@code CommonConstant.USER_LOGIN_SESSION} 为Http session 名
        */
      
       String user = (String) this.httpSession.getAttribute("user");
       
       // 移除连接
       map.remove(user);
      
       // 连接数-1
       subOnlineCount();
      
       logger.info("有一连接断开,当前连接数为:" + getOnlineCount());
      

      }

      /**

      • {@link OnMessage} 消息监听处理方法

      • @param message 消息对象{@link com.github.websocket.msg.Msg}的JSON对象

      • @throws IOException 异常
        */
        @OnMessage
        public void onMessage(String message) throws IOException {

        // 将消息转Msg对象
        Msg msg = JSON.parseObject(message, Msg.class);

        //TODO 可以对msg做些处理…

        // 根据Msg消息对象获取定点发送人的userId
        WSServer _client = map.get(msg.getToUid());

        // 定点发送
        if (StringUtils.isNotEmpty(msg.getToUid())) {
        if (null != _client) {
        // 是否连接判断
        if (_client.session.isOpen())
        // 消息发送
        _client.session.getBasicRemote().sendText(JSON.toJSONString(msg));
        }
        }

        // 群发
        /* if (!StringUtils.isEmpty(msg.getToUid())) {
        // 群发已连接用户
        for (WSServer client : map.values()) {
        client.session.getBasicRemote().sendText(JSON.toJSONString(msg));
        }
        }*/

      }

      /**

      • {@link OnError} websocket系统异常处理

      • @param t 异常
        */
        @OnError
        public void onError(Throwable t) {
        logger.error(t);
        t.printStackTrace();
        }

      /**

      • 系统主动推送 这是个静态方法在web启动后可在程序的其他合适的地方和时间调用,这就实现了系统的主动推送

      • @param msg 消息对象{@link com.github.websocket.msg.Msg}的JSON对象
        */
        static
        public void pushBySys(Msg msg) {

        //TODO 也可以实现定点推送
        //msg传输的时候会带上需要发送消息给谁msg.getToUid()
        //通过map去获取那个用户所在的session
        WSServer ws=map.get(msg.getToUid());
        try {
        if(ws!=null){
        ws.session.getBasicRemote().sendText(“123456”);
        }
        } catch (IOException e1) {
        e1.printStackTrace();
        }

        // 群发
        /for (WSServer client : map.values()) {
        try {
        client.session.getBasicRemote().sendText(JSON.toJSONString(msg));
        } catch (IOException e) {
        e.printStackTrace();
        }
        }
        /
        }

      // 获取连接数
      private static synchronized int getOnlineCount() {
      return WSServer.onlineCount;
      }

      // 增加连接数
      private static synchronized void addOnlineCount() {
      WSServer.onlineCount++;
      }

      // 减少连接数
      private static synchronized void subOnlineCount() {
      WSServer.onlineCount–;
      }

    }

    HttpSessionConfigurator类

    package com.glaway.mro.util;

    import javax.servlet.http.HttpSession;
    import javax…HandshakeResponse;
    import javax.websocketwebsocket.server.HandshakeRequest;
    import javax.websocket.server.ServerEndpointConfig;
    import javax.websocket.server.ServerEndpointConfig.Configurator;

    /**

    • @author : admin

    • @DESC :

      讲http request的session 存入websocket的session内


      */
      public class HttpSessionConfigurator extends Configurator {

      @Override
      public void modifyHandshake(ServerEndpointConfig sec,
      HandshakeRequest request, HandshakeResponse response) {

       // 获取当前Http连接的session
       HttpSession httpSession = (HttpSession) request.getHttpSession();
       // 将http session信息注入websocket session
       sec.getUserProperties().put(HttpSession.class.getName(), httpSession);
      

      }
      }

    需要jar包

    javax.websocket-api-1.0.jar

    使用说明

    1.后端出发事件中添加
    Msg msg=new Msg();
    Msg.setToUid(xx);
    WSServer.pushBySys(msg);

    2.在前端文件添加以下内容:

    展开全文
  • Golang websocket推送

    2019-04-16 15:29:00
    Golang websocket推送 在工作用主要使用的是Java,也做过IM(后端用的netty websocket)。最近想通过Golang重写下,于是通过websocket撸了一个聊天室。 项目地址 Github 依赖 golang.org/x/net下的websocket。 ...

    Golang websocket推送

    在工作用主要使用的是Java,也做过IM(后端用的netty websocket)。最近想通过Golang重写下,于是通过websocket撸了一个聊天室。

    项目地址

    Github

    依赖

    golang.org/x/net下的websocket。

    由于我使用的是golang版本是1.12,在国内访问golang.org/x需要借助代理,或者通过replace替换为github下的镜像。

    module github.com/xuanbo/pusher
    
    require golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
    
    replace (
            golang.org/x/crypto => github.com/golang/crypto v0.0.0-20190308221718-c2843e01d9a2
            golang.org/x/net => github.com/golang/net v0.0.0-20190404232315-eb5bcb51f2a3
            golang.org/x/sys => github.com/golang/sys v0.0.0-20190215142949-d0b11bdaac8a
            golang.org/x/text => github.com/golang/text v0.3.0
    )

    即工程下的go.mod.cn文件。

    websocket用法

    核心就是for循环下的处理收到的消息逻辑,然后对消息进行处理(转发、广播等)。

    // websocket Handler
    // usage: http.Handle("/websocket", websocket.Handler(pusher.Handler))
    func Handler(conn *websocket.Conn) {
        // handle connected
        var userId string
        var err error
        if userId, err = doConnected(conn); err != nil {
            fmt.Println("Client connect error: ", err)
            return
        }
    
        fmt.Println("Client connected, userId: ", userId)
    
        for {
            msg := new(Message)
    
            if err := websocket.JSON.Receive(conn, msg); err != nil {
                fmt.Println("Can't receive, error: ", err)
                break
            }
    
            msg.UpdateAt = Timestamp()
    
            fmt.Println("Received from client: ", msg)
    
            // handle received message
            if err := doReceived(conn, msg); err != nil {
                fmt.Println("Received message error: ", err)
                break
            }
        }
    
        // handle disConnected
        if err := doDisConnected(userId, conn); err != nil {
            fmt.Println("Client disconnected error: ", err)
            return
        }
    
        fmt.Println("Client disconnected, userId: ", userId)
    }

    连接管理

    在IM中比较重要的点就是管理客户端连接,这样我们才能通过服务端转发消息给对应的用户。注意,下面没有考虑集群,只在单机中考虑。

    // websocket connection manager
    type ConnManager struct {
        // websocket connection number
        Online *int32
        // websocket connection
        connections *sync.Map
    }

    上面定义了一个连接管理结构体,Online为在线的人数,connections为客户端的连接管理(key为userId,value为websocket connection)。

    下面为ConnManager添加一些方法来处理连接、断开连接、发送消息、广播等操作。

    // add websocket connection
    // online number + 1
    func (m *ConnManager) Connected(k, v interface{}) {
        m.connections.Store(k, v)
    
        atomic.AddInt32(m.Online, 1)
    }
    
    // remove websocket connection by key
    // online number - 1
    func (m *ConnManager) DisConnected(k interface{}) {
        m.connections.Delete(k)
    
        atomic.AddInt32(m.Online, -1)
    }
    
    // get websocket connection by key
    func (m *ConnManager) Get(k interface{}) (v interface{}, ok bool) {
        return m.connections.Load(k)
    }
    
    // iter websocket connections
    func (m *ConnManager) Foreach(f func(k, v interface{})) {
        m.connections.Range(func(k, v interface{}) bool {
            f(k, v)
            return true
        })
    }
    
    // send message to one websocket connection
    func (m *ConnManager) Send(k string, msg *Message) {
        v, ok := m.Get(k)
        if ok {
            if conn, ok := v.(*websocket.Conn); ok {
                if err := websocket.JSON.Send(conn, msg); err != nil {
                    fmt.Println("Send msg error: ", err)
                }
            } else {
                fmt.Println("invalid type, expect *websocket.Conn")
            }
        } else {
            fmt.Println("connection not exist")
        }
    }
    
    // send message to multi websocket connections
    func (m *ConnManager) SendMulti(keys []string, msg interface{}) {
        for _, k := range keys {
            v, ok := m.Get(k)
            if ok {
                if conn, ok := v.(*websocket.Conn); ok {
                    if err := websocket.JSON.Send(conn, msg); err != nil {
                        fmt.Println("Send msg error: ", err)
                    }
                } else {
                    fmt.Println("invalid type, expect *websocket.Conn")
                }
            } else {
                fmt.Println("connection not exist")
            }
        }
    }
    
    // broadcast message to all websocket connections otherwise own connection
    func (m *ConnManager) Broadcast(conn *websocket.Conn, msg *Message) {
        m.Foreach(func(k, v interface{}) {
            if c, ok := v.(*websocket.Conn); ok && c != conn {
                if err := websocket.JSON.Send(c, msg); err != nil {
                    fmt.Println("Send msg error: ", err)
                }
            }
        })
    }

    消息类型、格式

    消息类型(MessageType)主要有单聊、群聊、系统通知等。

    消息格式(MediaType)主要有文本格式、图片、文件等。

    type MessageType int
    type MediaType int
    
    const (
        Single MessageType = iota
        Group
        SysNotify
        OnlineNotify
        OfflineNotify
    )
    
    const (
        Text MediaType = iota
        Image
        File
    )
    
    // websocket message
    type Message struct {
        MessageType MessageType `json:"messageType"`
        MediaType   MediaType   `json:"mediaType"`
        From        string      `json:"from"`
        To          string      `json:"to"`
        Content     string      `json:"content,omitempty"`
        FileId      string      `json:"fileId,omitempty"`
        Url         string      `json:"url,omitempty"`
        CreateAt    int64       `json:"createAt,omitempty"`
        UpdateAt    int64       `json:"updateAt,omitempty"`
    }

    上面定义了一个统一的消息(Message)。

    效果

    前端的代码就不展示了,最终实现的聊天室效果如下:

    UI

    补充

    本例子没有涉及到用户认证、消息加密、idle、单聊、消息格式、消息持久化等等,只做了一个简单的群聊。

    欢迎感兴趣的道友,基于此扩展出自己的推送系统、IM等。

    说明

    Just for fun!

    转载于:https://www.cnblogs.com/bener/p/10717466.html

    展开全文
  • 某系统中一个服务生成一条消息,这条消息需要实时推送到多个终端,此时该如何进行有效的 WebSocket 推送呢?首先一起看看如下场景: 假设推送消息由消息实例 2 产生,但是终端真正连接的消息实例是实例 1 和实例 3,...

    写在前面

    系统开发的过程中,我们经常需要实现消息推送的需求。单端单实例的情况下很好处理(网上有许多教程这里不做展开),但在分布式系统及多端需要推送的情况下该如何处理呢?

    在分布式系统中,消息推送服务端是多实例的。某系统中一个服务生成一条消息,这条消息需要实时推送到多个终端,此时该如何进行有效的 WebSocket 推送呢?首先一起看看如下场景:

    假设推送消息由消息实例 2 产生,但是终端真正连接的消息实例是实例 1 和实例 3,并没有连接到生产消息的实例 2,系统是如何将实例 2 的消息同步推送到终端 1 和终端 2 的呢?下文将详细描述。

    file

    基本原理

    为了满足需求,我们采用 Redis 做协同中间件,用于存储用户信息、生成用户连接的唯一性标识以及 pod address,消息的生产者实例通过订阅 Redis 获取终端连接的唯一性标识和 pod address,并通知到对应的消息实例,最终由相应连接终端的消息实例通过 WebSocket 将消息发推送到用户终端。具体流程如下图:

    file

    为了实现构想,我们构造了两个组件:Client、ClientManager,实现逻辑如下。

    服务端实现

    Client

    Client 组件的作用,是当用户与消息服务中某个实例建立连接后,管理这个连接的信息,这里通过一个 Golang 结构体来定义:

    	type Client struct {
    			UUID   string 
    			UserID string
    			Socket *websocket.Conn
    			Send   chan []byte
    	}
    

    结构体中的数据类型说明如下:

    • UUID:对连接进行唯一性的标识,通过此标识可以查找到连接信息。

    • UserID:用户 ID。

    • Socket:连接对象。

    • Send:消息数据 channel。

    我们为 Client 结构体实现了两个方法:Read、Write 来处理消息的接受和发送。

    Read 方法

    Read 方法比较简单,从终端接收请求消息后,消息实例通过 WebSocket 回应接收消息状态,并不返回请求结果。结果通过 Write 方法返回。

    	func (c *Client) Read(close, renewal chan *Client) {
    			defer func() {
    					close <- c
    			}()
    	for {
    			_, message, err := c.Socket.ReadMessage()
    			if err != nil {
    					break
    			}
    		// ...
    			 // message logic
    	}
    	}
    

    Write 方法

    Write 方法将请求结果返回给终端。Client 会监听 send channel,当 channel 有数据时,通过 socket 连接将消息发送给终端。

    	func (c *Client) Write(close chan *Client) {
    			for {
    					select {
    					case message, ok := <-c.Send:
    							if !ok {
    									return
    							}
    							c.Socket.WriteMessage(websocket.TextMessage, message)
    					case <-c.Ctx.Done():
    				return
    					}
    			}
    	}
    

    ClientManger

    ClientManager 组件相当于连接池,可以管理所有的终端连接,并提供注册、注销、续期功能。

    	type ClientManager struct {
    			sync.RWMutex
    			Clients    map[string]*Client 
    			Register   chan *Client
    			Unregister chan *Client
    			Renewal    chan *Client
    	}
    

    结构体的数据类型说明如下:

    • Clients:是一个集合,用于存储创建的 Client 对象。

    • Register:注册的 channel。

      • 把连接注册到 Clients 中,并通过 key-value 加入 Client 集合中,key 是连接的唯一性标识 ,value 是连接本身。

      • 把连接的唯一性标识和用户的 ID 以及建立连接的 pod address 信息,存储到 Redis 中。

    • Unregister:注销的 channel。

      • 从 ClientManager 组件的 Clients 集合中移除连接对象。

      • 删除 Redis 对应的缓存信息。

    • Renewal:续期的 channel,用于对 Redis 的键续期。

    ClientManager 提供了一个 Start 方法,Start 方法提供监听注册、注销以及续期的 channel,通过监听这些 channel 来管理创建的连接对象。当这些 channel 有数据时,执行对应的操作。

    	func (manager *ClientManager) Start(ctx context.Context) {
    			for {
    					select {
    					case conn := <-manager.Register:
    							manager.Lock()
    							manager.Clients[conn.UUID] = conn
    							manager.Unlock()
    							_, err := manager.affair.Register(ctx, &RegisterReq{
    									UserID: conn.UserID,
    									UUID:   conn.UUID,
    									IP:     manager.IP,
    							})
    					case conn := <-manager.Unregister:
    							_, err := manager.affair.Unregister(ctx, &UnregisterReq{
    									UserID: conn.UserID,
    									UUID:   conn.UUID,
    							})
    							conn.Socket.Close()
    							close(conn.Send)
    							delete(manager.Clients, conn.UUID)
    					case conn := <-manager.Renewal:
    											//...
    							// Key renewal to Redis
    					}
    			}
    	}
    

    消息推送

    当一个消息服务实例生产用户的消息,需要推送消息给终端时,推送步骤如下:

    1. 根据 UserID 从 Redis 读取数据,得到连接唯一性标识和 pod address 地址,这些信息是在终端第一次与消息实例建立连接的时候写入 Redis 的。

    2. 此时根据 pod address,向对应的消息服务实例发送请求。

    3. 相应的消息服务实例接收到请求。

    服务端接收请求的处理逻辑如下:

    1. 根据传递过来连接唯一性标识的参数,找到标识对应的连接。我们为 ClientManager 提供了一个 Write 方法。

    此方法用到 ClientManager 组件的 Clients 集合,根据唯一性标识找到对应的 Client。再利用 Client 的 SendOut 方法,写出数据到终端。

    	func (manager *ClientManager) Write(message *Message) error {
    		manager.RLock()
    		client, ok := manager.Clients[message.Recipient]
    		manager.RUnlock()
    		if !ok {
    			 return errors.New("client miss [" + message.Recipient + "]")
    		}
    		return client.SendOut(message)
    	}
    
    1. 定义 Client 的 SendOut 方法。

    此方法只负责:把接收到的消息转换为字节数组后,发送 Client 的 Send Channel 中。

    	func (c *Client) SendOut(message *Message) error {
    			content, err := json.Marshal(message.Content)
    			if err != nil {
    					return err
    			}
    			c.Send <- content
    			return nil
    	}
    
    1. 发送数据到终端。

    前文 Client 组件的 Write 方法中,已说明 send channel 中有数据时,便会读取channel 产生的数据,并通过连接对象返回数据给对应的终端。

    总结

    以上是 Web Socket 推送消息给终端的主要思路:通过 Redis 把用户的信息以及连接的标识和 pod address 存储起来,当某个消息服务实例产生消息,从 Redis 读取信息,通知连接着终端的消息服务实例,再由这些服务实例通过 WebSocket 对象给终端发送消息。全象云低代码平台也集成了消息的实时推送,用户使用平台时能及时获取最新消息状态。
    下期我们将为大家带来 Knative Serving 自定义弹性伸缩,请大家持续关注。

    作者

    周慧婷 青云全象云软件开发工程师​

    展开全文
  • websocket推送消息 websocket是一种双向通信协议,HTTP是单向的。 直接上代码: 1.首先创建一个要整合websocket的springboot模块 本文讲解主要使用的类: 配置文件: server: port: 1013 spring: application: ...

    websocket推送消息

    websocket是一种双向通信协议,HTTP是单向的。
    直接上代码:
    1.首先创建一个要整合websocket的springboot模块
    本文讲解主要使用的类:
    在这里插入图片描述

    配置文件:

    server:
      port: 1013
    
    spring:
      application:
        name: IDEAL-WEBSOCKET
    
    

    2.添加依赖
    推荐idea在线创建springboot模块或者https://start.spring.io/。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo</artifactId>
            <groupId>com.example</groupId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>ideal-websocket-1013</artifactId>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- websocket的基本依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.73</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-commons</artifactId>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    3.本项目演示的是根据用户id推送给不同的客户端:实际场景中,一个账号可能支持多人同时登陆,每个登陆人员的客户端界面都要显示该用户的即时业务信息
    创建一个用户DTO;

    package com.demo.websocket.dto;
    
    import com.alibaba.fastjson.JSONObject;
    import lombok.Data;
    
    @Data
    public class UserDto {
    
        private String userId;
    
        private String userName;
    
        @Override
        public String toString () {
            return JSONObject.toJSONString(this);
        }
    }
    

    4.编写websocet服务:url是/show/user/peoples,这个是给前端调用的
    前端调用时候可以用websocket在线测试网址:地址:ws://127.0.0.1:1013/show/user/peoples,具体的后面会截图演示。

    package com.demo.websocket.websocket;
    
    import com.alibaba.fastjson.JSON;
    import com.demo.websocket.config.WebSocketConfig;
    import com.demo.websocket.dto.UserDto;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    @Slf4j
    @Component
    @ServerEndpoint(value = "/show/user/peoples")
    public class UserWebSocket {
    
        private static Map<String, List<Session>> clients = new ConcurrentHashMap<>();
    
        @OnOpen
        public void onOpen(Session session) {
            log.info("有新的客户端上线: {}", session.getId());
            List<Session> list = new CopyOnWriteArrayList<>();
            list.add(session);
            clients.put(session.getId(), list);
        }
    
        @OnClose
        public void onClose(Session session) {
            String sessionId = session.getId();
            log.info("有客户端离线: {}", sessionId);
            clients.remove(sessionId);
        }
    
        @OnError
        public void onError(Session session, Throwable throwable) {
            throwable.printStackTrace();
            if (clients.get(session.getId()) != null) {
                clients.remove(session.getId());
            }
        }
    
        @OnMessage
        public void onMessage(String message, Session session){
           UserDto userDto = JSON.parseObject(message, UserDto.class);
            List<Session> list = clients.get(userDto.getUserId());
            if(null == list){
                list = new CopyOnWriteArrayList<Session>();
                list.add(session);
            }else {
                list.add(session);
            }
            clients.put(userDto.getUserId(),list);
            log.info("用户新的客户端加入成功");
        }
    
        /**
         * 发送消息
         *
         * @param userDto 消息对象
         */
        public void sendToUser(UserDto userDto) {
            List<Session> list = clients.get(userDto.getUserId());
            if(null != list){
                log.info("需要发送的消息个数==={}",list.size());
                for(Session  session : list){
                    try {
                        session.getBasicRemote().sendText(userDto.toString());
                    }catch (IOException e){
                        log.error("发送websocket消息失败:{}", e);
                    }
    
                }
            }
        }
    }
    
    

    4.测试接口:WebSocketController

    package com.demo.websocket.controller;
    
    import com.alibaba.fastjson.JSONObject;
    import com.demo.websocket.dto.UserDto;
    import com.demo.websocket.websocket.OneToManyWebSocket;
    import com.demo.websocket.websocket.OneToOneWebSocket;
    import com.demo.websocket.websocket.UserWebSocket;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @RestController
    @Slf4j
    @RequestMapping("")
    public class WebSocketController {
        @Autowired
        private UserWebSocket userWebSocket;
    
        /**
         * 模拟给同一个用户的不同客户段发送消息
         * @return
         */
        @PostMapping(value = "/send/user")
        String sendUser(@RequestBody UserDto userDto){
            log.info("开始给用户发送消息");
            userWebSocket.sendToUser(userDto);
            return "成功";
        }
    }
    

    5.为了方便测试,模块中引入了swagger2的配置

    SwaggerConfig类:

    package com.demo.websocket.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import springfox.documentation.builders.ApiInfoBuilder;
    import springfox.documentation.builders.PathSelectors;
    import springfox.documentation.builders.RequestHandlerSelectors;
    import springfox.documentation.service.ApiInfo;
    import springfox.documentation.service.Contact;
    import springfox.documentation.spi.DocumentationType;
    import springfox.documentation.spring.web.plugins.Docket;
    
    /**
     * Description:swagger2配置类
     * @Date 2019-04-29 00:05
     */
    @Configuration
    public class SwaggerConfig {
    
      @Bean
      public Docket myDocket() {
        Docket docket = new Docket(DocumentationType.SWAGGER_2);
        ApiInfo apiInfo = new ApiInfoBuilder()
            .title("websocket---Api接口文档") // 标题
            .description("websocket学习") // 描述
            .contact(new Contact("", "", ""))
            .version("1.0") // 版本号
            .build();
        docket.apiInfo(apiInfo);
        //设置只生成被Api这个注解注解过的Ctrl类中有ApiOperation注解的api接口的文档
        docket.select()
            .apis(RequestHandlerSelectors.basePackage("com.demo.websocket.controller"))
            .paths(PathSelectors.any())
            .build();
        return docket;
      }
    
    }
    

    SwaggerAddressConfig类:

    package com.demo.websocket.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.web.context.WebServerInitializedEvent;
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    
    import java.net.Inet4Address;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    
    /**
     * @Date :2019/9/9 10:41
     * @Description:控制台输出 Swagger 接口文档地址
     */
    @Slf4j
    @Component
    public class SwaggerAddressConfig implements ApplicationListener<WebServerInitializedEvent> {
    
        private int serverPort;
    
        public int getPort(){
            return this.serverPort;
        }
    
        @Override
        public void onApplicationEvent(WebServerInitializedEvent webServerInitializedEvent) {
            try {
                InetAddress localHost = Inet4Address.getLocalHost();
                this.serverPort = webServerInitializedEvent.getWebServer().getPort();
                log.info("启动完成,接口文档地址:http://"+localHost.getHostAddress()+":"+serverPort+"/swagger-ui.html");
            } catch (UnknownHostException e) {
                log.info("获取本机的ip异常错误=={}",e);
            }
    
        }
    }
    

    WebSocketApplication模块启动类:添加上@EnableSwagger2

    package com.demo.websocket;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import springfox.documentation.swagger2.annotations.EnableSwagger2;
    
    @EnableDiscoveryClient // 除了eureka,还可以注册到其它的注册中心,如zookeeper上;
    @SpringBootApplication
    @EnableSwagger2
    public class WebSocketApplication {
        public static void main(String[] args) {
            SpringApplication.run(WebSocketApplication.class,args);
        }
    }
    

    启动项目以后:访问图片中红圈内的地址就能打开swagger页面
    在这里插入图片描述

    6.效果展示
    如下图:打开http://www.websocket-test.com/在线测试的网址,可能会被浏览器安全拦截,选择继续访问。
    websocket连接地址:ws://127.0.0.1:1013/show/user/peoples
    然后在发送的内容中发送一条用户body:表示用户111的某个客户端上线。

    {
      "userId": "111",
      "userName": "bbb"
    }
    

    模拟用户111的客户端1:
    在这里插入图片描述
    模拟用户111的客户端2:
    在这里插入图片描述
    模拟用户222的客户端:
    在这里插入图片描述

    接下来模拟后台数据变动给前端推送消息:
    测试接口的调用使用swagger进行,关于swagger的具体使用,可以自行百度;没有的直接用postman或者jmeter都行。
    将userId的值在"111"和"222"间切换,就能看到在websocket线测试网页右边显示出对应的消息了。如果userId=“111”,就只在”111“的页面显示发送的消息,如果userId=“222”,就只在”222“的页面显示发送的消息。
    在这里插入图片描述
    到此就演示结束了
    在这里插入图片描述

    展开全文
  • 通过WebSocket实现服务器主动推送 图像数据通过Base64编码,定义为Json格式传输 前端收到Json数据后 将图像还原 代码有任何问题可以和我联系 liangji@csu.ac.cn 一起学习,此外也可以看我的博客 博客有更详细的介绍...
  • Swoole实现Websocket推送

    2019-03-01 10:01:31
    Swoole实现Websocket主动推送 需求: 需要每秒钟向前台推送一次行情数据。 向某个用户推送消息 及时响应前端发出的请求 握手时效验令牌 每个ip 不超过十个连接 因为网上没有很具体的相关文档,只能摸着石头过河,现...
  • websocket推送告警

    千次阅读 2017-09-30 14:11:56
    功能:将采集到的告警利用websocket推送到前端 public class AlarmServiceImpl implements AlarmService { static{ startPushThread(); startCloseThread(); } // // public static void pushAllAlarm
  • 为什么使用websocket推送任务 一般现在服务器端有数据变化,我们采用的是定时拉去的方式,这种方式在一般数据量的时候还是可以接受的,但是对于数据量很大就不太合适了,websocket就是采取一种推的方式把任务推送给...
  • 广播式的websocket 推送

    千次阅读 2018-04-25 12:43:57
    WebSocket 为浏览器和服务器端提供异步通信的功能,即浏览器可以向服务器发送消息,服务器也可以向浏览器发送消息。WebSocket 需要浏览器的支持,如IE 10+、Chrome 13+、Firefox 6+。因为js插件对浏览器版本有...
  • WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。...
  • 用 Go 编写一个简单的 WebSocket 推送服务本文中代码可以在 github.com/alfred-zhong/wserver 获取。背景最近拿到需求要在网页上展示报警信息。以往报警信息都是通过短信,微信和 App 推送给用户的,现在要让登录...
  • Netty websocket 推送数据压缩以 js解压

    千次阅读 2017-11-24 12:03:11
    Netty websocket 推送数据压缩以 js解压 在项目开发的时候利用基于Netty 的websocket项目,有时会发现在推送过程中经常不推送了。经过研究调查发现服务器在高并发的情况下,推送的数据流量接近带宽流量峰值,会...
  • 前言 我们这里就不介绍websocket ...第一个问题:如果遇到前后台分离,我们如何像测试其他接口一样利用postman测试,这里websocket 它有自己的环境,可以利用websocket在线工具测试(例如:http://coolaf.com/too...
  • const WebSocket = require('ws'); // 获取ws请求携带的参数 function getParams (paramStr) { let result = {}; if (paramStr) { const params = paramStr.split('&'); for (var i = 0; i < params....
  • @JsonFormat(pattern = “yyyy-MM-dd HH:mm:ss”)在时间上面加入该注释,直接在application.yml配置是无法识别的
  • WebSocket 推送数据至前端接口实例

    千次阅读 2018-12-13 16:36:27
    package com.xxxx.jcbigdata.websocket; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; import javax.w...
  • 由于项目需求管理员这边发通知需要主动通知到用户,所以决定采用websocket方式进行前后端数据交换,后来便决定一个页面的所有请求全部用websocket,因此需要json格式来包装键值对的请求名和请求内容。 奉上...
  • private static ConcurrentMap, WebSocketSession> websockets = new ConcurrentHashMap(); @Override public void addSession(String userId, WebSocketSession session) { websockets.put(userId, ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,122
精华内容 4,448
关键字:

websocket推送json