精华内容
下载资源
问答
  • RabbitMQ客户连接池Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
  • 原文出处:个人博客地址:http://www.damonyi.cc/?p=11之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,...

    原文出处:个人博客地址:http://www.damonyi.cc/?p=11

    之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己处理。

    最近又开发了一个Golang的项目,里面也用到了rabbitmq ,google了一下,golang的Rabbitmq库,也不完善

    1.多个channel共享一个connection

    2.连接断开后的重建连接

    项目目前也主要是为了解决这个问题,因此整了个Rabbitmq的Connection Pool (实际上是Channel Pool,存储了一个Connection的多个Channel),实现思路可以参考设计的Struct

    type MqConnection struct {

    Lock sync.RWMutex

    Connection *amqp.Connection

    MqUri string

    }

    type ChannelContext struct {

    Exchange string

    ExchangeType string

    RoutingKey string

    Reliable bool

    Durable bool

    ChannelId string

    Channel *amqp.Channel

    }

    type BaseMq struct {

    MqConnection *MqConnection

    //channel cache

    ChannelContexts map[string]*ChannelContext

    }

    查看我定义的Struct,可以看到就是定义了一个struct 保存了Connection信息和全部的Channel信息。当使用BasMq的Publish方法时,首先判断是否已经存在对应的Channel了,如果有,就用已经存在的Channel。发送消息代码如下

    func TestMyMqConnection1() {

    var mq1 *mq.BaseMq

    mq1 = mq.GetConnection("manager")

    channleContxt := mq.ChannelContext{Exchange: "docker-exchange1",

    ExchangeType: "direct",

    RoutingKey: "docker1",

    Reliable: true,

    Durable: false}

    for {

    fmt.Println("sending message111")

    mq1.Publish(&channleContxt, "helllosaga")

    time.Sleep(10 * time.Second)

    }

    }

    参数只传入Channel信息,RabbitmqServer 可以根据业务需求调整,具体可参考源码 rabbitmq-golangclient

    展开全文
  • ...改了端口按理说是连接不到MQ的但是依然可以把消息插入进去,而且把端口注掉也是可以连通的; 网上找也没有找到想要的,后来老大把setAddresses() 改为 setHost(), 端口生效,端口不正确联不通 ...

    今天在测试MQ写入消息失败得时候,把信息写入数据库不做修改,但是改了密码项目启动不起来;

    改了端口按理说是连接不到MQ的但是依然可以把消息插入进去,而且把端口注掉也是可以连通的;

    网上找也没有找到想要的,后来老大把setAddresses()  改为 setHost(), 端口生效,端口不正确联不通

    展开全文
  • 基于Golang实现的Rabbitmq 连接池

    千次阅读 2016-07-01 16:08:46
     之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己...

     原文出处:个人博客地址:http://www.damonyi.cc/?p=11

          之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己处理。

         最近又开发了一个Golang的项目,里面也用到了rabbitmq ,google了一下,golang的Rabbitmq库,也不完善

            1.多个channel共享一个connection

            2.连接断开后的重建连接

      项目目前也主要是为了解决这个问题,因此整了个Rabbitmq的Connection Pool (实际上是Channel Pool,存储了一个Connection的多个Channel),实现思路可以参考设计的Struct

    type MqConnection struct {
    	Lock       sync.RWMutex
    	Connection *amqp.Connection
    	MqUri      string
    }
    
    type ChannelContext struct {
    	Exchange     string
    	ExchangeType string
    	RoutingKey   string
    	Reliable     bool
    	Durable      bool
    	ChannelId    string
    	Channel      *amqp.Channel
    }
    type BaseMq struct {
    	MqConnection *MqConnection
    
    	//channel cache
    	ChannelContexts map[string]*ChannelContext
    }

    查看我定义的Struct,可以看到就是定义了一个struct 保存了Connection信息和全部的Channel信息。当使用BasMq的Publish方法时,首先判断是否已经存在对应的Channel了,如果有,就用已经存在的Channel。发送消息代码如下

    func TestMyMqConnection1() {
    	var mq1 *mq.BaseMq
    	mq1 = mq.GetConnection("manager")
    	channleContxt := mq.ChannelContext{Exchange: "docker-exchange1",
    		ExchangeType: "direct",
    		RoutingKey:   "docker1",
    		Reliable:     true,
    		Durable:      false}
    	for {
    		fmt.Println("sending message111")
    		mq1.Publish(&channleContxt, "helllosaga")
    		time.Sleep(10 * time.Second)
    	}
    }

    参数只传入Channel信息,RabbitmqServer 可以根据业务需求调整,具体可参考源码 rabbitmq-golangclient

    展开全文
  • RabbitMQ连接池、生产者、消费者实例

    千次阅读 2019-04-15 09:50:44
    1、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!! 2、下面是链接主要代码: 1 import java.util.HashMap; 2 import java.util.Map; 3 4 import ...

    1、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!!

    2、下面是链接池主要代码:

    复制代码

     1 import java.util.HashMap;
     2 import java.util.Map;
     3 
     4 import org.apache.commons.lang3.StringUtils;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 /**
    12  * 获取RabbitMq连接
    13  * @author skyfeng
    14  */
    15 public class RabbitMqConnectFactory {
    16     static Logger log = LoggerFactory.getLogger(RabbitMqConnectFactory.class);
    17     /**
    18      * 缓存连接工厂,将建立的链接放入map缓存
    19      */
    20     private static Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<String, ConnectionFactory>();
    21     /**
    22      * 根据rabbitMqName获取一个连接,使用完记得要自己关闭连接 conn.close()
    23      */
    24     public static Connection getConnection(String rabbitMqName) {
    25         if(StringUtils.isEmpty(rabbitMqName)){
    26             log.error("rabbitMqName不能为空!");
    27             throw new java.lang.NullPointerException("rabbitMqName为空");
    28         }
    29         if(connectionFactoryMap.get(rabbitMqName)==null){
    30             initConnectionFactory(rabbitMqName);
    31         }
    32         ConnectionFactory connectionFactory = connectionFactoryMap.get(rabbitMqName);
    33         if(connectionFactory==null){
    34             log.info("没有找到对应的rabbitmq,name={}",rabbitMqName);
    35         }
    36         try {
    37             return connectionFactory.newConnection();
    38         }catch (Exception e) {
    39             log.error("创建rabbitmq连接异常!",e);
    40             return null;
    41         }
    42     }
    43     /**
    44      * 初始化一个连接工厂
    45      * @param rabbitMqName
    46      */
    47     private static void initConnectionFactory(String rabbitMqName){
    48         
    49         try {
    50             ConnectionFactory factory = new ConnectionFactory();
    51             //新增代码,如果连接断开会自动重连
    52             //factory.setAutomaticRecoveryEnabled(true);
    53             factory.setHost("127.0.0.1");
    54             factory.setPort(5672);
    55             //factory.setVirtualHost(vhost);
    56             factory.setUsername("test");
    57             factory.setPassword("test");
    58             connectionFactoryMap.put(rabbitMqName, factory);
    59         } catch (Exception e) {
    60             e.printStackTrace();
    61         }finally{
    62         }
    63     }
    64     
    65 }

    复制代码

     3、消费端的代码:

    复制代码

     1 import org.slf4j.Logger;
     2 import org.slf4j.LoggerFactory;
     3 
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.Consumer;
     7 
     8 /**
     9  * RabbitMQq客户端代码
    10  * @author skyfeng
    11  *
    12  */
    13 public class CustomerMqClient {
    14 
    15     final static Logger log = LoggerFactory.getLogger(CustomerMqClient.class);
    16     private final static String RABBITMQ_NAME = "mq_name";
    17     private final static String EXCHANGE_NAME = "Exchange_name";
    18     private final static String QUEUE_NAME = "queue_name";
    19     private static Channel channel = null;
    20     private static Connection connection = null;
    21     
    22     /**
    23      * 初始化客户端代码
    24      */
    25     public static void initClient() {
    26         //重新链接时判断之前的channel是否关闭,没有关闭先关闭
    27         if(null != channel  && channel.isOpen()){
    28             try {
    29                 channel.close();
    30             } catch (Exception e) {
    31                 log.error("mq name =[" +RABBITMQ_NAME+"] close old channel exception.e={}",e);
    32             }finally {
    33                 channel = null;
    34             }
    35         }
    36         //重新链接时判断之前的connection是否关闭,没有关闭先关闭
    37         if (null != connection && connection.isOpen()) {
    38             try {
    39                 connection.close();
    40             } catch (Exception e) {
    41                 log.error("mq name =[" +RABBITMQ_NAME+"] close old connection exception.e={}",e);
    42             }finally{
    43                 connection = null;
    44             }
    45         }
    46         //从链接池中获取链接
    47         connection = RabbitMqConnectFactory.getConnection(RABBITMQ_NAME);
    48         try {
    49             channel = connection.createChannel();
    50             channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    51             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    52             channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");//#号接收所有的数据
    53             Consumer consumer = new CustomerMqConsumer(channel);//具体的业务逻辑在CustomerMqConsumer中
    54             channel.basicConsume(QUEUE_NAME, false, consumer);
    55         } catch (Exception e) {
    56             log.error("CustomerMqClient mq client connection fail .....{}", e);
    57             //发生异常时,重连
    58             reConnect();
    59         }
    60     }
    61 
    62     // 异常时,重连的方法
    63     public static void reConnect() {
    64         log.error("等待5s后重连");
    65         try {
    66             Thread.sleep(5000);
    67         } catch (InterruptedException e) {
    68         }
    69         initClient();
    70     }
    71 
    72 }

    复制代码

    4、生产端代码:

    复制代码

     1 import org.apache.commons.lang3.StringUtils;
     2 import org.slf4j.Logger;
     3 import org.slf4j.LoggerFactory;
     4 
     5 import com.rabbitmq.client.AlreadyClosedException;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 /**
    10  * 把数据发送到rabbitmq的exchange,
    11  */
    12 public class SendToExchange {
    13     static Logger log = LoggerFactory.getLogger(SendToExchange.class);
    14     
    15     final static String TYPE = "topic";
    16     final static String CHARSET_UTF8 = "UTF-8";
    17     //MQ生产者exchange,把数据发给这个exchange
    18     final static String rabbitExchangeName = "ExchangeName";
    19     static boolean mqConnected = false;//mq当前处于连接状态
    20     
    21     static Channel channel=null;
    22     static{
    23         init();
    24     }
    25     public static void init(){
    26         log.info(" rabbit mq init begin...");
    27         try {
    28             //在mq连接中断后,发送程序判断已经断开,启动重连的时候会执行
    29             if(channel!=null){
    30                 try {
    31                     channel.close();
    32                 } catch (Exception e) {
    33                     log.error("关闭老channel 异常",e);
    34                 }finally{
    35                     channel = null;
    36                 }
    37             }
    38             Connection connection = RabbitMqConnectFactory.getConnection("connection");
    39             channel = connection.createChannel();
    40             /*
    41              *这里只定义exchange,因为每个业务模块都会从这里接入数据,所以不在这里定义队列
    42              *队列的定义在各个业务模块自己的消费端定义
    43              */
    44             channel.exchangeDeclare(rabbitExchangeName, TYPE, true, false, null);
    45             log.info(" rabbit mq init OK");
    46             mqConnected = true;
    47         } catch (Exception e) {
    48             log.error("rabbitmq初始化错误",e);
    49             mqConnected = false;
    50         }
    51     }
    52     /**
    53      * 往rabbitmq发数据
    54      * @param message
    55      */
    56     public static void sendToRabbitMq(String message,String routingKey){
    57         try {
    58             if(StringUtils.isEmpty(message)){
    59                 log.debug("message is empty");
    60                 return;
    61             }
    62             channel.basicPublish(rabbitExchangeName, routingKey, null, message.getBytes(CHARSET_UTF8));
    63         }catch(AlreadyClosedException ex){
    64             log.error("往rabbitmq发数据报错,可能连接已关闭,尝试重连,data:",message,ex);
    65             init();
    66         }catch (Exception e) {
    67             log.error("往rabbitmq发数据报错,data:",message,e);
    68         }
    69     }
    70 }
    展开全文
  • rabbitmq的connection连接池1.1 问题提出1.1.1 Connection对象管理以及性能1.1.2 Channel对象管理以及性能1.2 Spring AMQP线程池配置1.2.1 ConnectionFactory连接工厂1.2.2 消费发送和接收使用不同的Connection ...
  • 前言: 维护公司项目,用的是JDK6 + spring2.5.6.SEC01,需求是实现一个rabbitmq客户端发送消息的工具类。 ...尝试直接使用官方rabbitmq-java客户端amqp-client,每次发送都得创建和销毁channe...
  • java NIO是IO的多路复用,Channel连接是TCP的多路复用。那么他们有什么关系呢?NIO是服务器开启一个线程,在内核中使用select()进行轮询管理一些socket,当socket数据准备好时,会通知应用程序进行读写请求。系统...
  • 文章目录引入什么是ThreadLocal使用...因为rabbitMq本身是不支持批量发消息的,所以我们可以直接使用上文所创建的连接池来发送。 最简单的代码是这样的: # ProducerClient.class @Override public void se...
  • rabbitmq 配置多个消费者(转载)

    千次阅读 2017-09-03 17:19:00
    在通常的使用中(Java项目),我们一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQjava client来和Broker交互,比如我们会用如下配置来建立RabbitMQ连接池、声明Queue以及指明监听者的监听...
  • -- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 taskExecutor这个需要自己实现一个连接池 按照官方说法 除非特别大的数据量 一般不需要连接池--> </beans>
  • ps:其实之所以是有上面的疑问是因为数据库连接池那个地方来的,因为数据库连接connection并没有说是线程安全的,所以为了线程安全会为每个事物单独分配一个连接。但是rocketmq用的是netty的长连接channel,Java 上...
  • 数据库连接池:德鲁伊 身份认证:自研单点登录 执行双向:执行异步抽象,天行自适应 分布式锁:统一抽象,Redis分布式锁适应 Eventbus:自研Eventbus,支持RocketMQ,Kafka,RabbitMQ 状态机:自研状态机,状态机...
  • Java相关类

    2020-02-18 10:07:16
    1、数据库:mysql 数据库连接池 druid、mybatis 2、消息队列:kafka rabbitmq 3、网络:http client 4、多线程:thread 5、定时器:Timer 6、拦截器:Interceptor 7、过滤器:Filter 8、监听器:Listener 9...
  • 简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据...阿里巴巴数据库连接池开源项目 Druid 阿里巴巴实时数据同步工具 DTS 问题反馈 报告 issue: github issues
  • Java技术发展方向必备知识!如何成为一个优秀的软件技术工程师!加QQ群:675997991 1、后端 服务框架:springbootspringclould、zookeeper、Rest...数据库连接池:AlibabaDruid1.0 核心框架:Springframework ...
  • JAVA复习大纲

    2021-02-19 13:16:32
    3.数据库连接池 4.多线程,线程池(乐观锁,悲观锁,事务,事务隔离级别) 5.应用安全:过滤器过滤非法参数,幂等性 6.常用组件特性:redis mongodb rabbitmq ,nginx 7.常用开发工具:ecilpse, ide,webstorm,maven git svn...
  • zookeeperRest服务 缓存Redis 消息中间件RabbitMQ 负载均衡Nginx 分布式文件FastDFS 数据库连接池Alibaba?Druid?1.0 核心框架Spring?framework 安全框架Apache?Shiro?1.2 视图框架Spring?MVC?4.0 服务端验
  • 项目预览 项目架构 扫码加微信,备注技术新潮流。 项目介绍 我,请始皇[打钱]是一个前分离的工具人系统,项目采用...数据库连接池 开源软件 对象存储 MinIO 对象存储 智威汤逊 JWT登录支持 LogStash 日志收集工
  • 连接池 数据库中间件 Redis 框架 大数据 日志系统 开放平台(如微信) 测试 机器学习 Devpos CI 工具 常用工具类/代码质量 Excel PDF 开发必备 API 请求 Markdown 其他 说明 公众号 成员列表 ...
  • elasticsearch 搜索引擎,正在进行时 ----&gt; elk日志框架 mq rabbitmq 消息队列 完成。kafaka的区别呢?...http线程池 和数据库连接池的区别,springdata源码的了解 spring源码 spring...
  • 框架搭建 一、初步设想,使用springboot,框架打算用到依赖 spring web,devTools,mysql,Aspect,Redis,Lombok,Freemark,Shiro,Rabbitmq,MyBatis ...之后再添加些必要依赖,如日志,数据库连接池,mybait...
  • 连接池 串行化技术 影子Master架构 批量写入 配置中心 去中心化 通讯机制 同步 RPC RMI 异步 MQ Cron 数据层架构设计 缓存优化 DAO&ORM; 双主架构 主从同步 读写分离 性能优化架构能力 代码级别...
  • Druid | 数据库连接池 | [https://github.com/alibaba/druid](https://github.com/alibaba/druid) OSS | 对象存储 | [https://github.com/aliyun/aliyun-oss-java-sdk]...
  • 近期学习总结

    2019-08-26 11:55:53
    RabbitMQ队列的使用 TF-IDF提取关键词的方法 LSTM文本情感分析 MongoDB的初步使用 Redis保存序列化对象 Mysql的explain使用 Mysql索引的深入理解 ...数据库连接池c3p0 mybatis的使用和mybatis-gener...
  • mall-swarm ...数据库连接池 https://github.com/alibaba/druid OSS 对象存储 https://github.com/aliyun/aliyun-oss-java-sdk MinIO 对象存储 https://github.com/minio/minio JWT JWT登录支持 ...
  • 对于p2p节点通信,可以使用服务器套接字,或者使用Java JXTA协议或每个节点具有单独队列的RabbitMQ也是有效的选择 诸如交互式图表之类的许多功能都依赖于我没有添加的数据库连接,因为我找不到合适的键值存储,因此...
  • Spring Boot 使用druid连接池连接数据库(官方start) Druid简介(Spring Boot + Mybatis + Druid数据源【官方start】) spring-boot-student-encode AES,MD5,SHA1加密算法 加密算法 AES MD5 SHA1 spring-...

空空如也

空空如也

1 2
收藏数 38
精华内容 15
关键字:

java连接池连接rabbitmq

java 订阅