kafka 订阅
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。 展开全文
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
信息
开发商
Apache软件基金会
软件授权
Apache License 2.0
软件名称
Apache Kafka
更新时间
2020-04-08
软件版本
2.5.0
软件平台
跨平台
软件语言
Scala , Java
软件大小
15M
Kafka名字的由来
kafka的架构师jay kreps对于kafka的名称由来是这样讲的,由于jay kreps非常喜欢franz kafka,并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。
收起全文
精华内容
下载资源
问答
  • kafka
    万次阅读
    2020-01-29 20:46:13

    Kafka是什么

    在这里插入图片描述
    Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
    Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
    Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
    Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。
    无论是kafka集群,还是producer和consumer都依赖于
    zookeeper**集群保存一些meta信息,来保证系统可用性

    消息队列内部的实现原理

    为什么需要消息队列

    消息系统的核心作用就是三点:解耦,异步和并行
    以用户注册的案列来说明消息系统的作用

    用户注册的一般流程

    在这里插入图片描述

    问题:随着后端流程越来越多,每步流程都需要额外的耗费很多时间,从而会导致用户更长的等待延迟。

    用户注册的并行执行

    在这里插入图片描述

    问题**:系统并行的发起了4个请求,4个请求中,如果某一个环节执行1分钟,其他环节再快,用户也需要等待1分钟。如果其中一个环节异常之后,整个服务挂掉了。在这里插入图片描述

    用户注册的最终一致

    在这里插入图片描述

    Kafka架构,分布式模型

    Topic :消息根据Topic进行归类
    Producer:发送消息者,生产者
    Consumer:消息接受者,消费者
    broker:每个kafka实例(server)
    Zookeeper:依赖集群保存meta信息。

    在这里插入图片描述

    Kafka的环境搭建

    基础环境准备

    安装前的准备工作(zk已经部署完毕)

    l 关闭防火墙

    chkconfig iptables off && setenforce 0

    kafka单机版安装采用自带的zookeeper处理

    1.校验一下java是否安装

    ​ [root@localhost Desktop]# java -version
    ​ java version “1.8.0_171”
    ​ Java™ SE Runtime Environment (build 1.8.0_171-b11)
    ​ Java HotSpot™ 64-Bit Server VM (build 25.171-b11, mixed mode)

    如果没有安装,请先安装java环境 参考JDK安装方法 〜/ .bashrc文件(环境变量配置文件方式二)

    2.上传kafka文件到虚拟机中kafka_2.12-2.2.0

    3.解压安装kafka到/usr/local中
    [root@localhost Desktop]# tar -zxvf kafka_2.11-1.0.0.tgz -C /opt/
    3.1 重命名kafka的文件 :mv kafka_2.11-1.0.0 /opt/kafka

    4.切换到kafka的配置文件目录
    [root@localhost config]# pwd

           /opt/kafka/config
    

    5.kafka安装目录下的config文件夹为其配置文件,我们需要修改的有 server.properties和zookeeper.properties。

    ​ [root@localhost kafka]# mkdir kafka-logs-0

    ​ server.properties: kafka的配置文件
    ​ log.dirs=/tmp/kafka-logs
    ​ 修改为
    ​ log.dirs=/opt/kafka1/kafka-logs-0

    ​ zookeeper.properties kafka自带的zookeeper的配置
    ​ dataDir=/tmp/zookeeper
    ​ 修改为
    ​ dataDir=/opt/kafka1/my_zookeeper

    6.启动zookeeper
    [root@localhost ~]# /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

    7.启动kafka
    [root@localhost Desktop]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

    8.创建主题 (让我们创建一个名为“test”的主题,它只包含一个分区,只有一个副本)
    /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 1704D

                备注: 或者,您可以将代理配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。
    

    9.查看主题: 如果我们运行list topic命令,我们现在可以看到该主题
    /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

    10.发送一些消息
    Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。运行生产者,然后在控制台中键入一些消息以发送到服务器。

    ​ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1704D

    ​ >等待输入发送的消息

    11.启动消费者
    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D --from-beginning 从第一条开始接受
    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.133:9092 --topic 1704D从现在生产者发送开始接受。]

    更多相关内容
  • kafka全套视频教程 某客学院

    热门讨论 2018-03-22 19:08:38
    视频教程从入门到源码分析,内容全面,课程质量很高!
  • kafka_2.9.2-0.8.2.1.tgz

    热门讨论 2015-12-22 16:20:01
    kafka_2.9.2-0.8.2.1.tgz 安装 liunx环境 安装jdk vi /etc/profile在末尾追加 JAVA_HOME JRE_HOME PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:...
  • Kafka Eagle是一个用于监控和管理kafka的开源组件,可以同时监控多个kafka集群。 Kafka Eagle提供了完善的监控页面和kafka常用操作的管理界面,便于管理员对kafka集群进行运维管理。 Kafka Eagle提供了KSQL操作的...

    一、背景

    Kafka产线环境需要管理的Topic和Consumser越来越多,使用命令行工具进行管理会非常繁杂。因此,大数据平台上需要一套Kafka的管理监控系统,Kafka-Eagle。

    Kafka Eagle是一个用于监控和管理kafka的开源组件,可以同时监控多个kafka集群。
    Kafka Eagle提供了完善的监控页面和kafka常用操作的管理界面,便于管理员对kafka集群进行运维管理。

    Kafka Eagle提供了KSQL操作的可视化界面,让你可以非常快速的查看kafka中的消息。

    Kafka Eagle支持多种报警方式,如钉钉,微信和邮件等。

    二、安装

    1. 安装MySQL,建库、建用户,授权

    # yum -y install https://repo.mysql.com//mysql57-community-release-el7-11.noarch.rpm


    # sed   -i  's/gpgcheck=1/gpgcheck=0/g'   /etc/yum.repos.d/mysql-community.repo

    # yum clean all 

    # yum -y install mysql-community-server  mysql-community-client

    # cat /etc/my.cnf

    ####################################

    [mysqld]
    pid-file  = /var/run/mysqld/mysqld.pid
    socket  = /var/lib/mysql/mysql.sock
    log-error  = /var/log/mysqld.log
    datadir  = /var/lib/mysql
    symbolic-links  = 0
    max_connections = 1000
    skip_name_resolve
    character-set-client-handshake  = FALSE
    character-set-server =  utf8
    collation-server = utf8_general_ci
    init_connect =  "SET NAMES 'utf8'"

    [mysql]
    default-character-set  = utf8

    [client]
    default-character-set  = utf8

    ####################################

    初始化MySQL,可以通过执行 mysqld --initialize ,使用 --initialize 选项,以“安全模式”初始化,会在日志中生成一个随机的root初始密码。

    # mysqld --initialize --datadir=/opt/data/mysql --user=mysql

    #  cat /var/log/mysqld.log | grep "temporary password"

    # mysql_init_passwd=`cat /var/log/mysqld.log | grep "temporary password" | awk '{print $NF}'`

    # systemctl  start mysqld

    # systemctl  enable  mysqld

    # systemctl status  mysqld

    #  mysql -u root -p"${mysql_init_passwd}"

    首次登录,必须更改随机的root初始密码,且密码强度需满足一定要求,才能执行数据库操作。

    > ALTER USER 'root'@'localhost' IDENTIFIED BY 'MySQL@123';
    > exit

    #  mysql -u root -p"MySQL@123"


    > CREATE DATABASE ke DEFAULT CHARACTER SET utf8 DEFAULT COLLATE  utf8_general_ci;
    > GRANT ALL PRIVILEGES ON ke.* TO 'ke'@'127.0.0.1' IDENTIFIED BY 'Ke@123';
    > flush privileges; 

    2. 安装kafka-eagle

    下载 Kafka Eagle
    https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz

    # tar -zxf kafka-eagle-bin-2.1.0.tar.gz
    # tar -zxf kafka-eagle-bin-2.1.0/efak-web-2.1.0-bin.tar.gz
    # mv efak-web-2.1.0 /opt/efak

    # vim /opt/efak/system-config.properties

    ##############################################

    ######################################
    # multi zookeeper & kafka cluster list
    # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
    ######################################
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=10.12.32.79:2189,10.12.32.80:2189,10.12.32.81:2189
    
    ######################################
    # zookeeper enable acl
    ######################################
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123
    
    ######################################
    # broker size online list
    ######################################
    cluster1.efak.broker.size=20
    
    ######################################
    # zk client thread limit
    ######################################
    kafka.zk.limit.size=16
    
    ######################################
    # EFAK webui port
    ######################################
    efak.webui.port=8048
    
    ######################################
    # EFAK enable distributed
    ######################################
    efak.distributed.enable=false
    efak.cluster.mode.status=master
    efak.worknode.master.host=localhost
    efak.worknode.port=8085
    
    ######################################
    # kafka jmx acl and ssl authenticate
    ######################################
    cluster1.efak.jmx.acl=false
    cluster1.efak.jmx.user=keadmin
    cluster1.efak.jmx.password=keadmin123
    cluster1.efak.jmx.ssl=false
    cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
    cluster1.efak.jmx.truststore.password=ke123456
    
    ######################################
    # kafka offset storage
    ######################################
    cluster1.efak.offset.storage=kafka
    #cluster2.efak.offset.storage=zk
    
    ######################################
    # kafka jmx uri
    ######################################
    cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
    
    ######################################
    # kafka metrics, 15 days by default
    ######################################
    efak.metrics.charts=true
    efak.metrics.retain=15
    
    ######################################
    # kafka sql topic records max
    ######################################
    efak.sql.topic.records.max=5000
    efak.sql.topic.preview.records.max=10
    
    ######################################
    # delete kafka topic token
    ######################################
    efak.topic.token=keadmin
    
    ######################################
    # kafka sasl authenticate
    ######################################
    cluster1.efak.sasl.enable=false
    cluster1.efak.sasl.protocol=SASL_PLAINTEXT
    cluster1.efak.sasl.mechanism=SCRAM-SHA-256
    cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.efak.sasl.client.id=
    cluster1.efak.blacklist.topics=
    cluster1.efak.sasl.cgroup.enable=false
    cluster1.efak.sasl.cgroup.topics=
    cluster2.efak.sasl.enable=false
    cluster2.efak.sasl.protocol=SASL_PLAINTEXT
    cluster2.efak.sasl.mechanism=PLAIN
    cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
    cluster2.efak.sasl.client.id=
    cluster2.efak.blacklist.topics=
    cluster2.efak.sasl.cgroup.enable=false
    cluster2.efak.sasl.cgroup.topics=
    
    ######################################
    # kafka ssl authenticate
    ######################################
    #cluster3.efak.ssl.enable=false
    #cluster3.efak.ssl.protocol=SSL
    #cluster3.efak.ssl.truststore.location=
    #cluster3.efak.ssl.truststore.password=
    #cluster3.efak.ssl.keystore.location=
    #cluster3.efak.ssl.keystore.password=
    #cluster3.efak.ssl.key.password=
    #cluster3.efak.ssl.endpoint.identification.algorithm=https
    #cluster3.efak.blacklist.topics=
    #cluster3.efak.ssl.cgroup.enable=false
    #cluster3.efak.ssl.cgroup.topics=
    
    ######################################
    # kafka sqlite jdbc driver address
    ######################################
    #efak.driver=org.sqlite.JDBC
    #efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    #efak.username=root
    #efak.password=www.kafka-eagle.org
    
    ######################################
    # kafka mysql jdbc driver address
    ######################################
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=ke
    efak.password=Ke@123

    ##############################################

    设置Kafka Eagle环境变量

    # vim /etc/profile

    ##############################################

    #kafka eagle
    KE_HOME=/opt/efak
    JAVA_HOME=/opt/jdk
    JRE_HOME=/opt/jdk/jre

    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$KE_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib

    export KE_HOME JAVA_HOME JRE_HOME PATH CLASSPATH

    ##############################################

     # source /etc/profile

    启动 Kafka Eagle

    # chown  -R  test:test   opt/efak

    #  su  -  test

    $ ke.sh start

    $  ss -tan | grep 8048

    #  systemctl stop firewalld
    # systemctl disable firewalld

    三、Kafka Eagle 使用

    浏览器访问: http://xx.xx.xx.xx:8048
    默认用户名密码:admin/123456

     

    四、参考

    Kafka Eagle安装详情及问题解答
    https://www.cnblogs.com/smartloli/p/12110570.html

    各版本下载地址
    http://www.kafka-eagle.org/articles/docs/changelog/changelog.html

    安装使用文档
    https://docs.kafka-eagle.org/2.env-and-install/2.installing

    Dashboard
    http://www.kafka-eagle.org/articles/docs/quickstart/dashboard.html

    展开全文
  • SpringBoot集成kafka全面实战

    万次阅读 多人点赞 2020-03-28 14:13:29
    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》、《秒懂kafka HA(高可用)》两篇文章。 一、生产者实践 普通生产者 带回调的生产者 ...

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》《秒懂kafka HA(高可用)》两篇文章。

    一、生产者实践

    • 普通生产者

    • 带回调的生产者

    • 自定义分区器

    • kafka事务提交

    二、消费者实践

    • 简单消费

    • 指定topic、partition、offset消费

    • 批量消费

    • 监听异常处理器

    • 消息过滤器

    • 消息转发

    • 定时启动/停止监听器

    一、前戏

    1、在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP),

    advertised.listeners=PLAINTEXT://112.126.74.249:9092

    2、在开始前我们先创建两个topic:topic1、topic2,其分区和副本数都设置为2,用来测试,

    [root@iZ2zegzlkedbo3e64vkbefZ ~]#  cd /usr/local/kafka-cluster/kafka1/bin/
    [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
    Created topic topic1.
    [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
    Created topic topic2.

    当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send("topic1", normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下,

    @Configuration
    public class KafkaInitialConfiguration {
        // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
        @Bean
        public NewTopic initialTopic() {
            return new NewTopic("testtopic",8, (short) 2 );
        }
    ​
         // 如果要修改分区数,只需修改配置值重启项目即可
        // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
        @Bean
        public NewTopic updateTopic() {
            return new NewTopic("testtopic",10, (short) 2 );
        }
    }

    3、新建SpringBoot项目

    ① 引入pom依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    ② application.propertise配置(本文用到的配置项这里全列了出来)

    ###########【Kafka集群】###########
    spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
    ###########【初始化生产者配置】###########
    # 重试次数
    spring.kafka.producer.retries=0
    # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
    spring.kafka.producer.acks=1
    # 批量大小
    spring.kafka.producer.batch-size=16384
    # 提交延时
    spring.kafka.producer.properties.linger.ms=0
    # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
    # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
    ​
    # 生产端缓冲区大小
    spring.kafka.producer.buffer-memory = 33554432
    # Kafka提供的序列化和反序列化类
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 自定义分区器
    # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
    ​
    ###########【初始化消费者配置】###########
    # 默认的消费组ID
    spring.kafka.consumer.properties.group.id=defaultConsumerGroup
    # 是否自动提交offset
    spring.kafka.consumer.enable-auto-commit=true
    # 提交offset延时(接收到消息后多久提交offset)
    spring.kafka.consumer.auto.commit.interval.ms=1000
    # 当kafka中没有初始offset或offset超出范围时将自动重置offset
    # earliest:重置为分区中最小的offset;
    # latest:重置为分区中最新的offset(消费分区中新产生的数据);
    # none:只要有一个分区不存在已提交的offset,就抛出异常;
    spring.kafka.consumer.auto-offset-reset=latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    spring.kafka.consumer.properties.session.timeout.ms=120000
    # 消费请求超时时间
    spring.kafka.consumer.properties.request.timeout.ms=180000
    # Kafka提供的序列化和反序列化类
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    # 消费端监听的topic不存在时,项目启动会报错(关掉)
    spring.kafka.listener.missing-topics-fatal=false
    # 设置批量消费
    # spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    # spring.kafka.consumer.max-poll-records=50

    二、Hello Kafka

    1、简单生产者

    @RestController
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    ​
        // 发送消息
        @GetMapping("/kafka/normal/{message}")
        public void sendMessage1(@PathVariable("message") String normalMessage) {
            kafkaTemplate.send("topic1", normalMessage);
        }
    }

     2、简单消费

    @Component
    public class KafkaConsumer {
        // 消费监听
        @KafkaListener(topics = {"topic1"})
        public void onMessage1(ConsumerRecord<?, ?> record){
            // 消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    }

    上面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。启动项目,postman调接口触发生产者发送消息,

    可以看到监听器消费成功,

    三、生产者

    1、带回调的生产者

    kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

    @GetMapping("/kafka/callbackOne/{message}")
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }
    @GetMapping("/kafka/callbackTwo/{message}")
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }
    
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }

    2、自定义分区器

    我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

    ① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

    ② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

    ③  patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

    ※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

    public class CustomizePartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 自定义分区规则(这里假设全部发到0号分区)
            // ......
            return 0;
        }
    ​
        @Override
        public void close() {
    ​
        }
    ​
        @Override
        public void configure(Map<String, ?> configs) {
    ​
        }
    }

    在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名,

    # 自定义分区器
    spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

    3、kafka事务提交

    如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,

    @GetMapping("/kafka/transaction")
    public void sendMessage7(){
        // 声明事务:后面报错消息不会发出去
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });
    ​
        // 不声明事务:后面报错但前面消息已经发送成功了
       kafkaTemplate.send("topic1","test executeInTransaction");
       throw new RuntimeException("fail");
    }

    四、消费者

    1、指定topic、partition、offset消费

    前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供,

    /**
     * @Title 指定topic、partition、offset消费
     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
     * @Author long.yuan
     * @Date 2020/3/22 13:38
     * @Param [record]
     * @return void
     **/
    @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0" }),
            @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
    })
    public void onMessage2(ConsumerRecord<?, ?> record) {
        System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
    }

    属性解释:

    ① id:消费者ID;

    ② groupId:消费组ID;

    ③ topics:监听的topic,可监听多个;

    ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

    上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

    注意:topics和topicPartitions不能同时使用;

    2、批量消费

    设置application.prpertise开启批量消费即可,

    # 设置批量消费
    spring.kafka.listener.type=batch
    # 批量消费每次最多消费多少条消息
    spring.kafka.consumer.max-poll-records=50

    接收消息时用List来接收,监听代码如下,

    @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
        System.out.println(">>>批量消费一次,records.size()="+records.size());
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println(record.value());
        }
    }

    3、ConsumerAwareListenerErrorHandler 异常处理器

    通过异常处理器,我们可以处理consumer在消费时发生的异常。

    新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

    // 新建一个异常处理器,用@Bean注入
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("消费异常:"+message.getPayload());
            return null;
        };
    }
    ​
    // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
    @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
    public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
        throw new Exception("简单消费-模拟异常");
    }
    ​
    // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
    @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
        System.out.println("批量消费一次...");
        throw new Exception("批量消费-模拟异常");
    }

    执行看一下效果,

    4、消息过滤器

    消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

    配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

    @Component
    public class KafkaConsumer {
        @Autowired
        ConsumerFactory consumerFactory;
    ​
        // 消息过滤器
        @Bean
        public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
            // 被过滤的消息将被丢弃
            factory.setAckDiscarded(true);
            // 消息过滤策略
            factory.setRecordFilterStrategy(consumerRecord -> {
                if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                    return false;
                }
                //返回true消息则被过滤
                return true;
            });
            return factory;
        }
    ​
        // 消息过滤监听
        @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
        public void onMessage6(ConsumerRecord<?, ?> record) {
            System.out.println(record.value());
        }
    }

    上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数,

    5、消息转发

    在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

    在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下,

    /**
     * @Title 消息转发
     * @Description 从topic1接收到的消息经过处理后转发到topic2
     * @Author long.yuan
     * @Date 2020/3/23 22:15
     * @Param [record]
     * @return void
     **/
    @KafkaListener(topics = {"topic1"})
    @SendTo("topic2")
    public String onMessage7(ConsumerRecord<?, ?> record) {
        return record.value()+"-forward message";
    }

    6、定时启动、停止监听器

    默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

    ① 禁止监听器自启动;

    ② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

    新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动,

    @EnableScheduling
    @Component
    public class CronTimer {
    ​
        /**
         * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
         * 而是会被注册在KafkaListenerEndpointRegistry中,
         * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
         **/
        @Autowired
        private KafkaListenerEndpointRegistry registry;
    ​
        @Autowired
        private ConsumerFactory consumerFactory;
    ​
        // 监听器容器工厂(设置禁止KafkaListener自启动)
        @Bean
        public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
            ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
            container.setConsumerFactory(consumerFactory);
            //禁止KafkaListener自启动
            container.setAutoStartup(false);
            return container;
        }
    ​
        // 监听器
        @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
        public void onMessage1(ConsumerRecord<?, ?> record){
            System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    ​
        // 定时启动监听器
        @Scheduled(cron = "0 42 11 * * ? ")
        public void startListener() {
            System.out.println("启动监听器...");
            // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
            if (!registry.getListenerContainer("timingConsumer").isRunning()) {
                registry.getListenerContainer("timingConsumer").start();
            }
            //registry.getListenerContainer("timingConsumer").resume();
        }
    ​
        // 定时停止监听器
        @Scheduled(cron = "0 45 11 * * ? ")
        public void shutDownListener() {
            System.out.println("关闭监听器...");
            registry.getListenerContainer("timingConsumer").pause();
        }
    }

    启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

    11:42分监听器启动开始工作,消费消息,

    11:45分监听器停止工作,

     

    感兴趣的可以关注一下博主的公众号,1W+技术人的选择,致力于原创技术干货,包含Redis、RabbitMQ、Kafka、SpringBoot、SpringCloud、ELK等热门技术的学习&资料。

    展开全文
  • 几种常见的 Kafka 集群监控工具

    千次阅读 2022-04-22 01:05:56
    本文选自电子工业出版社的新书《kafka进阶》,推荐一下。一个功能健全的kafka集群可以处理相当大的数据量,由于消息系统是很多大型应用的基石,因此broker集群在性能上的缺陷,都会引起整个应用栈的各种问题。Kafka...

    本文选自电子工业出版社的新书《kafka进阶》,推荐一下。

    一个功能健全的kafka集群可以处理相当大的数据量,由于消息系统是很多大型应用的基石,因此broker集群在性能上的缺陷,都会引起整个应用栈的各种问题。

    Kafka的度量指标主要有以下三类:

    1.Kafka服务器(Kafka)指标

    2.生产者指标

    3.消费者指标

    另外,由于Kafka的状态靠Zookeeper来维护,对于Zookeeper性能的监控也成为了整个Kafka监控计划中一个必不可少的组成部分。

    Kafka的监控指标

    Broker度量指标

    Kafka的服务端度量指标是为了监控broker,也是整个消息系统的核心。因为所有消息都通过kafka broker传递,然后被消费,所以对于broker集群上出现的问题的监控和告警就尤为重要。broker性能指标有以下三类:

    • Kafka本身的指标

    • 主机层面的指标

    • JVM垃圾回收指标

    UnderReplicatedPartitions

    在一个运行健康的集群中,处于同步状态的副本数(ISR)应该与总副本数(简称AR:Assigned Repllicas)完全相等,如果分区的副本远远落后于leader,那这个follower将被ISR池删除,随之而来的是IsrShrinksPerSec(可理解为isr的缩水情况,后面会讲)的增加。由于kafka的高可用性必须通过副本来满足,所有有必要重点关注这个指标,让它长期处于大于0的状态。

    IsrShrinksPerSec

    IsrExpandsPerSec

    任意一个分区的处于同步状态的副本数(ISR)应该保持稳定,只有一种例外,就是当你扩展broker节点或者删除某个partition的时候。为了保证高可用性,健康的kafka集群必须要保证最小ISR数,以防在某个partiton的leader挂掉时它的follower可以接管。如果IsrShrinksPerSec(ISR缩水) 增加了,但并没有随之而来的IsrExpandsPerSec(ISR扩展)的增加,就将引起重视并人工介入。

    ActiveControllerCount

    controller的职责是维护partition leader的列表,当遇到这个值等于0且持续了一小段时间(<1秒)的时候,必须发出明确的告警。

    OfflinePartitionsCount

    这个指标报告了没有活跃leader的partition数。

    LeaderElectionRateAndTimeMs

    leader选举的频率(每秒钟多少次)和集群中无leader状态的时长(以毫秒为单位)

    UncleanLeaderElectionsPerSec

    这个指标如果存在的话很糟糕,这说明kafka集群在寻找partition leader节点上出现了故障

    TotalTimeMs

    这个指标是由4个其他指标的总和构成的:

    lqueue:处于请求队列中的等待时间

    llocal:leader节点处理的时间

    lremote:等待follower节点响应的时间

    lresponse:发送响应的时间

    BytesInPerSec

    BytesOutPerSec

    Kafka的吞吐量

    生产者度量指标

    Response rate

    响应的速率是指数据从producer发送到broker的速率

    Request rate

    请求的速率是指数据从producer发送到broker的速率

    Request latency avg

    平均请求延迟

    Outgoing byte rate

    Producer的网络吞吐量

    IO wait time ns avg

    Producer的I/O等待的时间

    消费者度量指标


    ConsumerLag MaxLag

    指consumer当前的日志偏移量相对生产者的日志偏移量

    BytesPerSec

    消费者的网络吞吐量

    MessagesPerSec

    消息的消费速度

    ZooKeeperCommitsPerSec

    当zookeeper处于高写负载的时候,将会遇到成为性能瓶颈,从而导致从kafka管道抓取数据变得缓慢。随着时间推移跟踪这个指标,可以帮助定位到zookeeper的性能问题,如果发现有大量发往zookeeper的commit请求,你需要考虑的是,要不对zookeeper集群进行扩展。

    MinFetchRate

    消费者最小拉取的速率

    通过官方网站的说明(http://kafka.apache.org/documentation/#monitoring),可以查看Kafka提供的所有的监控指标参数。在这里只是列出了部分主要的参数指标。

    使用Kafka客户端监控工具

    Kafka常用的客户端管理、监控工具,主要有以下几种:

    • Kafka Manager

    • Kafka Tool

    • KafkaOffsetMonitor

    • JConsole

    其中,前三个工具都是专门用于Kafka集群的管理与监控;而JConsole(Java Monitoring and Management Console),是一种基于JMX的可视化监视、管理工具,安装好了JDK以后,Java就为我们提供了JConsole的客户端工具。利用它我们也可以监控Kafka的各项指标。

    这里我们简单介绍一下JMX。JMX的全称为Java Management Extensions。可以管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等。而Kafka底层也是基于Java的,所以我们也就可以使用JMX的标准来管理和监控运行中的Kafka了。

    下面我们分别介绍它们的使用方法。

     Kafka Manager

    Kafka Manager的Github地址是https://github.com/yahoo/kafka-manager。这款监控框架的好处在于监控内容相对丰富,既能够实现broker级常见的JMX监控(比如出入站流量监控),也能对consumer消费进度进行监控(比如lag等)。另外用户还能在页面上直接对集群进行管理,比如分区重分配或创建topic——当然这是一把双刃剑,好在kafka manager自己提供了只读机制,允许用户禁掉这些管理功能。

    这里我们使用的版本是:kafka-manager-2.0.0.2.zip。安装和配置非常简单,按照下面的步骤配置Kafka Manager:

    (1)首先,需要在启动Kafka集群的命令脚本中,增加JMX的相关参数。否则无法使用客户端工具管理和监控Kafka集群。这里我们以kafka101主机上运行的broker 0和broker 1为例,来为大家演示。进入kafka安装目录下的bin目录

    cd /root/training/kafka_2.11-2.4.0/bin/

    (2)修改kafka-run-class.sh文件,找到“JMX setting”的位置(第176行)。增加JMX Server的配置信息。如图7.1所示。

    -Djava.rmi.server.hostname=kafka101

    118ca7ba67b77a5a773c50839cfb3219.png

    图7.1 修改Kafka Manager的JMX Setting

    注意:

    由于在kafka101主机上将会启动两个broker,为了方便可以在命令终端中使用export命令设置JMX的端口地址;也可以像下面这样把JMX的端口写到kafka-server-start.sh脚本中,如图7.2所示,修改第30行。

    export JMX_PORT="9999"

    ba2b5160d006a6d1a2785761db2c4ec7.png

    图7.2 设置JMX的端口

    (3)启动Kafka Broker 0

    export JMX_PORT="9990"

    bin/kafka-server-start.sh config/server.properties &

    (4)重新开启一个命令行终端,启动Kafka Broker 1

    export JMX_PORT="9991"

    bin/kafka-server-start.sh config/server1.properties &

    (5)将Kafka Manager的压缩包解压至/root/training目录

    unzip kafka-manager-2.0.0.2.zip -d ~/training/

    (6)进入Kafka Manager的conf目录,并修改application.conf文件

    #这里我们指定ZooKeeper集群的地址

    kafka-manager.zkhosts="kafka101:2181,kafka102:2181,kafka103:2181"

    #将下面的这一行注释掉

    #kafka-manager.zkhosts=${?ZK_HOSTS}

    (7)采用nohup的方式启动Kafka Manager

    nohup bin/kafka-manager &

    也可以像下面这样启动Kafka Manager的时候,指定相关参数:

    nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &

    (8)启动成功后,将输出如下的日志信息,如图7.3所示:

    1578df86aca1a8f385770a48194377cc.png

    图7.3 启动Kafka Manager

    可以看到,Kafka Manager将运行在9000端口上。

    (9)通过浏览器访问9000端口,可以打开Kafka Manager的Web控制台,如图7.4所示:

    ef0782d58944dcc3dafab8c4f99aed64.png

    图7.4 Kafka Manager的Web控制台

    (10)选择“Cluster”-->“Add Cluster”,添加一个新的Kafka集群。勾选“Enable JMX Polling”,并点击“Save”。如图7.5所示:

    ee45115b0c139748d913647d8414d313.png

    图7.5 添加Kafka集群

    (11)添加成功后,点击“Go to Cluster View”,跳转到Kafka 集群的首页,如图7.6所示:

    f68ce83c3bd51639df90af06b446fea3.png

    图7.6 Kafka集群首页

    在这里,可以看到当前的Kafka集群中共存在2个Broker,即:broker 0和broker 1;还有3个Topics。

    (12)点击Brokers的数字“2”,跳转到Broker的监控页面上。在这里就可以实时监控Kafka集群Broker的相关信息了。如Kafka集群的吞吐量(Bytes in /sec、Bytes out /sec)等等,如图7.7所示:

    1df89f729687155dda98df6891811eeb.png

    图7.7 监控Kafka Broker

    (13)图7.8所示,展示了Kafka集群Topic的监控信息。

    2d7264603ea1ac877f34cff93ca8e36e.png

    图7.8 监控Kafka Topic

     Kafka Tool


    Kafka Tool是用于管理和使用Apache Kafka集群的图形应用程序。它提供了一种直观的界面风格,可让用户快速查看Kafka集群中的对象以及集群主题中存储的消息。它包含面向开发人员和管理员的功能,一些关键功能如下:

    快速查看所有Kafka集群,包括其broker,主题和消费者

    • 查看分区中消息的内容并添加新消息

    • 查看消费者的偏移量,包括Apache Storm中的spout消费者

    • 以良好的格式显示JSON和XML消息

    • 添加和删除主题以及其他管理功能

    • 将单个消息从您的分区保存到本地硬盘驱动器

    • 编写自己的插件,使您可以查看自定义数据格式

    • Kafka工具可在Windows,Linux和Mac OS上运行

    从Kafka Tool的官方网站(https://www.kafkatool.com/download.html)上,直接下载Kafka Tool。这里我们直接下载Kafka Tool 2.0.8的版本。如下图7.9所示:

    f9af79d9c73f172b11b4f744b42e7046.png

    图7.9 下载Kafka Tool

    下载完成后,直接安装启动即可。图7.10展示了启动的初始界面。

    4d2355ebbd65bca11dcd7060ef9749fe.png

    图7.10 Kafka Tool的启动界面

    添加一个Kafka Cluster集群,并测试。如图7.11所示:

    179d9e96dbcb02a52af2ca46a8320bca.png

    图7.11 添加Kafka集群

    点击“是”,进入Kafka集群的首界面。如图7.12所示:

    5bbc75c8e0db2e7060fdf5f1cf144dc8.png

    图7.12 Kafka集群的首界面

    在这里可以看到Kafka集群中的Broker信息、Topics的信息以及Consumers消费者的信息。

    现在我们使用Kafka Tool来创建一个新的Topic。

    (1)选择“Browsers”中集群的“Topics”节点,并在右边的界面上点击0076796cbf568d2b1be213d1bfb07da8.png按钮,添加一个新的Topic。

    (2)输入Topic的名字、分区数、以及每个分区的副本数。这里我们新创建的Topic名称是mytopic2,它由两个分区组成,并且每个分区的副本数为。如图7.13所示。

    3fce17a9b05ac2f6c8e55c78d5c1594c.png

    图7.13 Add Topic

    (3)点击“Add”,将成功创建Topic,如图7.14所示。

    8caf63276c4ae1dc9d4872cd07b16500.png

    图7.14 成功创建Topic

    (4)现在我们使用Kafka Tool来接收mytopic2上的消息数据。选择刚刚创建好的mytopic2的主题,并在右边的窗口中选择“Data”的页面,如图7.15所示。

    97743e26a44253ffd377c83374484cc0.png

    图7.15 通过Kakfa Tool接收数据

    (5)启动一个Kafka Producer的命令行终端,并发送一些消息。如图7.16所示:

    bin/kafka-console-producer.sh --broker-list kafka101:9092 --topic mytopic2

    24a3152456d16525fcdc5cfeb70109b8.png

    图7.16 通过命令行发送数据

    (6)在Kafka Tool上,点击16ce3552a5ec8faefcc8dd0ebf348b66.png接收消息。这里就可以看到刚才我们在Kafka Producer命令行上发送的消息。如图7.17所示:

    f8e78d1b045f09b7b83da24469dcc6e3.png

    图7.17 在Kafka Tool上接收数据

    (7)这里的数据格式默认是“Byte Array”,我们可以在Properties的设置里面将其修改为String,并点击“Update”,如图7.18所示:

    e0ae523e14f43a0bb14036cb1870a820.png

    图7.18 修改Topic的数据格式

    (8)回到Data页面,这时候数据将按照正确的格式显示,如图7.19所示:

    b3405f621e39434d9edd0c1d2103d897.png

    图7.19 显示正确的数据

    KafkaOffsetMonitor


    KafkaOffsetMonitor是一个基于Web界面的管理平台,可以用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以进行实时的监控。KafkaOffsetMonitor可以从github上下载,地址是:https://github.com/quantifind/KafkaOffsetMonitor 。这里我们使用的是KafkaOffsetMonitor-assembly-0.2.0.jar

    KafkaOffsetMonitor的安装启动比较简单。我们可以直接在kafka101的主机上执行下面的指令:

    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \

    com.quantifind.kafka.offsetapp.OffsetGetterWeb \

    --zk kafka101:2181 \

    --port 8089 \

    --refresh 10.seconds \

    --retain 1.days

    其中:

    lcom.quantifind.kafka.offsetapp.OffsetGetterWeb是运行Web监控的类

    • --zk用于指定ZooKeeper的地址

    • --port是Web运行端口

    • --refresh和--retain用于指定页面数据刷新的时间以及保留数据的时间值

    打开浏览器访问8089端口,就可以打开KafkaOffsetMonitor的首页面,如图7.20所示。

    3e078e5c9ae575d7ec0fb4e43178e19c.png

    图7.20 KafkaOffsetMonitor首页

    选择“Topic List”,就可以监控某个具体的Topic信息了,如图7.21所示。

    2211e951009001a966fc3c9b79dae5e0.png

    图7.21 通过KafkaOffsetMonitor监控Topic

    JConsole


    JConsole(Java Monitoring and Management Console),一种基于JMX的可视化监视、管理工具,从Java 5开始引入。JConsole是用Java写的GUI程序,用来监控VM,并可监控远程的VM,非常易用,而且功能非常强。命令行里打 jconsole,就可以直接启动了。

    这里我们方便,我们直接在Window上启动JConsole。在CMD创建中直接输入JConsole,如图7.22所示。

    9a24eb0fc9e69f25778031b768803b62.png

    图7.22 启动JConsole

    JConsole的启动界面如图7.23所示。

    cb9356f8e4c3d756e142acf9dd0e102f.png

    图7.23  JConsole的启动界面

    由于在前面配置Kafka Manager的时候,我们已经启用了broker 0和broker 1的JMX配置,所以这里可以直接通过JConsole连接到broker 0或者broker 1上。我们以broker 0 为例。选择“远程进程”,并输入broker 0的JMX地址,点击“连接”,如图7.24所示。

    kafka101:9990

    2c1c0f3dfbbc8206158193f5a4eefb61.png

    图7.24 通过JConsole连接broker 0

    选择“不安全的连接”,进入JConsole监控的主界面,如图7.25和图7.26所示。

    fced75ee4f84a7859031397deffc6e7a.png

    图7.25 不安全的连接

    5eda29544070fd724f84beabf170bf51.png

    图7.26 JConsole的主页面

    JConsole提供六个选项卡显示应用信息:

    (1)概览选项卡:提供内存使用的概述、运行的线程数量、创建的对象数量以及CPU使用情况。

    (2)内存选项卡:显示使用的内存数量。可以选择要监视的内存类型(堆、非堆或池)组合。

    (3)线程选项卡:显示线程数量和每个线程的详细信息。

    (4)类选项卡:显示加载的对象数量的信息。

    (5)VM 概要选项卡:提供运行应用的JVM概要。

    (6)MBean选项卡:显示有关应用的托管bean的信息。

    这里我们选择“MBean选项卡”,就可以看到Kakfa相关的MBean信息,如图7.27所示。

    06f6e1943b8bf2ac59957fb8e4828d36.png

    图7.27 通过JConsole监控Kafka

    以上图监控的参数“MessagesInPerSec”为例,它表示的是Kafka集群消息的速率。关于所有的Kafka监控的MBean信息,可以参考官方网站上的说明,地址是:http://kafka.apache.org/documentation/#monitoring

    监控ZooKeeper


    前面提到,整个Kafka的状态靠Zookeeper来维护,对于Zookeeper性能的监控也成为了整个Kafka监控计划中一个必不可少的组成部分。在典型的Kafka集群中, Kafka通过Zookeeper管理集群配置,例如:选举Leader,以及在Consumer Group发生变化时进行Rebalance;生产者Producer将消息发布到broker,Consumer从broker订阅并消费消息。这些操作都离不开ZooKeeper。所以在Kafka集群的管理监控中,ZooKeeper的监控也就成为了非常重要的一部分。

    由于ZooKeeper本身也是由Java开发的应用程序,我们当然也可以前面提到的JMX的方式进行监控,例如使用JConsole。图7.28展示了通过JConsole监控ZooKeeper MBean的监控信息。

    0ee93c11a14ac747dea5413cc82f8dce.png

    图7.28 通过JConsole监控ZooKeeper

    这里我们也可以使用另一个客户端工具ZooInspector监控ZooKeeper。图7.29展示了它的主界面。

    02cee2edde46447cbad69ddd49a049a8.png

    图7.29 通过ZooInspector监控ZooKeeper

          本文选自电子工业出版社的《kafka进阶》一书,略有修改,经出版社授权刊登于此。

    9c2f3284f7c0f5096c461082ecc4943a.png

          本书基于作者多年的教学与实践进行编写,重点介绍Kafka消息系统的核心原理与架构,内容涉及开发、运维、管理与架构。全书共11章,第1章,介绍Kafka体系架构基础,包括消息系统的基本知识、Kafka的体系架构与ZooKeeper;第2章,介绍Kafka的环境部署,以及基本的应用程序开发;第3章,介绍Kafka的生产者及其运行机制,包括生产者的创建和执行过程、生产者的消息发送模式和生产者的高级特性等;第4章,介绍Kafka的消费者及其运行机制,包括消费者的消费模式、消费者组与消费者、消费者的偏移量与提交及消费者的高级特性等;第5章,介绍Kafka服务器端的核心原理,包括主题与分区、消息的持久性与传输保障、Kafka配额与日志的管理;第6章,介绍Kafka的流处理引擎Kafka Stream;第7章,介绍使用不同的工具监控Kafka,包括Kafka Manager、Kafka Tool、KafkaOffsetMonitor和JConsole;第8章至第11章,介绍Kafka与外部系统的集成,包括集成Flink、集成Storm、集成Spark和集成Flume。

    展开全文
  • 这里是自己结合spring项目的配置。按照上面的配置可以实现生产发送消息。消费者接受消息。分类设计等
  • kafka管理界面 kafka-eagle

    千次阅读 2021-07-07 13:51:09
    下载地址: http://download.kafka-eagle.org/: ...vi /home/kafka/kafka_2.11-2.0.0/bin/kafka-server-start.sh if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -X
  • docker 安装kafka

    千次阅读 2022-03-15 17:40:56
    kafka 依赖 zookeeper [root@k8s-master ~]# docker search zookeeper NAME DESCRIPTION STARS OFFICIAL AUTOMATED zookeeper Apache ZooKeeper is an open-source server
  • Kafka简单介绍 Kafka是由Apache软件基金会开发的一个分布式、分区的、多副本的、多订阅者的开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作...
  • docker搭建kafka集群

    千次阅读 2022-03-24 23:36:35
    docker搭建kafka集群 我在M1 mbp上使用的以下镜像 新建文件zk-kafka-docker-compose.yml version: "2" services: zookeeper: user: root image: docker.io/zookeeper ports: - "12181:2181" environment: -...
  • spring.kafka.bootstrap-servers=10.11.114.247:9092 spring.kafka.producer.acks=1 spring.kafka.producer.retries=3 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 ...
  • Docker Kafka

    千次阅读 2022-04-02 14:40:47
    0X00 部署Docker版Kafka 镜像: Docker Hubhttps://registry.hub.docker.com/r/bitnami/kafka 命令: docker pull bitnami/zookeeper:latest docker pull bitnami/kafka:latest docker-compose.yml: ...
  • Flink对接kafkakafka source和kafka sink

    千次阅读 2020-06-12 16:00:40
    Flink本身没有提供了kafka sink接口,需要导入相关依赖才可以使用。 依赖: <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency> <...
  • Kafka视频教程

    热门讨论 2017-11-04 16:49:20
    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
  • kafka的java依赖包

    千次下载 热门讨论 2015-10-18 21:58:29
    kafka的java依赖包,包含kafka本地调用的所有jar包
  • centos7安装Kafka Eagle

    千次阅读 2022-05-04 15:53:25
    centos7安装Kafka Eagle
  • Kafka Connect使用教程

    千次阅读 2021-10-28 16:23:26
    Kafka Connect介绍
  • window下kafka安装及其使用

    千次阅读 2022-02-16 21:47:12
    1.kafka安装 1.1安装JDK1.8 1.2安装Zookeeper3.7 1.3 Kafka2.13安装 2.命令行测试 3.客户端程序开发 3.1openssl编译 3.3生产者 3.4 消费者 1.kafka安装 本地装了一套kafka的环境: 序号 ...
  • SpringBoot集成Kafka(生产)

    千次阅读 2022-04-29 11:43:12
    SpringBoot集成Kafka(生产)
  • Kafka Java客户端Stream API

    千次阅读 2022-02-08 19:12:06
    Kafka Stream概念及初识高层架构图 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与...
  • Kafka在Mac下的安装与使用

    千次阅读 2022-04-22 14:24:41
    kafka在Mac下的安装与使用,比如启动、关闭。如果zookeeper关闭,但kafka没有关闭,zookeeper的子进程就没有关闭。
  • kafka_2.11-0.10.0.1.tar.gz

    热门讨论 2016-09-28 19:48:48
    包含自定义配置的kafka
  • Kafka安全认证授权配置

    千次阅读 2021-11-17 14:38:59
    Kafka安全认证授权配置一 概述1.1 Kafka的权限分类1.2 实现方式二 安全认证授权配置2.1 zookeeper配置2.1.1 引入kafka依赖包2.1.2修改ZK配置文件2.1.3 创建jaas文件2.1.4 修改zkEnv.sh2.1.5 启动zk2.2 Kafka配置...
  • kafka命令行操作大全

    千次阅读 2022-04-12 23:52:42
    最近利用flink使用一个流式SQL处理平台,利用kafka, mysql, hive等组件比较多,命令行突然间需要操作一次记不住命令很麻烦,索性直接整理成笔记。 在 0.9.0.0 之后的 Kafka,出现了几个新变动,一个是在 Server 端...
  • Apache Kafka 是一款开源的消息系统,在开发各类系统的时候,我们经常会选择使用Kafka来帮助我们削峰、做异步处理、解耦,那么正好借此机会跟大家说说kafka的安装部署、应用场景以及简单的使用。 kafka的安装部署 想...
  • 如果尚未安装kafka,请移步《centos7系统安装kafka》 查看操作主题命令参数 命令:./bin/kafka-topics.sh 日常工作中常用的命令如下表: 参数 描述 –bootstrap-server <String: server toconnect to&...
  • kafka 自动与手动管理offset

    千次阅读 2022-03-13 10:41:25
    kafka 自动与手动管理offset
  • 20道常见的kafka面试题以及答案

    万次阅读 2022-06-18 13:18:05
    1、kafka的消费者是pull(拉)还是push(推)模式,这种模式有什么好处?2、kafka维护消息状态的跟踪方法3、zookeeper对于kafka的作用是什么?4、kafka判断一个节点还活着的有那两个条件?5、讲一讲 kafka 的 ack 的三种...
  • 消息队列Kafka

    千次阅读 2021-02-06 16:28:11
    Kafka 简介及使用场景 写在前面 我们提到消息中间件,往往都会聊到,RabbitMQ,RocketMQ,Kafka。 对于刚入门消息中间件,听到这三个主流消息中件,心中难免会有疑惑,这消息中间件搞这么多干什么?傻傻分不清楚,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 344,201
精华内容 137,680
关键字:

kafka

友情链接: Original Design.zip