精华内容
下载资源
问答
  • 场景介绍??公司需要做一个mysql同步到Pgsql的数据同步,采用Debezium结合kafka connect读取mysql的... Debezium+kafka connect 会动态生成topic,一个表一个topic,因为实际生产中,会存在表新建的情况,会动态添...

    场景介绍

    ??公司需要做一个mysql同步到Pgsql的数据同步,采用Debezium结合kafka connect读取mysql的binlog同步到Kafka,然后需要消费kafka的消息,生成DDL和DML,插入到Pgsql

    kafka相关问题

    topic动态;

    ?? Debezium+kafka connect 会动态生成topic,一个表一个topic,因为实际生产中,会存在表新建的情况,会动态添加一个新的topic需要消费,这也是这篇文章主要想要解决的问题

    顺序保证

    ?? 因为读取的是mysql的binlog,需要按照这个顺序去消费,所以每个topic只有一个partition,消费的时候我们需要考虑效率问题。

    其他问题和kafka无关,不做说明

    解决方案

    topic动态问题,

    在我的业务场景中,虽然topic是动态的,但是topic是有规则的,比如topic的规则都是 【服务名.数据库名称.表名】

    如:test1.dbname1.t_test

    刚刚开始想着直接使用@KafkaListener的topicPattern属性,配置上正则去解决,但是会导致另外一个问题,使用该方式或导致这个消费者匹配到的所有符合规则的topic,比如此处的正则可以配置为:【test1.dbname1.*】。然后因为每个topic只有一个partition,单线程消费性能低下,线上的数据量太大,消费一个大的topic时其他topic无法消费。

    (如果topic有多个分区,可以开启concurrency属性;concurrency为监听topic的分区数量之和)

    为了解决性能问题,将单次消费1条数据,改造成批量消费。

    ok,开启批量消费

    # 每次消费500条

    spring.kafka.consumer.max-poll-records=500

    # 监听器消费类型

    spring.kafka.listener.type=batch

    消费者代码修改为:

    /**

    * 注解方式创建消息监听器(使用正则表达式监听topic)

    * @param records 消息列表

    * @param ack

    */

    @KafkaListener(id = "dmlBatchConsumer",topicPattern = "${dmlTopics}")

    public void batchDMLConsumer(List> records, Acknowledgment ack){

    long startTime = System.currentTimeMillis();

    log.info("本次批量获取线程ID:{},消息长度:{}",Thread.currentThread().getId(),records.size());

    batchMsgHandler(records, ack);

    long endTime = System.currentTimeMillis();

    log.info("本次批量消费耗时{}毫秒,",endTime-startTime);

    }

    改完之后,测试发现性能确实提升了不少。预计100W数据 10分钟左右全部同步完成。

    但是这10分钟之内只能消费这个topic,其他topic依然无法消费,因为只有一个消费者线程。

    查阅了很多资料,也问了我认识的几个大佬,均没找到解决方案。

    @KafkaListener用正则匹配到的topic,似乎并不能每个topic开启一个消费者线程,特别是每个topic都是一个partition的情况下。

    那应该怎么解决这个消费的性能问题呢?

    消费的性能问题

    既然使用注解KafkaListener方式很难做到,那就手动创建消费者吧。

    首先我需要获取到topic列表,这里分为已经存在的topic和后面新增表生成的topic

    已经存在的topic

    topic列表我可以读取pgsql中已经同步完成的表,根据规则生成topic列表

    新增表生成的topic

    新增的表会先创建执行到监听DDL的一个消费者,DDL的topic是固定的,所以当我拿到了DDL新建表的消息时候,我可以把这个新的DDL保存在数据库中,然后启动一个定时器去监听这个新的topic。

    来吧。直接上代码:

    创建消息监听器方法

    /**

    * 手动创建批量消息监听器

    * @param topic

    * @return

    */

    public KafkaMessageListenerContainer manualListenerContainer(String topic) {

    ContainerProperties properties = new ContainerProperties(topic);

    properties.setGroupId(groupId);

    properties.setAckMode(ContainerProperties.AckMode.MANUAL);

    properties.setMessageListener((new BatchAcknowledgingMessageListener() {

    @Override

    public void onMessage(List> list, Acknowledgment acknowledgment) {

    long startTime = System.currentTimeMillis();

    log.info("本次批量获取线程ID:{},消息长度:{}",Thread.currentThread().getId(),list.size());

    batchMsgHandler(list, acknowledgment);

    log.info("本次批量消费耗时{}毫秒,",System.currentTimeMillis()-startTime);

    }

    }));

    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(config.consumerFactory(true), properties);

    return container;

    }

    获取已有消息监听器列表

    @Configuration

    @Slf4j

    public class KafkaTopicConfig implements InitializingBean {

    @Autowired

    private BasicDao basicDao;

    @Value("${listenerTopicTitle}")

    private String topicTitle;

    @Value("${listenerTopicDB}")

    private String schema;

    public List topics = new ArrayList<>();

    @Override

    public void afterPropertiesSet() throws Exception {

    //获取指定schema下的table表名

    List> result = basicDao.getDbTableList(schema);

    if(CollectionUtils.isEmpty(result)){

    return;

    }

    //生成topic列表

    for (Map map : result) {

    StringBuilder sb = new StringBuilder(topicTitle);

    String tableName = (String) map.get("tablename");

    String schemaName = (String) map.get("schemaname");

    String topic = sb.append(".").append(schemaName).append(".").append(tableName).toString();

    topics.add(topic);

    }

    }

    }

    根据topic列表启动消息监听器

    /**

    * 启动时默认加载已经存在的表对应的topic

    */

    @Bean

    public void startTopicListener(){

    List listeners = new ArrayList<>();

    for (String topic : topicConfig.topics) {

    KafkaMessageListenerContainer container =  manualListenerContainer(topic);

    listeners.add(container);

    }

    for (KafkaMessageListenerContainer listener : listeners) {

    try {

    listener.start();

    }catch (Exception e){

    log.error("监听器启动失败,异常:{}",e.getMessage(),e);

    }

    }

    }

    解析DDL的消费者得到新的topic,保存到数据库,然后定时器去监听这个topic我就不贴代码了,明白这个方案即可。

    结果测试,没有问题,多个消费者监听了多个topic,然后加上批量消费,消费速度也很快。

    最后

    使用正则表达式的时候,需要主要 * 前面必须有一个 . 。比如serverName.dbName.t_test.* 或者 serverName.dbName.*

    @KafkaListener的topicPattern属性默认需要为一个字符串常量,如果需要动态配置的话,可以 implements InitializingBean ,然后重写afterPropertiesSet方法,通过System.setProperty("dmlTopics", dmlTopic); 然后再监听器中就可以${dmlTopics}获取到。

    相关资料

    这次我查阅了很多资料,但是有个资料对我帮忙特别大,我贴出来方便大家也可以学习一下

    Spring-Kafka(六)—— @KafkaListener的花式操作

    在此感谢下原作者。

    我的这个方案可能只是适合我的这个场景,仅供大家参考。如果有更好的方案,也欢迎大佬指点一下。

    展开全文
  • 现springboot整合的active目前,监听话题使用的是@JmsListener,但是目前想动态里面参数,一愁莫展,所以想问哈各位,有没有啥方法达到动态修改@JmsListener的参数,或则其他具体点的方法达到动态监听,要哭了!...
  • spring boot 动态添加监听kafka哪些Topic代码配置 代码 //设置kafka信息 @Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){ ...

    spring boot 动态添加监听kafka哪些Topic

    代码

    //设置kafka信息
    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){
    	ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    	factory.setConsumerFactory(consumerFactory);
    	/*
    	设置默认不监听kafka设置的
    	(此功能是可以动态设置是否启动kafka监听,如果不需要可以
    	设置为true,甚至不用写这个方法;动态监听方法见下放)
    	*/
    	factory.setAutoStartup(false);
    	return factory;
    }
    
    public static final String KAFKA_LISTENER_ID = "KAFKA_ID";
    //监听配置文件中的receiveTopics的值使用,分割
    //会监听下方配置中的test、test1、test2;三个topic
    @KafkaListener(id = KAFKA_LISTENER_ID,topics = {"#{'${receiveTopics}'.split(',')}"},containerFactory = "ackContainerFactory")
    public void message(ConsumerRecord<String, String> record) {
    	//获取topic
    	String topic = record.topic();
    	//获取键值(有可能乱码)
    	String flieName = new String(record.key().getBytes("UTF-8"),"UTF-8");
    	//获取内容
    	String valueTopic = new String(record.value().getBytes("UTF-8"),"UTF-8");
    	…………处理操作
    }
    

    配置

    receiveTopics: test,test1,test2
    

    其他步骤

    1. kafka服务自带zookeeper下载与启动
    2. Spring boot配置kafka服务
    3. kafka生产者发送消息成功回调
    4. kafka根据ip端口获取消息队列上的topic
    5. kafka动态设置监听哪些topic
    6. 动态启动关闭kafka监听、设置默认不监听kafka
    7. kafka设置:1只接受消息、不发送消息;2只发送消息不接受消息;3既接受消息也发送消息;4既不接收消息也不发送消息
    8. kafka会把历史数据都获取下来
    9. Spring boot kafka执行多次多次消费
    展开全文
  • 问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听topic。 方案一: 如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。 @...

    问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听的topic。

    方案一:

    如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        private static final String KAFKA_SERVERS_CONFIG = "192.168.77.202:9092";
        private static final String LOCAL_GROUP_ID = "cctest";
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
            // kv都用string来序列化
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @KafkaListener(topicPattern = "${topicPattern}")
        public void listen(String data) {
            System.out.println("message have been consumed:"+data);
        }
    }
    
    

    方案弊端:

    1. 变动的topic只能在限定范围内,如果新增topic不符合规则,无法消费。
    2. 因为每个topic只有一个partition,单线程消费性能低下,如果线上的数据量太大,消费一个大的topic时其他topic无法消费。

    方案二:

    用Spring管理的SingletonBean缓存kafkaconsumer配置,并设置定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁bean里已有消费者,创建新的消费者。

    管理消费者bean
    @Component
    @Data
    public class ResourceNotifyConsumer {
    
        private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);
    
    
        @Autowired
        // 消费数据的service,异步处理
        private MonitorPoolService monitorPoolService;
    
        private KafkaConsumer<String, String> consumer = null;
    
        public void closeConsumer() {
    		// 如果consumer.wakeup()停掉当前poll并抛出异常,在没阻塞的时候,会在下一次poll抛出异常,但下一次poll已经是新的consumer对象。
    		//	同时,consumer不支持多线程同时操作,所以这里把引用去掉,靠gc回收旧consumer。
            consumer = null;
        }
    
        public void onMessage() {
            while (consumer != null) {
                // 从kafka中取出100毫秒的数据
                List<Map<String, Object>> datas = new ArrayList<>();
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> recordData = new HashMap<>(8);
                    recordData.put(record.key(), record.value());
                    datas.add(recordData);
                }
                // 处理消息
                if (CollectionUtils.isNotEmpty(datas)) {
                    monitorPoolService.dealResource(datas);
                }
            }
        }
    }
    
    
    定时调用接口
    @GetMapping("/check")
    public ResponseMessage checkConfig() {
    	// 定时查询配置是否有变动,因场景这里只做增加检查,如果场景有修改,可以缓存旧数据与配置做对比。
        List<DeviceTopicConfig> deviceTopicConfigs = topicConfigDao.findAll();
        List<DeviceTopicConfig> topicWithoutListening = deviceTopicConfigs.stream().filter(t -> !t.getListening()).collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(topicWithoutListening)) {
            resourceNotifyConsumer.closeConsumer();
    		// 新的监听topic集合
            List<String> topics = deviceTopicConfigs.stream().map(DeviceTopicConfig::getTopic).collect(Collectors.toList());
    		// 此方法类似上面配置新建监听新topic集合的consumer,并调用bean的onMessage方法
            resourceService.buildConsumer(topics);
            // 更新配置的状态
            topicWithoutListening.forEach(deviceTopicConfig -> deviceTopicConfig.setListening(true));
            topicConfigDao.saveAll(topicWithoutListening);
        }
        return ResponseMessage.success();
    

    方案弊端:

    1. 如果提交offset不合理,很可能因consumer配置的不同出现重复消费或者未消费情况。
    2. 消费数据采用线程池,如果监听的topic接收消息过多可能触发RejectedExecutionHandler。

    方案三:

    1. 独立kafka消费模块为一个单独的jar文件
    2. 另起一个系统,定时查询数据库,发现topic改变后就java调用linux命令杀掉kafka的jar进程

    方案弊端:

    1. 能在模块内解决的问题,尽量不变复杂。。

    总结:

    最后选择了方案二,因为尽管有topic的增加,但实际场景只可能增加一两个topic,两个问题的权衡:1.在增加的时候消息丢失或重复消费都可以接受,并且这个问题只需要多手动提交(比如在接口调用时和旧消费者置空之前)可以避免。2.评估后消息不会太多。

    展开全文
  • kafka动态配置topic

    千次阅读 热门讨论 2019-01-11 14:18:43
     之前使用@org.springframework.kafka.annotation.KafkaListener...}),这样去单独监听某一个topic,生产者也固定在代码里定义变量读取配置文件。昨天改了个需求,希望以后通过配置文件去动态配置生产者和消费者的...

     之前使用@org.springframework.kafka.annotation.KafkaListener这个注解的时候,是在yml文件中配置,然后使用@KafkaListener(topics = {"${kafka.topic.a2b.name}"}),这样去单独监听某一个topic,生产者也固定在代码里定义变量读取配置文件。昨天改了个需求,希望以后通过配置文件去动态配置生产者和消费者的topic(不知道个数和topic名字),而不需要改代码。

    一、踩坑

     刚开始的时候,由于考虑不充分(没有考虑到topic个数未知),想到@KafkaListener注解中的topics本身就是个字符串数组,于是想通过传入变量的形式。产生了以下两种方法:

    1.传入变量方法一

      使用@Value注解提取配置文件中相关配置,@KafkaListener中传入变量

        public static String[] topicArr;
        @Value("${kafka.bootstrap.servers}")
        public void setTopicArr(String[] value){
            String topicArr = value;
        }
        @KafkaListener(topics= topicArr)
    

    emmmm。。。结果可想而知,不行。

    2.传入变量方法二

     还是传入变量,不过这次写了个动态配置的代码

        注解里这么写
        @KafkaListener(topics = "${topicName1}","${topicName2}","${topicName3}")
        提前将yml文件里添加
        topics: topicName1,topicName2,topicName3
        然后加载进来
        @Value("${kafka.topics}")
        public void setTopics(String value){
            topics = value;
        }
        动态配置代码:
        @Configuration
        public class KafkaTopicConfiguration implements InitializingBean {
            @Autowired
            private KafkaConfig kafkaconfig;
            @Override
            public void afterPropertiesSet() throws Exception {
                String[] topicArr = kafkaconfig.split(",");
                int i = 1;
                for(String topic : topicArr){
                    String topicName = "topicName"+i;
                    System.setProperty(topicName, topic);
                }
            }
        }
    

    相比方法一,可行。但是未知topic数量呢。GG。

    3.不用注解

     百度找到几个老哥的动态获取并创建topic的方法

    https://www.cnblogs.com/gaoyawei/p/7723974.html
    https://www.cnblogs.com/huxi2b/p/7040617.html
    https://blog.csdn.net/qq_27232757/article/details/78970830
    

    写了几版,各种各样的问题,还是我太菜。就想再看看有没有别的简单点的解决办法,没有了再回来搞这个。

    4.正则匹配topic

     这期间又找到一个使用正则匹配topic的。直接贴链接

    @KafkaListener(topicPattern = "showcase.*")
    这里使用正则匹配topic,其中【*】之前得加上【.】才能匹配到。
    

    中间模仿写了一版使用正则匹配的,其实也可以糊弄实现需求,除了topic取名的时候一定得规范以外,还得考虑到如果不想用某个topic了又得想怎么去避免他。
    这种方法不太严谨,继续踩坑吧。

    二、问题解决

     用蹩脚的英语google了一下,嗯?好多老哥们也是用的以上差不多的方法。然而最后在某个老哥github的issues中看到了解决办法。老哥的需求跟我差不多,感谢大佬,贴上最终问题解决方案。

    1.kafka消费者监听配置

    还是注解的形式
    @KafkaListener(topics = "#{'${kafka.listener_topics}'.split(',')}")
    

    读取yml文件中kafka.listener_topics的参数,然后根据“,”去split,得到一个topics数组。
    这么做就可以根据配置文件动态的去监听topic。

    2.yml配置文件

    只列出topic相关部分(mqTypes是我用来判断使用哪个topic发送的)
        kafka:
          listener_topics: kafka-topic-a2b,kafka-topic-c2b
          consume:
            topic:
              - name: kafka-topic-a2b
                partitions: 12
                replication_factor: 2
              - name: kafka-topic-c2b
                partitions: 12
                replication_factor: 2
          product:
            topic:
              - name: kafka-topic-b2a
                partitions: 12
                replication_factor: 2
                mqTypes: type1
              - name: kafka-topic-b2c
                partitions: 12
                replication_factor: 2
                mqTypes: type1
    

    3.yml参数解析

    这里我将kafka的topic相关加载到bean中处理。
    创建KafkaConsumerBean和KafkaProducerBean分别用来存储yml中生产者和消费者的topic相关参数

    //KafkaConsumerBean
    @Component
    @ConfigurationProperties(prefix = "kafka.consume")
    public class KafkaConsumerBean {
        private List<Map<String,String>> topic;
        public void setTopic(List<Map<String, String>> topic) {
            this.topic = topic;
        }
        public List<Map<String, String>> getTopic() {
            return topic;
        }
    }
    
    //KafkaProducerBean
    @Component
    @ConfigurationProperties(prefix = "kafka.product")
    public class KafkaProducerBean {
        private List<Map<String,String>> topic;
        public void setTopic(List<Map<String, String>> topic) {
            this.topic = topic;
        }
    
        private Map<String,String> mqType2NameMap = new HashMap<String,String>();
        public List<Map<String, String>> getTopic() {
            return topic;
        }
    
        public String getTopic(String mqType){
            String name = mqType2NameMap.get(mqType);
            if(name != null){
                return name;
            }else{
                for(Map<String,String> topicProperty : topic){
                    if (topicProperty.get("mqTypes").indexOf(mqType) >= 0){
                        name = topicProperty.get("name");
                        mqType2NameMap.put(mqType,name);
                        return name;
                    }
                }
                return null;
            }
    
        }
    }
    
    

    4.创建topic

        List<Map<String,String>> producerTopicList = kafkaProducerBean.getTopic();
        for (Map<String,String> topicProperty : producerTopicList){
            KafkaClient.createTopic(topicProperty.get("name"),Integer.parseInt(topicProperty.get("partitions")),Integer.parseInt(topicProperty.get("replication_factor")));
        }
        List<Map<String,String>> consumerTopicList = kafkaConsumerBean.getTopic();
        for (Map<String,String> topicProperty : consumerTopicList){
            KafkaClient.createTopic(topicProperty.get("name"),Integer.parseInt(topicProperty.get("partitions")),Integer.parseInt(topicProperty.get("replication_factor")));
        }
    

    三、总结

     上面解决问题的方法关键在于

    @KafkaListener(topics = "#{'${kafka.listener_topics}'.split(',')}")
    

    @KafkaListener这个注解会去读取spring的yml配置文件中

    kafka:
          listener_topics: kafka-topic-a2b,kafka-topic-c2b
    

    这块listener_topics配置信息,然后通过’,'分割成topic数组,KafkaListener注解中的 topics 参数,本身就是个数组,如下

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.springframework.kafka.annotation;
    
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Repeatable;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(KafkaListeners.class)
    public @interface KafkaListener {
        String id() default "";
    
        String containerFactory() default "";
    
        String[] topics() default {};
    
        String topicPattern() default "";
    
        TopicPartition[] topicPartitions() default {};
    
        String group() default "";
    }
    

     结合我之前的kafka文章,应该是可以拼出一套成型的。

    展开全文
  • @KafkaListener 动态指定topic

    万次阅读 2018-03-06 18:30:17
    背景: 使用 @org.springframework.kafka.annotation.KafkaListener 的时候,kafka的topic可以自行设定,但是有种需求是,在启动之前,是不知道需要监听哪个topic的,需要在启动的时候,根据环境变量的输入来决定监听...
  • 简单示例1. 配置文件自定义配置2. 利用Spring的SpEl表达式,将...kafka.consumer.topics=topic1,topic2 2. 利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${kafka.consumer.topics}’.spli
  • , Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8)) } } 同时定义创建将数据动态发往不同topic的kafka生产者的方法 /** * 创建将数据动态发往不同topic的kafka生产者 * * @param boostrapServers ...
  • SpringBoot2.0_MQTT消息订阅之动态Topic

    万次阅读 热门讨论 2019-06-05 16:06:09
    之前总结过两篇关于MQTT消息发布及消息订阅的文章,很多朋友都觉得不错,不过也有些小疑问,特别是关于mqtt消息订阅的,如何在项目应用中,做到动态添加Topic监听消息?很多朋友对此都有疑问,却又不知如何下手,...
  • 如果我们想让监听器在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry 实现,只需要定义两个 controller 接口分别通过 KafkaListenerEndpointRegistry 来控制监听器的开启、...
  • 默认情况下,当项目启动时,监听器就开始工作(监听消费发送到指定 topic的消息)。如果我们不想让监听器立即工作,想在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry实现,...
  • kafka设置监听器的启动和禁用

    千次阅读 2018-12-24 20:32:13
    拿个需求是要对kafka监听设置动态配置... 开关定义kafka是否启用;   但是在使用kafka的时候,springcloud集成 的kafka就是直接添加注解,启用监听就可以了...但是监听监听,有点儿难... @KafkaListener(topics...
  • javascript如何监听 form.submit()事件

    千次阅读 2010-12-26 10:35:48
    众所周知,动态表单提交(form.submit())无法触发onsubmit事件。 如果想要想要捕捉到form.submit(),在此提供两种解决方案 第一种: 在form.submit()方法之前加上form.fireEvent("onsubmit");...
  • 大家都知道,注解只能配置常量,在一些构架的开发中,有时候我们需要给注解动态配置一些值,或者想从配置文件中...在RocketMQ的监听配置中,就使用了这个技术,如下代码所示:@RocketMQMessageListener(topic = "$...
  • SpringBoot注解内容的动态配置

    千次阅读 2020-02-11 10:29:54
    大家都知道,注解只能配置常量,在一些构架的开发中,有时候我们需要给注解动态配置一些值,或者想从配置文件中读取配置。直接在注解上配置是无法实现的,但是我们可以在拿到注解的值之后,再对这些值进行另外的操作...
  • springcloud bus

    2020-06-20 23:06:08
    configClient实例都监听MQ中同一个topic(默认是springcloudBus)当一个服务刷新数据时,他会把这个消息放入到topic中,这样其他监听同一个topic的服务就能得到通知,然后去更新自身的配置 原理 demo搭建 服务端...
  • ConfigClient实例都监听MQ中同一个topic(默认是SpringCloudBus),当一个服务刷新数据的时候,它会把信息放到Topic中,这样其他监听同一个Topic的服务就能得到通知,然后更新自身的配置。 动态刷新全局广播 利用...
  • Springcloud-Bus消息总线

    2020-07-12 17:46:23
    官网 Bus消息总线概述 ...ConfigClient实例都会监听MQ中同一个topic(默认是springcloudbus)。当一个服务刷新数据时会把这个信息放入topic中,这样监听他的服务就会得到通知,然后更新配置。 bus的两种代理模式
  • Routing模式6 Topic动态路由(通配符)AmqpAdmin-RabbitMQ组件管理 SpringBoot中使用RabbitMQ RabbitMQ总共6种模型“Hello World!”、Work Queues、Publish/Subscribe、Routing、Topics、RPC 通过@RabbitListener...
  • GEF的图形渐变方法

    2011-11-20 20:27:00
    http://www.4ucode.com/Study/Topic/1967675(引用) 在GEF的Flow例子里,已经提供了另一种实现动态变化的方式,这个实现方法...需要窗口类控制器需要监听状态变化,以决定什么时候开始激活动态过程 每个可能属...
  • ##################################1、工程说明##################...3) 实现了普通队列消息发送与监听,实现了基于TOPIC的消息发布与订阅 4) IBM-MQ无需提前创建主题,TongLink需要提前创建主题以及对应的虚拟队列;
  • springboot-shiro 整合siro 实现 动态授权 认证 记住我功能 结合thymeleaf 动态显示菜单页等 springboot-sms 整合阿里云短信发送工具 springboot-swagger 整合swagger 接口文档工具 springboot-thymeleaf 整合 ...
  • 现在,这些节点将会用base_scan的topic来通信从而代替,并且将不再监听"扫描"topic的信息。然后我们就可以为我们的新激光测距仪启动另外一个hokuyo_node。 参考文献 http://www.ros.org/wiki/ros ...
  • JSP应用开发详解

    2013-04-20 15:45:57
    1.2 动态网页技术 3 1.2.1 CGI 4 1.2.2 ASP 4 1.2.3 ASP.NET 5 1.2.4 PHP 6 1.2.5 Servlet 7 1.2.6 JSP 8 1.2.7 Python、Ruby等脚本语言 9 1.2.8 动态网页技术的比较 10 1.3 JSP技术原理 11 1.3.1 JSP的工作原理 11 ...

空空如也

空空如也

1 2
收藏数 31
精华内容 12
关键字:

动态监听topic