kafka集群_kafka集群搭建 - CSDN
精华内容
参与话题
  • Kafka集群搭建

    千次阅读 2018-07-12 14:02:52
    Kafka集群搭建Kafka初识Kafka使用背景在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:我们想分析下用户行为(pageviews),以便我们设计出更好的广告位我想对用户的搜索关键词进行...
    Kafka集群搭建
    Kafka初识
    Kafka使用背景
    在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:


    我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
    我想对用户的搜索关键词进行统计,分析出当前的流行趋势
    有些数据,存储数据库浪费,直接存储硬盘效率又低
    这些场景都有一个共同点:
    数据是由上游模块产生,上游模块,使用上游模块的数据计算、统计、分析,这个时候就可以使用消息系统,尤其是分布式消息系统!

    Kafka的定义
    What is Kafka:它是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。

    Kafka和其他主流分布式消息系统的对比
    定义解释:

    Java 和 scala都是运行在JVM上的语言。
    erlang和最近比较火的和go语言一样是从代码级别就支持高并发的一种语言,所以RabbitMQ天生就有很高的并发性能,但是 有RabbitMQ严格按照AMQP进行实现,受到了很多限制。kafka的设计目标是高吞吐量,所以kafka自己设计了一套高性能但是不通用的协议,他也是仿照AMQP( Advanced Message Queuing Protocol 高级消息队列协议)设计的。
    事物的概念:在数据库中,多个操作一起提交,要么操作全部成功,要么全部失败。举个例子, 在转账的时候付款和收款,就是一个事物的例子,你给一个人转账,你转成功,并且对方正常行收到款项后,这个操作才算成功,有一方失败,那么这个操作就是失败的。
    对应消在息队列中,就是多条消息一起发送,要么全部成功,要么全部失败。3个中只有ActiveMQ支持,这个是因为,RabbitMQ和Kafka为了更高的性能,而放弃了对事物的支持 。
    集群:多台服务器组成的整体叫做集群,这个整体对生产者和消费者来说,是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的。
    负载均衡,对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息,系统必须均衡这些请求使得每一台服务器的请求达到平衡,而不是大量的请求,落到某一台或几台,使得这几台服务器高负荷或超负荷工作,严重情况下会停止服务或宕机。
    动态扩容是很多公司要求的技术之一,不支持动态扩容就意味着停止服务,这对很多公司来说是不可以接受的。
    注:
    阿里巴巴的Metal,RocketMQ都有Kafka的影子,他们要么改造了Kafka或者借鉴了Kafka,最后Kafka的动态扩容是通过Zookeeper来实现的。

    Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。


    Kafka相关概念
    AMQP协议
    Advanced Message Queuing Protocol (高级消息队列协议)

    The Advanced Message Queuing Protocol (AMQP):是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。

    上面说的3种比较流行的消息队列协议,要么支持AMQP协议,要么借鉴了AMQP协议的思想进行了开发、实现、设计。

    一些基本的概念
    消费者:(Consumer):从消息队列中请求消息的客户端应用程序
    生产者:(Producer) :向broker发布消息的应用程序
    AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
    kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
    可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)

    Kafka架构
    生产者生产消息、kafka集群、消费者获取消息这样一种架构

    kafka集群中的消息,是通过Topic(主题)来进行组织的
    一些基本的概念:

    主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
    分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
    kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
    备份(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。
    kafka0.7是一个很大的改变:1、增加了备份2、增加了控制借点概念,增加了集群领导者选举 。

    kafka集群搭建,没有太多可以说的东西,几乎就是照葫芦画瓢。有什么地方不懂或是报错,可以评论,博主会做些修改,若有急需解决问题的,也可直接联系博主,不忙的情况下,博主帮忙解决


    环境说明

            工具:

                    虚拟机         vmware

                    操作系统     红帽7(red hat 7)

                    zookeeper  3.4.5

                    kafka          0.9

          环境拟定:三节点,主机分别叫node1,node2,node3,ip分别对应:192.168.163.131、192.168.163.130、192.168.163..132


    1、下载

    博主也是从别人那拿过来的,稍后博主会上传连接给出下载地址。

    kafka的0.9版本。


    2、安装

     安装没什么好说的,tar -zxvf 文件名,解压归档就完事,然而还是上个图。


    3、配置文件配置

    3.1、基于OS的环境配置

    为了简化后续操作,建议把安装目录下的bin目录放进path里面。
    [root@node1 bin]# vi /etc/profile


    3.2、外部zookeeper集群安装以及启动



    3.3、kafka配置

    ${kafka_home}/config/server.properties
    1. # Licensed to the Apache Software Foundation (ASF) under one or more
    2. # contributor license agreements. See the NOTICE file distributed with
    3. # this work for additional information regarding copyright ownership.
    4. # The ASF licenses this file to You under the Apache License, Version 2.0
    5. # (the "License"); you may not use this file except in compliance with
    6. # the License. You may obtain a copy of the License at
    7. #
    8. # http://www.apache.org/licenses/LICENSE-2.0
    9. #
    10. # Unless required by applicable law or agreed to in writing, software
    11. # distributed under the License is distributed on an "AS IS" BASIS,
    12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13. # See the License for the specific language governing permissions and
    14. # limitations under the License.
    15. # see kafka.server.KafkaConfig for additional details and defaults
    16. ############################# Server Basics #############################
    17. # The id of the broker. This must be set to a unique integer for each broker.
    18. broker.id=1
    19. ############################# Socket Server Settings #############################
    20. listeners=PLAINTEXT://192.168.163.131:9092
    21. # The port the socket server listens on
    22. port=9092
    23. # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    24. host.name=localhost
    25. # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    26. # value for "host.name" if configured. Otherwise, it will use the value returned from
    27. # java.net.InetAddress.getCanonicalHostName().
    28. #advertised.host.name=<hostname routable by clients>
    29. # The port to publish to ZooKeeper for clients to use. If this is not set,
    30. # it will publish the same port that the broker binds to.
    31. #advertised.port=<port accessible by clients>
    32. # The number of threads handling network requests
    33. num.network.threads=3
    34. # The number of threads doing disk I/O
    35. num.io.threads=8
    36. # The send buffer (SO_SNDBUF) used by the socket server
    37. socket.send.buffer.bytes=102400
    38. # The receive buffer (SO_RCVBUF) used by the socket server
    39. socket.receive.buffer.bytes=102400
    40. # The maximum size of a request that the socket server will accept (protection against OOM)
    41. socket.request.max.bytes=104857600
    42. ############################# Log Basics #############################
    43. # A comma seperated list of directories under which to store log files
    44. log.dirs=/tmp/kafka-logs
    45. # The default number of log partitions per topic. More partitions allow greater
    46. # parallelism for consumption, but this will also result in more files across
    47. # the brokers.
    48. num.partitions=3
    49. # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    50. # This value is recommended to be increased for installations with data dirs located in RAID array.
    51. num.recovery.threads.per.data.dir=1
    52. ############################# Log Flush Policy #############################
    53. # Messages are immediately written to the filesystem but by default we only fsync() to sync
    54. # the OS cache lazily. The following configurations control the flush of data to disk.
    55. # There are a few important trade-offs here:
    56. # 1. Durability: Unflushed data may be lost if you are not using replication.
    57. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    58. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    59. # The settings below allow one to configure the flush policy to flush data after a period of time or
    60. # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    61. # The number of messages to accept before forcing a flush of data to disk
    62. #log.flush.interval.messages=10000
    63. # The maximum amount of time a message can sit in a log before we force a flush
    64. #log.flush.interval.ms=1000
    65. ############################# Log Retention Policy #############################
    66. # The following configurations control the disposal of log segments. The policy can
    67. # be set to delete segments after a period of time, or after a given size has accumulated.
    68. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    69. # from the end of the log.
    70. # The minimum age of a log file to be eligible for deletion
    71. log.retention.hours=168
    72. # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    73. # segments don't drop below log.retention.bytes.
    74. #log.retention.bytes=1073741824
    75. # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    76. log.segment.bytes=1073741824
    77. default.replication.factor=3
    78. # The interval at which log segments are checked to see if they can be deleted according
    79. # to the retention policies
    80. log.retention.check.interval.ms=300000
    81. ############################# Zookeeper #############################
    82. # Zookeeper connection string (see zookeeper docs for details).
    83. # This is a comma separated host:port pairs, each corresponding to a zk
    84. # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    85. # You can also append an optional chroot string to the urls to specify the
    86. # root directory for all kafka znodes.
    87. zookeeper.connect=192.168.163.130:2181,192.168.163.131:2181,192.168.163.132:2181
    88. # Timeout in ms for connecting to zookeeper
    89. zookeeper.connection.timeout.ms=6000
    ==============================================================================================================
    解析:
            1、使用默认配置就好,注意五个个选项,brokerid、num.partitions、default.replication.factor、zookeeper.connect、zookeeper.connection.timeout.ms
            2、brokerid,当前节点的id号,唯一标识,建议给顺序数字,方便管理
            3、num.partitions,控制设定几个分区,default.replication.factor,控制设定几个备份。
            这里博主有三个节点,所以各给了3。同学们也可根据自身实际情况以及自身需求给定
            4、zookeeper.connect指定外部zk源的各个节点。若无特殊指定,外部zk集群默认端口2181
            5、zookeeper.connection.timeout.ms根据自身网络情况设定,通常默认就好
    ==============================================================================================================


    3.4将配置好的一份server.properties分发给各个节点

    记得修改brokerid选项。


    4、启动

    各个节点均启动
    kafka-server-start.sh /usr/local/kafka_2.11-0.9.0.1/config/server.properties > /usr/local/kafka_2.11-0.9.0.1/logs/logs & //启动线程并写入日志



    返回进程号,即开启进程,详细信息给进入自定义的日志路径内查看



    5、验证

    思路:以下给出几条kafka指令。创建一个topic,一个节点作为生产者,两个节点作为消费者分别看看能否接收数据,进行验证

    创建及查看topic
    1. kafka-topics.sh -list -zookeeper 192.168.163.130:2181,192.168.163.131:2181,192.168.163.132:2181 查看所有topic
    2. kafka-topics.sh --create --zookeeper 192.168.163.130:2181,192.168.163.131:2181,192.168.163.132:2181 --replication-factor 3 --partitions 3 --topic xxx (rf参数副本数,par参数分区数,xxx是topic的名称)创建topic

    开启生产者以及消费者
    1. kafka-console-producer.sh --broker-list 192.168.163.130:9092,192.168.163.131:9092,192.168.163.132:9092 --topic test 生产者
    2. kafka-console-consumer.sh --zookeeper 192.168.163.130:2181,192.168.163.131:2181,192.168.163.132:2181 --topic test --from-beginning 消费者

    如图,是博主之前建立的topic



    node1开启生产者:


    node2、node3开启消费者,查看队列:




    消费者均收到生产者消息,即为成功,这里博主之前的消息未清理,其他多余消息请忽略


    6、常见问题解决

    6.1、问题描述,启动过一次以后,修改brokerID,再启动,无法成功

            这是由于kafka日志系统无法写入导致的,每次写入日志文件时候,会在日志文件同目录下,生成相关几个文件。其中meta.properties会记录之前brokerID,若是再启动,日志会与kafka的brokerID进行比对,若是一致才准许写入指定的log文件,若是不一致则报错,线程停止



    7 Kafka常用命令

    以下是kafka常用命令行总结:  

    1.查看topic的详细信息  

    ./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1  

    2、为topic增加副本  

    ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute  

    3、创建topic 

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1  

    4、为topic增加partition  

    ./bin/kafka-topics.sh zookeeper 127.0.0.1:2181 alter partitions 20 topic testKJ1  

    5kafka生产者客户端命令  

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1  

    6kafka消费者客户端命令  

    ./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1  

    7kafka服务启动  

    ./kafka-server-start.sh -daemon ../config/server.properties   

    8、下线broker  

    ./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60  

    shutdown broker  

    9、删除topic  

    ./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181  

    ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1  

    10、查看consumer组内消费的offset  

    ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1



    展开全文
  • Kafka集群部署

    万次阅读 2019-03-03 22:01:20
    1、部署Kafka集群搭建需要服务器至少3台,奇数台 2、Kafka的安装需要java环境,jdk1.8 3、Kafka安装包版本:kafka_2.11-0.10.0.1.tar.gz 4、假设3台服务器分别为:kafka1、kafka2、kafka3 kafka服务器名 ...

    一、前提条件

    1、部署Kafka集群搭建需要服务器至少3台,奇数台

    2、Kafka的安装需要java环境,jdk1.8

    3、Kafka安装包版本:kafka_2.11-0.10.0.1.tar.gz

    4、假设3台服务器分别为:kafka1、kafka2、kafka3

    kafka服务器名

    IP

    域名

    kafka1

    192.168.172.134

    kafka1.sd.cn

    kafka2

    192.168.172.141

    kafka2.sd.cn

    kafka3

    192.168.172.142

    kafka3.sd.cn

    5、增加host配置:

    二、Zookeeper集群搭建

    直接使用kafka自带的zookeeper建立zk集群

    1、将安装包kafka_2.11-0.10.0.1.tar.gz上传到/opt 目录下

    2、解压:tar -zxvf kafka_2.11-0.10.0.1.tar.gz

    3、进入目录:cd /opt/kafka_2.11-0.10.0.1/

    4、创建zookeeper目录:mkdir zk_kfk_data

    5、进入目录:cd /opt/kafka_2.11-0.10.0.1/config

    6、修改zookeeper.properties文件:

    三台机器上的zookeeper.properties文件配置相同,data.Dir 为zk的数据目录,server.1、server.2、server.3 为集群信息。

    2888端口号是zookeeper服务之间通信的端口

    3888端口是zookeeper与其他应用程序通信的端口。

    tickTime:CS通信心跳数

    Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

          tickTime以毫秒为单位。

          tickTime:该参数用来定义心跳的间隔时间,zookeeper的客户端和服务端之间也有和web开发里类似的session的概念,而zookeeper里最小的session过期时间就是tickTime的两倍。

    initLimit:LF初始通信时限

    集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)

    syncLimit:LF同步通信时限

    集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量)

    7、创建myid文件:进入/opt/kafka_2.11-0.10.0.1/zk_kfk_data目录,创建myid文件,将三台服务器上的myid文件分别写入1,2,3。myid是zookeeper集群用来发现彼此的标识,必须创建,且不能相同。

    8、执行启动zookeeper命令:

    nohup /opt/kafka_2.11-0.10.0.1/bin/zookeeper-server-start.sh /opt/kafka_2.11-0.10.0.1/config/zookeeper.properties &>> /opt/kafka_2.11-0.10.0.1/zookeeper.log &

    三台机器都执行启动命令,查看zookeeper的日志文件,没有报错就说明zookeeper集群启动成功了。

    三、Kafka集群搭建

    1、进入目录:cd /opt/kafka_2.11-0.10.0.1/
    2、创建kafka日志数据目录:mkdir kafka-logs-1
    3、进入目录:cd /opt/kafka_2.11-0.10.0.1/config
    4、修改server.properties配置文件:

     

    修改参数如下:

    参数名称

    参数值

    备注

    broker.id

    0

    broker.id的值三个节点要配置不同的值,分别配置为012

    advertised.host.name

    kafka1.sd.cn

    hosts文件配置kafka1域名,另外两台分别为:kafka2.sd.cnkafka3.sd.cn

    advertised.port

    9092

    默认端口,不需要改

    log.dirs

    /opt/kafka_2.11-0.10.0.1/kafka-logs-1

    Kafka日志数据目录

    num.partitions

    40

    分区数,根据自行修改

    log.retention.hours

    24

    日志保存时间

    zookeeper.connect

    kafka1.sd.cn:3181,kafka2.sd.cn:3181,kafka3.sd.cn:3181

    zookeeper连接地址,多个以逗号隔开

    5、 启动kafka集群:
    nohup /opt/kafka_2.11-0.10.0.1/bin/kafka-server-start.sh /opt/kafka_2.11-0.10.0.1/config/server.properties &>> /opt/kafka_2.11-0.10.0.1/kafka.log &

    三个节点均要启动;启动无报错,即搭建成功。

    四、测试Kafka集群

    1、创建topic:test

    /opt/kafka_2.11-0.10.0.1/bin/kafktopics.sh --create --zookeeper kafka1.sd.cn:3181,kafka2.sd.cn:3181,kafka3.sd.cn:3181 --replication-factor 1 --partitions 1 --topic test

    2、列出已创建的topic列表

    /opt/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --list --zookeeper localhost:3181

    3、模拟客户端去发送消息

    /opt/kafka_2.11-0.10.0.1/bin/kafka-console-producer.sh --broker-list kafka1.sd.cn:9092,kafka2.sd.cn:9092,kafka3.sd.cn:9092 --topic test

    4、模拟客户端去接受消息

    /opt/kafka_2.11-0.10.0.1/bin/kafka-console-consumer.sh --zookeeper kafka1.sd.cn:3181,kafka2.sd.cn:3181,kafka3.sd.cn:3181 --from-beginning --topic test

     

    展开全文
  • kafka简述与集群配置

    千次阅读 2020-04-17 00:33:58
    一、kafka简述 1、简介 kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,...(1)Kafka架构是由producer(消息生产者)、consumer(消息消费者)、borker(kafka集群的server,负责处理消息读、...

    一、kafka简述

     1、简介

    kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。

    2、集群介绍

    (1)Kafka架构是由producer(消息生产者)、consumer(消息消费者)、borker(kafka集群的server,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker)、topic(消息队列/分类相当于队列,里面有生产者和消费者模型)、zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成。

    (2)kafka里面的消息是有topic来组织的,简单的我们可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition内部消息强有序,相当于有序的队列,其中每个消息都有个序号offset,比如0到12,从前面读往后面写。一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,消息不经过内存缓冲,直接写入文件,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念。

    (3)producer自己决定往哪个partition里面去写,这里有一些的策略,譬如如果hash,不用多个partition之间去join数据了。consumer自己维护消费到哪个offset,每个consumer都有对应的group,group内是queue消费模型(各个consumer消费不同的partition,因此一个消息在group内只消费一次),group间是publish-subscribe消费模型,各个group各自独立消费,互不影响,因此一个消息在被每个group消费一次。

    3、leader负载均衡机制

    当一个broker停止或者crashes时,所有本来将它作为leader的分区将会把leader转移到其他broker上去,极端情况下,会导致同一个leader管理多个分区,导致负载不均衡,同时当这个broker重启时,如果这个broker不再是任何分区的leader,kafka的client也不会从这个broker来读取消息,从而导致资源的浪费。

    kafka中有一个被称为优先副本(preferred replicas)的概念。如果一个分区有3个副本,且这3个副本的优先级别分别为0,1,2,根据优先副本的概念,0会作为leader 。当0节点的broker挂掉时,会启动1这个节点broker当做leader。当0节点的broker再次启动后,会自动恢复为此partition的leader。不会导致负载不均衡和资源浪费,这就是leader的均衡机制。

    在配置文件conf/ server.properties中配置开启(默认就是开启):

    auto.leader.rebalance.enable true

     

    二、集群配置

    1、zookeeper安装与配置

    (1)下载并解压

        去下载https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

       在node01 /opt/bigdata/下 解压 tar -zxvf zookeeper-3.4.6.tar.gz

    (2)编辑配置

             配置hosts vim /etc/hosts

    192.168.172.73 node03
    192.168.172.72 node02
    192.168.172.71 node01

          配置zookeeper环境变量

    export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.6 #zookeeper安装路径  
    export PATH=$ZOOKEEPER_HOME/bin:$PATH

          配置zoo.cfg

    在/opt/bigdata下,复制cp zookeeper-3.4.5/conf/zoo_sample.cfg  zookeeper-3.4.5/conf/zoo.cfg 

    编辑:vim zookeeper-3.4.5/conf/zoo.cfg 

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/opt/bigdata/data/zookeeper/zkdata #zookeeper数据存放路径
    dataLogDir=/opt/bigdata/data/zookeeper/zkdatalog #zookeeper日志存放路径
    # the port at which the clients will connect
    clientPort=2181        ##zookeeper对外通信端口
    
    server.1=node01:2888:3888  
    server.2=node02:2888:3888  
    server.3=node03:2888:3888 
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1

    分别在node01、node02、node03下/opt/bigdata/data/zookeeper/zkdata

    vim myid 新建myid文件,内容分别为1、2、3保存

    (3)节点分发

    在node01 /opt/bigdata下 scp远程复制,分别分发到node02、node03对应目录下

    scp -r zookeeper-3.4.6 node02:`pwd`
    scp -r zookeeper-3.4.6 node03:`pwd`

    (4)启动zookeeper集群

    分别在node01、node02、node03下执行 zkServer.sh start命令启动zookeeper

    稍等片刻,分别在node01、node02、node03下执行zkServer.sh status命令,查看状态

    [root@node01 ~]# zkServer.sh status
    JMX enabled by default
    Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
    Mode: leader
    [root@node02 bigdata]# zkServer.sh status
    JMX enabled by default
    Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
    Mode: follower
    [root@node03 ~]# zkServer.sh status
    JMX enabled by default
    Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
    Mode: follower


    3、kafka安装与配置

    (1)下载并解压

    wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz 去下载

    在node01上  /opt/bigdata/下 解压

    tar zxvf kafka_2.11-1.1.0.tgz  

    (2)编辑配置

    在/opt/bigdata/下 vim kafka_2.11-1.1.0/config/server.properties编辑配置

    这里重点修改三个参数broker.id标识本机、log.dirs是kafka接收消息存放路径、

    zookeeper.connect指定连接的zookeeper集群地址

    其他参数保持默认即可,也可自己根据情况修改

    ############################# Server Basics #############################
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
    ############################# Socket Server Settings #############################
    listeners=PLAINTEXT://:9092
    
    # The port the socket server listens on
    #port=9092
    
    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    #host.name=localhost
    
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    #advertised.host.name=<hostname routable by clients>
    
    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>
    
    # The number of threads handling network requests
    num.network.threads=3
    
    # The number of threads doing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    ############################# Log Basics #############################
    # A comma seperated list of directories under which to store log files
    log.dirs=/opt/bigdata/kafka_2.11-1.1.0/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Log Flush Policy #############################
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according 
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
    log.cleaner.enable=false
    
    ############################# Zookeeper #############################
    zookeeper.connect=node01:2181,node02:2181,node03:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    (3)节点分发

    在 /opt/bigdata下 远程复制到node01、node02对应路径下,当然复制时需要ssh免登录

    scp -r  kafka_2.11-1.1.0 node02:`pwd`
    scp -r  kafka_2.11-1.1.0 node03:`pwd`

    分别修改server.properties对应的broker.id为2、3即可

    (4)启动kafka集群

    kafka集群启动前要启动zookeeper集群,若zookeeper集群没启动,首先启动

    在/opt/bigdata下 ,三个节点分别执行如下命令,启动kafka集群

    ./kafka_2.11-1.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-1.1.0/config/server.properties &

    (5)基本操作

    1)、创建topic

    ./kafka_2.11/bin/kafka-topics.sh --create --zookeeper node02:2181,node03:2181,node04:2181 --replication-factor 3 --partitions 6 --topic kfk_test

    2)、列出创建的topic

    ./kafka_2.11/bin/kafka-topics.sh --list --zookeeper node02:2181,node03:2181,node04:2181

    3)、生成数据

    ./kafka_2.11/bin/kafka-console-producer.sh -broker-list node02:9092,node03:9092,node04:9092 --topic kfk_test

    4)、消费生产数据

    kafka 0.9版本之前用zookeeper 
    ./kafka_2.11/bin/kafka-console-consumer.sh --zookeeper node02:2181,node03:2181,node04:2181 --from-beginning --topic kfk_test
    
    kafka 0.9版本之后不推荐zookeeper方式,仍然支持,但逐渐会被取消,推荐bootstrap-server方式
    ./kafka_2.11/bin/kafka-console-consumer.sh --bootstrap-server node02:9092,node03:9092,node04:9092 --from-beginning --topic kfk_test

    5)、查看指定topic信息

    ./kafka_2.11/bin/kafka-topics.sh --describe --zookeeper node02:2181,node03:2181,node04:2181 --topic kfk_test

    信息如下:

    Topic:kfk_test  PartitionCount:6	ReplicationFactor:3	Configs:
    Topic: kfk_test	 Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 2,3,1
    Topic: kfk_test	 Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,3,1
    Topic: kfk_test	 Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 2,3,1
    Topic: kfk_test	 Partition: 3	Leader: 1	Replicas: 1,2,3	Isr: 2,3,1
    Topic: kfk_test	 Partition: 4	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
    Topic: kfk_test	 Partition: 5	Leader: 3	Replicas: 3,1,2	Isr: 2,3,1

    可以看到6个分区,每个分区3个副本

    partiton: partion id  分区id
    leader:当前负责读写的lead broker id ,就是server.properties的broker.id
    replicas:当前partition的所有replication broker  list 
    isr:(in-sync replicas)relicas的子集,只包含出于活动状态的broker,离线或挂掉的broker不在此列表

    6)、删除指定的topic

    删除kafka中的topic

    ./kafka_2.11/bin/kafka-topics.sh --delete --zookeeper node02:2181,node03:2181,node04:2181 --topic kfk_test

    删除zookeeper中的topic信息

    rmr /brokers/topics/kfk_test 
    
    rmr /config/topics/kfk_test 
    
    rmr /admin/delete_topics/kfk_test
    
    rmr /consumers/kfk_test-group

     删除topic数据相关的目录

    rm -rf /var/local/kafka/data/kfk_test*

     

    展开全文
  • Kafka集群搭建详细步骤

    万次阅读 2018-02-06 18:06:01
    Kafka集群搭建 1、 Kafka的安装需要java环境,cent os 7自带java1.6版本,可以不用重新安装,直接使用自带的jdk 即可;如果觉得jdk版本太旧,也可以自己重新安装; 2、 准备好kafka安装包,官网下载地址: ...

    Kafka集群搭建

    1、 Kafka的安装需要java环境,cent os 7自带java1.6版本,可以不用重新安装,直接使用自带的jdk 即可;如果觉得jdk版本太旧,也可以自己重新安装;

    2、 准备好kafka安装包,官网下载地址:
    http://kafka.apache.org/downloads.html

    3、 下载好kafka安装包后,将其解压到/usr/local目录下,删除压缩包

    4、 目前搭建了三个节点的kafka集群,分别在10.10.67.102,10.10.67.104和10.10.67.106服务器上;

    5、 查看配置文件
    进入kafka的config的目录:

    这里写图片描述

    6、 先建立zk集群,直接使用kafka自带的zookeeper建立zk集群,修改zookeeper.properties文件:

    这里写图片描述

    三个机器上的zookeeper.properties文件配置相同,需要注意的是日志保存的路径,不会自动生成,需要自己手动建立相关的路径, dataLogDir是我自己加的,日志文件太多,把日志文件区分开;

    7、 创建myid文件,进入/usr/local/kafka/zookeeper,创建myid文件,将三个服务器上的myid文件分别写入1,2,3,如图:

    这里写图片描述
    —-myid是zk集群用来发现彼此的标识,必须创建,且不能相同;

    8、 进入kafka目录 执行启动zookeeper命令:
    ./bin/zookeeper-server-start.sh config/zookeeper.properties &
    三台机器都执行启动命令,查看zookeeper的日志文件,没有报错就说明zookeeper集群启动成功了。

    9、 搭建kafka集群,修改server.properties配置文件:
    这里写图片描述

    这里写图片描述

    server.properties配置文件的修改主要在开头和结尾,中间保持默认配置即可;需要注意的点是broker.id的值三个节点要配置不同的值,分别配置为0,1,2;log.dirs必须保证目录存在,不会根据配置文件自动生成;

    10、 启动kafka集群,进入kafka目录,执行如下命令 :
    ./bin/kafka-server-start.sh –daemon config/server.properties &
    三个节点均要启动;启动无报错,即搭建成功,可以生产和消费消息,来检测是否搭建成功。

    11、 如何生产和消费消息,请见下一篇博客:
    http://blog.csdn.net/zxy987872674/article/details/72493128

    展开全文
  • Kafka集群搭建及生产者消费者案例

    万次阅读 2018-04-27 13:02:30
    Kafka集群搭建及生产者消费者案例 本文搭建的集群是采3台机器,分别是server01,server02,server03。linux系统是centos6.7。 kafka需要配合zookeeper使用,在安装kafka之前,需要先安装zookeeper集群,关于安装...
  • Kafka之三:Kafka集群工作流程

    千次阅读 2019-05-23 09:17:34
    Kafka之三:Kafka集群工作流程 文章目录Kafka之三:Kafka集群工作流程一、工作流程分析1. producer写入流程2. 分区(Partition)3. 副本(Replication)4. Broker 保存消息5. Zookeeper存储结构二、Kafka消费过程...
  • kafka集群搭建及简单使用

    万次阅读 2018-07-23 19:16:55
    Kafka Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如...
  • kafka集群及与springboot集成

    万次阅读 2018-06-26 16:51:04
    linux搭建,kafkao3节点虚拟机为CentOS6,ip为192.168.1.128,192.168.1.129和192.168.1.130,域名分别为... 集群 #192.168.1.128 [root@master local]# cd /home/gilbert/app/rar/ [root@master rar]# tar zxv...
  • zookeeper集群、kafka集群、Hadoop高可用

    万次阅读 2018-09-07 19:00:57
    一、zookeeper集群 1.1 zookeeper概述 1.1.1 什么是zookeeper ZooKeeper是一个分布式的协调服务 1.1.2 ZooKeeper能干什么 ZooKeeper是用来保证数据在集群间的事务性一致 1.1.3&nbsp;zookeeper 应用...
  • kafka集群搭建

    千次阅读 2017-11-18 14:35:08
    kafka自带了zookeeper,建议还是用外部的zk集群,搭建步骤如下:  准备3台机器,假设我们叫c1、c2、c3,ip分别为ip1、ip2、ip3将附件zookeeper-3.4.6.tar.gz分别拷贝至3台机器c1机器上解压zookeeper-3.4.6.tar.gz...
  • java代码访问kafka集群

    千次阅读 2019-10-14 08:57:12
    本例环境: jdk 1.7 zookeeper-3.4.10 kafka_2.11-0.11环境搭建可参考如下: kafka环境搭建(windows版本) : https://blog.csdn.net/zhangbeizhen18/article/details/101323691 kafka集群环境搭建(windo...
  • Docker搭建Kafka集群环境准备依赖单实例(Without Docker)安装JDK下载安装包启动进程测试集群安装(Without Docker)准备启动zookeeper 集群更改配置启动测试Docker 集群安装安装Docker 环境准备 依赖 CentOS7.6 单...
  • kafka集群环境搭建

    千次阅读 2016-08-30 10:34:49
    kafka集群测试环境搭建全记录。
  • centos7搭建kafka集群

    千次阅读 2020-04-28 06:42:10
    centos7搭建kafka集群 环境 提取码:uh51: jdk-8u201-linux-x64.tar.gz zookeeper-3.4.14.tar.gz kafka_2.12-2.2.0.tgz 服务器三台: 192.168.18.133 192.168.18.134 192.168.18.136 以下需要开放的端口 # 开放2888,...
  • Linux下Kafka集群环境搭建

    千次阅读 2020-10-15 10:02:27
    Kafka集群环境搭建本文只讲述Kafka集群环境的搭建步骤,后续会对kafka的其他相关知识进行整理.1、准备工作Linux服务器3台(本文将在一台linux服务器上建立三个文件夹来模拟三台linux服务器,搭建伪集群)JDK1.8...
  • kafka集群部署以及java客户端测试

    千次阅读 2016-12-08 16:40:12
    kafka集群部署以及java客户端测试本文主要讲述本人的集群部署kafka过程以及遇到的问题: 其中:kafka版本为:kafka_2.10,zookeeper版本为:zookeeper-3.4.8,jdk-8u101-linux-x64 一 kafka以及zookeeper安装以及...
  • zookeeper集群搭建和kafka集群搭建 环境 linux 开发机(主) 4台centos7虚拟机 192.168.0.201 192.168.0.202(nginx-kafka) 192.168.0.203 192.168.0.204 虚拟机配置 jdk1.8 jps zookeeper集群搭建 ...
  • [Kafka调优]--调优Apache Kafka集群

    千次阅读 2018-04-04 22:10:50
    本文转自:http://www.cnblogs.com/huxi2b/p/6936348.html今天带来一篇译文“调优Apache Kafka集群”,里面有一些观点并无太多新颖之处,但总结得还算详细。该文从四个不同的目标出发给出了各自不同的参数配置,值得...
  • Kafka集群部署、sasl安全认证

    千次阅读 2018-11-15 18:07:24
    Zookeeper集群部署: Kafka是依赖于zookeeper,需要先将zooleeper启动起来。首先可以从http://zookeeper.apache.org/releases.html 下载最新版zookeeper(当前版本3.4.13),进入其中的conf文件夹,可以看到一个名为...
1 2 3 4 5 ... 20
收藏数 75,982
精华内容 30,392
关键字:

kafka集群