精华内容
下载资源
问答
  • 如下的内容是关于Java调用ActiveMQ简单范例的内容,应该能对大伙有所用。 package jms; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms....

    如下的内容是关于Java调用ActiveMQ简单范例的内容,应该能对大伙有所用。

    package jms;  
    
    import javax.jms.Connection;  
    import javax.jms.DeliveryMode;  
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.MessageProducer;  
    import javax.jms.Session;  
    import javax.jms.TextMessage;  
    
    import org.apache.activemq.ActiveMQConnectionFactory;  
    import org.apache.activemq.broker.BrokerService;  
    import org.apache.log4j.PropertyConfigurator;  
    
    public class PTP_sends {  
    
        public PTP_sends(){  
    
        }  
    
        private Connection connection;  
        private String subject = "TOOL.DEFAULT";  
    
        public void send(String msg){  
    
            try {  
                connection=connectionFactory.createConnection();  
                connection.start();  
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
                Destination destination=session.createQueue(subject);  
                MessageProducer producer=session.createProducer(destination);  
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
                TextMessage message = session.createTextMessage(msg);  
    
                producer.send(message);  
                System.out.println("消息已经发送。。。。");  
    
                message.clearProperties();  
                session.close();  
                connection.stop();  
                connection.close();  
                System.out.println("关闭资源。。。。");  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
        }  
    
        public static void main(String[] args) {  
            PropertyConfigurator.configure("c:\log4j.properties");  
    
            PTP_sends ptpSends=new PTP_sends();  
            ptpSends.send("this is JMS .....");  
        }  
    
    }  
    
    package jms;  
    
    import javax.jms.Connection;  
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.Message;  
    import javax.jms.MessageConsumer;  
    import javax.jms.Session;  
    import javax.jms.TextMessage;  
    
    import org.apache.activemq.ActiveMQConnectionFactory;  
    import org.apache.log4j.PropertyConfigurator;  
    
    public class PTP_receive {  
    
        private Connection connection;  
        private String subject = "TOOL.DEFAULT";  
    
         public void receive(){  
                ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory();  
                try {  
                    connection=connectionFactory.createConnection();  
                    connection.start();  
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
                    Destination destination=session.createQueue(subject);  
                    MessageConsumer consumer=session.createConsumer(destination);  
                    System.out.println("同步接受消息:");  
                    Message message=consumer.receive();  
                    System.out.println("n收到的message 是:"+((TextMessage)message).getText());  
    
                    message.clearProperties();  
                    consumer.close();  
                    session.close();  
                    connection.stop();  
                    connection.close();  
                    System.out.println("关闭资源。。。。");  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
    
        public static void main(String[] args) {  
            PropertyConfigurator.configure("c:\log4j.properties");  
            PTP_receive receive=new PTP_receive();  
            receive.receive();  
        }  
    
    }  
    展开全文
  • 订阅模式需要先订阅才能消费到信息,也就是若先启动生产者进行生产消息,在用消费者是无法接收到信息,要先使用消费者订阅完,再使用生产者才行 生产者发布一个消息,能够被多个订阅的消费者接收到

    一、点对点通信

    1、消息发送者    

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ProducerTest {
    
    	private static final int SENDNUM = 10;
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory connectionFactory;//连接工程
    		Connection connection = null;//连接
    		Session session;//会话 结束或签字发送消息的线程
    		Destination destination;//消息的目的地
    		MessageProducer messageProducer;//消息生产者
    		try {
    			
    			//实例化连接工厂
    			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
    			//通过连接工程获取连接
    			connection = connectionFactory.createConnection();
    			connection.start();//启动连接
    			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
    			destination = session.createQueue("FirstQueue1");//创建队列
    			messageProducer = session.createProducer(destination);//创建消息生产者
    			
    			sendMessage(session, messageProducer);
    			session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally{
    			if(connection != null)
    				connection.close();
    		}
    		
    	}
    	
    	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
    		for (int i = 0; i < SENDNUM; i++) {
    			TextMessage message = session.createTextMessage("ActiveMq 发送消息"+i);
    			messageProducer.send(message);
    		}
    	}
    }
    

    2、消费者

      (1)非监听器模式

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者
     * @author Administrator
     *
     */
    public class JMSConsumer {
    
    	private static final String USERNAME="admin"; // 默认的连接用户名
    	private static final String PASSWORD="admin"; // 默认的连接密码
    	private static final String BROKEURL = "tcp://ip:61616";
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory; // 连接工厂
    		Connection connection = null; // 连接
    		Session session; // 会话 接受或者发送消息的线程
    		Destination destination; // 消息的目的地
    		MessageConsumer messageConsumer; // 消息的消费者
    		// 实例化连接工厂
    		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    		try {
    			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
    			connection.start(); // 启动连接
    			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
    			messageConsumer=session.createConsumer(destination); // 创建消息消费者
    			while(true){
    				TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
    				if(textMessage!=null){
    					System.out.println("收到的消息:"+textMessage.getText());
    				}else{
    					break;
    				}
    			}
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    }
    

    (2)、监听器模式,消费者会实时消息监听,若有消息则立即消费

         a、监听器:

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 消息监听
     * @author Administrator
     *
     */
    public class Listener implements MessageListener{
    
    	@Override
    	public void onMessage(Message message) {
    		// TODO Auto-generated method stub
    		try {
    			System.out.println("收到的消息:"+((TextMessage)message).getText());
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    

    b、消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者
     * @author Administrator
     *
     */
    public class JMSConsumer2 {
    
    	private static final String USERNAME="admin"; // 默认的连接用户名
    	private static final String PASSWORD="admin"; // 默认的连接密码
    	
    	private static final String BROKEURL="tcp://ip:61616"; // 默认的连接地址
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory; // 连接工厂
    		Connection connection = null; // 连接
    		Session session; // 会话 接受或者发送消息的线程
    		Destination destination; // 消息的目的地
    		MessageConsumer messageConsumer; // 消息的消费者
    		
    		// 实例化连接工厂
    		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
    				
    		try {
    			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
    			connection.start(); // 启动连接
    			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
    			messageConsumer=session.createConsumer(destination); // 创建消息消费者
    			messageConsumer.setMessageListener(new Listener()); // 注册消息监听
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    }
    

    二、订阅模式

    一个发布多个订阅者都会受到消息,先订阅后发布

    1、消息生产者

    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ProducerTest2 {
    
    	private static final int SENDNUM = 10;
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory connectionFactory;//连接工程
    		Connection connection = null;//连接
    		Session session;//会话 结束或签字发送消息的线程
    		Topic createTopic;//消息的目的地
    		MessageProducer messageProducer;//消息生产者
    		try {
    			
    			//实例化连接工厂
    			
    			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
    			//通过连接工程获取连接
    			connection = connectionFactory.createConnection();
    			connection.start();//启动连接
    			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
    			createTopic = session.createTopic("FirstTopic1");
    			messageProducer = session.createProducer(createTopic);//创建消息生产者
    			
    			sendMessage(session, messageProducer);
    			session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally{
    			if(connection != null)
    				connection.close();
    		}
    		
    	}
    	
    	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
    		for (int i = 0; i < 1; i++) {
    			TextMessage message = session.createTextMessage("ActiveMq 发送消息"+i);
    			messageProducer.send(message);
    		}
    	}
    }
    

    2、消费者(这边就采用监听器模式)

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者,订阅模式,一个发布多个订阅者都会受到消息,先订阅后发布
     * @author Administrator
     *
     */
    public class JMSConsumer2 {
    
    	private static final String USERNAME="admin"; // 默认的连接用户名
    	private static final String PASSWORD="admin"; // 默认的连接密码
    	private static final String BROKEURL="tcp://ip:61616"; // 默认的连接地址
    	
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory; // 连接工厂
    		Connection connection = null; // 连接
    		Session session; // 会话 接受或者发送消息的线程
    		Destination destination; // 消息的目的地
    		MessageConsumer messageConsumer; // 消息的消费者
    		
    		// 实例化连接工厂
    		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
    				
    		try {
    			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
    			connection.start(); // 启动连接
    			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    			destination=session.createTopic("FirstTopic1"); 
    			messageConsumer=session.createConsumer(destination); // 创建消息消费者
    			messageConsumer.setMessageListener(new Listener()); // 注册消息监听
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} 
    	}
    }
    

    订阅模式需要先订阅才能消费到信息,也就是若先启动生产者进行生产消息,在用消费者是无法接收到信息,要先使用消费者订阅完,再使用生产者才行

    生产者发布一个消息,能够被多个订阅的消费者接收到

    展开全文
  • 使用java操作activeMQ

    2020-09-07 20:38:10
  • java操作activemq使用

    2019-09-03 09:34:52
    activemq使用 1,安装并运行activemq 开启activemq 查看状态 2,浏览器运行查看 3,编写java测试代码 测试producer public void produceQueue() throws Exception{ //创建工厂连接 ConnectionFactory ...

    activemq使用

    1,安装并运行activemq

    开启activemq

    查看状态
    在这里插入图片描述
    2,浏览器运行查看
    在这里插入图片描述
    3,编写java测试代码
    测试producer

    public void produceQueue() throws Exception{
    		//创建工厂连接
    		ConnectionFactory connectionFactory = new 
    				ActiveMQConnectionFactory("admin","admin","tcp://192.168.133.99:61616");
    		//使用连接工厂创建一个连接
    		Connection conn = connectionFactory.createConnection();
    		//开启连接
    		conn.start();
    		//使用连接对象创建会话(session)对象
    		Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		//使用会话对象创建目标对象,queue和topic(一对一,一对多)
    		 Queue queue = session.createQueue("test-queue1");
    		//使用会话对象创建生产者对象
    		 MessageProducer producer = session.createProducer(queue);
    		//使用会话创建消息对象
    		TextMessage message = session.createTextMessage("hello1,冷浪进");
    		//发送消息
    		producer.send(message);
    		//关闭资源
    		producer.close();
    		session.close();
    		conn.close();
    

    测试消费者

    public void consumerQueue() throws JMSException, IOException {
    		//1,创建工厂连接对象
    		ConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.133.99:61616");
    		//2, 使用工厂连接对象
    		Connection connection = factory.createConnection();
    		//3,开启连接
    		connection.start();
    		//4,使用连接对象创建session对象
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		//5,使用连接对象创建目标对象
    		Queue queue = session.createQueue("test-queue1");
    		//6,使用会话对象创建生产者对象
    		MessageConsumer consumer = session.createConsumer(queue);
    		//7,向consumer对象中设置一个messagelistener对象,接受消息
    		Message message = consumer.receive();
    		//8,程序等待接收用户消息
    		String content = ((TextMessage)message).getText();
    		System.out.println(content+":我好帅");
    		 //接收消息
    		//9,关闭资源
    		consumer.close();
    		session.close();
    		connection.close();
    

    4,测试结果

    在这里插入图片描述

    展开全文
  • Publiser.java package example; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; import org....
  • JAVA 监控activemq

    2020-03-27 17:14:07
    上篇内容(点击可看)里面提到了我遇到的bug,最后发现是有人动了我的服务器上的activemq,导致我的队列出现了问题。 我的代码能够获得topic,能够获得broker,就是队列信息获取不到,我还纳闷了..... import java...
  • public static void main(String[] args) throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setUseJmx(true); brokerService.addConnector("tcp://localhost:61616...
  • 7.1使用Java嵌入ActiveMQ

    2012-12-05 15:59:53
    在这一节我们使用ActiveMQJava APIs来initialize和configureActiveMQ。你将看到如何使用BrokerService类来配置代理,什么都不用,仅仅是纯Java。 然后我们将讨论你如何能使用定制的配置XML文件配置你的代理。我们...
  • broker-url: tcp://192.168.1.2:61616 #自己的activemq的服务器的地址 user: admin password: admin jms: pub-sub-domain: true #false(默认)是queue true :topic mytopic: boot-activemq-topic #指定队列...
  • javaactivemq配置和使用

    千次阅读 2018-08-08 11:00:32
    applicationContext-activemq.xml的配置 &lt;?xml version="1.0" encoding="UTF-8"?&gt; &lt;beans xmlns="http://www.springframework.org/schema/beans" xmlns:...
  • Java访问ActiveMQ

    2018-11-26 10:49:00
    我们这里使用windows测试,所以下载windows版本即可。 2、启动ActiveMQ  下载zip文件后直接解压,解压后我们比较关注的是bin和conf目录。 bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志...
  • ActiveMQjava中的嵌入式broker 导包 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> &l....
  • 1. Topic Topic:主题模式、广播模式、pub/sub模式、 特点:一个消息可以被多个消费者消费(微信公众号给粉丝发信息) 在Topic模式中需要先启动消费者,在启动消费者,原因就好比方说,我们需要先关注微信工作号才会...
  • Java使用ActiveMQ步骤

    2019-02-28 14:17:34
    Java代码实现ActiveMQ1.在maven中导入ActiveMQ相关包2.编写MQ消息生产者2.编写MQ消息消费者3.测试发送消息4.测试接收消息 1.在maven中导入ActiveMQ相关包 &amp;lt;dependency&amp;gt; &amp;lt;...
  • java使用ActiveMQ

    2018-11-15 16:04:34
    导入activeMq的相关依赖 &lt;dependency&gt; &lt;groupId&gt;org.apache.activemq&lt;/groupId&gt; &lt;artifactId&gt;activemq-client&lt;/artifactId&gt; &lt;...
  • package com.java.elasticsearch.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author Woo_home * @create by 2020/5/3 13:51 */ public class ...
  • package com.java.elasticsearch.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author Woo_home * @create by 2020/5/2 21:32 */ public class JmsProduce { ...
  • 这个项目定义了一对 Java DSL 来使用 ActiveMQ。 它最初的创建是为了允许在单元测试中定义代理配置的简化机制,而无需求助于 ActiveMQ 的内部类,或者使用 ActiveMQ XML 配置文件创建许多相似但不同的代理配置。 ...
  • java项目使用activemq

    2019-11-08 15:43:04
    1.启动activemq,并进入网站 http://127.0.0.1:8161/,打开mq网站,可查看相关队列信息 2.点击 Manage ActiveMQ broker,登陆账号密码均为 admin 第二步:idea创建maven工程,加入依赖如下: <dependencies> ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 40,048
精华内容 16,019
关键字:

java调用activemq

java 订阅