精华内容
下载资源
问答
  • ActiveMQ发送消息  1:创建链接工厂ConnectionFactory  2:创建链接Connection  3:启动session  4:创建消息发送目的地  5:创建生产者  6:发送消息 消息发送类: package ...

    推荐文章:ActiveMQ讯息传送机制以及ACK机制

     

    ActiveMQ发送消息

      1:创建链接工厂ConnectionFactory

      2:创建链接Connection

      3:启动session

      4:创建消息发送目的地

      5:创建生产者

      6:发送消息

    消息发送类:

    package com.apt.study.util.activemq;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
    
        private static final String USERNAME = "admin";
        
        private static final String PASSWORD = "admin";
        
        private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
        
        private AtomicInteger count = new AtomicInteger();
        
        private ConnectionFactory connectionFactory;
        
        private Connection connection;
        
        private Session session;
        
        private Queue queue;
        
        private MessageProducer producer;
        
        public void init() {
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
                 //从工厂中创建一个链接
                connection = connectionFactory.createConnection();
                //启动链接,不启动不影响消息的发送,但影响消息的接收
                connection.start();
                //创建一个事物session
                session = connection.createSession(true, Session.SESSION_TRANSACTED);
                //获取消息发送的目的地,指消息发往那个地方
                queue = session.createQueue("test");
                //获取消息发送的生产者
                producer = session.createProducer(queue);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        public void sendMsg(String queueName) {
            try {
                int num = count.getAndIncrement();
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                            "productor:生产者发送消息!,count:"+num);
                producer.send(msg);
                     
                session.commit();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
    }
    connection.createSession方法
     /**
         * Creates a <CODE>Session</CODE> object.
         *
         * @param transacted indicates whether the session is transacted
         * @param acknowledgeMode indicates whether the consumer or the client will
         *                acknowledge any messages it receives; ignored if the
         *                session is transacted. Legal values are
         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
         * @return a newly created session
         * @throws JMSException if the <CODE>Connection</CODE> object fails to
         *                 create a session due to some internal error or lack of
         *                 support for the specific transaction and acknowledgement
         *                 mode.
         * @see Session#AUTO_ACKNOWLEDGE
         * @see Session#CLIENT_ACKNOWLEDGE
         * @see Session#DUPS_OK_ACKNOWLEDGE
         * @since 1.1
         */
        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
            checkClosedOrFailed();
            ensureConnectionInfoSent();
            if(!transacted) {
                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
                }
            }
            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
        }
    createSession方法里有两个参数,第一个参数表示是否使用事务,第二个参数表示消息的确认模式。消息的确认模式共有4种:
    1:AUTO_ACKNOWLEDGE 自动确认
    2:CLIENT_ACKNOWLEDGE 客户端手动确认 
    3:DUPS_OK_ACKNOWLEDGE 自动批量确认
    0:SESSION_TRANSACTED 事务提交并确认
    4:INDIVIDUAL_ACKNOWLEDGE 单条消息确认 为AcitveMQ自定义的ACK_MODE
    各种确认模式详细说明可以看文章:ActiveMQ讯息传送机制以及ACK机制
    从createSession方法中可以看出如果如果session不使用事务但是却使用了消息提交(SESSION_TRANSACTED)确认模式,或使用的消息确认模式不存在,将抛出异常。

    ActiveMQ接收消息

      1:创建链接工厂ConnectionFactory

      2:创建链接Connection

      3:启动session

      4:创建消息发送目的地

      5:创建生产者

      6:接收消息或设置消息监听器

    消息接收类:

    package com.apt.study.util.activemq;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQPrefetchPolicy;
    
    public class Receiver {
    
    private static final String USERNAME = "admin";
        
        private static final String PASSWORD = "admin";
        
        private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
        
        private AtomicInteger count = new AtomicInteger();
        
        private ConnectionFactory connectionFactory;
        
        private ActiveMQConnection connection;
        
        private Session session;
        
        private Queue queue;
        
        private MessageConsumer consumer;
        
        public void init() {
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
                 //从工厂中创建一个链接
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                
                //启动链接
                connection.start();
                //创建一个事物session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //获取消息接收的目的地,指从哪里接收消息
                queue = session.createQueue("test");
                //获取消息接收的消费者
                consumer = session.createConsumer(queue);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        public void receiver(String queueName) {
            
            try {
                    TextMessage msg = (TextMessage) consumer.receive();
                    if(msg!=null) {
                        System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                    }
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } 
        }
    }
    consumer.receive方法
    /**
         * Receives the next message produced for this message consumer.
         * <P>
         * This call blocks indefinitely until a message is produced or until this
         * message consumer is closed.
         * <P>
         * If this <CODE>receive</CODE> is done within a transaction, the consumer
         * retains the message until the transaction commits.
         *
         * @return the next message produced for this message consumer, or null if
         *         this message consumer is concurrently closed
         */
        public Message receive() throws JMSException {
            checkClosed(); //检查unconsumedMessages是否关闭 ,消费者从unconsumedMessages对象中获取消息
            checkMessageListener(); //检查是否有其他消费者使用了监听器,同一消息消息队列中不能采用reveice和messageListener并存消费消息
    
            sendPullCommand(0); //如果prefetchSize为空且unconsumedMessages为空 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备
            MessageDispatch md = dequeue(-1); //从unconsumedMessages取出一个消息 
            if (md == null) {
                return null;
            }
    
            beforeMessageIsConsumed(md);
            afterMessageIsConsumed(md, false);
    
            return createActiveMQMessage(md);
        }
    prefetchSize属性如果大于0,消费者每次拉去消息时都会预先拉取一定量的消息,拉取的消息数量<=prefetchSize,prefetchSize默认指为1000,这个默认值是从connection中传过来的
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
            checkClosed();
    
            if (destination instanceof CustomDestination) {
                CustomDestination customDestination = (CustomDestination)destination;
                return customDestination.createConsumer(this, messageSelector, noLocal);
            }
    
            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
            int prefetch = 0;
            if (destination instanceof Topic) {
                prefetch = prefetchPolicy.getTopicPrefetch();
            } else {
                prefetch = prefetchPolicy.getQueuePrefetch();
            }
            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
        }

    消息接收类代码中调用session.createConsumer其实调用的就是上面的createConsumer方法,从上面代码中可以看出connection会将自己的prefetch传递给消费者,connection中的ActiveMQPrefetchPolicy

    对象属性如下:

    public class ActiveMQPrefetchPolicy extends Object implements Serializable {
        public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
        public static final int DEFAULT_QUEUE_PREFETCH = 1000;
        public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
        public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
        public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
        public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
        public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
    
        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class);
    
        private int queuePrefetch;
        private int queueBrowserPrefetch;
        private int topicPrefetch;
        private int durableTopicPrefetch;
        private int optimizeDurableTopicPrefetch;
        private int inputStreamPrefetch;
        private int maximumPendingMessageLimit;
    
        /**
         * Initialize default prefetch policies
         */
        public ActiveMQPrefetchPolicy() {
            this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
            this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
            this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
            this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
            this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
            this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
        }

    这里我只截了一部分代码,可以看到队列(queue)默认的queuePrefetch为1000,queuePrefetch的最大值不能超过MAX_PREFETCH_SIZE(32767)

    当然我们也可以自己设置消费者预先拉取的消息数量,方法有两种

    一:在创建connection之后修改connection中的queuePrefetch;代码如下:

    ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
    prefetchPolicy.setQueuePrefetch(number);
    connection.setPrefetchPolicy(prefetchPolicy);

    二:在创建队列(queue)的时候传入参数,回到ActiveMQMessageConsumer的创建代码中:

     public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
                String name, String selector, int prefetch,
                int maximumPendingMessageCount, boolean noLocal, boolean browser,
                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
            if (dest == null) {
                throw new InvalidDestinationException("Don't understand null destinations");
            } else if (dest.getPhysicalName() == null) {
                throw new InvalidDestinationException("The destination object was not given a physical name.");
            } else if (dest.isTemporary()) {
                String physicalName = dest.getPhysicalName();
    
                if (physicalName == null) {
                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
                }
    
                String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
    
                if (physicalName.indexOf(connectionID) < 0) {
                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
                }
    
                if (session.connection.isDeleted(dest)) {
                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
                }
                if (prefetch < 0) {
                    throw new JMSException("Cannot have a prefetch size less than zero");
                }
            }
            if (session.connection.isMessagePrioritySupported()) {
                this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
            }else {
                this.unconsumedMessages = new FifoMessageDispatchChannel();
            }
    
            this.session = session;
            this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
            setTransformer(session.getTransformer());
    
            this.info = new ConsumerInfo(consumerId);
            this.info.setExclusive(this.session.connection.isExclusiveConsumer());
            this.info.setSubscriptionName(name);
            this.info.setPrefetchSize(prefetch);
            this.info.setCurrentPrefetchSize(prefetch);
            this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
            this.info.setNoLocal(noLocal);
            this.info.setDispatchAsync(dispatchAsync);
            this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
            this.info.setSelector(null);
    
            // Allows the options on the destination to configure the consumerInfo
            if (dest.getOptions() != null) {
                Map<String, Object> options = IntrospectionSupport.extractProperties(
                    new HashMap<String, Object>(dest.getOptions()), "consumer.");
                IntrospectionSupport.setProperties(this.info, options);
                if (options.size() > 0) {
                    String msg = "There are " + options.size()
                        + " consumer options that couldn't be set on the consumer."
                        + " Check the options are spelled correctly."
                        + " Unknown parameters=[" + options + "]."
                        + " This consumer cannot be started.";
                    LOG.warn(msg);
                    throw new ConfigurationException(msg);
                }
            }
    
            this.info.setDestination(dest);
            this.info.setBrowser(browser);
            if (selector != null && selector.trim().length() != 0) {
                // Validate the selector
                SelectorParser.parse(selector);
                this.info.setSelector(selector);
                this.selector = selector;
            } else if (info.getSelector() != null) {
                // Validate the selector
                SelectorParser.parse(this.info.getSelector());
                this.selector = this.info.getSelector();
            } else {
                this.selector = null;
            }
    
            this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
                                       && !info.isBrowser();
            if (this.optimizeAcknowledge) {
                this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
                setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
            }
    
            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
            this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
            this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
            this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
            if (messageListener != null) {
                setMessageListener(messageListener);
            }
            try {
                this.session.addConsumer(this);
                this.session.syncSendPacket(info);
            } catch (JMSException e) {
                this.session.removeConsumer(this);
                throw e;
            }
    
            if (session.connection.isStarted()) {
                start();
            }
        }

    this.info.setPrefetchSize(prefetch);

    又上面代码可以看出,在创建ActiveMQMessageConsumer的过程中,程序会将connection中的queuePrefetch赋给ActiveMQMessageConsumer对象中的info对象(info为一个ConsumerInfo对象)

    Map<String, Object> options = IntrospectionSupport.extractProperties(
                    new HashMap<String, Object>(dest.getOptions()), "consumer.");
                IntrospectionSupport.setProperties(this.info, options);

    在创建队列(queue)的过程中,我们可以传一些参数来配置消费者,这些参数的前缀必须为consumer. ,当我们传的参数与info对象中的属性匹配时,将覆盖info对象中的属性值,其传参形式如下:

    queueName?param1=value1&param2=value2

    所以我们如果想改变消费者预先拉取的消息数量,可以在创建对象的时候传入如下参数

    queue = session.createQueue("test?consumer.prefetchSize=number");

     

    ActiveMq接收消息--监听器

    监听器代码如下:

    package com.apt.study.util.activemq;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class ReceiveListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            try {
                 TextMessage msg = (TextMessage) message;
                 if(msg!=null) {
                     System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText());
                 }
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    消息接收类:

    package com.apt.study.util.activemq;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQPrefetchPolicy;
    
    public class Receiver {
    
    private static final String USERNAME = "admin";
        
        private static final String PASSWORD = "admin";
        
        private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
        
        private AtomicInteger count = new AtomicInteger();
        
        private ConnectionFactory connectionFactory;
        
        private ActiveMQConnection connection;
        
        private Session session;
        
        private Queue queue;
        
        private MessageConsumer consumer;
        
        public void init() {
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
                 //从工厂中创建一个链接
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                
                //启动链接
                connection.start();
                //创建一个事物session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                
                queue = session.createQueue("test");
                
                consumer = session.createConsumer(queue);
                //设置消息监听器
                consumer.setMessageListener(new ReceiveListener());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
    }
    consumer.setMessageListener方法:
    public void setMessageListener(MessageListener listener) throws JMSException {
            checkClosed();
            if (info.getPrefetchSize() == 0) {
                throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
            }
            if (listener != null) {
                boolean wasRunning = session.isRunning();
                if (wasRunning) {
                    session.stop();
                }
    
                this.messageListener.set(listener);
                session.redispatch(this, unconsumedMessages);
    
                if (wasRunning) {
                    session.start();
                }
            } else {
                this.messageListener.set(null);
            }
        }

    从代码中可以看出,当我们使用监听器时,消费者prefetchSize必须大于0

     

     

     
     

    转载于:https://www.cnblogs.com/sjcq/p/7469449.html

    展开全文
  • 由于ActiveMQ是一个独立的jms provider,JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步...

    由于ActiveMQ是一个独立的jms provider,JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    ActiveMQ官网下载地址:http://activemq.apache.org/download.html

    队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:
    1、点对点(point-to-point,简称PTP)Queue消息传递模型:
    通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。

    2、发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:
    通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。

    在这里插入图片描述
    案例:

    1、生产者

    package com.activeMQ;
        
        
        import java.util.concurrent.atomic.AtomicInteger;
        
        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.JMSException;
        import javax.jms.MessageProducer;
        import javax.jms.Queue;
        import javax.jms.Session;
        import javax.jms.TextMessage;
        import javax.jms.Topic;
        
        import org.apache.activemq.ActiveMQConnection;
        import org.apache.activemq.ActiveMQConnectionFactory;
        
        public class Producter {
        	//默认登录用户
        	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        	//默认登录密码
        	private static final String USERPASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        	//默认登录路径
        	private static final String USERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        	
        	//线程安全 计数
        	AtomicInteger count = new AtomicInteger(0);
        	//连接工厂
        	ConnectionFactory connectionFactory;
        	//连接对象
        	Connection connection;
        	//事务管理
        	Session session;
        	ThreadLocal<MessageProducer> threadLocal =new ThreadLocal<>();
        	public void init() {
        		//创建连接工厂
        		connectionFactory = new ActiveMQConnectionFactory(USERNAME, USERPASSWORD, USERURL);
        	    //获取连接对象
        		try {
        			connection = (Connection) connectionFactory.createConnection();
        			//开启连接
        			connection.start();
        			//创建一个事务
                           /**
                           //createSession方法里有两个参数,第一个参数表示是否使用事务,第二个参数表示消息的确认模式。消息的确认模式共有4种:
                                       // 1:AUTO_ACKNOWLEDGE 自动确认
                                      // 2:CLIENT_ACKNOWLEDGE 客户端手动确认 
                                     // 3:DUPS_OK_ACKNOWLEDGE 自动批量确认
                                   //  0:SESSION_TRANSACTED 事务提交并确认**/
                                   //4:INDIVIDUAL_ACKNOWLEDGE 单条消息确认 为AcitveMQ自定义的ACK_MODE
        			session = connection.createSession(true, Session.SESSION_TRANSACTED);
        		} catch ( Exception e) {
        			// TODO Auto-generated catch block
        			e.printStackTrace();
        		}
        	}
        	//消息对列
        	public void sendMessage(String str) {
        		try {
        			//创建一个消息队列
        			Queue queue = session.createQueue(str);
        			//消息生产者
        			MessageProducer messageProducer = null;
        			if(threadLocal.get()!=null) {
        				messageProducer = threadLocal.get();
        			}else {
        				messageProducer = session.createProducer(queue);
        				threadLocal.set(messageProducer);
        			}
        			while( true) {
        				//线程睡眠1s
        				Thread.sleep(3000);
        				//递增
        				int num = count.incrementAndGet();
        				//创建一条消息
        				TextMessage tmessage = session.createTextMessage(Thread.currentThread().getName()+"生产hello word"+num);
        				System.out.println(Thread.currentThread().getName()+"生产hello word"+num);
        				//发送消息
        				messageProducer.send(tmessage);
        				//提交事务
        				session.commit();
        				
        			}
        		} catch (JMSException e) {
        			// TODO Auto-generated catch block
        			e.printStackTrace();
        		} catch (InterruptedException e) {
        			// TODO Auto-generated catch block
        			e.printStackTrace();
        		}
        		
        	}
        	//主题Topic模式发送
            public void TestTopicProducer() throws Exception{
                //1、创建工厂连接对象,需要制定ip和端口号
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,USERPASSWORD,USERURL);//new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
                //2、使用连接工厂创建一个连接对象
                Connection connection = connectionFactory.createConnection();
                //3、开启连接
                connection.start();
                //4、使用连接对象创建会话(session)对象
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
                Topic topic = session.createTopic("test-topic");
                
                //6、使用会话对象创建生产者对象
                MessageProducer producer =null ;//session.createProducer(topic);
                if(threadLocal.get()!=null) {
                	producer = threadLocal.get();
                }else {
                	producer = session.createProducer(topic);
                	threadLocal.set(producer);
                }
                while(true) {
                	Thread.sleep(3000);
                	int num = count.incrementAndGet();
                	//7、使用会话对象创建一个消息对象
                    TextMessage textMessage = session.createTextMessage("hello!test-topic"+num);
                    System.out.println("hello!test-topic"+num);
                    //8、发送消息
                    producer.send(textMessage);
                    //9、关闭资源
        //            producer.close();
        //            session.commit();
        //            connection.close();
                }
               
            }
        
        	 
        	
        }
    
    

    生产者测试:

    package com.activeMQ;
        
        public class TestProducter {
            public static void main(String[] args){
                Producter producter = new Producter();
                producter.init();
                TestProducter testMq = new TestProducter();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                //Thread 1
                new Thread(testMq.new ProductorMq(producter)).start();
                //Thread 2
                new Thread(testMq.new ProductorMq(producter)).start();
                //Thread 3
                new Thread(testMq.new ProductorMq(producter)).start();
                //Thread 4
                new Thread(testMq.new ProductorMq(producter)).start();
                //Thread 5
                new Thread(testMq.new ProductorMq(producter)).start();
            }
        
            private class ProductorMq implements Runnable{
                Producter producter;
                public ProductorMq(Producter producter){
                    this.producter = producter;
                }
        
                @Override
                public void run() {
                    while(true){
                        try {
                            producter.sendMessage("Jaycekon-MQ");
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    

    2、消费者

    package com.activeMQ;
        
        import java.util.concurrent.atomic.AtomicInteger;
        
        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.JMSException;
        import javax.jms.Message;
        import javax.jms.MessageConsumer;
        import javax.jms.MessageListener;
        import javax.jms.Queue;
        import javax.jms.Session;
        import javax.jms.TextMessage;
        import javax.jms.Topic;
        
        import org.apache.activemq.ActiveMQConnection;
        import org.apache.activemq.ActiveMQConnectionFactory;
        import org.apache.camel.processor.validation.PredicateValidationException;
        
        public class Comsumer {
        	//获取默认用户
           private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
           //获取默认密码
           private static final String USERPASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
           //获取默认路径
           private static final String USEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
           //MQ 连接工厂
           private ActiveMQConnectionFactory activeMQConnectionFactory;
           //连接工厂
           private ConnectionFactory connectionFactory;
           //连接对象
           private Connection connection;
           //事务管理
           private Session session;
           //线程安全计数器
           private AtomicInteger counnt = new AtomicInteger(0);
           private ThreadLocal<MessageConsumer>threadLocal = new ThreadLocal<>();
           public void init() {
        	   connectionFactory = new ActiveMQConnectionFactory(USERNAME, USERPASSWORD, USEURL);
        	   try {
        		connection = connectionFactory.createConnection();
        		connection.start();
        		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        	} catch (JMSException e) {
        		// TODO Auto-generated catch block
        		e.printStackTrace();
        	}
           }
           
           //获取消息
           public void getMessage(String name) {
        	   try {
        		Queue queue = session.createQueue(name);
        		//消费者
        		MessageConsumer messageConsumer = null;
        		if(threadLocal.get()!=null) {
        			messageConsumer = threadLocal.get();
        		}else {
        			messageConsumer = session.createConsumer(queue);
        			threadLocal.set(messageConsumer);
        		}
        		while( true) {
        			//线程睡眠1s
        			Thread.sleep(3000);
        			//递增
        			int num = counnt.incrementAndGet();
        			
        			//获取一条消息
        			TextMessage tmessage = (TextMessage) messageConsumer.receive();
        			if(tmessage !=null) {
        				//调用acknowledge()方法进行消息确认:
        				tmessage.acknowledge();
        				System.out.println(Thread.currentThread().getName()+"消费了 "+tmessage.getText()+num);
        			}else {
        				break;
        			}
        
        			//提交事务
        //			session.commit();
        			
        		}
        	} catch (JMSException e) {
        		// TODO Auto-generated catch block
        		e.printStackTrace();
        	} catch (InterruptedException e) {
        		// TODO Auto-generated catch block
        		e.printStackTrace();
        	}
        
           }
    //接收Topic 主题模式消息
       public void TestTopicConsumer() throws Exception{
           //1、创建工厂连接对象,需要制定ip和端口号
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,USERPASSWORD,USEURL);
           //2、使用连接工厂创建一个连接对象
           Connection connection = connectionFactory.createConnection();
           //3、开启连接
           connection.start();
           //4、使用连接对象创建会话(session)对象
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
           Topic topic = session.createTopic("test-topic");
           MessageConsumer consumer =null;
           if(threadLocal.get()!=null) {
        	   consumer = threadLocal.get();
           }else {
        	 //6、使用会话对象创建生产者对象
               consumer = session.createConsumer(topic);
               threadLocal.set(consumer);
           }
           while(true) {
        	   Thread.sleep(3000);
        	 //7、向consumer对象中设置一个messageListener对象,用来接收消息
               consumer.setMessageListener(new MessageListener() {
    
                   @Override
                   public void onMessage(Message message) {
                       // TODO Auto-generated method stub
                       if(message instanceof TextMessage){
                           TextMessage textMessage = (TextMessage)message;
                           try {
                               System.out.println(textMessage.getText());
                           } catch (JMSException e) {
                               // TODO Auto-generated catch block
                               e.printStackTrace();
                           }
                       }
                   }
               });
           }
           
    //       //8、程序等待接收用户消息
    //       System.in.read();
    //       //9、关闭资源
    //       consumer.close();
    //       session.close();
    //       connection.close();
       }
       
    }
    
    

    消费者测试:

    package com.activeMQ;
    
    public class TestConsumer {
        public static void main(String[] args){
            Comsumer comsumer = new Comsumer();
            comsumer.init();
            TestConsumer testConsumer = new TestConsumer();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        }
    
        private class ConsumerMq implements Runnable{
            Comsumer comsumer;
            public ConsumerMq(Comsumer comsumer){
                this.comsumer = comsumer;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        comsumer.getMessage("Jaycekon-MQ");
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    效果图:
    在这里插入图片描述

    那Activemq的特性是什么?
    ActiveMQ的特性
    多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    支持通过JDBC和journal提供高速的消息持久化
    从设计上保证了高性能的集群,客户端-服务器,点对点
    支持Ajax
    支持与Axis的整合
    可以很容易得调用内嵌JMS provider,进行测试
    什么情况下使用ActiveMQ?
    多个项目之间集成
    (1) 跨平台
    (2) 多语言
    (3) 多项目
    2.降低系统间模块的耦合度,解耦
    (1) 软件扩展性
    3.系统前后端隔离
    (1) 前后端隔离,屏蔽高安全区

    展开全文
  • ActiveMQ中,消息分为持久化消息和非持久化消息。消息的持久化特性,通过producer.setDelivery()方法来设置。 MessageProducer producer = session.createProducer(destination); //DeliveryMode.NON_PERSISTENT...

    在ActiveMQ中,消息分为持久化消息非持久化消息。消息的持久化特性,通过producer.setDelivery()方法来设置。

    MessageProducer producer = session.createProducer(destination);
    //DeliveryMode.NON_PERSISTENT--持久化
    //DeliveryMode.NON_PERSISTENT--非持久化
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    消息的发送,分为同步发送(sync)异步发送(async)。接下来我们来详细讲解ActiveMQ消息的发送策略。

    1.同步发送 VS 异步发送

           消息的同步和异步发送,是ActiveMQ的两个特性。这两个特性主要是在消息发送端---至--->Broker之间的一个概念。ActiveMQ支持同步、异步两种模式,来讲消息发送至Broker上。

           同步同步发送过程中,发送者发送一条消息会阻塞,直到Broker反馈给一个确认消息,表示消息已经被Broker处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,【会影响到消息从生产者发送到Broker上的性能

           异步:异步发送的过程中,发送者不需要等待Broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所以使用异步发送的前提,是在允许出现数据丢失的情况下使用

           策略选择:异步发送或者同步发送,在不同的情况下有不同的策略。那就是持久化策略非持久化策略。在默认情况下,【非持久化消息是异步发送的非持久化消息 && 非事务模式下是同步发送的】,但是在开启事务的情况下,消息都是异步发送的。因为异步发送的效率会比同步发送性能更快,所以【在发送持久化消息的时候,尽量去开启事务会话

          如何设置同步/异步发送:除了持久化消息和非持久化消息的同步和异步特性外,我们还可以通过以下三种方式来设置异步发送:

    //1.直接在brokerUrl中设置
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.204.201:61616?jms.useAsyncSend=true");
    //2.直接在connectionFactory中设置属性
    ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
    //3.直接在当前connection设置属性
    ((ActiveMQConnection)connection).setUseAsyncSend(true);

    2.ActiveMQ消息发送流程图

    流程解析

       1. producer.send( )方法,默认是异步发送,在发送过程中,会判断producerWindow是否有空间;

       2. 如果说没有空间,则它会处于阻塞(Blocking)状态;

       3. 如果空间被释放,当消息被Broker端确认后,producerWindow窗口会递减,此时,在Broker端会发送一个onProducerACK消息,producerWindow会递减消费后的消息的大小;

      4. 此时Blocking状态,又会恢复为一个可运行状态;

      5. 如果producerWindow存在空间大小的话,会再次判断是不是异步发送。其实在步骤1中就已经有一个判断了。默认是异步发送的,只有异步发送,才会存在producerWindow这个概念;

      6. 如果是异步发送,会增加producerWindow的空间大小;

      7. 再会通过传输层,去发送消息至Broker上.至此,异步发送结束;

      8. 同步发送的情况下,会经过传输层,此时会处于阻塞(Blocking)状态;

      9. 消息最终,还是会发送至Broker,最后Broker再返回相关数据。

    3.ProducerWindowSize概念解读

           ProducerWindowSize概念,是producer允许积压的消息的大小,仅针对异步情况下有效。通过producerWindowSize能够控制待确认的消息的大小。
           异步发送情况下,producer每发送一个消息,都会统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,确认当前剩余的空间是否做够接收新消息(大小),然后才能继续发送。

      如何设置消息发送窗口(ProducerWindowSize)的大小?

           1.通过jms.producerWindowSize=xxx来设置

             在brokerUrl中设置:"tcp://192.168.204.201:61616?jms.producerWindowSize=xxx",这种设置将会对所有的producer生效。

           2.也可以destinationUri中通过producer.windowSize=xxx设置

             在destinationUri中设置:session.setQueue("myQueue?producer.windowSize=xxx"),这种设置只对使用此Destionation实例的producer生效,这这个设置将会覆盖brokerUrl中设置的producerWindowSize值。注意:此值越大,意味着消耗的内存也会越大。

    4.ActiveMQ消息发送源码分析

       如需了解源码,请点击:ActiveMQ消息发送【源码分析】

    END

    展开全文
  • activemq 5-15.19 问题描述 在直接使用springboot 中jmsMessagingTemplate.convertAndSend(queueKey,tempRuleTemplate). 进行插入时,当数量少的时候,没有什么问题,但是当我需要插入1w条数据的时候,如果循环...

    环境 

    springboot 1.5.1.RELEAS

    activemq 5-15.19

    问题描述

    在直接使用springboot 中jmsMessagingTemplate.convertAndSend(queueKey,tempRuleTemplate).

    进行插入时,当数量少的时候,没有什么问题,但是当我需要插入1w条数据的时候,如果循环使用该方法,插入1w条数据需要的时间是几分钟,这是完全不可以接受的。

    问题分析

    发现这个问题的时候,我其实内心也有些预感会出现问题,毕竟在mysql,可以用事务,在redis队列操作的时候,会有一个批量操作的方法。 

    按照这个思路我对jmsMessageTemplate,还有jmsTemplate中的方法进行全方位的搜索,都没有找到相关的方法。最后通过搜索我找到了关于在spring中对activeMQ 进行事务操作的方法,但是这个跟我的情况有些出入,我用的是springboot,由于时间赶,我在网上找到一个示例,了解到如何操作事务,最后算是初步解决了问题。

    问题解决方法(应该可以优化)

    通过使用下面的方法,1w条数据插入从几分钟最后缩短到了10秒以内。所以可以说效果还是比较明显的。

    但是自己这种写法应该还是比较欠妥,请各位大神指点一下

    我在插入数据的地方从原来的

        for (int i = lastElement+1; i <= increaseCount; i++) {
                           
                   jmsMessagingTemplate.convertAndSend(queueKey,tempRuleTemplate);
                         
         }

    转换成这种写法

    //我使用了activemq的pool
    @AutoWired
    PooledConnectionFactory pooledConnectionFactory;
    
       Connection connection = null;
                    Session session = null;
                    try {
                        // 从连接工厂创建一条连接
                         connection = pooledConnectionFactory.createConnection();
                        // 开启连接
                        connection.start();
                        // 创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 一个类似 接受 或者发送的线程
                         session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                        // 用session创建一个
                        Destination destination = session.createQueue(queueKey);
                        // MessageProducer:消息生产者
                        MessageProducer producer = session.createProducer(destination);
                        // 设置不持久化 NON_PERSISTENT  PERSISTENT设置持久化
                        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
                        for (int i = lastElement+1; i <= increaseCount; i++) {
                         
                            producer.send(session.createTextMessage(tempRuleTemplate));
                            
                        }
                        //提交事务
                        session.commit();
                        producer.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                        try {
                            session.rollback();
                        } catch (JMSException ex) {
                            ex.printStackTrace();
                        }
                    }finally {
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }

    鸣谢

    这个写法是某位博主用于spring的然后我改成在springboot中使用,由于之前参考了很多资料,然后翻了好久都没找到这位博主的链接,下次找到了再补上。

    补充:

    这种写法虽然初步解决了问题,但是我知道应该有更加简洁的写法,请大神们指导下。

    后面如果我弄清楚了,会再上来优化!

    展开全文
  • 本文将对ActiveMQ发送消息的源码进行解析,并分析ActiveMQ持久化消息和非持久化消息的发送策略和消息的存储策略。 消息的发送原理 消息同步发送和异步发送 ActiveMQ支持同步、异步两种发送模式将消息发送到broker上...
  • ActiveMQ 消息的发送过程ActiveMQ 支持同步、异步两种发送模式将消息发送到broker上。同步发送过程中,发送是阻塞的,直到收到确认。发送发送一条消息会阻...
  • 1、启动activemq,并进入MQ的消息队列 2、在pom.xml中添加依赖包 <!-- 整合activemq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>...
  • ActiveMq整合SpringMVC实现批量邮件进行异步发送,下载后可在Eclipse中直接使用(含有ActiveMq安装文件已配置好解压即可用)希望对大家有所帮助
  • ActiveMQ

    2020-12-02 00:09:34
    目录1、介绍2、ActiveMQ - API01、Queue队列02、Topic 主题03、Topic和Queue对比3、JMS规范01、消息头02、消息体03、消息属性04、消息持久化05、事务06、签收07、JMS发布订阅小结4、ActiveMQ的broker5、Spring整合...
  • 一、前言 在前一篇博客中,小编向大家简单的介绍了一下ActiveMQ的消息处理方式,包括了点对点,发布订阅两种模式。写向大家展示了一下如何使用,但是在真正开发的时候我们是不会写那么一大片代码,从建立连接工厂,...
  • 一、创建配置消息发送接收目的地、ActiveMQ中间件地址 JMS_BROKER_URL=failover://(tcp://192.168.1.231:61616) QUEUE_BUSP_TP_SMS_MESSAGE=busp.tp.sms.message 二、创建消息生产者配置 <?xml ...
  • activemq

    千次阅读 2021-06-12 18:33:09
    activemq前言使用步骤1.安装2.java操作activemq2.1 队列模式2.1 主题模式2.3 topic和queue的对比总结3.jms4.发布订阅5.事务6.签收7.broker8.springboot整合activemq8.1 activemq-produce(生产者)8.2 activemq-...
  • activeMQ

    2019-09-14 01:44:23
    ActiveMQ Activemq/ zeromq /rocketmq/kafka/RabbitMq/JMS ...
  • ActiveMQ实现消息队列发送邮件

    千次阅读 2016-08-17 14:05:14
    .activemq .command .ActiveMQTextMessage ; import org .apache .commons .logging .Log ; import org .apache .commons .logging .LogFactory ; import org .springframework .beans .factory .annotation ....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 6,536
精华内容 2,614
关键字:

activemq批量发送