精华内容
下载资源
问答
  • linux搭建kafka集群

    2020-08-27 17:31:46
    linux搭建kafka集群 安装zookeeper集群环境 kafka是依赖于zookeeper注册中心的一款分布式消息对列,所以需要有zookeeper单机或者集群环境。先前我们已经安装了,这里就不做介绍了 准备三台虚拟机 下载安装包 ...

    linux搭建kafka集群

    安装zookeeper集群环境
    kafka是依赖于zookeeper注册中心的一款分布式消息对列,所以需要有zookeeper单机或者集群环境。先前我们已经安装了,这里就不做介绍了
    准备三台虚拟机
    下载安装包
    https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz
    用mkdir命令创建kafka文件夹
    用xftp上传安装包并解压

    tar -zxvf kafka_2.12-2.3.1.tgz
    

    添加环境变量
    打开环境变量配置文件

    vim /etc/profile
    

    加入kafka配置

    export KAFKA_HOME=/usr/local/programs/kafka_2.12-2.3.1
    export PATH=${KAFKA_HOME}/bin:$PATH
    

    #让环境变量生效

    source /etc/profile
    

    修改$KAFKA_HOME/config 目录下的server.properties文件
    #这个1就是和zookeeper的myid文件的1对应

    broker.id=1
    

    外网访问地址

    listeners=PLAINTEXT://192.168.229.128:9092
    advertised.listeners=PLAINTEXT://192.168.229.128:9092
    

    #填写3台机器的地址,中间用逗号隔开

    zookeeper.connect=server1:2181,server2:2181,server3:2181
    

    #启动zookeeper

    zkServer.sh start
    

    #启动kafka

    kafka-server-start.sh -daemon /root/ff/kafka/kafka_2.12-2.3.1/config/server.properties
    

    #查看进程

    Jps
    

    测试
    #在leader上创建主题

    kafka-topics.sh --create --zookeeper server1:2181,server2:2181,server3:2181 --replication-factor 3 --partitions 3 --topic WinterTop
    

    #查看主题List

    kafka-topics.sh --list --zookeeper server1:2181,server2:2181,server3:2181
    

    #在leader上发送消息

    kafka-console-producer.sh --broker-list server1:9092,server2:9092,server3:9092 --topic WinterTop
    

    #在follower上消费消息

    kafka-console-consumer.sh --bootstrap-server server1:9092,server2:9092,server3:9092  --from-beginning --topic WinterTop
    
    展开全文
  • Linux搭建Kafka集群

    千次阅读 2018-11-19 15:44:46
    Linux搭建Kafka集群 1 说明 1.1 编写目的 本文编写旨在介绍Kafka的集群架构配置,了解Kafka的基本使用。 1.2 适用范围 本文适用于对高效率消息中间件及对大数据处理比较有兴趣的童鞋。本文仅介绍了Kafka的基本知识、...

    Linux搭建Kafka集群

    1 说明

    1.1 编写目的

    本文编写旨在介绍Kafka的集群架构配置,了解Kafka的基本使用。

    1.2 适用范围

    本文适用于对高效率消息中间件及对大数据处理比较有兴趣的童鞋。本文仅介绍了Kafka的基本知识、集群架构,以及Kafka的基本使用。

    2 Kafka简介

    2.1 什么是kafka

    Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka使用Scala编写,是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
    其主要设计目标为:
    1、以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
    2、高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
    3、支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
    4、同时支持离线数据处理和实时数据处理。
    5、 Scale out:支持在线水平扩展。

    2.2 Kafka基本概念

    1、Producer:消息生产者,就是向kafka broker发消息的客户端。
    2、Consumer:消息消费者,向kafka broker取消息的客户端。
    3、Topic:特定类型的消息流,可以理解为一个队列。
    4、Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
    5、Broker:已发布的消息保存在一组服务器中,称之为Kafka集群。一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
    6、Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
    7、Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

    2.3 Kafka架构

    2.3.1 集群示意

    kafka集群示意图
    Kafka集群的业务处理示意如上图所示。生产者发布消息到集群中,消费者订阅某topic,创建消息流。生产者发布到该话题的消息被均衡地分发到这些流中。

    2.3.2 整体架构

    kafka整体架构图
    Kafka的整体架构如上图所示,展示了生产者、消费者、Topic、Partition之间的关系和数据传递。因为Kafka内在就是分布式的,一个Kafka集群通常包括多个代理。为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区。多个生产者和消费者能够同时生产和获取消息。

    3 配置准备

    3.1 服务器准备

    一般情况下,Kafka集群至少需要3台机器。本文档就以3台机器作为安装环境进行配置,操作系统以Centos 7为例。
    各个节点及描述如下表所示:

    机器名 IP
    node1 192.168.204.127
    node2 192.168.204.128
    node3 192.168.204.129

    如以下步骤使用机器名替代ip需要在/etc/hosts文件中配置例如192.168.204.127 node1信息 ,直接使用ip则不需要配置

    3.2 环境准备

    由于Kafka需要通过Zookeeper进行分布式系统的协调和促进,通过Zookeeper协调Broker、生产者和消费者。所以安装前需在每台机器上安装好Zookeeper(虽然Kafka自带有Zookeeper,但一般使用外置安装),具体安装步骤见https://blog.csdn.net/zhangkai19910815/article/details/84230349

    3.3 软件准备

    Kafka属于Apache的一个子项目,需到其官网上进行下载,下载方式为:
    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
    (具体版本以官网为准)

    4 Kafka安装

    如无特别说明,以下操作均需要在每台服务器上完成。

    4.1 Kafka安装

    1、进入某一目录(本文以/home/tmp/为例),执行如下命令完成kafka的下载:
    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/ kafka_2.12-1.1.1.tgz
    2、执行命令解压压缩包至软件安装常用目录(本人常用目录为/usr/local/apps,个人按情况而定)
    [root@node1 tmp]# tar zxvf kafka_2.12-1.1.1.tgz -C /usr/local/apps
    3、为了方便操作,进入 /usr/local/apps 并修改安装目录名:
    [root@ node1 tmp]# cd /usr/local/apps
    [root@ node1 apps]# mv kafka_2.12-1.1.1 kafka
    4、配置环境变量

    vi /etc/profile
    添加系统变量并修改PATH:

    export KAFKA_HOME=/usr/local/apps/kafka
    export PATH=.:${JAVA_HOME}/bin:${ZK_HOME}/bin:${STORM_HOME}/bin:$PATH:${KAFKA_HOME}/bin
    

    4.2 Kafka配置

    安装完成后需对Kafka进行配置,配置主要体现在config目录下的server.properties
    [root@ node1 config]# cd /user/local/apps/kafka/config/
    [root@ node1 config]# vi server.properties
    将配置信息按如下内容进行基础性的修改:
    1、 修改broker.id,确保每台机器的broker.id不一致,本文3台服务器的broker.id分别设置为1、2、3;
    2、 port默认为9092,可以根据需要进行修改,一般情况下保持3台服务器一致;
    3、 修改host.name为本机真实IP;
    4、 num.partitions默认为1,可根据集群要求进行修改,本文修改为4;
    5、 修改zookeeper.connect,其值为所有服务器上zookeeper的IP端口串,如下所示:
    zookeeper.connect=192.168.204.127:2181,192.168.204.128:2181,192.168.204.129:2181
    6、 log.dirs=/usr/local/apps/kafka/kafka-logs 配置kafka日志目录

    4.3 启动与停止

    4.3.1 启动

    Kafka的启动顺序为启动Zookeeper—>启动Kafka。
    1、 启动Zookeeper
    在Zookeeper的安装目录下执行启动命令:
    [root@node1 bin]#./zkServer.sh start
    Zookeeper相关配置可见
    https://blog.csdn.net/zhangkai19910815/article/details/84230349
    2、 启动Kafka
    执行如下命令,进行Kafka启动:
    [root@ node1 bin]#./kafka-server-start.sh …/config/server.properties &

    4.3.2 停止

    Kafka的停止与启动顺序刚好相反:停止Kafka—>停止Zookeeper
    1、停止Kafka
    执行如下命令,进行Kafka停止:
    [root@ node1 bin]#./kafka-server-stop.sh
    第一次启动Kafka建议进入Kafka安装目录执行
    bin/kafka-server-start.sh config/server.properties命令,方便查看详情启动日志
    2、停止Zookeeper
    在Zookeeper的安装目录下执行启动命令:
    [root@ node1 bin]#./zkServer.sh stop
    如果仅仅是停止Kafka而不是停止整个集群环境,可以不用停止Zookeeper。

    5 Kafka常用命令

    5.1 创建topic

    在安装目录下执行以下命令进行topic的创建:

    创建一个1个分区,3个副本,topic名为test01。
    ./kafka-topics.sh --create --zookeeper 192.168.204.127:2181 --replication-factor 3 --partitions 1 --topic test01

    5.2 查看topic

    在安装目录下执行以下命令,可查看kafka的topic情况:
    [root@ node1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.204.127:2181

    5.3 创建发送者

    在安装目录下执行以下命令,可创建kafka生产者进行消息发布:
    [root@node1 bin]#./kafka-console-producer.sh --broker-list 192.168.204.127:9092 --topic test01
    Test msg1
    Test msg2

    5.4 创建消费者

    结合5.3创建消费者使用,在一台机器的安装目录下执行以下命令,可创建kafka消费者进行消息读取:
    [root@ node1 bin]#./kafka-console-consumer.sh --zookeeper 192.168.204.128:2181 --topic test01 --from-beginning
    Test msg1
    Test msg2

    展开全文
  • Linux搭建kafka集群

    2020-11-08 21:05:34
    1首先安装zookeper集群 tar -zxvf zookeeper-3.4.5.tar.gz mv zookeeper-3.4.5 zk vi ~/.bashrc export ZOOKEEPER_HOME=/usr/local/zk export PATH=$ZOOKEEPER_HOME/bin source ~/.bashrc cd zk/conf cp zoo_sample....

    1首先安装zookeper集群

    wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
    tar -zxvf zookeeper-3.4.14.tar.gz
    mv zookeeper-3.4.14 zk
    vi ~/.bashrc
    export ZOOKEEPER_HOME=/usr/local/zk
    export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf
    source ~/.bashrc
    cd zk/conf
    cp zoo_sample.cfg zoo.cfg
    vi zoo.cfg
    修改:dataDir=/usr/local/zk/data
    新增:(redis1,3,4为主机别名 vi /etc/hosts配置 )
    server.0=redis1:2888:3888	
    server.1=redis3:2888:3888
    server.2=redis4:2888:3888
    cd zk
    mkdir data
    cd data
    

    #配置集群唯一标识第一个为0,第二个为1

    vi myid
    0
    

    #启动zookeeper集群

    zkServer.sh start
    #设置zk 开机自启动
    cd /etc/init.d
    vi zk.sh
    #!/bin/bash    
    #chkconfig:2345 20 90 
    #description:zookeeper
    #processname:zookeeper
    export JAVA_HOME=/usr/local/java/jdk1.8.0_11
    case $1 in
            start) su root /usr/local/zk/bin/zkServer.sh start;;
            stop) su root /usr/local/zk/bin/zkServer.sh stop;;
            status) su root /usr/local/zk/bin/zkServer.sh status;;
            restart) su root /usr/local/zk/bin/zkServer.sh restart;;
            *) echo "require start|stop|status|restart" ;;
    esac
    
    #赋权
    chmod 777 zk
    chkconfig --add zk
    #查看是否有zk自启动的权限
    chkconfig --list 
    
    #其次安装scala
    tar -zxvf scala-2.11.4.tgz
    mv scala-2.11.4 scala
    vi ~/.bashrc
    export SCALA_HOME=/usr/local/scala
    export PATH=$SCALA_HOME/bin
    source ~/.bashrc
    

    #查看是否安装成功

    scala -version
    

    安装kafka

    tar -zxvf kafka_2.9.2-0.8.1.tgz
    mv kafka_2.9.2-0.8.1 kafka
    vi /usr/local/kafka/config/server.properties
    broker.id:依次增长的整数,0、1、2,集群中Broker的唯一id
    

    #添加集群机器

    zookeeper.connect=192.168.1.102:2181,192.168.1.105:2181,192.168.1.106:2181
    

    #advertised.host.name=
    改为advertised.host.name=192.168.1.102(当前服务器ip地址)
    安装slf4j,把slf4j中的slf4j-nop-1.7.6.jar复制到kafka的libs目录下面

    vi /usr/local/kafka/bin/kafka-run-class.sh 
    
    if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
      KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
    fi
    
    去掉-XX:+UseCompressedOops即可
    
    #后台启动kafka
    nohup bin/kafka-server-start.sh config/server.properties &
    
    #验证是否安装成功
    bin/kafka-topics.sh --zookeeper 192.168.1.102:2181,192.168.1.105:2181,192.168.1.106:2181 --topic test --replication-factor 1 --partitions 1 --create
    bin/kafka-console-producer.sh --broker-list 192.168.1.102:9092,192.168.1.105:9092,192.168.1.106:9092 --topic test
    bin/kafka-console-consumer.sh --zookeeper   192.168.1.102:2181,192.168.1.105:2181,192.168.1.106:2181 --topic test --from-beginning
    #新版kafka不能使用zookeeper创建消费者
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    错误总结:
    1.错误是说没有分区元数据
    1.No partition metadata for topic test due to kafka.common.LeaderNotAvailableException

    这种错误修改server.properties

    #advertised.host.name=<hostname routable by clients>
    

    改为advertised.host.name=192.168.1.102(当前服务器ip地址)

    2.启动报未知名称或服务
    2.kafka启动报java.net.UnknownHostException: 主机名: 主机名: 未知的名称或服务
    hostname 查看当前主机名称(我的主机名称为root)

    vi /etc/hosts
    

    将主机名 添加到本地ip127.0.0.1的后面

    127.0.0.1   root  localhost localhost.localdomain localhost4 localhost4.localdomain4
    

    3.启动报错kafka Unrecognized VM option ‘UseCompressedOops’

    vi /usr/local/kafka/bin/kafka-run-class.sh 
    if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
      KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
    fi
    去掉-XX:+UseCompressedOops即可
    
    展开全文
  • linux 搭建 kafka集群

    2017-06-02 17:55:39
    1. 下载kafka wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz 2.解压缩 tar -zxvf kafka_2.12-0.10.2.1.tgz 3.创建项目目录和kafka消息目录 mkdir kafkas cd kafkas mkdir ...

    1. 下载kafka

    wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz


    2.解压缩

    tar -zxvf kafka_2.11-0.10.2.0.tgz


    3.创建项目目录和kafka消息目录

    mkdir kafka-logs



    4.配置文件修改

    cd soft-kafka/kafka_2.11-0.10.2.0/config

    vi server.properties

    修改

    #broker.id=0  注释掉,每台服务器的broker.id都不能相同,

    也可以自己自定义,不能相同


    #在log.retention.hours=168 下面新增下面三项    
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880

    #设置zookeeper的连接端口

    zookeeper.connect=192.168.0.15:12181,192.168.0.16:12181:12181,192.168.0.17:12181


    5.启动zookeeper和kafka server

    ./bin/zookeeper-server-start.sh config/zookeeper.properties &

    ./bin/kafka-server-start.sh config/server.properties


    6.测试生产者和消费者

    6.1 创建topic

        

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    6.2 查看topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181

    6.3 启动生产者

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    6.4 启动消费者

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    7.注意事项

    出现下面异常,请将localhost换成具体ip

    [2017-06-06 14:03:17,516] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.


    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 9,053
精华内容 3,621
关键字:

linux搭建kafka集群

linux 订阅