精华内容
下载资源
问答
  • RabbitMQ客户连接池Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
  • 本篇文章主要介绍了java远程连接调用Rabbitmq的实例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • RabbitMQ连接池+SpringBoot实现。通过连接池实现将高效的管理RabbitMQ的Connection,并与springboot进行整合,实现消息发送,获取队列列表等功能。基于此可以进行更多功能的扩充。
  • RabbitMQ客户端连接池实现代码可直接复制项目中使用,内含MQhelper getMessage sendMessage
  • RabbitMQ客户连接池的实现代码示例
  • RabbitMQ客户连接池的实现代码示例
  • RabbitMQ客户端连接池的原理及源码

    热门讨论 2015-04-18 19:24:15
    RabbitMQ客户端连接池的原理及源码,经过仔细功能测试和性能测试的
  • 前言: 维护公司项目,用的是JDK6 + spring2.5.6.SEC01,需求是实现一个rabbitmq客户端发送消息的工具类。 ...尝试直接使用官方rabbitmq-java客户端amqp-client,每次发送都得创建和销毁channe...

    前言:

    维护公司项目,用的是JDK6 + spring2.5.6.SEC01,需求是实现一个rabbitmq客户端发送消息的工具类。

    太难了~非springboot项目,无法直接配置使用rabbitTemplate;版本太低不能也不敢修改spring版本,无法使用amqpTemplate;尝试直接使用官方rabbitmq-java客户端amqp-client,每次发送都得创建和销毁channel,Public速度只有13/s,实在太慢......中间各种踩坑,最后使用apache的commons-pool2通用对象池解决(jedis连接池(v3.0.1)用的也是这个redis.clients.jedis.util.Pool#internalPool)。

    为了适配公司项目,项目主要配置如下,各位看官请知悉:

    jdk 1.6,amqp-client:4.0.3,commons-pool2:2.0

    代码在文末。


    介绍:

    • 连接池/对象池(ObjectPool):用于存放连接对象的一个池子(集合),通常用数组或者List对象.durid用两个数组分别保存活跃的连接和空闲连接.commons-pool2用双端阻塞队列LinkedBlockingDeque保存空闲连接
    • 用ConcurrentHashMap保存所有的连接.
    • 对象工厂(PooledObjectFactory):连接池工厂,用于产生一个新的连接对象.
    • 连接对象/池中对象(PooledObject):连接池里面存放的对象.

    相关的类:

    连接池主要两个类,PooledObjectFactory:管理池中对象,如创建/销毁对象,验证对象是否可用,钝化/激活对象(这个暂时不理解,所以没用到)。第二个就是GenericObjectPool了:操作池中对象,借用/归还对象。它继承了BaseGenericObjectPool,实现了ObjectPool,一般默认用这个作为连接池对象。它构造方法还可以传入两个类:GenericObjectPoolConfig,AbandonedConfig,用于连接池配置。

    PooledObjectFactory:

    GenericObjectPool继承BaseGenericObjectPool实现ObjectPool,通常用此实现来作为默认的连接池对象。
    BaseGenericObjectPool一个抽象类,主要实现了两个功能:

    1.注册JMX
    2.管理一个连接池驱逐线程,此线程调用GenericObjectPool的evict()驱逐超过最大生命周期的连接对象。

    ObjectPool连接池的最上层接口,定义了一个连接池必须具备的方法,比如借用一个连接对象T borrowObject(),归还一个使用后的连接对象void returnObject(T obj)。

    PooledObjectFactory:

    我们的自定义mq连接池得实现这五个方法:makeObject(创建对象),destroyObject(销毁对象),validateObject(验证对象可用),activateObject(激活对象),passivateObject(钝化对象)。

    GenericObjectPoolConfig:

    字段意义默认值备注
    lifo对象池存储空闲对象是使用的LinkedBlockingDeque,它本质上是一个支持FIFO和FILO的双向的队列,common-pool2中的LinkedBlockingDeque不是Java原生的队列,而有common-pool2重新写的一个双向队列。如果为true,表示使用FIFO获取对象。true 
    fairnesscommon-pool2实现的LinkedBlockingDeque双向阻塞队列使用的是Lock锁。这个参数就是表示在实例化一个LinkedBlockingDeque时,是否使用lock的公平锁。false 
    maxWaitMillis当没有空闲连接时,获取一个对象的最大等待时间。如果这个值小于0,则永不超时,一直等待,直到有空闲对象到来。如果大于0,则等待maxWaitMillis长时间,如果没有空闲对象,将抛出NoSuchElementException异常。默认值是-1;可以根据需要自己调整,单位是毫秒-1 
    minEvictableIdleTimeMillis对象最小的空闲时间。如果为小于等于0,最Long的最大值,如果大于0,当空闲的时间大于这个值时,执行移除这个对象操作。默认值是1000L * 60L * 30L;即30分钟。可以避免(连接)泄漏。1000L * 60L * 30L 
    evictorShutdownTimeoutMillisshutdown驱逐线程的超时时间。当创建驱逐线(evictor)程时,如发现已有一个evictor正在运行则会停止该evictor,evictorShutdownTimeoutMillis表示当前线程需等待多长时间让ScheduledThreadPoolExecutor(evictor继承自TimerTask,由ScheduledThreadPoolExecutor进行调度)停止该evictor线程。 当前版本没有这个属性
    softMinEvictableIdleTimeMillis对象最小的空间时间,如果小于等于0,取Long的最大值,如果大于0,当对象的空闲时间超过这个值,并且当前空闲对象的数量大于最小空闲数量(minIdle)时,执行移除操作。这个和上面的minEvictableIdleTimeMillis的区别是,它会保留最小的空闲对象数量。而上面的不会,是强制性移除的。默认值是-1;-1 
    numTestsPerEvictionRun检测空闲对象线程每次检测的空闲对象的数量。默认值是3;如果这个值小于0,则每次检测的空闲对象数量等于当前空闲对象数量除以这个值的绝对值,并对结果向上取整。3 
    testOnCreate在创建对象时检测对象是否有效,true是,默认值是false。做了这个配置会降低性能。false 
    testOnBorrow在从对象池获取对象时是否检测对象有效,true是;默认值是false。做了这个配置会降低性能。false 
    testOnReturn在向对象池中归还对象时是否检测对象有效,true是,默认值是false。做了这个配置会降低性能。false 
    testWhileIdle在检测空闲对象线程检测到对象不需要移除时,是否检测对象的有效性。true是,默认值是false。建议配置为true,不影响性能,并且保证安全性。false 
    timeBetweenEvictionRunsMillis空闲对象检测线程的执行周期,即多长时候执行一次空闲对象检测。单位是毫秒数。如果小于等于0,则不执行检测线程。默认值是-1;-1 
    blockWhenExhausted当对象池没有空闲对象时,新的获取对象的请求是否阻塞。true阻塞。默认值是true;
     
    true 
    jmxEnabled是否注册JMXtrue 
    jmxNamePrefixJMX前缀pool 
    jmxNameBase使用base + jmxNamePrefix + i来生成ObjectName  
    maxTotal对象池中管理的最多对象个数。默认值是8。
     
    8 
    maxIdle对象池中最大的空闲对象个数。默认值是8。8 
    minIdle对象池中最小的空闲对象个数。默认值是0。0 


    AbandonedConfig

    字段意义默认值备注
    logAbandoned标记是否在pooledObject被abandon的时候打出堆栈false参考GenericObjectPool#removeAbandoned
    removeAbandonedOnBorrow标记从objectPool里"借"出时,是否将一些认为无用的pooledObject给remove掉false 
    removeAbandonedTimeout当pooledObject处于allocated状态下,但是超过该时长没有用过,则abandon300(调用方会转成300s)在removeAbandonedOnBorrow=true的前提下
    removeAbandonedOnMaintenanceevictor工作时,是否进行abandonfalse 

    另外还有个EvictionConfig配置(空闲时驱逐配置),GenericKeyedObjectPool(如果需要定义多个对象池可使用,利用ConcurrentHashMap,每个key对应一个PooledObject)这里就不贴出来了。


    代码:

    思路还是1.配一个PooledObjectFactory管理对象。2.GenericObjectPool初始化配置连接池。3.RabbitMQChannelPool操作对象。

    PooledObjectFactory

    /**
     * @Author canon
     * @Date 2020/1/8
     * @Description rabbitmq连接池工厂
     **/
    public class RabbitMQChannelPoolFactory implements PooledObjectFactory<Channel> {
        private ConnectionFactory factory;
    
        public RabbitMQChannelPoolFactory(ConnectionFactory factory) {
            this.factory = factory;
        }
    
        @Override
        public PooledObject<Channel> makeObject() throws Exception {
            // 池对象创建实例化资源
            return new DefaultPooledObject<Channel>(factory.newConnection().createChannel());
        }
    
        @Override
        public void destroyObject(PooledObject<Channel> p) throws Exception {
            // 池对象销毁资源
            if (p != null && p.getObject() != null && p.getObject().isOpen()) {
                p.getObject().close();
            }
        }
    
        @Override
        public boolean validateObject(PooledObject<Channel> p) {
            // 验证资源是否可用
            return p.getObject() != null && p.getObject().isOpen();
        }
    
        @Override
        public void activateObject(PooledObject<Channel> p) throws Exception {
    
        }
    
        @Override
        public void passivateObject(PooledObject<Channel> p) throws Exception {
    
        }
    }

    GenericObjectPool

    /**
     * @Author canon
     * @Date 2020/1/8
     * @Description rabbitmq channel池
     **/
    public class RabbitMqChannelPool extends GenericObjectPool<Channel> {
        public RabbitMqChannelPool(PooledObjectFactory<Channel> factory) {
            super(factory);
        }
    
        public RabbitMqChannelPool(PooledObjectFactory<Channel> factory, GenericObjectPoolConfig config) {
            super(factory, config);
        }
    
        public RabbitMqChannelPool(PooledObjectFactory<Channel> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
            super(factory, config, abandonedConfig);
        }
    }

    RabbitMQChannelPool

    /**
     * @Author canon
     * @Date 2020/1/8
     * @Description 操作channel池
     **/
    public class RabbitMQChannelPool {
        private GenericObjectPool<Channel> pool;
    
        public RabbitMQChannelPool(RabbitMQChannelPoolFactory factory, GenericObjectPoolConfig poolConfig) {
            this.pool = new GenericObjectPool<Channel>(factory, poolConfig);
        }
    
        public Channel getChannel() {
    
            try {
                return pool.borrowObject();
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    
        public void returnChannel(Channel channel) {
            if (channel != null) {
                pool.returnObject(channel);
            }
        }
    }

    spirng配置(公司项目):

    <!--    rabbitmq连接及channel池配置开始    -->
        <bean id="rabbitConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
            <property name="host" value="${mq.host}"/>
            <property name="port" value="${mq.port}"/>
            <property name="virtualHost" value="${mq.virtual}"/>
            <property name="username" value="${mq.username}"/>
            <property name="password" value="${mq.password}"/>
        </bean>
    
        <bean id="rabbitMQChannelPoolFactory" class="com.canon.push.pool.RabbitMQChannelPoolFactory">
            <constructor-arg index="0" ref="rabbitConnectionFactory"/>
        </bean>
    
        <bean id="myRabbitMqPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig">
            <property name="jmxEnabled" value="true"/>
            <property name="blockWhenExhausted" value="true"/>
            <property name="maxWaitMillis" value="5000"/>
            <property name="testOnBorrow" value="true"/>
            <property name="testOnReturn" value="true"/>
    
            <property name="maxTotal" value="100"/>
            <property name="maxIdle" value="20"/>
            <property name="minIdle" value="10"/>
            <property name="timeBetweenEvictionRunsMillis" value="6000"/>
            <property name="softMinEvictableIdleTimeMillis" value="20000"/>
        </bean>
    
        <bean id="rabbitMQChannelPool" class="com.canon.push.config.RabbitMqChannelPool">
            <constructor-arg index="0" ref="rabbitMQChannelPoolFactory"/>
            <constructor-arg index="1" ref="myRabbitMqPoolConfig"/>
        </bean>
    
        <bean id="rabbitService" class="com.canon.push.service.impl.RabbitMqServiceImpl"/>
        <!--    rabbitmq连接及channel池配置结束    -->

    代码已上传:https://gitee.com/Canon_Canon/rabbitmq_pool.git,打包引入项目即可,在项目中配置GenericObjectPoolConfig和AbandonedConfig及相关bean,就可直接@Autowried RabbitMqServiceImpl rabbitService使用了。

    测试可参考项目test目录。

    完~

    参考博客:

    commons-pool2 3 - 配置介绍,BaseObjectPoolConfig,AbandonedConfig,EvictionConfig

    利用commons-pool2自定义对象池

     

    展开全文
  • ...改了端口按理说是连接不到MQ的但是依然可以把消息插入进去,而且把端口注掉也是可以连通的; 网上找也没有找到想要的,后来老大把setAddresses() 改为 setHost(), 端口生效,端口不正确联不通 ...

    今天在测试MQ写入消息失败得时候,把信息写入数据库不做修改,但是改了密码项目启动不起来;

    改了端口按理说是连接不到MQ的但是依然可以把消息插入进去,而且把端口注掉也是可以连通的;

    网上找也没有找到想要的,后来老大把setAddresses()  改为 setHost(), 端口生效,端口不正确联不通

    展开全文
  • 自定义创建rabbitMQ的channel连接池

    千次阅读 2020-09-24 18:21:59
    参考地址:...利用commons-pool2自定义对象池 commons-pool2是Apache下一个开源的公共资源池。我们可以根据它来快速的建立一个自己的对象池。 ... 1.创建连接池工厂,实现PooledObjectFactory接口 impo

    参考地址:https://blog.csdn.net/qq447995687/article/details/80233621

    利用commons-pool2自定义对象池
    commons-pool2是Apache下一个开源的公共资源池。我们可以根据它来快速的建立一个自己的对象池。

    commons-pool2包下载地址https://pan.baidu.com/s/1lVUX_FZAbKjW87jTGJ_Rmw
    密码:a8d0

    1.创建连接池工厂,实现PooledObjectFactory接口

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.tds.smgw.util.Environment;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.PooledObjectFactory;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     1. 自定义连接池工厂
     **/
    public class RabbitMqChannelPoolFactory implements PooledObjectFactory<Channel> {
    
        private Connection connection;
    
        public RabbitMqChannelPoolFactory(){
            try {
                //创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                //设置host
                factory.setHost("127.0.0.1");
                factory.setPort(5672);
                factory.setUsername("test");
                factory.setPassword("test");
                factory.setVirtualHost("/");
                factory.setConnectionTimeout(15000);
                connection = factory.newConnection();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public PooledObject<Channel> makeObject() throws Exception {
            // 池对象创建实例化资源
            return new DefaultPooledObject<>(connection.createChannel());
        }
    
        @Override
        public void destroyObject(PooledObject<Channel> p) throws Exception {
            // 池对象销毁资源
            if (p != null && p.getObject() != null && p.getObject().isOpen()) {
                p.getObject().close();
            }
        }
    
        @Override
        public boolean validateObject(PooledObject<Channel> p) {
            // 验证资源是否可用
            return p.getObject() != null && p.getObject().isOpen();
        }
    
        @Override
        public void activateObject(PooledObject<Channel> p) {
    
        }
    
        @Override
        public void passivateObject(PooledObject<Channel> p) {
    
        }
    }
    

    2.创建连接池,实现GenericObjectPool接口

    import com.rabbitmq.client.Channel;
    import org.apache.commons.pool2.PooledObjectFactory;
    import org.apache.commons.pool2.impl.AbandonedConfig;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    /**
     1. 自定义连接池
     **/
    public class RabbitMqChannelPool extends GenericObjectPool<Channel> {
    
        public RabbitMqChannelPool(PooledObjectFactory<Channel> factory) {
            super(factory);
        }
    
        public RabbitMqChannelPool(PooledObjectFactory<Channel> factory, GenericObjectPoolConfig config) {
            super(factory, config);
        }
    
        public RabbitMqChannelPool(PooledObjectFactory<Channel> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
            super(factory, config, abandonedConfig);
        }
    }
    

    3.获取和关闭channel连接

    import com.rabbitmq.client.Channel;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    /**
     * channel管理
     **/
    public class RabbitMqChannelService {
    
        private RabbitMqChannelPool pool;
    
        public static RabbitMqChannelService getInstance() {
            return SingletonHolder.INSTANCE;
        }
    
        /**
         * 获取channel对象
         * pool.borrowObject()是线程安全的
         *
         * @return 结果
         * @throws Exception 异常
         */
        public Channel getChannel() throws Exception {
            return pool.borrowObject();
        }
    
        /**
         * 归还channel对象
         *
         * @param channel 结果
         */
        public void returnChannel(Channel channel) {
            pool.returnObject(channel);
        }
    
        private static class SingletonHolder {
            private final static RabbitMqChannelService INSTANCE = new RabbitMqChannelService();
        }
    
        private RabbitMqChannelService() {
            initPool();
        }
    
        /**
         * 初始化pool
         */
        private void initPool() {
            RabbitMqChannelPoolFactory factory = new RabbitMqChannelPoolFactory();
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            // 设置最大连接数
            config.setMaxTotal(300);
            // 最大空闲连接数
            config.setMaxIdle(20);
            // 最小空闲连接数
            config.setMinIdle(10);
            // 空闲连接检测周期
            config.setTimeBetweenEvictionRunsMillis(6000);
            // 达到此空闲时间后,连接将被移除
            config.setSoftMinEvictableIdleTimeMillis(20000);
            // 连接资源耗尽后最大等待时间
            config.setMaxWaitMillis(10000);
    
            pool = new RabbitMqChannelPool(factory, config);
        }
    }
    

    4.生产者

    import com.rabbitmq.client.Channel;
    import org.apache.commons.lang.SerializationUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Serializable;
    
    /**
     * rabbit消息发送
     */
    public class RabbitMqProducer {
    
        private Logger logger = LoggerFactory.getLogger(RabbitMqProducer.class);
    
        private String queueName;
    
        public RabbitMqProducer(String queueName) {
            this.queueName = queueName;
        }
    
        /**
         * 发送信息
         *
         * @param obj 发送的内容
         */
        public void sendMessage(Serializable obj) {
            RabbitMqChannelService rabbitMqChannelService = RabbitMqChannelService.getInstance();
            Channel channel = null;
            try {
                channel = rabbitMqChannelService.getChannel();
                // 为这个通道申明一个队列,如果这个队列不存在,他将在服务器上创建
                channel.queueDeclare(queueName, true, false, false, null);
                channel.basicPublish("", queueName, null, SerializationUtils.serialize(obj));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (null != channel) {
                    rabbitMqChannelService.returnChannel(channel);
                }
            }
        }
    }
    

    5.消费者,实现Consumer接口

    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.ShutdownSignalException;
    import com.tds.smgw.service.impl.RabitMqService;
    import com.tds.smgw.util.Environment;
    import org.apache.commons.lang.SerializationUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 消费者
     * 这里channel和Consumer是一对一
     * 使用消息自动确认机制
     */
    public class RabbitMqConsumer implements Consumer {
    
        private static Logger logger = LoggerFactory.getLogger(RabbitMqConsumer.class);
    	
    	private static String queueName;
    
        private static RabitMqService rabitMqService;
    
        private Channel channel;
    
        private RabbitMqConsumer() {
            init();
        }
    
        private static class SingletonHolder {
            private static final RabbitMqConsumer INSTANCE = new RabbitMqConsumer();
        }
    
        public static RabbitMqConsumer getInstance(String queueName) {
            if (null == RabbitMqConsumer.queueName) {
                RabbitMqConsumer.queueName = queueName;
            }
    
            return SingletonHolder.INSTANCE;
        }
    
        /**
         * 设置接收消息queue
         */
        private void init() {
            try {
                channel = RabbitMqChannelService.getInstance().getChannel();
                logger.info("启动RabbitMq监听器:" + channel + "监听QUEUE【" + queueName + "】");
                // 为这个通道申明一个队列,如果这个队列不存在,他将在服务器上创建,防止不存在时报错
                channel.queueDeclare(queueName, true, false, false, null);
                // 开启接收信息,自动确认
                channel.basicConsume(queueName, true, this);
                // 开启接收信息,手动确认
                // channel.basicConsume(queueName, false, this);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (null != channel) {
                    logger.info("channel ---》" + channel);
                    // 不能归还,归还后有可能被关闭导致消费者阻塞
    //                rabbitMqChannelService.returnChannel(channel);
                }
            }
    
        }
    
        /**
         * 当用户注册时调用
         */
        @Override
        public void handleCancelOk(String consumerTag) {
            System.out.println("Consumer " + consumerTag + " registered");
        }
    
        /**
         * 当有可用新消息时调用
         */
        @Override
        @SuppressWarnings("unchecked")
        public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) {
            logger.info("------------------------------[{}]MQ监听器执行开始[{}]---------------------------"
                    , env.getRoutingKey(), System.currentTimeMillis());
            Map<String, String> map = (HashMap<String, String>) SerializationUtils.deserialize(body);
            logger.info("接收消息:" + SerializationUtils.deserialize(body));
            //     channel.basicAck(env.getDeliveryTag(), false);
            logger.info("------------------------------[{}]MQ监听器执行结束[{}]---------------------------"
                    , env.getRoutingKey(), System.currentTimeMillis());
        }
    
        @Override
        public void handleCancel(String consumerTag) {
        }
    
        @Override
        public void handleConsumeOk(String consumerTag) {
        }
    
        @Override
        public void handleRecoverOk(String consumerTag) {
        }
    
    
        @Override
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {
        }
    }
    

    6.测试

       public static void main(String[] args) {
            String queueName = "testQueue";
            // 需要提前启动消费者监听queue,否则监听不到生产者生产的消息
            RabbitMqConsumer.getInstance(queueName);
            // 启动rabbitMq生产者
            RabbitMqProducer producer = new RabbitMqProducer(queueName);
            Map<String, String> map = new HashMap<>(16);
            map.put("date", "date");
            map.put("time", "time");
            map.put("userId", "userId");
            producer.sendMessage((Serializable) map);
        }
    
    展开全文
  • RabbitMQ连接池、生产者、消费者实例

    千次阅读 2019-04-15 09:50:44
    1、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!! 2、下面是链接主要代码: 1 import java.util.HashMap; 2 import java.util.Map; 3 4 import ...

    1、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!!

    2、下面是链接池主要代码:

    复制代码

     1 import java.util.HashMap;
     2 import java.util.Map;
     3 
     4 import org.apache.commons.lang3.StringUtils;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 /**
    12  * 获取RabbitMq连接
    13  * @author skyfeng
    14  */
    15 public class RabbitMqConnectFactory {
    16     static Logger log = LoggerFactory.getLogger(RabbitMqConnectFactory.class);
    17     /**
    18      * 缓存连接工厂,将建立的链接放入map缓存
    19      */
    20     private static Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<String, ConnectionFactory>();
    21     /**
    22      * 根据rabbitMqName获取一个连接,使用完记得要自己关闭连接 conn.close()
    23      */
    24     public static Connection getConnection(String rabbitMqName) {
    25         if(StringUtils.isEmpty(rabbitMqName)){
    26             log.error("rabbitMqName不能为空!");
    27             throw new java.lang.NullPointerException("rabbitMqName为空");
    28         }
    29         if(connectionFactoryMap.get(rabbitMqName)==null){
    30             initConnectionFactory(rabbitMqName);
    31         }
    32         ConnectionFactory connectionFactory = connectionFactoryMap.get(rabbitMqName);
    33         if(connectionFactory==null){
    34             log.info("没有找到对应的rabbitmq,name={}",rabbitMqName);
    35         }
    36         try {
    37             return connectionFactory.newConnection();
    38         }catch (Exception e) {
    39             log.error("创建rabbitmq连接异常!",e);
    40             return null;
    41         }
    42     }
    43     /**
    44      * 初始化一个连接工厂
    45      * @param rabbitMqName
    46      */
    47     private static void initConnectionFactory(String rabbitMqName){
    48         
    49         try {
    50             ConnectionFactory factory = new ConnectionFactory();
    51             //新增代码,如果连接断开会自动重连
    52             //factory.setAutomaticRecoveryEnabled(true);
    53             factory.setHost("127.0.0.1");
    54             factory.setPort(5672);
    55             //factory.setVirtualHost(vhost);
    56             factory.setUsername("test");
    57             factory.setPassword("test");
    58             connectionFactoryMap.put(rabbitMqName, factory);
    59         } catch (Exception e) {
    60             e.printStackTrace();
    61         }finally{
    62         }
    63     }
    64     
    65 }

    复制代码

     3、消费端的代码:

    复制代码

     1 import org.slf4j.Logger;
     2 import org.slf4j.LoggerFactory;
     3 
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.Consumer;
     7 
     8 /**
     9  * RabbitMQq客户端代码
    10  * @author skyfeng
    11  *
    12  */
    13 public class CustomerMqClient {
    14 
    15     final static Logger log = LoggerFactory.getLogger(CustomerMqClient.class);
    16     private final static String RABBITMQ_NAME = "mq_name";
    17     private final static String EXCHANGE_NAME = "Exchange_name";
    18     private final static String QUEUE_NAME = "queue_name";
    19     private static Channel channel = null;
    20     private static Connection connection = null;
    21     
    22     /**
    23      * 初始化客户端代码
    24      */
    25     public static void initClient() {
    26         //重新链接时判断之前的channel是否关闭,没有关闭先关闭
    27         if(null != channel  && channel.isOpen()){
    28             try {
    29                 channel.close();
    30             } catch (Exception e) {
    31                 log.error("mq name =[" +RABBITMQ_NAME+"] close old channel exception.e={}",e);
    32             }finally {
    33                 channel = null;
    34             }
    35         }
    36         //重新链接时判断之前的connection是否关闭,没有关闭先关闭
    37         if (null != connection && connection.isOpen()) {
    38             try {
    39                 connection.close();
    40             } catch (Exception e) {
    41                 log.error("mq name =[" +RABBITMQ_NAME+"] close old connection exception.e={}",e);
    42             }finally{
    43                 connection = null;
    44             }
    45         }
    46         //从链接池中获取链接
    47         connection = RabbitMqConnectFactory.getConnection(RABBITMQ_NAME);
    48         try {
    49             channel = connection.createChannel();
    50             channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    51             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    52             channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");//#号接收所有的数据
    53             Consumer consumer = new CustomerMqConsumer(channel);//具体的业务逻辑在CustomerMqConsumer中
    54             channel.basicConsume(QUEUE_NAME, false, consumer);
    55         } catch (Exception e) {
    56             log.error("CustomerMqClient mq client connection fail .....{}", e);
    57             //发生异常时,重连
    58             reConnect();
    59         }
    60     }
    61 
    62     // 异常时,重连的方法
    63     public static void reConnect() {
    64         log.error("等待5s后重连");
    65         try {
    66             Thread.sleep(5000);
    67         } catch (InterruptedException e) {
    68         }
    69         initClient();
    70     }
    71 
    72 }

    复制代码

    4、生产端代码:

    复制代码

     1 import org.apache.commons.lang3.StringUtils;
     2 import org.slf4j.Logger;
     3 import org.slf4j.LoggerFactory;
     4 
     5 import com.rabbitmq.client.AlreadyClosedException;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 /**
    10  * 把数据发送到rabbitmq的exchange,
    11  */
    12 public class SendToExchange {
    13     static Logger log = LoggerFactory.getLogger(SendToExchange.class);
    14     
    15     final static String TYPE = "topic";
    16     final static String CHARSET_UTF8 = "UTF-8";
    17     //MQ生产者exchange,把数据发给这个exchange
    18     final static String rabbitExchangeName = "ExchangeName";
    19     static boolean mqConnected = false;//mq当前处于连接状态
    20     
    21     static Channel channel=null;
    22     static{
    23         init();
    24     }
    25     public static void init(){
    26         log.info(" rabbit mq init begin...");
    27         try {
    28             //在mq连接中断后,发送程序判断已经断开,启动重连的时候会执行
    29             if(channel!=null){
    30                 try {
    31                     channel.close();
    32                 } catch (Exception e) {
    33                     log.error("关闭老channel 异常",e);
    34                 }finally{
    35                     channel = null;
    36                 }
    37             }
    38             Connection connection = RabbitMqConnectFactory.getConnection("connection");
    39             channel = connection.createChannel();
    40             /*
    41              *这里只定义exchange,因为每个业务模块都会从这里接入数据,所以不在这里定义队列
    42              *队列的定义在各个业务模块自己的消费端定义
    43              */
    44             channel.exchangeDeclare(rabbitExchangeName, TYPE, true, false, null);
    45             log.info(" rabbit mq init OK");
    46             mqConnected = true;
    47         } catch (Exception e) {
    48             log.error("rabbitmq初始化错误",e);
    49             mqConnected = false;
    50         }
    51     }
    52     /**
    53      * 往rabbitmq发数据
    54      * @param message
    55      */
    56     public static void sendToRabbitMq(String message,String routingKey){
    57         try {
    58             if(StringUtils.isEmpty(message)){
    59                 log.debug("message is empty");
    60                 return;
    61             }
    62             channel.basicPublish(rabbitExchangeName, routingKey, null, message.getBytes(CHARSET_UTF8));
    63         }catch(AlreadyClosedException ex){
    64             log.error("往rabbitmq发数据报错,可能连接已关闭,尝试重连,data:",message,ex);
    65             init();
    66         }catch (Exception e) {
    67             log.error("往rabbitmq发数据报错,data:",message,e);
    68         }
    69     }
    70 }
    展开全文
  • rabbitmq的connection连接池1.1 问题提出1.1.1 Connection对象管理以及性能1.1.2 Channel对象管理以及性能1.2 Spring AMQP线程池配置1.2.1 ConnectionFactory连接工厂1.2.2 消费发送和接收使用不同的Connection ...
  • 1、java工具类RabbitmqUtil是本人封装好的直接使用 2、常用的方法都在里面 3、改一下初始化配置参数就行
  • 解决Rabbitmq连接超时问题

    千次阅读 2019-12-05 16:34:48
    超时原因 : 当安装centos时修改主机后没有hosts文件中同步,所以访问时需要多重解析,就会导致超时;...3、退出保存然后重启Rabbitmq就可以了 service rabbitmq-server restart 获取更多资源可以关注我的公众号...
  • 基于Golang实现的Rabbitmq 连接池

    千次阅读 2016-07-01 16:08:46
     之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己...
  • 前言 现如今的互联网应用大都是采用 分布式系统架构 设计的,所以 ...有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分数据库 如 Redis、MySQL 以及 phxsql ,如果硬搞的话
  • Java连接池

    2018-07-01 15:20:01
    参数(所有参数都有默认值):初始大小:10个最小空闲连接数:3个增量:一次创建的最小单位(5个)最大空闲连接数:12个最大连接数:20个最大的等待时间:1000毫秒 //加载链接数据库的信息 InputStream is = ...
  • rabbitMQ java api指南

    2019-02-12 17:11:14
    RabbitMQ Java client 将com.rabbitmq.client作为其顶层包. 关键类和接口有: Channel Connection ConnectionFactory Consumer 协议操作可通过Channel接口来进行.Connection用于开启channels,注册connection生命...
  • Rabbitmq Java Client Api详解
  • java实现RQBBITMQ连接池

    2019-03-08 14:50:45
    基于数据库连接池改编的RabbitMQ java实现,包含效率测试以及生产者消费者连接池的使用demo。实测下来,运行效率是非连接池使用效率的5倍左右。使用方便简单,不需要引入其他什么jar,直接能在项目中使用。
  • RabbitMQ入门指南

    2020-09-01 16:41:18
    当然我们也可以在系统间保持一个长连接,基于底层socket机制进行数据的实时收发,如果再将这部分功能独立成一个中间件,供项目中所有系统使用,就是我们今天所指的MQ。 对比&选择 以下以当前较为流行社区活跃度...
  • 背景: 监听器针对RabbitMQ队列做业务数据处理 系统问题表现: 业务系统无法正常使用,所有请求均不予相应,报404异常 ...dbcp连接池(开始使用) [WARN ] 19:01:05.762 [SimpleAsyncTaskExecutor-1] o.h.util.JDBCEx...
  • 定义工具类 package helloworld.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;...import java.util.concurrent.TimeoutException; /** * @Author: xj09
  • RabbitMQ Channel设计看连接复用

    千次阅读 2020-02-23 23:23:51
    今天公司有同事在做RabbitMQ的分享的时候,讲到了Connection和Channel的设计,有同学有疑惑,为什么不用连接池实现,而要通过Channel的方式实现呢? 先脑补下Connection和Channel的关系 即可以在一个连接上...
  • 源码精品专栏原创 | Java 2020 超神之路,很肝~中文详细注释的开源项目RPC 框架 Dubbo 源码解析网络应用框架 Netty 源码解析消息中间件 RocketMQ 源码解析数据库中间件 Sharding-JDBC 和 MyCAT 源码解析作业调度...
  • 笔者所在的是一家金融科技公司,但公司内部并没有采用在金融支付领域更为流行的RabbitMQ,而是采用了设计之初就为日志处理而生的Kafka,所以我一直很好奇Kafka的高可用实现和保障。从Kafka部署后,系统内部使用的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 6,088
精华内容 2,435
关键字:

java连接池连接rabbitmq

java 订阅