精华内容
下载资源
问答
  • spring boot jsm ibmmq topic queue 两种方式实现
  • Activitymq topic与 queue 并存实现方案 问题: 根据官方的事例文档,配置,我们发现按照该方案,发现要么实现topic 要么 实现queue,不能同时出现,因为他只是一个配置参数,结果值为true或是false。但是我们的...

    Activitymq topic与 queue 并存实现方案

    问题:

    根据官方的事例文档,配置,我们发现按照该方案,发现要么实现topic 要么 实现queue,不能同时出现,因为他只是一个配置参数,结果值为true或是false。但是我们的实际使用场景,很可能会出现一个服务即需要队列,又需要订阅。本文档则着重解决这个问题。

     

     

    一、引入activity_sample_activitymq

    二、修改SampleActiveMQApplication

    删除原有的queue方法,


    替换为queue和topic的工厂

           @Bean

           publicJmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,

                         DefaultJmsListenerContainerFactoryConfigurerconfigurer) {

                  DefaultJmsListenerContainerFactoryfactory = new DefaultJmsListenerContainerFactory();

                  //为true 则是 topic 不写则为queue

                  factory.setPubSubDomain(true);

                  configurer.configure(factory, connectionFactory);

                  return factory;

           }

     

           @Bean

           publicJmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory,

                         DefaultJmsListenerContainerFactoryConfigurerconfigurer) {

                  DefaultJmsListenerContainerFactoryfactory = new DefaultJmsListenerContainerFactory();

                  configurer.configure(factory, connectionFactory);

                  return factory;

           }

      

    三、修改Consumer

    删除原有的消费者方法,替换为:

     

    // destination 监听的队列名, containerFactory                                                                                           使用的工厂

    @JmsListener(destination ="mytopic", containerFactory = "topicListenerFactory")               

    public void receiveTopicMessage(Stringcontext) {                                              

         System.out.println("Receivedtopic<" + context + ">");                                     

    }                                                                                               

                                                                                                   

                                                                                                      

    @JmsListener(destination ="myqueue", containerFactory = "queueListenerFactory")               

    public void receiveQueueMessage(Stringcontext) {                                              

         System.out.println("Receivedqueue<" + context + ">");                                      

    }                                                                                              

                                                                                                      

    四、修改消息生产者send方法

    public void send(String msg) {                                                            

         //key值 为 监听器监听的 队列名  value 可以是任意类型,需要在consumer中定义对应的类型才可以接受                          

         this.jmsMessagingTemplate.convertAndSend("mytopic","topic");                         

         this.jmsMessagingTemplate.convertAndSend("myqueue","queue");                         

    }                                                                                         

                                                                                              

     

     

    五、结果

     

     

    展开全文
  • mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给他保留,topic只有持久化订阅者会保留   (1)使用queue,即队列时,每个消息...

    mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给他保留,topic只有持久化订阅者会保留

     

    (1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可

    。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。

    (2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。

    首先,假设消费者都是普通的消费者,
    ------------------------
    <1>activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么

    ,这个消息就被抛弃了。

    <2>消费者1启动了,连接了activemq,进行了订阅,在等待消息~~

    activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。

    <3>消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~

    activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

    <4>消费者1关掉了。

    activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。

    <5>消费者1又启动了。

    activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
    -----------------------------
    总结一下:
    activemq只是向当前启动的消费者发送消息。
    关掉的消费者,会错过很多消息,并无法再次接收这些消息。

    如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。

    那么,如何让消费者重新启动时,接收到错过的消息呢?

    答案是持久订阅。

    (3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。
    持久订阅,就要记录消费者的名字了。
    张三说,我是张三,有馒头给我留着,我回来拿。
    李四说,我是李四,有馒头给我留着,我回来拿。
    activemq就记下张三,李四两个名字。

    那么,分馒头时,还是一个人头给一个馒头。
    分完了,一看张三没说话,说明他不在,给他留一个。
    李四说话了,那就不用留了。

    张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
    可能是一个馒头,也可能是100个馒头,就看张三离开这阵子,分了多少次馒头了。

    activemq区分消费者,是通过clientID和订户名称来区分的。


    // 创建connection
    connection = connectionFactory.createConnection();
    connection.setClientID("bbb"); //持久订阅需要设置这个。
    connection.start();

    // 创建session
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

    // 创建destination
    Topic topic = session.createTopic("userSyncTopic"); //Topic名称

    //MessageConsumer consumer = session.createConsumer(topic); //普通订阅
    MessageConsumer consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅



    (4)还有一点,消息的生产者,发送消息时用使用持久模式
    MessageProducer producer = ...;
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    不设置,默认就是持久的

    (5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。

    (6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。

    可以访问 http://localhost:8161/admin/index.jsp
    查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。

    可以复制activemq-jdbc.xml中的内容过来,修改一下,就可以把消息保存在其它数据库中了。
    展开全文
  • package com.quest.test; import java.io.IOException; import com.ibm.mq.MQException;...import com.ibm.mq.MQGetMessageOptions;...import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; ...
    package com.quest.test;
    
    import java.io.IOException;
    
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQGetMessageOptions;
    import com.ibm.mq.MQMessage;
    import com.ibm.mq.MQPutMessageOptions;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.MQTopic;
    import com.ibm.mq.constants.CMQC;
    import com.quest.mq.MQTool;
    
    public class MQTest {
    
    	/**
    	 * @param args
    	 * @throws MQException 
    	 * @throws IOException 
    	 * @throws InterruptedException 
    	 */
    	public static void main(String[] args) throws MQException, IOException, InterruptedException {
    		
    		String queueManagerName = "QM1";
    		String hostname = "127.0.0.1";
    		int port = 1421;
    		String channel = "SYSTEM.DEF.SVRCONN";
    		MQTool mqTool = new MQTool(queueManagerName, hostname, port, channel);
    		MQQueueManager queueManager = mqTool.getQueueManager();
    		//accessTopic方法的第一个参数为topic string,第二个参数为topic name  
    		//程序会根据topic name的topic string 和你提供的topic string 组合得到消息发送到的topic string
    		MQTopic topic = queueManager.accessTopic("", "a", CMQC.MQTOPIC_OPEN_AS_PUBLICATION, CMQC.MQOO_OUTPUT);
    		System.out.println(topic.getName());
    		
    		MQMessage msg = new MQMessage();
    		msg.writeString("ssssssssss");
    		MQPutMessageOptions pmo = new MQPutMessageOptions();
    		pmo.options = CMQC.MQPMO_ASYNC_RESPONSE;
    		topic.put(msg,pmo);
    		
    		System.out.println(queueManager.getAsyncStatus().putSuccessCount);
    		System.out.println(queueManager.getAsyncStatus().putFailureCount);
    		queueManager.commit();
    		topic.close();
    		
    		MQTopic topic2 = queueManager.accessTopic("price", "", CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, CMQC.MQSO_CREATE);
            
            MQGetMessageOptions option = new MQGetMessageOptions();
            topic2.get(msg, option); 
    
    	}
    
    }
    

      

    展开全文
  • IBM MQ通过topic上传 C++

    2013-08-05 14:47:54
    c++通过远程上传到mq服务器示例
  • JAVA 连接IBM MQtopic

    2013-03-27 16:02:24
    1:java 连接ibm mqtopic首先需要在mq上建立topic,命令如下 DEFINE TOPIC(TOPIC_xx_2) TOPICSTR(NBA/LAKER) DESCR('This is a TOPIC') PUB(ASPARENT) SUB(ENABLED) WILDCARD(PASSTHRU)至于队列管理器的创建见...

    1:java 连接ibm mq的topic首先需要在mq上建立topic,命令如下
         DEFINE TOPIC(TOPIC_xx_2) TOPICSTR(NBA/LAKER) DESCR('This is a TOPIC') PUB(ASPARENT) SUB(ENABLED)  WILDCARD(PASSTHRU)
    至于队列管理器的创建见日志中的MQ队列管理建立日志
    2:java连接程序



    import com.ibm.mq.MQEnvironment;
    import com.ibm.mq.MQGetMessageOptions;
    import com.ibm.mq.MQMessage;
    import com.ibm.mq.MQPutMessageOptions;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.MQTopic;
    import com.ibm.mq.pcf.CMQC;

    /**
     *
     *@company XX
     *@author: XX
     *@since: 2011-9-13
     *@version:1.0
     */
    public class MQTopicTest {
        public static void main(String[] args){
            MQTopicTest temp = new MQTopicTest();
            temp.send();
            temp.receive();
        }
        public void send(){
            try{
                MQEnvironment.hostname = "172.17.102.2";
                MQEnvironment.channel = "CHANNEL_xx_2";
                MQEnvironment.port = 8080;
                MQEnvironment.CCSID = 1381;
                MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY,
                        CMQC.TRANSPORT_MQSERIES);
                MQQueueManager queueManager= new MQQueueManager("QM_xx_2");
                MQTopic topic = queueManager.accessTopic("TOPIC_xx_2", "TOPIC_xx_2", CMQC.MQTOPIC_OPEN_AS_PUBLICATION, CMQC.MQOO_OUTPUT);
                MQPutMessageOptions option = new MQPutMessageOptions();
                option.options = CMQC.MQPMO_FAIL_IF_QUIESCING | CMQC.MQPMO_RETAIN;;
                MQMessage msg = new MQMessage();
                msg.expiry = 5*60*1000;
                msg.writeObject("test");
                msg.setStringProperty("test", "1");
                topic.put(msg,option);
                queueManager.commit();
            }catch(Exception e){
                System.out.println(e);
            }
        }
        public void receive(){
            try{
                MQEnvironment.hostname = "172.17.102.2";
                MQEnvironment.channel = "CHANNEL_xx_2";
                MQEnvironment.port = 8080;
                MQEnvironment.CCSID = 1381;
                MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY,
                        CMQC.TRANSPORT_MQSERIES);
                MQQueueManager queueManager= new MQQueueManager("QM_xx_2");
                MQTopic topic = queueManager.accessTopic("TOPIC_xx_2", "TOPIC_xx_2", CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, CMQC.MQSO_CREATE);
                MQMessage msg = new MQMessage();
                MQGetMessageOptions option = new MQGetMessageOptions();
                topic.get(msg, option);       
                queueManager.commit();
                System.out.println(msg.getStringProperty("test"));
            }catch(Exception e){
                System.out.println(e);
            }
        }
    }

    展开全文
  • MQ-Queue与Topic区别

    2021-02-07 16:15:18
    队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型: Queue Topic 名称 Point-to-Point 点对点 Publish Subscribe 发布/订阅 概要 在该消息传递模型下,一个消息生产者向消息服务器端...
  • Queue是RocketMQ中的另一个重要概念。...同理,对于RocketMQ,一个Topic可以分布在各个Broker上,我们可以把一个Topic分布在一个Broker上的子集定义为一个Topic分片。对应上图,TopicA有3个Topic分片,分布在Broker
  • springboot整合mq同时监听queue和topic

    千次阅读 2018-11-09 14:09:31
    前言:springboot和mq整合的时候,默认情况下,要么只能监听queue要么只能监听topic,而不能二者兼得。 在application.properties文件中通过如下配置项,切换监听消息的类型。 1 2 #为true时是...
  • 学习笔记: queue topic 工作模式 负载均衡模式:多个消息时,按照每人一个的方式 订阅发布模式:每个消息都发送给所有的订阅者 信息是否丢弃 没有消费者时,消息不会丢弃 后期的订阅者读不到前期发送的消息,所以...
  • Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表(): Topic Queue 概要 Publish Subscribe ...topic数据默认不落地,是无状态的。...Queue数据默认会在mq服务器上以文件形式保存,...
  • #rabbitmq配置 spring.rabbitmq.host=192.168.xxx.xxx spring.rabbitmq.port=...log.topic mq.config.queue.info=log.info mq.config.queue.error=log.error mq.config.queue.logs=log.msg
  • Active MQ 高级特性和用法(一)内嵌Active MQ、Active MQ的各种持久化方式、消息(Topic)的持久化订阅
  • 使用Rocket MQ发送消息时报错 No route info of this topic, 详细错误信息如下: com.aliyun.openservices.ons.api.exception.ONSClientException: defaultMQProducer send exception at ...
  • MQ 入门【7】--topic主题模式

    千次阅读 2018-07-05 19:13:45
    public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils....
  • 目的是在was中配置mq的连接工厂和目标和MQ通信,queue部分都已经实现了,但是在尝试topic时出现了问题: mq中没有类似queue一样的topic对象,它是通过broker实现发布/订阅的。我照着网上的资料用MQJMS_PSQ.mqsc...
  • 转自:http://blog.sincerad.com/archives/%E5%9C%A8java%E4%B8%AD%E4%BD%BF%E7%94%A8spring-jms%E9%9B%86%E6%88%90websphere-mq%EF%BC%8Ctopic%E5%8F%91%E5%B8%83%E5%92%8C%E8%AE%A2%E9%98%85 [code="java&...
  • MQ消息队列详解、四大MQ的优缺点分析

    万次阅读 多人点赞 2020-03-07 16:05:28
    MQ消息队列详解 近期有了想跳槽的打算,所以自己想巩固一下自己的技术,想了解一些面试比较容易加分的项,近期准备深入研究一下redis和mq这两样,这总体上都是为了解决服务器并发的原因,刚翻到了一篇有关于mq的,...
  • mq 使用Spring发送,消费topic和queue消息

    千次阅读 2016-10-26 14:49:50
    简介 实战一 , 实战二介绍了ActiveMQ的基本概念和配置方式. 本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之. ...TopicMessageProducer向topic发送消息, Topic
  • <p>mq消息 在生产环境和灰度环境隔离 各位是如何实现的,不同的topic吗</p>
  • 本文主要通过实际编码来对《MQ: 一张图读懂kafka工作原理》提到的部分原理进行验证与实现。 相关文章参考: MQ: 消息队列常见应用场景及主流消息队列ActiveMQ、RabbitMQ、RocketMQ和Kafka的简单对比 MQ: 一张图读懂...
  • 简介 实战一 , 实战二 介绍了ActiveMQ的基本概念和配置方式. 本篇将通过一个实例介绍使用spring发送,...如图示, TOPIC和QUEUE分别代表一个topic和一个queue消息通道. TopicMessageProducer向topic发送消息, T
  • 获取所有topic AdminBrokerProcessor#processRequest#getAllTopicConfig private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { // 创建响应命令对象 final Remotin.....
  • 2018-07-23 16:43:22,022 ERROR RocketmqClient - Send message Exception, Message [topic=heaven-mq-test, systemProperties={__KEY=ORDERID_0, __TAG=TEST-TAG}, userProperties=null, body=22] ...
  • 消息中间件MQ与RabbitMQ面试题(2020最新版)

    万次阅读 多人点赞 2020-03-01 11:11:21
    文章目录为什么使用MQMQ的优点消息队列有什么优缺点?RabbitMQ有什么优缺点?你们公司生产环境用的是什么消息中间件?Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?MQ 有哪些常见问题?如何解决这些问题?...
  • MQ

    千次阅读 2009-05-11 13:22:00
    转贴自 http://hi.baidu.com/javashmily/blog/category/Ibm%20Mq/index/0 MQQueueManager类介绍MQQueueManager java.lang.Object | *- com.ibm.mq.MQManagedObject | *- co
  • springMVC+activityMq

    2015-06-04 15:34:45
    springMVC+activityMq 轻松集成mq,快乐学习activityMQ Queue 和Topic
  • MQ是一个简单的分布式内存消息代理 特征 内存中消息队列 HTTP或gRPC传输 聚类 分片 代理 发现 自动重试 TLS支持 命令行客户端 互动客户 去客户 坚持归档 如果未指定TLS配置,则MQ缺省情况下会生成自签名证书 原料药...
  • 消息中间件(一)MQ详解及四大MQ比较

    万次阅读 多人点赞 2018-08-29 22:05:58
    主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播   2.5 Queue 队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 37,301
精华内容 14,920
关键字:

mqtopic