kafka_kafkameter - CSDN
kafka 订阅
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。 展开全文
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
信息
开发商
Apache软件基金会
软件授权
Apache License 2.0
软件名称
Apache Kafka
更新时间
2020-04-08
软件版本
2.5.0
软件平台
跨平台
软件语言
Scala , Java
软件大小
15M
Kafka名字的由来
kafka的架构师jay kreps对于kafka的名称由来是这样讲的,由于jay kreps非常喜欢franz kafka,并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。
收起全文
精华内容
参与话题
  • 大数据硬实战之kafka视频教程

    千人学习 2019-06-25 10:56:58
    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。
  • kafka 学习 非常详细的经典教程

    千次阅读 多人点赞 2018-05-10 18:47:41
    https://blog.csdn.net/tangdong3415/article/details/53432166一、基本概念介绍Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有...将向Kafka topic发布消息的程序成为produ...

    https://blog.csdn.net/tangdong3415/article/details/53432166


    一、基本概念

    介绍

    Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。

    这个独特的设计是什么样的呢?

    首先让我们看几个基本的消息系统术语:
    Kafka将消息以topic为单位进行归纳。
    将向Kafka topic发布消息的程序成为producers.
    将预订topics并消费消息的程序成为consumer.
    Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
    producers通过网络将消息发送到Kafka集群,集群向消费者提供消息,如下图所示:

     

    客户端和服务端通过TCP协议通信。Kafka提供了Java客户端,并且对多种语言都提供了支持。


    Topics 和Logs

    先来看一下Kafka提供的一个抽象概念:topic.
    一个topic是对一组消息的归纳。对每个topic,Kafka 对它的日志进行了分区,如下图所示:
     

    每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
    在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。

    实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:一般情况下随着consumer不断的读取消息,这offset的值不断增加,但其实consumer可以以任意的顺序读取消息,比如它可以将offset设置成为一个旧的值来重读之前的消息。

    以上特点的结合,使Kafka consumers非常的轻量级:它们可以在不对集群和其他consumer造成影响的情况下读取消息。你可以使用命令行来"tail"消息而不会对其他正在消费消息的consumer造成影响。

    将日志分区可以达到以下目的:首先这使得每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能。

    分布式

    每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。
    每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader.如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。

    Producers

    Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。


    Consumers

    发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。

     

    由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个

    相比传统的消息系统,Kafka可以很好的保证有序性。
    传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

    在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

    Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

    ###########################################

    二、环境搭建


    Step 1: 下载Kafka

    点击下载最新的版本并解压.

    1. > tar -xzf kafka_2.9.2-0.8.1.1.tgz
    2. > cd kafka_2.9.2-0.8.1.1
    复制代码



    Step 2: 启动服务

    Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
    1. > bin/zookeeper-server-start.sh config/zookeeper.properties &
    2. [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    3. ...
    复制代码


    现在启动Kafka:
    1. > bin/kafka-server-start.sh config/server.properties
    2. [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    3. [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    4. ...
    复制代码


    Step 3: 创建 topic

    创建一个叫做“test”的topic,它只有一个分区,一个副本。
    1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    复制代码


    可以通过list命令查看创建的topic:
    1. > bin/kafka-topics.sh --list --zookeeper localhost:2181
    2. test
    复制代码


    除了手动创建topic,还可以配置broker让它自动创建topic.

    Step 4:发送消息.

    Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

    运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
    1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    2. This is a messageThis is another message
    复制代码


    ctrl+c可以退出发送。

    Step 5: 启动consumer

    Kafka also has a command line consumer that will dump out messages to standard output.
    Kafka也有一个命令行consumer可以读取消息并输出到标准输出:
    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    2. This is a message
    3. This is another message
    复制代码


    你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
    这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

    Step 6: 搭建一个多个broker的集群

    刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
    首先为每个节点编写配置文件:

    1. > cp config/server.properties config/server-1.properties
    2. > cp config/server.properties config/server-2.properties
    复制代码


    在拷贝出的新文件中添加以下参数:
    1. config/server-1.properties:
    2.     broker.id=1
    3.     port=9093
    4.     log.dir=/tmp/kafka-logs-1
    复制代码


    1. config/server-2.properties:
    2.     broker.id=2
    3.     port=9094
    4.     log.dir=/tmp/kafka-logs-2
    复制代码


    broker.id在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。

    We already have Zookeeper and our single node started, so we just need to start the two new nodes:
    刚才已经启动可Zookeeper和一个节点,现在启动另外两个节点:
    1. > bin/kafka-server-start.sh config/server-1.properties &
    2. ...
    3. > bin/kafka-server-start.sh config/server-2.properties &
    4. ...
    复制代码


    创建一个拥有3个副本的topic:
    1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    复制代码


    现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“"describe topics”命令就可以了:
    1. > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    复制代码

    1. Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
    2.         Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
    复制代码


    下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。
    leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
    replicas:列出了所有的副本节点,不管节点是否在服务中.
    isr:是正在服务中的节点.
    在我们的例子中,节点1是作为leader运行。
    向topic发送消息:

    1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    复制代码

    1. ...
    2. my test message 1my test message 2^C
    复制代码


    消费这些消息:
    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    复制代码

    1. ...
    2. my test message 1
    3. my test message 2
    4. ^C
    复制代码


    测试一下容错能力.Broker 1作为leader运行,现在我们kill掉它:
    1. > ps | grep server-1.properties7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
    2. > kill -9 7564
    复制代码


    另外一个节点被选做了leader,node 1 不再出现在 in-sync 副本列表中:
    1. > bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
    2. Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
    3.         Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0
    复制代码


    虽然最初负责续写消息的leader down掉了,但之前的消息还是可以消费的:
    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    2. ...
    3. my test message 1
    4. my test message 2
    复制代码



    看来Kafka的容错机制还是不错的。

    ################################################

    三、搭建Kafka开发环境

    我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。下面我们来搭建kafka的开发环境。
    添加依赖

    搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。
    创建好maven项目后,在pom.xml中添加以下依赖:
    1. <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.0</version>
      </dependency>

    添加依赖后你会发现有两个jar包的依赖找不到。没关系我都帮你想好了,点击这里下载这两个jar包。


    jar包下载地址:   

    http://www.java2s.com/Code/Jar/j/Downloadjmxri121jar.htm

    http://www.datanucleus.org/downloads/maven2/com/sun/jdmk/jmxtools/1.2.1/

    解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,比如我的本地仓库是d:\mvn,完成后我的目录结构是这样的:


     


    或者直接引入高版本的kafka :

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>1.1.0</version>
    </dependency>

    配置程序

    首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:
    1. package com.sohu.kafkademon;
    2. public interface KafkaProperties
    3. {
    4.     final static String zkConnect = "10.22.10.139:2181";
    5.     final static String groupId = "group1";
    6.     final static String topic = "topic1";
    7.     final static String kafkaServerURL = "10.22.10.139";
    8.     final static int kafkaServerPort = 9092;
    9.     final static int kafkaProducerBufferSize = 64 * 1024;
    10.     final static int connectionTimeOut = 20000;
    11.     final static int reconnectInterval = 10000;
    12.     final static String topic2 = "topic2";
    13.     final static String topic3 = "topic3";
    14.     final static String clientId = "SimpleConsumerDemoClient";
    15. }
    复制代码

    producer

    1. package com.sohu.kafkademon;
    2. import java.util.Properties;
    3. import kafka.producer.KeyedMessage;
    4. import kafka.producer.ProducerConfig;
    5. /**
    6. * @author leicui bourne_cui@163.com
    7. */
    8. public class KafkaProducer extends Thread
    9. {
    10.     private final kafka.javaapi.producer.Producer<Integer, String> producer;
    11.     private final String topic;
    12.     private final Properties props = new Properties();
    13.     public KafkaProducer(String topic)
    14.     {
    15.         props.put("serializer.class", "kafka.serializer.StringEncoder");
    16.         props.put("metadata.broker.list", "10.22.10.139:9092");
    17.         producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    18.         this.topic = topic;
    19.     }
    20.     @Override
    21.     public void run() {
    22.         int messageNo = 1;
    23.         while (true)
    24.         {
    25.             String messageStr = new String("Message_" + messageNo);
    26.             System.out.println("Send:" + messageStr);
    27.             producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
    28.             messageNo++;
    29.             try {
    30.                 sleep(3000);
    31.             } catch (InterruptedException e) {
    32.                 // TODO Auto-generated catch block
    33.                 e.printStackTrace();
    34.             }
    35.         }
    36.     }
    37. }
    复制代码

    consumer
    1. package com.sohu.kafkademon;
    2. import java.util.HashMap;
    3. import java.util.List;
    4. import java.util.Map;
    5. import java.util.Properties;
    6. import kafka.consumer.ConsumerConfig;
    7. import kafka.consumer.ConsumerIterator;
    8. import kafka.consumer.KafkaStream;
    9. import kafka.javaapi.consumer.ConsumerConnector;
    10. /**
    11. * @author leicui bourne_cui@163.com
    12. */
    13. public class KafkaConsumer extends Thread
    14. {
    15.     private final ConsumerConnector consumer;
    16.     private final String topic;
    17.     public KafkaConsumer(String topic)
    18.     {
    19.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    20.                 createConsumerConfig());
    21.         this.topic = topic;
    22.     }
    23.     private static ConsumerConfig createConsumerConfig()
    24.     {
    25.         Properties props = new Properties();
    26.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
    27.         props.put("group.id", KafkaProperties.groupId);
    28.         props.put("zookeeper.session.timeout.ms", "40000");
    29.         props.put("zookeeper.sync.time.ms", "200");
    30.         props.put("auto.commit.interval.ms", "1000");
    31.         return new ConsumerConfig(props);
    32.     }
    33.     @Override
    34.     public void run() {
    35.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    36.         topicCountMap.put(topic, new Integer(1));
    37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    38.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    39.         ConsumerIterator<byte[], byte[]> it = stream.iterator();
    40.         while (it.hasNext()) {
    41.             System.out.println("receive:" + new String(it.next().message()));
    42.             try {
    43.                 sleep(3000);
    44.             } catch (InterruptedException e) {
    45.                 e.printStackTrace();
    46.             }
    47.         }
    48.     }
    49. }
    复制代码

    简单的发送接收

    运行下面这个程序,就可以进行简单的发送接收消息了:
    1. package com.sohu.kafkademon;
    2. /**
    3. * @author leicui bourne_cui@163.com
    4. */
    5. public class KafkaConsumerProducerDemo
    6. {
    7.     public static void main(String[] args)
    8.     {
    9.         KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
    10.         producerThread.start();
    11.         KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
    12.         consumerThread.start();
    13.     }
    14. }
    复制代码

    高级别的consumer

    下面是比较负载的发送接收的程序:
    1. package com.sohu.kafkademon;
    2. import java.util.HashMap;
    3. import java.util.List;
    4. import java.util.Map;
    5. import java.util.Properties;
    6. import kafka.consumer.ConsumerConfig;
    7. import kafka.consumer.ConsumerIterator;
    8. import kafka.consumer.KafkaStream;
    9. import kafka.javaapi.consumer.ConsumerConnector;
    10. /**
    11. * @author leicui bourne_cui@163.com
    12. */
    13. public class KafkaConsumer extends Thread
    14. {
    15.     private final ConsumerConnector consumer;
    16.     private final String topic;
    17.     public KafkaConsumer(String topic)
    18.     {
    19.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    20.                 createConsumerConfig());
    21.         this.topic = topic;
    22.     }
    23.     private static ConsumerConfig createConsumerConfig()
    24.     {
    25.         Properties props = new Properties();
    26.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
    27.         props.put("group.id", KafkaProperties.groupId);
    28.         props.put("zookeeper.session.timeout.ms", "40000");
    29.         props.put("zookeeper.sync.time.ms", "200");
    30.         props.put("auto.commit.interval.ms", "1000");
    31.         return new ConsumerConfig(props);
    32.     }
    33.     @Override
    34.     public void run() {
    35.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    36.         topicCountMap.put(topic, new Integer(1));
    37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    38.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    39.         ConsumerIterator<byte[], byte[]> it = stream.iterator();
    40.         while (it.hasNext()) {
    41.             System.out.println("receive:" + new String(it.next().message()));
    42.             try {
    43.                 sleep(3000);
    44.             } catch (InterruptedException e) {
    45.                 e.printStackTrace();
    46.             }
    47.         }
    48.     }
    49. }
    复制代码

    ############################################################

    四、数据持久化


    不要畏惧文件系统!

    Kafka大量依赖文件系统去存储和缓存消息。对于硬盘有个传统的观念是硬盘总是很慢,这使很多人怀疑基于文件系统的架构能否提供优异的性能。实际上硬盘的快慢完全取决于使用它的方式。设计良好的硬盘架构可以和内存一样快。

    在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍。现代的操作系统都对次做了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,写的时候将各种微小琐碎的逻辑写入组织合并成一次较大的物理写入。对此的深入讨论可以查看这里,它们发现线性的访问磁盘,很多时候比随机的内存访问快得多。

    为了提高性能,现代操作系统往往使用内存作为磁盘的缓存,现代操作系统乐于把所有空闲内存用作磁盘缓存,虽然这可能在缓存回收和重新分配时牺牲一些性能。所有的磁盘读写操作都会经过这个缓存,这不太可能被绕开除非直接使用I/O。所以虽然每个程序都在自己的线程里只缓存了一份数据,但在操作系统的缓存里还有一份,这等于存了两份数据。

    另外再来讨论一下JVM,以下两个事实是众所周知的:

    •Java对象占用空间是非常大的,差不多是要存储的数据的两倍甚至更高。

    •随着堆中数据量的增加,垃圾回收回变的越来越困难。

    基于以上分析,如果把数据缓存在内存里,因为需要存储两份,不得不使用两倍的内存空间,Kafka基于JVM,又不得不将空间再次加倍,再加上要避免GC带来的性能影响,在一个32G内存的机器上,不得不使用到28-30G的内存空间。并且当系统重启的时候,又必须要将数据刷到内存中( 10GB 内存差不多要用10分钟),就算使用冷刷新(不是一次性刷进内存,而是在使用数据的时候没有就刷到内存)也会导致最初的时候新能非常慢。但是使用文件系统,即使系统重启了,也不需要刷新数据。使用文件系统也简化了维护数据一致性的逻辑。

    所以与传统的将数据缓存在内存中然后刷到硬盘的设计不同,Kafka直接将数据写到了文件系统的日志中。

    常量时间的操作效率

    在大多数的消息系统中,数据持久化的机制往往是为每个cosumer提供一个B树或者其他的随机读写的数据结构。B树当然是很棒的,但是也带了一些代价:比如B树的复杂度是O(log N),O(log N)通常被认为就是常量复杂度了,但对于硬盘操作来说并非如此。磁盘进行一次搜索需要10ms,每个硬盘在同一时间只能进行一次搜索,这样并发处理就成了问题。虽然存储系统使用缓存进行了大量优化,但是对于树结构的性能的观察结果却表明,它的性能往往随着数据的增长而线性下降,数据增长一倍,速度就会降低一倍。

    直观的讲,对于主要用于日志处理的消息系统,数据的持久化可以简单的通过将数据追加到文件中实现,读的时候从文件中读就好了。这样做的好处是读和写都是 O(1) 的,并且读操作不会阻塞写操作和其他操作。这样带来的性能优势是很明显的,因为性能和数据的大小没有关系了。

    既然可以使用几乎没有容量限制(相对于内存来说)的硬盘空间建立消息系统,就可以在没有性能损失的情况下提供一些一般消息系统不具备的特性。比如,一般的消息系统都是在消息被消费后立即删除,Kafka却可以将消息保存一段时间(比如一星期),这给consumer提供了很好的机动性和灵活性,这点在今后的文章中会有详述。

    ############################################################

    五、消息传输的事务定义

    之前讨论了consumer和producer是怎么工作的,现在来讨论一下数据传输方面。数据传输的事务定义通常有以下三种级别:
    • 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
    • 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
    • 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。
    大多数消息系统声称可以做到“精确的一次”,但是仔细阅读它们的的文档可以看到里面存在误导,比如没有说明当consumer或producer失败时怎么样,或者当有多个consumer并行时怎么样,或写入硬盘的数据丢失时又会怎么样。kafka的做法要更先进一些。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。关于副本的活动的概念,下节文档会讨论。现在假设broker是不会down的。

    如果producer发布消息时发生了网络错误,但又不确定实在提交之前发生的还是提交之后发生的,这种情况虽然不常见,但是必须考虑进去,现在Kafka版本还没有解决这个问题,将来的版本正在努力尝试解决。

    并不是所有的情况都需要“精确的一次”这样高的级别,Kafka允许producer灵活的指定级别。比如producer可以指定必须等待消息被提交的通知,或者完全的异步发送消息而不等待任何通知,或者仅仅等待leader声明它拿到了消息(followers没有必要)。

    现在从consumer的方面考虑这个问题,所有的副本都有相同的日志文件和相同的offset,consumer维护自己消费的消息的offset,如果consumer不会崩溃当然可以在内存中保存这个值,当然谁也不能保证这点。如果consumer崩溃了,会有另外一个consumer接着消费消息,它需要从一个合适的offset继续处理。这种情况下可以有以下选择:

    • consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
    • consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
    • “精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。


    ############################################################

    六、性能优化

    Kafka在提高效率方面做了很大努力。Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作。读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也尽量使读的操作更轻量化。

    我们之前讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的I/O操作和太多的字节拷贝。I/O问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中。

    消息集(message set)
    为了避免这些问题,Kafka建立了“消息集(message set)”的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。Producer把消息集一块发送给服务端,而不是一条条的发送;服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的I/O操作。consumer也可以一次性的请求一个消息集。

    另外一个性能优化是在字节拷贝方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka使用了标准的二进制消息格式,这个格式可以在producer,broker和producer之间共享而无需做任何改动。

    zero copy
    Broker维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式producer和consumer是共享的,这使得Kafka可以一个很重要的点进行优化:消息在网络上的传递。现代的unix操作系统提供了高性能的将数据从页面缓存发送到socket的系统函数,在linux中,这个函数是sendfile.

    为了更好的理解sendfile的好处,我们先来看下一般将数据从文件发送到socket的数据流向:
    • 操作系统把数据从文件拷贝内核中的页缓存中
    • 应用程序从页缓存从把数据拷贝自己的内存缓存中
    • 应用程序将数据写入到内核中socket缓存中
    • 操作系统把数据从socket缓存中拷贝到网卡接口缓存,从这里发送到网络上。

    这显然是低效率的,有4次拷贝和2次系统调用。Sendfile通过直接将数据从页面缓存发送网卡接口缓存,避免了重复拷贝,大大的优化了性能。
    在一个多consumers的场景里,数据仅仅被拷贝到页面缓存一次而不是每次消费消息的时候都重复的进行拷贝。这使得消息以近乎网络带宽的速率发送出去。这样在磁盘层面你几乎看不到任何的读操作,因为数据都是从页面缓存中直接发送到网络上去了。
    这篇文章详细介绍了sendfile和zero-copy技术在Java方面的应用。

    数据压缩
    很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有Kafka支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。
    Kafka采用了端到端的压缩:因为有“消息集”的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到consumer,消息从producer发出到consumer拿到都被是压缩的,只有在consumer使用的时候才被解压缩,所以叫做“端到端的压缩”。
    Kafka支持GZIP和Snappy压缩协议。更详细的内容可以查看这里

    ##########################################################

    七、Producer和Consumer


    Kafka Producer消息发送
    producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发。为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。

    客户端控制消息将被分发到哪个分区。可以通过负载均衡随机的选择,或者使用分区函数。Kafka允许用户实现分区函数,指定分区的key,将消息hash到不同的分区上(当然有需要的话,也可以覆盖这个分区函数自己实现逻辑).比如如果你指定的key是user id,那么同一个用户发送的消息都被发送到同一个分区上。经过分区之后,consumer就可以有目的的消费某个分区的消息。

    异步发送
    批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。这个策略可以配置的,比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去(比如100条消息就发送,或者每5秒发送一次)。这种策略将大大减少服务端的I/O次数。

    既然缓存是在producer端进行的,那么当producer崩溃时,这些消息就会丢失。Kafka0.8.1的异步发送模式还不支持回调,就不能在发送出错时进行处理。Kafka 0.9可能会增加这样的回调函数。见Proposed Producer API.

    Kafka Consumer
    Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息。consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息。customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。

    推还是拉?
    Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。

    一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式。

    Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略。

    Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。

    消费状态跟踪
    对消费消息状态的记录也是很重要的。
    大部分消息系统在broker端的维护消息被消费的记录:一个消息被分发到consumer后broker就马上进行标记或者等待customer的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。

    但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,一旦consumer处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到consumer已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果consumer处理消息成功了但是向broker发送响应时失败了,这条消息将被消费两次。第二个问题时,broker必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,
    Kafka采用了不同的策略。Topic被分成了若干分区,每个分区在同一时间只被一个consumer消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。

    这带来了另外一个好处:consumer可以把offset调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?consumer发现解析数据的程序有bug,在修改bug后再来解析一次消息,看起来是很合理的额呀!

    离线处理消息
    高级的数据持久化允许consumer每个隔一段时间批量的将数据加载到线下系统中比如Hadoop或者数据仓库。这种情况下,Hadoop可以将加载任务分拆,拆成每个broker或每个topic或每个分区一个加载任务。Hadoop具有任务管理功能,当一个任务失败了就可以重启而不用担心数据被重新加载,只要从上次加载的位置继续加载消息就可以了。

    #########################################################


    八、主从同步


    Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

    Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
    创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。flowers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

    许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否

    着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
    • 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
    • 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。
    符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。 

    只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数request.required.acks决定的。
    Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。

    Leader的选择
    Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

    如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。

    Kafaka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

    一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
    实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
    • 等待ISR中的任何一个节点恢复并担任leader。
    • 选择所有节点中(不只是ISR)第一个恢复的节点作为leader.
    这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
    这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

    副本管理
    以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
    优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

    ###################################################

    九、客户端API

    Kafka Producer APIs
    Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:
    1. class Producer {
    2. /* 将消息发送到指定分区 */
    3. publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
    4. /* 批量发送一批消息 */
    5. publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
    6. /* 关闭producer */
    7. publicvoid close();
    8. }
    复制代码



    Producer API提供了以下功能:
    • 可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.time和batch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
    • 自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder。
      • interface Encoder<T> {
      • public Message toMessage(T data);
      • }
    • 提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
    • 通过分区函数kafka.producer.Partitioner类对消息分区。
      • interface Partitioner<T> {
      • int partition(T key, int numPartitions);
      • }

      分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。

    KafKa Consumer APIs

    Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。
    高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

    低级别的API
    1. class SimpleConsumer {
    2. /*向一个broker发送读取请求并得到消息集 */
    3. public ByteBufferMessageSet fetch(FetchRequest request);
    4. /*向一个broker发送读取请求并得到一个相应集 */
    5. public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    6. /**
    7. * 得到指定时间之前的offsets
    8. * 返回值是offsets列表,以倒序排序
    9. * @param time: 时间,毫秒,
    10. * 如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
    11. * 如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
    12. */
    13. publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    14. }
    复制代码


    低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

    高级别的API
    1. /* 创建连接 */
    2. ConsumerConnector connector = Consumer.create(consumerConfig);
    3. interface ConsumerConnector {
    4. /**
    5. * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)
    6. * Input: a map of <topic, #streams>
    7. * Output: a map of <topic, list of message streams>
    8. */
    9. public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
    10. /**
    11. * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代,
    12. * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
    13. */
    14. public List<KafkaStream> createMessageStreamsByFilter(
    15. TopicFilter topicFilter, int numStreams);
    16. /* 提交目前消费到的offset */
    17. public commitOffsets()
    18. /* 关闭连接 */
    19. public shutdown()
    20. }
    复制代码



    这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

    每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。

    #######################################################

    十、消息和日志



    消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。
    1. /**
    2. * 具有N个字节的消息的格式如下
    3. *
    4. * 如果版本号是0
    5. *
    6. * 1. 1个字节的 "magic" 标记
    7. *
    8. * 2. 4个字节的CRC32校验码
    9. *
    10. * 3. N - 5个字节的具体信息
    11. *
    12. * 如果版本号是1
    13. *
    14. * 1. 1个字节的 "magic" 标记
    15. *
    16. * 2.1个字节的参数允许标注一些附加的信息比如是否压缩了,解码类型等
    17. *
    18. * 3.4个字节的CRC32校验码
    19. *
    20. * 4. N - 6 个字节的具体信息
    21. *
    22. */
    复制代码



    日志一个叫做“my_topic”且有两个分区的的topic,它的日志有两个文件夹组成,my_topic_0和my_topic_1,每个文件夹里放着具体的数据文件,每个数据文件都是一系列的日志实体,每个日志实体有一个4个字节的整数N标注消息的长度,后边跟着N个字节的消息。每个消息都可以由一个64位的整数offset标注,offset标注了这条消息在发送到这个分区的消息流中的起始位置。每个日志文件的名称都是这个文件第一条日志的offset.所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名字的差就是一个数字S,S差不多就是配置文件中指定的日志文件的最大容量。
    消息的格式都由一个统一的接口维护,所以消息可以在producer,broker和consumer之间无缝的传递。存储在硬盘上的消息格式如下所示:
    • 消息长度: 4 bytes (value: 1+4+n)
    • 版本号: 1 byte
    • CRC校验码: 4 bytes
    • 具体的消息: n bytes


     


    写操作消息被不断的追加到最后一个日志的末尾,当日志的大小达到一个指定的值时就会产生一个新的文件。对于写操作有两个参数,一个规定了消息的数量达到这个值时必须将数据刷新到硬盘上,另外一个规定了刷新到硬盘的时间间隔,这对数据的持久性是个保证,在系统崩溃的时候只会丢失一定数量的消息或者一个时间段的消息。

    读操作
    读操作需要两个参数:一个64位的offset和一个S字节的最大读取量。S通常比单个消息的大小要大,但在一些个别消息比较大的情况下,S会小于单个消息的大小。这种情况下读操作会不断重试,每次重试都会将读取量加倍,直到读取到一个完整的消息。可以配置单个消息的最大值,这样服务器就会拒绝大小超过这个值的消息。也可以给客户端指定一个尝试读取的最大上限,避免为了读到一个完整的消息而无限次的重试。
    在实际执行读取操纵时,首先需要定位数据所在的日志文件,然后根据offset计算出在这个日志中的offset(前面的的offset是整个分区的offset),然后在这个offset的位置进行读取。定位操作是由二分查找法完成的,Kafka在内存中为每个文件维护了offset的范围。

    下面是发送给consumer的结果的格式:

    1. MessageSetSend (fetch result)

    2. total length     : 4 bytes
    3. error code       : 2 bytes
    4. message 1        : x bytes
    5. ...
    6. message n        : x bytes
    7. MultiMessageSetSend (multiFetch result)

    8. total length       : 4 bytes
    9. error code         : 2 bytes
    10. messageSetSend 1
    11. ...
    12. messageSetSend n
    复制代码


    删除
    日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间删除),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

    可靠性保证
    日志文件有一个可配置的参数M,缓存超过这个数量的消息将被强行刷新到硬盘。一个日志矫正线程将循环检查最新的日志文件中的消息确认每个消息都是合法的。合法的标准为:所有文件的大小的和最大的offset小于日志文件的大小,并且消息的CRC32校验码与存储在消息实体中的校验码一致。如果在某个offset发现不合法的消息,从这个offset到下一个合法的offset之间的内容将被移除。
    有两种情况必须考虑:
    1,当发生崩溃时有些数据块未能写入。
    2,写入了一些空白数据块。第二种情况的原因是,对于每个文件,操作系统都有一个inode(inode是指在许多“类Unix文件系统”中的一种数据结构。每个inode保存了文件系统中的一个文件系统对象,包括文件、目录、大小、设备文件、socket、管道, 等等),但无法保证更新inode和写入数据的顺序,当inode保存的大小信息被更新了,但写入数据时发生了崩溃,就产生了空白数据块。CRC校验码可以检查这些块并移除,当然因为崩溃而未写入的数据块也就丢失了。
    展开全文
  • kafka介绍和使用

    万次阅读 多人点赞 2018-04-24 15:53:48
    1.1. 主要功能根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3...发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因 2:It lets you store streams of records in...

    1.1.       主要功能

    根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

      1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因

      2:It lets you store streams of records in a fault-tolerant way.以容错的方式记录消息流,kafka以文件的方式来存储消息流

      3:It lets you process streams of records as they occur.可以再消息发布的时候进行处理

     

     1.2.       使用场景

    1:Building real-time streaming data pipelines that reliably get data between systems or applications.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能

    2:Building real-time streaming applications that transform or react to the streams of data。构建实时的流数据处理程序来变换或处理数据流,数据处理功能

     

    1.3.       详细介绍

    Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

      1.3.1 消息传输流程

     

        Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

        Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

        Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

        从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。

      1.3.2 kafka服务器消息存储策略

     

        谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。

     

      在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。

      1.3.3 与生产者的交互

     

        生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中

        也可以通过指定均衡策略来将消息发送到不同的分区中

        如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中

      1.3.4  与消费者的交互

      

        在消费者消费消息时,kafka使用offset来记录当前消费的位置

        在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。

        对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费

        因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。

    2.       Kafka安装与使用

     

    2.1.       下载

      你可以在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包,选择下载二进制版本的tgz文件,根据网络状态可能需要fq,这里我们选择的版本是0.11.0.1,目前的最新版

     

    2.2.       安装

      Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka基本上是运行在linux服务器上,因此我们这里也使用linux来开始今天的实战。

      首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用

      说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到/home目录

     

    2.3.       配置

      在kafka解压目录下下有一个config的文件夹,里面放置的是我们的配置文件

      consumer.properites 消费者配置,这个配置文件用于配置于2.5节中开启的消费者,此处我们使用默认的即可

      producer.properties 生产者配置,这个配置文件用于配置于2.5节中开启的生产者,此处我们使用默认的即可

      server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置

      1. broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可
      2. listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:

              listeners=PLAINTEXT:// 192.168.180.128:9092。并确保服务器的9092端口能够访问

          3.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带                        zookeeper,使用默认配置即可 

              zookeeper.connect=localhost:2181

    2.4.       运行

    1. 启动zookeeper

    cd进入kafka解压目录,输入

    bin/zookeeper-server-start.sh config/zookeeper.properties

    启动zookeeper成功后会看到如下的输出

        2.启动kafka

    cd进入kafka解压目录,输入

    bin/kafka-server-start.sh config/server.properties

    启动kafka成功后会看到如下的输出

     

    2.5.       第一个消息

       2.5.1   创建一个topic

        Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷

        在kafka解压目录打开终端,输入

        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

        创建一个名为test的topic

     

             在创建topic后可以通过输入

                bin/kafka-topics.sh --list --zookeeper localhost:2181

       来查看已经创建的topic

      2.4.2   创建一个消息消费者

       在kafka解压目录打开终端,输入

        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

       可以创建一个用于消费topic为test的消费者

     

     

             消费者创建完成之后,因为还没有发送任何数据,因此这里在执行后没有打印出任何数据

             不过别着急,不要关闭这个终端,打开一个新的终端,接下来我们创建第一个消息生产者

      2.4.3         创建一个消息生产者

        在kafka解压目录打开一个新的终端,输入

        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

        在执行完毕后会进入的编辑器页面

     

    在发送完消息之后,可以回到我们的消息消费者终端中,可以看到,终端中已经打印出了我们刚才发送的消息

     

    3.       使用java程序

        跟上节中一样,我们现在在java程序中尝试使用kafka

        3.1  创建Topic

    public static void main(String[] args) {
        //创建topic
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:9092");
        AdminClient adminClient = AdminClient.create(props);
        ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
        NewTopic newTopic = new NewTopic("topic-test", 1, (short) 1);
        topics.add(newTopic);
        CreateTopicsResult result = adminClient.createTopics(topics);
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

      使用AdminClient API可以来控制对kafka服务器进行配置,我们这里使用NewTopic(String name, int numPartitions, short   replicationFactor)的构造方法来创建了一个名为“topic-test”,分区数为1,复制因子为1的Topic.

    3.2  Producer生产者发送消息

    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("topic-test", Integer.toString(i), Integer.toString(i)));

        producer.close();

    }

    使用producer发送完消息可以通过2.5中提到的服务器端消费者监听到消息。也可以使用接下来介绍的java消费者程序来消费消息

    3.3 Consumer消费者消费消息

    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.12.65:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList("topic-test"),new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //将偏移设置到最开始
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    这里我们使用Consume API 来创建了一个普通的java消费者程序来监听名为“topic-test”的Topic,每当有生产者向kafka服务器发送消息,我们的消费者就能收到发送的消息。

    4.       使用spring-kafka

    Spring-kafka是正处于孵化阶段的一个spring子项目,能够使用spring的特性来让我们更方便的使用kafka

    4.1   基本配置信息

    与其他spring的项目一样,总是离不开配置,这里我们使用java配置来配置我们的kafka消费者和生产者。

    1. 引入pom文件

    <!--kafka start-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.0.RELEASE</version>
    </dependency>

    1. 创建配置类

    我们在主目录下新建名为KafkaConfig的类

    @Configuration
    @EnableKafka
    public class KafkaConfig {

    }

    1. 配置Topic

    在kafkaConfig类中添加配置

    //topic config Topic的配置开始
        @Bean
        public KafkaAdmin admin() {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.180.128:9092");
            return new KafkaAdmin(configs);
        }

        @Bean
        public NewTopic topic1() {
            return new NewTopic("foo", 10, (short) 2);
        }
    //topic的配置结束

     

    1. 配置生产者Factort及Template

    //producer config start
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<Integer,String>(producerConfigs());
        }
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<String,Object>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.128:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    //producer config end

    5.配置ConsumerFactory

    //consumer config start
        @Bean
        public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<Integer, String>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<Integer,String> consumerFactory(){
            return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
        }


        @Bean
        public Map<String,Object> consumerConfigs(){
            HashMap<String, Object> props = new HashMap<String, Object>();
            props.put("bootstrap.servers", "192.168.180.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    //consumer config end

     

     

    4.2  创建消息生产者

    //使用spring-kafka的template发送一条消息 发送多条消息只需要循环多次即可
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaConfig.class);
        KafkaTemplate<Integer, String> kafkaTemplate = (KafkaTemplate<Integer, String>) ctx.getBean("kafkaTemplate");
            String data="this is a test message";
            ListenableFuture<SendResult<Integer, String>> send = kafkaTemplate.send("topic-test", 1, data);
            send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                public void onFailure(Throwable throwable) {

                }

                public void onSuccess(SendResult<Integer, String> integerStringSendResult) {

                }
            });
    }

     

    4.3    创建消息消费者

    我们首先创建一个一个用于消息监听的类,当名为”topic-test”的topic接收到消息之后,我们的这个listen方法就会调用。

    public class SimpleConsumerListener {
        private final static Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class);
        private final CountDownLatch latch1 = new CountDownLatch(1);

        @KafkaListener(id = "foo", topics = "topic-test")
        public void listen(byte[] records) {
            //do something here
            this.latch1.countDown();
        }
    }

             我们同时也需要将这个类作为一个Bean配置到KafkaConfig中

    @Bean
    public SimpleConsumerListener simpleConsumerListener(){
        return new SimpleConsumerListener();
    }

    默认spring-kafka会为每一个监听方法创建一个线程来向kafka服务器拉取消息


    展开全文
  • Kafka史上最详细原理总结 ----用于自己看的

    万次阅读 多人点赞 2020-01-21 15:47:41
    Kafka Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如...
    Kafka
    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
     
    1.前言
    消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
     
     1.1  Kafka的特性:
    - 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
    - 可扩展性:kafka集群支持热扩展
    - 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
    - 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
    - 高并发:支持数千个客户端同时读写
     
    1.2   Kafka的使用场景:
    - 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
    - 消息系统:解耦和生产者和消费者、缓存消息等。
    - 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
    - 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
    - 流式处理:比如spark streaming和storm
    - 事件源
     
    1.3  Kakfa的设计思想
    Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。
    这里曾经发生过一个bug,TalkingData使用Kafka0.8.1的时候,kafka controller在Zookeeper上注册成功后,它和Zookeeper通信的timeout时间是6s,也就是如果kafka controller如果有6s中没有和Zookeeper做心跳,那么Zookeeper就认为这个kafka controller已经死了,就会在Zookeeper上把这个临时节点删掉,那么其他Kafka就会认为controller已经没了,就会再次抢着注册临时节点,注册成功的那个kafka broker成为controller,然后,之前的那个kafka controller就需要各种shut down去关闭各种节点和事件的监听。但是当kafka的读写流量都非常巨大的时候,TalkingData的一个bug是,由于网络等原因,kafka controller和Zookeeper有6s中没有通信,于是重新选举出了一个新的kafka controller,但是原来的controller在shut down的时候总是不成功,这个时候producer进来的message由于Kafka集群中存在两个kafka controller而无法落地。导致数据淤积。
    这里曾经还有一个bug,TalkingData使用Kafka0.8.1的时候,当ack=0的时候,表示producer发送出去message,只要对应的kafka broker topic partition leader接收到的这条message,producer就返回成功,不管partition leader 是否真的成功把message真正存到kafka。当ack=1的时候,表示producer发送出去message,同步的把message存到对应topic的partition的leader上,然后producer就返回成功,partition leader异步的把message同步到其他partition replica上。当ack=all或-1,表示producer发送出去message,同步的把message存到对应topic的partition的leader和对应的replica上之后,才返回成功。但是如果某个kafka controller 切换的时候,会导致partition leader的切换(老的 kafka controller上面的partition leader会选举到其他的kafka broker上),但是这样就会导致丢数据。
     Consumergroup:各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group 中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。所以如果想同时对一个topic做消费的话,启动多个consumer group就可以了,但是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。它不能像AMQ那样可以多个BET作为consumer去互斥的(for update悲观锁)并发处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许同一个consumer group下的一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。如果想多个不同的业务都需要这个topic的数据,起多个consumer group就好了,大家都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
        当启动一个consumer group去消费一个topic的时候,无论topic里面有多个少个partition,无论我们consumer group里面配置了多少个consumer thread,这个consumer group下面的所有consumer thread一定会消费全部的partition;即便这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费所有的partition。因此,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
        同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不能够一个consumer group的多个consumer同时消费一个partition。
        一个consumer group下,无论有多少个consumer,这个consumer group一定回去把这个topic下所有的partition都消费了。当consumer group里面的consumer数量小于这个topic下的partition数量的时候,如下图groupA,groupB,就会出现一个conusmer thread消费多个partition的情况,总之是这个topic下的partition都会被消费。如果consumer group里面的consumer数量等于这个topic下的partition数量的时候,如下图groupC,此时效率是最高的,每个partition都有一个consumer thread去消费。当consumer group里面的consumer数量大于这个topic下的partition数量的时候,如下图GroupD,就会有一个consumer thread空闲。因此,我们在设定consumer group的时候,只需要指明里面有几个consumer数量即可,无需指定对应的消费partition序号,consumer会自动进行rebalance。
        多个Consumer Group下的consumer可以消费同一条message,但是这种消费也是以o(1)的方式顺序的读取message去消费,,所以一定会重复消费这批message的,不能向AMQ那样多个BET作为consumer消费(对message加锁,消费的时候不能重复消费message)
    Consumer Rebalance的触发条件:(1)Consumer增加或删除会触发 Consumer Group的Rebalance(2)Broker的增加或者减少都会触发 Consumer Rebalance
    Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。所以必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由自己维护。一般来说都是使用high level api的。Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也可以配置成读完消息处理再commit,这种情况下consumer端的响应就会比较慢的,需要等处理完才行。
    一般情况下,一定是一个consumer group处理一个topic的message。Best Practice是这个consumer group里面consumer的数量等于topic里面partition的数量,这样效率是最高的,一个consumer thread处理一个partition。如果这个consumer group里面consumer的数量小于topic里面partition的数量,就会有consumer thread同时处理多个partition(这个是kafka自动的机制,我们不用指定),但是总之这个topic里面的所有partition都会被处理到的。。如果这个consumer group里面consumer的数量大于topic里面partition的数量,多出的consumer thread就会闲着啥也不干,剩下的是一个consumer thread处理一个partition,这就造成了资源的浪费,因为一个partition不可能被两个consumer thread去处理。所以我们线上的分布式多个service服务,每个service里面的kafka consumer数量都小于对应的topic的partition数量,但是所有服务的consumer数量只和等于partition的数量,这是因为分布式service服务的所有consumer都来自一个consumer group,如果来自不同的consumer group就会处理重复的message了(同一个consumer group下的consumer不能处理同一个partition,不同的consumer group可以处理同一个topic,那么都是顺序处理message,一定会处理重复的。一般这种情况都是两个不同的业务逻辑,才会启动两个consumer group来处理一个topic)。
     
    如果producer的流量增大,当前的topic的parition数量=consumer数量,这时候的应对方式就是很想扩展:增加topic下的partition,同时增加这个consumer group下的consumer。
                    
    Delivery Mode : Kafka producer 发送message不用维护message的offsite信息,因为这个时候,offsite就相当于一个自增id,producer就尽管发送message就好了。而且Kafka与AMQ不同,AMQ大都用在处理业务逻辑上,而Kafka大都是日志,所以Kafka的producer一般都是大批量的batch发送message,向这个topic一次性发送一大批message,load balance到一个partition上,一起插进去,offsite作为自增id自己增加就好。但是Consumer端是需要维护这个partition当前消费到哪个message的offsite信息的,这个offsite信息,high level api是维护在Zookeeper上,low level api是自己的程序维护。(Kafka管理界面上只能显示high level api的consumer部分,因为low level api的partition offsite信息是程序自己维护,kafka是不知道的,无法在管理界面上展示 )当使用high level api的时候,先拿message处理,再定时自动commit offsite+1(也可以改成手动), 并且kakfa处理message是没有锁操作的。因此如果处理message失败,此时还没有commit offsite+1,当consumer thread重启后会重复消费这个message。但是作为高吞吐量高并发的实时处理系统,at least once的情况下,至少一次会被处理到,是可以容忍的。如果无法容忍,就得使用low level api来自己程序维护这个offsite信息,那么想什么时候commit offsite+1就自己搞定了。
     
    Topic & Partition:Topic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量]  )。物理上存储上,这个topic会分成一个或多个partition,每个partiton相当于是一个子queue。在物理结构上,每个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic可以有无数多的partition,根据业务需求和数据量来设置。在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在创建Topic时通过参数指定parittion数量。Topic创建之后通过Kafka提供的工具也可以修改partiton数量。
       一般来说,(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。(2)同一个Partition的Replica尽量分散到不同的机器,高可用。
      当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance
    Partition Replica:每个partition可以在其他的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。这样如果某个broker宕机,其实整个kafka内数据依然是完整的。但是,replica副本数越高,系统虽然越稳定,但是回来带资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。
      (1)怎样传送消息:producer先把message发送到partition leader,再由leader发送给其他partition follower。(如果让producer发送给每个replica那就太慢了)
      (2)在向Producer发送ACK前需要保证有多少个Replica已经收到该消息:根据ack配的个数而定
      (3)怎样处理某个Replica不工作的情况:如果这个部工作的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。如果这个不工作的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工作的partition replca写message成功,但是会等到time out,然后返回失败因为某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工作的partition replica从ack列表中移除,以后的producer发送message的时候就不会有这个ack列表下的这个部工作的partition replica了。 
      (4)怎样处理Failed Replica恢复回来的情况:如果这个partition replica之前不在ack列表中,那么启动后重新受Zookeeper管理即可,之后producer发送message的时候,partition leader会继续发送message到这个partition follower上。如果这个partition replica之前在ack列表中,此时重启后,需要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工作的partition replica的时候自动从ack列表中移除的)
    Partition leader与follower:partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其他的broker的partition follower上选择follower变为parition leader。
    Topic分配partition和partition replica的算法:(1)将Broker(size=n)和待分配的Partition排序。(2)将第i个Partition分配到第(i%n)个Broker上。(3)将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上
     
    - 消息投递可靠性
    一个消息如何算投递成功,Kafka提供了三种模式:
    - 第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;
    - 第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;
    - 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型
      消息在broker上的可靠性,因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。
      消息消费的可靠性,Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况,这种情况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决,但是如果你的应用不在乎重复消费,那就干脆不要解决,以换取最大的性能。
     
    Partition ack:当ack=1,表示producer写partition leader成功后,broker就返回成功,无论其他的partition follower是否写成功。当ack=2,表示producer写partition leader和其他一个follower成功的时候,broker就返回成功,无论其他的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer全部写成功的时候,才算成功,kafka broker才返回成功信息。这里需要注意的是,如果ack=1的时候,一旦有个broker宕机导致partition的follower和leader切换,会导致丢数据。
      
    message状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
    message持久化:Kafka中会把消息持久化到本地文件系统中,并且保持o(1)极高的效率。我们众所周知IO读取是非常耗资源的性能也是最慢的,这就是为了数据库的瓶颈经常在IO上,需要换SSD硬盘的原因。但是Kafka作为吞吐量极高的MQ,却可以非常高效的message持久化到文件。这是因为Kafka是顺序写入o(1)的时间复杂度,速度非常快。也是高吞吐量的原因。由于message的写入持久化是顺序写入的,因此message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。一般的机器,单机每秒100k条数据。
    message有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
    Produer : Producer向Topic发送message,不需要指定partition,直接发送就好了。kafka通过partition ack来控制是否发送成功并把信息返回给producer,producer可以有任意多的thread,这些kafka服务器端是不care的。Producer端的delivery guarantee默认是At least once的。也可以设置Producer异步发送实现At most once。Producer可以用主键幂等性实现Exactly once
    Kafka高吞吐量: Kafka的高吞吐量体现在读写上,分布式并发的读和写都非常快,写的性能体现在以o(1)的时间复杂度进行顺序写入。读的性能体现在以o(1)的时间复杂度进行顺序读取, 对topic进行partition分区,consume group中的consume线程可以以很高能性能进行顺序读。
    - Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,但是可能会重复传输;(3)Exactly once每条信息肯定会被传输一次且仅传输一次,这是用户想要的。
    批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
    push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
    Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
    负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
    同步异步:Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
    分区机制partition:Kafka的broker端支持消息分区partition,Producer可以决定把消息发到哪个partition,在一个partition 中message的顺序就是Producer发送消息的顺序,一个topic中可以有多个partition,具体partition的数量是可配置的。partition的概念使得kafka作为MQ可以横向扩展,吞吐量巨大。partition可以设置replica副本,replica副本存在不同的kafka broker节点上,第一个partition是leader,其他的是follower,message先写到partition leader上,再由partition leader push到parition follower上。所以说kafka可以水平扩展,也就是扩展partition。
    离线数据装载:Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。
    实时数据与离线数据:kafka既支持离线数据也支持实时数据,因为kafka的message持久化到文件,并可以设置有效期,因此可以把kafka作为一个高效的存储来使用,可以作为离线数据供后面的分析。当然作为分布式实时消息系统,大多数情况下还是用于实时的数据处理的,但是当cosumer消费能力下降的时候可以通过message的持久化在淤积数据在kafka。
    插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
    解耦:  相当于一个MQ,使得Producer和Consumer之间异步的操作,系统之间解耦
    冗余:  replica有多个副本,保证一个broker node宕机后不会影响整个服务
    扩展性:  broker节点可以水平扩展,partition也可以水平增加,partition replica也可以水平增加
    峰值:  在访问量剧增的情况下,kafka水平扩展, 应用仍然需要继续发挥作用
    可恢复性:  系统的一部分组件失效时,由于有partition的replica副本,不会影响到整个系统。
    顺序保证性:由于kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。
    缓冲:由于producer那面可能业务很简单,而后端consumer业务会很复杂并有数据库的操作,因此肯定是producer会比consumer处理速度快,如果没有kafka,producer直接调用consumer,那么就会造成整个系统的处理速度慢,加一层kafka作为MQ,可以起到缓冲的作用。
    异步通信:作为MQ,Producer与Consumer异步通信

    2.Kafka文件存储机制

    2.1 Kafka部分名词解释如下:
     
         Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
    • Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
    • Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
    • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
    • Segment:partition物理上由多个segment组成,每个Segment存着message信息
    • Producer : 生产message发送到topic
    • Consumer : 订阅topic消费message, consumer作为一个线程来消费
    • Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
       
    • 2.2 kafka一些原理概念
    1.持久化
    kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.
     
    2.性能
    除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).
    其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.
     
    3.负载均衡
    kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".
    异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
     
    4.Topic模型
    其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。
    kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.
     
    5.消息传输一致
    Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。
    最少1次:可能会重传数据,有可能出现数据被重复处理的情况;
    最多1次:可能会出现数据丢失情况;
    恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。
     
    at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".
    at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".
    "Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:
    最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。
     
    6.副本
    kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.
    选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.
     
    7.log
    每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
    获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.
     
    8.分布式
    kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
    Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.
    Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.
    Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.
    Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
    Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.
    Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
    当consumer启动时,所触发的操作:
    A) 首先进行"Consumer id Registry";
    B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
    C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
     
    总结:
    1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
    2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
    3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。
     
    9.Leader的选择
    Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
    如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。
    Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
    一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
    实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
    1. 等待ISR中的任何一个节点恢复并担任leader。
    2. 选择所有节点中(不只是ISR)第一个恢复的节点作为leader.
    这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
    这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。
     
    10.副本管理
    以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
    优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
     
    11.Leader与副本同步
    对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。
     
    Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
    Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
    创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
    许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
    1. 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
    2. 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。
    符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
    只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。
    Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
     
     
    • 2.3  kafka拓扑结构

           一个典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服务器日志等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干ConsumerGroup,以及一个Zookeeper集群。Kafka通过Zookeeper管理Kafka集群配置:选举Kafka broker的leader,以及在Consumer Group发生变化时进行rebalance,因为consumer消费kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
     

    分析过程分为以下4个步骤:

    • topic中partition存储分布
    • partiton中文件存储方式 (partition在linux服务器上就是一个目录(文件夹))
    • partiton中segment文件存储结构
    • 在partition中如何通过offset查找message

    通过上述4过程详细分析,我们就可以清楚认识到kafka文件存储机制的奥秘。

     
    2.3 topic中partition存储分布

    假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2个topic名 称分别为report_push、launch_info, partitions数量都为partitions=4

    存储路径和目录规则为:

    xxx/message-folder

      |--report_push-0
      |--report_push-1
      |--report_push-2
      |--report_push-3
      |--launch_info-0
      |--launch_info-1
      |--launch_info-2
      |--launch_info-3
     
    在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
    消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition组成,其组织结构如下图所示:
     
    我们可以看到,Partition是一个Queue的结构,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition上,其中的每一个消息都被赋予了一个唯一的offset值。
     
    Kafka集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。
     
    Kafka只维护在Partition中的offset值,因为这个offsite标识着这个partition的message消费到哪条了。Consumer每消费一个消息,offset就会加1。其实消息的状态完全是由Consumer控制的,Consumer可以跟踪和重设这个offset值,这样的话Consumer就可以读取任意位置的消息。
     
    把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;第二就是可以提高并发,因为可以以Partition为单位读写了。
     
    通过上面介绍的我们可以知道,kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。如果你的副本数量设置为3,那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。
     
    Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的message的数量。Producer在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader作为读写用。
     
    关于如何设置partition值需要考虑的因素。一个partition只能被一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于同时运行的consumer的数量。另外一方面,建议partition的数量大于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每个topic都有上百个partition。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,如果partition数量越大,就要为kafka分配更大的heap space。
    2.4 partiton中文件存储方式
     
    • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
    • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

    这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

    2.5 partiton中segment文件存储结构
    producer发message到某个topic,message会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),kafka broker收到message往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
     
    每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
    • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
    • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
     
    每个segment中存储很多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

    下面文件列表是笔者在Kafka broker上做的一个实验,创建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据,如下图2所示segment文件列表形象说明了上述2个规则:

    以上述图2中一对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下:

    上述图3中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中 元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移 地址为497。

    从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构如下:

    参数说明:

    关键字 解释说明
    8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
    4 byte message size message大小
    4 byte CRC32 用crc32校验message
    1 byte “magic" 表示本次发布Kafka服务程序协议版本号
    1 byte “attributes" 表示为独立版本、或标识压缩类型、或编码类型。
    4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
    K byte key 可选
    value bytes payload 表示实际消息数据。
     
    2.6 在partition中如何通过offset查找message

    例如读取offset=368776的message,需要通过下面2个步骤查找。

    • 第一步查找segment file

      上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。

      当offset=368776时定位到00000000000000368769.index|log

    • 第二步通过segment file查找message通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到 offset=368776为止。

    segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
     
    kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2 kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic与partion的组合作为key直接提交到compacted topic中。同时Kafka又在内存中维护了的三元组来维护最新的offset信息,consumer来取最新offset信息的时候直接内存里拿即可。当然,kafka允许你快速的checkpoint最新的offset信息到磁盘上。
     
    3.Partition Replication原则

    Kafka高效文件存储设计特点

    • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
    • 通过索引信息可以快速定位message和确定response的最大大小。
    • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
    • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
     
     

    1. Kafka集群partition replication默认自动分配分析

    下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:

    (1)

     

     

    (2)当集群中新增2节点,Partition增加到6个时分布情况如下:

     

    副本分配逻辑规则如下:

    • 在Kafka集群中,每个Broker都有均等分配Partition的Leader机会。
    • 上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。
    • 上述图种每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。
     
    副本分配算法如下:
    • 将所有N Broker和待分配的i个Partition排序.
    • 将第i个Partition分配到第(i mod n)个Broker上.
    • 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.
     
    4.Kafka Broker一些特性
    4.1 无状态的Kafka Broker :
    1. Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
    2. Broker不保存订阅者的状态,由订阅者自己保存。
    3. 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
    4. 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset进行重新读取消费消息。
     
    4.2 message的交付与生命周期 :
    1. 不是严格的JMS, 因此kafka对消息的重复、丢失、错误以及顺序型没有严格的要求。(这是与AMQ最大的区别)
    2. kafka提供at-least-once delivery,即当consumer宕机后,有些消息可能会被重复delivery。
    3. 因每个partition只会被consumer group内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的订阅。
    4. Kafka为每条消息为每条消息计算CRC校验,用于错误检测,crc校验不通过的消息会直接被丢弃掉。
     
    4.3 压缩
     
    Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩之后,在Consumer端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU。
     
    那么如何区分消息是压缩的还是未压缩的呢,Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为0,则表示消息未被压缩。
     
    4.4 消息可靠性
     
    在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况:
     
    - 一个消息发送失败
     
    - 一个消息被发送多次
     
    - 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次
     
    有许多系统声称它们实现了exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息,但是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理取过来的消息时失败了。
     
    从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。
     
    从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。
     
    4.5 备份机制
     
    备份机制是Kafka0.8版本的新特性,备份机制的出现大大提高了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了Kafka的备份机制:
     
     
    4.6 Kafka高效性相关设计
     
    4.6.1 消息的持久化
    Kafka高度依赖文件系统来存储和缓存消息(AMQ的nessage是持久化到mysql数据库中的),因为一般的人认为磁盘是缓慢的,这导致人们对持久化结构具有竞争性持怀疑态度。其实,磁盘的快或者慢,这决定于我们如何使用磁盘。因为磁盘线性写的速度远远大于随机写。线性读写在大多数应用场景下是可以预测的。
    4.6.2 常数时间性能保证
    每个Topic的Partition的是一个大文件夹,里面有无数个小文件夹segment,但partition是一个队列,队列中的元素是segment,消费的时候先从第0个segment开始消费,新来message存在最后一个消息队列中。对于segment也是对队列,队列元素是message,有对应的offsite标识是哪个message。消费的时候先从这个segment的第一个message开始消费,新来的message存在segment的最后。
     
    消息系统的持久化队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。它有一个优点,所有的操作都是常数时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。
     
    事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着我们可以提供一般消息系统无法提供的特性。比如说,消息被消费后不是立马被删除,我们可以将这些消息保留一段相对比较长的时间(比如一个星期)。
     
    5.Kafka 生产者-消费者
         消息系统通常都会由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息,不同的MQ实现的Broker实现会有所不同,不过Broker的本质都是要负责将消息落地到服务端的存储系统中。具体步骤如下:
    1. 生产者客户端应用程序产生消息:

      1. 客户端连接对象将消息包装到请求中发送到服务端
      2. 服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来
      3. 服务端返回响应结果给生产者客户端
    2. 消费者客户端应用程序消费消息:

      1. 客户端连接对象将消费信息也包装到请求中发送给服务端
      2. 服务端从文件存储系统中取出消息
      3. 服务端返回响应结果给消费者客户端
      4. 客户端将响应结果还原成消息并开始处理消息
     
                                                                                  图4-1 客户端和服务端交互
     
    5.1  Producers
     
    Producers直接发送消息到broker上的leader partition,不需要经过任何中介或其他路由转发。为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。
     
    Producer客户端自己控制着消息被推送到哪些partition。实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的partition,用户可以为每个消息指定一个partitionKey,通过这个key来实现一些hash分区算法。比如,把userid作为partitionkey的话,相同userid的消息将会被推送到同一个partition。
     
    以Batch的方式推送数据可以极大的提高处理效率,kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送请求。Batch的数量大小可以通过Producer的参数控制,参数值可以设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。
     
    Producers可以异步的并行的向kafka发送消息,但是通常producer在发送完消息之后会得到一个future响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,如果acks设置数量为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
     
    若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待直到broker确认收到消息。若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
     
    Kafka 消息有一个定长的header和变长的字节数组组成。因为kafka消息支持字节数组,也就使得kafka可以支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但我们推荐消息大小不要超过1MB,通常一般消息大小都在1~10kB之前。
     
    发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息,一次行发布),send消息时,producer client需指定消息所属的topic。
     
    5.2  Consumers
    Kafka提供了两套consumer api,分为high-level api和sample-api。Sample-api 是一个底层的API,它维持了一个和单一broker的连接,并且这个API是完全无状态的,每次请求都需要指定offset值,因此,这套API也是最灵活的。
     
    在kafka中,当前读到哪条消息的offset值是由consumer来维护的,因此,consumer可以自己决定如何读取kafka中的数据。比如,consumer可以通过重设offset值来重新消费已消费过的数据。不管有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过期时间,kafka才会删除这些数据。(这一点与AMQ不一样,AMQ的message一般来说都是持久化到mysql中的,消费完的message会被delete掉)
     
    High-level API封装了对集群中一系列broker的访问,可以透明的消费一个topic。它自己维持了已消费消息的状态,即每次消费的都是下一个消息。
     
    High-level API还支持以组的形式消费topic,如果consumers有同一个组名,那么kafka就相当于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不同的组名,那么此时kafka就相当与一个广播服务,会把topic中的所有消息广播到每个consumer。
     
    High level api和Low level api是针对consumer而言的,和producer无关。
     
    High level api是consumer读的partition的offsite是存在zookeeper上。High level api 会启动另外一个线程去每隔一段时间,offsite自动同步到zookeeper上。换句话说,如果使用了High level api, 每个message只能被读一次,一旦读了这条message之后,无论我consumer的处理是否ok。High level api的另外一个线程会自动的把offiste+1同步到zookeeper上。如果consumer读取数据出了问题,offsite也会在zookeeper上同步。因此,如果consumer处理失败了,会继续执行下一条。这往往是不对的行为。因此,Best Practice是一旦consumer处理失败,直接让整个conusmer group抛Exception终止,但是最后读的这一条数据是丢失了,因为在zookeeper里面的offsite已经+1了。等再次启动conusmer group的时候,已经从下一条开始读取处理了。
     
    Low level api是consumer读的partition的offsite在consumer自己的程序中维护。不会同步到zookeeper上。但是为了kafka manager能够方便的监控,一般也会手动的同步到zookeeper上。这样的好处是一旦读取某个message的consumer失败了,这条message的offsite我们自己维护,我们不会+1。下次再启动的时候,还会从这个offsite开始读。这样可以做到exactly once对于数据的准确性有保证。
     
     
    对于Consumer group:
    1. 允许consumer group(包含多个consumer,如一个集群同时消费)对一个topic进行消费,不同的consumer group之间独立消费。
    2. 为了对减小一个consumer group中不同consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不同的partition。
     
     
    Consumer与Partition的关系:
    - 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
    - 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
    - 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
    - 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
    - High-level接口中获取不到数据的时候是会block的
     
    负载低的情况下可以每个线程消费多个partition。但负载高的情况下,Consumer 线程数最好和Partition数量保持一致。如果还是消费不过来,应该再开 Consumer 进程,进程内线程数同样和分区数一致。
     
    消费消息时,kafka client需指定topic以及partition number(每个partition对应一个逻辑日志流,如topic代表某个产品线,partition代表产品线的日志按天切分的结果),consumer client订阅后,就可迭代读取消息,如果没有消息,consumer client会阻塞直到有新的消息发布。consumer可以累积确认接收到的消息,当其确认了某个offset的消息,意味着之前的消息也都已成功接收到,此时broker会更新zookeeper上地offset registry。
     
    5.3  高效的数据传输
    1.  发布者每次可发布多条消息(将消息加到一个消息集合中发布), consumer每次迭代消费一条消息。

    2.  不创建单独的cache,使用系统的page cache。发布者顺序发布,订阅者通常比发布者滞后一点点,直接使用Linux的page cache效果也比较后,同时减少了cache管理及垃圾收集的开销。

    3.  使用sendfile优化网络传输,减少一次内存拷贝。
     
    6.Kafka 与 Zookeeper
     
    6.1 Zookeeper 协调控制
    1. 管理broker与consumer的动态加入与离开。(Producer不需要管理,随便一台计算机都可以作为Producer向Kakfa Broker发消息)
    2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一
       个consumer group内的多个consumer的消费负载平衡。(因为一个comsumer消费一个或多个partition,一个partition只能被一个consumer消费)

    3.  维护消费关系及每个partition的消费信息。

     

    6.2 Zookeeper上的细节:

    1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。

    2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。

    3. 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

     

    转自:https://blog.csdn.net/YChenFeng/article/details/74980531

    展开全文
  • 很不错的的kafka详解

    万次阅读 多人点赞 2018-08-21 14:07:45
    1.Kafka独特设计在什么地方?2.Kafka如何搭建及创建topic、发送消息、消费消息?3.如何书写Kafka程序?4.数据传输的事务定义有哪三种?5.Kafka判断一个节点是否活着有哪两个条件?6.producer是否直接将数据发送到...

    1.Kafka独特设计在什么地方?
    2.Kafka如何搭建及创建topic、发送消息、消费消息?
    3.如何书写Kafka程序?
    4.数据传输的事务定义有哪三种?
    5.Kafka判断一个节点是否活着有哪两个条件?
    6.producer是否直接将数据发送到broker的leader(主节点)?
    7.Kafa consumer是否可以消费指定分区消息?
    8.Kafka消息是采用Pull模式,还是Push模式?
    9.Procuder API有哪两种?
    10.Kafka存储在硬盘上的消息格式是什么?




    一、基本概念

    介绍

    Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。

    这个独特的设计是什么样的呢?

    首先让我们看几个基本的消息系统术语:
    Kafka将消息以topic为单位进行归纳。
    将向Kafka topic发布消息的程序成为producers.
    将预订topics并消费消息的程序成为consumer.
    Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
    producers通过网络将消息发送到Kafka集群,集群向消费者提供消息,如下图所示:

     

    客户端和服务端通过TCP协议通信。Kafka提供了Java客户端,并且对多种语言都提供了支持。


    Topics 和Logs

    先来看一下Kafka提供的一个抽象概念:topic.
    一个topic是对一组消息的归纳。对每个topic,Kafka 对它的日志进行了分区,如下图所示:
     

    每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
    在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。

    实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:一般情况下随着consumer不断的读取消息,这offset的值不断增加,但其实consumer可以以任意的顺序读取消息,比如它可以将offset设置成为一个旧的值来重读之前的消息。

    以上特点的结合,使Kafka consumers非常的轻量级:它们可以在不对集群和其他consumer造成影响的情况下读取消息。你可以使用命令行来"tail"消息而不会对其他正在消费消息的consumer造成影响。

    将日志分区可以达到以下目的:首先这使得每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能。

    分布式

    每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。
    每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader.如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。

    Producers

    Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。


    Consumers

    发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。

     

    由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个

    相比传统的消息系统,Kafka可以很好的保证有序性。
    传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

    在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

    Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

    ###########################################

    二、环境搭建


    Step 1: 下载Kafka

    点击下载最新的版本并解压.
     

    1. > tar -xzf kafka_2.9.2-0.8.1.1.tgz
    2. > cd kafka_2.9.2-0.8.1.1

    复制代码




    Step 2: 启动服务

    Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

    1. > bin/zookeeper-server-start.sh config/zookeeper.properties &
    2. [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    3. ...

    复制代码



    现在启动Kafka:

    1. > bin/kafka-server-start.sh config/server.properties
    2. [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    3. [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    4. ...

    复制代码



    Step 3: 创建 topic

    创建一个叫做“test”的topic,它只有一个分区,一个副本。

    1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    复制代码



    可以通过list命令查看创建的topic:

    1. > bin/kafka-topics.sh --list --zookeeper localhost:2181
    2. test

    复制代码



    除了手动创建topic,还可以配置broker让它自动创建topic.

    Step 4:发送消息.

    Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

    运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

    1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    2. This is a messageThis is another message

    复制代码



    ctrl+c可以退出发送。

    Step 5: 启动consumer

    Kafka also has a command line consumer that will dump out messages to standard output.
    Kafka也有一个命令行consumer可以读取消息并输出到标准输出:

    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    2. This is a message
    3. This is another message

    复制代码



    你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
    这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

    Step 6: 搭建一个多个broker的集群

    刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
    首先为每个节点编写配置文件:
     

    1. > cp config/server.properties config/server-1.properties
    2. > cp config/server.properties config/server-2.properties

    复制代码



    在拷贝出的新文件中添加以下参数:

    1. config/server-1.properties:
    2.     broker.id=1
    3.     port=9093
    4.     log.dir=/tmp/kafka-logs-1

    复制代码


     

    1. config/server-2.properties:
    2.     broker.id=2
    3.     port=9094
    4.     log.dir=/tmp/kafka-logs-2

    复制代码



    broker.id在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。

    We already have Zookeeper and our single node started, so we just need to start the two new nodes:
    刚才已经启动可Zookeeper和一个节点,现在启动另外两个节点:

    1. > bin/kafka-server-start.sh config/server-1.properties &
    2. ...
    3. > bin/kafka-server-start.sh config/server-2.properties &
    4. ...

    复制代码



    创建一个拥有3个副本的topic:

    1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    复制代码



    现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“"describe topics”命令就可以了:

    1. > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

    复制代码

     

    1. Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
    2.         Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

    复制代码



    下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。
    leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
    replicas:列出了所有的副本节点,不管节点是否在服务中.
    isr:是正在服务中的节点.
    在我们的例子中,节点1是作为leader运行。
    向topic发送消息:
     

    1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

    复制代码

     

    1. ...
    2. my test message 1my test message 2^C

    复制代码



    消费这些消息:

    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

    复制代码

     

    1. ...
    2. my test message 1
    3. my test message 2
    4. ^C

    复制代码



    测试一下容错能力.Broker 1作为leader运行,现在我们kill掉它:

    1. > ps | grep server-1.properties7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
    2. > kill -9 7564

    复制代码



    另外一个节点被选做了leader,node 1 不再出现在 in-sync 副本列表中:

    1. > bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
    2. Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
    3.         Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0

    复制代码



    虽然最初负责续写消息的leader down掉了,但之前的消息还是可以消费的:

    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    2. ...
    3. my test message 1
    4. my test message 2

    复制代码




    看来Kafka的容错机制还是不错的。

    ################################################

    三、搭建Kafka开发环境

    我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。下面我们来搭建kafka的开发环境。
    添加依赖

    搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。
    创建好maven项目后,在pom.xml中添加以下依赖:
     

    1. <dependency>
    2.         <groupId> org.apache.kafka</groupId >
    3.         <artifactId> kafka_2.10</artifactId >
    4.         <version> 0.8.0</ version>
    5. </dependency>

    复制代码


    添加依赖后你会发现有两个jar包的依赖找不到。没关系我都帮你想好了,点击这里下载这两个jar包,解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,比如我的本地仓库是d:\mvn,完成后我的目录结构是这样的:

     


    配置程序

    首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:

    1. package com.sohu.kafkademon;
    2. public interface KafkaProperties
    3. {
    4.     final static String zkConnect = "10.22.10.139:2181";
    5.     final static String groupId = "group1";
    6.     final static String topic = "topic1";
    7.     final static String kafkaServerURL = "10.22.10.139";
    8.     final static int kafkaServerPort = 9092;
    9.     final static int kafkaProducerBufferSize = 64 * 1024;
    10.     final static int connectionTimeOut = 20000;
    11.     final static int reconnectInterval = 10000;
    12.     final static String topic2 = "topic2";
    13.     final static String topic3 = "topic3";
    14.     final static String clientId = "SimpleConsumerDemoClient";
    15. }

    复制代码


    producer
     

    1. package com.sohu.kafkademon;
    2. import java.util.Properties;
    3. import kafka.producer.KeyedMessage;
    4. import kafka.producer.ProducerConfig;
    5. /**
    6. * @author leicui bourne_cui@163.com
    7. */
    8. public class KafkaProducer extends Thread
    9. {
    10.     private final kafka.javaapi.producer.Producer<Integer, String> producer;
    11.     private final String topic;
    12.     private final Properties props = new Properties();
    13.     public KafkaProducer(String topic)
    14.     {
    15.         props.put("serializer.class", "kafka.serializer.StringEncoder");
    16.         props.put("metadata.broker.list", "10.22.10.139:9092");
    17.         producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    18.         this.topic = topic;
    19.     }
    20.     @Override
    21.     public void run() {
    22.         int messageNo = 1;
    23.         while (true)
    24.         {
    25.             String messageStr = new String("Message_" + messageNo);
    26.             System.out.println("Send:" + messageStr);
    27.             producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
    28.             messageNo++;
    29.             try {
    30.                 sleep(3000);
    31.             } catch (InterruptedException e) {
    32.                 // TODO Auto-generated catch block
    33.                 e.printStackTrace();
    34.             }
    35.         }
    36.     }
    37. }

    复制代码


    consumer

    1. package com.sohu.kafkademon;
    2. import java.util.HashMap;
    3. import java.util.List;
    4. import java.util.Map;
    5. import java.util.Properties;
    6. import kafka.consumer.ConsumerConfig;
    7. import kafka.consumer.ConsumerIterator;
    8. import kafka.consumer.KafkaStream;
    9. import kafka.javaapi.consumer.ConsumerConnector;
    10. /**
    11. * @author leicui bourne_cui@163.com
    12. */
    13. public class KafkaConsumer extends Thread
    14. {
    15.     private final ConsumerConnector consumer;
    16.     private final String topic;
    17.     public KafkaConsumer(String topic)
    18.     {
    19.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    20.                 createConsumerConfig());
    21.         this.topic = topic;
    22.     }
    23.     private static ConsumerConfig createConsumerConfig()
    24.     {
    25.         Properties props = new Properties();
    26.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
    27.         props.put("group.id", KafkaProperties.groupId);
    28.         props.put("zookeeper.session.timeout.ms", "40000");
    29.         props.put("zookeeper.sync.time.ms", "200");
    30.         props.put("auto.commit.interval.ms", "1000");
    31.         return new ConsumerConfig(props);
    32.     }
    33.     @Override
    34.     public void run() {
    35.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    36.         topicCountMap.put(topic, new Integer(1));
    37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    38.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    39.         ConsumerIterator<byte[], byte[]> it = stream.iterator();
    40.         while (it.hasNext()) {
    41.             System.out.println("receive:" + new String(it.next().message()));
    42.             try {
    43.                 sleep(3000);
    44.             } catch (InterruptedException e) {
    45.                 e.printStackTrace();
    46.             }
    47.         }
    48.     }
    49. }

    复制代码


    简单的发送接收

    运行下面这个程序,就可以进行简单的发送接收消息了:

    1. package com.sohu.kafkademon;
    2. /**
    3. * @author leicui bourne_cui@163.com
    4. */
    5. public class KafkaConsumerProducerDemo
    6. {
    7.     public static void main(String[] args)
    8.     {
    9.         KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
    10.         producerThread.start();
    11.         KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
    12.         consumerThread.start();
    13.     }
    14. }

    复制代码


    高级别的consumer

    下面是比较负载的发送接收的程序:

    1. package com.sohu.kafkademon;
    2. import java.util.HashMap;
    3. import java.util.List;
    4. import java.util.Map;
    5. import java.util.Properties;
    6. import kafka.consumer.ConsumerConfig;
    7. import kafka.consumer.ConsumerIterator;
    8. import kafka.consumer.KafkaStream;
    9. import kafka.javaapi.consumer.ConsumerConnector;
    10. /**
    11. * @author leicui bourne_cui@163.com
    12. */
    13. public class KafkaConsumer extends Thread
    14. {
    15.     private final ConsumerConnector consumer;
    16.     private final String topic;
    17.     public KafkaConsumer(String topic)
    18.     {
    19.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    20.                 createConsumerConfig());
    21.         this.topic = topic;
    22.     }
    23.     private static ConsumerConfig createConsumerConfig()
    24.     {
    25.         Properties props = new Properties();
    26.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
    27.         props.put("group.id", KafkaProperties.groupId);
    28.         props.put("zookeeper.session.timeout.ms", "40000");
    29.         props.put("zookeeper.sync.time.ms", "200");
    30.         props.put("auto.commit.interval.ms", "1000");
    31.         return new ConsumerConfig(props);
    32.     }
    33.     @Override
    34.     public void run() {
    35.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    36.         topicCountMap.put(topic, new Integer(1));
    37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    38.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    39.         ConsumerIterator<byte[], byte[]> it = stream.iterator();
    40.         while (it.hasNext()) {
    41.             System.out.println("receive:" + new String(it.next().message()));
    42.             try {
    43.                 sleep(3000);
    44.             } catch (InterruptedException e) {
    45.                 e.printStackTrace();
    46.             }
    47.         }
    48.     }
    49. }

    复制代码


    ############################################################

    四、数据持久化


    不要畏惧文件系统!

    Kafka大量依赖文件系统去存储和缓存消息。对于硬盘有个传统的观念是硬盘总是很慢,这使很多人怀疑基于文件系统的架构能否提供优异的性能。实际上硬盘的快慢完全取决于使用它的方式。设计良好的硬盘架构可以和内存一样快。

    在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍。现代的操作系统都对次做了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,写的时候将各种微小琐碎的逻辑写入组织合并成一次较大的物理写入。对此的深入讨论可以查看这里,它们发现线性的访问磁盘,很多时候比随机的内存访问快得多。

    为了提高性能,现代操作系统往往使用内存作为磁盘的缓存,现代操作系统乐于把所有空闲内存用作磁盘缓存,虽然这可能在缓存回收和重新分配时牺牲一些性能。所有的磁盘读写操作都会经过这个缓存,这不太可能被绕开除非直接使用I/O。所以虽然每个程序都在自己的线程里只缓存了一份数据,但在操作系统的缓存里还有一份,这等于存了两份数据。

    另外再来讨论一下JVM,以下两个事实是众所周知的:

    •Java对象占用空间是非常大的,差不多是要存储的数据的两倍甚至更高。

    •随着堆中数据量的增加,垃圾回收回变的越来越困难。

    基于以上分析,如果把数据缓存在内存里,因为需要存储两份,不得不使用两倍的内存空间,Kafka基于JVM,又不得不将空间再次加倍,再加上要避免GC带来的性能影响,在一个32G内存的机器上,不得不使用到28-30G的内存空间。并且当系统重启的时候,又必须要将数据刷到内存中( 10GB 内存差不多要用10分钟),就算使用冷刷新(不是一次性刷进内存,而是在使用数据的时候没有就刷到内存)也会导致最初的时候新能非常慢。但是使用文件系统,即使系统重启了,也不需要刷新数据。使用文件系统也简化了维护数据一致性的逻辑。

    所以与传统的将数据缓存在内存中然后刷到硬盘的设计不同,Kafka直接将数据写到了文件系统的日志中。

    常量时间的操作效率

    在大多数的消息系统中,数据持久化的机制往往是为每个cosumer提供一个B树或者其他的随机读写的数据结构。B树当然是很棒的,但是也带了一些代价:比如B树的复杂度是O(log N),O(log N)通常被认为就是常量复杂度了,但对于硬盘操作来说并非如此。磁盘进行一次搜索需要10ms,每个硬盘在同一时间只能进行一次搜索,这样并发处理就成了问题。虽然存储系统使用缓存进行了大量优化,但是对于树结构的性能的观察结果却表明,它的性能往往随着数据的增长而线性下降,数据增长一倍,速度就会降低一倍。

    直观的讲,对于主要用于日志处理的消息系统,数据的持久化可以简单的通过将数据追加到文件中实现,读的时候从文件中读就好了。这样做的好处是读和写都是 O(1) 的,并且读操作不会阻塞写操作和其他操作。这样带来的性能优势是很明显的,因为性能和数据的大小没有关系了。

    既然可以使用几乎没有容量限制(相对于内存来说)的硬盘空间建立消息系统,就可以在没有性能损失的情况下提供一些一般消息系统不具备的特性。比如,一般的消息系统都是在消息被消费后立即删除,Kafka却可以将消息保存一段时间(比如一星期),这给consumer提供了很好的机动性和灵活性,这点在今后的文章中会有详述。

    ############################################################

    五、消息传输的事务定义

    之前讨论了consumer和producer是怎么工作的,现在来讨论一下数据传输方面。数据传输的事务定义通常有以下三种级别:

    • 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
    • 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
    • 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。

    大多数消息系统声称可以做到“精确的一次”,但是仔细阅读它们的的文档可以看到里面存在误导,比如没有说明当consumer或producer失败时怎么样,或者当有多个consumer并行时怎么样,或写入硬盘的数据丢失时又会怎么样。kafka的做法要更先进一些。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。关于副本的活动的概念,下节文档会讨论。现在假设broker是不会down的。

    如果producer发布消息时发生了网络错误,但又不确定实在提交之前发生的还是提交之后发生的,这种情况虽然不常见,但是必须考虑进去,现在Kafka版本还没有解决这个问题,将来的版本正在努力尝试解决。

    并不是所有的情况都需要“精确的一次”这样高的级别,Kafka允许producer灵活的指定级别。比如producer可以指定必须等待消息被提交的通知,或者完全的异步发送消息而不等待任何通知,或者仅仅等待leader声明它拿到了消息(followers没有必要)。

    现在从consumer的方面考虑这个问题,所有的副本都有相同的日志文件和相同的offset,consumer维护自己消费的消息的offset,如果consumer不会崩溃当然可以在内存中保存这个值,当然谁也不能保证这点。如果consumer崩溃了,会有另外一个consumer接着消费消息,它需要从一个合适的offset继续处理。这种情况下可以有以下选择:
     

    • consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
    • consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
    • “精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。



    ############################################################

    六、性能优化

    Kafka在提高效率方面做了很大努力。Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作。读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也尽量使读的操作更轻量化。

    我们之前讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的I/O操作和太多的字节拷贝。I/O问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中。

    消息集(message set)
    为了避免这些问题,Kafka建立了“消息集(message set)”的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。Producer把消息集一块发送给服务端,而不是一条条的发送;服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的I/O操作。consumer也可以一次性的请求一个消息集。

    另外一个性能优化是在字节拷贝方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka使用了标准的二进制消息格式,这个格式可以在producer,broker和producer之间共享而无需做任何改动。

    zero copy
    Broker维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式producer和consumer是共享的,这使得Kafka可以一个很重要的点进行优化:消息在网络上的传递。现代的unix操作系统提供了高性能的将数据从页面缓存发送到socket的系统函数,在linux中,这个函数是sendfile.

    为了更好的理解sendfile的好处,我们先来看下一般将数据从文件发送到socket的数据流向:

    • 操作系统把数据从文件拷贝内核中的页缓存中
    • 应用程序从页缓存从把数据拷贝自己的内存缓存中
    • 应用程序将数据写入到内核中socket缓存中
    • 操作系统把数据从socket缓存中拷贝到网卡接口缓存,从这里发送到网络上。


    这显然是低效率的,有4次拷贝和2次系统调用。Sendfile通过直接将数据从页面缓存发送网卡接口缓存,避免了重复拷贝,大大的优化了性能。
    在一个多consumers的场景里,数据仅仅被拷贝到页面缓存一次而不是每次消费消息的时候都重复的进行拷贝。这使得消息以近乎网络带宽的速率发送出去。这样在磁盘层面你几乎看不到任何的读操作,因为数据都是从页面缓存中直接发送到网络上去了。
    这篇文章详细介绍了sendfile和zero-copy技术在Java方面的应用。

    数据压缩
    很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有Kafka支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。
    Kafka采用了端到端的压缩:因为有“消息集”的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到consumer,消息从producer发出到consumer拿到都被是压缩的,只有在consumer使用的时候才被解压缩,所以叫做“端到端的压缩”。
    Kafka支持GZIP和Snappy压缩协议。更详细的内容可以查看这里

    ##########################################################

    七、Producer和Consumer


    Kafka Producer消息发送
    producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发。为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。

    客户端控制消息将被分发到哪个分区。可以通过负载均衡随机的选择,或者使用分区函数。Kafka允许用户实现分区函数,指定分区的key,将消息hash到不同的分区上(当然有需要的话,也可以覆盖这个分区函数自己实现逻辑).比如如果你指定的key是user id,那么同一个用户发送的消息都被发送到同一个分区上。经过分区之后,consumer就可以有目的的消费某个分区的消息。

    异步发送
    批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。这个策略可以配置的,比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去(比如100条消息就发送,或者每5秒发送一次)。这种策略将大大减少服务端的I/O次数。

    既然缓存是在producer端进行的,那么当producer崩溃时,这些消息就会丢失。Kafka0.8.1的异步发送模式还不支持回调,就不能在发送出错时进行处理。Kafka 0.9可能会增加这样的回调函数。见Proposed Producer API.

    Kafka Consumer
    Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息。consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息。customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。

    推还是拉?
    Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。

    一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式。

    Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略。

    Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。

    消费状态跟踪
    对消费消息状态的记录也是很重要的。
    大部分消息系统在broker端的维护消息被消费的记录:一个消息被分发到consumer后broker就马上进行标记或者等待customer的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。

    但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,一旦consumer处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到consumer已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果consumer处理消息成功了但是向broker发送响应时失败了,这条消息将被消费两次。第二个问题时,broker必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,
    Kafka采用了不同的策略。Topic被分成了若干分区,每个分区在同一时间只被一个consumer消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。

    这带来了另外一个好处:consumer可以把offset调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?consumer发现解析数据的程序有bug,在修改bug后再来解析一次消息,看起来是很合理的额呀!

    离线处理消息
    高级的数据持久化允许consumer每个隔一段时间批量的将数据加载到线下系统中比如Hadoop或者数据仓库。这种情况下,Hadoop可以将加载任务分拆,拆成每个broker或每个topic或每个分区一个加载任务。Hadoop具有任务管理功能,当一个任务失败了就可以重启而不用担心数据被重新加载,只要从上次加载的位置继续加载消息就可以了。

    #########################################################


    八、主从同步


    Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

    Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
    创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。flowers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

    许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否

    着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:

    • 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
    • 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。

    符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。 

    只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数request.required.acks决定的。
    Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。

    Leader的选择
    Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

    如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。

    Kafaka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

    一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
    实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:

    • 等待ISR中的任何一个节点恢复并担任leader。
    • 选择所有节点中(不只是ISR)第一个恢复的节点作为leader.

    这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
    这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

    副本管理
    以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
    优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

    ###################################################

    九、客户端API

    Kafka Producer APIs
    Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:

    1. class Producer {
    2. /* 将消息发送到指定分区 */
    3. publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
    4. /* 批量发送一批消息 */
    5. publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
    6. /* 关闭producer */
    7. publicvoid close();
    8. }

    复制代码




    Producer API提供了以下功能:

    • 可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.time和batch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
    • 自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder。
      • interface Encoder<T> {
      • public Message toMessage(T data);
      • }
    • 提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
    • 通过分区函数kafka.producer.Partitioner类对消息分区。
      • interface Partitioner<T> {
      • int partition(T key, int numPartitions);
      • }

      分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。
    •  

    KafKa Consumer APIs

    Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。
    高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

    低级别的API

    1. class SimpleConsumer {
    2. /*向一个broker发送读取请求并得到消息集 */
    3. public ByteBufferMessageSet fetch(FetchRequest request);
    4. /*向一个broker发送读取请求并得到一个相应集 */
    5. public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    6. /**
    7. * 得到指定时间之前的offsets
    8. * 返回值是offsets列表,以倒序排序
    9. * @param time: 时间,毫秒,
    10. * 如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
    11. * 如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
    12. */
    13. publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    14. }

    复制代码



    低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

    高级别的API

    1. /* 创建连接 */
    2. ConsumerConnector connector = Consumer.create(consumerConfig);
    3. interface ConsumerConnector {
    4. /**
    5. * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)
    6. * Input: a map of <topic, #streams>
    7. * Output: a map of <topic, list of message streams>
    8. */
    9. public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
    10. /**
    11. * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代,
    12. * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
    13. */
    14. public List<KafkaStream> createMessageStreamsByFilter(
    15. TopicFilter topicFilter, int numStreams);
    16. /* 提交目前消费到的offset */
    17. public commitOffsets()
    18. /* 关闭连接 */
    19. public shutdown()
    20. }

    复制代码




    这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

    每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。

    #######################################################

    十、消息和日志



    消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。

    1. /**
    2. * 具有N个字节的消息的格式如下
    3. *
    4. * 如果版本号是0
    5. *
    6. * 1. 1个字节的 "magic" 标记
    7. *
    8. * 2. 4个字节的CRC32校验码
    9. *
    10. * 3. N - 5个字节的具体信息
    11. *
    12. * 如果版本号是1
    13. *
    14. * 1. 1个字节的 "magic" 标记
    15. *
    16. * 2.1个字节的参数允许标注一些附加的信息比如是否压缩了,解码类型等
    17. *
    18. * 3.4个字节的CRC32校验码
    19. *
    20. * 4. N - 6 个字节的具体信息
    21. *
    22. */

    复制代码




    日志一个叫做“my_topic”且有两个分区的的topic,它的日志有两个文件夹组成,my_topic_0和my_topic_1,每个文件夹里放着具体的数据文件,每个数据文件都是一系列的日志实体,每个日志实体有一个4个字节的整数N标注消息的长度,后边跟着N个字节的消息。每个消息都可以由一个64位的整数offset标注,offset标注了这条消息在发送到这个分区的消息流中的起始位置。每个日志文件的名称都是这个文件第一条日志的offset.所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名字的差就是一个数字S,S差不多就是配置文件中指定的日志文件的最大容量。
    消息的格式都由一个统一的接口维护,所以消息可以在producer,broker和consumer之间无缝的传递。存储在硬盘上的消息格式如下所示:

    • 消息长度: 4 bytes (value: 1+4+n)
    • 版本号: 1 byte
    • CRC校验码: 4 bytes
    • 具体的消息: n bytes



     


    写操作消息被不断的追加到最后一个日志的末尾,当日志的大小达到一个指定的值时就会产生一个新的文件。对于写操作有两个参数,一个规定了消息的数量达到这个值时必须将数据刷新到硬盘上,另外一个规定了刷新到硬盘的时间间隔,这对数据的持久性是个保证,在系统崩溃的时候只会丢失一定数量的消息或者一个时间段的消息。

    读操作
    读操作需要两个参数:一个64位的offset和一个S字节的最大读取量。S通常比单个消息的大小要大,但在一些个别消息比较大的情况下,S会小于单个消息的大小。这种情况下读操作会不断重试,每次重试都会将读取量加倍,直到读取到一个完整的消息。可以配置单个消息的最大值,这样服务器就会拒绝大小超过这个值的消息。也可以给客户端指定一个尝试读取的最大上限,避免为了读到一个完整的消息而无限次的重试。
    在实际执行读取操纵时,首先需要定位数据所在的日志文件,然后根据offset计算出在这个日志中的offset(前面的的offset是整个分区的offset),然后在这个offset的位置进行读取。定位操作是由二分查找法完成的,Kafka在内存中为每个文件维护了offset的范围。

    下面是发送给consumer的结果的格式:
     

    1. MessageSetSend (fetch result)
    2.  
    3. total length     : 4 bytes
    4. error code       : 2 bytes
    5. message 1        : x bytes
    6. ...
    7. message n        : x bytes
    8. MultiMessageSetSend (multiFetch result)
    9.  
    10. total length       : 4 bytes
    11. error code         : 2 bytes
    12. messageSetSend 1
    13. ...
    14. messageSetSend n

    复制代码



    删除
    日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间删除),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

    可靠性保证
    日志文件有一个可配置的参数M,缓存超过这个数量的消息将被强行刷新到硬盘。一个日志矫正线程将循环检查最新的日志文件中的消息确认每个消息都是合法的。合法的标准为:所有文件的大小的和最大的offset小于日志文件的大小,并且消息的CRC32校验码与存储在消息实体中的校验码一致。如果在某个offset发现不合法的消息,从这个offset到下一个合法的offset之间的内容将被移除。
    有两种情况必须考虑:
    1,当发生崩溃时有些数据块未能写入。
    2,写入了一些空白数据块。第二种情况的原因是,对于每个文件,操作系统都有一个inode(inode是指在许多“类Unix文件系统”中的一种数据结构。每个inode保存了文件系统中的一个文件系统对象,包括文件、目录、大小、设备文件、socket、管道, 等等),但无法保证更新inode和写入数据的顺序,当inode保存的大小信息被更新了,但写入数据时发生了崩溃,就产生了空白数据块。CRC校验码可以检查这些块并移除,当然因为崩溃而未写入的数据块也就丢失了。

     

    发表与2015-5

    转载自:http://www.aboutyun.com/thread-12882-1-1.html

    展开全文
  • 转自: http://blog.csdn.net/lizhitao/article/details/39499283 估计...apache kafka中国社区QQ群:162272557 目前QQ群1,2,3已满,请加群4 中国社区QQ群2:414762562 已满 中国社区QQ群3:191278841已...
  • Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建Apache Kafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apache kafka消息服务 2)kafak安装与使用 3)...
  • Kafka入门教程(一)

    万次阅读 多人点赞 2018-08-09 14:59:47
    1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务。消息传送依赖于大量支持组件,这些组件负责处理连接服务、消息的路由和传送、持久性、安全性以及日志记录。消息服务器可以...
  • 为什么Kafka那么快

    万次阅读 多人点赞 2018-05-15 15:30:49
    网上有很多Kafka的测试文章,测试结果通常都是“吊打”其他MQ。感慨它的牛B之余我觉得必要仔细分析一下它如此快速的原因。这篇文章不同于其他介绍Kafka使用或者技术实现的文章,我会重点解释——为什么真快。(当然...
  • Kafka启动关闭及其相关命令

    万次阅读 2018-09-26 09:24:56
    开启zookeeper命令(备注:先进入zookeeper的bin目录) ./zkServer.sh start 关闭zookeeper命令(备注:先进入...Kafka启动命令(备注:先进入kafka目录) 常规模式启动kafka bin/kafka-server-start.sh config/serve...
  • 查看kafka版本

    万次阅读 2018-06-19 14:45:22
    很奇怪,kafka并没有什么命令可以查看具体的版本,那么怎么去查看安装的kafka版本呢?1: 如果安装的是apache官网的kafka,那么进入机器如下对应的目录: 标注的 1.0.0就是 kafka的版本2: 如果安装的是confulent的版本,...
  • kafka生产者与消费者相关命令行

    万次阅读 2018-08-19 21:55:41
    1,开启zookeeper集群 startzk.sh ...2,开启kafka可视化界面 kafka-manager : start-kafka-manager.sh 3,生产者操作: kafka-console-producer.sh --broker-list node1:9092 --topic my-kafka-topic //m...
  • 查看集群中kafka的Version(版本)

    万次阅读 多人点赞 2017-01-10 09:37:43
    linux中没有kafka -version的方法查看kafka相关版本,可以尝试用一下方法:
  • Kafka Java API示例

    万次阅读 2016-04-01 13:21:40
    Kafka Java API示例
  • kafka集群重启方法

    万次阅读 2019-09-29 08:53:15
    2.进入kafka:cd kafka 3.重启kafka:./startup.sh 4.进入zookeeper:cd /home/tmkj/zookeeper/bin 5.重启zookeeper:./zkServer.sh restart 按以上步骤操作集群中的其他服务器,即可重启kafka集群。 ...
  • org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.&amp;amp;lt;init&amp;amp;gt;(KafkaConsumer.java:79...
  • 输入:jps显示的有进程号代表成功
  • Kafka三款监控工具比较

    万次阅读 2016-07-10 15:18:01
    通过研究,发现主流的三种kafka监控程序分别...Kafka Web Conslole Kafka ManagerKafkaOffsetMonitor 现在依次介绍以上三种工具: Kafka Web Conslole 使用Kafka Web Console,可以监控: Bro
  • kafka查看topic和消息内容命令

    万次阅读 2019-07-21 22:01:50
    1、查询topic,进入kafka目录: bin/kafka-topics.sh --list --zookeeper localhost:2181 2、查询topic内容: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-...
  • kafka 状态查看命令记录

    万次阅读 2018-06-26 10:45:16
    查看topic信息./kafka-topics.sh --zookeeper 192.168.52.131:2181 --topic "test-topic" --describe查看指定group信息./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.52.131:...
1 2 3 4 5 ... 20
收藏数 150,190
精华内容 60,076
关键字:

kafka