kafka集群重启_kafka集群如何重启 - CSDN
精华内容
参与话题
  • 在某次运维发现线上的kafka server集群的默认配置的size太小,不能满足业务发送数据的要求,导致业务阻塞,于是,更改了kafka server的某项参数的size大小之后,并重启了线上kafka server集群。 在重启集群之后,...

    1. 背景

    在某次运维发现线上的kafka server集群的默认配置的size太小,不能满足业务发送数据的要求,导致业务阻塞,于是,更改了kafka server的某项参数的size大小之后,并重启了线上kafka server集群。
    在重启集群之后,线上实时业务消费kafka topic的消费者开始报错,在消费端的错误信息为:

    1. 消费方的error错误信息为:
    "Container exception":
    org.apache.kafka.common.errors.TimeoutException: Timeout of 6000ms expected expired before successfully committing offsets{orders-5=OffsetAndMetadata{offset=197572354, leaderEpoch=null, metadata=''}}
    
    1. 查看error错误信息的上下文,发现有相应的warn日志:
    [Consumer clientId=consumer-25, groupId=orderconsumer-my-consumer] Offset commit failed on partition order-3 at offset 197449610: The coordinator is loading and hence can't process requests.
    
    1. 另外 kafka server在重启的过程中也打印了相应的启动的日志,大致是__consumer_offset这个topic正在loading相关的数据。
    输出日志信息待补充
    

    2. 业务处理

    在kafka server 集群重启后,业务这边消费端的偏移量无法提交,并不断报上述的错误,导致某些线上业务出现异常。我们第一直觉上,是认为可能是kafak 集群重启后,导致消费端和kafka server 集群的网络通信有问题,因此,在线上业务反馈异常后马上重启了线上的消费端的java进程服务,之后业务正常消费并提交偏移量offset数据。

    3. 问题分析

    事后,我们通过分析上述的日志,主要问题是kafka server集群的组协调器在loading __consumer_offsets这个topic的数据,而我们的这个topic的数据量有接近七八百兆,因此,整个loading处理数据完成大概要15分钟左右。因此,上面我们通过重启业务消费端的机器其实是无效的,因为我们重启完成后,kafka server的loading也差不多操作完成。

    3.1 问题:为什么__consumer_offsets这个topic的数据量会这么大?

    在kafka 0.9.0版本之后,消费端consumer的偏移量每次提交是保存在kafka的一个特殊的topic中,即__consumer_offsets这个topic中,而这个topic的配置需要特殊的配置。
    在与运维沟通过程中,发现运维一直以为有在server.conf配置文件中配置这个topic的过期时间以及压缩策略,


    log.dirs=/data/kafka-logs
    log.cleaner.enable=true
    log.cleanup.policy=delete // delete | compact
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000

    以上配置对于特殊的topic,比如__consumer_offsets不一定有效,可以通过bin目录自带的kafka-config.sh脚本查看,操作如下:

    ./kafka-configs.sh --zookeeper 172.19.228.188:2181 --entity-type topics --entity-name __consumer_offsets --describe

    显示如下:

    Configs for topic __consumer_offsets are segment.bytes=104857600,clieanup.policy=compact,compression.type=producer

    看的出来,segment.bytes、cleanup.policy、compression.type这三个配置项是针对topic,server.conf配置log.cleanup.policy,log.segment.bytes没有效果。

    3.2 问题:如何设置__consumer_offsets这个过期策略

    ./kafka-configs.sh --zookeeper 172.19.228.188:2181 --entity-type topics --entity-name __consumer_offsets --alter--delete-config cleanup.policy
    然后系统在后台就清理对应过大的文件,也就释放了磁盘空间。

    4 如何查看__consumer_offsets的日志信息

    默认状况下,_consumer_offsets有50个分区。
    查看offset内容的数据:

    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
    

    输出:

    ...
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
    [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
    [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
     ...
    

    在_consumer_offsets topic中的每一项日志格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
    offset是从coordinator的缓存中读取的
    To avoid re-processing the last message read if a consumer is restarted, the commited offset should be the next message your application should consume, i.e:: lastoffset+1。
    即kafak提交的位移是下一条消费记录的位移。

    5 总结

    当期__consumer_offsets的清理策略为compact,日志保留周期为24小时,但是系统默认的log.cleaner.enable为false,导致kafka不会对超过保留周期的数据进行压缩处理,topic保留了系统上线以来的所有历史数据。

    5.1 系统topic分区大小异常的原因

    1. __consumer_offsets 默认清理策略设置不当,导致过期历史数据无法正常清理
    2. 部分应用消费方式不当,导致产生了大量commit信息

    5.2 解决方式

    1. 要求应用优化代码,减少commit信息的产生,应用代码改造之后commit信息日志量由原先的37G减少到1.5G
    2. 调整topic清理策略,将系统log.cleaner.enable设置为true,重启broker节点触发日志清理
      优化之后__consumer_offsets数据量由原先的900G下降到2G。

    6 参考文献

    https://www.oreilly.com/library/view/apache-kafka-cookbook/9781785882449/ch02s04.html
    https://dbaplus.cn/news-73-1202-1.html

    展开全文
  • Kafka集群操作指南

    千次阅读 2016-12-06 22:58:39
    此部分不可用于生产,但新接触kafka时,可以先有个感性的认识 Step 1: 下载Kafka下载最新的版本并解压. Shell 12$ wget ...

    #(一)单机版安装

    此部分不可用于生产,但新接触kafka时,可以先有个感性的认识
    Step 1: 下载Kafka下载最新的版本并解压.

    Step 2: 启动服务
    Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

    现在启动Kafka:

    Step 3: 创建 topic

    创建一个叫做“test”的topic,它只有一个分区,一个副本。

    可以通过list命令查看创建的topic:

    除了手动创建topic,还可以配置broker让它自动创建topic.
    Step 4:发送消息.
    Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

    运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

    ctrl+c可以退出发送。
    默认情况下,日志数据会被放置到/tmp/kafka-logs中,每个分区一个目录
    Step 5: 启动consumer

    你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
    这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

    #(二)集群安装

    注意,必须先搭建zookeeper集群

    1、使用3台机器搭建Kafka集群:
    192.168.169.92 gdc-dn01-test
    192.168.169.93 gdc-dn02-test
    192.168.169.94 gdc-dn03-test

    2、在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
    3、首先,在gdc-dn01-test上准备Kafka安装文件,执行如下命令:

    4、修改配置文件kafka/config/server.properties,修改如下内容:

    这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果 你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:

    而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台 ZooKeeper服务器:

    5、然后,将配置好的安装文件同步到其他的dn02、dn03节点上:

    6、最后,在dn02、dn03节点上配置修改配置文件kafka/config/server.properties内容如下所示:

    因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。

    7、在集群中的dn01、dn02、dn03这三个节点上分别启动Kafka,分别执行如下命令:

    可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。

    8、创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:

    9、查看创建的Topic,执行如下命令:

    结果信息如下所示:


    上面Leader、Replicas、Isr的含义如下:

    1 Partition: 分区
    2 Leader : 负责读写指定分区的节点
    3 Replicas : 复制该分区log的节点列表
    4 Isr : “in-sync” replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
    我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。

    11、在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:

    12、在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:

    可以在Producer终端上输入字符串消息行,就可以在Consumer终端上看到消费者消费的消息内容。
    也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

    #(三)集群启停操作

    1、启动集群

    2、停止集群

    3、重启
    没有专用脚本,先停后启即可

    注:当然也可以使用kill命令来关闭,但使用脚本有以下好处:
    (1)It will sync all its logs to disk to avoid needing to do any log recovery when it restarts (i.e. validating the checksum for all messages in the tail of the log). Log recovery takes time so this speeds up intentional restarts.
    (2)It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.

    #(四)topic相关的操作

    1、创建topic

    (1)zookeeper指定其中一个节点即可,集群之间会自动同步。
    (2)–replication-factor 2 –partitions 3理论上应该是可选参数,但此脚本必须写这2个参数。
    (3)还可以使用–config 来指定topic的某个具体参数,以代替配置文件中的参数。如:
    bin/kafka-topics.sh –create –zookeeper 192.168.172.98:2181/kafka –replication-factor 2 –partitions 3 –topic test_topic retention.bytes=3298534883328
    指定了某个topic最大的保留日志量,单位是字节。

    2、查看全部topic
    bin/kafka-topics.sh –list –zookeeper 192.168.172.98:2181/kafka

    3、查看某个topic的详细信息

    (1)第一行列出了这个topic的总体情况,如topic名称,分区数量,副本数量等。
    (2)第二行开始,每一行列出了一个分区的信息,如它是第几个分区,这个分区的leader是哪个broker,副本位于哪些broker,有哪些副本处理同步状态。

    4、启动一个console producer,用于在console中模拟输入消息

    5、启动一个console consumer,用于模拟接收消息,并在console中输出

    此脚本可以用于验证一个topic的数据情况,看消息是否正常流入等。

    6、删除一个topic

    (1)配置文件中必须delete.topic.enable=true,否则只会标记为删除,而不是真正删除。
    (2)执行此脚本的时候,topic的数据会同时被删除。如果由于某些原因导致topic的数据不能完全删除(如其中一个broker down了),此时topic只会被marked for deletion,而不会真正删除。此时创建同名的topic会有冲突。

    7、修改topic
    使用—-alert原则上可以修改任何配置,以下列出了一些常用的修改选项:
    (1)改变分区数量

    (2)增加、修改或者删除一个配置参数

    #(五)某个broker挂掉,本机器可重启

    【结论】如果一个broker挂掉,且可以重启则处理步骤如下:
    (1)重启kafka进程
    (2)执行rebalance(由于已经设置配置项自动执行balance,因此此步骤一般可忽略)

    详细分析见下面操作过程。
    1、topic的情况

    集群中有4台机器,id为【2-5】,topic 有3个分区,每个分区2个副本,leader分别位于2,3,5中。

    2、模拟机器down,kill掉进程
    分区0的leader位于id=5的broker中,kill掉这台机器的kafka进程

    3、再次查看topic的情况

    可以看到,分区0的leader已经移到id=2的机器上了,它的副本位于2,5这2台机器上,但处于同步状态的只有id=2这台机器。

    4、重启kafka进程

    5、再次查看状态

    发现分区0的2个副本都已经处于同步状态,但leader依然为id=2的broker。

    6、执行leader平衡
    详见leader的平衡部分。

    如果配置文件中

    则此步骤不需要执行。

    7、重新查看topic

    此时leader已经回到了id=5的broker,一切恢复正常。

    #(六)某个broker挂掉且无法重启,需要其它机器代替

    【结论】当一个broker挂掉,需要换机器时,采用以下步骤:
    1、将新机器kafka配置文件中的broker.id设置为与原机器一样
    2、启动kafka,注意kafka保存数据的目录不会自动创建,需要手工创建

    详细分析过程如下:
    1、初始化机器,主要包括用户创建,kafka文件的复制等。

    2、修改config/server.properties文件
    注意,只需要修改一个配置broker.id,且此配置必须与挂掉的那台机器相同,因为kafka是通过broker.id来区分集群中的机器的。此处设为

    3、查看topic的当前状态

    当前topic有3个分区,其中分区1的leader位于id=5的机器上。

    4、关掉id=5的机器
    kill -9 ** 用于模拟机器突然down
    或者:

    用于正常关闭

    5、查看topic的状态

    可见,topic的分区0的leader已经迁移到了id=2的机器上,且处于同步的机器只有一个了。

    6、启动新机器

    7、再看topic的状态

    id=5的机器也处于同步状态了,但还需要将leader恢复到这台机器上。

    8、执行leader平衡
    详见leader的平衡部分。

    如果配置文件中

    则此步骤不需要执行。

    9、done