kafka 订阅
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。 展开全文
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
信息
开发商
Apache软件基金会
软件授权
Apache License 2.0
软件名称
Apache Kafka
更新时间
2020-04-08
软件版本
2.5.0
软件平台
跨平台
软件语言
Scala , Java
软件大小
15M
Kafka名字的由来
kafka的架构师jay kreps对于kafka的名称由来是这样讲的,由于jay kreps非常喜欢franz kafka,并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。
收起全文
精华内容
下载资源
问答
  • kafka

    万次阅读 2020-01-29 20:46:13
    Kafka是什么 Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。 Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是...

    Kafka是什么

    在这里插入图片描述
    Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
    Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
    Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
    Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。
    无论是kafka集群,还是producer和consumer都依赖于
    zookeeper**集群保存一些meta信息,来保证系统可用性

    消息队列内部的实现原理

    为什么需要消息队列

    消息系统的核心作用就是三点:解耦,异步和并行
    以用户注册的案列来说明消息系统的作用

    用户注册的一般流程

    在这里插入图片描述

    问题:随着后端流程越来越多,每步流程都需要额外的耗费很多时间,从而会导致用户更长的等待延迟。

    用户注册的并行执行

    在这里插入图片描述

    问题**:系统并行的发起了4个请求,4个请求中,如果某一个环节执行1分钟,其他环节再快,用户也需要等待1分钟。如果其中一个环节异常之后,整个服务挂掉了。在这里插入图片描述

    用户注册的最终一致

    在这里插入图片描述

    Kafka架构,分布式模型

    Topic :消息根据Topic进行归类
    Producer:发送消息者,生产者
    Consumer:消息接受者,消费者
    broker:每个kafka实例(server)
    Zookeeper:依赖集群保存meta信息。

    在这里插入图片描述

    Kafka的环境搭建

    基础环境准备

    安装前的准备工作(zk已经部署完毕)

    l 关闭防火墙

    chkconfig iptables off && setenforce 0

    kafka单机版安装采用自带的zookeeper处理

    1.校验一下java是否安装

    ​ [root@localhost Desktop]# java -version
    ​ java version “1.8.0_171”
    ​ Java™ SE Runtime Environment (build 1.8.0_171-b11)
    ​ Java HotSpot™ 64-Bit Server VM (build 25.171-b11, mixed mode)

    如果没有安装,请先安装java环境 参考JDK安装方法 〜/ .bashrc文件(环境变量配置文件方式二)

    2.上传kafka文件到虚拟机中kafka_2.12-2.2.0

    3.解压安装kafka到/usr/local中
    [root@localhost Desktop]# tar -zxvf kafka_2.11-1.0.0.tgz -C /opt/
    3.1 重命名kafka的文件 :mv kafka_2.11-1.0.0 /opt/kafka

    4.切换到kafka的配置文件目录
    [root@localhost config]# pwd

           /opt/kafka/config
    

    5.kafka安装目录下的config文件夹为其配置文件,我们需要修改的有 server.properties和zookeeper.properties。

    ​ [root@localhost kafka]# mkdir kafka-logs-0

    ​ server.properties: kafka的配置文件
    ​ log.dirs=/tmp/kafka-logs
    ​ 修改为
    ​ log.dirs=/opt/kafka1/kafka-logs-0

    ​ zookeeper.properties kafka自带的zookeeper的配置
    ​ dataDir=/tmp/zookeeper
    ​ 修改为
    ​ dataDir=/opt/kafka1/my_zookeeper

    6.启动zookeeper
    [root@localhost ~]# /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

    7.启动kafka
    [root@localhost Desktop]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

    8.创建主题 (让我们创建一个名为“test”的主题,它只包含一个分区,只有一个副本)
    /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 1704D

                备注: 或者,您可以将代理配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。
    

    9.查看主题: 如果我们运行list topic命令,我们现在可以看到该主题
    /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

    10.发送一些消息
    Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。运行生产者,然后在控制台中键入一些消息以发送到服务器。

    ​ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1704D

    ​ >等待输入发送的消息

    11.启动消费者
    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D --from-beginning 从第一条开始接受
    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D从现在生产者发送开始接受。]

    展开全文
  • SpringBoot集成kafka全面实战

    万次阅读 多人点赞 2020-03-28 14:13:29
    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》、《秒懂kafka HA(高可用)》两篇文章。 一、生产者实践 普通生产者 带回调的生产者 ...

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》《秒懂kafka HA(高可用)》两篇文章。

    一、生产者实践

    • 普通生产者

    • 带回调的生产者

    • 自定义分区器

    • kafka事务提交

    二、消费者实践

    • 简单消费

    • 指定topic、partition、offset消费

    • 批量消费

    • 监听异常处理器

    • 消息过滤器

    • 消息转发

    • 定时启动/停止监听器

    一、前戏

    1、在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP),

    advertised.listeners=PLAINTEXT://112.126.74.249:9092

    2、在开始前我们先创建两个topic:topic1、topic2,其分区和副本数都设置为2,用来测试,

    [root@iZ2zegzlkedbo3e64vkbefZ ~]#  cd /usr/local/kafka-cluster/kafka1/bin/
    [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
    Created topic topic1.
    [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
    Created topic topic2.

    当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send("topic1", normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下,

    @Configuration
    public class KafkaInitialConfiguration {
        // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
        @Bean
        public NewTopic initialTopic() {
            return new NewTopic("testtopic",8, (short) 2 );
        }
    ​
         // 如果要修改分区数,只需修改配置值重启项目即可
        // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
        @Bean
        public NewTopic updateTopic() {
            return new NewTopic("testtopic",10, (short) 2 );
        }
    }

    3、新建SpringBoot项目

    ① 引入pom依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    ② application.propertise配置(本文用到的配置项这里全列了出来)

    ###########【Kafka集群】###########
    spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
    ###########【初始化生产者配置】###########
    # 重试次数
    spring.kafka.producer.retries=0
    # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
    spring.kafka.producer.acks=1
    # 批量大小
    spring.kafka.producer.batch-size=16384
    # 提交延时
    spring.kafka.producer.properties.linger.ms=0
    # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
    # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
    ​
    # 生产端缓冲区大小
    spring.kafka.producer.buffer-memory = 33554432
    # Kafka提供的序列化和反序列化类
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 自定义分区器
    # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
    ​
    ###########【初始化消费者配置】###########
    # 默认的消费组ID
    spring.kafka.consumer.properties.group.id=defaultConsumerGroup
    # 是否自动提交offset
    spring.kafka.consumer.enable-auto-commit=true
    # 提交offset延时(接收到消息后多久提交offset)
    spring.kafka.consumer.auto.commit.interval.ms=1000
    # 当kafka中没有初始offset或offset超出范围时将自动重置offset
    # earliest:重置为分区中最小的offset;
    # latest:重置为分区中最新的offset(消费分区中新产生的数据);
    # none:只要有一个分区不存在已提交的offset,就抛出异常;
    spring.kafka.consumer.auto-offset-reset=latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    spring.kafka.consumer.properties.session.timeout.ms=120000
    # 消费请求超时时间
    spring.kafka.consumer.properties.request.timeout.ms=180000
    # Kafka提供的序列化和反序列化类
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    # 消费端监听的topic不存在时,项目启动会报错(关掉)
    spring.kafka.listener.missing-topics-fatal=false
    # 设置批量消费
    # spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    # spring.kafka.consumer.max-poll-records=50

    二、Hello Kafka

    1、简单生产者

    @RestController
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    ​
        // 发送消息
        @GetMapping("/kafka/normal/{message}")
        public void sendMessage1(@PathVariable("message") String normalMessage) {
            kafkaTemplate.send("topic1", normalMessage);
        }
    }

     2、简单消费

    @Component
    public class KafkaConsumer {
        // 消费监听
        @KafkaListener(topics = {"topic1"})
        public void onMessage1(ConsumerRecord<?, ?> record){
            // 消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    }

    上面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。启动项目,postman调接口触发生产者发送消息,

    可以看到监听器消费成功,

    三、生产者

    1、带回调的生产者

    kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

    @GetMapping("/kafka/callbackOne/{message}")
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }
    @GetMapping("/kafka/callbackTwo/{message}")
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }
    
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }

    2、自定义分区器

    我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

    ① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

    ② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

    ③  patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

    ※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

    public class CustomizePartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 自定义分区规则(这里假设全部发到0号分区)
            // ......
            return 0;
        }
    ​
        @Override
        public void close() {
    ​
        }
    ​
        @Override
        public void configure(Map<String, ?> configs) {
    ​
        }
    }

    在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名,

    # 自定义分区器
    spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

    3、kafka事务提交

    如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,

    @GetMapping("/kafka/transaction")
    public void sendMessage7(){
        // 声明事务:后面报错消息不会发出去
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });
    ​
        // 不声明事务:后面报错但前面消息已经发送成功了
       kafkaTemplate.send("topic1","test executeInTransaction");
       throw new RuntimeException("fail");
    }

    四、消费者

    1、指定topic、partition、offset消费

    前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供,

    /**
     * @Title 指定topic、partition、offset消费
     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
     * @Author long.yuan
     * @Date 2020/3/22 13:38
     * @Param [record]
     * @return void
     **/
    @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0" }),
            @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
    })
    public void onMessage2(ConsumerRecord<?, ?> record) {
        System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
    }

    属性解释:

    ① id:消费者ID;

    ② groupId:消费组ID;

    ③ topics:监听的topic,可监听多个;

    ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

    上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

    注意:topics和topicPartitions不能同时使用;

    2、批量消费

    设置application.prpertise开启批量消费即可,

    # 设置批量消费
    spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    spring.kafka.consumer.max-poll-records=50

    接收消息时用List来接收,监听代码如下,

    @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
        System.out.println(">>>批量消费一次,records.size()="+records.size());
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println(record.value());
        }
    }

    3、ConsumerAwareListenerErrorHandler 异常处理器

    通过异常处理器,我们可以处理consumer在消费时发生的异常。

    新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

    // 新建一个异常处理器,用@Bean注入
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("消费异常:"+message.getPayload());
            return null;
        };
    }
    ​
    // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
    @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
    public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
        throw new Exception("简单消费-模拟异常");
    }
    ​
    // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
    @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
        System.out.println("批量消费一次...");
        throw new Exception("批量消费-模拟异常");
    }

    执行看一下效果,

    4、消息过滤器

    消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

    配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

    @Component
    public class KafkaConsumer {
        @Autowired
        ConsumerFactory consumerFactory;
    ​
        // 消息过滤器
        @Bean
        public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
            // 被过滤的消息将被丢弃
            factory.setAckDiscarded(true);
            // 消息过滤策略
            factory.setRecordFilterStrategy(consumerRecord -> {
                if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                    return false;
                }
                //返回true消息则被过滤
                return true;
            });
            return factory;
        }
    ​
        // 消息过滤监听
        @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
        public void onMessage6(ConsumerRecord<?, ?> record) {
            System.out.println(record.value());
        }
    }

    上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数,

    5、消息转发

    在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

    在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下,

    /**
     * @Title 消息转发
     * @Description 从topic1接收到的消息经过处理后转发到topic2
     * @Author long.yuan
     * @Date 2020/3/23 22:15
     * @Param [record]
     * @return void
     **/
    @KafkaListener(topics = {"topic1"})
    @SendTo("topic2")
    public String onMessage7(ConsumerRecord<?, ?> record) {
        return record.value()+"-forward message";
    }

    6、定时启动、停止监听器

    默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

    ① 禁止监听器自启动;

    ② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

    新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动,

    @EnableScheduling
    @Component
    public class CronTimer {
    ​
        /**
         * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
         * 而是会被注册在KafkaListenerEndpointRegistry中,
         * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
         **/
        @Autowired
        private KafkaListenerEndpointRegistry registry;
    ​
        @Autowired
        private ConsumerFactory consumerFactory;
    ​
        // 监听器容器工厂(设置禁止KafkaListener自启动)
        @Bean
        public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
            ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
            container.setConsumerFactory(consumerFactory);
            //禁止KafkaListener自启动
            container.setAutoStartup(false);
            return container;
        }
    ​
        // 监听器
        @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
        public void onMessage1(ConsumerRecord<?, ?> record){
            System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    ​
        // 定时启动监听器
        @Scheduled(cron = "0 42 11 * * ? ")
        public void startListener() {
            System.out.println("启动监听器...");
            // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
            if (!registry.getListenerContainer("timingConsumer").isRunning()) {
                registry.getListenerContainer("timingConsumer").start();
            }
            //registry.getListenerContainer("timingConsumer").resume();
        }
    ​
        // 定时停止监听器
        @Scheduled(cron = "0 45 11 * * ? ")
        public void shutDownListener() {
            System.out.println("关闭监听器...");
            registry.getListenerContainer("timingConsumer").pause();
        }
    }

    启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

    11:42分监听器启动开始工作,消费消息,

    11:45分监听器停止工作,

     

    感兴趣的可以关注一下博主的公众号,1W+技术人的选择,致力于原创技术干货,包含Redis、RabbitMQ、Kafka、SpringBoot、SpringCloud、ELK等热门技术的学习&资料。

    展开全文
  • springboot整合kafka

    万次阅读 热门讨论 2019-03-19 21:44:41
    业务中需要使用到异步消息队列,为了快速搭建一个消息中间件,这里选了kafka,一方面是kafka搭建比较简单,而且这个中间件比较成熟,而且对于大数据量的消息支持很好,下面首先说说kafka的安装,我这里演示一下在...

    业务中需要使用到异步消息队列,为了快速搭建一个消息中间件,这里选了kafka,一方面是kafka搭建比较简单,而且这个中间件比较成熟,而且对于大数据量的消息支持很好,下面首先说说kafka的安装,我这里演示一下在linux上如何搭建kafka,

    1、首先安装zookeeper,kafka的启动需要依赖zookeeper,zookeeper安装比较简单,基本上就是上传压缩包,解压,然后启动即可,

    在这里插入图片描述
    然后使用这个命令看看zookeeper是否启动成功:
    在这里插入图片描述

    2、安装kafka,kafka的安装相对于其他中间件的安装是非常简单的,但这里有个版本问题要注意,如果使用的是2.X之前的版本,就算你的配置和我这里说的全部一样可能还是会报出一些奇怪的错误,我之前遇到过,低版本的安装起来需要依赖scala,大家可网上查阅,我这里使用的是2.11的版本,
    在这里插入图片描述

    3、解压完毕,进入config文件,对配置文件稍做修改,主要的修改地方我做了标注,

    展开全文
  • SpringBoot集成kafka配置使用(一)

    万次阅读 2021-05-07 15:39:57
    4、消息的消费者(KafkaReceiver) 5、yml配置文件 项目中原版使用的是rocketmq,由于客户那边用的是kafka。先把中间件更换长kafka,步入正题: 1、pom文件的引入 <!-- kafka --> <dependency> ...

    目录

    1、pom文件的引入

    2、kafkaConfig

    3、消息的生产者(KafkaSender)

    4、消息的消费者(KafkaReceiver)

    5、yml配置文件

    6、使用(so easy)


    项目中原版使用的是rocketmq,由于客户那边用的是kafka。先把中间件更换长kafka,步入正题:

    1、pom文件的引入

            <!--    kafka    -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

    2、kafkaConfig

    package com.wlsj.gxdc.config;
    
    import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;
    
    @Configuration
    public class KafkaConfig {
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                KafkaTemplate<Object, Object> template) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            factory.setErrorHandler(new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2))); // dead-letter after 3 tries
            return factory;
        }
    
    
    }
    

    3、消息的生产者(KafkaSender)

    package com.wlsj.gxdc.kafka;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class KafkaSender {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        //异步发送消息方法
        public void sendAsynchronize(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    
        //同步发送消息方法
        public void sendSynchronize(String topic, String message) throws Exception {
            kafkaTemplate.send(topic, message).get();
    
        }
    
    }
    

    4、消息的消费者(KafkaReceiver)

    package com.wlsj.gxdc.kafka;
    
    import com.alibaba.fastjson.JSONObject;
    import com.wlsj.gxdc.entity.Bicycle;
    import com.wlsj.gxdc.entity.BicyclePosition;
    import com.wlsj.gxdc.service.IBicyclePositionService;
    import com.wlsj.gxdc.service.IBicycleService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    @Slf4j
    @Component
    public class KafkaReceiver {
        @Autowired
        private IBicyclePositionService bicyclePositionService;
        @Autowired
        private IBicycleService bicycleService;
    
        @KafkaListener(topics = {"${kafka.topic.bicycle_topic}"})
        public void listen1(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                JSONObject jsonObject = JSONObject.parseObject(message.toString());
                if (null != jsonObject) {
                    Bicycle bicycle = JSONObject.toJavaObject(jsonObject, Bicycle.class);
                    try {
                        bicycleService.save(bicycle);
                    } catch (Exception e) {
                        log.info(e.getMessage());
                    }
                }
                log.info("kafka车辆备案消息:{}", message);
                log.info("----------------- record =" + record);
            }
    
        }
    
        @KafkaListener(topics = {"${kafka.topic.bicycle_position_topic}"})
        public void listen2(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                JSONObject jsonObject = JSONObject.parseObject(message.toString());
                if (null != jsonObject) {
                    BicyclePosition bicyclePosition = JSONObject.toJavaObject(jsonObject, BicyclePosition.class);
                    log.info("=======================bicyclePosition =" + bicyclePosition);
                    try {
                        bicyclePositionService.save(bicyclePosition);
                        bicyclePositionService.updatePositionByBicycleId(bicyclePosition);
                    } catch (Exception e) {
                        log.info(e.getMessage());
                    }
                }
                log.info("kafka车辆位置消息:{}", message);
            }
    
        }
    }
    

    5、yml配置文件

    server:
      undertow:
        io-threads: 6
        worker-threads: 48
        buffer-size: 1024
        buffers-per-region: 1024
        direct-buffers: true
      port: ${PORT:8080}
      #  tomcat:
      #    max-swallow-size: -1
      servlet:
        context-path: /${spring.application.name}
        compression:
          enabled: true
          mime-types: application/javascript,application/json,application/xml,text/html,text/xml,text/plain,text/css,image/*
    
    management:
      endpoints:
        web:
          exposure:
            include: metrics,httptrace
    spring:
      servlet:
        multipart:
          max-file-size: 10MB
          max-request-size: 10MB
      kafka:
        bootstrap-servers: 172.16.251.70:6667,172.16.251.77:6667,172.16.251.46:6667,172.16.251.66:6667, 172.16.251.87:6667, 172.16.251.85:6667, 172.16.251.73:6667, 172.16.251.61:6667
        producer:
          # 发生错误后,消息重发的次数。
          retries: 0
          #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
          batch-size: 16384
          # 设置生产者内存缓冲区的大小。
          buffer-memory: 33554432
          # 键的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: 1
          properties:
            sasl.mechanism: PLAIN
            security.protocol: SASL_PLAINTEXT
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="share_bikes" password="9cfb8e5b";
        consumer:
          # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
          auto-commit-interval: 1S
          # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
          # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
          # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
          auto-offset-reset: earliest
          # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
          enable-auto-commit: false
          group-id: ${spring.profiles.active}-group
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          properties:
            sasl.mechanism: PLAIN
            security.protocol: SASL_PLAINTEXT
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="share_bikes" password="9cfb8e5b";
        listener:
          # 在侦听器容器中运行的线程数。
          concurrency: 5
          #listner负责ack,每调用一次,就立即commit
          ack-mode: manual_immediate
          missing-topics-fatal: false
    kafka:
      topic:
        bicycle_topic: bikes_record
        bicycle_position_topic: share_bikes
    

    6、使用(so easy)

    @Autowired
    KafkaSender kafkaSender;
    kafkaSender.sendAsynchronize(kafkaTopicPo.getBicyclePositionTopic(), JSON.toJSONString(bicyclePosition));
    展开全文
  • Kafka常见面试题

    万次阅读 多人点赞 2019-05-19 19:58:39
    1 什么是kafka Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。 2 为什么要...
  • Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,...
  • kafkatool 连接kafka工具

    2019-01-24 10:51:49
    kafka连接工具
  • kafka自带了很多工具类,在源码kafka.tools里可以看到: 源码包下载地址:http://archive.apache.org/dist/kafka/ 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的...
  • 解释Kafka的Zookeeper是什么?我们可以在没有Zookeeper的情况下使用Kafka吗? Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。 不,不可能越过Zookeeper,直接联系Kafka broker。一旦...
  • Kafka教程,Kafka面试题,Kafka详解

    千次阅读 2020-09-24 11:33:04
    kafka教程,Kafka面试题,Springboot Kafkakafka最初是由Linkedin公司基于Scala和 Java语言开发的分布式消息发布-订阅系统,现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个消息队列系统存在,而事实上...
  • Kafka为什么需要复制? Kafka的信息复制确保了任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
  • org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.&amp;amp;lt;init&amp;amp;gt;(KafkaConsumer.java:79...
  • 请说明什么是Apache Kafka? Apache Kafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。
  • Kafka的设计时什么样的呢? Kafka将消息以topic为单位进行归纳 将向Kafka topic发布消息的程序成为producers. 将预订topics并消费消息的程序成为consumer. Kafka以集群的方式运行,可以由一个或多个服务组成,每个...
  • kafka template操作kafka

    千次阅读 2021-02-08 15:53:54
    依赖 <...org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka<
  • Kafka日志配置

    万次阅读 2018-04-24 13:51:41
    在server.properties中配置了log.dirs值,表示kafka数据的存放目录,而非Kafka的日志目录。 [root@node1 kafka_2.11-1.0.1]# vi config/server.properties log.dirs=/data/kafka Kafka运行时日志默认输出到$...
  • 说说Kafka的使用场景? ①异步处理 ②应用解耦 ③流量削峰 ④日志处理 ⑤消息通讯等。
  • 解释Kafka的用户如何消费信息? 在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节从套接口转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。
  • Flink提供的Kafka连接器,用于向Kafka主题读取或写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖kafka的消费者群体偏移跟踪,而是在内部跟踪和...
  • 分布式面试之Kafka面试题

    万次阅读 2020-10-11 12:41:38
    文章目录1、Kafka的设计是什么样的?2、数据传输的事物定义有哪三种?3、Kafka判断一个节点是否还活着有那两个条件?4、producer是否直接将数据发送到broker的leader(主节点)?5、Kafa consumer是否可以消费指定...
  • Kafka高效文件存储设计特点: (1).Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。 (2).通过索引信息可以快速定位message和确定response...
  • kafka消息数据是否有序? 消费者组里某具体分区是有序的,所以要保证有序只能建一个分区,但是实际这样会存在性能问题,具体业务具体分析后确认。
  • linux安装kafka

    万次阅读 多人点赞 2019-06-14 14:20:37
    今天来安装kafka 安装kafka前台必须安装zookeeper 不会安装请移步:点我快速进入安装zookeeper文章 一、安装kafka 下载kafka两种方式 1、手动下载 下载地址:http://kafka.apache.org/downloads 下载...
  • Kafka 与传统消息系统之间有三个关键区别 (1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留 (2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性 ...
  • 使用Kafka有什么优点和缺点? 优点: ①支持跨数据中心的消息复制; ②单机吞吐量:十万级,最大的优点,就是吞吐量高; ③topic数量都吞吐量的影响:topic从几十个到几百个的时候,吞吐量会大幅度下降。所以在同等...
  • 滴滴开源了其Kafka 监控与管控平台 Logi-KafkaManager,因为有30+个集群的维护经验,使用过kafka-manager,kafka-eagle,kafka-mirrorkaker工具,所以很期待能有1个工具能够整合kafka所有工具优点于一身,这样对于...
  • Kafka笔记

    万次阅读 2020-09-03 10:02:28
    【1】Kafka概念介绍 【2】安装kafka 【3】kafka配置 【4】kafkatool 可视化客户端工具 【5】集群部署 【6】伪集群部署 【7】kafka manager 【8】spring kafka 配置 【1】Kafka概念介绍 kafka学习笔记:知识点整理 ...
  • apache kafka技术分享系列(目录索引)

    万次阅读 多人点赞 2018-02-08 11:30:11
    apache Kafka中国社区中国社区QQ群1:162272557未满 收费5¥,保证QQ运营,腾讯QQ VIP收年费,2000人群非常活跃,质量很高中国社区QQ群2:414762562未满 1000人群中国社区QQ群3:191278841未满 1000人群中国社区QQ群...
  • 我启动kafka的方式是: ./kafka-server-start.sh start …/config/server.properties & 当出现下面这个错误时: 可以换下启动方式: 从kafka的根目录启动 bin/kafka-server-start.sh config/server.properties 就...
  • springboot+kafka中@KafkaListener如何动态指定多个topic

    万次阅读 热门讨论 2019-03-27 10:38:15
    springboot+kafka中@KafkaListener如何动态指定多个topic 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的独特消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 275,577
精华内容 110,230
关键字:

kafka