精华内容
下载资源
问答
  • 2022-01-05 00:22:56

    如下的内容是关于Java调用ActiveMQ简单范例的内容,应该能对大伙有所用。

    package jms;

    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.log4j.PropertyConfigurator;

    public class PTP_sends {

    public PTP_sends(){  
          
    }  
    
    private Connection connection;  
    private String subject = "TOOL.DEFAULT";  
    
    
    public void send(String msg){  
    
        try {  
            connection=connectionFactory.createConnection();  
            connection.start();  
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            Destination destination=session.createQueue(subject);  
            MessageProducer producer=session.createProducer(destination);  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            TextMessage message = session.createTextMessage(msg);  
              
            producer.send(message);  
            System.out.println("消息已经发送。。。。");  
              
              
              
            message.clearProperties();  
            session.close();  
            connection.stop();  
            connection.close();  
            System.out.println("关闭资源。。。。");  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
      
    public static void main(String[] args) {  
        PropertyConfigurator.configure("c:\log4j.properties");  
          
          
        PTP_sends ptpSends=new PTP_sends();  
        ptpSends.send("this is JMS .....");  
    }  
    

    }

    package jms;

    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.log4j.PropertyConfigurator;

    public class PTP_receive {

    private Connection connection;  
    private String subject = "TOOL.DEFAULT";  
      
      
      
     public void receive(){  
            ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory();  
            try {  
                connection=connectionFactory.createConnection();  
                connection.start();  
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
                Destination destination=session.createQueue(subject);  
                MessageConsumer consumer=session.createConsumer(destination);  
                System.out.println("同步接受消息:");  
                Message message=consumer.receive();  
                System.out.println("n收到的message 是:"+((TextMessage)message).getText());  
                  
                  
                message.clearProperties();  
                consumer.close();  
                session.close();  
                connection.stop();  
                connection.close();  
                System.out.println("关闭资源。。。。");  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
        }  
          
    public static void main(String[] args) {  
        PropertyConfigurator.configure("c:\log4j.properties");  
        PTP_receive receive=new PTP_receive();  
        receive.receive();  
    }  
    

    }

    更多相关内容
  • java调用activeMQ实例

    2021-07-05 09:31:11
    } } } 订阅模式需要先订阅才能消费到信息,也就是若先启动生产者进行生产消息,在用消费者是无法接收到信息,要先使用消费者订阅完,再使用生产者才行 生产者发布一个消息,能够被多个订阅的消费者接收到 maven ...

    一、点对点通信

    1、消息发送者    

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
     
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    public class ProducerTest {
     
    	private static final int SENDNUM = 10;
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory connectionFactory;//连接工程
    		Connection connection = null;//连接
    		Session session;//会话 结束或签字发送消息的线程
    		Destination destination;//消息的目的地
    		MessageProducer messageProducer;//消息生产者
    		try {
    			
    			//实例化连接工厂
    			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
    			//通过连接工程获取连接
    			connection = connectionFactory.createConnection();
    			connection.start();//启动连接
    			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
    			destination = session.createQueue("FirstQueue1");//创建队列
    			messageProducer = session.createProducer(destination);//创建消息生产者
    			
    			sendMessage(session, messageProducer);
    			session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally{
    			if(connection != null)
    				connection.close();
    		}
    		
    	}
    	
    	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
    		for (int i = 0; i < SENDNUM; i++) {
    			TextMessage message = session.createTextMessage("ActiveMq 发送消息"+i);
    			messageProducer.send(message);
    		}
    	}
    }

    2、消费者

      (1)非监听器模式

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    /**
     * 消息消费者
     * @author Administrator
     *
     */
    public class JMSConsumer {
     
    	private static final String USERNAME="admin"; // 默认的连接用户名
    	private static final String PASSWORD="admin"; // 默认的连接密码
    	private static final String BROKEURL = "tcp://ip:61616";
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory; // 连接工厂
    		Connection connection = null; // 连接
    		Session session; // 会话 接受或者发送消息的线程
    		Destination destination; // 消息的目的地
    		MessageConsumer messageConsumer; // 消息的消费者
    		// 实例化连接工厂
    		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    		try {
    			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
    			connection.start(); // 启动连接
    			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
    			messageConsumer=session.createConsumer(destination); // 创建消息消费者
    			while(true){
    				TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
    				if(textMessage!=null){
    					System.out.println("收到的消息:"+textMessage.getText());
    				}else{
    					break;
    				}
    			}
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    }

    (2)、监听器模式,消费者会实时消息监听,若有消息则立即消费

         a、监听器:

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
     
    /**
     * 消息监听
     * @author Administrator
     *
     */
    public class Listener implements MessageListener{
     
    	@Override
    	public void onMessage(Message message) {
    		// TODO Auto-generated method stub
    		try {
    			System.out.println("收到的消息:"+((TextMessage)message).getText());
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
     
    }

     b、消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    /**
     * 消息消费者
     * @author Administrator
     *
     */
    public class JMSConsumer2 {
     
    	private static final String USERNAME="admin"; // 默认的连接用户名
    	private static final String PASSWORD="admin"; // 默认的连接密码
    	
    	private static final String BROKEURL="tcp://ip:61616"; // 默认的连接地址
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory; // 连接工厂
    		Connection connection = null; // 连接
    		Session session; // 会话 接受或者发送消息的线程
    		Destination destination; // 消息的目的地
    		MessageConsumer messageConsumer; // 消息的消费者
    		
    		// 实例化连接工厂
    		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
    				
    		try {
    			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
    			connection.start(); // 启动连接
    			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
    			messageConsumer=session.createConsumer(destination); // 创建消息消费者
    			messageConsumer.setMessageListener(new Listener()); // 注册消息监听
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    }

    二、订阅模式

    一个发布多个订阅者都会受到消息,先订阅后发布

    1、消息生产者

     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
     
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    public class ProducerTest2 {
     
    	private static final int SENDNUM = 10;
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory connectionFactory;//连接工程
    		Connection connection = null;//连接
    		Session session;//会话 结束或签字发送消息的线程
    		Topic createTopic;//消息的目的地
    		MessageProducer messageProducer;//消息生产者
    		try {
    			
    			//实例化连接工厂
    			
    			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
    			//通过连接工程获取连接
    			connection = connectionFactory.createConnection();
    			connection.start();//启动连接
    			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
    			createTopic = session.createTopic("FirstTopic1");
    			messageProducer = session.createProducer(createTopic);//创建消息生产者
    			
    			sendMessage(session, messageProducer);
    			session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally{
    			if(connection != null)
    				connection.close();
    		}
    		
    	}
    	
    	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
    		for (int i = 0; i < 1; i++) {
    			TextMessage message = session.createTextMessage("ActiveMq 发送消息"+i);
    			messageProducer.send(message);
    		}
    	}
    }

    2、消费者(这边就采用监听器模式)

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    /**
     * 消息消费者,订阅模式,一个发布多个订阅者都会受到消息,先订阅后发布
     * @author Administrator
     *
     */
    public class JMSConsumer2 {
     
    	private static final String USERNAME="admin"; // 默认的连接用户名
    	private static final String PASSWORD="admin"; // 默认的连接密码
    	private static final String BROKEURL="tcp://ip:61616"; // 默认的连接地址
    	
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory; // 连接工厂
    		Connection connection = null; // 连接
    		Session session; // 会话 接受或者发送消息的线程
    		Destination destination; // 消息的目的地
    		MessageConsumer messageConsumer; // 消息的消费者
    		
    		// 实例化连接工厂
    		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
    				
    		try {
    			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
    			connection.start(); // 启动连接
    			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    			destination=session.createTopic("FirstTopic1"); 
    			messageConsumer=session.createConsumer(destination); // 创建消息消费者
    			messageConsumer.setMessageListener(new Listener()); // 注册消息监听
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    }

     

    订阅模式需要先订阅才能消费到信息,也就是若先启动生产者进行生产消息,在用消费者是无法接收到信息,要先使用消费者订阅完,再使用生产者才行

    生产者发布一个消息,能够被多个订阅的消费者接收到

    maven

    		<!-- https://mvnrepository.com/artifact/org.glassfish.main.javaee-api/javax.jms -->
    		<dependency>
    			<groupId>org.glassfish.main.javaee-api</groupId>
    			<artifactId>javax.jms</artifactId>
    			<version>3.1.2.2</version>
    		</dependency>
    
    		<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
    		<dependency>
    			<groupId>org.apache.activemq</groupId>
    			<artifactId>activemq-core</artifactId>
    			<version>5.7.0</version>
    		</dependency>

    展开全文
  • 使用 Java 操作 ActiveMQ 导入 maven 依赖 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.2</version>...

    使用 Java 操作 ActiveMQ

    导入 maven 依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.2</version>
    </dependency>
    

    现在的消息队列大概分为队列模型和发布订阅模型

    队列 Queue

    队列模型:

    • 消费者之间是竞争的关系,每个消费者只能收到队列中的一部分消息
    • 如果需要将一份数据发送给多个消费者,单个队列满足不了需求,可以为每个消费者创建一个单独的队列

    生产者代码

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.Date;
    
    public class JmsProduce {
    
        private static final String DEFAULT_BROKER_HOST = "tcp://IP:61616";
    
        public static void main(String[] args) throws JMSException {
            // 创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_HOST);
            // 获取 connection
            final Connection connection = connectionFactory.createConnection("admin", "admin");
            // 启动
            connection.start();
            // 创建会话 session,参数第一个是事务,第二个是签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地,queue 或者 topic
            Queue queue = session.createQueue("queueName");
            // 创建消息的生产者
            MessageProducer producer = session.createProducer(queue);
            // 创建消息
            TextMessage textMessage = session.createTextMessage("这是一条消息" + new Date());
            // 发送消息给 mq
            producer.send(textMessage);
            // 关闭
            session.close();
            connection.close();
            System.out.println("消息发送成功~");
        }
    
    }
    

    消费者代码

    1、同步阻塞,使用 receive() 方法进行消费,receive() 方法不带参数会一直等待,receive(Long timeout) 会等待指定时间后退出等待

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class JmsConsumer {
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP:61616");
            Connection connection = connectionFactory.createConnection("admin", "admin");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("queueName");
            // 创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            while (true) {
                // 同步阻塞方式使用 receive(),超时之前一直等待
                // receive() 方法不带参数会一直等待
                // receive(Long timeout) 会等待指定时间后退出等待
                TextMessage receive = (TextMessage) consumer.receive();
                if (receive != null) {
                    System.out.println("接收到消息:" + receive);
                }else {
                    break;
                }
            }
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    2、异步阻塞,使用监听方式消费消息,订阅者或接收者通过 MessageConsumer 的 setMessageListener(MessageListenner listener) 注册一个消息监听器,消息到达后自动调用监听器的 MessageListener 的 onMessage(Message message) 方法

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class JmsConsumer {
        public static void main(String[] args) throws JMSException, IOException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://82.156.227.150:61616");
            Connection connection = connectionFactory.createConnection("admin", "admin");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("queueName");
           // 创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            // 通过监听方式消费消息
            consumer.setMessageListener((message) -> {
                if (null != message && message instanceof TextMessage) {
                    TextMessage message1 = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息:" + message1.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.in.read(); // 保证控制台存活状态
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    主题 Topic

    订阅-发布模式

    • 消息的发送方称为发布者(Publisher),消息接收者称为订阅者(Subscriber),服务端存放消息的容器称为(Topic)
    • 发布者将消息发送到 Topic 中,订阅者需要订阅主题,每个订阅者都可以接收到订阅之后主题发布所有的消息
    • 生产者和消费者有时间上的相关性,生产者生产时,topic 不保存消息它是无状态的,应该先启动消费者再启动生产者
    • JMS 允许客户创建持久订阅,持久订阅允许消费者消费未激活状态时发送的消息

    生产者代码

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.Date;
    
    public class JmsProduceTopic {
        public static void main(String[] args) throws Exception{
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP:61616");
            Connection connection = connectionFactory.createConnection("admin", "admin");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("topicName");
            MessageProducer producer = session.createProducer(topic);
            TextMessage textMessage = session.createTextMessage("message--" + new Date());
            producer.send(textMessage);
            producer.close();
            session.close();
            connection.close();
            System.out.println("消息发送成功~");
            
        }
    }
    
    

    消费者代码

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.Date;
    
    public class JmsConsumerTopic {
        public static void main(String[] args) throws Exception{
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP:61616");
            Connection connection = connectionFactory.createConnection("admin", "admin");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("topicName");
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener((message) -> {
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.in.read();
            consumer.close();
            session.close();
            connection.close();
    
        }
    }
    

    Topic 模式队列和 Queue 模式队列比较

    Topic 模式队列Queue 模式队列
    订阅,发布模式,当前没有订阅者,消息会被放弃,如果有多个订阅者,所有订阅者都会收到消息如果没有消费者,消息也不会丢弃,多个消费者,消息只会发送给其中一个消费者,并且要求消费者 ACK 信息
    无状态默认在 mq 服务器上以文件形式保存,也可以配置成 DB 存储
    没有订阅者消息丢弃消息不会丢弃
    消息按照订阅者数量进行赋值,性能随着订阅者数量增加降低每个消息只发送个一个消费者,性能不会明显降低

    ActiveMQ Broker

    Broker 相当于一个 ActiveMQ 实例

    ActiveMQ 实现了用代码的形式启动 ActiveMQ 将 MQ 嵌入到 Java 代码中

    导入 maven 依赖

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
    

    启动 Broker 代码

    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
    
    展开全文
  • Java访问ActiveMQ实例

    2022-05-24 10:07:28
    第一步点开新建然后other 里面的maven里面的maven project。... 新建完成。 2.在pom.xml添加依赖 ...在java里面添加 ...新建java文件 ...消息生产者TopicPublisher.java ...import org.apache.activemq.Active...

    第一步点开新建然后other 里面的maven里面的maven project。groud id 与packed需要前面写com.xxxxx。

    新建完成。

    2.在pom.xml添加依赖

     添加完后。添加jar

    在java里面添加

    先new一下。之后添加外部jar 

    新建java文件

     新建后添加此代码

    1. 消息生产者TopicPublisher.java


    package com.activemq.ActiveMQ;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;

    public class TopicPublisher {
        public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        public static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
        public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
        //创建连接
        Connection connection = connectionFactory.createConnection();
        //开启连接
        connection.start();
        //创建会话,不需要事务
        Session session = connection. createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建Topic, 用作消费者订阅消息
        Topic myTestTopic = session.createTopic("activemq-topic-test1");
        //消息生产者.
        MessageProducer producer = session.createProducer(myTestTopic);
        for (int i= 1;i<= 3;i++){
        TextMessage message = session.createTextMessage("发送消息"+ i);
        producer. send(myTestTopic, message);
        }
        //关闭资源
        session.close();
        connection.close();
        
        } catch (JMSException e) {
        e.printStackTrace();
        }
        }
        }


     


     

    2.继续添加

    消息消费者TopicSubscriber.java

    package com.activemq.ActiveMQ;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;

    public class TopicSubscriber {
        public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        public static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
        public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
        //创建连接
        Connection connection = connectionFactory.createConnection();
        //开启连接
        connection.start();
        //创建会话,不需要事务
        Session session = connection. createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建Topic, 用作消费者订阅消息
        Topic myTestTopic = session.createTopic("activemq-topic-test1");
        //消息生产者.
        MessageConsumer messageConsumer=session.createConsumer(myTestTopic);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
            try {
                System.out.println("消费者1接收到消息: " + ((TextMessage) message).getText());
            } catch (JMSException e) {
            e.printStackTrace();
            }}});
            MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
            messageConsumer2.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
            try{
            System.out.println("消费者2接收到消息: " + ((TextMessage) message).getText());
            } catch (JMSException e) {
            e.printStackTrace();
            }}});
            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
            messageConsumer3.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
            try {
                System.out.println("消费者3接收到消息: " + ((TextMessage) message).getText());
            } catch (JMSException e) {
            e.printStackTrace();
            }}});
            //让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
            Thread.sleep(100*1000);
            //关闭资源
            session.close();
            connection.close();
            }catch (Exception e) {
            e.printStackTrace();
            }
            }
        }


            

     打开bin目录下的

    启动activeMQ 以管理员身份运行。

     

    1. 启动activemq服务器

    http://127.0.0.1:8161/admin/

    用户名:admin

    密码:admin

    1. 运行TopicSubscriber

    INFO | Successfully connected to tcp://localhost:61616

     

    展开全文
  • Publiser.java package example; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; import org....
  • Java-ActiveMQ-MQTT

    2021-08-07 21:06:44
    感谢大佬们的文章 第一个连接 https://blog.csdn.net/u012851114/article/details/103346132 第二个连接 https://www.cnblogs.com/StefanieYang/tag/ActiveMq/
  • 这个项目定义了一对 Java DSL 来使用 ActiveMQ。 它最初的创建是为了允许在单元测试中定义代理配置的简化机制,而无需求助于 ActiveMQ 的内部类,或者使用 ActiveMQ XML 配置文件创建许多相似但不同的代理配置。 ...
  • org.apache.activemq activemq-all ...com.java.elasticsearch.activemq; import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * @author Woo_home * @c..
  • Java访问ActiveMQ

    2021-02-12 13:57:21
    我们这里使用windows测试,所以下载windows版本即可。2、启动ActiveMQ下载zip文件后直接解压,解压后我们比较关注的是bin和conf目录。bin存放的是脚本文件conf存放的是基本配置文件data存放的是日志文件docs存放的是...
  • 适用于Java后台开发消息队列ActiveMQ使用者,包括服务器的搭建以及ActiveMQ的三种使用模式
  • 1、向ActiveMQ中放入消息 import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms....
  • javaactiveMQ

    2020-10-23 19:31:39
    文章目录一、activeMQ默认已经安装完毕队列点对点模式服务提供方服务消消费方(一)服务消消费方(二)主题服务提供方服务消费方(一)服务消费方(二)手动签收事物总结springboot 整合activemq点对点坐标yml创建一个Queue...
  • 在运行该程序之前我们需要先启动一下 ActiveMQ 访问 http://localhost:8161/admin/,点击 Queues 选项 队列中也是啥也没有 然后我们运行一下上面的程序 控制台已显示成功将消息发送到 MQ 了 ...
  • 目录(?...对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息
  • 1.进入ActiveMq的目录的conf目录下,编辑...修改该标签,开启JMX服务4.java代码public class ActiveMqKit {private static Logger log = Logger.getLogger(ActiveMqKit.class);//和activemq.xml中添加的对应上p...
  • activeMQ的broker可以在java代码中内嵌一个微型的MQ代理,当我们服务器挂了实在开不起来MQ时,可以临时启用一个broker来应急! 但是我跟着搞了一遍,发现MQ刚连接上就马上关闭了! 网上搜了一下方法,发现也是有人跟...
  • JAVA 监控activemq

    2020-03-27 17:14:07
    上篇内容(点击可看)里面提到了我遇到的bug,最后发现是有人动了我的服务器上的activemq,导致我的队列出现了问题。 我的代码能够获得topic,能够获得broker,就是队列信息获取不到,我还纳闷了..... import java...
  • java操作activemq使用

    2019-09-03 09:34:52
    activemq使用 1,安装并运行activemq 开启activemq 查看状态 2,浏览器运行查看 3,编写java测试代码 测试producer public void produceQueue() throws Exception{ //创建工厂连接 ConnectionFactory ...
  • java ActiveMQ 配置

    2021-03-09 17:02:32
    项目属性文件#activemq settings#vm brokerjms.broker_url=vm://showcase?broker.persistent=false&broker.useJmx=false&broker.schedulerSupport=false#localhost broker#jms.broker_url=tcp://localhost:...
  • --activemq所需依赖jar配置--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </...
  • MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...
  • java项目使用activemq

    2019-11-08 15:43:04
    1.启动activemq,并进入网站 http://127.0.0.1:8161/,打开mq网站,可查看相关队列信息 2.点击 Manage ActiveMQ broker,登陆账号密码均为 admin 第二步:idea创建maven工程,加入依赖如下: <dependencies> ...
  • http://activemq.apache.org/...运行ActiveMQ解压缩apache-activemq-5.8.0-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。启动ActiveMQ以后,登陆:http://localhost:8161/admi...
  • 我试图在我的应用程序中使用activemq,但是当我尝试连接到本地主机时会继续出现错误:log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).log4j:WARN Please ...
  • ActiveMQ_3Java实现

    2021-03-15 02:25:31
    Java实现添加相应的jar包org.apache.activemqactivemq-allx.xx.x创建生产者类(点对点)public class ProducerTest {// 异步发送asyn// 死信队列 DLQ// 文件上传// header properties使用// jdbc存储// byteMsg objMsg...
  • activeMQ所需jar包

    2018-09-28 16:57:09
    使用activeMQ时所需jar包:activemq-broker-5.9.0.jar,activemq-client-5.9.0.jar,geronimo-j2ee-management_1.1_spec-1.0.1.jar,geronimo-jms_1.1_spec-1.1.1.jar,slf4j-api-1.7.5.jar
  • java ActiveMQ的例子

    2016-07-22 14:48:44
    java ActiveMQ的例子,里面有apache-activemq-5.9.0的压缩包,有使用说明,适合c初步了解activeMQ的朋友
  • ActiveMQ安装及JAVA集成使用ActiveMQ简介及官网下载ActiveMQ安装配置及WebApp控制台访问下载解压创建一个systemd服务文件并启动java通过API调用ActiveMQ接口 ActiveMQ简介及官网下载 Apache ActiveMQ是开源、多协议...
  • springboot-activemq:Java

    2021-05-16 10:19:23
    springboot-activemq创建使用者以监听队列中的消息 在appicaltion.properties字段active.topics中更改主题名称 调用localhost:8080 / message将随机消息发送到activemq服务器

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 44,725
精华内容 17,890
关键字:

java调用activemq

java 订阅