精华内容
下载资源
问答
  • 一个springboot集成websocket的简单例子,好多网上资源无法用,特上传
  • springboot集成WebSocket,实现小程序聊天功能,对用户进行群发消息以及单个用户发送消息户
  • springboot集成websocket实现消息的主动推送,包含一对一以及通知所有在线用户
  • 通过修改logback.xml文件,实现自定义获取日志方式,并放入阻塞队列中,定时任务获取队列内容并通过websocket发送到客户端,实现日志在前台的展示
  • springboot集成websocket实现即时通信,你发的消息将直接显示在另一台pc机的浏览器上
  • 主要介绍了SpringBoot集成WebSocket长连接实际应用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • springboot集成websocket

    2021-02-21 15:21:15
    springboot集成websocket 记录下自己使用wensocket的踩坑过程,如有不对,请见谅 引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter...

    springboot集成websocket

    记录下自己使用wensocket的踩坑过程,如有不对,请见谅

    引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    

    配置websocket拦截器

    此处需要注意,在拦截器中注入服务会失败。
    产生原因:spring管理的都是单例(singleton),和 websocket (多对象)相冲突。
    详细解释:项目启动时初始化,会初始化 websocket (非用户连接的),spring 同时会为其注入 service,该对象的 service 不是 null,被成功注入。但是,由于 spring 默认管理的是单例,所以只会注入一次 service。当客户端与服务器端进行连接时,服务器端又会创建一个新的 websocket 对象,这时问题出现了:spring 管理的都是单例,不会给第二个 websocket 对象注入 service,所以导致只要是用户连接创建的 websocket 对象,都不能再注入了。
    像 controller 里面有 service, service 里面有 dao。因为 controller,service ,dao 都有是单例,所以注入时不会报 null。但是 websocket 不是单例,所以使用spring注入一次后,后面的对象就不会再注入了,会报NullException。
    解决办法:在websocketConfig中把websocket拦截器当作bean加载进来,(应该还有其他更好办法,此处是我自己的解决方法)

    import com.ikang.app.base.common.exception.BizInfoException;
    import com.ikang.app.inspection.common.domain.oauth.OauthLoginReponse;
    import com.ikang.app.inspection.common.service.reference.UniuserService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.HandshakeInterceptor;
    import javax.annotation.Resource;
    import java.util.Map;
    @Component
    public class WebSocketInterceptor implements HandshakeInterceptor {
        private final static Logger logger = LoggerFactory.getLogger(WebSocketInterceptor.class);
        @Autowired
        private UniuserService uniuserService;
    
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
            logger.info("websocket握手前");
            return true;
        }
        @Override
        public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
            logger.info("websocket握手后");
        }
    
    

    开启websocket配置

    将websocket拦截器作为bean

    @Bean
        public WebSocketInterceptor getWebSocketInterceptor() {
            return new WebSocketInterceptor();
        }
    
    import com.ikang.app.inspection.web.WebSocket.WebSocketServer;
    import com.ikang.app.inspection.web.interceptor.WebSocketInterceptor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    
    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {
    
        @Bean
        public WebSocketInterceptor getWebSocketInterceptor() {
            return new WebSocketInterceptor();
        }
    
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // 此处定义webSocket的连接地址以及允许跨域
            registry.addHandler(myHandler(), "/websocket").addInterceptors(getWebSocketInterceptor()).setAllowedOrigins("*");
            // 同上,同时开启了Sock JS的支持,目的为了支持IE8及以下浏览器
            registry.addHandler(myHandler(), "/sockjs/websocket").addInterceptors(getWebSocketInterceptor()).setAllowedOrigins("*").withSockJS();
        }
        @Bean
        public WebSocketServer myHandler() {
            return new WebSocketServer();
        }
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
    

    WebSocket实现

    import com.ikang.app.cdp.common.constant.Constants;
    import com.ikang.app.cdp.common.service.redis.LargeScreenRedisService;
    import com.ikang.app.cdp.common.webmodel.mgm.MgmOrderSalesView;
    import com.ikang.app.cdp.common.webmodel.screen.DauVO;
    import com.ikang.app.cdp.common.webmodel.screen.MobileAndAppletAndBrowserScreenVO;
    import com.ikang.app.cdp.common.webmodel.screen.SaleCondation;
    import com.ikang.app.cdp.service.IkangScreenService;
    import com.ikang.app.cdp.service.IntegratedScreenService;
    import com.ikang.app.cdp.service.TjbScreenService;
    import com.ikang.app.jackson.utils.JsonUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.CloseStatus;
    import org.springframework.web.socket.TextMessage;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.WebSocketMessage;
    import org.springframework.web.socket.WebSocketSession;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.Set;
    
    
    @Component
    public class WebSocketServer implements WebSocketHandler {
        private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
        private static Set<WebSocketSession> webSocketSet = new HashSet<>();
       
        /**
         * 建立连接后触发的回调
         * @param session
         * @throws Exception
         */
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            webSocketSet.add(session);
            logger.info("有新连接加入!当前在线人数为:{}" , webSocketSet.size());
        }
    
        
    
        /**
         * 收到消息时触发的回调
         * @param session
         * @param message
         * @throws Exception
         */
        @Override
        public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
            logger.info("收到新的消息!内容:{}" ,message.getPayload().toString());
        }
        /**
         * 发生异常,关闭连接
         * @param session
         * @param exception
         * @throws Exception
         */
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            webSocketSet.remove(session);
            logger.info("websocket发生异常!" ,exception);
        }
        /**
         * 关闭连接
         * @param session
         * @param closeStatus
         * @throws Exception
         */
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
            webSocketSet.remove(session);
            logger.debug("webSocket关闭连接,状态:{},当前连接数:{}", closeStatus, webSocketSet.size());
        }
        /**
         * 是否支持消息分片
         * @return
         */
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
    
        /**
         * 发送消息
         * @param message
         * @throws IOException
         */
        public static void sendString(String message) throws IOException {
            for (WebSocketSession webSocket : webSocketSet) {
                if (webSocket.isOpen()) {
                    webSocket.sendMessage(new TextMessage(message));
    
                }
            }
            logger.debug("webSocket发送消息,内容:{},当前连接数:{}", message, webSocketSet.size());
        }
        /**
         * 发送消息
         * @param map
         * @throws IOException
         */
        public static void sendMap(Map<String,Object> map) throws IOException {
            logger.debug("webSocket发送消息,内容:{},当前连接数:{}", JsonUtils.toJSONString(map), webSocketSet.size());
            for (WebSocketSession webSocket : webSocketSet) {
                if (webSocket.isOpen()) {
                    webSocket.sendMessage(new TextMessage(JsonUtils.toJSONString(map)));
                }
            }
        }
    
        /**
         * 发送消息
         * @param map
         * @throws IOException
         */
        public static void sendList(List<Object> map ,String type) throws IOException {
            if(webSocketSet.size() > 0){
                for (WebSocketSession webSocket : webSocketSet) {
                    if (webSocket.isOpen()) {
                        String urlType = getWebsocketUrlType(Objects.requireNonNull(webSocket.getUri()).toString());
                        if(type.equals(urlType)){
                            webSocket.sendMessage(new TextMessage(JsonUtils.toJSONString(map)));
                        }
                    }
                }
                logger.debug("webSocket发送消息,内容:{},当前连接数:{}", JsonUtils.toJSONString(map), webSocketSet.size());
            }
        }
    

    消息推送

    @ApiOperation(value = "发送webSocket消息")
    @PostMapping("/sendWebSocketMessage")
    public ResultMessage sendWebSocketMessage() String message) throws Exception {
        // 发送webSocket消息
        WebSocketConnect.broadCastInfo("你好");
        return new ResultMessage().success();
    }
    

    前端代码示例

    
    var socket;
        if (typeof (WebSocket) == "undefined") {
            console.log("遗憾:您的浏览器不支持WebSocket");
        } else {
            console.log("恭喜:您的浏览器支持WebSocket");
            //实现化WebSocket对象
            //指定要连接的服务器地址与端口建立连接
            //注意ws、wss使用不同的端口。我使用自签名的证书测试,
            //无法使用wss,浏览器打开WebSocket时报错
            //ws对应http、wss对应https。
            socket = new WebSocket("ws://localhost/api/webSocket");
            //连接打开事件
            socket.onopen = function() {
                console.log("Socket 已打开");
            };
            //收到消息事件
            socket.onmessage = function(msg) {
                console.log(msg.data);
            };
            //连接关闭事件
            socket.onclose = function() {
                console.log("Socket已关闭");
            };
            //发生了错误事件
            socket.onerror = function() {
                alert("Socket发生了错误");
            }
            //窗口关闭时,关闭连接
            window.unload=function() {
                socket.close();
            };
        }
    

    至此,websocket已经可以正常使用。下面是生产中会遇到的问题

    分布式部署,websocket共享问题

    为什么要使用这种模式呢?

    我们不妨设想一下,如果我们后端部署了多台服务器,其中某一个用户发布了消息,需要实时通知到其他在线的用户,以上示例是无法实现的。
    因为WebSocket Session是不支持序列化的,无法存储也就没有办法将所有后端服务器中连接的用户会话放到一起。
    既然无法把会话存放到一起统一管理,那么就定义一个公共的频道,每个服务器都向该频道发布消息,所有订阅该频道的服务器都接收消息,用来判断当前所连接的用户是否需要接收到该消息,需要则推送不需要则不推送,则刚好符合发布订阅模式。

    每个应用节点都订阅该topic的频道,这样新消息一注册,每个节点服务器都能接收到Object,然后从各自的节点中寻找正在连接的webSocket会话,进行消息推送。

    就这样通过Redis的发布/订阅功能实现session共享。
    Redis相关介绍及配置在这里就不介绍了,直接开始配置,具体配置如下:

    消息处理器

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import com.alibaba.fastjson.JSONObject;
    import com.ikang.app.employee.dto.AppNoticeDTO;
    import com.ikang.app.employee.redis.ReceiveNotice;
    import com.ikang.app.employee.redis.TestRecive;
    import com.ikang.app.employee.service.NoticeService;
    import com.ikang.app.employee.vo.SocketMessageVO;
     
    @Component
    public class RedisReceiver {
        private static final Logger logger = LoggerFactory.getLogger(RedisReceiver.class);
     
        public void testString(String message) {
            logger.info("消费字符串数据:[{}]", message);
            // 发送webSocket消息
            WebSocketConnect.broadCastInfo(message);
        }
    }
    

    消息监听

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
     
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
    import com.ikang.app.employee.redis.config.RedisConstants;
     
    @Configuration
    public class RedisMessageListener {
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter testStringAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            // 修改默认的序列化方式,支持更多类型的数据传输
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
            jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
            // 针对每一个消息处理设置不同的序列化方式
            // 测试字符串主题并绑定消息订阅处理器
            testStringAdapter.setSerializer(jackson2JsonRedisSerializer);
            container.addMessageListener(testStringAdapter, new PatternTopic("REDIS_TOPIC_TEST_STRING"));
            return container;
        }
        // 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
        /**
         * 测试字符串消息订阅处理器,并指定处理方法
         * @param redisReceiver
         * @return
         */
        @Bean
        MessageListenerAdapter testStringAdapter(RedisReceiver redisReceiver) {
            return new MessageListenerAdapter(redisReceiver, "testString");
        }
    }
    

    redis序列化配置

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    @Configuration
    @EnableCaching
    public class RedisConfig {
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(factory);
            // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            //com.fasterxml.jackson.databind**版本**2.9.9
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            //com.fasterxml.jackson.databind**版本**2.10.1
    //       om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            // key采用String的序列化方式
            template.setKeySerializer(stringRedisSerializer);
            // hash的key也采用String的序列化方式
            template.setHashKeySerializer(stringRedisSerializer);
            // value序列化方式采用jackson
            template.setValueSerializer(jackson2JsonRedisSerializer);
            // hash的value序列化方式采用jackson
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    }
    

    发布消息

    // 此处需注意,发送的频道需要与订阅的频道一致
    redisTemplate.convertAndSend("REDIS_TOPIC_TEST_STRING", content);
    

    websocket的wss访问

    生产外网一般会用https,websocket也是一样,一般使用wss方式进行访问
    当然,生产中如果是https的话,那么https会天然的支持wss的访问
    但是在开发中,我们需要自己测试,配置springboot的https访问来支持websocket的wss访问

    首先我们要自己生成一个证书

    keytool -genkeypair -alias "tomcat" -keyalg "RSA" -keysize 2048 -keystore "tomcat.keystore"
    

    生成https证书
    在这里插入图片描述

    我们会得到一个证书,将证书放入resource下
    在这里插入图片描述
    然后在配置中加上下面这些配置

    server.ssl.key-store=classpath:tomcat.keystore
    server.ssl.key-store-password=123456
    server.ssl.keyStoreType=JKS
    server.ssl.keyAlias:tomcat
    

    这里会有一个坑,maven编译时会改变我们的证书,哪怕是一个空格,证书的大小变了。
    我们要指定maven编译时,不会改变它

    <build>
    		<resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <includes>
                        <include>*.keystore</include>
                    </includes>
                    <filtering>false</filtering>
                </resource>
    		</resources>
    	</build>
    

    至此https访问就配置好了,网上看了好多,都是还要配置http协议跳转https,我使用时至此就可以正常https访问了,如果不起作用,可是试一下配置http协议跳转https。

    在启动类中加上下面的配置

    // SpringBoot2.x配置HTTPS,并实现HTTP访问自动转向HTTPS
        @Bean
        public ServletWebServerFactory servletContainer() {
            TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(){
                @Override
                protected void postProcessContext(Context context) {
                    SecurityConstraint securityConstraint = new SecurityConstraint();
                    securityConstraint.setUserConstraint("CONFIDENTIAL");
                    SecurityCollection collection = new SecurityCollection();
                    collection.addPattern("/*");
                    securityConstraint.addCollection(collection);
                    context.addConstraint(securityConstraint);
                }
            };
            tomcat.addAdditionalTomcatConnectors(httpConnector());
            return tomcat;
        }
    
        @Bean
        public Connector httpConnector() {
            Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");
            connector.setScheme("http");
            connector.setPort(8080); // 监听Http的端口
            connector.setSecure(false);
            connector.setRedirectPort(8443); // 监听Http端口后转向Https端口
            return connector;
        }
    

    最后一步就是服务器配置开启websocket协议访问

    不开启的话,无法正常使用
    会报handshake: Unexpected response code: 400
    查了一下官网才发现原来在配置反向代理的时候,如果需要使用wss,还需要加上如下配置:

    location /wsapp/ {
        proxy_pass http://wsbackend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
    

    但是加上之后,还是无法访问
    最后的结果是:

    proxy set header X-Real IP Sremote addr;
    proxy_set_header Host Shost;
    proxy_set header X _Forward For Sproxy_add_x forwarded for. 
    proxy http version 1.1;
    proxy _set header Upgrade Shttp_upgrade;
    proxy_set_header Connection "upgrade";
    proxy pass http://uat-k8s;
    access_loglogs/uat-api.health.log main;
    

    到此正常访问

    展开全文
  • 主要介绍了SpringBoot集成WebSocket实现前后端消息互传的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • SpringBoot 集成 WebSocket

    2021-01-14 11:07:13
    整体集成起来非常简单,示例如下: 1、创建一个基础的 springboot 工程 (略) 2、配置 pom 依赖 <!--WebSocket--> <dependency> <groupId>org.springframework.boot</groupId> <...

    整体集成起来非常简单,示例如下:

    1、创建一个基础的 springboot 工程

    (略)

    2、配置 pom 依赖

    <!--WebSocket-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!--WebSocket-->
    

    3、创建 websocket 配置类

    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
    
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
    
        public static final String TOPIC_DESTINATION_PREFIXES = "/topic-shanhy";
        public static final String APPLICATION_DESTINATION_PREFIXES = "/app-shanhy";
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/mywebsocket")     //开启/bullet端点
                    .setAllowedOrigins("*")             //允许跨域访问
                    .withSockJS()                      //使用sockJS
    //                .setClientLibraryUrl("https://cdn.bootcdn.net/ajax/libs/sockjs-client/1.5.0/sockjs.min.js")
                    ;
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
    
            //订阅Broker名称,可设置多个
            registry.enableSimpleBroker(TOPIC_DESTINATION_PREFIXES);
            //全局使用的订阅前缀(客户端订阅路径上会体现出来),可设置多个
            registry.setApplicationDestinationPrefixes(APPLICATION_DESTINATION_PREFIXES);
            //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置默认是/user/
            //registry.setUserDestinationPrefix("/user/");
        }
    }
    

    4、创建用于发送消息的 Controller

    import com.goodcol.muses.example.config.WebSocketConfiguration;
    import com.goodcol.muses.example.model.WebSocketMessageDTO;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.stereotype.Controller;
    
    /**
     * WebSocket 示例
     */
    @Controller
    @Slf4j
    public class WebSocketController {
    
        /**
         * 注入SimpMessagingTemplate 用于点对点消息发送
         */
        @Autowired
        private SimpMessagingTemplate messagingTemplate;
    
        /**
         * say
         *
         * @param clientMessage
         * @return
         */
        @MessageMapping("/chat")
        //SendTo 发送至 Broker 下的指定订阅路径
        @SendTo(WebSocketConfiguration.TOPIC_DESTINATION_PREFIXES + "/say")
        public String say(WebSocketMessageDTO clientMessage) {
            //方法用于广播测试
            if (clientMessage != null) {
                if (clientMessage.getMessage() != null) {
                    clientMessage.setMessage(clientMessage.getMessage().trim());
                }
                String msg = StringUtils.defaultString(clientMessage.getUsername(), "system").concat(":".concat(clientMessage.getMessage()));
                log.info(msg);
                return msg;
            }
            return "None";
        }
    
    
    }
    
    
    /**
     * WebSocketMessageDTO
     */
    public class WebSocketMessageDTO {
    
        private String username;
        private String message;
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    }
    

    5、创建测试 websocket 的 HTML 页面

    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8"/>
        <title>Spring Boot WebSocket</title>
        <script src="https://cdn.bootcdn.net/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
        <script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
        <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
        <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
    </head>
    <body onload="disconnect()">
    <noscript>
        <h2 style="color:#ff0000">你的浏览器不支持websocket</h2>
    </noscript>
    <div>
        <div>
            <button id="connect" onclick="connect()">连接</button>
            <button id="disconnect" onclick="disconnect();">断开连接</button>
        </div>
        <div id="conversationDiv">
            <label>姓名:</label> <input type="text" id="name"/>
            <br>
            <label>消息:</label> <input type="text" id="messgae"/>
            <button id="send" onclick="send();">发送</button>
            <p id="response"></p>
        </div>
    </div>
    <script type="text/javascript">
        var stompClient = null;
        var host = "http://localhost:8081/muses-gateway/ws/websocket";
        var application_destination_prefixes = "/app-shanhy";
        var topic_destination_prefixes = "/topic-shanhy";
    
        function setConnected(connected) {
            document.getElementById('connect').disabled = connected;
            document.getElementById('disconnect').disabled = !connected;
            document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
            $('#response').html();
        }
    
        function connect() {
            var socket = new SockJS(host + '/mywebsocket');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function (frame) {
                setConnected(true);
                console.log('Connected:' + frame);
                // 订阅
                stompClient.subscribe(topic_destination_prefixes + '/say', function (response) {
                    showResponse(response.body);
                });
    
            });
        }
    
        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
            }
            setConnected(false);
            console.log("Disconnected");
        }
    
        function send() {
            var name = $('#name').val();
            var message = $('#messgae').val();
            stompClient.send(application_destination_prefixes + "/chat", {}, JSON.stringify({username: name, message: message}));
        }
    
        function showResponse(message) {
            var response = $('#response');
            response.html(message);
        }
    </script>
    </body>
    </html>
    

    (END)

    展开全文
  • 主要介绍了SpringBoot集成WebSocket【基于纯H5】进行点对点[一对一]和广播[一对多]实时推送,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
  • SpringBoot集成WebSocket

    2020-12-09 17:14:52
    1、引入pom <dependency> <groupId>org.springframework.boot<...spring-boot-starter-websocket</artifactId> </dependency> 2、开启Wbesocket支持 @Configuration public class W

    1、引入pom

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
    

    2、开启Wbesocket支持

    @Configuration
    public class WebSocketConfig {
    
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    
    }
    

    3、编写核心代码

    
    @Component
    @ServerEndpoint("/webSocket")
    public class WebSocketService extends BaseService {
    
        private Session session;
        private static CopyOnWriteArraySet<WebSocketService> webSocketSet=new CopyOnWriteArraySet<>();
    
        @OnOpen
        public void onOpen(Session session){
            this.session=session;
            webSocketSet.add(this);
            logger.info("【websocket消息】有新的连接,总数:{}",webSocketSet.size());
        }
    
        @OnClose
        public void onClose(){
            webSocketSet.remove(this);
            logger.info("【websocket消息】连接断开,总数:{}",webSocketSet.size());
        }
    
        @OnMessage
        public void onMessage(String message){
            logger.info("【websocket消息】收到客户端发来的消息:{}",message);
        }
    
        public void sendMessage(String message){
            for(WebSocketService webSocket:webSocketSet){
                logger.info("【websocket消息】广播消息:{}",message);
                try {
                    webSocket.session.getBasicRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    @OnOpen:当有新请求进来时会触发
    @OnClose:连接关闭时触发
    @OnMessage:接收到消息是触发
    sendMessage()方法是群发消息

    项目中如果集成了Shiro,记得配置开放拦截

    在线测试网站
    http://www.easyswoole.com/wstool.html

    展开全文
  • springBoot集成websocket

    2020-02-12 20:40:14
    springBoot项目集成websocket实现前后连通简介pom.xml引入依赖创建springBoot项目的websocket配置文件websocketServer核心代码不明白的一个实体类,被注释待解惑前端页面的js,创建websocket进行操作监控值得注意的是...

    简介

    	boot项目,或者cloud单服务模块 实现websocket连通:
    	第一步,引入依赖,提供相关jar包
    	第二步,编写websocket配置类,通过@Configuration注解,启动注入Bean对象ServerEndpointExporter
    	第三步,websocketServer服务站点声明,包括站点路径,通道打开,消息发送等监听机制的处理逻辑
    	第四步,页面js创建websocket,连接point站点服务,进行打通
    

    1.pom.xml引入依赖

    <!-- 增加 websocket-->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-websocket</artifactId>
    		</dependency>
    

    2.创建springBoot项目的websocket配置文件

    /**
     * 开启 webSocket配置
     * @author zck
     * */
    @Configuration
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
    

    3.websocketServer核心代码

    /**
     * webSocket服务
     * 网站连接的服务point声明
     * */
    @ServerEndpoint("/webSocket/{userId}")
    @Component
    public class WebSocketService {
        /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
        private static int onlineCount = 0;
        /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
        private static ConcurrentHashMap<String,WebSocketService> webSocketMap = new ConcurrentHashMap<>();
        /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
        private Session session;
        /**接收userId*/
        private String userId="";
        //为实现启动加载socket服务自动注入通道服务,先声明了今天对象
        private static ISocketUserService socketUserService;
        //然后通过自动注入的方法,将通道服务注入到socket服务中,以实现接下来对数据库更新数据的操作。
        @Autowired
        public void setUserService(ISocketUserService socketUserService){
            WebSocketService.socketUserService=socketUserService;
        }
        /**
         * 连接建立成功调用的方法*/
        @OnOpen
        public void onOpen(Session session,@PathParam("userId") String userId) {
            this.session = session;
            this.userId=userId;
            if(webSocketMap.containsKey(userId)){
                //如果set中存在该用户原先的socket,先移除,再添加,同时更新数据库socket状态
                webSocketMap.remove(userId);
                //加入set中
                webSocketMap.put(userId,this);
                //更新用户socket状态
                socketUserService.refresh(userId, ToolsConstants.SOCKET_ONLINE);
            }else{
                //如果set中没有改用户的socket记录,可能被移除,可能未创建过
                //加入set中
                webSocketMap.put(userId,this);
                //在线数加1
                addOnlineCount();
                //查询数据库是否存在该用户socket记录,存在,更新状态,不存在新增记录
                SocketUser socketUser = socketUserService.selectSocketUserById(userId);
                if (socketUser==null){
                    socketUser=new SocketUser();
                    socketUser.setUserId(userId);
                    socketUser.setUserName(userId+"通道");
                    socketUser.setSocketStatus(ToolsConstants.SOCKET_ONLINE);
                    socketUserService.insertSocketUser(socketUser);
                }else {
                    socketUserService.refresh(userId, ToolsConstants.SOCKET_ONLINE);
                }
            }
            log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
            try {
                sendMessage("连接成功");
            } catch (IOException e) {
                log.error("用户:"+userId+",网络异常!!!!!!");
            }
        }
    
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose() {
            if(webSocketMap.containsKey(userId)){
                webSocketMap.remove(userId);
                //从set中删除
                subOnlineCount();
                //同步更新数据库socket状态
                socketUserService.refresh(userId, ToolsConstants.SOCKET_OFFlINE);
            }
            log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
        }
    
        /**
         * 收到客户端消息后调用的方法
         * @param message 客户端发送过来的消息*/
        @OnMessage
        public void onMessage(String message, Session session) {
            log.info("用户消息:"+userId+",报文:"+message);
            //可以群发消息,指定给某个socket对象发消息
            //消息保存到数据库、redis可当做聊天记录
            if(StringUtils.isNotBlank(message)){
                try {
                    //模拟聊天逻辑解析发送的报文
    //                JSONObject jsonObject = JSON.parseObject(message);
                    //追加发送人(防止串改)
    //                String toUserId=jsonObject.getString("toUserId");
                    //普通通讯逻辑(自己发给自己)
                    String toUserId=this.userId;
                    //传送给对应toUserId用户的websocket
                    if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                        WebSocketService webSocketService = webSocketMap.get(toUserId);
                        webSocketService.sendMessage(message);
                        socketUserService.msgCount(userId);
                    }else{
                        //可以记录到mysql或者redis
                        log.error("请求的userId:"+toUserId+"不在该服务器上");
                    }
                }catch (Exception e){
                    e.printStackTrace();
                    //确保当前socket下线状态
                    socketUserService.refresh(userId, ToolsConstants.SOCKET_OFFlINE);
                }
            }
        }
    
        /**
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) throws IOException {
            log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
            error.printStackTrace();
            //确保当前socket下线状态(服务端主动关闭socket,移除set,更新数据库状态记录,减少在线人数)
            session.close();
            webSocketMap.remove(userId);
            socketUserService.refresh(userId, ToolsConstants.SOCKET_OFFlINE);
            subOnlineCount();
        }
        /**
         * 实现服务器主动推送搜索
         */
        public void sendMessage(String message) throws IOException {
            this.session.getBasicRemote().sendText(message);
        }
    
    
        /**
         * 发送自定义消息
         * */
        public static AjaxResult sendInfo(MessageDto dto) throws IOException {
            log.info("发送消息到:"+dto.getUserId()+",报文:"+dto.getMessage());
            if(StringUtils.isNotBlank(dto.getUserId())&&webSocketMap.containsKey(dto.getUserId())){
                webSocketMap.get(dto.getUserId()).sendMessage(dto.getMessage());
                return AjaxResult.success("发送成功");
            }else{
                log.error("用户"+dto.getUserId()+",不在线!");
                return AjaxResult.error("用户"+dto.getUserId()+",不在线!");
            }
        }
    
        /**
         * 广播所有用户消息
         * @throws IOException
         */
        public static AjaxResult sendInfoToAll(String message) throws IOException {
            log.info("广播消息,报文:"+message);
            //遍历值
            for(WebSocketService service : webSocketMap.values()) {
                service.sendMessage(message);
            }
            return AjaxResult.success("发送成功");
        }
    
        /**
         * 关闭socket通道
         * */
        public static void closeSocket(String userId) throws IOException {
            if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
                webSocketMap.get(userId).onClose();
            }else{
                log.error("用户"+userId+",不在线!");
            }
        }
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        public static synchronized void addOnlineCount() {
            WebSocketService.onlineCount++;
        }
    
        public static synchronized void subOnlineCount() {
            WebSocketService.onlineCount--;
        }
    
    }
    

    4.前端页面的js,创建websocket进行操作监控

    	var websocket = null;
        //判断当前浏览器是否支持WebSocket
        if ('WebSocket' in window) {
            console.log("尝试创建socket通道")
            websocket = new WebSocket("ws://localhost:8080/项目名字/webSocket/111");
        }
        else {
            alert('Not support websocket')
        }
        //连接发生错误的回调方法
        websocket.onerror = function () {
            setMessageInnerHTML("error");
        };
        //连接成功建立的回调方法
        websocket.onopen = function (event) {
            setMessageInnerHTML("open");
        }
        //接收到消息的回调方法
        websocket.onmessage = function (event) {
            setMessageInnerHTML(event.data);
        }
        //连接关闭的回调方法
        websocket.onclose = function () {
            setMessageInnerHTML("close");
        }
        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
        	console.log("关闭网页窗口关闭通道")
            websocket.close();
        }
        //将消息显示在网页上
        function setMessageInnerHTML(innerHTML) {
            console.log("调用的方法是"+innerHTML);
        }
        //关闭连接
        function closeWebSocket() {
            websocket.close();
        }
        //发送消息
        function send() {
            websocket.send("asdfs");
        }
    
    展开全文
  • SpringBoot集成WebSocket实战一

    千次阅读 2020-12-17 16:30:09
    SpringBoot集成WebSocket实战一1.什么是WebSocket2.为什么要使用WebSocket3.SpringBoot整合WebSocket4. 服务器主动推送数据给前端 1.什么是WebSocket 什么是websocket,百度百科的解释是:"websocket是一种在单个TCP...
  • 2021-07-14 14:53:18.352 WARN 2452 --- [ main] s.c.a.AnnotationConfigApplicationContext : Exception encountered during ... 解决办法: 重新导入SpringBoot集成websocket的依赖,去除关于POM文件中报错的依赖
  • springboot整合websocket

    2019-02-03 14:53:55
    springboot整合websocket
  • 1、在项目的全局pom.xml导入springboot的项目依赖包 <!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-...
  • 启用SpringbootWebSocket的支持 启用WebSocket的支持也是很简单,几句代码搞定: import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import...
  • rabbitmq+websocket(SpringBoot版)实现分布式消息推送 本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同 所以采用 rabbitMQ+webSocket实现分布式消息推送 生产者将消息 发送给 ...
  • Springboot集成WebSocket功能

    千次阅读 2019-06-05 18:42:40
    Springboot集成WebSocket功能 由于MT管理器论坛需要添加聊天功能,在网上搜了很多,最后发现了websocket可以用于实时通信和聊天室功能,然后看了慕课上的一个网课,跟着他做出来了一个demo,下面就来看一下什么是...
  • 什么是 WebSocket WebSocket 是一种全新的协议。它将 TCP 的 Socket(套接字)应用在了web page上,从而使通信双方建立起一个保持在活动状态的连接通道,并且属于全双工通信(双方同时进行双向通信)。 二. ...
  • springboot 集成websocket

    2020-11-08 15:29:27
    Spring Boot 集成 WebSocket# 首先创建一个 Spring Boot 项目,然后在 pom.xml 加入如下依赖集成 WebSocket: Copy org.springframework.boot spring-boot-starter-websocket 开启配置# 接下来在 config 包下创建一...
  • import org.apache.log4j.Logger; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.... } } } SpringBoot集成webSocket时如果传输的文件过大需要设置
  • WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。...
  • 1.简介 由于遇到异步的接口调用,异步任务处理结果会写在rabbitmq中,部署方式为了实现高可用...前端代码 Spring Boot+WebSocket+广播式 貌似你的浏览器不支持websocket 输入实例instanceId 连接 断开连接 接收的数据:
  • springboot集成websocket的两种实现方式

    万次阅读 多人点赞 2019-05-31 17:21:22
    WebSocket跟常规的http协议的区别和优缺点这里大概描述一下 一、websocket与http http协议是用在应用层的协议,他是基于tcp协议的,http协议建立链接也必须要有三次握手才能发送信息。http链接分为短链接,长链接...
  • 第一种:SpringBoot官网提供了一种websocket集成方式 第二种:javax.websocket中提供了元注解的方式 下面讲解简单的第二种 添加依赖 <dependency> <groupId>org.springframework.boot</groupId>...
  • 启用SpringbootWebSocket的支持 启用WebSocket的支持也是很简单,几句代码搞定: import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import...
  • springboot使用websocket过程中,需要使用到@Autowired注入service 按照平时的方法注入: @ServerEndpoint(value = "/websocket/{name}") @Component public class WebSocketService { @Autowired ...
  • SpringBoot集成WebSocket实现简易版微信

    千次阅读 2020-03-23 00:25:31
    一、前言 ...这意味着它是一种持久性连接,且服务端可以发消息给客户端。这便容易实现即时通讯通知的功能,本文将介绍 WebSocket 在 SpringBoot 中的用法,以及一个简单的网上聊天的 ...-- 集成websocket --> <...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,765
精华内容 2,306
关键字:

springboot集成websocket

spring 订阅