精华内容
下载资源
问答
  • 取消订阅
    2021-03-16 01:11:30

    第一步:建立一个监听类,继承JedisPubSub

    package work;

    import redis.clients.jedis.JedisPubSub;

    /**

    * 监听

    * author:songyan

    * date: 2019/10/17

    **/

    public class MyListener extends JedisPubSub {

    private String subName;

    public MyListener(String subName) {

    this.subName = subName;

    }

    // 取得订阅的消息后的处理

    public void onMessage(String channel, String message) {

    System.out.println(String.format("订阅者:"+ subName + "接收消息,频道:%s,消息%s" , channel , message));

    }

    // 初始化订阅时候的处理

    public void onSubscribe(String channel, int subscribedChannels) {

    System.out.println(String.format("订阅者:"+ subName + "订阅频道成功,频道:%s,订阅频道数%d" , channel , subscribedChannels));

    }

    // 取消订阅时候的处理

    public void onUnsubscribe(String channel, int subscribedChannels) {

    System.out.println(String.format("订阅者:"+ subName + "取消订阅成功,频道名称:%s,订阅频道数%d",channel , subscribedChannels));

    }

    }

    第二步:建立一个Publisher (发布者)

    package work;

    import redis.clients.jedis.Jedis;

    import redis.clients.jedis.JedisPool;

    import java.io.BufferedReader;

    import java.io.IOException;

    import java.io.InputStreamReader;

    /**

    * 发布

    * author:songyan

    * date: 2019/10/17

    **/

    public class Publisher extends Thread {

    //定义一个连接池

    private final JedisPool jedisPool;

    private String chanelName;

    public Publisher(JedisPool jedisPool, String chanelName) {

    this.jedisPool = jedisPool;

    this.chanelName = chanelName;

    }

    @Override

    public void run() {

    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

    Jedis jedis = jedisPool.getResource();

    while (true) {

    String line = null;

    try {

    line = reader.readLine();

    if (!"quit".equals(line)) {

    jedis.publish(chanelName, line);

    System.out.println("发布者频道发布成功,频道名称:" + chanelName);

    } else {

    break;

    }

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }

    }

    第三步:创建一个订阅者类,用来控制订阅者订阅消息频道

    ---恢复内容结束---

    标签:订阅,频道,String,subName,import,Jedis,public

    来源: https://www.cnblogs.com/excellencesy/p/11696580.html

    更多相关内容
  • 你可能知道当你订阅 Observable 对象或设置事件监听时,在某个时间点,你需要执行取消订阅操作,进而释放操作系统的内存。否则,你的应用程序可能会出现内存泄露。 接下来让我们看一下,需要在 ngOnDestroy 生命周期...
  • 从邮件列表中大量退订unsubscribe.sh脚本用于从不需要的广告类型邮件列表中大量取消订阅。 它基于RFC 2369(1998年7月)定义的“ List-Unsubscribe字段,通常出现在法语广告电子邮件中。 此字段包含和/或<http> (或...
  • 取消订阅 Magento 中的访客表格时事通讯 注册客户的双重选择 事实 版本:1.5.0.4 Magento 连接: : 描述 取消订阅框 注册客户的双重选择 为时事通讯链接强制 SSL 重新发送双重选择加入的电子邮件选项 为通讯链接...
  • Magento-禁用时事通讯通知扩展概述当用户订阅或取消订阅您的新闻通讯时,他会收到一封电子邮件以确认其操作。 可能不需要这种行为。 通过此扩展程序,您可以阻止这些确认电子邮件。兼容性在Magento CE 1.6-1.9上测试...
  • 手动取消订阅 Consumer类型 Observable创建返回Disposable取消 public class SecondActivity extends AppCompatActivity { private static final String TAG = SecondActivity; private Disposable disposable; ...
  • MQTT发布订阅和取消订阅

    千次阅读 2022-06-18 10:25:35
    在这节课里,我们学习客户端如何实现发布消息、订阅主题以及取消订阅主题。在本节课里我们将重点讲解以下MQTT报文: PUBLISH – 发布信息 SUBSCRIBE – 订阅主题 SUBACK – 订阅确认 UNSUBSCRIBE – 取消订阅 ...

    在之前的课程里,我们学习了如何使用MQTT客户端连接MQTT服务端。在这节课里,我们学习客户端如何实现发布消息、订阅主题以及取消订阅主题。在本节课里我们将重点讲解以下MQTT报文:

    • PUBLISH – 发布信息
    • SUBSCRIBE – 订阅主题
    • SUBACK – 订阅确认
    • UNSUBSCRIBE – 取消订阅

    PUBLISH – 发布消息

    MQTT客户端一旦连接到服务端,便可以发布消息。 每条发布的MQTT消息必须包含一个主题。MQTT服务器可以通过主题确定将消息转发给哪些客户端。(注:这里的消息指的是MQTT报文。)
    在这里插入图片描述
    PUBLISH - 发布消息PUBLISH – 发布消息
    MQTT客户端发布消息时,会向服务端发送PUBLISH报文。以下是PUBLISH报文的详细信息。
    在这里插入图片描述
    MQTT PUBLISH 报文MQTT PUBLISH 报文
    上图左侧栏中的内容是PUBLISH报文所包含的信息名称。右侧是信息的具体内容。

    topicName – 主题名

    主题名用于识别此信息应发布到哪一个主题。关于MQTT主题的应用,我们在之前的课程中已经做了详细介绍,在后续课程中我们还会对主题的高级应用进行更加详细的讲解。

    QoS – 服务质量等级

    QoS(Quality of Service)表示MQTT消息的服务质量等级。QoS有三个级别:0、1和2。QoS决定MQTT通讯有什么样的服务保证。有关QoS的详细信息我们会在后续课程中详细讲解。

    packetId – 报文标识符

    报文标识符可用于对MQTT报文进行标识。不同的MQTT报文所拥有的标识符不同。MQTT设备可以通过该标识符对MQTT报文进行甄别和管理。请注意:报文标识符的内容与QoS级别有密不可分的关系。只有QoS级别大于0时,报文标识符才是非零数值。如果QoS等于0,报文标识符为0。

    retainFlag – 保留标志

    在默认情况下,当客户端订阅了某一主题后,并不会马上接收到该主题的信息。只有在客户端订阅该主题后,服务端接收到该主题的新信息时,服务端才会将最新接收到的该主题信息推送给客户端。

    但是在有些情况下,我们需要客户端在订阅了某一主题后马上接收到一条该主题的信息。这时候就需要用到保留标志这一信息。关于保留标志的具体使用方法,我们将在本教程的后续部分进行详细讲解。

    Payload – 有效载荷

    有效載荷是我们希望通过MQTT所发送的实际内容。我们可以使用MQTT协议发送文本,图像等格式的内容。这些内容都是通过有效載荷所发送的。

    dupFlag – 重发标志

    当MQTT报文的接收方没有及时确认收到报文时,发送方会重复发送MQTT报文。在重复发送MQTT报文时,发送方会将此“重发标志”设置为true。请注意,重发标志只在QoS级别大于0时使用。有关QoS的详细信息,我们将会在后续教程中为您做详细介绍。

    SUBSCRIBE – 订阅主题

    当客户端连接到服务端后,除了可以发布消息,也可以接收消息。我们在之前的课程讲过,所有MQTT消息都有主题。客户端要想接收消息,首先要订阅该消息的主题。这样,当有客户端向该主题发布消息后,订阅了该主题的客户端就能接收到消息了。

    客户端要想订阅主题,首先要向服务端发送主题订阅请求。客户端是通过向服务端发送SUBSCRIBE报文来实现这一请求的。该报文包含有一系列“订阅主题名”。请留意,一个SUBSCRIBE报文可以包含有单个或者多个订阅主题名。也就是说,一个SUBSCRIBE报文可以用于订阅一个或者多个主题。

    在以上PUBLISH报文讲解中,我们曾经提到过QoS(服务质量等级)这一概念。同样的,客户端在订阅主题时也可以明确QoS。服务端会根据SUBSCRIBE中的QoS来提供相应的服务保证。

    另外每一个SUBSCRIBE报文还包含有“报文标识符”。报文标识符可用于对MQTT报文进行标识。不同的MQTT报文所拥有的标识符不同。MQTT设备可以通过该标识符对MQTT报文进行甄别和管理。

    SUBACK – 订阅确认

    服务端接收到客户端的订阅报文后,会向客户端发送SUBACK报文确认订阅。

    SUBACK报文包含有“订阅返回码”和“报文标识符”这两个信息。

    returnCode – 订阅返回码

    客户端向服务端发送订阅请求后,服务端会给客户端返回一个订阅返回码。

    在之前的讲解中我们说过,客户端可通过一个SUBSCRIBE报文发送多个主题的订阅请求。服务端会针对SUBSCRIBE报文中的所有订阅主题来逐一回复给客户端一个返回码。

    这个返回码的作用是告知客户端是否成功订阅了主题。以下是返回码的详细说明。

    0 - 订阅成功 – QoS 0

    1 - 订阅成功- QoS 1

    2 - 订阅成功- QoS 2

    128 - 订阅失败

    请留意,如上表所示,针对不同的主题订阅QoS,服务端的返回码会有所不同。

    另外每一个SUBACK报文也包含有“报文标识符”。MQTT设备可以通过该标识符对报文进行管理。

    UNSUBSCRIBE – 取消订阅

    顾名思义,当客户端要取消订阅某主题时,可通过向服务端发送UNSUBSCRIBE – 取消订阅报文来实现。

    以上示意图显示,UNSUBSCRIBE报文包含两个重要信息,第一个是取消订阅的主题名称。同一个UNSUBSCRIBE报文可以同时包含多个取消订阅的主题名称。另外,UNSUBSCRIBE报文也包含“报文标识符”,MQTT设备可以通过该标识符对报文进行管理。

    当服务端接收到UNSUBSCRIBE报文后,会向客户端发送取消订阅确认报文 – UNSUBACK报文。该报文含有客户端所发送的“取消订阅报文标识符”。

    客户端接收到UNSUBACK报文后就可以确认取消主题订阅已经成功完成了。

    展开全文
  • 通常情况下,使用subject时会出现next出的数据...这种反复调用放任不管的情况下可能会导致内存泄露,解决该问题的方式也很简单,将subject取消订阅(unsubscribe)即可,单如下这种方式会导致报错 test(){ this.

    通常情况下,使用subject时会出现next出的数据subscribe后被反复调用,如:

    test(){
        this.closeSubject.sunscribe((res:any)=>{
            // 代码逻辑
            console.log("111")
        })
    }

    这种反复调用放任不管的情况下可能会导致内存泄露,解决该问题的方式也很简单,将subject取消订阅(unsubscribe)即可,但如下这种方式会导致报错

    test(){
        this.closeSubject.sunscribe((res:any)=>{
            // 代码逻辑
            console.log("111")
            this.closeSubject.unsubscribe();
        })
    }

    正确方式应当为

    test(){
        let closeSubject = this.closeSubject.sunscribe((res:any)=>{
            // 代码逻辑
            console.log("111")
            closeSubject.unsubscribe();
        })
    }

     

     多次执行及取消订阅报错问题解决

    展开全文
  • websocket+redis动态订阅和动态取消订阅

    千次阅读 热门讨论 2021-05-28 14:29:19
    websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。 订阅频道消息格式: { "cmd":"subscribe", "topic":[ "topic_name" ] } 取消订阅格式 { ...

    原理

    websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。

    订阅频道消息格式:

    {
        "cmd":"subscribe",
        "topic":[
            "topic_name"
        ]
    }
    

    模糊订阅格式

    {
        "cmd":"psubscribe",
        "topic":[
            "topic_name"
        ]
    }
    

    取消订阅格式

    {
        "cmd":"unsubscribe",
        "topic":[
            "topic_name"
        ]
    }
    

    两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。

    redis订阅监听类

    package com.curtain.core;
    
    import com.curtain.config.GetBeanUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPubSub;
    
    import java.util.Arrays;
    
    /**
     * @Author Curtain
     * @Date 2021/6/7 14:27
     * @Description
     */
    @Component
    @Slf4j
    public class RedisPubSub extends JedisPubSub {
        private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
        private Jedis jedis;
    
        //订阅
        public void subscribe(String... channels) {
            jedis = jedisPool.getResource();
            try {
                jedis.subscribe(this, channels);
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
                subscribe(channels);
            }
        }
    
        //模糊订阅
        public void psubscribe(String... channels) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.psubscribe(this, channels);
            } catch (ArithmeticException e) {//取消订阅故意造成的异常
                if (jedis != null)
                    jedis.close();
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
                psubscribe(channels);
            }
        }
    
        public void unsubscribeAndClose(String... channels){
            unsubscribe(channels);
            if (jedis != null && !isSubscribed())
                jedis.close();
        }
    
        public void punsubscribeAndClose(String... channels){
            punsubscribe(channels);
            if (jedis != null && !isSubscribed())
                jedis.close();
        }
    
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());
        }
    
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
            log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());
        }
    
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
            WebSocketServer.publish(message, pattern);
            WebSocketServer.publish(message, channel);
    
        }
    
        @Override
        public void onMessage(String channel, String message) {
            log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
            WebSocketServer.publish(message, channel);
        }
    
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            log.info("unsubscribe redis channel:" + channel);
        }
    
        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            log.info("punsubscribe redis channel:" + pattern);
        }
    }
    

    ps:
    1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。

    webSocket订阅推送类

    这个类会有两个ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>类型类变量,分别存储订阅模糊订阅的信息。
    外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。
    还有个ConcurrentHashMap<String, RedisPubSub>类型的变量,存储的是事件-RedisPubSub,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。
    信息进行增加或者删除;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。

    package com.curtain.core;
    
    import com.alibaba.fastjson.JSON;
    import com.curtain.config.WebsocketProperties;
    import com.curtain.service.Cancelable;
    import com.curtain.service.impl.TaskExecuteService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    /**
     * @Author Curtain
     * @Date 2021/5/14 16:49
     * @Description
     */
    @ServerEndpoint("/ws")
    @Component
    @Slf4j
    public class WebSocketServer {
        /**
         * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
         */
        private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
        /**
         * 存放psub的事件
         **/
        private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
        /**
         * 存放topic(pattern)-对应的RedisPubsub
         */
        private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        private Session session;
        private String sessionId = "";
        //要注入的对象
        private static TaskExecuteService executeService;
        private static WebsocketProperties properties;
    
        private Cancelable cancelable;
    
        @Autowired
        public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
            WebSocketServer.executeService = taskExecuteService;
        }
    
        @Autowired
        public void setWebsocketProperties(WebsocketProperties properties) {
            WebSocketServer.properties = properties;
        }
    
        /**
         * 连接建立成功调用的方法
         */
        @OnOpen
        public void onOpen(Session session) {
            this.session = session;
            this.sessionId = session.getId();
            //构造推送数据
            Map pubHeader = new HashMap();
            pubHeader.put("name", "connect_status");
            pubHeader.put("type", "create");
            pubHeader.put("from", "pubsub");
            pubHeader.put("time", new Date().getTime() / 1000);
            Map pubPayload = new HashMap();
            pubPayload.put("status", "success");
            Map pubMap = new HashMap();
            pubMap.put("header", pubHeader);
            pubMap.put("payload", pubPayload);
            sendMessage(JSON.toJSONString(pubMap));
            cancelable = executeService.runPeriodly(() -> {
                try {
                    if (cancelable != null && !session.isOpen()) {
                        log.info("断开连接,停止发送ping");
                        cancelable.cancel();
                    } else {
                        String data = "ping";
                        ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
                        session.getBasicRemote().sendPing(payload);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, properties.getPeriod());
    
        }
    
        @OnMessage
        public void onMessage(String message) {
            synchronized (session) {
                Map msgMap = (Map) JSON.parse(message);
                String cmd = (String) msgMap.get("cmd");
                //订阅消息
                if ("subscribe".equals(cmd)) {
                    List<String> topics = (List<String>) msgMap.get("topic");
                    //本地记录订阅信息
                    for (int i = 0; i < topics.size(); i++) {
                        String topic = topics.get(i);
                        log.info("============================subscribe-start============================");
                        log.info("sessionId:" + this.sessionId + ",开始订阅:" + topic);
                        if (webSocketMap.containsKey(topic)) {//有人订阅过了
                            webSocketMap.get(topic).put(this.sessionId, this);
                        } else {//之前还没人订阅过,所以需要订阅redis频道
                            ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                            map.put(this.sessionId, this);
                            webSocketMap.put(topic, map);
                            new Thread(() -> {
                                RedisPubSub redisPubSub = new RedisPubSub();
                                //存入map
                                redisPubSubMap.put(topic, redisPubSub);
                                redisPubSub.subscribe(topic);
                            }).start();
                        }
                        log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);
                        log();
                        log.info("============================subscribe-end============================");
                    }
                }
                //psubscribe
                if ("psubscribe".equals(cmd)) {
                    List<String> topics = (List<String>) msgMap.get("topic");
                    //本地记录订阅信息
                    for (int i = 0; i < topics.size(); i++) {
                        String topic = topics.get(i);
                        log.info("============================psubscribe-start============================");
                        log.info("sessionId:" + this.sessionId + ",开始模糊订阅:" + topic);
                        if (pWebSocketMap.containsKey(topic)) {//有人订阅过了
                            pWebSocketMap.get(topic).put(this.sessionId, this);
                        } else {//之前还没人订阅过,所以需要订阅redis频道
                            ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                            map.put(this.sessionId, this);
                            pWebSocketMap.put(topic, map);
                            new Thread(() -> {
                                RedisPubSub redisPubSub = new RedisPubSub();
                                //存入map
                                redisPubSubMap.put(topic, redisPubSub);
                                redisPubSub.psubscribe(topic);
                            }).start();
                        }
                        log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic);
                        log();
                        log.info("============================psubscribe-end============================");
                    }
                }
                //取消订阅
                if ("unsubscribe".equals(cmd)) {
                    List<String> topics = (List<String>) msgMap.get("topic");
                    //删除本地对应的订阅信息
                    for (String topic : topics) {
                        log.info("============================unsubscribe-start============================");
                        log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic);
                        if (webSocketMap.containsKey(topic)) {
                            ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                            map.remove(this.sessionId);
                            if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                                webSocketMap.remove(topic);
                                redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                                redisPubSubMap.remove(topic);
                            }
                        }
                        if (pWebSocketMap.containsKey(topic)) {
                            ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                            map.remove(this.sessionId);
                            if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                                pWebSocketMap.remove(topic);
                                redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                                redisPubSubMap.remove(topic);
                            }
                        }
                        log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic);
                        log();
                        log.info("============================unsubscribe-end============================");
                    }
                }
            }
        }
    
        @OnMessage
        public void onPong(PongMessage pongMessage) {
            try {
                log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose() {
            synchronized (session) {
                log.info("============================onclose-start============================");
                //删除订阅
                Iterator iterator = webSocketMap.keySet().iterator();
                while (iterator.hasNext()) {
                    String topic = (String) iterator.next();
                    ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                    map.remove(this.sessionId);
                    if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                        webSocketMap.remove(topic);
                        redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                        redisPubSubMap.remove(topic);
                    }
                }
                //删除模糊订阅
                Iterator iteratorP = pWebSocketMap.keySet().iterator();
                while (iteratorP.hasNext()) {
                    String topic = (String) iteratorP.next();
                    ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                    map.remove(this.sessionId);
                    if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                        pWebSocketMap.remove(topic);
                        redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                        redisPubSubMap.remove(topic);
                    }
                }
                log.info("sessionId:" + this.sessionId + ",断开连接:");
                //debug
                log();
                log.info("============================onclose-end============================");
            }
        }
    
    
        /**
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            synchronized (session) {
                log.info("============================onError-start============================");
                log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage());
                error.printStackTrace();
                log.info("关闭错误用户对应的连接");
                //删除订阅
                Iterator iterator = webSocketMap.keySet().iterator();
                while (iterator.hasNext()) {
                    String topic = (String) iterator.next();
                    ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                    map.remove(this.sessionId);
                    if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                        webSocketMap.remove(topic);
                        redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                        redisPubSubMap.remove(topic);
                    }
                }
                //删除模糊订阅
                Iterator iteratorP = pWebSocketMap.keySet().iterator();
                while (iteratorP.hasNext()) {
                    String topic = (String) iteratorP.next();
                    ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                    map.remove(this.sessionId);
                    if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                        pWebSocketMap.remove(topic);
                        redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                        redisPubSubMap.remove(topic);
                    }
                }
                log.info("完成错误用户对应的连接关闭");
                //debug
                log();
                log.info("============================onError-end============================");
            }
        }
    
        /**
         * 实现服务器主动推送
         */
        public void sendMessage(String message) {
            synchronized (session) {
                try {
                    this.session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void publish(String msg, String topic) {
            ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
            if (map != null && map.values() != null) {
                for (WebSocketServer webSocketServer : map.values())
                    webSocketServer.sendMessage(msg);
            }
            map = pWebSocketMap.get(topic);
            if (map != null && map.values() != null) {
                for (WebSocketServer webSocketServer : map.values())
                    webSocketServer.sendMessage(msg);
            }
        }
    
        private void log() {
            log.info("<<<<<<<<<<<完成操作后,打印订阅信息开始>>>>>>>>>>");
            Iterator iterator1 = webSocketMap.keySet().iterator();
            while (iterator1.hasNext()) {
                String topic = (String) iterator1.next();
                log.info("topic:" + topic);
                Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
                while (iterator2.hasNext()) {
                    String session = (String) iterator2.next();
                    log.info("订阅" + topic + "的sessionId:" + session);
                }
            }
            log.info("<<<<<<<<<<<完成操作后,打印订阅信息结束>>>>>>>>>>");
        }
    }
    

    项目地址

    上面介绍了核心代码,下面是完整代码地址

    https://github.com/Curtain-Wang/websocket-redis-subscribe.git

    Update20220415

    参考评论区老哥的建议,将redis订阅监听类里面的subscribepsubscribe方法调整如下:

        //订阅
        @Override
        public void subscribe(String... channels) {
            boolean done = true;
            while (done){
                Jedis jedis = jedisPool.getResource();
                try {
                    jedis.subscribe(this, channels);
                    done = false;
                } catch (Exception e) {
                    log.error(e.getMessage());
                    if (jedis != null)
                        jedis.close();
                    //遇到异常后关闭连接重新订阅
                    log.info("监听遇到异常,四秒后重新订阅频道:");
                    Arrays.asList(channels).forEach(s -> {log.info(s);});
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }
        }
        //模糊订阅
        @Override
        public void psubscribe(String... channels) {
            boolean done = true;
            while (done){
                Jedis jedis = jedisPool.getResource();
                try {
                    jedis.psubscribe(this, channels);
                    done = false;
                } catch (Exception e) {
                    log.error(e.getMessage());
                    if (jedis != null)
                        jedis.close();
                    //遇到异常后关闭连接重新订阅
                    log.info("监听遇到异常,四秒后重新订阅频道:");
                    Arrays.asList(channels).forEach(s -> {log.info(s);});
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }
        }
    
    展开全文
  • 1.4按模式规则订阅和取消订阅 2、应用场景 2.1 测试实践:微信班级群 class:20170101 1、redis客户端实现发布订阅 1.1 启动端口,发布hello world 127.0.0.1:6379> publish channel:test "hello word" //...
  • Angular自动取消订阅RxJs

    千次阅读 2020-03-18 10:49:57
    Angular自动取消订阅RxJs 在使用 rxjs 时我们经常忘记调用unsubscribe()而导致内存泄露,很多时候你很难发现它,在RxJs官方有这样一段话: What is a Subscription? A Subscription is an object that represents a...
  • 我有一个关于如何取消订阅可观察量的问题。我有两个代码,我不确定哪一个更好。实施例1 – >流完成后取消订阅订阅者:Subscriber subscriber = new Subscriber() {@Overridepublic void onCompleted() {...
  • UnSpam Me v1.0.2(c)2018 by Dennis M.Heine此程序搜索您的IMAP电子邮件收件箱文件夹,并取消订阅所有邮件列表/新闻信。 它使用List-Unsubscribe邮件标头获取取消订阅链接。 如果是URL,将对其进行访问。 如果这是...
  • RxJava2取消订阅

    2021-03-30 08:16:06
    所以分析之后最终想在关闭界面的时候取消调 RxJava2 的订阅 在 Observer ,其源码如下 public interface Observer { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull ...
  • PubSub.subsribe( )是用于订阅消息,在componentDidMount( )中使用。在componentWillUnmount中取消订阅
  • 将“取消订阅”标签应用到Gmail中的“从批量电子邮件中取消订阅” Gmail Gmail取消订阅是一种开源Google脚本,可帮助您轻松从Gmail和Google收件箱中不需要的新闻通讯和其他批量电子邮件中取消订阅您的电子邮件地址。...
  • 通过电子邮件取消订阅Rails 向您的电子邮件添加一键式退订链接 从您的电子邮件服务中获取退回邮件和垃圾邮件报告 优雅地处理电子邮件地址更改 :postbox: 查看进行分析 安装 将此行添加到您的应用程序的Gemfile中...
  • 在这篇文章中,我们将重点介绍 MQTT 中的发布、订阅和取消订阅。在本系列的前面,我们介绍了发布/订阅模型的基础知识。在这篇文章中,我们深入探讨了 MQTT 协议中发布/订阅的细节。如果您还没有阅读有关发布/订阅...
  • 一、取消订阅、 二、取消订阅 unsubscribeByEventType 方法、
  • 通过委托方法取消订阅 observable。 将 Bacon 与传统的面向对象框架一起使用可能很困难。 这些框架通常依赖于具有副作用的方法。 以 Backbone 为例。 当remove()方法被调用时,您通常希望停止正在运行的 observable...
  • * 事件订阅,发布,取消订阅 */ export class EventManager { events = {}; subscribe(name, func) { if (!this.events[name]) { this.events[name] = [] } this.events[name].push(func) } publish(name)...
  • 主要介绍了C#中事件的订阅和取消订阅,结合Visual C#开发环境来进行讲解,Visual C#被集成在微软的IDE程序Visual Studio中,需要的朋友可以参考下
  • bbPress 批量退订 立即取消订阅论坛订阅 视频演示 作者
  • MQTT 发布,订阅 和 取消订阅-MQTT核心系列:第四章 作者:HiveMQ 团队 翻译:索隆有几把刀 欢迎来到MQTT核心系列的第四章。这个系列一共有十章,用来介绍MQTT的核心特性和概念。在这一章中,我们将着眼于MQTT中的...
  • 如果您在 iPhone 上订阅了应用提供的服务或内容,在取消订阅之前,大部分应用会自动续订。为了避免不必要的付费,用户可以前往 iPhone 设置-Apple ID-订阅中查看所有订阅的内容,并在此取消订阅。小提示:如果您没有...
  • rxjs 的 Observable(可观察对象)极大的方便了我们的开发,但是当 subscribe(订阅) 没有多次时,前一个...这种情况怎么办呢,可以手动取消订阅,但是有没有办法在 页面Destroy 后自动取消订阅呢? 使用 until-destr
  • 从此用 RxJS 订阅的时候,时刻都不忘调用 unsubscribe() 以防内存泄漏。对于结束 Observable,释放...第三种,订阅者主动取消订阅,执行 subscription.unsubscribe() 对于Observable.onComplete()和Observable.OnError
  • Kafka-消费者订阅主题和取消订阅

    千次阅读 2020-10-28 09:33:20
    消费者订阅 消费者通过 subscribe() 和 assign() 两种方式订阅主题 subscribe() 使用 subscribe() 可以订阅一个或多个主题,对于这个方法而言,可以以集合的方式订阅多个主题,也可以以正则表达式的形式订阅特定...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 69,888
精华内容 27,955
关键字:

取消订阅

友情链接: geologic_time_scale.zip