-
使用Java编写的RabbitMQ连接池方法
2017-11-22 17:54:31RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们... -
python rabbitmq连接池_基于Golang实现的Rabbitmq 连接池
2021-02-09 05:45:43原文出处:个人博客地址:http://www.damonyi.cc/?p=11之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,...原文出处:个人博客地址:http://www.damonyi.cc/?p=11
之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己处理。
最近又开发了一个Golang的项目,里面也用到了rabbitmq ,google了一下,golang的Rabbitmq库,也不完善
1.多个channel共享一个connection
2.连接断开后的重建连接
项目目前也主要是为了解决这个问题,因此整了个Rabbitmq的Connection Pool (实际上是Channel Pool,存储了一个Connection的多个Channel),实现思路可以参考设计的Struct
type MqConnection struct {
Lock sync.RWMutex
Connection *amqp.Connection
MqUri string
}
type ChannelContext struct {
Exchange string
ExchangeType string
RoutingKey string
Reliable bool
Durable bool
ChannelId string
Channel *amqp.Channel
}
type BaseMq struct {
MqConnection *MqConnection
//channel cache
ChannelContexts map[string]*ChannelContext
}
查看我定义的Struct,可以看到就是定义了一个struct 保存了Connection信息和全部的Channel信息。当使用BasMq的Publish方法时,首先判断是否已经存在对应的Channel了,如果有,就用已经存在的Channel。发送消息代码如下
func TestMyMqConnection1() {
var mq1 *mq.BaseMq
mq1 = mq.GetConnection("manager")
channleContxt := mq.ChannelContext{Exchange: "docker-exchange1",
ExchangeType: "direct",
RoutingKey: "docker1",
Reliable: true,
Durable: false}
for {
fmt.Println("sending message111")
mq1.Publish(&channleContxt, "helllosaga")
time.Sleep(10 * time.Second)
}
}
参数只传入Channel信息,RabbitmqServer 可以根据业务需求调整,具体可参考源码 rabbitmq-golangclient
-
RabbitMQ JAVA里运用连接池端口无效
2019-07-26 14:26:51...改了端口按理说是连接不到MQ的但是依然可以把消息插入进去,而且把端口注掉也是可以连通的; 网上找也没有找到想要的,后来老大把setAddresses() 改为 setHost(), 端口生效,端口不正确联不通 ...今天在测试MQ写入消息失败得时候,把信息写入数据库不做修改,但是改了密码项目启动不起来;
改了端口按理说是连接不到MQ的但是依然可以把消息插入进去,而且把端口注掉也是可以连通的;
网上找也没有找到想要的,后来老大把setAddresses() 改为 setHost(), 端口生效,端口不正确联不通
-
基于Golang实现的Rabbitmq 连接池
2016-07-01 16:08:46之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己...原文出处:个人博客地址:http://www.damonyi.cc/?p=11
之前项目中需要写个Python 版本的Agent,里面用到了Rabbitmq,中间遇到了好多坑啊,最主要的原因就是Python的Rabbitmq 库没有java的完善,像spring-Rabbitmq,什么都给你处理了。但是在Python中各种异常都需要自己处理。
最近又开发了一个Golang的项目,里面也用到了rabbitmq ,google了一下,golang的Rabbitmq库,也不完善
1.多个channel共享一个connection
2.连接断开后的重建连接
项目目前也主要是为了解决这个问题,因此整了个Rabbitmq的Connection Pool (实际上是Channel Pool,存储了一个Connection的多个Channel),实现思路可以参考设计的Struct
type MqConnection struct { Lock sync.RWMutex Connection *amqp.Connection MqUri string } type ChannelContext struct { Exchange string ExchangeType string RoutingKey string Reliable bool Durable bool ChannelId string Channel *amqp.Channel } type BaseMq struct { MqConnection *MqConnection //channel cache ChannelContexts map[string]*ChannelContext }
查看我定义的Struct,可以看到就是定义了一个struct 保存了Connection信息和全部的Channel信息。当使用BasMq的Publish方法时,首先判断是否已经存在对应的Channel了,如果有,就用已经存在的Channel。发送消息代码如下
func TestMyMqConnection1() { var mq1 *mq.BaseMq mq1 = mq.GetConnection("manager") channleContxt := mq.ChannelContext{Exchange: "docker-exchange1", ExchangeType: "direct", RoutingKey: "docker1", Reliable: true, Durable: false} for { fmt.Println("sending message111") mq1.Publish(&channleContxt, "helllosaga") time.Sleep(10 * time.Second) } }
参数只传入Channel信息,RabbitmqServer 可以根据业务需求调整,具体可参考源码 rabbitmq-golangclient
-
RabbitMQ连接池、生产者、消费者实例
2019-04-15 09:50:441、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!! 2、下面是链接池主要代码: 1 import java.util.HashMap; 2 import java.util.Map; 3 4 import ...1、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!!
2、下面是链接池主要代码:
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import org.apache.commons.lang3.StringUtils; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 /** 12 * 获取RabbitMq连接 13 * @author skyfeng 14 */ 15 public class RabbitMqConnectFactory { 16 static Logger log = LoggerFactory.getLogger(RabbitMqConnectFactory.class); 17 /** 18 * 缓存连接工厂,将建立的链接放入map缓存 19 */ 20 private static Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<String, ConnectionFactory>(); 21 /** 22 * 根据rabbitMqName获取一个连接,使用完记得要自己关闭连接 conn.close() 23 */ 24 public static Connection getConnection(String rabbitMqName) { 25 if(StringUtils.isEmpty(rabbitMqName)){ 26 log.error("rabbitMqName不能为空!"); 27 throw new java.lang.NullPointerException("rabbitMqName为空"); 28 } 29 if(connectionFactoryMap.get(rabbitMqName)==null){ 30 initConnectionFactory(rabbitMqName); 31 } 32 ConnectionFactory connectionFactory = connectionFactoryMap.get(rabbitMqName); 33 if(connectionFactory==null){ 34 log.info("没有找到对应的rabbitmq,name={}",rabbitMqName); 35 } 36 try { 37 return connectionFactory.newConnection(); 38 }catch (Exception e) { 39 log.error("创建rabbitmq连接异常!",e); 40 return null; 41 } 42 } 43 /** 44 * 初始化一个连接工厂 45 * @param rabbitMqName 46 */ 47 private static void initConnectionFactory(String rabbitMqName){ 48 49 try { 50 ConnectionFactory factory = new ConnectionFactory(); 51 //新增代码,如果连接断开会自动重连 52 //factory.setAutomaticRecoveryEnabled(true); 53 factory.setHost("127.0.0.1"); 54 factory.setPort(5672); 55 //factory.setVirtualHost(vhost); 56 factory.setUsername("test"); 57 factory.setPassword("test"); 58 connectionFactoryMap.put(rabbitMqName, factory); 59 } catch (Exception e) { 60 e.printStackTrace(); 61 }finally{ 62 } 63 } 64 65 }
3、消费端的代码:
1 import org.slf4j.Logger; 2 import org.slf4j.LoggerFactory; 3 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.Consumer; 7 8 /** 9 * RabbitMQq客户端代码 10 * @author skyfeng 11 * 12 */ 13 public class CustomerMqClient { 14 15 final static Logger log = LoggerFactory.getLogger(CustomerMqClient.class); 16 private final static String RABBITMQ_NAME = "mq_name"; 17 private final static String EXCHANGE_NAME = "Exchange_name"; 18 private final static String QUEUE_NAME = "queue_name"; 19 private static Channel channel = null; 20 private static Connection connection = null; 21 22 /** 23 * 初始化客户端代码 24 */ 25 public static void initClient() { 26 //重新链接时判断之前的channel是否关闭,没有关闭先关闭 27 if(null != channel && channel.isOpen()){ 28 try { 29 channel.close(); 30 } catch (Exception e) { 31 log.error("mq name =[" +RABBITMQ_NAME+"] close old channel exception.e={}",e); 32 }finally { 33 channel = null; 34 } 35 } 36 //重新链接时判断之前的connection是否关闭,没有关闭先关闭 37 if (null != connection && connection.isOpen()) { 38 try { 39 connection.close(); 40 } catch (Exception e) { 41 log.error("mq name =[" +RABBITMQ_NAME+"] close old connection exception.e={}",e); 42 }finally{ 43 connection = null; 44 } 45 } 46 //从链接池中获取链接 47 connection = RabbitMqConnectFactory.getConnection(RABBITMQ_NAME); 48 try { 49 channel = connection.createChannel(); 50 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); 51 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 52 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");//#号接收所有的数据 53 Consumer consumer = new CustomerMqConsumer(channel);//具体的业务逻辑在CustomerMqConsumer中 54 channel.basicConsume(QUEUE_NAME, false, consumer); 55 } catch (Exception e) { 56 log.error("CustomerMqClient mq client connection fail .....{}", e); 57 //发生异常时,重连 58 reConnect(); 59 } 60 } 61 62 // 异常时,重连的方法 63 public static void reConnect() { 64 log.error("等待5s后重连"); 65 try { 66 Thread.sleep(5000); 67 } catch (InterruptedException e) { 68 } 69 initClient(); 70 } 71 72 }
4、生产端代码:
1 import org.apache.commons.lang3.StringUtils; 2 import org.slf4j.Logger; 3 import org.slf4j.LoggerFactory; 4 5 import com.rabbitmq.client.AlreadyClosedException; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 9 /** 10 * 把数据发送到rabbitmq的exchange, 11 */ 12 public class SendToExchange { 13 static Logger log = LoggerFactory.getLogger(SendToExchange.class); 14 15 final static String TYPE = "topic"; 16 final static String CHARSET_UTF8 = "UTF-8"; 17 //MQ生产者exchange,把数据发给这个exchange 18 final static String rabbitExchangeName = "ExchangeName"; 19 static boolean mqConnected = false;//mq当前处于连接状态 20 21 static Channel channel=null; 22 static{ 23 init(); 24 } 25 public static void init(){ 26 log.info(" rabbit mq init begin..."); 27 try { 28 //在mq连接中断后,发送程序判断已经断开,启动重连的时候会执行 29 if(channel!=null){ 30 try { 31 channel.close(); 32 } catch (Exception e) { 33 log.error("关闭老channel 异常",e); 34 }finally{ 35 channel = null; 36 } 37 } 38 Connection connection = RabbitMqConnectFactory.getConnection("connection"); 39 channel = connection.createChannel(); 40 /* 41 *这里只定义exchange,因为每个业务模块都会从这里接入数据,所以不在这里定义队列 42 *队列的定义在各个业务模块自己的消费端定义 43 */ 44 channel.exchangeDeclare(rabbitExchangeName, TYPE, true, false, null); 45 log.info(" rabbit mq init OK"); 46 mqConnected = true; 47 } catch (Exception e) { 48 log.error("rabbitmq初始化错误",e); 49 mqConnected = false; 50 } 51 } 52 /** 53 * 往rabbitmq发数据 54 * @param message 55 */ 56 public static void sendToRabbitMq(String message,String routingKey){ 57 try { 58 if(StringUtils.isEmpty(message)){ 59 log.debug("message is empty"); 60 return; 61 } 62 channel.basicPublish(rabbitExchangeName, routingKey, null, message.getBytes(CHARSET_UTF8)); 63 }catch(AlreadyClosedException ex){ 64 log.error("往rabbitmq发数据报错,可能连接已关闭,尝试重连,data:",message,ex); 65 init(); 66 }catch (Exception e) { 67 log.error("往rabbitmq发数据报错,data:",message,e); 68 } 69 } 70 }
-
【RabbitMQ-3】连接池的配置
2020-08-07 11:34:22rabbitmq的connection连接池1.1 问题提出1.1.1 Connection对象管理以及性能1.1.2 Channel对象管理以及性能1.2 Spring AMQP线程池配置1.2.1 ConnectionFactory连接工厂1.2.2 消费发送和接收使用不同的Connection ... -
完蛋,手写RabbitMQ客户连接池(channel池),spring版本太低的痛苦
2020-01-20 11:00:46前言: 维护公司项目,用的是JDK6 + spring2.5.6.SEC01,需求是实现一个rabbitmq客户端发送消息的工具类。 ...尝试直接使用官方rabbitmq-java客户端amqp-client,每次发送都得创建和销毁channe... -
channel rabbitmq 配置_【RabbitMQ-3】连接池的配置
2021-01-14 16:39:03java NIO是IO的多路复用,Channel连接是TCP的多路复用。那么他们有什么关系呢?NIO是服务器开启一个线程,在内核中使用select()进行轮询管理一些socket,当socket数据准备好时,会通知应用程序进行读写请求。系统... -
Java架构直通车——ThreadLocal实现RabbitMQ消息的批量发送
2020-02-29 11:16:04文章目录引入什么是ThreadLocal使用...因为rabbitMq本身是不支持批量发消息的,所以我们可以直接使用上文所创建的连接池来发送。 最简单的代码是这样的: # ProducerClient.class @Override public void se... -
rabbitmq 配置多个消费者(转载)
2017-09-03 17:19:00在通常的使用中(Java项目),我们一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听... -
spring整合RabbitMQ出现的异常
2016-10-24 02:41:30-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 taskExecutor这个需要自己实现一个连接池 按照官方说法 除非特别大的数据量 一般不需要连接池--> </beans> -
RocketMQ之连接以及连接缓存
2017-04-04 14:38:00ps:其实之所以是有上面的疑问是因为数据库连接池那个地方来的,因为数据库连接connection并没有说是线程安全的,所以为了线程安全会为每个事物单独分配一个连接。但是rocketmq用的是netty的长连接channel,Java 上... -
bird-java:bird-java基于Spring Boot为基础的开发增强组件包-源码
2021-02-05 23:50:31数据库连接池:德鲁伊 身份认证:自研单点登录 执行双向:执行异步抽象,天行自适应 分布式锁:统一抽象,Redis分布式锁适应 Eventbus:自研Eventbus,支持RocketMQ,Kafka,RabbitMQ 状态机:自研状态机,状态机... -
Java相关类
2020-02-18 10:07:161、数据库:mysql 数据库连接池 druid、mybatis 2、消息队列:kafka rabbitmq 3、网络:http client 4、多线程:thread 5、定时器:Timer 6、拦截器:Interceptor 7、过滤器:Filter 8、监听器:Listener 9... -
简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据...阿里巴巴数据库连接池开源项目 Druid 阿里巴巴实时数据同步工具 DTS 问题反馈 报告 issue: github issues
-
Java技术发展方向必备知识
2019-07-19 17:56:32Java技术发展方向必备知识!如何成为一个优秀的软件技术工程师!加QQ群:675997991 1、后端 服务框架:springbootspringclould、zookeeper、Rest...数据库连接池:AlibabaDruid1.0 核心框架:Springframework ... -
JAVA复习大纲
2021-02-19 13:16:323.数据库连接池 4.多线程,线程池(乐观锁,悲观锁,事务,事务隔离级别) 5.应用安全:过滤器过滤非法参数,幂等性 6.常用组件特性:redis mongodb rabbitmq ,nginx 7.常用开发工具:ecilpse, ide,webstorm,maven git svn... -
JAVA框架介绍和发展方向.docx
2020-03-05 13:19:07zookeeperRest服务 缓存Redis 消息中间件RabbitMQ 负载均衡Nginx 分布式文件FastDFS 数据库连接池Alibaba?Druid?1.0 核心框架Spring?framework 安全框架Apache?Shiro?1.2 视图框架Spring?MVC?4.0 服务端验 -
spring-Boot-Vue-Bank:我,请始皇[打钱]是一个前...例如Redis,RabbitMQ等(主要是多用用工具多踩踩坑)-源码
2021-02-03 10:41:04项目预览 项目架构 扫码加微信,备注技术新潮流。 项目介绍 我,请始皇[打钱]是一个前分离的工具人系统,项目采用...数据库连接池 开源软件 对象存储 MinIO 对象存储 智威汤逊 JWT登录支持 LogStash 日志收集工 -
连接池 数据库中间件 Redis 框架 大数据 日志系统 开放平台(如微信) 测试 机器学习 Devpos CI 工具 常用工具类/代码质量 Excel PDF 开发必备 API 请求 Markdown 其他 说明 公众号 成员列表 ...
-
java程序员必须要了解的基础知识
2019-01-01 22:50:55elasticsearch 搜索引擎,正在进行时 ----> elk日志框架 mq rabbitmq 消息队列 完成。kafaka的区别呢?...http线程池 和数据库连接池的区别,springdata源码的了解 spring源码 spring... -
从零搭建java后台管理系统(一)框架初步搭建
2018-09-08 14:16:00框架搭建 一、初步设想,使用springboot,框架打算用到依赖 spring web,devTools,mysql,Aspect,Redis,Lombok,Freemark,Shiro,Rabbitmq,MyBatis ...之后再添加些必要依赖,如日志,数据库连接池,mybait... -
【白雪红叶】JAVA学习技术栈梳理思维导图.xmind
2018-04-25 20:28:30连接池 串行化技术 影子Master架构 批量写入 配置中心 去中心化 通讯机制 同步 RPC RMI 异步 MQ Cron 数据层架构设计 缓存优化 DAO&ORM; 双主架构 主从同步 读写分离 性能优化架构能力 代码级别... -
电商系统,电商平台后台二次开发优秀模板,采用最新java技术栈,个人珍藏资源
2019-08-23 16:04:58Druid | 数据库连接池 | [https://github.com/alibaba/druid](https://github.com/alibaba/druid) OSS | 对象存储 | [https://github.com/aliyun/aliyun-oss-java-sdk]... -
近期学习总结
2019-08-26 11:55:53RabbitMQ队列的使用 TF-IDF提取关键词的方法 LSTM文本情感分析 MongoDB的初步使用 Redis保存序列化对象 Mysql的explain使用 Mysql索引的深入理解 ...数据库连接池c3p0 mybatis的使用和mybatis-gener... -
mall-swarm ...数据库连接池 https://github.com/alibaba/druid OSS 对象存储 https://github.com/aliyun/aliyun-oss-java-sdk MinIO 对象存储 https://github.com/minio/minio JWT JWT登录支持 ...
-
DejanvukslashTestChain:这是您的第一个存储库-源码
2021-02-11 23:54:08对于p2p节点通信,可以使用服务器套接字,或者使用Java JXTA协议或每个节点具有单独队列的RabbitMQ也是有效的选择 诸如交互式图表之类的许多功能都依赖于我没有添加的数据库连接,因为我找不到合适的键值存储,因此... -
Spring Boot 使用druid连接池连接数据库(官方start) Druid简介(Spring Boot + Mybatis + Druid数据源【官方start】) spring-boot-student-encode AES,MD5,SHA1加密算法 加密算法 AES MD5 SHA1 spring-...