精华内容
下载资源
问答
  • ActiveMQ的使用

    万次阅读 2019-05-04 10:22:25
    什么情况下使用ActiveMQ? 多个项目之间集成 (1) 跨平台 (2) 多语言 (3) 多项目 降低系统间模块耦合度,解耦 (1) 软件扩展性 系统前后端隔离 (1) 前后端隔离,屏蔽高安全区 生产者代码: package ...

    什么情况下使用ActiveMQ?

    1. 多个项目之间集成 
      (1) 跨平台 
      (2) 多语言 
      (3) 多项目
    2. 降低系统间模块的耦合度,解耦 
      (1) 软件扩展性
    3. 系统前后端隔离 
      (1) 前后端隔离,屏蔽高安全区

    生产者代码: 

     

    package com.ljxwtl.cn;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class Producer {
    
        //连接账号
        private String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        //连接密码
        private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        //连接地址
        private String brokerURL = "tcp://localhost:61616";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //生产者,就是产生数据的对象
        private MessageProducer producer;
    
        public void start() throws Exception{
            //根据用户名,密码,url创建一个连接工厂
            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
            //从工厂中获取一个连接
            connection = factory.createConnection();
            //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
            connection.start();
            //创建一个session
            //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
            //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
            //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
            //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
            destination = session.createQueue("text-msg");
            //从session中,获取一个消息生产者
            producer = session.createProducer(null);
            //设置生产者的模式,有两种可选
            //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
            //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
            //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
            for(int i = 0 ; i < 100 ; i ++){
                TextMessage textMsg = session.createTextMessage("我是一条消息"+i);
                //发送一条消息
                /**
                 * 第一个参数:目的地
                 * 第二个参数:消息
                 * 第三个参数:持久化模式
                 * 第四个参数:优先级(0-9   0-4为普通消息,5-9为加急消息,默认为4)
                 * 第五个参数:消息在MQ上存放的有效期 ms
                 */
                producer.send(destination,textMsg,DeliveryMode.NON_PERSISTENT,5,1000*60*2);
            }
            session.commit();
            System.out.println("发送消息成功");
            //即便生产者的对象关闭了,程序还在运行哦
            connection.close();
        }
    
        public static void main(String[] args) throws Exception {
            new Producer().start();
        }
    }
    

    消费者代码:

    package com.ljxwtl.cn;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class Consumer {
    
        //连接账号
        private String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        //连接密码
        private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        //连接地址
        private String brokerURL = "tcp://localhost:61616";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //消费者,就是消费数据的对象
        private MessageConsumer messageConsumer;
    
        public void start() throws Exception{
            //根据用户名,密码,url创建一个连接工厂
            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
            //从工厂中获取一个连接
            connection = factory.createConnection();
            //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
            connection.start();
            //创建一个session
            //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
            //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
            //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
            //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
            destination = session.createQueue("text-msg");
            //从session中,获取一个消息生产者
            messageConsumer = session.createConsumer(destination);
    
            //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
    
            while(true){
                Message message = messageConsumer.receive();
                //确认签收,对应于Session.CLIENT_ACKNOWLEDGE
                message.acknowledge();
                if(message == null){
                    break;
                }
                System.out.println("接收到的消息:"+message);
            }
    
            connection.close();
        }
    
        public static void main(String[] args) throws Exception {
            new Consumer().start();
        }
    }
    

    查看ActiveMQ消息队列转发的网址:http://localhost:8161/admin 用户名:admin 密码:admin

     

    展开全文
  • activemq的使用

    2019-10-08 09:54:43
    1. activemq的使用(点对点)  (1) 创建父工程(pom工程),定义activemq的版本号 <activemq.version>5.13.4</activemq.version>  (2) 创建子工程(war工程),引入activemq依赖 <dependencies&...

    1. activemq的使用(点对点)

      (1) 创建父工程(pom工程),定义activemq的版本号

    <activemq.version>5.13.4</activemq.version>

      (2) 创建子工程(war工程),引入activemq依赖

    <dependencies>
        <!-- activemq -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
        </dependency>
    </dependencies>

      (3) 创建producer(生产者)

    package com.demo.activemq.producer.service;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActivemqProducer {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的activemq世界");
            //8.发送消息
            producer.send(textMessage);
            System.out.println("ActivemqProducer发送了消息");
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

      (4) 创建consumer(消费者)

    package com.demo.activemq.producer.service;
    
    import java.io.IOException;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActivemqConsumer {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(queue);
            System.out.println("消费者正在运行");
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });    
            //8.等待键盘输入
            System.in.read();    
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

      (5) 运行consumer(消费者)

      (6) 运行producer(生产者)

      (7) 消费者运行结果

    2. activemq的使用(订阅)

      (1) 创建生产者

    package com.demo.activemq.producer.service;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActivemqTopicProducer {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的activemqTopic世界");
            //8.发送消息
            producer.send(textMessage);
            System.out.println("ActivemqTopicProducer发送了消息");
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

       (2) 创建topic消费者1

    package com.demo.activemq.producer.service;
    
    import java.io.IOException;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActivemqTopicConsumer1 {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
            System.out.println("消费者1正在运行");
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });    
            //8.等待键盘输入
            System.in.read();    
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

      (3) 创建topic消费者2

    package com.demo.activemq.producer.service;
    
    import java.io.IOException;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActivemqTopicConsumer2 {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
            System.out.println("消费者2正在运行");
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });    
            //8.等待键盘输入
            System.in.read();    
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

       (4) 运行,结果略

    转载于:https://www.cnblogs.com/lin-nest/p/10318011.html

    展开全文
  • ActiveMQ 的使用

    千次阅读 2016-03-26 11:43:32
    ActiveMQ 的使用一:介绍 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE...

    ActiveMQ 的使用

    一:介绍

          ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

          JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

          ActiveMQ为应用架构提供了松耦合的好处。松耦合通常被引入到一个架构迁移到一个古典的紧耦合的远程过程调用(RPC)中。这样一个松耦合设计被认为是异步的,调用两个应用中的任何一个应用对另一个都没有影响;不相互依赖或者有时间要求。应用可以信任ActiveMQ有能力保障消息交付。因此,通常表述为应用发送消息是发送后自寻的。也就是他们把消息发给ActiveMQ,并不关心消息如何交付和什么时候交付。同样的,消费应用也不用关心消息来自哪里何以如何发送到ActiveMQ的。这对于在非均匀环境中来说尤其有用,这种环境中允许客户端使用不同的语言编写,甚至可能是不同的网格协议。ActiveMQ充当中间人,允许以异步的方式进行非均匀集成和交互。

          像COM、CORBA、DCE和EJB这样的技术使用的技术称之为远程过程调用(RPC),RPC被认为是紧耦合的。使用RPC,当一个应用调用另一个应用的时候,调用者是锁定的,直到被调用者返还会控制权。

    特性及优势

    1. 实现JMS1.1规范,支持J2EE1.4以上

    2. 可运行于任何jvm和大部分web容器(ActiveMQ works great in any JVM)

    3. 支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT等等)

    4. 支持多种协议(stomp,openwire,REST)

    5. 良好的spring支持(ActiveMQ has great Spring Support)

    6. 速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ。

    7. 与OpenJMS、JbossMQ等开源jms provider相比,ActiveMQ有Apache的支持,持续发展的优势明显。


    安装

    地址:http://activemq.apache.org/activemq-5132-release.html

    MAC用户如果安装了homebrew的话,直接运行brew install maven即可,ActiveMQ会被默认安装到/usr/local/Cellar/activemq。

    启动ActiveMQ服务器,运行脚本:activemq.bat 启动服务

    目录结构说明

    • +bin (windows下面的bat和unix/linux下面的sh) 启动ActiveMQ的启动服务就在这里
    • +conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
    • +data (默认是空的)
    • +docs (index,replease版本里面没有文档)
    • +example (几个例子)
    • +lib (activeMQ使用到的lib)
    • +webapps (系统管理员控制台代码)
    • +webapps-demo(系统示例代码)
    • -activemq-all-5.8.0.jar (ActiveMQ的binary)
    • -user-guide.html (部署指引)
    • -LICENSE.txt
    • -NOTICE.txt
    • -README.txt

    运行

    • bin目录下启动 :activemq start

    • 终止 :activemq stop

    • ActiveMQ的默认是61616,可以检测这端口判断ActiveMQ是否启动成功:netstat -an|grep 61616。

    • 可以访问web终端http://localhost:8161/admin来查看和管理ActiveMQ。(默认用户名密码是admin/admin,你也可以修改配置,其在ActiveMQ安装目录下的libexec/conf/jetty-real.properties文件中)


    展开全文
  • activeMq的使用

    2020-07-23 15:09:35
    --ActiveMq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>1.5.0.RELEASE</...

    1. 引入依赖

    <!--ActiveMq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>1.5.0.RELEASE</version>
    </dependency>
    <!--消息队列连接池-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.0</version>
    </dependency>
    

    2. application.yml文件配置如下

    spring:
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
        close-timeout: 15s   # 在考虑结束之前等待的时间
        in-memory: true      # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
        non-blocking-redelivery: false  # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
        send-timeout: 0     # 等待消息发送响应的时间。设置为0等待永远。
        queue-name: active.customer
        topic-name: active.topic.name.model
        customer-name: active.customer.name.model
    #  packages:
    #    trust-all: true #不配置此项,会报错
      pool:
        enabled: true
        max-connections: 10   #连接池最大连接数
        idle-timeout: 30000   #空闲的连接过期时间,默认为30秒
    
     # jms:
     #   pub-sub-domain: true  #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
    
    # 是否信任所有包
    #spring.activemq.packages.trust-all=
    # 要信任的特定包的逗号分隔列表(当不信任所有包时)
    #spring.activemq.packages.trusted=
    # 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
    #spring.activemq.pool.block-if-full=true
    # 如果池仍然满,则在抛出异常前阻塞时间。
    #spring.activemq.pool.block-if-full-timeout=-1ms
    # 是否在启动时创建连接。可以在启动时用于加热池。
    #spring.activemq.pool.create-connection-on-startup=true
    # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
    #spring.activemq.pool.enabled=false
    # 连接过期超时。
    #spring.activemq.pool.expiry-timeout=0ms
    # 连接空闲超时
    #spring.activemq.pool.idle-timeout=30s
    # 连接池最大连接数
    #spring.activemq.pool.max-connections=1
    # 每个连接的有效会话的最大数目。
    #spring.activemq.pool.maximum-active-session-per-connection=500
    # 当有"JMSException"时尝试重新连接
    #spring.activemq.pool.reconnect-on-exception=true
    # 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
    #spring.activemq.pool.time-between-expiration-check=-1ms
    # 是否只使用一个MessageProducer
    #spring.activemq.pool.use-anonymous-producers=true
    

    3. 配置类 配置Bean 及工厂实例

    package com.example.demo.activeMq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import javax.jms.ConnectionFactory;
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    @Configuration
    public class ActiveMqConfig {
        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;
        @Value("${spring.activemq.user}")
        private String username;
        @Value("${spring.activemq.password}")
        private String password;
        @Value("${spring.activemq.customer-name}")
        private String queueName;
        @Value("${spring.activemq.topic-name}")
        private String topicName;
        @Bean(name = "customer")
        public Queue queue() {
            return new ActiveMQQueue(queueName);
        }
        @Bean(name = "topic")
        public Topic topic() {
            return new ActiveMQTopic(topicName);
        }
        @Bean
        public ConnectionFactory connectionFactory(){
            return new ActiveMQConnectionFactory(username, password, brokerUrl);    }
    
        @Bean
        public JmsMessagingTemplate jmsMessageTemplate(){
            return new JmsMessagingTemplate(connectionFactory());
        }
    
        // 在Queue模式中,对消息的监听需要对containerFactory进行配置
        @Bean("queueListener")
        public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setPubSubDomain(false);
            return factory;
        }
    
        //在Topic模式中,对消息的监听需要对containerFactory进行配置
        @Bean("topicListener")
        public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setPubSubDomain(true);
            return factory;
        }
    }
    
    

    4. 消息生产者

    package com.example.demo.activeMq.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.jms.Destination;
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    @RestController
    public class ProducerController {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Autowired
        private Queue queue;
    
        @Autowired
        private Topic topic;
    
        @PostMapping("/customer/test")
        public String sendQueue(@RequestBody String str) {
            this.sendMessage(this.queue, str);
            return "success";
        }
    
        @PostMapping("/topic/test")
        public String sendTopic(@RequestBody String str) {
            this.sendMessage(this.topic, str);
            return "success";
        }
    
        // 发送消息,destination是发送到的队列,message是待发送的消息
        private void sendMessage(Destination destination, final String message) {
            jmsMessagingTemplate.convertAndSend(destination, message);
        }
    }
    
    

    5. 消息消费者

    5.1. -topic消费者

    package com.example.demo.activeMq.customer;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    /**
     * topic模式消费者
     */
    
    @Component
    public class TopicConsumerListener {
        int i=1;
        //topic模式的消费者(服务停止后然后再重新启动,消息队列中未接收的topic消息会丢失)
        @JmsListener(destination = "${spring.activemq.topic-name}", containerFactory = "topicListener")
        public void readActiveQueue(String message) {
    
            try {
                //异步验证
                Thread.sleep(3000);
            } catch (InterruptedException e) {
    
            }
            System.out.println("topic接受到:" + message+"-------------"+i);
            i++;
        }
    }
    
    

    5.2.queue消费者

    package com.example.demo.activeMq.customer;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    /**
     * queue模式消费者
     */
    @Component
    public class QueueConsumerListener {
        int i = 1;
    
        //queue模式的消费者(服务停止后再重新启动服务,未接收成的queue消息会继续接收)
        @JmsListener(destination = "${spring.activemq.customer-name}", containerFactory = "queueListener")
        public void readActiveQueue(String message) {
            try {
                //异步验证
                Thread.sleep(3000);
            } catch (InterruptedException e) {
    
            }
            System.out.println("queue接受到:" + message + "-------------" + i);
            i++;
        }
    }
    
    

    6. 启动类加上启动jms注解

    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.jms.annotation.EnableJms;
    
    @SpringBootApplication
    @EnableJms //启动消息队列activemq
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    
    
    }
    
    

    调用测试:调用queue消息队列

    在这里插入图片描述

    结果显示:

    在这里插入图片描述

    展开全文
  • activeMQ的使用

    2019-07-03 12:09:22
    [root@localhost bin]# ./activemq start INFO: Loading '/opt/apache-activemq-5.15.9//bin/env' INFO: Using java '/usr/bin/java' INFO: Starting - inspect logfiles specified in logging.pr...
  • 本篇文章是对ActiveMQ的使用进行了详细的分析介绍,需要的朋友参考下
  • 消息队列activemq的使用 一、在linux上安装 解压缩安装包 在bin目录下使用启动命令 ./activemq start activemq有两个默认的端口 8161端口:后台的管理页面端口 61616:服务接口 用户名和密码都是admin ...
  • 1、springboot和activemq的使用相对来说比较方便了,我在网上看了很多其他的资料,但是自己写出来总是有点问题所以,这里重点描述一下遇到的一些问题。2、至于activemq的搭建和springmvc的搭建可以参考:...
  • 这一篇文章,主要带大家详细了解一下消息队列ActiveMQ的使用。 学习消息队列ActiveMQ的使用之前,我们先来搞清JMS。 JMS 1. JMS基本概念 JMS(JAVA Message Service,java消息服务)是java的消息服务,...
  • springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制
  • 学习ActiveMQ的使用

    2018-07-10 15:34:57
    一、什么是ActiveMQ?...(JMS是一个Java平台中关于面向消息中间件API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信,仅仅定义了一系列接口,ActiveMQ是对于JMS接口一种实现)。二、ActiveM...
  • activemq的使用场景

    2019-08-30 06:44:01
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终...以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦...
  • ActiveMQ的使用教程

    2017-05-07 23:04:35
    ActiveMQ入门实例 1.下载ActiveMQ 去官方网站下载:http://activemq.apache.org/ 2.运行ActiveMQ 解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行...
  • ActiveMQ的使用详解

    2018-03-10 19:29:25
    1、下载安装ActiveMQ ActiveMQ官网下载地址:http://activemq.apache.org/download.html  ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,楼主这里选择了Linux 版本下进行开发。 下载完安装包,解压之后...
  • 1.下载ActiveMQ 去官方网站下载:http://activemq.apache.org/ 2.运行ActiveMQ 解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。 启动ActiveMQ以后,登陆:...
  • 消息队列-ActiveMQ的使用(Windows系统)
  • ActiveMQ 的使用方法

    2020-12-08 22:42:45
    使用5.11.2版本jar包。 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 第二步:使用ConnectionFactory对象创建一个Connection对象。 第三步:开启连接,调用Connection对象start方法。 第四...
  • Java中Activemq的使用

    2020-04-10 17:06:55
    什么是Activemq Activemq是一种消息中间件,也就是在两个服务中间负责消息...同时Activemq可以以队列方式进行数据传输,可以防止数据消费者,也就是接收数据一方由于数据量过大而引发问题。 设置maven ...
  • 消息队列ActiveMQ的使用

    万次阅读 2018-01-17 11:51:07
    -----------------ActiveMQ----------------- 一、ActiveMQ核心概念 1、ActiveMQ是消息队列技术,为解决高并发问题而生! 2、ActiveMQ生产者消费者模型(生产者和消费者可以跨平台、跨系统) 有中间平台 3、...
  • .Net下对ActiveMQ的使用

    2017-09-26 11:28:12
    刚来到一个新公司,因为项目需要,要使用ActiveMQ,于是就研究了一下,写下此篇笔记,借鉴了博客园和CSDN相关资料  ActiveMQ简介  ActiveMQ是一个开放源码,基于Apache2.0 licenced,实现了JMS1.1消息...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 8,178
精华内容 3,271
关键字:

activemq的使用