精华内容
下载资源
问答
  • activemq使用教程
    2020-12-08 22:42:45

    原理图

    在这里插入图片描述

    Queue

    Producer

    生产者:生产消息,发送端。
    把jar包添加到工程中。使用5.11.2版本的jar包。
    在这里插入图片描述
    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。

    	@Test
    	public void testQueueProducer() throws Exception {
    		// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    		//brokerURL服务器的ip及端口号
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    		// 第二步:使用ConnectionFactory对象创建一个Connection对象。
    		Connection connection = connectionFactory.createConnection();
    		// 第三步:开启连接,调用Connection对象的start方法。
    		connection.start();
    		// 第四步:使用Connection对象创建一个Session对象。
    		//第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
    		//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
    		//参数:队列的名称。
    		Queue queue = session.createQueue("test-queue");
    		// 第六步:使用Session对象创建一个Producer对象。
    		MessageProducer producer = session.createProducer(queue);
    		// 第七步:创建一个Message对象,创建一个TextMessage对象。
    		/*TextMessage message = new ActiveMQTextMessage();
    		message.setText("hello activeMq,this is my first test.");*/
    		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
    		// 第八步:使用Producer对象发送消息。
    		producer.send(textMessage);
    		// 第九步:关闭资源。
    		producer.close();
    		session.close();
    		connection.close();
    	}
    
    

    Consumer

    消费者:接收消息。
    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源

    	@Test
    	public void testQueueConsumer() throws Exception {
    		// 第一步:创建一个ConnectionFactory对象。
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    		// 第二步:从ConnectionFactory对象中获得一个Connection对象。
    		Connection connection = connectionFactory.createConnection();
    		// 第三步:开启连接。调用Connection对象的start方法。
    		connection.start();
    		// 第四步:使用Connection对象创建一个Session对象。
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    		Queue queue = session.createQueue("test-queue");
    		// 第六步:使用Session对象创建一个Consumer对象。
    		MessageConsumer consumer = session.createConsumer(queue);
    		// 第七步:接收消息。
    		consumer.setMessageListener(new MessageListener() {
    			
    			@Override
    			public void onMessage(Message message) {
    				try {
    					TextMessage textMessage = (TextMessage) message;
    					String text = null;
    					//取消息的内容
    					text = textMessage.getText();
    					// 第八步:打印消息。
    					System.out.println(text);
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		});
    		//等待键盘输入
    		System.in.read();
    		// 第九步:关闭资源
    		consumer.close();
    		session.close();
    		connection.close();
    
    

    Topic

    Producer

    使用步骤:
    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。

    	@Test
    	public void testTopicProducer() throws Exception {
    		// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    		// brokerURL服务器的ip及端口号
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    		// 第二步:使用ConnectionFactory对象创建一个Connection对象。
    		Connection connection = connectionFactory.createConnection();
    		// 第三步:开启连接,调用Connection对象的start方法。
    		connection.start();
    		// 第四步:使用Connection对象创建一个Session对象。
    		// 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
    		// 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
    		// 参数:话题的名称。
    		Topic topic = session.createTopic("test-topic");
    		// 第六步:使用Session对象创建一个Producer对象。
    		MessageProducer producer = session.createProducer(topic);
    		// 第七步:创建一个Message对象,创建一个TextMessage对象。
    		/*
    		 * TextMessage message = new ActiveMQTextMessage(); message.setText(
    		 * "hello activeMq,this is my first test.");
    		 */
    		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
    		// 第八步:使用Producer对象发送消息。
    		producer.send(textMessage);
    		// 第九步:关闭资源。
    		producer.close();
    		session.close();
    		connection.close();
    	}
    
    

    Consumer

    消费者:接收消息。
    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源

    	@Test
    	public void testTopicConsumer() throws Exception {
    		// 第一步:创建一个ConnectionFactory对象。
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    		// 第二步:从ConnectionFactory对象中获得一个Connection对象。
    		Connection connection = connectionFactory.createConnection();
    		// 第三步:开启连接。调用Connection对象的start方法。
    		connection.start();
    		// 第四步:使用Connection对象创建一个Session对象。
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
    		Topic topic = session.createTopic("test-topic");
    		// 第六步:使用Session对象创建一个Consumer对象。
    		MessageConsumer consumer = session.createConsumer(topic);
    		// 第七步:接收消息。
    		consumer.setMessageListener(new MessageListener() {
    
    			@Override
    			public void onMessage(Message message) {
    				try {
    					TextMessage textMessage = (TextMessage) message;
    					String text = null;
    					// 取消息的内容
    					text = textMessage.getText();
    					// 第八步:打印消息。
    					System.out.println(text);
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		});
    		System.out.println("topic的消费端03。。。。。");
    		// 等待键盘输入
    		System.in.read();
    		// 第九步:关闭资源
    		consumer.close();
    		session.close();
    		connection.close();
    	}
    
    
    更多相关内容
  • ActiveMQ使用教程

    2016-01-11 17:43:17
    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...
  • ActiveMQ介绍 MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、...

    ActiveMQ介绍
         MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
    特点:
    1、支持多种语言编写客户端
    2、对spring的支持,很容易和spring整合
    3、支持多种传输协议:TCP,SSL,NIO,UDP等
    4、支持AJAX
    消息形式:
    1、点对点(queue)
    2、一对多(topic)

    ActiveMQ安装

    这里写图片描述

    我这里提供一个安装好的虚拟机:http://download.csdn.net/download/liuyuanq123/10217892
    服务器运行后,我们可以直接访问到activeMQ的界面:

    这里写图片描述

    然后点击queues可以看到现在没有一条消息:

    这里写图片描述


    ActiveMQ测试

          编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:

         <dependency>  
             <groupId>org.apache.activemq</groupId>  
             <artifactId>activemq-all</artifactId>  
        </dependency>  

    queue的发送代码如下:

        public void testMQProducerQueue() throws Exception{
            //1、创建工厂连接对象,需要制定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Queue queue = session.createQueue("test-queue");
            //6、使用会话对象创建生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7、使用会话对象创建一个消息对象
            TextMessage textMessage = session.createTextMessage("hello!test-queue");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }

    接收代码:

        public void TestMQConsumerQueue() throws Exception{
            //1、创建工厂连接对象,需要制定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Queue queue = session.createQueue("test-queue");
            //6、使用会话对象创建生产者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7、向consumer对象中设置一个messageListener对象,用来接收消息
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    // TODO Auto-generated method stub
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            });
            //8、程序等待接收用户消息
            System.in.read();
            //9、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }

    然后当我们运行queue发送的时候可以看到队列里已经有一条消息了,但没有发送出去:

    这里写图片描述

    然后在运行queue 的接收端,可以看到消息已经发出了:

    这里写图片描述
    接着对topic进行测试,发送代码如下:

        public void TestTopicProducer() throws Exception{
            //1、创建工厂连接对象,需要制定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Topic topic = session.createTopic("test-topic");
            //6、使用会话对象创建生产者对象
            MessageProducer producer = session.createProducer(topic);
            //7、使用会话对象创建一个消息对象
            TextMessage textMessage = session.createTextMessage("hello!test-topic");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }

    接收代码:

        public void TestTopicConsumer() throws Exception{
            //1、创建工厂连接对象,需要制定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Topic topic = session.createTopic("test-topic");
            //6、使用会话对象创建生产者对象
            MessageConsumer consumer = session.createConsumer(topic);
            //7、向consumer对象中设置一个messageListener对象,用来接收消息
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    // TODO Auto-generated method stub
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            });
            //8、程序等待接收用户消息
            System.in.read();
            //9、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }

    然后运行topic发送:

    这里写图片描述

    可以看到消息已经发送出去。再运行topic接收:

    这里写图片描述

    可以看到有了一个消费者,但是没有接收的消息,这是因为正常情况下我们的topic消息不会再服务器持久化,所以要先打开消费者,再打开生产者,这个时候我们再运行生产者发送一条消息看到消息已经接收到了:

    这里写图片描述
    ActiveMQ整合spring及项目中运用

          activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:

        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-context</artifactId>  
        </dependency>  

    然后编写applicationContext-activemq.xml文件,
    代码如下:

    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"  
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd  
        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置生产者 -->
        <!-- Spring使用JMS工具类,可以用来发送和接收消息 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory -->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="spring-queue"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic"/>
        </bean>
    </beans>  

    然后在我们淘淘商城中,商品添加到数据库的时候,对应也要添加数据到我们的solr索引中,所以生产者应该在插入数据后创建:

    这里写图片描述

    当然,在xml文件中配置好的jmstemplate和destination也要注入进来:

        @Autowired
        private JmsTemplate jmsTemplate;
        @Resource(name="itemAddTopic")
        private Destination destination;

    然后消费者应该写在我们的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再编写一个监听器,当接收到消息时,就讲数据存入索引库,xml文件代码如下:

    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"  
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd  
        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="spring-queue"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic"/>
        </bean>
        <!-- 配置监听器 -->
        <bean id="myListener" class="com.taotao.search.listener.MyListener"/>
        <bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
        <!-- 系统监听器 -->
    <!--    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="myListener"/>
        </bean> -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="itemAddTopic"/>
            <property name="messageListener" ref="itemAddListener"/>
        </bean>
    </beans>  

    接收消息代码:

    这里写图片描述

    最后同时打开测试即可。

    展开全文
  • ActiveMQ使用入门

    千次阅读 2021-12-24 10:14:38
    面向消息中间件 和 ActiveMQ 简介,ActiveMQ的安装和配置详解,MessageProducer的发送模式、优先级和过期时间,使用 Spring Boot 简化JMS开发,使用ActiveMQ实现抢购时的并发效率优化

    1 面向消息中间件 和 ActiveMQ 简介

    1.1 什么是面向消息中间件

    RMI、SOA和微服务等架构,为JavaEE系统的分布式提供了可能,软件理论上可以不被 物理硬件限制而无限扩展。但这些的远程调用是同步操作的,不可避免存在一些局限:

    (1)同步阻塞:客户对象发出调用后,必须等待服务对象完成处理并返回结果才能继续 执行;

    (2)紧密耦合:客户进程和服务对象进行都必须正常运行,服务对象的崩溃会导致客户 对象的异常;

    (3)点对点:客户对象一次只能发送给一个目标对象。 面向消息的中间件(Message Oriented Middleware,MOM)使用异步手段有效的解决 以上问题:“消息发送者”将消息发送给“消息服务器”,“消息服务器”将消息存放在若干“队 列”中,在合适的时候再将消息转发给接收者。

    (1)这种模式下,发送和接收是异步的,发送者无需等待;

    (2)二者松耦合:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定 运行:

    (3)一对多:对于一个消息可以有多个接收者。

    1.2 Java消息服务——JMS

    JavaEE中定义的“Java消息服务”(JMS)定义了Java中访问消息中间件的接口。JMS只 是一套接口,并没有给予实现,各大厂商和开源组织都对JMS实现不同产品,这些产品 包括:Apache的ActiveMQ、阿里的RocketMQ、IBM的MQSeries、Microsoft的MSMQ和 Spring Source的RabbitMQ等等,它们基本都遵循JMS规范。 这里介绍的ActiveMQ是最早的JMS开源产品,在Java世界使用比较广泛,在中等规模的 应用中是完全胜任的。当然,如果要真正面面对大型互联网应,要解决超高并发和吞吐 量问题,现在更推荐使用RabbitMQ、Kafuka或者RocketMQ等新一代的分布式产品,但 它们的基本原理和用法是相通的。

    1.3 JMS规范、术语和常见接口

    (1)Provider(MessageProvider)/ Producer:生产者(消息的发送方)

    (2)Consumer(MessageConsumer):消费者(消息的接收方)

    (3)PTP:Point To Point,即点对点的消息模型(一对一发布)

    (4)Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型(一对多发布)

    (5)Queue:队列

    (6)Topic:主题

    (7)ConnectionFactory:连接工厂。JMS用它创建连接

    (8)Connection:JMS Consumer 到 JMS Provider的连接

    连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂以后,就可以创 建一个与jms提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接 收队列和主题到目标。

    (9)Destination:消息的目的地

    目标是一个包装了消息目标标识符的【被管对象】,消息目标是指消息发布和接收的地 点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过jndi发现它们。 和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的queue,以及发布者/ 订阅者模型的Topic

    (10)Session:会话,内部维护一个发送或者接收消息的线程

    表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连 续的,也就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如 果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消 息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消 息生产者来发送消息,创建消息消费者来接收消息。

    (11)Message:消息

    是在消费者和生产者之间传送的对象,也就是说从一个应用程序送到另一个应用程序。 一个消息有三个主要部分: 消息头(必须):包含用于识别和消息寻找路由的操作设置。 一组消息属性(可选):包括额外的属性,支持其他提供者和用户的兼容。可以创建定制 的字段和过滤器(消息选择器) 一个消息体(可选):允许用户创建五种类型的消息(文本消息TextMessage 、映射 消息MapMessage 、字节消息ByteMessage、流消息StreamMessage、对象消息 ObjectMessage)

    2 ActiveMQ的安装

    ActiveMQ是一个用Java编写的程序,可以在官网中下载zip压缩包,只要配置好JDK,解 压即用。

    这里是官方的下载地址:http://activemq.apache.org/components/classic/download

    (1)运行:解压后,进入bin目录,执行对应版本的 activemq.bat

     (2)管理页面:ActiveMQ的默认端口是8161,通过http://localhost:8161/admin/ 可以进 入管理页面

     管理员默认账号为:admin,admin。账号是通过 conf/jetty-realm.properties 文件来设置 的。

     (3)把ActiveMQ注册成Window服务

    以管理员身份打开cmd:

     进入“~\apache-activemq-5.15.9\bin\win64”目录,执行“InstallService.bat”:

     (4)为ActiveMQ添加使用者账号 ActiveMQ默认使用是不需要账号和密码的,在实际使用中当然不合适,我们可以修改 ~\conf\activemq.xml 文件,添加简单的验证账号。

    修改配置文件,在元素中添加验证插件:(如需直接获得代码可往底部链接)

     然后再把上述配置中的username和password,配置在 ~\conf\credentials.properties 文件 中。

     密码设置问题可以参考:https://blog.csdn.net/dandan2zhuzhu/article/details/78461872

    3 Java中使用ActiveMQ

    3.1 消息生产者程序

    (1)创建maven的jar项目并导入activemq依赖

     (2)实现消息生产者示例,并执行。‘ 需要注意ActiveMQ中服务 url 的区别:管理路径为 http://localhost:8161 ;生产路径为 tcp://localhost:61616

     

     (3)登录 http://localhost:8161/admin ,进入Queues页面,可查看消息队列中保存的消 息

    2.2 消息消费者程序

    (1)创建maven的jar项目,导入上述的activemq依赖

    (2)创建消息消费者示例,并执行

     

     (3)消息消费完结后,再次查看消息队列已经清空(都被消费掉了)。

     4 消息发送详解

    4.1 Session事务

    创建 Session 时,把 transacted 参数设置为 true,可以使用为会话事务管理所发送的消 息。

    Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

    这时该Session发送的消息不会马上保存到服务器上,如果执行 “sesssion.commit()” 则所 有消息会以原子性的方式提交到服务器,如果执行“session.rollback()” 则发送的消息会被 回滚。下面代码显示“消息生产者”使用事务发送消息。

     4.2 Session与签收模式

    创建 Session 时还可以选择消息消费者的“签收模式”——acknowledgeMode。

     消息消费者在获取到(Push或Pull)消息后,需要向消息中间(Activemq服务器)件发 送一个签收信息“Ack”,以表示消息已收到,如果消费者没有签收,消息中间件是不会把 消息删除的,它还会在服务器等待获取。

    创建 Session 时的签收模式参数(acknowledgeMode),用于指定消费者的签收方式。

    “签收模式”是一个整型常量,可选:AUTO_ACKNOWLEDGE(自动签收)、 CLIENT_ACKNOWLEDGE(手工签收),如果把签收模式设置为 CLIENT_ACKNOWLEDGE,消费者必须调用消息对象的acknowledge(),消息中间件才 会认为该消息已经被消费,可以清除了。

    修改消费者代码如下:

    这时,虽然消费者已经读取了activemq中的消息了,但activemq中的消息还保留在服务 器等待获取。

    4.3 MessageProducer的发送模式、优先级和过期时间 

    MessageProducer 由 session创建,用于向指定的消息队列(Destination)发送消息, 消息发送通过send()方法实现。send()方法有几个重载,其中参数最完整的如下:

    前两个参数代表指定的消息队列和消息体,而deliveryMode、piority和timeToLive 是可选 参数,用于控制消息的属性。

    (1)deliveryMode ——发送模式

    ActiveMq支持两种消息的发送模式:PERSISTENT(持久化)和 NON_PERSISTENT(非持久化);若不指定传送模式,默认为持久消息;如果可以容 忍消息丢失,使用非持久化模式可以改善性能和减小存储开销。

    (2)priority——优先级

    消息优先级有从0~9十个级别,0-4是普通消息,5-9是加急消息,如果不指定优先级,则 默认为4,JMS不要求严格按照这10个优先级发送消息,但必须保证加急消息要优先于普 通消息到达。

    (3)timeToLive——消息过期时间

    默认消息永不过期,但是可以设置过期时间,单位是毫秒。 以下示例使用“持久化”、“优先级”和“超时”来发送消息:

     需要注意的是,消费者读取带有“优先级”的队列的时候,默认并不严格根据优先级大小来 消费,需要严格根据优先级来消费的话,需要在配置中指定消息队列开启优先级规则。 下面修改了 activemq.xml 配置文件,开启了“text-queue-1”队列的优先级规则。

     这时,消费者才会根据优先级来读取消息。值得注意的是:在实际的高并发请求下,消 息的优先级是很难严格的保证了。

    5 使用 Spring Boot 简化JMS开发 5.1 发送字符串消息

    (1)创建 spring boot 项目,引入 spring-boot-starter-activemq 

    (2)在 application.yml 中配置 activemq 连接

    (3)在Spring配置类中创建Destination(消息目的地)——Queue(队列)

     (4)使用“JmsMessagingTemplate”实现消息生产者

    spring提供了JmsMessagingTemplate来简化JMS的调用,直接可以向指定队列发送消 息。

    为了方便测试,这里使用了REST控制器直接调用消息队列。

     

    (4)使用“@JmsListener”实现消息消费者

    spring 提供了“@JmsListener”注解,用于指定接收特定队列消息的消费者方法。

    5.2 发送对象消息

    使用JmsMessagingTemplate还可从生产者向消费者以发送对象,对象实际上会被序列化 到消息队列中。

    作为消息发送的对象需要: 

    (1)设置为可序列化

    (2)修改application.yml,配置需要传输的类为信任对象

     (3)定义消息队列

    (4)消息生产者

     (5)消息消费者 

     

    6 使用ActiveMQ实现抢购时的并发效率优化 

    抢购超发时可以使用过Redis来判断超发问题,使用Redis取代SQL数据库可 以有效提高并发操作的效率。

    但实际使用中,我们最终还是要把重要的业务数据保存到SQL数据库中,因此Redis避免 超发后程序依然要读写缓慢的SQL数据库,因此无法真正提高并发的响应效率(请求依 然要等待SQL数据写入后才能返回)。

    为了解决并发效率,这里可以使用JMS把购买请求和SQL写入分离,购买请求处理只需 把要保存到SQL的购买信息推送到消息队列中,然后由另一端的购买信息消费者程序负 责写入SQL,购买请求就可以快速返回并响应用户,而消费者程序可以慢慢的再把数据 保存到SQL数据库中。

    下面示例,演示这一改进:

    (1)修改 pom 导入mq依赖:

    (2)修改application.yml 配置ActiveMQ

     

    (3)修改业务对象“PurchaseServiceImpl”,配置 Queue,在购买请求处理的业务对象 中使用JMS

     (4)创建消息消费者业务对象,把购买记录保存到SQL

    资源下载:

    ActiveMQ使用入门.pdf-Java文档类资源-CSDN下载

    展开全文
  • ActiveMQ详细入门教程系列(一)

    千次阅读 多人点赞 2020-10-11 10:50:19
    一、什么是消息中间件 两个系统或两个客户端之间进行消息传送,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队...二、什么是ActiveMQ ActiveM

    在这里插入图片描述

    一、什么是消息中间件

    两个系统或两个客户端之间进行消息传送,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

    消息中间件,总结起来作用有三个:异步化提升性能、降低耦合度、流量削峰。
    在这里插入图片描述

    系统A发送消息给中间件后,自己的工作已经完成了,不用再去管系统B什么时候完成操作。而系统B拉去消息后,执行自己的操作也不用告诉系统A执行结果,所以整个的通信过程是异步调用的。

    二、消息中间件的应用场景

    2.1 异步通信

    有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    在这里插入图片描述

    2.2 缓冲

    在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。

    2.3 解耦

    降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    在这里插入图片描述

    2.4 冗余

    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

    2.5 扩展性

    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。

    2.6 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

    2.7 顺序保证

    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

    2.8 过载保护

    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    2.9 数据流处理

    分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

    三、常用消息队列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)比较

    特性MQActiveMQRabbitMQRocketMQKafka
    生产者消费者模式支持支持支持支持
    发布订阅模式支持支持支持支持
    请求回应模式支持支持不支持不支持
    Api完备性
    多语言支持支持支持java支持
    单机吞吐量万级万级万级十万级
    消息延迟微秒级毫秒级毫秒级
    可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
    消息丢失理论上不会丢失理论上不会丢失
    文档的完备性
    提供快速入门
    社区活跃度
    商业支持商业云商业云

    四、消息中间件的角色

    Queue: 队列存储,常用与点对点消息模型 ,默认只能由唯一的一个消费者处理。一旦处理消息删除。

    Topic: 主题存储,用于订阅/发布消息模型,主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处 理的业务场景中可使用,Queue/Topic都是 Destination 的子接口

    ConnectionFactory: 连接工厂,客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory

    Connection: JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。

    Destination: 消息的目的地,目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。

    点对点消息传递域的特点如下:

    • 每个消息只能有一个消费者。
    • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

    发布/订阅消息传递域的特点如下:

    • 每个消息可以有多个消费者。
    • 生产者和消费者之间有时间上的相关性。
    • 订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求 。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
      在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

    五、JMS的消息格式

    JMS消息由以下三部分组成的:

    • 消息头:

      每个消息头字段都有相应的getter和setter方法。

    • 消息属性:

      如果需要除消息头字段以外的值,那么可以使用消息属性。

    • 消息体:

      JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

    消息类型:

    属性类型
    TextMessage文本消息
    MapMessagek/v
    BytesMessage字节流
    StreamMessagejava原始的数据流
    ObjectMessage序列化的java对象

    六、消息可靠性机制

    只有在被确认之后,才认为已经被成功地消费了,消息的成功消费通常包含三个阶段 :客户接收消息、客户处理消息和消息被确认在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

    • Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
    • Session.CLIENT_ACKNOWLEDGE:客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
    • Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。
    6.1 优先级

    可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

    6.2 消息过期

    可以设置消息在一定时间后过期,默认是永不过期。

    6.3 临时目的地

    可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

    七、什么是ActiveMQ

    ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

    官网地址:http://activemq.apache.org/

    7.1 存储方式

    1. KahaDB存储: KahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化

    特性:
    1、日志形式存储消息;
    2、消息索引以 B-Tree 结构存储,可以快速更新;
    3、 完全支持 JMS 事务;
    4、支持多种恢复机制kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

    2. AMQ 方式: 只适用于 5.3 版本之前。 AMQ 也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

    3. JDBC存储 : 使用JDBC持久化方式,数据库默认会创建3个表,每个表的作用如下:

    activemq_msgs:queue和topic的消息都存在这个表中
    activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
    activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

    4. LevelDB存储 : LevelDB持久化性能高于KahaDB,但是在ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB

    5.Memory 消息存储: 顾名思义,基于内存的消息存储,就是消息存储在内存中。persistent=”false”,表示不设置持 久化存储,直接存储到内存中,在broker标签处设置。

    7.2 协议

    协议官网API:http://activemq.apache.org/configuring-version-5-transports.html

    • Transmission Control Protocol (TCP):

      1. 这是默认的Broker配置,TCP的Client监听端口是61616。
      2. 在网络传输数据前,必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。
      3. TCP连接的URI形式:tcp://hostname:port?key=value&key=value
      4. TCP传输的优点:

        (1)TCP协议传输可靠性高,稳定性强
        (2)高效性:字节流方式传递,效率很高
        (3)有效性、可用性:应用广泛,支持任何平台

    • New I/O API Protocol(NIO)

      1. NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。

      2. 适合使用NIO协议的场景:

        (1)可能有大量的Client去链接到Broker上一般情况下,大量的Client去链接Broker是被操作系统的线程数所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议
        (2)可能对于Broker有一个很迟钝的网络传输NIO比TCP提供更好的性能

      3. NIO连接的URI形式:nio://hostname:port?key=value

      4. Transport Connector配置示例:

    <transportConnectors>
      <transportConnector
        name="tcp"
        uri="tcp://localhost:61616?trace=true" />
      <transportConnector
        name="nio"
        uri="nio://localhost:61618?trace=true" />
    </transportConnectors>
    
    • User Datagram Protocol(UDP)
      1:UDP和TCP的区别
      (1)TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复制和丢失的。UDP,另一方面,它是不会保证数据包的传递的
      (2)TCP也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样确保了在发送和接收之间能够可靠的传递。相反,UDP仅仅是一个链接协议,所以它没有可靠性之说
      2:从上面可以得出:TCP是被用在稳定可靠的场景中使用的;UDP通常用在快速数据传递和不怕数据丢失的场景中,还有ActiveMQ通过防火墙时,只能用UDP
      3:UDP连接的URI形式:udp://hostname:port?key=value
      4:Transport Connector配置示例:
    <transportConnectors>
        <transportConnector
            name="udp"
            uri="udp://localhost:61618?trace=true" />
    </transportConnectors>
    
    • Secure Sockets Layer Protocol (SSL)
      1:连接的URI形式:ssl://hostname:port?key=value
      2:Transport Connector配置示例:
    <transportConnectors>
        <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
    </transportConnectors>
    

    八、案例(Hello World)

    这里以windows为案例演示

    下载地址:http://activemq.apache.org/components/classic/download/

    8.1 安装启动

    解压后直接执行
    bin/win64/activemq.bat

    在这里插入图片描述

    8.2 web控制台

    http://localhost:8161/
    账号密码:admin/admin

    在这里插入图片描述
    在这里插入图片描述

    8.3 web控制台

    修改 ActiveMQ 配置文件 activemq/conf/jetty.xml
    jettyport节点: 配置文件修改完毕,保存并重新启动 ActiveMQ 服务

     <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
                 <!-- the default port number for the web console -->
            <property name="host" value="127.0.0.1"/>
            <property name="port" value="8161"/>
        </bean>
    
    8.4 开发

    1. jar引入:

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-activemq</artifactId>
       </dependency>
    

    2. Sender :

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    /**
     * @program: activemq_01
     * @ClassName Sender
     * @description: 消息发送
     * @author: muxiaonong
     * @create: 2020-10-02 13:01
     * @Version 1.0
     **/
    public class Sender {
    
        public static void main(String[] args) throws Exception{
            // 1. 获取连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    "tcp://localhost:61616"
            );
    
            // 2. 获取一个向activeMq的连接
            Connection connection = factory.createConnection();
            // 3. 获取session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            // 4.找目的地,获取destination,消费端,也会从这个目的地取消息
            Queue queue = session.createQueue("user");
    
            // 5.1 消息创建者
            MessageProducer producer = session.createProducer(queue);
    
            // consumer --> 消费者
            // producer --> 创建者
            // 5.2. 创建消息
            for (int i = 0; i < 100; i++) {
                TextMessage textMessage = session.createTextMessage("hi:"+i);
                // 5.3 向目的地写入消息
                producer.send(textMessage);
                Thread.sleep(1000);
            }
    
            // 6.关闭连接
            connection.close();
    
            System.out.println("结束。。。。。");
    
        }
    }
    

    3. Receiver :

    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    
    /**
     * @program: activemq_01
     * @ClassName Receiver
     * @description: 消息接收
     * @author: muxiaonong
     * @create: 2020-10-02 13:01
     * @Version 1.0
     **/
    public class Receiver {
    
        public static void main(String[] args) throws Exception{
            // 1. 获取连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    "tcp://localhost:61616"
            );
    
            // 2. 获取一个向activeMq的连接
            Connection connection = factory.createConnection();
            connection.start();
    
            // 3. 获取session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            // 4.找目的地,获取destination,消费端,也会从这个目的地取消息
            Destination queue = session.createQueue("user");
    
            // 5 获取消息
            MessageConsumer consumer = session.createConsumer(queue);
    
            while(true){
                TextMessage message = (TextMessage)consumer.receive();
                System.out.println("message:"+message.getText());
            }
    
        }
    }
    

    测试结果:

    message:hi:38
    message:hi:39
    message:hi:40
    message:hi:41
    message:hi:42
    message:hi:43
    message:hi:44
    message:hi:45
    

    web后台显示有一个消费者处于连接状态,且已消费了68个message,而该条队列已没有message待消费了
    在这里插入图片描述

    九、总结

    今天的MQ入门教程系列就这里了,感兴趣的小伙伴可以试试,遇到了什么问题,或者有疑问的,都可以在下方留言,小农看见了会第一时间回复大家,MQ作为一个消息中间件,不管是面试还是工作中都会经常用到,所以是很有必要去了解和学习的一个技术点,今天的分享就到这里了,谢谢各位小伙伴的观看,我们下篇文章见,大家加油!

    展开全文
  • 一份详细ActiveMQ使用教程
  • ActiveMQ视频教程,共9章节。学习后进步很大,优质视频教程
  • MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...
  • activemq_使用教程

    2011-06-09 11:15:45
    在介绍ActiveMQ之前,首先简要介绍一下JMS规范。 1.1 JMS的基本构件 1.1.1 连接工厂 连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。 1.1.2 连接 JMS Connection...
  • c# ActiveMq 使用

    2019-07-05 16:00:00
    踩过的坑: ... ActiveMq断网之后不会自动重连,需要将连接字符串修改为:failover:(tcp://192.168.0.47:... 如果有多个地址可以使用 逗号分隔,例如:failover:(tcp://192.168.0.47:61616,tcp://192.168.0....
  • ActiveMQ教程

    2018-06-12 16:32:17
    ActiveMQ基础教程,初识ActiveMQ的知识其中的原理与案例讲解
  • ActiveMQ集群实战教程
  • 使用 Java 操作 ActiveMQ 导入 maven 依赖 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.2</version>...
  • 主要介绍了php ActiveMQ的安装与使用方法,结合图文与实例形式分析了ActiveMQ的功能、安装、使用方法及操作注意事项,需要的朋友可以参考下
  • ActiveMQ介绍 MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1...
  • 消息监听器 在demo里,除了直接获取信息,还可以使用消息异步非阻塞监听器进行获取,这样的效果会更好一些. /* * 监听器取出消息 * */ consumer.setMessageListener(new MessageListener() { @Override public void ...
  • 在这一篇博客分享一下消费者,使用监听的...和上一篇一样 : ActiveMQ入门教程(五) - ActiveMQ与Spring整合 2. 生产者 [java] view plaincopy package org.ygy.mq.lesson04;    import java

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 6,258
精华内容 2,503
关键字:

activemq使用教程