精华内容
下载资源
问答
  • kafka动态监听topic,配置轮询时间 使用spring-kafka依赖 使用topicPattern进行正则表达式匹配,topicPattern 已经可以做到定期检查topic列表,然后将新加入的topic分配至某个消费者。(默认为五分钟一次) @Kafka...

    kafka动态监听topic,配置轮询时间

    使用spring-kafka依赖

    使用topicPattern进行正则表达式匹配,topicPattern 已经可以做到定期检查topic列表,然后将新加入的topic分配至某个消费者。(默认为五分钟一次)

     @KafkaListener(topicPattern = "topicTest.*", containerFactory = "testKafkaListenerFactory")
        public void p1EquipmentTeamSyncEventListener(String command) {
            System.out.println("-------------------------------------监听消费:"+command);
        }
    

    showcase.*中.是必须的,否则匹配不到目标topic。

    配置轮询时间

          @Bean("testKafkaListenerFactory")
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            Map<String, Object> props = new HashMap<>(5);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    //        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
            props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG,30000);
            ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new StringDeserializer()));
            return factory;
        }
    

    props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG,30000); //单位为毫秒

    展开全文
  • 问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听topic。 方案一: 如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。 @...

    问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听的topic。

    方案一:

    如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        private static final String KAFKA_SERVERS_CONFIG = "192.168.77.202:9092";
        private static final String LOCAL_GROUP_ID = "cctest";
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
            // kv都用string来序列化
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @KafkaListener(topicPattern = "${topicPattern}")
        public void listen(String data) {
            System.out.println("message have been consumed:"+data);
        }
    }
    
    

    方案弊端:

    1. 变动的topic只能在限定范围内,如果新增topic不符合规则,无法消费。
    2. 因为每个topic只有一个partition,单线程消费性能低下,如果线上的数据量太大,消费一个大的topic时其他topic无法消费。

    方案二:

    用Spring管理的SingletonBean缓存kafkaconsumer配置,并设置定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁bean里已有消费者,创建新的消费者。

    管理消费者bean
    @Component
    @Data
    public class ResourceNotifyConsumer {
    
        private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);
    
    
        @Autowired
        // 消费数据的service,异步处理
        private MonitorPoolService monitorPoolService;
    
        private KafkaConsumer<String, String> consumer = null;
    
        public void closeConsumer() {
    		// 如果consumer.wakeup()停掉当前poll并抛出异常,在没阻塞的时候,会在下一次poll抛出异常,但下一次poll已经是新的consumer对象。
    		//	同时,consumer不支持多线程同时操作,所以这里把引用去掉,靠gc回收旧consumer。
            consumer = null;
        }
    
        public void onMessage() {
            while (consumer != null) {
                // 从kafka中取出100毫秒的数据
                List<Map<String, Object>> datas = new ArrayList<>();
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> recordData = new HashMap<>(8);
                    recordData.put(record.key(), record.value());
                    datas.add(recordData);
                }
                // 处理消息
                if (CollectionUtils.isNotEmpty(datas)) {
                    monitorPoolService.dealResource(datas);
                }
            }
        }
    }
    
    
    定时调用接口
    @GetMapping("/check")
    public ResponseMessage checkConfig() {
    	// 定时查询配置是否有变动,因场景这里只做增加检查,如果场景有修改,可以缓存旧数据与配置做对比。
        List<DeviceTopicConfig> deviceTopicConfigs = topicConfigDao.findAll();
        List<DeviceTopicConfig> topicWithoutListening = deviceTopicConfigs.stream().filter(t -> !t.getListening()).collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(topicWithoutListening)) {
            resourceNotifyConsumer.closeConsumer();
    		// 新的监听topic集合
            List<String> topics = deviceTopicConfigs.stream().map(DeviceTopicConfig::getTopic).collect(Collectors.toList());
    		// 此方法类似上面配置新建监听新topic集合的consumer,并调用bean的onMessage方法
            resourceService.buildConsumer(topics);
            // 更新配置的状态
            topicWithoutListening.forEach(deviceTopicConfig -> deviceTopicConfig.setListening(true));
            topicConfigDao.saveAll(topicWithoutListening);
        }
        return ResponseMessage.success();
    

    方案弊端:

    1. 如果提交offset不合理,很可能因consumer配置的不同出现重复消费或者未消费情况。
    2. 消费数据采用线程池,如果监听的topic接收消息过多可能触发RejectedExecutionHandler。

    方案三:

    1. 独立kafka消费模块为一个单独的jar文件
    2. 另起一个系统,定时查询数据库,发现topic改变后就java调用linux命令杀掉kafka的jar进程

    方案弊端:

    1. 能在模块内解决的问题,尽量不变复杂。。

    总结:

    最后选择了方案二,因为尽管有topic的增加,但实际场景只可能增加一两个topic,两个问题的权衡:1.在增加的时候消息丢失或重复消费都可以接受,并且这个问题只需要多手动提交(比如在接口调用时和旧消费者置空之前)可以避免。2.评估后消息不会太多。

    展开全文
  • 现springboot整合的active目前,监听话题使用的是@JmsListener,但是目前想动态里面参数,一愁莫展,所以想问哈各位,有没有啥方法达到动态修改@JmsListener的参数,或则其他具体点的方法达到动态监听,要哭了!...
  • spring boot 动态添加监听kafka哪些Topic代码配置 代码 //设置kafka信息 @Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){ ...

    spring boot 动态添加监听kafka哪些Topic

    代码

    //设置kafka信息
    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){
    	ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    	factory.setConsumerFactory(consumerFactory);
    	/*
    	设置默认不监听kafka设置的
    	(此功能是可以动态设置是否启动kafka监听,如果不需要可以
    	设置为true,甚至不用写这个方法;动态监听方法见下放)
    	*/
    	factory.setAutoStartup(false);
    	return factory;
    }
    
    public static final String KAFKA_LISTENER_ID = "KAFKA_ID";
    //监听配置文件中的receiveTopics的值使用,分割
    //会监听下方配置中的test、test1、test2;三个topic
    @KafkaListener(id = KAFKA_LISTENER_ID,topics = {"#{'${receiveTopics}'.split(',')}"},containerFactory = "ackContainerFactory")
    public void message(ConsumerRecord<String, String> record) {
    	//获取topic
    	String topic = record.topic();
    	//获取键值(有可能乱码)
    	String flieName = new String(record.key().getBytes("UTF-8"),"UTF-8");
    	//获取内容
    	String valueTopic = new String(record.value().getBytes("UTF-8"),"UTF-8");
    	…………处理操作
    }
    

    配置

    receiveTopics: test,test1,test2
    

    其他步骤

    1. kafka服务自带zookeeper下载与启动
    2. Spring boot配置kafka服务
    3. kafka生产者发送消息成功回调
    4. kafka根据ip端口获取消息队列上的topic
    5. kafka动态设置监听哪些topic
    6. 动态启动关闭kafka监听、设置默认不监听kafka
    7. kafka设置:1只接受消息、不发送消息;2只发送消息不接受消息;3既接受消息也发送消息;4既不接收消息也不发送消息
    8. kafka会把历史数据都获取下来
    9. Spring boot kafka执行多次多次消费
    展开全文
  • 公司需要做一个mysql同步到Pgsql的数据同步,采用Debezium结合kafka connect读取mysql的binlog同步到Kafka,然后需要消费kafka的消息,生成DDL和DML,插入到Pgsqlkafka相关问题topic动态;?? Debezium+kafka connect...

    场景介绍

    ??公司需要做一个mysql同步到Pgsql的数据同步,采用Debezium结合kafka connect读取mysql的binlog同步到Kafka,然后需要消费kafka的消息,生成DDL和DML,插入到Pgsql

    kafka相关问题

    topic动态;

    ?? Debezium+kafka connect 会动态生成topic,一个表一个topic,因为实际生产中,会存在表新建的情况,会动态添加一个新的topic需要消费,这也是这篇文章主要想要解决的问题

    顺序保证

    ?? 因为读取的是mysql的binlog,需要按照这个顺序去消费,所以每个topic只有一个partition,消费的时候我们需要考虑效率问题。

    其他问题和kafka无关,不做说明

    解决方案

    topic动态问题,

    在我的业务场景中,虽然topic是动态的,但是topic是有规则的,比如topic的规则都是 【服务名.数据库名称.表名】

    如:test1.dbname1.t_test

    刚刚开始想着直接使用@KafkaListener的topicPattern属性,配置上正则去解决,但是会导致另外一个问题,使用该方式或导致这个消费者匹配到的所有符合规则的topic,比如此处的正则可以配置为:【test1.dbname1.*】。然后因为每个topic只有一个partition,单线程消费性能低下,线上的数据量太大,消费一个大的topic时其他topic无法消费。

    (如果topic有多个分区,可以开启concurrency属性;concurrency为监听topic的分区数量之和)

    为了解决性能问题,将单次消费1条数据,改造成批量消费。

    ok,开启批量消费

    # 每次消费500条

    spring.kafka.consumer.max-poll-records=500

    # 监听器消费类型

    spring.kafka.listener.type=batch

    消费者代码修改为:

    /**

    * 注解方式创建消息监听器(使用正则表达式监听topic)

    * @param records 消息列表

    * @param ack

    */

    @KafkaListener(id = "dmlBatchConsumer",topicPattern = "${dmlTopics}")

    public void batchDMLConsumer(List> records, Acknowledgment ack){

    long startTime = System.currentTimeMillis();

    log.info("本次批量获取线程ID:{},消息长度:{}",Thread.currentThread().getId(),records.size());

    batchMsgHandler(records, ack);

    long endTime = System.currentTimeMillis();

    log.info("本次批量消费耗时{}毫秒,",endTime-startTime);

    }

    改完之后,测试发现性能确实提升了不少。预计100W数据 10分钟左右全部同步完成。

    但是这10分钟之内只能消费这个topic,其他topic依然无法消费,因为只有一个消费者线程。

    查阅了很多资料,也问了我认识的几个大佬,均没找到解决方案。

    @KafkaListener用正则匹配到的topic,似乎并不能每个topic开启一个消费者线程,特别是每个topic都是一个partition的情况下。

    那应该怎么解决这个消费的性能问题呢?

    消费的性能问题

    既然使用注解KafkaListener方式很难做到,那就手动创建消费者吧。

    首先我需要获取到topic列表,这里分为已经存在的topic和后面新增表生成的topic

    已经存在的topic

    topic列表我可以读取pgsql中已经同步完成的表,根据规则生成topic列表

    新增表生成的topic

    新增的表会先创建执行到监听DDL的一个消费者,DDL的topic是固定的,所以当我拿到了DDL新建表的消息时候,我可以把这个新的DDL保存在数据库中,然后启动一个定时器去监听这个新的topic。

    来吧。直接上代码:

    创建消息监听器方法

    /**

    * 手动创建批量消息监听器

    * @param topic

    * @return

    */

    public KafkaMessageListenerContainer manualListenerContainer(String topic) {

    ContainerProperties properties = new ContainerProperties(topic);

    properties.setGroupId(groupId);

    properties.setAckMode(ContainerProperties.AckMode.MANUAL);

    properties.setMessageListener((new BatchAcknowledgingMessageListener() {

    @Override

    public void onMessage(List> list, Acknowledgment acknowledgment) {

    long startTime = System.currentTimeMillis();

    log.info("本次批量获取线程ID:{},消息长度:{}",Thread.currentThread().getId(),list.size());

    batchMsgHandler(list, acknowledgment);

    log.info("本次批量消费耗时{}毫秒,",System.currentTimeMillis()-startTime);

    }

    }));

    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(config.consumerFactory(true), properties);

    return container;

    }

    获取已有消息监听器列表

    @Configuration

    @Slf4j

    public class KafkaTopicConfig implements InitializingBean {

    @Autowired

    private BasicDao basicDao;

    @Value("${listenerTopicTitle}")

    private String topicTitle;

    @Value("${listenerTopicDB}")

    private String schema;

    public List topics = new ArrayList<>();

    @Override

    public void afterPropertiesSet() throws Exception {

    //获取指定schema下的table表名

    List> result = basicDao.getDbTableList(schema);

    if(CollectionUtils.isEmpty(result)){

    return;

    }

    //生成topic列表

    for (Map map : result) {

    StringBuilder sb = new StringBuilder(topicTitle);

    String tableName = (String) map.get("tablename");

    String schemaName = (String) map.get("schemaname");

    String topic = sb.append(".").append(schemaName).append(".").append(tableName).toString();

    topics.add(topic);

    }

    }

    }

    根据topic列表启动消息监听器

    /**

    * 启动时默认加载已经存在的表对应的topic

    */

    @Bean

    public void startTopicListener(){

    List listeners = new ArrayList<>();

    for (String topic : topicConfig.topics) {

    KafkaMessageListenerContainer container =  manualListenerContainer(topic);

    listeners.add(container);

    }

    for (KafkaMessageListenerContainer listener : listeners) {

    try {

    listener.start();

    }catch (Exception e){

    log.error("监听器启动失败,异常:{}",e.getMessage(),e);

    }

    }

    }

    解析DDL的消费者得到新的topic,保存到数据库,然后定时器去监听这个topic我就不贴代码了,明白这个方案即可。

    结果测试,没有问题,多个消费者监听了多个topic,然后加上批量消费,消费速度也很快。

    最后

    使用正则表达式的时候,需要主要 * 前面必须有一个 . 。比如serverName.dbName.t_test.* 或者 serverName.dbName.*

    @KafkaListener的topicPattern属性默认需要为一个字符串常量,如果需要动态配置的话,可以 implements InitializingBean ,然后重写afterPropertiesSet方法,通过System.setProperty("dmlTopics", dmlTopic); 然后再监听器中就可以${dmlTopics}获取到。

    相关资料

    这次我查阅了很多资料,但是有个资料对我帮忙特别大,我贴出来方便大家也可以学习一下

    Spring-Kafka(六)—— @KafkaListener的花式操作

    在此感谢下原作者。

    我的这个方案可能只是适合我的这个场景,仅供大家参考。如果有更好的方案,也欢迎大佬指点一下。

    展开全文
  • 监听kafka消息

    2020-12-22 07:48:48
    /*** kafka消费者抽象.../*** 消费的Topic与消费线程数组成的Map*/ private MaptopicThreadMap;/*** Consumer实例所需的配置*/ privateProperties properties;/*** 线程池*/ privateThreadPoolExecutor taskExecutor;...
  • 文章目录spring kafka 动态创建 topic 监听问题分析需要解决的问题解决方案Consumer topickafak 通用配置 设置timer 触发实现ConsumerMessageHelper 业务处理运行效果输出 spring kafka 动态创建 topic 监听 问题...
  • //动态感知partition的变化 10ms动态获取topic的元数据,新增的partition会自动从最早的位点开始消费数据 properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");...
  • 我的天啊,经过几天奋战,终于解决这个大坑,我一定要写个博客纪念一下。...其次写一个 KafkaTopicConfig类,继承InitializingBean,重写afterPropertiesSet()方法。 我的如下: @Configuration public...
  • 通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。 二、实现 1、新建一个demo数据库并添加几条数据来进行测试 站点设备信息...
  • kafka监控topic消费

    千次阅读 2018-01-11 17:52:33
    一、 KafkaOffsetMonitor监控 安装 1.wget  ... 2.启动 java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \ ... --zk  --port 8088 \ ... 查看topic如下      
  • ActiveMQ topic 注解监听

    2017-11-06 15:45:00
    队列的实现方式比较简单,后面在研究使用注解实现topic监听的时候,,发现mq发送的消息监听类无法接收到,于是百度了一下也没有标准答案,结合了一下源码还有百度,发现需要先配置topic主题jsm监听容器工厂,原因是...
  • SpringBoot2.0_MQTT消息订阅之动态Topic

    万次阅读 多人点赞 2019-06-05 16:06:09
    之前总结过两篇关于MQTT消息发布及消息订阅的文章,很多朋友都觉得不错,不过也有些小疑问,特别是关于mqtt消息订阅的,如何在项目应用中,做到动态添加Topic监听消息?很多朋友对此都有疑问,却又不知如何下手,...
  • springboot整合mq同时监听queue和topic

    千次阅读 2018-11-09 14:09:31
    前言:springboot和mq整合的时候,默认情况下,要么只能监听queue要么只能监听topic,而不能二者兼得。 在application.properties文件中通过如下配置项,切换监听消息的类型。 1 2 #为true时是...
  • Android基于MQTT实现发布消息与监听topic接收消息 MQTT 是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。 MQTT 可以被解释为一种低开销,低带宽占用的即时通讯协议,可以用极少的代码和带宽的为连接远程...
  • } } (5)定义监听者,并且同时监听两个频道(主题): @Component public class KafkaListenerService { @KafkaListener(topics = {"jojo_test","test_jojo"}) public void listen(ConsumerRecord,?> consumer){ ...
  • } } public String send(String topic,String tags,String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new...
  • 我就废话不多说了,直接 上代码吧! import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata;...
  • 1.rabbitmq消息监听,兼容多种模式的消息,fanout/topic等模式 MQ消息配置监听: package com.test.ddyin.conf; import java.util.HashMap; import java.util.List; import java.uti...
  • 在spring使用activeMQ订阅多个topic并实现监听监听2017年07月30日 21:16:00阅读数:3068最近有个需求,在spring上集成activeMQ且订阅多个topic,并且需要实现监听监听多个topic。一、maven依赖配置pom.xml&...
  • 而对于监听器来说它要知道去哪监听监听哪个消息目的地,所以我们配置向监听既要知道 ConnectionFactory(从哪里监听) 和 destination(监听什么),监听到消息后怎么处理,这三个核心的东西。 <!--这个是主题...
  •  //设置监听端口//published监听所发布的消息 mqttServer.on('published',function(packet,client){ console.log('published----',packet);switch(packet.topic){ //topic字段是消息主题,可以理解为接口中的方法...
  • 1.获取所有topic package com.example.demo; import java.io.IOException; import java.util.List; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import or....
  • 如果你想开多条消费者线程去消费这些topic,添加@KafkaListener注解的参数concurrency的值为自己想要的消费者个数即可(注意,消费者数要小于等于你开的所有topic的分区数总和) 运行程序,console打印的效果如下: ...
  • kafka动态添加topic动态添加消费者 依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 写个Kafka...
  • 简单示例1. 配置文件自定义配置2. 利用Spring的SpEl表达式,将...kafka.consumer.topics=topic1,topic2 2. 利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${kafka.consumer.topics}’.spli
  • kafka动态配置topic

    万次阅读 热门讨论 2019-01-11 14:18:43
     之前使用@org.springframework.kafka.annotation.KafkaListener...}),这样去单独监听某一个topic,生产者也固定在代码里定义变量读取配置文件。昨天改了个需求,希望以后通过配置文件去动态配置生产者和消费者的...
  • String topic = record.topic(); logger.info("====topic=="+topic+"========message =" + message); rmbKafkaClientService.loadData2Es(topic,message.toString()); logger.info("----------send-to-es-success--...
  • 有一个Topic:hw_data 有3个分区 3个副本 组:hw-data-group 将这个主题的消息分发给两个(或者多个)消费者消费,(不能消费相同的消息) 1.图解 2.关键注解@kafkaListener @Target({ ElementType.TYPE, ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 47,930
精华内容 19,172
关键字:

动态监听topic