精华内容
下载资源
问答
  • Mq动态监听queue

    2019-12-20 12:01:41
    RabbitAdmin spring的mq底层封装,由于各种declare操作比如declareQueue,declareExchange ... 创建mq动态添加类 @Slf4j public class MqListener { public MqListener() { param.put...

    RabbitAdmin spring的mq底层封装,由于各种declare操作比如declareQueue,declareExchange
    SimpleMessageListenerContainer,spring的监听封装

    创建mq动态添加类

    
    @Slf4j
    public class MqListener {
    
        public MqListener() {
            param.put("x-queue-type", "classic");
        }
    
        private String mqServiceName;
        private SimpleMessageListenerContainer container = null;
        private Map<String, Object> param = new HashMap<>();
        private RabbitAdmin rabbitAdmin = null;
        private IMqHandler mqHandler;
    
        public MqListener(ConnectionFactory connectionFactory, String mqServiceName,IMqHandler mqHandler) {
            this.mqServiceName = mqServiceName;
            this.mqHandler = mqHandler;
            this.rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            rabbitAdmin.afterPropertiesSet();
            container = new SimpleMessageListenerContainer();
            container.setAmqpAdmin(rabbitAdmin);
            container.setConcurrentConsumers(1);
            // 最大的并发消费者
            container.setMaxConcurrentConsumers(5);
            // 设置是否重回队列
            container.setDefaultRequeueRejected(false);
            // 设置签收模式
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //设置消费者的Arguments
            Map<String, Object> args = new HashMap<>();
            args.put("type", "my-mq-listener");
            container.setConsumerArguments(args);
            container.setConsumerTagStrategy((queue) -> mqServiceName + "_queue_" + queue + "_" + UUID.randomUUID());
            // 设置非独占模式
            container.setExclusive(false);
            // 设置consumer未被 ack 的消息个数
            container.setPrefetchCount(1);
            container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    this.mqHandler.todo(this.mqServiceName, message);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            });
            container.setConnectionFactory(connectionFactory);
            container.afterPropertiesSet();
        }
    
        /**
         * 需要监听的queueNames
         *
         * @param queueNames
         */
        public SimpleMessageListenerContainer addListenerQueue(String... queueNames) {
            this.container.addQueueNames(queueNames);
            return this.container;
        }
    
        /**
         * 再服务端声明 关系
         *
         * @param queueName
         * @param routeKey
         * @param exchange
         * @param exchangeType
         */
        public void declareQueue(String queueName, String routeKey, String exchange, String exchangeType) {
            if (StringUtils.isEmpty(routeKey)) {
                log.warn("add routeKey is null");
                return;
            }
            if (topic.equals(exchangeType)) {
                TopicExchange ex = new TopicExchange(exchange, true, false);
                Queue queue = new Queue(queueName, true, false, false, param);
                this.rabbitAdmin.declareExchange(ex);
                this.rabbitAdmin.declareQueue(queue);
                Binding binding = BindingBuilder.bind(queue).to(ex).with(routeKey);
                this.rabbitAdmin.declareBinding(binding);
            } else {
                throw new RuntimeException("not support " + exchangeType);
            }
        }
    
        public int removeQueueNames(String... queueNames) {
            this.container.removeQueueNames(queueNames);
            return this.container.getQueueNames().length;
        }
    
        public void removeBind(String queueName, String routeKey, String exchange, String exchangeType) {
            if (StringUtils.isEmpty(routeKey)) {
                log.warn("add routeKey is null");
                return;
            }
            if ("topic".equals(exchangeType)) {
                TopicExchange ex = new TopicExchange(exchange, true, false);
                Queue queue = new Queue(queueName, true, false, false, param);
                Binding binding = BindingBuilder.bind(queue).to(ex).with(routeKey);
                this.rabbitAdmin.removeBinding(binding);
            } else {
                //其他类型自己添加
                throw new RuntimeException("not support " + exchangeType);
            }
        }
    
    
        public void start() {
            if (container.isRunning()) {
                return;
            }
            this.container.start();
        }
    
        public String printInfo() {
            StringBuilder sb = new StringBuilder(this.mqServiceName);
            sb.append(" is ").append(this.container.isRunning() == true ? "Running" : "Stopped")
                    .append(" queues is ").append(JsonUtil.to(this.container.getQueueNames()));
            return sb.toString();
        }
    
    
        public void stop() {
            this.container.stop();
        }
    
    }

    使用上面的类如下
    1、先根据自己的环境创建connection

    @Component
    @Data
    @ConfigurationProperties(prefix = "my.mq")
    public class MqProperties {
    
        private List<MqYml> mqYmlList;
    
        @Data
        public static class MqYml {
            private String addresses;
            private int port;
            private String username;
            private String password;
            private String virtualHost;
            private String mqServiceName;
        }
    }
    private Map<String, ConnectionFactory> getMqConnection() {
        Map<String, ConnectionFactory> returns = Maps.newConcurrentMap();
        for (MqProperties.MqYml mqYml : mqProperties.getMqYmlList()) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(mqYml.getAddresses());
            connectionFactory.setPort(mqYml.getPort());
            connectionFactory.setUsername(mqYml.getUsername());
            connectionFactory.setPassword(mqYml.getPassword());
            connectionFactory.setVirtualHost(mqYml.getVirtualHost());
            connectionFactory.afterPropertiesSet();
            returns.put(mqYml.getMqServiceName(), connectionFactory);
        }
        return returns;
    }
    

    2、根据自己需求创建自己服务监听
    这里mqServiceName对应的是一个mqserver 可以是不通ip或者不用vhost

    //mqHisHandler 自己实现了todo的监听接口
    MqListener mqListener = new MqListener(connectionFactory, "serviceAname", mqHisHandler);
    //开启声明,routingkey和queue和exchange的关系
    mqListener.declareQueue("QueueName", "RouteKey"
            , "ExchangeName", "ExchangeType");
    //监听列队        
    addListenerQueue("QueueName1","QueueName2").start()
    //动态删除列队
    mqListener.stop();
    //forListenerQueueNames 返回的是该监听器里面还有多少被监听的queue
    int forListenerQueueNames = mqListener.removeQueueNames(lmRecordHis.getQueueName());
    //如果此监听器没有监听的列队了,就不需要start了
    if (forListenerQueueNames != 0) {
        mqListener.start();
    }

    handler的实现

    @Override
    public void todo(String mqServiceName, Message message) {
        try {
            String routeKey = message.getMessageProperties().getReceivedRoutingKey();
            String exchange = message.getMessageProperties().getReceivedExchange();
            String queueName = message.getMessageProperties().getConsumerQueue();
            //这里需要判断routeKey 是否是自己需要的
            //为什么需要判断请看我上一个文章
            
        } catch (Exception e) {
            log.error("save message error ", e);
        }
    }
    展开全文
  • 动态管理RabbitMQ队列监听

    千次阅读 2019-07-08 17:53:23
    需求背景:最近项目要求,动态监听MQ,实现程序新增删除MQ队列的时候,可以动态调整监听。 废话不说了,贴代码:能力有限,欢迎批判。 import java.io.IOException; import java.util.ArrayList; import java....

    需求背景:最近项目要求,动态监听MQ,实现程序新增删除MQ队列的时候,可以动态调整监听。 

    废话不说了,贴代码:能力有限,欢迎批判。

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeoutException;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.log4j.Logger;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.Connection;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    
    class  
    {
        /*initListenerContainer and start listeners
         */
        private void initListenerContainer(){
            logger.info("---------------->>>>>>>>>>> 初始化RabbitMQ监听容器 <<<<<<<<<<<----------------");
            try {
                RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry = DynamicListenerCfg.getRabbitListenerEndpointRegistry();//这里是一个单例,返回一个RabbitListenerEndpointRegistry 实例。
                           
                rabbitListenerEndpointRegistry.stop();
    
                SimpleMessageListenerContainer container = null;//监听容器
                //connection info
                QueueInfo info = new QueueInfo();
                info.setUserName("userName");
                info.setPassword("helloworld");
                info.setServerIp("192.168.0.110");
                info.setServerPort(5672);
                info.setVirtualHost("/");
                String[] queueNames = null;
                    
                queueNames = new String[] {"queueA","queueB"};
    
                if (queueNames != null && queueNames.length > 0) {
                    container = createListenerContainer(MessageListener, info, queueNames);
                    Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
                    container.start();
                    listenerContainerList.add(container);
                }
    
                rabbitListenerEndpointRegistry.start();
        
            } catch (Exception e) {
                e.getSuppressed();
            }
            
        }
    
        //create container by queueName and lisenerInstance
        public SimpleMessageListenerContainer createListenerContainer(MessageListener listenerInstant,QueueInfo info, String ...queueNames) {
            SimpleMessageListenerContainer container = null;
            try {
                ConnectionFactory factory = connectionFactory(info.getServerPort(),info.getUserName(),info.getPassword(),info.getServerIp(),info.getVirtualHost());
                container = new SimpleMessageListenerContainer(factory);
                container.addQueueNames(queueNames);
                container.setMessageListener(listenerInstant);//配置自定义的监听类
                container.setAutoStartup(isAutoStartUp);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return container;
            
        }
    
        //create queue and bind listener
    	public boolean addListenerQueue(QueueInfo info) throws AmqpException, IOException {
    		boolean rst = false;
    		//create queue
    		
    		Connection connection = null;
    		try {
    			connection = connectionFactory(info.getServerPort(),info.getUserName(),info.getPassword(),info.getServerIp(),info.getVirtualHost()).createConnection();
                //如果删除队列,这里改成调用queueDelete方法即可(先判断下messageCount是否还有消息未处理)
    			connection.createChannel(false).queueDeclare(info.getQueueName(), false, false, true, null);
    
    			//监听队列	
    		        listenerContainerList.forEach(a->a.stop());
                //删除的时候这里改为调用removeQueueNames即可.
    			listenerContainerList.forEach(a->a.addQueueNames(info.getQueueName()));
    			listenerContainerList.forEach(a->a.start());
    
    			rst = true;
    		} catch (Exception e) {
    			rst = false;
    			e.printStackTrace();
    		} finally {
    			if (connection != null) {
    				connection.close();
    			}
    		}
    		return rst;
    	}
    }
    

     

    展开全文
  • springboot动态监听rabbitMq队列

    千次阅读 2020-07-15 14:16:12
    很多时候,要求mq队列能够配置,或者说相同的多个用户要求mq队列名称为用户名+固定的字符串,但是监听同一个方法,这个时候就需要用到动态监听。代码如下 @Autowired private AmqpAdmin amqpAdmin; @Autowired ...

    很多时候,要求mq队列能够配置,或者说多个用户要求mq队列名称为用户名+固定的字符串,但是监听同一个方法,这个时候就需要用到动态监听。代码如下

        @Autowired
        private AmqpAdmin amqpAdmin;
        @Autowired
        private  SimpleMessageListenerContainer container;
        public String addQueue(Queue queue){
            return amqpAdmin.declareQueue(queue);
        }
    
        @Override
        public void run(String... args) throws Exception {
            String queueName=SpringUtils.getApplicationContext().getBean(Environment.class).getProperty("queue");//配置文件中配置,也可配置在数据库
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageListen());//消费mq消息的类
            List<String> queueNamesList=new ArrayList<>();
            messageListenerAdapter.addQueueOrTagToMethodName(queueName,"getMessage");//消费mq消息的方法
            queueNamesList.add(queueName);
            logger.info("新增队列queueName={}",queueName);
            Queue queue = new Queue(queueName);
            addQueue(queue);
            if(!queueNamesList.isEmpty()){
                String[] strings=new String[queueNamesList.size()];
                container.addQueueNames(queueNamesList.toArray(strings));
            }
            container.setMessageListener(messageListenerAdapter);
        }

    如果是多个用户,则是给queueNamesList循环添加数据即可

    展开全文
  • Spring学习 Spring学习测试demo RabbitMQ项目 动态监听mq量子消息
  • 本项目需要根据前端传递的tenantId进行动态生成名字为device-{tenantId} 使用的是 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } ...

    本项目需要根据前端传递的tenantId进行动态生成名字为device-{tenantId}
    使用的是

    @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    
    @Component
    public class MQSend {
        @Autowired
        private RabbitAdmin rabbitAdmin;
        @Autowired
        private RabbitTemplate rabbitTemplate;
        public void sendMsg(Long tenantId){
            rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("device-iot-send-"+tenantId)).to(new DirectExchange("device-iot-send-"+tenantId)).with("device-iot-send-"+tenantId));
            rabbitTemplate.convertAndSend("device-iot-send-"+tenantId,"你好");
        }
    }
    

    上述代码进行动态生成队列
    项目地址:https://gitee.com/qingyaoyuanyin/rabbitmq-dynamic.git

    展开全文
  • 然后发送给mq,定时到指定时间消费,有个问题就是可能会有一天点击推送模板消息的用户会有很多,会存在消费者消费不过来的场景,了解到SimpleMessageListenerContainer这个容器,可以帮我动态新增消费者。...
  • 死信队列监听一开始的逻辑是正确的,但关于监听的内容以及动态判断有了新的思路,不断发现不断改善。监听新思路1.不必去破坏生产者消费者的关系,去创建死信队列的对应消费者,如果不同队列去创建对应的死信队列监听...
  • MQ通道配置示例:Server/Receiver Server/Requester 通道也是一种较常见的通道配置方式,从消息流向来看,Server 作为消 息的发送方,Requester 作为消息的接收方。但是从连接方式来看,Requester 却是连接的主 动...
  • 1、在微服务架构中,通常会使用轻量级的消息代理来构建一个共用的消息主题来连接各个微服务实例,它广播的消息会被所有在注册中心的微服务实例监听和消费,也称消息总线。 SpringCloud中也有对应的解决方案,Spring...
  • 2) 使用SPRING BOOT Conditional机制实现了两种产品按需加载,工程会根据配置文件开关动态加载 3) 实现了普通队列消息发送与监听,实现了基于TOPIC的消息发布与订阅 4) IBM-MQ无需提前创建主题,TongLink需要提前...
  • rabbitMQ配置动态启动,rabbitMQ代理不正常时可以不启动项目中MQ监听,主要解决,项目和MQ的启动顺序的问题。 默认不启动rebbitMQ #生产者 spring.rabbitmq.listener.direct.auto-startup=false #消费者 ...
  • 你好,这期我要和您分享的是面试--Kafka篇(1-11个问题,共18个问题)在上期当中,我们学习了:18 zk 节点宕机如何处理?...22 集群支持动态添加机器吗?23 Zookeeper 对节点的 watch 监听通知是永久的吗?为什...
  • 消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的 metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下: 生产者负载均衡:metaq发送消息的时候,生产者在发送...
  • 当有服务(如商品服务)数据发生变更,会发消息给MQ,此时缓存数据生产服务会监听到,从而从服务(商品服务)获取变更数据,更新到本地缓存Ehcache中,也会同步更新到Redis中。 nginx 上的 html 模板 + 本地缓存数据...
  • Spring Cloud Bus消息总线

    2021-05-01 13:08:23
    目录Bus简介RabbitMQ环境配置Spring Cloud Bus 动态刷新全局广播Spring Cloud Bus 动态刷新定点通知 Bus简介 什么是总线? 在微服务架构的系统中,通常会...ConfigClient实例都监听MQ中同一个topic(默认是springCl
  • springcloud bus

    2020-06-20 23:06:08
    configClient实例都监听MQ中同一个topic(默认是springcloudBus)当一个服务刷新数据时,他会把这个消息放入到topic中,这样其他监听同一个topic的服务就能得到通知,然后去更新自身的配置 原理 demo搭建 服务端...
  • Springcloud-Bus消息总线

    2020-07-12 17:46:23
    官网 Bus消息总线概述 ...ConfigClient实例都会监听MQ中同一个topic(默认是springcloudbus)。当一个服务刷新数据时会把这个信息放入topic中,这样监听他的服务就会得到通知,然后更新配置。 bus的两种代理模式
  • ConfigClient实例都监听MQ中同一个topic(默认是SpringCloudBus),当一个服务刷新数据的时候,它会把信息放到Topic中,这样其他监听同一个Topic的服务就能得到通知,然后更新自身的配置。 动态刷新全局广播 利用...
  • SpringCloud之Bus实战

    2021-03-19 22:24:17
    消息总线:在微服务架构中,通常会用一个轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务连接上来,该主题产生的消息会被所有实例监听和消费,我们称它为消息总线 Spring Cloud Bus可以很容易的搭起...
  • 我们利用一致性哈希算法,构造一个哈希环,网关监听WebSocket服务实例的上下线消息,根据实例的变化动态地更新哈希环。将需要迁移的WebSocket客户端重新连接到新的实例上,这样的代价是最小的;当然也取决与虚拟实例...
  • 短信服务监听MQ消息,收到消息后发送短信。 其它服务要发送短信时,通过MQ通知短信微服务。 2.13 文件上传微服务 使用分布式文件系统FastDFS实现图片上传。 FastDFS架构 FastDFS两个主要的角色:Tracker Server...
  • info.setServer ( rootPath + "/HiShow/SuperMap/mapData/mqdemo.smwu" ); info.setType ( WorkspaceType.SMWU ); mWorkspace.open ( info ); mMapView = (MapView) findViewById ( R.id.mapView ); ...

空空如也

空空如也

1 2
收藏数 26
精华内容 10
关键字:

动态监听mq