精华内容
下载资源
问答
  • linux 部署kafka
    2019-06-12 09:59:37

    介绍:此文章中会介绍怎么部署kafka单机以及集群模式,如何测试生产消息、消费消息

    一、环境需要JDK

    1、上传jdk文件到服务器
    
    2、创建软链
    ln -s jdk1.8.0_121 jdk8
    
    3、配置环境变量
    vi /etc/profile
    JAVA_HOME=/usr/local/jdk8
    PATH=$PATH:$JAVA_HOME/bin
    export PATH
    
    4、让环境变量生效
    source /etc/profile
    

    二、下载配置kafka

    1、下载kafka目录/usr/local 
    wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
    
    2、解压重命名
    tar zxvf kafka_2.11-2.1.0.tgz
    mv kafka_2.11-2.1.0.tgz kafka
    
    3、启动zookeeper
    cd kafka
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    4、创建topic test
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    5、查看topic
    bin/kafka-topics.sh --list --zookeeper localhost:2181 test
    
    6、产生消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    7、消费消息
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    三、单机多broker集群配置

    1、复制配置文件
    cp config/server.properties config/server-1.properties
    
    2、编辑配置文件
    broker.id=1
    port=9093 
    log.dir=/tmp/kafka-logs-1
    
    3、启动服务
    bin/kafka-server-start.sh config/server-1.properties &
    
    4、生产消息消费消息跟上面一样
    

    四、多机多broker集群配置

    1、分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。 
    例如:在192.168.0.117,192.168.0.118,192.168.0.119三台机器部署,Zookeeper配置如下:
    
    initLimit=5 
    syncLimit=2 
    server.1=192.168.0.117:2888:3888 
    server.2=192.168.0.118:2888:3888 
    server.3=192.168.0.119:2888:3888
    
    2、分别配置多个机器上的Kafka服务 设置不同的broke id,zookeeper.connect设置如下:
    zookeeper.connect=192.168.0.117:2181,192.168.0.118:2181,192.168.0.119:2181
    
    3、启动Zookeeper与Kafka服务,按上文方式产生和消费消息,验证集群功能。
    
    更多相关内容
  • Linux部署 kafka集群

    千次阅读 2022-01-10 10:03:50
    2.1 linux服务器上新建目录 2.2上传kafka压缩包到服务器 2.3 解压 2.4 重命名解压后的文件夹 2.5 创建logs日志目录和data数据目录 2.6 修改配置 2.7 部署集群中的其他服务实例 3、启动 3.1 执行启动脚本 ...

    目录

    问题现象:

    解决方法:

    0、环境

    环境:

    1、下载

    地址:

    拓展:Scala是什么?

    2、部署

    2.1 linux服务器上新建目录

    2.2 上传kafka压缩包到服务器

    2.3 解压

    2.4 重命名解压后的文件夹

    2.5 创建logs日志目录和data数据目录

    2.6 修改配置

    2.7 部署集群中的其他服务实例

    3、启动

    3.1 执行启动脚本

    拓展:服务启动报错"Cannot allocate memory"

    3.2 查看 kafka 进程信息

    3.3 查看服务日志

    拓展:-daemon 参数

    拓展:server.properties 的 listeners 配置规范

    4、应用

    4.1 开放防火墙端口

    4.2 创建话题topic

    拓展:创建topic的脚本命令变动

    拓展:脚本命令参数

    4.3 查看topic列表

    4.4 查看topic的详细信息

    4.5 测试生产消息&消费消息

    4.6 删除话题topic


    问题现象:

            Linux 系统如何部署 kafka集群?


    解决方法:

    0、环境

    环境:

            linux 系统、JDK8、zookeeper集群、kafka 压缩包(tar.gz)。

    由于 kafka 是基于 zookeeper 的,因此安装 kafka 必须先安装好zookeeper 。

    注意:对于 linux 系统上安装 zookeeper集群 感兴趣的小伙伴,可以参考一下文章:

            https://blog.csdn.net/weixin_42585386/article/details/122359581

    1、下载

    地址:

            Apache Kafka

            可以看见有 2.12 和 2.13 版本,根据提示可知:如果用到了 Scala 则根据 Scala 的版本来选择,否则建议使用 2.13 版本


    拓展:Scala是什么?

            Scala是一门多范式的编程语言,一种类似java的编程语言,设计初衷是实现可伸缩的语言  、并集成面向对象编程函数式编程的各种特性,有人说 Scala可能是下一代Java!!!

    详情可以参考百度百科文章:

            Scala(编程语言)_百度百科


            回到正题,点击 2.13 版本后,跳转到新的页面,点击红框内的下载链接进行下载:

            下载后得到 kafka_2.13-3.0.0.tgz 压缩包文件

    2、部署

    2.1 linux服务器上新建目录

            先新建一个目录,如 kafka-cluster:

    # 进入自定义目录路径,用于存放kafka服务,如/home/thp/public
    cd /home/thp/public
     
    # 创建文件夹(目录),如kafka-cluster
    mkdir kafka-cluster

    2.2 上传kafka压缩包到服务器

            上传kafka压缩包到kafka-cluster文件夹下:

    # 进入kafka-cluster目录路径
    cd /home/thp/public/kafka-cluster

    2.3 解压

    # 解压 zookeeper压缩包
    tar -zxvf kafka_2.13-3.0.0.tgz

     2.4 重命名解压后的文件夹

            由于要搭建kafka集群,所以为了区分,我们对解压后的文件夹进行重命名:

    # 重命名为 kafka_2.13-3.0.0-1
    mv ​​​​​​​kafka_2.13-3.0.0 kafka_2.13-3.0.0-1

    2.5 创建logs日志目录和data数据目录

            新建 logs目录 用来存放 kafka服务日志 :

    # 进入 kafka_2.13-3.0.0-1目录
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1
     
    #创建 logs 目录
    mkdir logs 

    2.6 修改配置

            首先修改系统域名配置文件:

    # 编辑 hosts系统文件
    vi /etc/hosts

            加入以下内容:

    192.168.16.230 kafkahost

            修改后的hosts文件内容如下:

            注意ip不要使用127.0.0.1,因为kafka的配置中是无法识别的。

            接下来就是修改kafka配置文件 server.properties:

    # 进入解压后的文件夹下的 config 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/config
    
    # 编辑 server.properties配置文件
    vi server.properties

            需要修改的内容如下:

    # broker.id 是当前实例的id标识(集群中各实例的id标识不可相同)
    broker.id=1
    
    # kafka服务的ip(linux服务器ip)和port(默认9092),由于服务器9091端口已被占用,所以我用了0091
    listeners=PLAINTEXT://kafkahost:0091
    
    # kafka的服务日志存放目录
    log.dirs=/home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/logs
    
    # zookeeper集群各实例的ip和端口,用英文逗号分隔
    zookeeper.connect=192.168.16.230:2181,192.168.16.230:2182,192.168.16.230:2183
    
    # 在配置文件末尾添加如下配置,表示允许删除topic
    delete.topic.enable=true

             修改后内容如下:

            至此 kafka实例1就配置完成了!!!

    2.7 部署集群中的其他服务实例

            有了上面已经配置好的服务实例1,接下来部署其他服务实例就容易多了:

            1、复制 kafka_2.13-3.0.0-1 文件夹到 /home/thp/public/kafka-cluster 目录下,命名为 kafka_2.13-3.0.0-2

    # 进入 kafka-cluster目录路径
    cd /home/thp/public/kafka-cluster
     
    # 复制 kafka_2.13-3.0.0-1 并命名为 kafka_2.13-3.0.0-2
    cp -r kafka_2.13-3.0.0-1 kafka_2.13-3.0.0-2

             2、仿照 2.6 ,修改 /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-2/config 目录下的  server.properties 文件:

    # 进入 config 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-2/config
    
    # 编辑 server.properties 配置文件
    vi server.properties

            需要修改的内容如下:

    # broker.id 是当前实例的id标识(集群中各实例的id标识不可相同)
    broker.id=2
    
    # 由于服务器9092端口已被占用,所以我用了0092
    listeners=PLAINTEXT://kafkahost:0092
    
    # kafka的服务日志存放目录
    log.dirs=/home/thp/public/kafka-cluster/kafka_2.13-3.0.0-2/logs

            修改后内容如下: 

    ​​​​​​​

             3、同样的再配置一个服务实例,复制 kafka_2.13-3.0.0-1 文件夹到 /home/thp/public/kafka-cluster 目录下,命名为 kafka_2.13-3.0.0-3

    # 进入 kafka-cluster目录路径
    cd /home/thp/public/kafka-cluster
     
    # 复制 kafka_2.13-3.0.0-1 并命名为 kafka_2.13-3.0.0-3
    cp -r kafka_2.13-3.0.0-1 kafka_2.13-3.0.0-3

             修改 /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-3/config 目录下的  server.properties 文件:

    # 进入 config 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-3/config
    
    # 编辑 server.properties 配置文件
    vi server.properties

            需要修改的内容如下:

    # broker.id 是当前实例的id标识(集群中各实例的id标识不可相同)
    broker.id=3
    
    # 由于服务器9093端口已被占用,所以我用了0093
    listeners=PLAINTEXT://kafkahost:0093
    
    # kafka的服务日志存放目录
    log.dirs=/home/thp/public/kafka-cluster/kafka_2.13-3.0.0-3/logs

            修改后内容如下: 

             至此所有的服务实例已经配置完成!!!

     

    3、启动

    3.1 执行启动脚本

            依次执行3个服务实例的启动脚本:

    # 进入 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/bin    
    
    # 执行 kafka-server-start.sh 启动脚本文件
    ./kafka-server-start.sh -daemon ../config/server.properties &
    
    # 进入 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-2/bin    
    
    # 执行 kafka-server-start.sh 启动脚本文件
    ./kafka-server-start.sh -daemon ../config/server.properties &
    
    # 进入 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-3/bin    
    
    # 执行 kafka-server-start.sh 启动脚本文件
    ./kafka-server-start.sh -daemon ../config/server.properties &

            依次执行3个服务实例的启动脚本文件:


    拓展:服务启动报错"Cannot allocate memory"

            假如在执行启动脚本之后,出现"Cannot allocate memory"(无法分配内存)报错,说明是当前服务器内存满了,这个不是kafka服务有问题,而是服务占用的内存大于当前服务器剩余可运行的内存大小

            这个情况下可以修改kafka服务的启动脚本文件:

    # 进入 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/bin   
    
    # 编辑启动脚本文件
    kafka-server-start.sh

            服务所占内存大小初始值是1G

            修改为512M,或者更小的256M、128M等,减少服务所占用内存大小:


    3.2 查看 kafka 进程信息

    # 查看带有 ‘kafka’ 关键字的进程
    ps -ef |grep kafka

            由于篇幅问题,就不截图剩余部分了。执行命令后是可以看见有3个进程在运行,对应了3个kafka服务实例。

    3.3 查看服务日志

            进一步确认服务运行情况

    # 进入 服务实例的logs目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/logs
    
    # 查看日志
    tail -n 300 -f server.log

             日志中出现类似 [KafkaServer id=服务实例id] started (kafka.server.KafkaServer) 的信息,则表示服务启动成功!!!


     拓展:-daemon 参数

            通过查看 kafka-server-start.sh 启动脚本文件,可知启动命令中可以添加  -daemon 参数

    分析:

            -daemon 参数的作用就是让kafka服务实例以后台进程(守护进程)的模式来启动,简单的说就是不会占用当前命令行窗口,而不添加该参数的话,就会占用/阻塞当前命令行窗口进程,导致无法在当前窗口进行其他操作(除非ctrl + z / ctrl + c 停止当前服务实例,才会释放进程),必须打开一个新的命令窗口才能进行其他操作,这样显然是不方便的。

    总结:

            1、在执行服务启动脚本时,强烈建议带上-daemon 参数!!!

            2、要依次启动服务集群中的3个服务实例,不要漏掉!!!


    拓展:server.properties 的 listeners 配置规范

            关于 server.properties 配置文件,有个很关键的点是需要注意的,就是 listeners 这个配置,如下:

    # kafka服务的ip(linux服务器ip)和port(默认9092),由于服务器9091端口已被占用,所以我用了0091
    listeners=PLAINTEXT://192.168.16.230:0091

            这里我使用了域名配置,而没有使用linux服务器ip地址,假如我在服务实例1中不用域名,而是使用linux服务器ip地址,如下:

             可能会出现如下情况:

            一、执行服务实例1的启动脚本后,服务是起不来的,而且没有任何报错日志,如下:

    # 服务启动后,查看 服务实例1 的端口(注意我配置的端口是0091,linux会识别为91,而不是0091)
    netstat -tunlp | grep 91

           可以看到没有占用 91端口的相关进程!!! 但是服务实例2和服务实例3是起来了的!!!

            二、服务实例1能启动,但通过查看服务日志会看见以下报错:

    # 进入 服务实例的logs目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/logs
    
    # 查看日志
    tail -n 300 -f server.log

             可以看见,日志显示无法连接到zookeeper服务集群!!!

    分析:

            通过查看配置可以发现有如下提示:

            也就是说该配置的格式必须是服务器域名:端口号,而不能是ip:port。

            此外经过测试发现,当使用localhost域名时,也会报同样的错误:

    总结:

            server.properties 的 listeners 配置规范:

            1、格式必须是服务器域名:端口号,而不能是ip:port。

            2、服务器域名不能使用localhost,无法识别

    4、应用

    4.1 开放防火墙端口

            开放3个kafka服务实例所用的端口:

    # 开放端口
    sudo firewall-cmd --zone=public --add-port=0091/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=0092/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=0093/tcp --permanent
    
    # 重启防火墙
    sudo firewall-cmd --reload
    
    # 查看开放端口列表
    sudo firewall-cmd --zone=public --list-ports

            端口开放成功:

    4.2 创建话题topic

            创建一个分区数为 2(分区数不可大于集群中服务实例总数),脚本数为 2(脚本数不可大于分区数),话题名为 first 的 topic:

    # 进入 服务实例去的 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/bin
    
    # 创建topic话题
    ./kafka-topics.sh --bootstrap-server 192.168.16.230:0091 --create --replication-factor 2 --partitions 2 --topic first

            出现如下信息则为创建成功:


    拓展:创建topic的脚本命令变动

            有些小伙伴可能是用以下命令执行的脚本:

    ./kafka-topics.sh --create --zookeeper 192.168.16.230:2181 --replication-factor 2 --partitions 2 --topic first

            这个我测试过是不行的,查阅了以下网上资料,原因应该是因为kafka版本不同,据说是从kafka 2.12 版本开始,改成了新的创建topic的脚本命令,如:

    # 创建topic话题
    ./kafka-topics.sh --bootstrap-server 192.168.16.230:0091 --create --replication-factor 2 --partitions 2 --topic first

            可以看见新旧命令的区别在于 集群服务实例的指定参数上:

            旧版本用的是:--zookeeper 192.168.16.230:2181,

            新版本用的是: --bootstrap-server 192.168.16.230:0091​​​​​​​,

    原因:

            旧版本的topic的offset信息存储在zookeeper集群上(数据存储在kafka集群中),所以消费者在消费消息/获取数据时,必须先和zookeeper服务实例通信获取到offset,再和kafka服务实例通信消费消息/获取数据

            新版本中offset信息和数据都存储在kafka集群上了,kafka会自动创建一个 topic(__consumer_offsets)来保存offset的信息,消费者在消费消息/获取数据时,只需要和kafka服务实例进行一次通信即可,有利于提高效率。


    拓展:脚本命令参数

            有些小伙伴可能是用以下命令执行的脚本:

    ./kafka-topics.sh --create --zookeeper 192.168.16.230:2181 --replication-factor 2 --partitions 2 --topic first

            这个我测试过是不行的,查阅了以下网上资料,原因应该是因为kafka版本不同,据说是从kafka 2.12 版本开始(确切的说是从kafka 2.11大版本的0.9.0.0小版本开始,为了方便记忆,我们记住大版本就好),改成了新的创建topic的脚本命令,如:

    # 创建topic话题
    ./kafka-topics.sh --bootstrap-server 192.168.16.230:0091 --create --replication-factor 2 --partitions 2 --topic first

    --create : topic 创建标识

    --bootstrap-server ip:0091,ip:0092,ip:0092.... : 指定kafka集群(kafka版本2.2以上)

    --zookeeper ip:0091,ip:0092,ip:0092.... : 指定zookeeper集群

    --partitions 分区数 : 指定分区总数

    --replication-factor 副本数 : 指定每个分区的副本数

    --topic 话题名称 : 指定新建话题的名称


            创建topic成功后,可以从3个服务实例各自的logs目录下,看到对应的分区目录,如:

            服务实例1 有两个分区,分别是 first-0 和 first-1 :

            服务实例2 有一个分区,是 first-1 (为什么不是 first-0?随机分配的,别在意这些 ):

            因此服务实例3 上就是另一个分区 first-1 :

    4.3 查看topic列表

    # kafka旧版本(2.2以下)
    ./kafka-topics.sh --list --zookeeper 172.17.80.219:2181
    
    # kafka新版本(2.2及以上)
    ./kafka-topics.sh --list --bootstrap-server 192.168.16.230:0091
    

               可以看见91、92、93服务所在的kafka集群中目前只有同一个topic,名为 first。

     4.4 查看topic的详细信息

    # kafka旧版本(2.2以下)
    ./kafka-topics.sh -zookeeper 192.168.16.230:2181 -describe -topic first
    
    # kafka新版本(2.2及以上)
    ./kafka-topics.sh --describe --bootstrap-server 192.168.16.230:0091 --topic first

    属性描述:

    第一行(topic话题主要信息):

            Topic:话题名称

            TopicId:话题id

            PartitionCount:分区数

            ReplicationFactor:每个分区的副本数

    第二行、第三行。。。(各分区信息记录):

            Topic:话题名称

            Partition:分区记录id

            Leader:领导者id,取值来源于kafka服务实例id(broker.id)

            Replicas:副本所在的kafka服务实例id(broker.id),逗号分隔

            Isr: 副本所在的kafka服务实例id(broker.id),逗号分隔,该值在投票选举的时候用的,哪个分区副本的数据和leader数据越接近,这个分区所在的kafka服务实例id(broker.id)就越靠前,当leader挂掉时,就取Isr中最靠前的一个kafka服务实例来充当leader。

    4.5 测试生产消息&消费消息

            执行脚本命令让服务实例3作为生产者

            由于执行后会占用当前命令窗口,而消息通信具有实时性,所以需要开启一个新的命令窗口,再执行脚本命令让服务实例1作为消费者,从头部开始消费消息

            再开启一个新的命令窗口,再执行脚本命令让服务实例2作为消费者,从尾部开始消费消息

            执行如下命令:

    # 进入 服务实例3 的 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-3/bin
    
    # 服务实例3(192.168.16.230:0093)生产消息
    ./kafka-console-producer.sh --broker-list 192.168.16.230:0093 --topic first
    
    
    
    
    # 进入 服务实例1 的 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-1/bin
    
    # 开启新的命令窗口后,让服务实例2(192.168.16.230:0092)从头部消费消息
    ./kafka-console-consumer.sh --bootstrap-server 192.168.16.230:0091 --topic first --from-beginning
    
    
    
    
    
    # 进入 服务实例2 的 bin 目录路径
    cd /home/thp/public/kafka-cluster/kafka_2.13-3.0.0-2/bin
    
    # 开启新的命令窗口后,让服务实例2(192.168.16.230:0092)从尾部消费消息
    ./kafka-console-consumer.sh --bootstrap-server 192.168.16.230:0092 --topic first --offset latest --partition 0
    

            服务实例3生产消息:

             服务实例1从头部消费消息:

              服务实例2从尾部消费消息:

            可以看出服务实例1完全按顺序获取到了服务实例3实时生产的消息,而服务实例2会出现丢失,推测是因为服务实例3中生产的消费是随机存储在分区0和1中的,而从尾部消费消息时,是需要指定分区的,因此只能获取到该分区上存储的消费消息。

    参数说明:

            --from-beginning :从头开始消费消息。

            --offset latest :从尾部开始消费消息;--offset 位移量,如: --offset 2 表示从第2个消息开始消费。

    4.6 删除话题topic

    # kafka旧版本
    ./kafka-topics.sh --zookeeper 192.168.16.230:0091 --delete --topic first
    
    # kafka新版本
    ./kafka-topics.sh --bootstrap-server 192.168.16.230:0091 --topic first --delete 
    

            可以看见之前创建的话题topic名为first的已经不存在了! 但是多了两个topic,推测是之前kafka集群在处理first话题的时候,自动生成的。

            同理也可以将这两个topic也删除掉:

    展开全文
  • 一、单节点无ZK部署(假设机器名为 node) 1、下载并解压 jdk、kafka,备份配置文件,配置并加载JAVA环境变量 wget -P ~ https://repo.huaweicloud.com/java/jdk/8u181-b13/jdk-8u181-linux-x64.tar.gz &&...

    一、单节点无ZK部署(假设机器名为 node)

    1.1、下载并解压 jdk、kafka,备份配置文件,配置并加载JAVA环境变量

    rm -rf ~/jdk-8u181-linux-x64.tar.gz && \
    wget -P ~ https://repo.huaweicloud.com/java/jdk/8u181-b13/jdk-8u181-linux-x64.tar.gz && \
     
    tar -zxf ~/jdk-8u181-linux-x64.tar.gz -C /opt && \
     
     
    echo "
    export JAVA_HOME=/opt/jdk1.8.0_181
    export JRE_PATH=\${JAVA_HOME}/jre
    export CLASSPATH=.:\${JAVA_HOME}/lib:\${JRE_PATH}/lib
    export PATH=\${PATH}:\${JAVA_HOME}/bin
    " >> /etc/profile && source /etc/profile
    rm -rf ~/kafka_2.13-2.8.0.tgz && \
    wget -P ~ https://repo.huaweicloud.com/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz && \
    
    tar -zxf ~/kafka_2.13-2.8.0.tgz -C /opt && \
    
    echo "
    
    export KAFKA_HOME=/opt/kafka_2.13-2.8.0
    export PATH=\${PATH}:\${KAFKA_HOME}/bin
    " >> /etc/profile && source /etc/profile && \
    
    
    rm -rf ${KAFKA_HOME}/config/kraft/server_sample.properties && \
    cp ${KAFKA_HOME}/config/kraft/server.properties ${KAFKA_HOME}/config/kraft/server_sample.properties

    1.2、配置单节点kafka

    cp ${KAFKA_HOME}/config/kraft/server_sample.properties ${KAFKA_HOME}/config/kraft/server.properties && \
    sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/${HOSTNAME}:9092/g" ${KAFKA_HOME}/config/kraft/server.properties

    1.3、配置开机启动并启动服务

         创建启动脚本,配置kafka开机启动,启动服务

    echo "
    #!/bin/bash
    echo \`kafka-storage.sh random-uuid\` | \
    xargs kafka-storage.sh format --ignore-formatted -c ${KAFKA_HOME}/config/kraft/server.properties -t
    kafka-server-start.sh ${KAFKA_HOME}/config/kraft/server.properties
    " > ${KAFKA_HOME}/bin/kafka-start-one.sh && \
    chmod +x ${KAFKA_HOME}/bin/kafka-start-one.sh && \
    
    
    echo "
    [Unit]
    Description=Apache Kafka server (broker)
    After=network.target
    
    [Service]
    Type=simple
    Environment=\"PATH=${PATH}\"
    User=root
    Group=root
    ExecStart=/bin/sh -c 'kafka-start-one.sh'
    ExecStop=/bin/sh -c 'kafka-server-stop.sh'
    Restart=on-failure
    
    [Install]
    WantedBy=multi-user.target
    " > /etc/systemd/system/kafka-start-one.service && \
    systemctl daemon-reload && \
    systemctl enable kafka-start-one && \
    echo `systemctl start kafka-start-one` && \
    systemctl status kafka-start-one

    1.4、测试

    创建一个topic
    kafka-topics.sh --create --topic warnRuleTopic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
    发送
    kafka-console-producer.sh --bootstrap-server localhost:9092 --topic warnRuleTopic
    接收
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic warnRuleTopic --from-beginning
    查看topic信息
    kafka-topics.sh --describe --topic warnRuleTopic --bootstrap-server localhost:9092
    删除topic
    kafka-topics.sh --delete --topic warnRuleTopic --bootstrap-server localhost:9092

    二、集群部署

    准备至少三台机器:node1、node2、node3,且机器个数为奇数

    以下操作须在每台机器上执行。

    2.1、下载并解压 jdk、kafka、zookeeper,备份配置文件,配置并加载JAVA环境变量

    rm -rf ~/jdk-8u181-linux-x64.tar.gz ~/kafka_2.13-2.8.0.tgz ~/apache-zookeeper-3.6.3-bin.tar.gz && \
    wget -P ~ https://repo.huaweicloud.com/java/jdk/8u181-b13/jdk-8u181-linux-x64.tar.gz && \
    wget -P ~ https://repo.huaweicloud.com/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz && \
    wget -P ~ https://repo.huaweicloud.com/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz && \
    
    tar -zxf ~/jdk-8u181-linux-x64.tar.gz -C /opt && \
    tar -zxf ~/kafka_2.13-2.8.0.tgz -C /opt && \
    tar -zxf ~/apache-zookeeper-3.6.3-bin.tar.gz -C /opt && \
    
    
    echo "
    export JAVA_HOME=/opt/jdk1.8.0_181
    export JRE_PATH=\${JAVA_HOME}/jre
    export CLASSPATH=.:\${JAVA_HOME}/lib:\${JRE_PATH}/lib
    export PATH=\${PATH}:\${JAVA_HOME}/bin
    
    export KAFKA_HOME=/opt/kafka_2.13-2.8.0
    export PATH=\${PATH}:\${KAFKA_HOME}/bin
    
    export ZK_HOME=/opt/apache-zookeeper-3.6.3-bin
    export PATH=\${PATH}:\${ZK_HOME}/bin
    " >> /etc/profile && source /etc/profile && \
    
    
    rm -rf ${KAFKA_HOME}/config/server_sample.properties && \
    cp ${KAFKA_HOME}/config/server.properties ${KAFKA_HOME}/config/server_sample.properties
    
    

    2.2、配置kafka、zookeeper

    不同机器下需修改的配置项说明:

    broker.id:kafka集群中的每个broker都有一个唯一的id值用来区分彼此

    myid文件和server.myid:zk集群用来发现彼此的一个重要标识

    hostname1=node1 && \
    hostname2=node2 && \
    hostname3=node3 && \
    myid=1 && \
    
    
    cp ${KAFKA_HOME}/config/server_sample.properties ${KAFKA_HOME}/config/server.properties && \
    sed -i "s/broker.id=0/broker.id=${myid}/g" ${KAFKA_HOME}/config/server.properties && \
    sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/${HOSTNAME}:9092/g" ${KAFKA_HOME}/config/server.properties && \
    sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/${HOSTNAME}:9092/g" ${KAFKA_HOME}/config/server.properties && \
    sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=${hostname1}:2181,${hostname2}:2181,${hostname3}:2181/g" ${KAFKA_HOME}/config/server.properties
    
    
    rm -rf /opt/zk_data_log && mkdir -p /opt/zk_data_log/data /opt/zk_data_log/log && \
    
    touch /opt/zk_data_log/data/myid && echo "${myid}" > /opt/zk_data_log/data/myid && \
    
    cp ${ZK_HOME}/conf/zoo_sample.cfg ${ZK_HOME}/conf/zoo.cfg && \
    sed -i "s/dataDir=\/tmp\/zookeeper/dataDir=\/opt\/zk_data_log\/data/g" ${ZK_HOME}/conf/zoo.cfg && \
    sed -i "s/#maxClientCnxns=60/maxClientCnxns=2000/g" ${ZK_HOME}/conf/zoo.cfg && \
    
    echo "dataLogDir=/opt/zk_data_log/log
    server.1=${hostname1}:2888:3888
    server.2=${hostname2}:2888:3888
    server.3=${hostname3}:2888:3888
    forceSync=no
    4lw.commands.whitelist=*" >> ${ZK_HOME}/conf/zoo.cfg

    2.3、配置开机启动并启动服务

      zookeeper开机启动

    installDir=/opt && \
    
    echo "
    [Unit]
    Description=Zookeeper Service
    After=network.target
    
    [Service]
    Type=forking
    Environment=\"ZOO_LOG_DIR=${installDir}/zk_data_log/log\"
    Environment=\"PATH=${PATH}\"
    ExecStart=/bin/sh -c 'zkServer.sh start'
    ExecStop=/bin/sh -c 'zkServer.sh stop'
    ExecReload=/bin/sh -c 'zkServer.sh restart'
    
    [Install]
    WantedBy=multi-user.target
    " > /etc/systemd/system/zookeeper.service && systemctl daemon-reload && systemctl enable zookeeper

      kafka开机启动

    installDir=/opt && \
    
    echo "
    [Unit]
    Description=Apache Kafka server (broker)
    After=network.target zookeeper.service
    
    [Service]
    Type=simple
    Environment=\"KAFKA_HOME=${KAFKA_HOME}\"
    Environment=\"PATH=${PATH}\"
    
    User=root
    Group=root
    ExecStart=/bin/sh -c 'kafka-server-start.sh \${KAFKA_HOME}/config/server.properties'
    ExecStop=/bin/sh -c 'kafka-server-stop.sh'
    Restart=on-failure
    
    [Install]
    WantedBy=multi-user.target
    " > /etc/systemd/system/kafka.service && systemctl daemon-reload && systemctl enable kafka

    启动服务

    systemctl start zookeeper && \
    systemctl start kafka

    2.4、运维

    #查看zk启动状态 installDir/apache-zookeeper-3.6.3-bin/bin
    zkServer.sh status
    
    #连接
    zkCli.sh -server 127.0.0.1:2181
    
    删除
    /opt/apache-zookeeper-3.6.3-bin/bin/zkCli.sh -server 127.0.0.1:2181
    [zk: 127.0.0.1:2181(CONNECTED) 1] ls /
    [zk: 127.0.0.1:2181(CONNECTED) 1] deleteall /clickhouse
    
    

    2. 5、错误解决

    ERROR [main:QuorumPeer@1148] - Unable to load database on disk
    
    删除 /tmp/zookeeper , 重新创建data,log,myid 然后重启

    2.6、kafka-manager

    ================= 安装kafka-manager =======================
    https://blog.wolfogre.com/posts/kafka-manager-download/
    https://github.com/wolfogre/kafka-manager-docker/releases/download/3.0.0.4/cmak-3.0.0.4.zip
    https://github.com/wolfogre/kafka-manager-docker/releases/download/2.0.0.2/kafka-manager-2.0.0.2.zip
    解压
    unzip kafka-manager-2.0.0.2.zip && sudo mv ./kafka-manager-2.0.0.2 /opt
    修改配置
    cd /opt/kafka-manager-2.0.0.2
    vi ./conf/application.conf
    kafka-manager.zkhosts="node1:2181,node2:2181,node3:2181"
    启动停止脚本
    vi km.sh
    
    #! /bin/bash
    export JAVA_HOME=/opt/jdk1.8.0_181
    export PATH=$JAVA_HOME/bin:$PATH
    case $1 in
    	"start"){
    		echo " -------- Start KafkaManager -------"
    		#export ZK_HOSTS="elk-01:2181,elk-02:2181,elk-03:2181"
    		echo `rm -rf /opt/kafka-manager-2.0.0.2/RUNNING_PID`
    		nohup /opt/kafka-manager-2.0.0.2/bin/kafka-manager -Dconfig.file=/opt/kafka-manager-2.0.0.2/conf/application.conf -Dhttp.port=9000 >/opt/kafka-manager-2.0.0.2/kafka-manager.log 2>&1 &
    		sleep 1 && jps
    		};;
    	"stop"){
    		echo " -------- Stop KafkaManager -------"
    		#ps -ef | grep ProdServerStart | grep -v grep | awk '{print $2}' | xargs kill
    		cat /opt/kafka-manager-2.0.0.2/RUNNING_PID | xargs kill -9
    		rm -rf /opt/kafka-manager-2.0.0.2/RUNNING_PID
    		};;
    	* )
    		echo "usage:./km.sh start|stop"
    	;;
    esac
    
    sudo chmod +x km.sh
    
    启动
    ./km.sh start
    ===========================================================================================

    展开全文
  • 小白linux部署kafka并集成springboot

    简单实现,不涉及集群及更多扩展的kafka知识。

    1. kafka linux部署(单节点)

    kafka官网:http://kafka.apache.org/

    应用版本:kafka_2.12-3.0.0

    前置环境:jdk1.8

    1.1 kafka配置

    文件目录:config/server.properties

    修改项:

    log.dirs=/tmp/kafka-logslog.dirs=/data/kafka-logs

    listeners=PLAINTEXT://0.0.0.0:9092

    advertised.listeners=PLAINTEXT://192.168.62.133:9092 #虚拟机ip

    1.2 zookeeper配置

    文件目录:config/zookeeper.properties

    修改项:dataDir=/tmp/zookeeper → dataDir=/data/zookeeper

    linux tmp文件夹默认有清除机制,所以数据文件不要放在tmp目录下

    2. 启动服务

    2.1 启动zookeeper服务

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties &
    

    2.2 启动kafka服务

    ./kafka-server-start.sh -daemon ../config/server.properties &
    

    启动顺序要注意,先启动zookeeper,后kafka。

    3.3 jps检查启动是否成功

    [root@template ~]# jps
    7793 Kafka
    24361 QuorumPeerMain
    21049 Jps

    3.4 测试

    创建一个topic

    ./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 8 --topic test
    

    查看topic是否创建成功

    ./kaf-topics.sh --bootstrap-server localhost:9092 --list
    

    测试发送消息

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    Hello World

    启动消费者

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    Hello World

    3. SpringBoot集成kafka

    3.1 maven

    <!--Kafka 依赖-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.1.0</version>
    </dependency>
    

    这里spring-kafka没有指定版本,使用 Spring Boot 时(并且您尚未使用 start.spring.io 创建项目),省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本

    3.2 application.properties

    server.port=8077
    spring.application.name=gas-kafka-sync
    
    spring.kafka.bootstrap-servers=192.168.62.133:9092
    spring.kafka.producer.retries=3
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.acks=1
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.group-id=default-group
    spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual_immediate
    

    3.3 消息发送接口

    根据项目场景,一般是对接第三方数据。由于不保证第三方拥有直接对接kafka的能力,这里提供一个公共的消息发送接口。对方只需要传入topic和数据对象(Object)到DataDto即可。

    package com.gsafety.bg.kafka.service;
    
    import cn.hutool.json.JSONUtil;
    import com.gsafety.bg.kafka.model.DataDto;
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    /**
     * @author Mr.wanter
     * @time 2022-7-18 0018
     * @description
     */
    
    @Service
    @AllArgsConstructor
    @Slf4j
    public class KafkaProducerImpl implements KafkaProducerService {
    
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        @Override
        public void sendMsg(DataDto dto) {
            //1 构建消息数据
            String msg = JSONUtil.toJsonStr(dto.getData());
            //2.发送消息
            ListenableFuture future = kafkaTemplate.send(dto.getTopic(), msg);
            // 回调函数
            future.addCallback(
                    new ListenableFutureCallback<SendResult<Integer, String>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, dto.getTopic(), msg);
                        }
    
                        @Override
                        public void onSuccess(SendResult<Integer, String> result) {
                            log.info("kafka sendMessage success topic = {}, data = {}", dto.getTopic(), msg);
                        }
                    });
            // 未指定分区发送
            // kafkaTemplate.send(TOPIC_NAME, msg);
            // 指定分区发送
            // kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        }
    }
    

    3.4 消费接口

    消费接口根据业务场景,对固定topic下的数据进行解析入库。

    package com.gsafety.bg.gas.service.impl;
    
    import cn.hutool.json.JSONUtil;
    import com.gsafety.bg.gas.dao.QuakeAlarmDao;
    import com.gsafety.bg.gas.model.QuakeAlarmEntity;
    import com.gsafety.bg.gas.model.dto.QuakeAlarmDto;
    import com.gsafety.bg.gas.service.mapping.QuakeAlarmMapping;
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import javax.transaction.Transactional;
    
    /**
     * @author Mr.wanter
     * @time 2022-7-15 0015
     * @description
     */
    @Component
    @Slf4j
    @AllArgsConstructor
    @Transactional(rollbackOn = Exception.class)
    public class KafkaConsumer {
    
        private final QuakeAlarmDao dao;
        private final QuakeAlarmMapping mapping;
    	//自动提交时Acknowledgment 参数会报错
        @KafkaListener(id = "0", topics = "quake_alarm", groupId = "default-group")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
            try {
                String value = record.value();
                QuakeAlarmDto quakeAlarmDto = JSONUtil.toBean(value, QuakeAlarmDto.class);
                QuakeAlarmEntity entity = mapping.convertFrom(quakeAlarmDto);
                QuakeAlarmEntity save = dao.save(entity);
                log.info("kafka mq:{}", record.value());
                log.info("entity:{}", save);
                //消费成功后手动提交offset
                ack.acknowledge();
            } catch (Exception e) {
    			ack.acknowledge();
                log.error("异常原因:{}", e.getCause());
    			//如果对方传入的json数据不符合约定的对象结构,那么这条数据视为垃圾数据,直接消费掉。但是如果此时数据库挂了导致数据没有存入数据库,但是kafka已经被消费怎么办?
            }
        }
    }
    

    4. 遇到的问题

    4.1 kafka启动不成功

    检查服务器内存是否够用。

    4.2 boot启动后日志一直连接localhost/172.0.0.1

    kafka没有对外暴漏ip地址

    # 允许外部端口连接                                            
    listeners=PLAINTEXT://0.0.0.0:9092  
    # 外部代理地址                                                
    advertised.listeners=PLAINTEXT://192.168.62.133:9092
    

    4.3 消费端改为手动提交offset后报错

    配置文件对应修改:

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual_immediate
    

    4.4 删除topic及数据

    1. 关闭kafka、zookeeper服务
    2. 删除/data/kafka-logs目录下所有数据
    3. 删除/data/zookeeper/version-2目录下所有数据
    4. 重启

    参考:

    https://blog.csdn.net/qq_41969358/article/details/123032806

    https://www.cnblogs.com/along21/p/10278100.html

    https://blog.csdn.net/qq_41432730/article/details/121924814

    https://docs.spring.io/spring-kafka/reference/html/#getting-started

    https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka

    展开全文
  • Linux部署Kafka+sasl

    2022-06-21 09:53:45
    kafka3.0以后不支持jdk8,需要升级版本,以下用的事Kafka2.12_2.2.1在服务器节点配置认证文件: 文件路径: 文件内容: 注意配置文件中的两个分号的位置,多一不可,缺一不可。 修改服务器节点的启动配置文件: 复制...
  • 1、zookeeper安装、默认安装最新版本(直接粘贴即可,不需要改动命令) docker pull zookeeper docker run -d -p 2181:2181 --name zookeeper zookeeper 2、kafka安装(直接粘贴即可,不...--name kafka -e KAFKA
  • kafka是一个优秀的消息队列框架,现在基于linux安装写个文档记录下。 提示:以下是本篇文章正文内容,下面案例可供参考 一、环境准备 1.下载linux 安装包。 https://kafka.apache.org/downloads 注意下载二进制包...
  • Linux 搭建 Kafka教程

    2022-01-05 11:02:20
    kafka解压到linux去配置文件中配置环境 配置kafka文件内容 进入kafka/config目录修改server.properties 文件 修改broker.id= id里面的数值不可以重复 同时添加主机的ip 和端口 host.name=192.168.10.101 ...
  • Linux 搭建Kafka集群,最新教程,细到极致
  • linux 安装Kafka-Eagle

    2021-08-27 11:10:29
    [root@ls_t3rdypvp ~]# tar -zxvf kafka-eagle-xxx-bin.tar.gz [root@ls_t3rdypvp ~]# mv kafka-eagle-web-2.0.6 /usr/local/ [root@ls_t3rdypvp local]# mv kafka-eagle-web-2.0.6 kafka-eagle 2.配置 [root@ls_t3...
  • linux部署kafka

    2016-05-12 12:48:51
    Kafka默认开启JVM压缩指针,但只是在64位的HotSpot VM受支持,如果安装了32位的HotSpot VM,需要修改/bin/kafka-run-class.sh文件: 找到如下行: KAFKA_JVM_PERFORMANCE_OPTS= "-server -XX:+...
  • linux-搭建kafka环境

    千次阅读 2021-06-01 09:36:09
    也可以参考 linux下搭建java环境配置jdk+tomcat配置 接着启动zookeeper,服务将运行在2181端口(可在config文件夹在的zookeeper.properties文件中查看该端口) 由于目前Kafka部署Linux服务器上的,外网如果想要...
  • linux环境zookeeper+kafka集群的部署

    千次阅读 2022-04-02 09:42:41
    安装包下载地址zookeeper 3.4.9+kafka 2.12_2.6.1 ...提取码:s3ms ...部署zookeeper集群 (网络需要打通3个端口 2181对外提供服务端口(kafka用),2888 leader和follower通信端口,3888 leader选举的端...
  • LinuxKafka_2.13 2.8.X 安装配置(图文详细) 一、资源准备 ZK按照配置【可使用Kafka自带的ZK包程序,前置环境】: https://blog.csdn.net/weixin_44187730/article/details/117199116 资源下载地址: ...
  • 基于linux系统搭建kafka

    2021-11-04 10:04:25
    3、进入kafka官网下载安装包,并将安装包上传到linux中。 二、配置zookeeper 1、进入zookeeper安装目录中创建data和log文件夹 cd apache-zookeeper-3.6.3-bin mkdir data mkdir log 2、进入conf目录, 复制一个zoo...
  • Linux Kafka 配置公网IP

    千次阅读 2021-12-16 11:05:42
    在config/server.properties内配置 #绑定内网IP listeners=PLAINTEXT://内网IP:9092 #绑定外网IP advertised.listeners=PLAINTEXT://外网IP:9092 advertised.host.name=外网IP```
  • linux部署kafka单机集群环境

    千次阅读 2018-04-12 17:59:34
    一、说明: 操作系统:linux kafka版本信息:kafka_2.11-0.8.2.1二、具体操作:1、安装kafka之间先检查操作系统中是否装有JDK,若没有点击打开链接有JDK安装步骤。2、关闭SELINUX、开启防火墙9092端口 2.1、关闭...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 23,168
精华内容 9,267
关键字:

linux 部署kafka