kafka架构详解_kafka技术内幕 图文详解kafka源码设计与实现pdf - CSDN
  • kafka-架构(详解)

    2017-05-25 17:15:02
    特点 一种高吞吐量的分布式发布订阅消息系统 1、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间...整体架构Producer 消息生产者,发布消息到 kafka 集群的终端或服务,

    目录

    特点

    一种高吞吐量的分布式发布订阅消息系统
    1、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
    2、高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
    3、支持通过Kafka服务器和消费机集群来分区消息,支持Hadoop并行数据加载。

    整体架构

    kafa-01

    Producer

    消息生产者,发布消息到 kafka 集群的终端或服务,
    负责发布消息到Kafka broker

    producer 写入消息流程:

    1. producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
    2. producer 将消息发送给该 leader
    3. leader 将消息写入本地 log
    4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
    5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

    Broker

    Kafka集群包含一个或多个服务器,这种服务器被称为broker

    brokers

    broker failover 流程:

    1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
    2. controller 从 /brokers/ids 节点读取可用broker
    3. controller决定set_p,该集合包含宕机 broker 上的所有 partition
    4. 对 set_p 中的每一个 partition
      4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
      4.2 决定新 leader(如4.3节所描述)
      4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
    5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

    controller

      kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。

    controller failover 流程:

    当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
    当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:
    1. 读取并增加 Controller Epoch。
    2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
    3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
    4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
    5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
    6. 通过 replicaStateMachine在Broker Ids Patch(/brokers/ids)上注册Watch。
    7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
    8. 启动 replicaStateMachine 和 partitionStateMachine。
    9. 将 brokerState 状态设置为 RunningAsController。
    10. 将每个Partition的Leadership信息发送给所有“活”着的Broker。
    11. 若auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。
    12. 若delete.topic.enable设置为true且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

    Topic

     每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的
    Topic即可生产或消费数据而不必关心数据存于何处

    topic创建流程:

    1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
    2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
      2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
      2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
    3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

    topic 删除流程:

    1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
    2. 若d elete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。

    Partition

     partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。

    Replica

     partition 的副本,保障 partition 的高可用。

    leader

      replica 中的一个角色, producer 和 consumer 只跟 leader 交互

    follower

      replica 中的一个角色,从 leader 中复制数据。

    zookeeper

     kafka 通过 zookeeper 来存储集群的 meta 信息。
     

    kafka-02

    Consumer

    从 kafka 集群中消费消息的终端或服务,向Kafka broker读取消息的客户端。

    kafka 提供了两套 consumer API:
    1. The high-level Consumer API
    2. The SimpleConsumer API
    其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。
    6.1.1 The high-level consumer API
    high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。
    使用 high-level consumer API 可以是多线程的应用,应当注意:
    1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
    2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
    3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的
    6.1.2 The SimpleConsumer API
    如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:
    1. 多次读取一个消息
    2. 只消费一个 patition 中的部分消息
    3. 使用事务来保证一个消息仅被消费一次
    但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:
    1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
    2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁
    3. 需要处理 leader 的变更
    使用 SimpleConsumer API 的一般流程如下:
    1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
    2. 找出每个 partition 的 follower
    3. 定义好请求,该请求应该能描述应用程序需要哪些数据
    4. fetch 数据
    5. 识别 leader 的变化,并对之作出必要的响应
    以下针对 high-level Consumer API 进行说明。

    Consumer Group

      high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

    如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。
    kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

    Consumer Group

    offset

    offset

    offset2

    参考站点

    http://www.w2bc.com/article/177804

    http://www.infoq.com/cn/articles/kafka-analysis-part-2/

    http://baike.baidu.com/link?url=3gxiMW5rNaYfdJDG9IWlBUWKf76_Ay9OoOqjHRtt0zcssAXdkpuuVjazTGJ0oXtjeVBl7TsX-0cLSC37eXR_bq

    展开全文
  • Kafka是一个分布式的消息队列系统(Message Queue)。 kafka集群有多个Broker服务器组成,每个类型的消息被定义为topic。 同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的Broker上。 ...

    一、前述

    Kafka是一个分布式的消息队列系统(Message Queue)。

     

    • kafka集群有多个Broker服务器组成,每个类型的消息被定义为topic。
    • 同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的Broker上。
    • 消息生产者producer和消费者consumer可以在多个Broker上生产/消费topic。

    二、概念理解

    2.1、Topics and Logs:

    • Topic即为每条发布到Kafka集群的消息都有一个类别,topic在Kafka中可以由多个消费者订阅、消费。
    • 每个topic包含一个或多个partition(分区),partition数量可以在创建topic时指定,每个分区日志中记录了该分区的数据以及索引信息。如下图:

     

    • Kafka只保证一个分区内的消息有序,不能保证一个主题的不同分区之间的消息有序。如果你想要保证所有的消息都绝对有序可以只为一个主题分配一个分区。(分区内有序,一个主题topic不一定是有序的
    • 分区会给每个消息记录分配一个顺序ID号(偏移量), 能够唯一地标识该分区中的每个记录。Kafka集群保留所有发布的记录,不管这个记录有没有被消费过,Kafka提供相应策略通过配置从而对旧数据处理。

    •  实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即:消费者可以通过修改偏移量读取任何位置的数据。

    2.2、Producers -- 生产者

    消息生产者,自己决定往哪个partition中写入数据

    • 1、基于轮询的负载均衡
    • 2、基于hash的partition策略

    指定topic来发送消息到Kafka Broker

    2.3、Consumers -- 消费者

    • 根据topic消费相应的消息
    • 消息消费者,自己在zookeeper中维护offset
    • 每个消费者都有自己的消费者组,同一个topic中的数据只能在相同的消费组内消费一次,topic的每个partition只能同时被一个消费者组内的消费者消费
    • 不同的消费者组之间消费相同的topic会不影响

    2.4、broker

    组成kafka集群的节点,没有主从关系,依靠zookeeper协调,broker负责消息的读写,存储。一个broker可以管理多个partition。

    2.5、topic

    消息队列,一类消息。topic由partition组成,一个topic有多少个partition?创建可以指定

    2.6、partition

    • 组成topic的单元,相当于一个文件,一个partition归一个broker管理,每个partition有副本,有多少个?创建指定
    • 一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。
    • 消息直接写入文件(partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行),并不是存储在内存中;根据时间策略(默认一周)删除,而不是消费完就删除

    2.7、zookeeper的作用:

      1、存储原数据

             topic

             partition

             broker

      2、存储consumer的offsets

    三、Kafka特点

    • 消息系统的特点:生存者消费者模型,FIFO –––– partition内部是FIFO的,partition之间呢不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO
    • 高性能:单节点支持上千个客户端,百MB/s吞吐
    • 持久性:消息直接持久化在普通磁盘上且性能好 –––– 直接写到磁盘里面去,就是直接append到磁盘里面去,这样的好处是直接持久话,数据不会丢,第二个好处是顺序写,然后消费数据也是顺序的读,所以持久化的同时还能保证顺序读写
    • 分布式:数据副本冗余、流量负载均衡、可扩展 –––– 分布式,数据副本,也就是同一份数据可以到不同的broker上面去,也就是当一份数据,磁盘坏掉的时候,数据不会丢失,比如3个副本,就是在3个机器磁盘都坏掉的情况下数据才会丢。
    •  很灵活:消息长时间持久化+Client维护消费状态 –––– 消费方式非常灵活,第一原因是消息持久化时间跨度比较长,一天或者一星期等,第二消费状态自己维护消费到哪个地方了,可以自定义消费偏移量

    kafka与其他消息队列对比 :

    • RabbitMQ:分布式,支持多种MQ协议,重量级
    • ActiveMQ:与RabbitMQ类似
    • ZeroMQ:以库的形式提供,使用复杂,无持久化
    • redis:单机、纯内存性好,持久化较差
    • kafka:分布式,较长时间持久化,高性能,轻量灵活

    消息中间件如何实现每秒几十万的高并发写入?

    1、页缓存技术 + 磁盘顺序写:操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。

    2、零拷贝技术:不需要把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,两次拷贝都省略了,所以叫做零拷贝。

     

    四、集群安装

    4.1、集群规划

    Zookeeper集群共三台服务器,分别为:node03、node04、node05。

    Kafka集群共三台服务器,分别为:node03、node04、node05。

    4.2、安装Kafka:  tar -zxvf kafka_2.10-0.9.0.1.tgz -C /opt/

    4.3、修改配置文件config/server.properties

    • 节点编号:(不同节点按0,1,2,3整数来配置)
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    • 真实数据存储位置:
    
    # A comma seperated list of directories under which to store log files
    log.dirs=/opt/huawei/kafka-logs
    
    • zookeeper的节点:
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=node03:2181,node04:2181,node05:2181
    

    核心配置参数说明:broker.id: broker集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器

    注:当前Kafka集群共三台节点,分别为:node03、node04、node05。对应的broker.id分别为0、1、2。

    4.4、节点分发

    [root@node03 huawei]# scp -r kafka_2.10-0.8.2.2/ node04:`pwd`
    [root@node03 huawei]# scp -r kafka_2.10-0.8.2.2/ node05:`pwd`

    4.5、 启动kakka集群

    注:先启动Zookeeper集群,再启动Kafka集群。

    bin/kafka-server-start.sh config/server.properties

    优化:分别在三台服务器上执行以下命令启动:

    后台启动:nohup bin/kafka-server-start.sh   config/server.properties > kafka.log 2>&1 &
    可以创建个脚本:(放在与bin同一级别下,注意创建后要修改权限:chmod 755 startkafka.sh)

     测试: 


    五、Kafka常用命令

    (kafka-topics.sh --help查看帮助手册)

    5.1、创建topic话题

    ./bin/kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --create --replication-factor 2 --partitions 3 --topic lxk
    • --replication-factor:指定每个分区的复制因子个数,默认1个
    • --partitions:指定当前创建的kafka分区数量,默认为1个
    • --topic:指定新建topic的名称

    5.2、查看topic列表

    bin/kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --list

    5.3、查看topic描述

    bin/kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --describe --topic lxk

    5.4、创建生产者

    bin/kafka-console-producer.sh --broker-list node03:9092,node04:9092,node05:9092 --topic lxk

    5.5、创建消费者 

    bin/kafka-console-consumer.sh --zookeeper node03:2181,node04:2181,node05:2181 
    --from-beginning --topic lxk

    5.6、指定消费者组

    bin/kafka-console-consumer.sh --zookeeper node03:2181,node04:2181,node05:2181
     --topic lxk20191108 --consumer.cofig consumer.properties

     

    注:查看帮助手册  bin/kafka-console-consumer.sh help  (默认更加key的hash值分区,只有value默认key为null,默认10min换一个分区)

    5.7、删除kafka中的数据

    ① :在kafka集群中删除topic,当前topic被标记成删除。

     ./kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --delete --topic t0425

    在每台broker节点上删除当前这个topic对应的真实数据。

    ② :进入zookeeper客户端,删除topic信息

    rmr /brokers/topics/t0425

    ③ :删除zookeeper中被标记为删除的topic信息

    rmr /admin/delete_topics/t0425

    5.8、kafka的leader的均衡机制

    当一个broker停止或者crashes时,所有本来将它作为leader的分区将会把leader转移到其他broker上去,极端情况下,会导致同一个leader管理多个分区,导致负载不均衡,同时当这个broker重启时,如果这个broker不再是任何分区的leader,kafka的client也不会从这个broker来读取消息,从而导致资源的浪费。

    kafka中有一个被称为优先副本(preferred replicas)的概念。如果一个分区有3个副本,且这3个副本的优先级别分别为0,1,2,根据优先副本的概念,0会作为leader 。当0节点的broker挂掉时,会启动1这个节点broker当做leader。当0节点的broker再次启动后,会自动恢复为此partition的leader。不会导致负载不均衡和资源浪费,这就是leader的均衡机制。

    在配置文件conf/ server.properties中配置开启(默认就是开启):auto.leader.rebalance.enable true

    六、整合Java API 

    package com.lxk.sparkstreaming;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    
    /**
     * 向kafka中生产数据
     */
    public class SparkStreamingDataManuallyProducerForKafka extends Thread {	
    	public static void main(String[] args) {
    		new SparkStreamingDataManuallyProducerForKafka("t1017").start();
    	}
    
    	static String[] channelNames = new String[] { "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm", "Hive",
    			"Impala", "HBase", "ML" };
    
    	static String[] actionNames = new String[] { "View", "Register" };
    
    	private String topic; // 发送给Kafka的数据,topic
    	private Producer<String, String> producerForKafka;
    
    	private static String dateToday;
    	private static Random random;
    
    	public SparkStreamingDataManuallyProducerForKafka(String topic) {
    		dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    		this.topic = topic;
    		random = new Random();
    		Properties properties = new Properties();
    		properties.put("metadata.broker.list", "node03:9092,node04:9092,node05:9092");
    		// 发送消息key的编码格式
    		properties.put("key.serializer.class", StringEncoder.class.getName());
    		// 发送消息value的编码格式
    		properties.put("serializer.class", StringEncoder.class.getName());
    		producerForKafka = new Producer<String, String>(new ProducerConfig(properties));
    	}
    
    	@Override
    	public void run() {
    		int counter = 0;
    		int flagNum = 0;
    		while (true) {
    			counter++;
    			flagNum++;
    			String userLog = userlogs();
    			System.out.println("product:" + userLog + "   ");
    			producerForKafka.send(new KeyedMessage<String, String>(topic,"key_"+flagNum, userLog));
    			// producerForKafka.send(new KeyedMessage<String, String>(topic, userLog));
    			// 每两条数据暂停2秒
    			if (0 == counter % 2) {
    				counter = 0;
    				try {
    					Thread.sleep(2000);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    
    	// 生成随机数据
    	private static String userlogs() {
    		StringBuffer userLogBuffer = new StringBuffer("");
    		int[] unregisteredUsers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8 };
    		long timestamp = new Date().getTime();
    		Long userID = 0L;
    		long pageID = 0L;
    
    		// 随机生成的用户ID
    		if (unregisteredUsers[random.nextInt(8)] == 1) {
    			userID = null;
    		} else {
    			userID = (long) random.nextInt(2000);
    		}
    		// 随机生成的页面ID
    		pageID = random.nextInt(2000);
    		// 随机生成Channel
    		String channel = channelNames[random.nextInt(10)];
    		// 随机生成action行为
    		String action = actionNames[random.nextInt(2)];
    		userLogBuffer.append(dateToday).append("\t").append(timestamp).append("\t").append(userID).append("\t")
    				.append(pageID).append("\t").append(channel).append("\t").append(action);
    		// .append("\n");
    		return userLogBuffer.toString();
    	}
    }
    

    结果:

    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    product:2019-10-17	1571323543254	891	610	Storm	Register   
    product:2019-10-17	1571323543383	1155	1962	Kafka	Register   
    product:2019-10-17	1571323545385	1915	1932	Hadoop	Register   
    product:2019-10-17	1571323545386	352	897	HBase	Register   
    product:2019-10-17	1571323547389	741	1802	Spark	Register   
    product:2019-10-17	1571323547390	1081	1721	Impala	Register   
    product:2019-10-17	1571323549391	null	627	Impala	View   
    product:2019-10-17	1571323549392	null	730	Flink	View   
    product:2019-10-17	1571323551394	1566	1639	Spark	Register   

    展开全文
  • kafka架构详解

    千次阅读 2020-03-19 08:28:39
    生产到消费 备份副本1 备份副本2 leader follwer 第二个生产到消费 元数据信息,节点信息等记录到zk中

    生产到消费
    在这里插入图片描述


    备份副本1
    在这里插入图片描述


    备份副本2
    在这里插入图片描述


    leader
    在这里插入图片描述


    follwer

    在这里插入图片描述


    第二个生产到消费
    在这里插入图片描述


    元数据信息,节点信息等记录到zk中

    在这里插入图片描述

    展开全文
  • Kafka架构详解

    2019-07-21 00:05:32
    文章目录Kafka架构详解Kafka架构图Zookeeper在Kafka的作用zookeeper在Kafka中保存的meta信息Kafka文件存储机制向test03写入数据,查看分区目录test04-0ISR-实现partition在多个副本中Leader的选择LEO(LogEndOffset...

    Kafka架构详解

    Kafka架构图

    [外链图片转存失败(img-eFr4rAYy-1563638477133)(img/kafka02.png)]

    Zookeeper在Kafka的作用

    1. 无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
    2. Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产,消息存储,消息消费的过程结合在一起。
    3. 同时借助zookeeper,kafka能够将生产者,消费者和broker在内的所有组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者和消费者的负载均衡。

    zookeeper在Kafka中保存的meta信息

    [外链图片转存失败(img-kWOjodbK-1563638477134)(img/zknode.png)]

    • admin目录下保存着标记删除的topic信息;

    Kafka文件存储机制

    Kafka中消息是以topic进行分类的,生产者通过topic向Kafka broker发送消息,消费者通过topic读取数据,然而topic在物理层面又能以partiton为分组,一个topic可以分为若干个partitionpartition还可以细分为segment,一个partition物理上由多个segment组成。

    ​ 在Kafka文件存储中,同一个topic下有多个不同的partition,每个partiton为一个目录partition名称规则为:topic名称+有序序号,第一个序号从0开始计,最大的序号为partition数量减1,partition是实际物理上的概念,而topic是逻辑上的概念。
    ​ 上面提到partition还可以细分为segment,这个segment又是什么?如果就以partition为最小存储单位,我们可以想象当Kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理带来严重的影响,所以这里以segment为单位又将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。

    segment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件数据文件。这两个文件的命名规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:

    指定log.segment.bytes=1048576,即每个segment数据文件大小最大为1mb,重启kafka服务,创建topic test04只有一个分区

    kafka-topics.sh --create --zookeeper pseudo01:2181 --replication-factor 1 --partitions 1 --topic test04
    
    kafka-console-producer.sh --broker-list pseudo01:9092 --topic test04 --producer.config $KAFKA_HOME/config/producer.properties
    
    kafka-console-consumer.sh --bootstrap-server pseudo01:9092 --zookeeper pseudo01:2181 --topic test04 --from-beginning
    
    向test03写入数据,查看分区目录test04-0

    [外链图片转存失败(img-54P2M8B6-1563638477134)(img/segment.png)]

    segment的.index和.log文件解析

    [外链图片转存失败(img-7u87N5mQ-1563638477134)(img/segment01.png)]

    ISR-实现partition在多个副本中Leader的选择

    In-Sync Replicas
    这个是指副本同步队列。副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 所有的副本(replicas)统称为Assigned Replicas,即ARISRAR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSROutof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。那么在之后追上leader,并被重新加入了ISRAR=ISR+OSR

    Leader的选择Kafka的核心是日志文件,日志文件在集群中的同步时分布式数据系统最基础的要素。
    如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.
    Kafka并不是使用这种方法。
    Kafaka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被ISR中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leaderISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

    LEO(LogEndOffset)

    [外链图片转存失败(img-VQKPpWWf-1563638477134)(img/part01.png)]
    [外链图片转存失败(img-YvxwvvET-1563638477135)(img/write02.png)]

    Topic的创建

    1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
    2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
      • (1) 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
      • (2)将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
    3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest

    Topic的删除

    1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
    2. 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest

    总结

    消息状态: 在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
    消息持久化: Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
    消息有效期: Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
    批量发送: Kafka支持以消息集合为单位进行批量发送,以提高push效率。
    push-and-pull: Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
    Kafka集群中broker之间的关系: 不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
    负载均衡方面
    同步异步: Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
    分区机制partition: Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。离线数据装载: Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。

    展开全文
  • 深入剖析kafka架构内部原理

    万次阅读 多人点赞 2019-02-22 11:35:31
    1 概述 Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala...Kafka凭借着自身的优势,越来越受到互联网企业的青睐,唯品会也采用Kafka作为其内部核心消息引擎之一。Kafka作...
  • Kafka 架构-图文讲解

    2019-06-04 01:39:48
    Kafka是一个开源的、分布式的、可分区的、可复制的基于日志提交的发布订阅消息系统。它具备以下特点:1. 消息持久化: 为了从大数据中获取有价值的信息,任何信息的丢失都是...
  • Kafka学习(四):Kafka架构详解

    千次阅读 2020-03-18 22:20:03
    学习目标kafka架构介绍kafka架构内部细节剖析 kafka架构介绍 1、生产者 API 允许应用程序发布记录流至一个或者多个kafka的主题 topics(生产数据到topic)。 2、消费者 API 允许应用程序订阅一个或者多个主题,...
  • kafka是什么 kafka是一种高吞吐量的分布式消息系统 kafka有什么作用 作为常规的消息队列使用 作为网站的活性跟踪工具 作为日志收集中心 kafka架构 名词解释 broker:
  • 1.Kafka工作流程 Kafka中的消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是...
  • 一.Kafka简介  Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,使用Scala语言编写,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务。它主要...
  • kafka架构详解

    千次阅读 2018-01-08 21:46:07
    Kafka 的每个 borker 都是普通节点,但启动集群时会通过 ZK Lead 选举机制选出一个Leader 作为主节点。 Productor: 生产数据写到 Kafka,持久化到硬盘。对同一个 Topic 来讲,生产者通常只有“一个”(可以多并发...
  • Kafka 设计与原理详解

    万次阅读 多人点赞 2015-08-31 15:53:56
    一、Kafka简介1.1 背景历史当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点...
  • Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 1.1 点对点模式 消息队列的第一种模式 一对一,消费者(Consumer)主动拉取数据,消息收到后消息清除 1.2 ...
  • Kafka 原理和架构解析

    千次阅读 2019-11-03 11:45:50
    Kafka 是由 LinkedIn 开发的一个分布式的消息系统,使用 Scala 编写,它以可水平扩展和高吞吐率而被广泛使用。Kafka 是一种分布式的,基于发布 / 订阅的消息系统。主要设计目标如下: 以时间复杂度为 O(1) 的方式...
  • kafka名词说明 分布式 高吞吐量 消息系统 producer:生产者 consumer:消费者 broker:机器,节点 controller:kafka服务器的主节点 负责管理元数据(zk存储一份) follower:kafka服务器的从节点 (同步元...
  • 大家都知道 Kafka 是一个非常牛逼的消息队列框架,阿里的 RocketMQ 也是在 Kafka 的基础上进行改进的。对于初学者来说,一开始面对这么一个庞然大物会不知道怎么入手。那么这篇文章就带你先了解一下 Kafka 的技术...
  • 一、什么是kafka kafka是分布式发布-订阅消息...重点内容 二、kafka架构 Kafka内在就是分布式的,一个Kafka集群通常包括多个broker。为了均衡负载,将话题分成多个分区,每个broker存储一或多个分区。多个生产者和
  • zookeeper+kafka详解

    千次阅读 2018-06-02 17:45:53
    一、为什么需要消息系统 1.解耦:  允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2.冗余:  消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。...
  • kafka入门及配置详解

    千次阅读 2019-01-18 11:29:39
    kafka架构图及术语说明 术语 Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker,可以水平扩展,一般broker数量越多,集群吞吐率越高,而且kafka 每个节点可以有多个 broker Producer 负责发布...
  • 多图详解 Kafka 原理与架构
1 2 3 4 5 ... 20
收藏数 5,516
精华内容 2,206
关键字:

kafka架构详解