发布_发布app - CSDN
精华内容
参与话题
  • 怎样发布程序

    千次阅读 2016-04-11 21:14:11
    发布程序的方法很多.如果你只想直接在别人电脑上运行,就不用下载发布程序的软件了.如果想打包软件,并且可以在别的电脑上安装使用,则需要下载第三方发布软件.有一个免费的发布软件叫做Inno Setup. 附上百度云链接...

    发布程序的方法很多.如果你只想直接在别人电脑上运行,就不用下载发布程序的软件了.如果想打包软件,并且可以在别的电脑上安装使用,则需要下载第三方发布软件.有一个免费的发布软件叫做Inno Setup.
    附上百度云链接链接:http://pan.baidu.com/s/1hr47FyW 密码:a62p

    注意:我们一般在Debug模式下写程序调试程序,一旦完成想发布的时候一般都是发布Release版本,所以这里不讨论Debug版本的发布方式.

    下面所涉及到的库都是VS2015的,我也是在VS2015测试通过的,其它版本应该差不多.

    发布win32控制台程序:
    这个可以把生成的exe文件直接发送到别的电脑上运行.发布的时候也不需要添加dll文件.

    发布win32应用程序:
    同上

    发布MFC程序:
    1.设置MFC项目属性

    平台工具集选择兼容WINXP,否则在XP电脑上运行不了
    MFC的使用,如果静态的话可以直接发到别的电脑上运行.
    字符集,现在windows一般都使用Unicode编码

    2.生成Release版本的MFC程序

    3.打开Inno Setup软件,选择如图方式创建脚本

    4.

    直接下一步

    5.

    a.程序名称就是发布后,别人下载安装后所显示的程序名称
    b.版本随便填,一般第一次发布都是1.0版本,然后慢慢更新提升
    c.发布者随便填,填啥都行
    d.(因为没试过,所以并不知道怎么用)

    6.

    下一步

    7.

    其它应用程序文件里面加入什么文件都可以.
    这里必须的三个文件为:
    mfc140u.dll MFC运行库
    vcruntime140.dll C/C++运行库
    api-ms-win-crt-runtime-l1-1-0.dll

    8.

    下一步

    9.

    一般不需要填,下一步

    10.

    下一步

    11

    a.输出文件夹随便建一个就行,不过一般安装文件夹都写成Setup
    b.文件名一般为setup
    c.安装程序的图标,不是程序图标.
    d.安装的时候是否需要密码

    12.

    下一步

    13.点击完成即可

    14.编辑脚本(也可以稍后自己编辑)

    15.保存脚本,方便以后修改

    16.成功后如图所示

    那个程序的图标就是安装程序的图标,是我自己添加上去的.

    17.接着你可以压缩后发布了.别人就只用解压安装就可以使用了.
    如果你的程序很大,需要很多的动态库静态库,则需要一起打包放进去.

    展开全文
  • 发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号:T | T 一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,...

    发布&订阅的消息系统 Kafka的深度解析

    2015-01-27 10:25 Jason Guo Jason Guo的博客 字号:T | T
    一键收藏,随时查看,分享好友!

    一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

    AD:【51CTO技术沙龙】春节献礼:移动APP创新之美_UI设计

    背景介绍

    Kafka简介

    Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

    • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能
    • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
    • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输
    • 同时支持离线数据处理和实时数据处理

    为什么要用Message Queue

    • 解耦
      在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

    • 冗余
      有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。

    • 扩展性
      因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

    • 灵活性 & 峰值处理能力
      在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    • 可恢复性
      当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。

    • 送达保证
      消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。

    • 顺序保证
      在大多使用场景下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。

    • 缓冲
      在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行—写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。

    • 理解数据流
      在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。

    • 异步通信
      很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

    常用Message Queue对比

    • RabbitMQ
      RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

    • Redis
      Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

    • ZeroMQ
      ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。

    • ActiveMQ
      ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

    • Kafka/Jafka
      Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

    Kafka解析

    Terminology

    • Broker
      Kafka集群包含一个或多个服务器,这种服务器被称为broker
    • Topic
      每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
    • Partition
      parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
    • Producer
      负责发布消息到Kafka broker
    • Consumer
      消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

    Kafka架构

    如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

    Push vs. Pull

    作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。
    push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

    Topic & Partition

    Topic在逻辑上可以被认为是一个在的queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。

     

    每个日志文件都是“log entries”序列,每一个log entry包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消费格式如下:

    message length : 4 bytes (value: 1+4+n)

    “magic” value : 1 byte

    crc : 4 bytes

    payload : n bytes

    这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,如下图所示。

    因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

    每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。

    1. # The default number of log partitions per topic. More partitions allow greater  
    2. # parallelism for consumption, but this will also result in more files across  
    3. # the brokers.  
    4. num.partitions=3 

    在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)

    1. import kafka.producer.Partitioner;  
    2. import kafka.utils.VerifiableProperties;  
    3.  
    4. public class JasonPartitioner<T> implements Partitioner {  
    5.  
    6.     public JasonPartitioner(VerifiableProperties verifiableProperties) {}  
    7.  
    8.     @Override 
    9.     public int partition(Object key, int numPartitions) {  
    10.         try {  
    11.             int partitionNum = Integer.parseInt((String) key);  
    12.             return Math.abs(Integer.parseInt((String) key) % numPartitions);  
    13.         } catch (Exception e) {  
    14.             return Math.abs(key.hashCode() % numPartitions);  
    15.         }  
    16.     }  
    17. }  

    如果将上例中的class作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic2(包含4个partition)。

    1. public void sendMessage() throws InterruptedException{  
    2. for(int i = 1; i <= 5; i++){  
    3.       List messageList = new ArrayList<KeyedMessage<String, String>>();  
    4.       for(int j = 0; j < 4; j++){  
    5.           messageList.add(new KeyedMessage<String, String>("topic2", j+"""The " + i + " message for key " + j));  
    6.       }  
    7.       producer.send(messageList);  
    8.     }  
    9. producer.close();  

    则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和partition序号相同。(partition序号从0开始,本例中的key也正好从0开始)。如下图所示。

    对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。

    1. ############################# Log Retention Policy #############################  
    2.  
    3. # The following configurations control the disposal of log segments. The policy can  
    4. # be set to delete segments after a period of time, or after a given size has accumulated.  
    5. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens  
    6. # from the end of the log.  
    7.  
    8. # The minimum age of a log file to be eligible for deletion  
    9. log.retention.hours=168 
    10.  
    11. # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining  
    12. # segments don't drop below log.retention.bytes.  
    13. #log.retention.bytes=1073741824 
    14.  
    15. # The maximum size of a log segment file. When this size is reached a new log segment will be created.  
    16. log.segment.bytes=1073741824 
    17.  
    18. # The interval at which log segments are checked to see if they can be deleted according  
    19. # to the retention policies  
    20. log.retention.check.interval.ms=300000 
    21.  
    22. # By default the log cleaner is disabled and the log retention policy will default to   
    23. #just delete segments after their retention expires.  
    24. # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs   
    25. #can then be marked for log compaction.  
    26. log.cleaner.enable=false 

    这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

    Replication & Leader election

    Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties中配置。

    1. default.replication.factor = 1 

    该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,leader批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。

    和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能“落后太多”。

    leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预定值,该值可在$KAFKA_HOME/config/server.properties中配置

    1. #If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead  
    2. replica.lag.max.messages=4000 
    3.  
    4. #If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead  
    5. replica.lag.time.max.ms=10000    

    需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。

    一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。

    这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。

    上文说明了Kafka是如何做replication的,另外一个很重要的问题是当leader宕机了,怎样在follower中选举出新的leader。因为follower可能落后许多或者crash了,所以必须确保选择“最新”的follower作为新的leader。一个基本的原则就是,如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。这就需要作一个折衷,如果leader在标明一条消息被commit前等待更多的follower确认,那在它die之后就有更多的follower可以作为新的leader,但这也会造成吞吐率的下降。

    一种非常常用的选举leader的方式是“majority 灵秀”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个replica(包含leader和follower),那在commit之前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。因为在剩下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几台server,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的replica,而大量的replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种expensive的方式。

    实际上,leader election算法非常多,比如Zookeper的Zab, RaftViewstamped Replication。而Kafka所使用的leader election算法更像微软的PacificA算法。

    Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个replica的失败,majority vote和ISR在commit前需要等待的replica数量是一样的,但是ISR需要的总的replica的个数几乎是majority vote的一半。

    虽然majority vote与ISR相比有不需等待最慢的server这一优势,但是Kafka作者认为Kafka可以通过producer选择是否被commit阻塞来改善这一问题,并且节省下来的replica和磁盘使得ISR模式仍然值得。

    上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

    • 等待ISR中的任一个replica“活”过来,并且选它作为leader
    • 选择第一个“活”过来的replica(不一定是ISR中的)作为leader

    这就需要在可用性和一致性当中作出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源(前文有说明,所有读写都由leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

    上文说明了一个parition的replication过程,然尔Kafka集群需要管理成百上千个partition,Kafka通过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也需要平衡leader的分布,尽可能的让所有partition的leader均匀分布在不同broker上。另一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的所有partition,并为之选举leader。实际上,Kafka选举一个broker作为controller,这个controller通过watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。

    这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。

    Consumer group

    (本节所有描述都是基于consumer hight level API而非low level API)。

    每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)

    很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。下图展示了Kafka在Linkedin的一种简化部署。

    为了更清晰展示Kafka consumer group的特性,笔者作了一项测试。创建一个topic (名为topic1),创建一个属于group1的consumer实例,并创建三个属于group2的consumer实例,然后通过producer向topic1发送key分别为1,2,3r的消息。结果发现属于group1的consumer收到了所有的这三条消息,同时group2中的3个consumer分别收到了key为1,2,3的消息。如下图所示。

    Consumer Rebalance

    (本节所讲述内容均基于Kafka consumer high level API)

    Kafka保证同一consumer group中只有一个consumer会消息某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。

    如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。

    如下例所示,如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。

    增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。

    再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2

    再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。

    此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。

    接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。

    再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
     

    consumer rebalance算法如下:

    • Sort PT (all partitions in topic T)
    • Sort CG(all consumers in consumer group G)
    • Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
    • Remove current entries owned by Ci from the partition owner registry
    • Assign partitions from iN to (i+1)N-1 to consumer Ci
    • Add newly assigned partitions to the partition owner registry

    目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:

    • Register itself in the consumer id registry under its group.
    • Register a watch on changes under the consumer id registry.
    • Register a watch on changes under the broker id registry.
    • If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
    • Force itself to rebalance within in its consumer group.

    在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。

    目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。但该方式有不利的方面:

    • Herd effect
      任何broker或者consumer的增减都会触发所有的consumer的rebalance
    • Split Brain
      每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的consumer都认为rebalance已经完成了,但实际上可能并非如此。

    根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。流程如下:

    消息Deliver guarantee

    通过上文介绍,想必读者已经明天了producer和consumer是如何工作的,以及Kafka是如何做replication的,接下来要讨论的是Kafka如何确保消息在producer和consumer之间传输。有这么几种可能的delivery guarantee:

    • At most once 消息可能会丢,但绝不会重复传输
    • At least one 消息绝不会丢,但可能会重复传输
    • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
      Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了At least once,但可通过设置producer异步发送实现At most once)。
      接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
    • 读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
    • 读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性—如是否幂等性,当成Kafka本身的feature)
    • 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
      总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。而Exactly once要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。

    Benchmark

    纸上得来终觉浅,绝知些事要躬行。笔者希望能亲自测一下Kafka的性能,而非从网上找一些测试数据。所以笔者曾在0.8发布前两个月做过详细的Kafka0.8性能测试,不过很可惜测试报告不慎丢失。所幸在网上找到了Kafka的创始人之一的Jay Kreps的bechmark。以下描述皆基于该benchmark。(该benchmark基于Kafka0.8.1)

    测试环境

    该benchmark用到了六台机器,机器配置如下

    • Intel Xeon 2.5 GHz processor with six cores
    • Six 7200 RPM SATA drives
    • 32GB of RAM
    • 1Gb Ethernet

      这6台机器其中3台用来搭建Kafka broker集群,另外3台用来安装Zookeeper及生成测试数据。6个drive都直接以非RAID方式挂载。实际上kafka对机器的需求与Hadoop的类似。

    producer吞吐率

    该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。

    1个producer线程,无replication

    在这一测试中,创建了一个包含6个partition且没有replication的topic。然后通过一个线程尽可能快的生成50 million条比较短(payload100字节长)的消息。测试结果是821,557 records/second78.3MB/second)。
    之所以使用短消息,是因为对于消息系统来说这种使用场景更难。因为如果使用MB/second来表征吞吐率,那发送长消息无疑能使得测试结果更好。
    整个测试中,都是用每秒钟delivery的消息的数量乘以payload的长度来计算MB/second的,没有把消息的元信息算在内,所以实际的网络使用量会比这个大。对于本测试来说,每次还需传输额外的22个字节,包括一个可选的key,消息长度描述,CRC等。另外,还包含一些请求相关的overhead,比如topic,partition,acknowledgement等。这就导致我们比较难判断是否已经达到网卡极限,但是把这些overhead都算在吞吐率里面应该更合理一些。因此,我们已经基本达到了网卡的极限。
    初步观察此结果会认为它比人们所预期的要高很多,尤其当考虑到Kafka要把数据持久化到磁盘当中。实际上,如果使用随机访问数据系统,比如RDBMS,或者key-velue store,可预期的最高访问频率大概是5000到50000个请求每秒,这和一个好的RPC层所能接受的远程请求量差不多。而该测试中远超于此的原因有两个。

    • Kafka确保写磁盘的过程是线性磁盘I/O,测试中使用的6块廉价磁盘线性I/O的最大吞吐量是822MB/second,这已经远大于1Gb网卡所能带来的吞吐量了。许多消息系统把数据持久化到磁盘当成是一个开销很大的事情,这是因为他们对磁盘的操作都不是线性I/O。
    • 在每一个阶段,Kafka都尽量使用批量处理。如果想了解批处理在I/O操作中的重要性,可以参考David Patterson的”Latency Lags Bandwidth

    1个producer线程,3个异步replication

    该项测试与上一测试基本一样,唯一的区别是每个partition有3个replica(所以网络传输的和写入磁盘的总的数据量增加了3倍)。每一个broker即要写作为leader的partition,也要读(从leader读数据)写(将数据写到磁盘)作为follower的partition。测试结果为786,980 records/second75.1MB/second)。
    该项测试中replication是异步的,也就是说broker收到数据并写入本地磁盘后就acknowledge producer,而不必等所有replica都完成replication。也就是说,如果leader crash了,可能会丢掉一些最新的还未备份的数据。但这也会让message acknowledgement延迟更少,实时性更好。
    这项测试说明,replication可以很快。整个集群的写能力可能会由于3倍的replication而只有原来的三分之一,但是对于每一个producer来说吞吐率依然足够好。

    1个producer线程,3个同步replication

    该项测试与上一测试的唯一区别是replication是同步的,每条消息只有在被in sync集合里的所有replica都复制过去后才会被置为committed(此时broker会向producer发送acknowledgement)。在这种模式下,Kafka可以保证即使leader crash了,也不会有数据丢失。测试结果为421,823 records/second40.2MB/second)。
    Kafka同步复制与异步复制并没有本质的不同。leader会始终track follower replica从而监控它们是否还alive,只有所有in sync集合里的replica都acknowledge的消息才可能被consumer所消费。而对follower的等待影响了吞吐率。可以通过增大batch size来改善这种情况,但为了避免特定的优化而影响测试结果的可比性,本次测试并没有做这种调整。

    3个producer,3个异步replication

    该测试相当于把上文中的1个producer,复制到了3台不同的机器上(在1台机器上跑多个实例对吞吐率的增加不会有太大帮忙,因为网卡已经基本饱和了),这3个producer同时发送数据。整个集群的吞吐率为2,024,032 records/second193,0MB/second)。

    Producer Throughput Vs. Stored Data

    消息系统的一个潜在的危险是当数据能都存于内存时性能很好,但当数据量太大无法完全存于内存中时(然后很多消息系统都会删除已经被消费的数据,但当消费速度比生产速度慢时,仍会造成数据的堆积),数据会被转移到磁盘,从而使得吞吐率下降,这又反过来造成系统无法及时接收数据。这样就非常糟糕,而实际上很多情景下使用queue的目的就是解决数据消费速度和生产速度不一致的问题。

    但Kafka不存在这一问题,因为Kafka始终以O(1)的时间复杂度将数据持久化到磁盘,所以其吞吐率不受磁盘上所存储的数据量的影响。为了验证这一特性,做了一个长时间的大数据量的测试,下图是吞吐率与数据量大小的关系图。

    上图中有一些variance的存在,并可以明显看到,吞吐率并不受磁盘上所存数据量大小的影响。实际上从上图可以看到,当磁盘数据量达到1TB时,吞吐率和磁盘数据只有几百MB时没有明显区别。

    这个variance是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。上图的测试结果是在生产环境中对Kafka集群做了些tuning后得到的,这些tuning方法可参考这里

    consumer吞吐率

    需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。

    1个consumer

    该测试从有6个partition,3个replication的topic消费50 million的消息。测试结果为940,521 records/second89.7MB/second)。
    可以看到,Kafkar的consumer是非常高效的。它直接从broker的文件系统里读取文件块。Kafka使用sendfile API来直接通过操作系统直接传输,而不用把数据拷贝到用户空间。该项测试实际上从log的起始处开始读数据,所以它做了真实的I/O。在生产环境下,consumer可以直接读取producer刚刚写下的数据(它可能还在缓存中)。实际上,如果在生产环境下跑I/O stat,你可以看到基本上没有物理“读”。也就是说生产环境下consumer的吞吐率会比该项测试中的要高。

    3个consumer

    将上面的consumer复制到3台不同的机器上,并且并行运行它们(从同一个topic上消费数据)。测试结果为2,615,968 records/second249.5MB/second)。
    正如所预期的那样,consumer的吞吐率几乎线性增涨。

    Producer and Consumer

    上面的测试只是把producer和consumer分开测试,而该项测试同时运行producer和consumer,这更接近使用场景。实际上目前的replication系统中follower就相当于consumer在工作。
    该项测试,在具有6个partition和3个replica的topic上同时使用1个producer和1个consumer,并且使用异步复制。测试结果为795,064 records/second75.8MB/second)。
    可以看到,该项测试结果与单独测试1个producer时的结果几乎一致。所以说consumer非常轻量级。

    消息长度对吞吐率的影响

    上面的所有测试都基于短消息(payload 100字节),而正如上文所说,短消息对Kafka来说是更难处理的使用方式,可以预期,随着消息长度的增大,records/second会减小,但MB/second会有所提高。下图是records/second与消息长度的关系图。

    正如我们所预期的那样,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。但是如果看每秒钟发送的消息的总大小,它会随着消息长度的增加而增加,如下图所示。

    从上图可以看出,当消息长度为10字节时,因为要频繁入队,花了太多时间获取锁,CPU成了瓶颈,并不能充分利用带宽。但从100字节开始,我们可以看到带宽的使用逐渐趋于饱和(虽然MB/second还是会随着消息长度的增加而增加,但增加的幅度也越来越小)。

    端到端的Latency

    上文中讨论了吞吐率,那消息传输的latency如何呢?也就是说消息从producer到consumer需要多少时间呢?该项测试创建1个producer和1个consumer并反复计时。结果是,2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)
    (这里并没有说明topic有多少个partition,也没有说明有多少个replica,replication是同步还是异步。实际上这会极大影响producer发送的消息被commit的latency,而只有committed的消息才能被consumer所消费,所以它会最终影响端到端的latency)

    重现该benchmark

    如果读者想要在自己的机器上重现本次benchmark测试,可以参考本次测试的配置和所使用的命令
    实际上Kafka Distribution提供了producer性能测试工具,可通过bin/kafka-producer-perf-test.sh脚本来启动。所使用的命令如下

    1. Producer  
    2. Setup  
    3. bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1  
    4. bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3  
    5.  
    6. Single thread, no replication  
    7.  
    8. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 
    9.  
    10. Single-thread, async 3x replication  
    11.  
    12. bin/kafktopics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3  
    13. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test6 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 
    14.  
    15. Single-thread, sync 3x replication  
    16.  
    17. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000 
    18.  
    19. Three Producers, 3x async replication  
    20. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 
    21.  
    22. Throughput Versus Stored Data  
    23.  
    24. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 
    25.  
    26. Effect of message size  
    27.  
    28. for i in 10 100 1000 10000 100000;  
    29. do  
    30. echo ""  
    31. echo $i  
    32. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test $((1000*1024*1024/$i)) $i -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000 
    33. done;  
    34.  
    35. Consumer  
    36. Consumer throughput  
    37.  
    38. bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1  
    39.  
    40. 3 Consumers  
    41.  
    42. On three servers, run:  
    43. bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1  
    44.  
    45. End-to-end Latency  
    46.  
    47. bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181 test 5000  
    48.  
    49. Producer and consumer  
    50.  
    51. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 
    52.  
    53. bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1  

    broker配置如下

    1. ############################# Server Basics #############################  
    2.  
    3. # The id of the broker. This must be set to a unique integer for each broker.  
    4. broker.id=0 
    5.  
    6. ############################# Socket Server Settings #############################  
    7.  
    8. # The port the socket server listens on  
    9. port=9092 
    10.  
    11. # Hostname the broker will bind to and advertise to producers and consumers.  
    12. # If not set, the server will bind to all interfaces and advertise the value returned from  
    13. # from java.net.InetAddress.getCanonicalHostName().  
    14. #host.name=localhost 
    15.  
    16. # The number of threads handling network requests  
    17. num.network.threads=4 
    18.  
    19. # The number of threads doing disk I/O  
    20. num.io.threads=8 
    21.  
    22. # The send buffer (SO_SNDBUF) used by the socket server  
    23. socket.send.buffer.bytes=1048576 
    24.  
    25. # The receive buffer (SO_RCVBUF) used by the socket server  
    26. socket.receive.buffer.bytes=1048576 
    27.  
    28. # The maximum size of a request that the socket server will accept (protection against OOM)  
    29. socket.request.max.bytes=104857600 
    30.  
    31.  
    32. ############################# Log Basics #############################  
    33.  
    34. # The directory under which to store log files  
    35. log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs  
    36.  
    37. # The number of logical partitions per topic per server. More partitions allow greater parallelism  
    38. # for consumption, but also mean more files.  
    39. num.partitions=8 
    40.  
    41. ############################# Log Flush Policy #############################  
    42.  
    43. # The following configurations control the flush of data to disk. This is the most  
    44. # important performance knob in kafka.  
    45. # There are a few important trade-offs here:  
    46. #    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.  
    47. #    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).  
    48. #    3. Throughput: The flush is generally the most expensive operation.   
    49. # The settings below allow one to configure the flush policy to flush data after a period of time or  
    50. # every N messages (or both). This can be done globally and overridden on a per-topic basis.  
    51.  
    52. # Per-topic overrides for log.flush.interval.ms  
    53. #log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000  
    54.  
    55. ############################# Log Retention Policy #############################  
    56.  
    57. # The following configurations control the disposal of log segments. The policy can  
    58. # be set to delete segments after a period of time, or after a given size has accumulated.  
    59. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens  
    60. # from the end of the log.  
    61.  
    62. # The minimum age of a log file to be eligible for deletion  
    63. log.retention.hours=168 
    64.  
    65. # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining  
    66. # segments don't drop below log.retention.bytes.  
    67. #log.retention.bytes=1073741824 
    68.  
    69. # The maximum size of a log segment file. When this size is reached a new log segment will be created.  
    70. log.segment.bytes=536870912 
    71.  
    72. # The interval at which log segments are checked to see if they can be deleted according   
    73. # to the retention policies  
    74. log.cleanup.interval.mins=1 
    75.  
    76. ############################# Zookeeper #############################  
    77.  
    78. # Zookeeper connection string (see zookeeper docs for details).  
    79. # This is a comma separated host:port pairs, each corresponding to a zk  
    80. # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".  
    81. # You can also append an optional chroot string to the urls to specify the  
    82. # root directory for all kafka znodes.  
    83. zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181  
    84.  
    85. # Timeout in ms for connecting to zookeeper  
    86. zookeeper.connection.timeout.ms=1000000 
    87.  
    88. # metrics reporter properties  
    89. kafka.metrics.polling.interval.secs=5 
    90. kafkakafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
    91. kafka.csv.metrics.dir=/tmp/kafka_metrics  
    92. # Disable csv reporting by default.  
    93. kafka.csv.metrics.reporter.enabled=false 
    94.  
    95. replica.lag.max.messages=10000000 

    读者也可参考另外一份Kafka性能测试报告

    参考

    【责任编辑:林师授 TEL:(010)68476606】


    展开全文
  • Kafka深度解析

    千次阅读 2015-07-22 15:34:21
    [size=16.7999992370605px] Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,并保证即使对TB级以上数据也能保证常数时间的访问性能 高...
    背景介绍Kafka简介
    
    [size=16.7999992370605px]  Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
    • 以时间复杂度为O(1)的方式提供消息持久化能力,并保证即使对TB级以上数据也能保证常数时间的访问性能
    • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
    • 支持Kafka Server间的消息分区,及分布式消息消费,同时保证每个partition内的消息顺序传输
    • 同时支持离线数据处理和实时数据处理

    为什么要用Message Queue
    • [size=17.6399993896484px]解耦
      在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
    • [size=17.6399993896484px]冗余
      有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
    • [size=17.6399993896484px]扩展性
      因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
    • [size=17.6399993896484px]灵活性 & 峰值处理能力
      在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
    • [size=17.6399993896484px]可恢复性
      当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
    • [size=17.6399993896484px]送达保证
      消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
    • [size=17.6399993896484px]顺序保证
      在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。
    • [size=17.6399993896484px]缓冲
      在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行—写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
    • [size=17.6399993896484px]理解数据流
      在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
    • [size=17.6399993896484px]异步通信
      很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

    常用Message Queue对比
    • [size=17.6399993896484px]RabbitMQ
      RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
    • [size=17.6399993896484px]Redis
      Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
    • [size=17.6399993896484px]ZeroMQ
      ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中默认使用ZeroMQ作为数据流的传输。
    • [size=17.6399993896484px]ActiveMQ
      ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
    • [size=17.6399993896484px]Kafka/Jafka
      Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

    Kafka解析Terminology
    • Broker
      Kafka集群包含一个或多个服务器,这种服务器被称为broker
    • Topic
      每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
    • Partition
      parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
    • Producer
      负责发布消息到Kafka broker
    • Consumer
      消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

    Kafka架构
    [size=16.7999992370605px]  
      如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。   
    Push vs. Pull
    [size=16.7999992370605px]  作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。
      push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
    Topic & Partition
    [size=16.7999992370605px]  Topic在逻辑上可以被认为是一个在的queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
        
      每个日志文件都是“log entries”序列,每一个log entry包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消费格式如下:
      message length : 4 bytes (value: 1+4+n)
      “magic” value : 1 byte
      crc : 4 bytes
      payload : n bytes
      这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,如下图所示。
        
      因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
        
      每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。
    # The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=3
    [size=16.7999992370605px]  在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
    import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;public class JasonPartitioner<T> implements Partitioner {    public JasonPartitioner(VerifiableProperties verifiableProperties) {}    @Override    public int partition(Object key, int numPartitions) {        try {            int partitionNum = Integer.parseInt((String) key);            return Math.abs(Integer.parseInt((String) key) % numPartitions);        } catch (Exception e) {            return Math.abs(key.hashCode() % numPartitions);        }    }}
    [size=16.7999992370605px] 
      如果将上例中的class作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic2(包含4个partition)。   
    public void sendMessage() throws InterruptedException{  for(int i = 1; i <= 5; i++){        List messageList = new ArrayList<KeyedMessage<String, String>>();        for(int j = 0; j < 4; j++){            messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));        }        producer.send(messageList);    }  producer.close();}
    [size=16.7999992370605px]  则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和partition序号相同。(partition序号从0开始,本例中的key也正好从0开始)。如下图所示。
        
      对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。
      ############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.# The minimum age of a log file to be eligible for deletionlog.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining# segments don't drop below log.retention.bytes.#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according# to the retention policieslog.retention.check.interval.ms=300000# By default the log cleaner is disabled and the log retention policy will default to #just delete segments after their retention expires.# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs #can then be marked for log compaction.log.cleaner.enable=false
    [size=16.7999992370605px]  这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。      
    Replication & Leader election
    [size=16.7999992370605px]  Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties中配置。
    1. default.replication.factor = 1
    复制代码

    [size=16.7999992370605px]  该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。  每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,leader批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。
      和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能“落后太多”。
      leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预定值,该值可在$KAFKA_HOME/config/server.properties中配置
    #If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as deadreplica.lag.max.messages=4000#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as deadreplica.lag.time.max.ms=10000
    [size=16.7999992370605px]  需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。
      一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。
      这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。
      
      上文说明了Kafka是如何做replication的,另外一个很重要的问题是当leader宕机了,怎样在follower中选举出新的leader。因为follower可能落后许多或者crash了,所以必须确保选择“最新”的follower作为新的leader。一个基本的原则就是,如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。这就需要作一个折衷,如果leader在标明一条消息被commit前等待更多的follower确认,那在它die之后就有更多的follower可以作为新的leader,但这也会造成吞吐率的下降。
      一种非常常用的选举leader的方式是“majority 灵秀”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个replica(包含leader和follower),那在commit之前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。因为在剩下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几台server,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的replica,而大量的replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种expensive的方式。
      实际上,leader election算法非常多,比如Zookeper的ZabRaftViewstamped Replication。而Kafka所使用的leader election算法更像微软的PacificA算法。
      Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个replica的失败,majority vote和ISR在commit前需要等待的replica数量是一样的,但是ISR需要的总的replica的个数几乎是majority vote的一半。
      虽然majority vote与ISR相比有不需等待最慢的server这一优势,但是Kafka作者认为Kafka可以通过producer选择是否被commit阻塞来改善这一问题,并且节省下来的replica和磁盘使得ISR模式仍然值得。
      
      上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
    • 等待ISR中的任一个replica“活”过来,并且选它作为leader
    • 选择第一个“活”过来的replica(不一定是ISR中的)作为leader

    [size=16.7999992370605px]  这就需要在可用性和一致性当中作出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源(前文有说明,所有读写都由leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
      
      上文说明了一个parition的replication过程,然尔Kafka集群需要管理成百上千个partition,Kafka通过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也需要平衡leader的分布,尽可能的让所有partition的leader均匀分布在不同broker上。另一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的所有partition,并为之选举leader。实际上,Kafka选举一个broker作为controller,这个controller通过watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。
        
      
      这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。
       
    Consumer group
    [size=16.7999992370605px]  (本节所有描述都是基于consumer hight level API而非low level API)。
      每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)
        
      
      很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。下图展示了Kafka在Linkedin的一种简化部署。
        
      为了更清晰展示Kafka consumer group的特性,笔者作了一项测试。创建一个topic (名为topic1),创建一个属于group1的consumer实例,并创建三个属于group2的consumer实例,然后通过producer向topic1发送key分别为1,2,3r的消息。结果发现属于group1的consumer收到了所有的这三条消息,同时group2中的3个consumer分别收到了key为1,2,3的消息。如下图所示。
       
    Consumer Rebalance
    [size=16.7999992370605px]  (本节所讲述内容均基于Kafka consumer high level API)
      Kafka保证同一consumer group中只有一个consumer会消息某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。
      如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。
      如下例所示,如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。
        
      增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。
        
      再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
        
      再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。
        
      此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
        
      接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
        
      再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
       
    [size=16.7999992370605px]  consumer rebalance算法如下:   
    • Sort PT (all partitions in topic T)
    • Sort CG(all consumers in consumer group G)
    • Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
    • Remove current entries owned by Ci from the partition owner registry
    • Assign partitions from iN to (i+1)N-1 to consumer Ci
    • Add newly assigned partitions to the partition owner registry

    [size=16.7999992370605px]  目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:
    • Register itself in the consumer id registry under its group.
    • Register a watch on changes under the consumer id registry.
    • Register a watch on changes under the broker id registry.
    • If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
    • Force itself to rebalance within in its consumer group.
        
        在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。

    [size=16.7999992370605px]  目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。但该方式有不利的方面:
    • Herd effect
        任何broker或者consumer的增减都会触发所有的consumer的rebalance
    • Split Brain
        每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的consumer都认为rebalance已经完成了,但实际上可能并非如此。

    [size=16.7999992370605px]  根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。流程如下:
             
    消息Deliver guarantee
    [size=16.7999992370605px]  通过上文介绍,想必读者已经明天了producer和consumer是如何工作的,以及Kafka是如何做replication的,接下来要讨论的是Kafka如何确保消息在producer和consumer之间传输。有这么几种可能的delivery guarantee:
    • At most once 消息可能会丢,但绝不会重复传输
    • At least one 消息绝不会丢,但可能会重复传输
    • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
        
        Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了At least once,但可通过设置producer异步发送实现At most once)。
        接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
    • 读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
    • 读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性—如是否幂等性,当成Kafka本身的feature)
    • 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
        总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。而Exactly once要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。

    Benchmark
    [size=16.7999992370605px]  纸上得来终觉浅,绝知些事要躬行。笔者希望能亲自测一下Kafka的性能,而非从网上找一些测试数据。所以笔者曾在0.8发布前两个月做过详细的Kafka0.8性能测试,不过很可惜测试报告不慎丢失。所幸在网上找到了Kafka的创始人之一的Jay Kreps的bechmark。以下描述皆基于该benchmark。(该benchmark基于Kafka0.8.1)
    测试环境
    [size=16.7999992370605px]  该benchmark用到了六台机器,机器配置如下
    • Intel Xeon 2.5 GHz processor with six cores
    • Six 7200 RPM SATA drives
    • 32GB of RAM
    • 1Gb Ethernet
        
        这6台机器其中3台用来搭建Kafka broker集群,另外3台用来安装Zookeeper及生成测试数据。6个drive都直接以非RAID方式挂载。实际上kafka对机器的需求与Hadoop的类似。

    producer吞吐率
    [size=16.7999992370605px]  该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。
    1个producer线程,无replication
    [size=16.7999992370605px]  在这一测试中,创建了一个包含6个partition且没有replication的topic。然后通过一个线程尽可能快的生成50 million条比较短(payload100字节长)的消息。测试结果是821,557 records/second78.3MB/second)。
      之所以使用短消息,是因为对于消息系统来说这种使用场景更难。因为如果使用MB/second来表征吞吐率,那发送长消息无疑能使得测试结果更好。
      整个测试中,都是用每秒钟delivery的消息的数量乘以payload的长度来计算MB/second的,没有把消息的元信息算在内,所以实际的网络使用量会比这个大。对于本测试来说,每次还需传输额外的22个字节,包括一个可选的key,消息长度描述,CRC等。另外,还包含一些请求相关的overhead,比如topic,partition,acknowledgement等。这就导致我们比较难判断是否已经达到网卡极限,但是把这些overhead都算在吞吐率里面应该更合理一些。因此,我们已经基本达到了网卡的极限。
      初步观察此结果会认为它比人们所预期的要高很多,尤其当考虑到Kafka要把数据持久化到磁盘当中。实际上,如果使用随机访问数据系统,比如RDBMS,或者key-velue store,可预期的最高访问频率大概是5000到50000个请求每秒,这和一个好的RPC层所能接受的远程请求量差不多。而该测试中远超于此的原因有两个。
    • Kafka确保写磁盘的过程是线性磁盘I/O,测试中使用的6块廉价磁盘线性I/O的最大吞吐量是822MB/second,这已经远大于1Gb网卡所能带来的吞吐量了。许多消息系统把数据持久化到磁盘当成是一个开销很大的事情,这是因为他们对磁盘的操作都不是线性I/O。
    • 在每一个阶段,Kafka都尽量使用批量处理。如果想了解批处理在I/O操作中的重要性,可以参考David Patterson的”Latency Lags Bandwidth

    1个producer线程,3个异步replication
    [size=16.7999992370605px]  该项测试与上一测试基本一样,唯一的区别是每个partition有3个replica(所以网络传输的和写入磁盘的总的数据量增加了3倍)。每一个broker即要写作为leader的partition,也要读(从leader读数据)写(将数据写到磁盘)作为follower的partition。测试结果为786,980 records/second75.1MB/second)。
      该项测试中replication是异步的,也就是说broker收到数据并写入本地磁盘后就acknowledge producer,而不必等所有replica都完成replication。也就是说,如果leader crash了,可能会丢掉一些最新的还未备份的数据。但这也会让message acknowledgement延迟更少,实时性更好。
      这项测试说明,replication可以很快。整个集群的写能力可能会由于3倍的replication而只有原来的三分之一,但是对于每一个producer来说吞吐率依然足够好。   
    1个producer线程,3个同步replication
    [size=16.7999992370605px]  该项测试与上一测试的唯一区别是replication是同步的,每条消息只有在被in sync集合里的所有replica都复制过去后才会被置为committed(此时broker会向producer发送acknowledgement)。在这种模式下,Kafka可以保证即使leader crash了,也不会有数据丢失。测试结果为421,823 records/second40.2MB/second)。
      Kafka同步复制与异步复制并没有本质的不同。leader会始终track follower replica从而监控它们是否还alive,只有所有in sync集合里的replica都acknowledge的消息才可能被consumer所消费。而对follower的等待影响了吞吐率。可以通过增大batch size来改善这种情况,但为了避免特定的优化而影响测试结果的可比性,本次测试并没有做这种调整。   
    3个producer,3个异步replication
    [size=16.7999992370605px]  该测试相当于把上文中的1个producer,复制到了3台不同的机器上(在1台机器上跑多个实例对吞吐率的增加不会有太大帮忙,因为网卡已经基本饱和了),这3个producer同时发送数据。整个集群的吞吐率为2,024,032 records/second193,0MB/second)。
    Producer Throughput Vs. Stored Data
    [size=16.7999992370605px]  消息系统的一个潜在的危险是当数据能都存于内存时性能很好,但当数据量太大无法完全存于内存中时(然后很多消息系统都会删除已经被消费的数据,但当消费速度比生产速度慢时,仍会造成数据的堆积),数据会被转移到磁盘,从而使得吞吐率下降,这又反过来造成系统无法及时接收数据。这样就非常糟糕,而实际上很多情景下使用queue的目的就是解决数据消费速度和生产速度不一致的问题。
      但Kafka不存在这一问题,因为Kafka始终以O(1)的时间复杂度将数据持久化到磁盘,所以其吞吐率不受磁盘上所存储的数据量的影响。为了验证这一特性,做了一个长时间的大数据量的测试,下图是吞吐率与数据量大小的关系图。
        
      上图中有一些variance的存在,并可以明显看到,吞吐率并不受磁盘上所存数据量大小的影响。实际上从上图可以看到,当磁盘数据量达到1TB时,吞吐率和磁盘数据只有几百MB时没有明显区别。
      这个variance是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。上图的测试结果是在生产环境中对Kafka集群做了些tuning后得到的,这些tuning方法可参考这里。   
    consumer吞吐率
    [size=16.7999992370605px]  需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。   
    1个consumer
    [size=16.7999992370605px]  该测试从有6个partition,3个replication的topic消费50 million的消息。测试结果为940,521 records/second89.7MB/second)。
      可以看到,Kafkar的consumer是非常高效的。它直接从broker的文件系统里读取文件块。Kafka使用sendfile API来直接通过操作系统直接传输,而不用把数据拷贝到用户空间。该项测试实际上从log的起始处开始读数据,所以它做了真实的I/O。在生产环境下,consumer可以直接读取producer刚刚写下的数据(它可能还在缓存中)。实际上,如果在生产环境下跑I/O stat,你可以看到基本上没有物理“读”。也就是说生产环境下consumer的吞吐率会比该项测试中的要高。
    3个consumer
    [size=16.7999992370605px]  将上面的consumer复制到3台不同的机器上,并且并行运行它们(从同一个topic上消费数据)。测试结果为2,615,968 records/second249.5MB/second)。
      正如所预期的那样,consumer的吞吐率几乎线性增涨。   
    Producer and Consumer
    [size=16.7999992370605px]  上面的测试只是把producer和consumer分开测试,而该项测试同时运行producer和consumer,这更接近使用场景。实际上目前的replication系统中follower就相当于consumer在工作。
      该项测试,在具有6个partition和3个replica的topic上同时使用1个producer和1个consumer,并且使用异步复制。测试结果为795,064 records/second75.8MB/second)。
      可以看到,该项测试结果与单独测试1个producer时的结果几乎一致。所以说consumer非常轻量级。   
    消息长度对吞吐率的影响
    [size=16.7999992370605px]  上面的所有测试都基于短消息(payload 100字节),而正如上文所说,短消息对Kafka来说是更难处理的使用方式,可以预期,随着消息长度的增大,records/second会减小,但MB/second会有所提高。下图是records/second与消息长度的关系图。
        
      正如我们所预期的那样,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。但是如果看每秒钟发送的消息的总大小,它会随着消息长度的增加而增加,如下图所示。
        
      从上图可以看出,当消息长度为10字节时,因为要频繁入队,花了太多时间获取锁,CPU成了瓶颈,并不能充分利用带宽。但从100字节开始,我们可以看到带宽的使用逐渐趋于饱和(虽然MB/second还是会随着消息长度的增加而增加,但增加的幅度也越来越小)。   
    端到端的Latency
    [size=16.7999992370605px]  上文中讨论了吞吐率,那消息传输的latency如何呢?也就是说消息从producer到consumer需要多少时间呢?该项测试创建1个producer和1个consumer并反复计时。结果是,2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)
      (这里并没有说明topic有多少个partition,也没有说明有多少个replica,replication是同步还是异步。实际上这会极大影响producer发送的消息被commit的latency,而只有committed的消息才能被consumer所消费,所以它会最终影响端到端的latency)   
    重现该benchmark
    [size=16.7999992370605px]  如果读者想要在自己的机器上重现本次benchmark测试,可以参考本次测试的配置和所使用的命令
      实际上Kafka Distribution提供了producer性能测试工具,可通过bin/kafka-producer-perf-test.sh脚本来启动。所使用的命令如下   
    1. Producer
    2. Setup
    3. bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3Single thread, no replication

    4. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196Single-thread, async 3x replication

    5. bin/kafktopics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test6 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196Single-thread, sync 3x replication

    6. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000Three Producers, 3x async replication
    7. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196Throughput Versus Stored Data

    8. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196Effect of message sizefor i in 10 100 1000 10000 100000;doecho ""echo $ibin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test $((1000*1024*1024/$i)) $i -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000done;

    9. Consumer
    10. Consumer throughput

    11. bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 13 Consumers

    12. On three servers, run:
    13. bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1End-to-end Latency

    14. bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181 test 5000Producer and consumer

    15. bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1
    复制代码

    [size=16.7999992370605px]  broker配置如下
    1. ############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.broker.id=0############################# Socket Server Settings ############################## The port the socket server listens onport=9092# Hostname the broker will bind to and advertise to producers and consumers.# If not set, the server will bind to all interfaces and advertise the value returned from# from java.net.InetAddress.getCanonicalHostName().#host.name=localhost# The number of threads handling network requestsnum.network.threads=4# The number of threads doing disk I/Onum.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=1048576# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=1048576# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600############################# Log Basics ############################## The directory under which to store log fileslog.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs# The number of logical partitions per topic per server. More partitions allow greater parallelism# for consumption, but also mean more files.num.partitions=8############################# Log Flush Policy ############################## The following configurations control the flush of data to disk. This is the most# important performance knob in kafka.# There are a few important trade-offs here:#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).#    3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or# every N messages (or both). This can be done globally and overridden on a per-topic basis.# Per-topic overrides for log.flush.interval.ms#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.# The minimum age of a log file to be eligible for deletionlog.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining# segments don't drop below log.retention.bytes.#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=536870912# The interval at which log segments are checked to see if they can be deleted according # to the retention policieslog.cleanup.interval.mins=1############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).# This is a comma separated host:port pairs, each corresponding to a zk# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".# You can also append an optional chroot string to the urls to specify the# root directory for all kafka znodes.zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=1000000# metrics reporter propertieskafka.metrics.polling.interval.secs=5kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
    2. kafka.csv.metrics.dir=/tmp/kafka_metrics# Disable csv reporting by default.kafka.csv.metrics.reporter.enabled=falsereplica.lag.max.messages=10000000
    复制代码
    展开全文
  • 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础。现在它已为多家不同类型的公司作为多种类型的数据管道(data ...

    我们为什么要搭建该系统

    Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础。现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用。

    活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部分。活动数据包括页面访问量(page view)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计 分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。

    近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持

    活动流和运营数据的若干用例

    • "动态汇总(News feed)"功能。将你朋友的各种活动信息广播给你

    • 相关性以及排序。通过使用计数评级(count rating)、投票(votes)或者点击率( click-through)判定一组给定的条目中那一项是最相关的.

    • 安全:网站需要屏蔽行为不端的网络爬虫(crawler),对API的使用进行速率限制,探测出扩散垃圾信息的企图,并支撑其它的行为探测和预防体系,以切断网站的某些不正常活动。

    • 运营监控:大多数网站都需要某种形式的实时且随机应变的方式,对网站运行效率进行监控并在有问题出现的情况下能触发警告。

    • 报表和批处理: 将数据装载到数据仓库或者Hadoop系统中进行离线分析,然后针对业务行为做出相应的报表,这种做法很普遍。

    活动流数据的特点

    这种由不可变(immutable)的活动数据组成的高吞吐量数据流代表了对计算能力的一种真正的挑战,因其数据量很容易就可能会比网站中位于第二位的数据源的数据量大10到100倍。

    传统的日志文件统计分析对报表和批处理这种离线处理的情况来说,是一种很不错且很有伸缩性的方法;但是这种方法对于实时处理来说其时延太大,而且还具有较 高的运营复杂度。另一方面,现有的消息队列系统(messaging and queuing system)却很适合于在实时或近实时(near-real-time)的情况下使用,但它们对很长的未被处理的消息队列的处理很不给力,往往并不将数 据持久化作为首要的事情考虑。这样就会造成一种情况,就是当把大量数据传送给Hadoop这样的离线系统后, 这些离线系统每个小时或每天仅能处理掉部分源数据。Kafka的目的就是要成为一个队列平台,仅仅使用它就能够既支持离线又支持在线使用这两种情况。

    Kafka支持非常通用的消息语义(messaging semantics)。尽管我们这篇文章主要是想把它用于活动处理,但并没有任何限制性条件使得它仅仅适用于此目的。

    部署

    下面的示意图所示是在LinkedIn中部署后各系统形成的拓扑结构。

    要注意的是,一个单个的Kafka集群系统用于处理来自各种不同来源的所有活动数据。它同时为在线和离线的数据使用者提供了一个单个的数据管道,在线活动 和异步处理之间形成了一个缓冲区层。我们还使用kafka,把所有数据复制(replicate)到另外一个不同的数据中心去做离线处理。

    我们并不想让一个单个的Kafka集群系统跨越多个数据中心,而是想让Kafka支持多数据中心的数据流拓扑结构。这是通过在集群之间进行镜像或“同步” 实现的。这个功能非常简单,镜像集群只是作为源集群的数据使用者的角色运行。这意味着,一个单个的集群就能够将来自多个数据中心的数据集中到一个位置。下 面所示是可用于支持批量装载(batch loads)的多数据中心拓扑结构的一个例子:

    请注意,在图中上面部分的两个集群之间不存在通信连接,两者可能大小不同,具有不同数量的节点。下面部分中的这个单个的集群可以镜像任意数量的源集群。要了解镜像功能使用方面的更多细节,请访问这里.


    主要的设计元素

    Kafka之所以和其它绝大多数信息系统不同,是因为下面这几个为数不多的比较重要的设计决策:

    1. Kafka在设计之时为就将持久化消息作为通常的使用情况进行了考虑。

    2. 主要的设计约束是吞吐量而不是功能。

    3. 有关哪些数据已经被使用了的状态信息保存为数据使用者(consumer)的一部分,而不是保存在服务器之上。

    4. Kafka是一种显式的分布式系统。它假设,数据生产者(producer)、代理(brokers)和数据使用者(consumer)分散于多台机器之上。

    以上这些设计决策将在下文中进行逐条详述。


    基础知识

    首先来看一些基本的术语和概念。

    消息指的是通信的基本单位。由消息生产者(producer)发布关于某话题(topic)的消息,这句话的意思是,消息以一种物理方式被发送给了作为代理(broker)的服务器(可能是另外一台机器)。若干的消息使用者(consumer)订阅(subscribe)某个话题,然后生产者所发布的每条消息都会被发送给所有的使用者。

    Kafka是一个显式的分布式系统 —— 生产者、使用者和代理都可以运行在作为一个逻辑单位的、进行相互协作的集群中不同的机器上。对于代理和生产者,这么做非常自然,但使用者却需要一些特殊的支持。每个使用者进程都属于一个使用者小组(consumer group) 。准确地讲,每条消息都只会发送给每个使用者小组中的一个进程。因此,使用者小组使得许多进程或多台机器在逻辑上作为一个单个的使用者出现。使用者小组这个概念非常强大,可以用来支持JMS中队列(queue)或者话题(topic)这两种语义。为了支持队列 语义,我们可以将所有的使用者组成一个单个的使用者小组,在这种情况下,每条消息都会发送给一个单个的使用者。为了支持话题语 义,可以将每个使用者分到它自己的使用者小组中,随后所有的使用者将接收到每一条消息。在我们的使用当中,一种更常见的情况是,我们按照逻辑划分出多个使 用者小组,每个小组都是有作为一个逻辑整体的多台使用者计算机组成的集群。在大数据的情况下,Kafka有个额外的优点,对于一个话题而言,无论有多少使 用者订阅了它,一条条消息都只会存储一次。


    消息持久化(Message Persistence)及其缓存

    不要害怕文件系统!

    在对消息进行存储和缓存时,Kafka严重地依赖于文件系统。 大家普遍认为“磁盘很慢”,因而人们都对持久化结(persistent structure)构能够提供说得过去的性能抱有怀疑态度。实际上,同人们的期望值相比,磁盘可以说是既很慢又很快,这取决于磁盘的使用方式。设计的很 好的磁盘结构往往可以和网络一样快。

    磁盘性能方面最关键的一个事实是,在过去的十几年中,硬盘的吞吐量正在变得和磁盘寻道时间严重不一致了。结果,在一个由6个7200rpm的SATA硬盘 组成的RAID-5磁盘阵列上,线性写入(linear write)的速度大约是300MB/秒,但随即写入却只有50k/秒,其中的差别接近10000倍。线性读取和写入是所有使用模式中最具可预计性的一种 方式,因而操作系统采用预读(read-ahead)和后写(write-behind)技术对磁盘读写进行探测并优化后效果也不错。预读就是提前将一个 比较大的磁盘块中内容读入内存,后写是将一些较小的逻辑写入操作合并起来组成比较大的物理写入操作。关于这个问题更深入的讨论请参考这篇文章ACM Queue article;实际上他们发现,在某些情况下,顺序磁盘访问能够比随即内存访问还要快!


    为了抵消这种性能上的波动,现代操作系变得越来越积极地将主内存用作磁盘缓存。所有现代的操作系统都会乐于将所有空 闲内存转做磁盘缓存,即时在需要回收这些内存的情况下会付出一些性能方面的代价。所有的磁盘读写操作都需要经过这个统一的缓存。想要舍弃这个特性都不太容 易,除非使用直接I/O。因此,对于一个进程而言,即使它在进程内的缓存中保存了一份数据,这份数据也可能在OS的页面缓存(pagecache)中有重 复的一份,结构就成了一份数据保存了两次。

    更进一步讲,我们是在JVM的基础之上开发的系统,只要是了解过一些Java中内存使用方法的人都知道这两点:

    1. Java对象的内存开销(overhead)非常大,往往是对象中存储的数据所占内存的两倍(或更糟)。

    2. Java中的内存垃圾回收会随着堆内数据不断增长而变得越来越不明确,回收所花费的代价也会越来越大。



    由 于这些因素,使用文件系统并依赖于页面缓存要优于自己在内存中维护一个缓存或者什么别的结构 —— 通过对所有空闲内存自动拥有访问权,我们至少将可用的缓存大小翻了一倍,然后通过保存压缩后的字节结构而非单个对象,缓存可用大小接着可能又翻了一倍。这 么做下来,在GC性能不受损失的情况下,我们可在一台拥有32G内存的机器上获得高达28到30G的缓存。而且,这种缓存即使在服务重启之后会仍然保持有 效,而不象进程内缓存,进程重启后还需要在内存中进行缓存重建(10G的缓存重建时间可能需要10分钟),否则就需要以一个全空的缓存开始运行(这么做它 的初始性能会非常糟糕)。这还大大简化了代码,因为对缓存和文件系统之间的一致性进行维护的所有逻辑现在都是在OS中实现的,这事OS做起来要比我们在进 程中做那种一次性的缓存更加高效,准确性也更高。如果你使用磁盘的方式更倾向于线性读取操作,那么随着每次磁盘读取操作,预读就能非常高效使用随后准能用得着的数据填充缓存。


    这就让人联想到一个非常简单的设计方案:不是要在内存中保存尽可能多的数据并在需要时将这些数据刷新(flush)到文件系统,而是我们要做完全相反的事 情。所有数据都要立即写入文件系统中持久化的日志中但不进行刷新数据的任何调用。实际中这么做意味着,数据被传输到OS内核的页面缓存中了,OS随后会将 这些数据刷新到磁盘的。此外我们添加了一条基于配置的刷新策略,允许用户对把数据刷新到物理磁盘的频率进行控制(每当接收到N条消息或者每过M秒),从而 可以为系统硬件崩溃时“处于危险之中”的数据在量上加个上限。

    这种以页面缓存为中心的设计风格在一篇讲解Varnish的设计思想的文章中有详细的描述(文风略带有助于身心健康的傲气)。


    常量时长足矣

    消息系统元数据的持久化数据结构往往采用BTree。 BTree是目前最通用的数据结构,在消息系统中它可以用来广泛支持多种不同的事务性或非事务性语义。 它的确也带来了一个非常高的处理开销,Btree运算的时间复杂度为O(log N)。一般O(log N)被认为基本上等于常量时长,但对于磁盘操作来讲,情况就不同了。磁盘寻道时间一次要花10ms的时间,而且每个磁盘同时只能进行一个寻道操作,因而其 并行程度很有限。因此,即使少量的磁盘寻道操作也会造成非常大的时间开销。因为存储系统混合了高速缓存操作和真正的物理磁盘操作,所以树型结构(tree structure)可观察到的性能往往是超线性的(superlinear)。更进一步讲,BTrees需要一种非常复杂的页面级或行级锁定机制才能避 免在每次操作时锁定一整颗树。实现这种机制就要为行级锁定付出非常高昂的代价,否则就必须对所有的读取操作进行串行化(serialize)。因为对磁盘 寻道操作的高度依赖,就不太可能高效地从驱动器密度(drive density)的提高中获得改善,因而就不得不使用容量较小(< 100GB)转速较高的SAS驱动去,以维持一种比较合理的数据与寻道容量之比。


    直 觉上讲,持久化队列可以按照通常的日志解决方案的样子构建,只是简单的文件读取和简单地向文件中添加内容。虽然这种结果必然无法支持BTree实现中的丰 富语义,但有个优势之处在于其所有的操作的复杂度都是O(1),读取操作并不需要阻止写入操作,而且反之亦然。这样做显然有性能优势,因为性能完全同数据 大小之间脱离了关系 —— 一个服务器现在就能利用大量的廉价、低转速、容量超过1TB的SATA驱动器。虽然这些驱动器寻道操作的性能很低,但这些驱动器在大量数据读写的情况下性 能还凑和,而只需1/3的价格就能获得3倍的容量。 能够存取到几乎无限大的磁盘空间而无须付出性能代价意味着,我们可以提供一些消息系统中并不常见的功能。例如,在Kafka中,消息在使用完后并没有立即 删除,而是会将这些消息保存相当长的一段时间(比方说一周)。


    效率最大化

    我们的假设是,系统里消息的量非常之大,实际消息量是网站页面浏览总数的数倍之多(因为每个页面浏览就是我们要处理的其中一个活动)。而且我们假设发布的每条消息都会被至少读取一次(往往是多次),因而我们要为消息使用而不是消息的产生进行系统优化,

    导致低效率的原因常见的有两个:过多的网络请求和大量的字节拷贝操作。

    为了提高效率,API是围绕这“消息集”(message set)抽象机制进行设计的,消息集将消息进行自然分组。这么做能让网络请求把消息合成一个小组,分摊网络往返(roundtrip)所带来的开销,而不是每次仅仅发送一个单个消息。


    MessageSet实现(implementation)本身是对字节数组或文件进行一次包装后形成的一薄层API。因而,里面并不存在消息处理所需的 单独的序列化(serialization)或逆序列化(deserialization)的步骤。消息中的字段(field)是按需进行逆序列化的(或 者说,在不需要时就不进行逆序列化)。

    由代理维护的消息日志本身不过是那些已写入磁盘的消息集的目录。按此进行抽象处理后,就可以让代理和消息使用者共用一个单个字节的格式(从某种程度上说,消息生产者也可以用它,消息生产者的消息要求其校验和(checksum)并在验证后才会添加到日志中)

    使用共通的格式后就能对最重要的操作进行优化了:持久化后日志块(chuck)的网络传输。为了将数据从页面缓存直接传送给socket,现代的Unix 操作系统提供了一个高度优化的代码路径(code path)。在Linux中这是通过sendfile这个系统调用实现的。通过Java中的API,FileChannel.transferTo,由它来简洁的调用上述的系统调用。


    为了理解sendfile所带来的效果,重要的是要理解将数据从文件传输到socket的数据路径:

    1. 操作系统将数据从磁盘中读取到内核空间里的页面缓存

    2. 应用程序将数据从内核空间读入到用户空间的缓冲区

    3. 应用程序将读到的数据写回内核空间并放入socke的缓冲区

    4. 操作系统将数据从socket的缓冲区拷贝到NIC(网络借口卡,即网卡)的缓冲区,自此数据才能通过网络发送出去

    这样效率显然很低,因为里面涉及4次拷贝,2次系统调用。使用sendfile就可以避免这些重复的拷贝操作,让OS直接将数据从页面缓存发送到网络中,其中只需最后一步中的将数据拷贝到NIC的缓冲区。

    我们预期的一种常见的用例是一个话题拥有多个消息使用者。采用前文所述的零拷贝优化方案,数据只需拷贝到页面缓存中一次,然后每次发送给使用者时都对它进 行重复使用即可,而无须先保存到内存中,然后在阅读该消息时每次都需要将其拷贝到内核空间中。如此一来,消息使用的速度就能接近网络连接的极限。

    要得到Java中对send'file和零拷贝的支持方面的更多背景知识,请参考IBM developerworks上的这篇文章


    端到端的批量压缩

    多数情况下系统的瓶颈是网络而不是CPU。 这一点对于需要将消息在个数据中心间进行传输的数据管道来说,尤其如此。当然,无需来自Kafka的支持,用户总是可以自行将消息压缩后进行传输,但这么 做的压缩率会非常低,因为不同的消息里都有很多重复性的内容(比如JSON里的字段名、web日志中的用户代理或者常用的字符串)。高效压缩需要将多条消 息一起进行压缩而不是分别压缩每条消息。理想情况下,以端到端的方式这么做是行得通的 —— 也即,数据在消息生产者发送之前先压缩一下,然后在服务器上一直保存压缩状态,只有到最终的消息使用者那里才需要将其解压缩。

    通过运行递归消息集,Kafka对这种压缩方式提供了支持。 一批消息可以打包到一起进行压缩,然后以这种形式发送给服务器。这批消息都会被发送给同一个消息使用者,并会在到达使用者那里之前一直保持为被压缩的形式。

    Kafka支持GZIP和Snappy压缩协议。关于压缩的更多更详细的信息,请参见这里


    客户状态

    追踪(客户)消费了什么是一个消息系统必须提供的一个关键功能之一。它并不直观,但是记录这个状态是该系统的关键性能之一。状态追踪要求(不断)更新一个 有持久性的实体的和一些潜在会发生的随机访问。因此它更可能受到存储系统的查询时间的制约而不是带宽(正如上面所描述的)。

    大部分消息系统保留着关于代理者使用(消费)的消息的元数据。也就是说,当消息被交到客户手上时,代理者自己记录了整个过程。这是一个相当直观的选择,而 且确实对于一个单机服务器来说,它(数据)能去(放在)哪里是不清晰的。又由于许多消息系统存储使用的数据结构规模小,所以这也是个实用的选择--因为代 理者知道什么被消费了使得它可以立刻删除它(数据),保持数据大小不过大。


    也许不显然的是,让代理和使用者这两者对消息的使用情况做到一致表述绝不是一件轻而易举的事情。如果代理每次都是在将消息发送到网络中后就将该消息记录为已使用的话,一旦使用者没能真正处理到该消息(比方说,因为它宕机或这请求超时了抑或别的什么原因),就会出现消息丢失的情况。为了解决此问题,许多消息系新加了一个确认功能,当消息发出后仅把它标示为已发送而不是已使用,然后代理需要等到来自使用者的特定的确认信息后才将消息记录为已使用。 这种策略的确解决了丢失消息的问题,但由此产生了新问题。首先,如果使用者已经处理了该消息但却未能发送出确认信息,那么就会让这一条消息被处理两次。第 二个问题是关于性能的,这种策略中的代理必须为每条单个的消息维护多个状态(首先为了防止重复发送就要将消息锁定,然后,然后还要将消息标示为已使用后才 能删除该消息)。另外还有一些棘手的问题需要处理,比如,对于那些以发出却未得到确认的消息该如何处理?


    消息传递语义(Message delivery semantics)

    系统可以提供的几种可能的消息传递保障如下所示:

    • 最多一次—这种用于处理前段文字所述的第一种情况。消息在发出后立即标示为已使用,因此消息不会被发出去两次,但这在许多故障中都会导致消息丢失。

    • 至少一次—这种用于处理前文所述的第二种情况,系统保证每条消息至少会发送一次,但在有故障的情况下可能会导致重复发送。

    • 仅仅一次—这种是人们实际想要的,每条消息只会而且仅会发送一次。

    这个问题已得到广泛的研究,属于“事务提交”问题的一个变种。提供仅仅一次语义的算法已经有了,两阶段或者三阶段提交法以及Paxos算法的一些变种就是 其中的一些例子,但它们都有与生俱来的的缺陷。这些算法往往需要多个网络往返(round trip),可能也无法很好的保证其活性(liveness)(它们可能会导致无限期停机)。FLP结果给出了这些算法的一些基本的局限。

    Kafka 对元数据做了两件很不寻常的事情。一件是,代理将数据流划分为一组互相独立的分区。这些分区的语义由生产者定义,由生产者来指定每条消息属于哪个分区。一 个分区内的消息以到达代理的时间为准进行排序,将来按此顺序将消息发送给使用者。这么一来,就用不着为每一天消息保存一条元数据(比如说,将消息标示为已 使用)了,我们只需为使用者、话题和分区的每种组合记录一个“最高水位标记”(high water mark)即可。因此,标示使用者状态所需的元数据总量实际上特别小。在Kafka中,我们将该最高水位标记称为“偏移量”(offset),这么叫的原 因将在实现细节部分讲解。


    使用者的状态

    在Kafka中,由使用者负责维护反映哪些消息已被使用的状态信息(偏移量)。典型情况下,Kafka使用者的library会把状态数据保存到 Zookeeper之中。然而,让使用者将状态信息保存到保存它们的消息处理结果的那个数据存储(datastore)中也许会更佳。例如,使用者也许就 是要把一些统计值存储到集中式事物OLTP数据库中,在这种情况下,使用者可以在进行那个数据库数据更改的同一个事务中将消息使用状态信息存储起来。这样 就消除了分布式的部分,从而解决了分布式中的一致性问题!这在非事务性系统中也有类似的技巧可用。搜索系统可用将使用者状态信息同它的索引段(index segment)存储到一起。尽管这么做可能无法保证数据的持久性(durability),但却可用让索引同使用者状态信息保存同步:如果由于宕机造成 有一些没有刷新到磁盘的索引段信息丢了,我们总是可用从上次建立检查点(checkpoint)的偏移量处继续对索引进行处理。与此类似,Hadoop的 加载作业(load job)从Kafka中并行加载,也有相同的技巧可用。每个Mapper在map任务结束前,将它使用的最后一个消息的偏移量存入HDFS。

    这个决策还带来一个额外的好处。使用者可用故意回退(rewind)到 以前的偏移量处,再次使用一遍以前使用过的数据。虽然这么做违背了队列的一般协约(contract),但对很多使用者来讲却是个很基本的功能。举个例 子,如果使用者的代码里有个Bug,而且是在它处理完一些消息之后才被发现的,那么当把Bug改正后,使用者还有机会重新处理一遍那些消息。


    Push和Pull

    相关问题还有一个,就是到底是应该让使用者从代理那里吧数据Pull(拉)回来还是应该让代理把数据Push(推)给使用者。和大部分消息系统一 样,Kafka在这方面遵循了一种更加传统的设计思路:由生产者将数据Push给代理,然后由使用者将数据代理那里Pull回来。近来有些系统,比如 scribe和flume,更着重于日志统计功能,遵循了一种非常不同的基于Push的设计思路,其中每个节点都可以作为代理,数据一直都是向下游 Push的。上述两种方法都各有优缺点。然而,因为基于 Push的系统中代理控制着数据的传输速率,因此它难以应付大量不同种类的使用者。我们的设计目标是,让使用者能以它最大的速率使用数据。不幸的是,在 Push系统中当数据的使用速率低于产生的速率时,使用者往往会处于超载状态(这实际上就是一种拒绝服务攻击)。基于Pull的系统在使用者的处理速度稍 稍落后的情况下会表现更佳,而且还可以让使用者在有能力的时候往往前赶赶。让使用者采用某种退避协议(backoff protocol)向代理表明自己处于超载状态,可以解决部分问题,但是,将传输速率调整到正好可以完全利用(但从不能过度利用)使用者的处理能力可比初 看上去难多了。以前我们尝试过多次,想按这种方式构建系统,得到的经验教训使得我们选择了更加常规的Pull模型。


    分发

    Kafka通常情况下是运行在集群中的服务器上。没有中央的“主”节点。代理彼此之间是对等的,不需要任何手动配置即可可随时添加和删除。同样,生产者和 消费者可以在任何时候开启。 每个代理都可以在Zookeeper(分布式协调系统)中注册的一些元数据(例如,可用的主题)。生产者和消费者可以使用Zookeeper发现主题和相 互协调。关于生产者和消费者的细节将在下面描述。


    生产者

    生产者自动负载均衡

    对于生产者,Kafka支持客户端负载均衡,也可以使用一个专用的负载均衡器对TCP连接进行负载均衡调整。专用的第四层负载均衡器在Kafka代理之上 对TCP连接进行负载均衡。在这种配置的情况,一个给定的生产者所发送的消息都会发送给一个单个的代理。使用第四层负载均衡器的好处是,每个生产者仅需一 个单个的TCP连接而无须同Zookeeper建立任何连接。不好的地方在于所有均衡工作都是在TCP连接的层次完成的,因而均衡效果可能并不佳(如果有 些生产者产生的消息远多于其它生产者,按每个代理对TCP连接进行平均分配可能会导致每个代理接收到的消息总数并不平均)。


    采用客户端基于zookeeper的负载均衡可以解决部分问题。如果这么做就能让生产者动态地发现新的代理,并按请求数量进行负载均衡。类似的,它还能让 生产者按照某些键值(key)对数据进行分区(partition)而不是随机乱分,因而可以保存同使用者的关联关系(例如,按照用户id对数据使用进行 分区)。这种分法叫做“语义分区”(semantic partitioning),下文再讨论其细节。

    下面讲解基于zookeeper的负载均衡的工作原理。在发生下列事件时要对zookeeper的监视器(watcher)进行注册:

    • 加入了新的代理

    • 有一个代理下线了

    • 注册了新的话题

    • 代理注册了已有话题。

    生产者在其内部为每一个代理维护了一个弹性的连接(同代理 建立的连接)池。通过使用zookeeper监视器的回调函数(callback),该连接池在建立/保持同所有在线代理的连接时都要进行更新。当生产者 要求进入某特定话题时,由分区者(partitioner)选择一个代理分区(参加语义分区小结)。从连接池中找出可用的生产者连接,并通过它将数据发送 到刚才所选的代理分区。


    异步发送

    对于可伸缩的消息系统而言,异步非阻塞式操作是不可或缺的。在Kafka中,生产者有个选项(producer.type=async)可用指定使用异步 分发出产请求(produce request)。这样就允许用一个内存队列(in-memory queue)把生产请求放入缓冲区,然后再以某个时间间隔或者事先配置好的批量大小将数据批量发送出去。因为一般来说数据会从一组以不同的数据速度生产数 据的异构的机器中发布出,所以对于代理而言,这种异步缓冲的方式有助于产生均匀一致的流量,因而会有更佳的网络利用率和更高的吞吐量。


    语义分区

    下面看看一个想要为每个成员统计一个个人空间访客总数的程序该怎么做。应该把一个成员的所有个人空间访问事件发送给某特定分区,因此就可以把对一个成员的 所有更新都放在同一个使用者线程中的同一个事件流中。生产者具有从语义上将消息映射到有效的Kafka节点和分区之上的能力。这样就可以用一个语义分区函 数将消息流按照消息中的某个键值进行分区,并将不同分区发送给各自相应的代理。通过实现kafak.producer.Partitioner接口,可以 对分区函数进行定制。在缺省情况下使用的是随即分区函数。上例中,那个键值应该是member_id,分区函数可以是 hash(member_id)%num_partitions。


    对Hadoop以及其它批量数据装载的支持

    具有伸缩性的持久化方案使得Kafka可支持批量数据装载,能够周期性将快照数据载入进行批量处理的离线系统。我们利用这个功能将数据载入我们的数据仓库(data warehouse)和Hadoop集群。

    批量处理始于数据载入阶段,然后进入非循环图(acyclic graph)处理过程以及输出阶段(支持情况在这里)。支持这种处理模型的一个重要特性是,要有重新装载从某个时间点开始的数据的能力(以防处理中有任何错误发生)。

    对于Hadoop,我们通过在单个的map任务之上分割装载任务对数据的装载进行了并行化处理,分割时,所有节点/话题/分区的每种组合都要分出一个来。Hadoop提供了任务管理,失败的任务可以重头再来,不存在数据被重复的危险。


    实施细则

    下面给出了一些在上一节所描述的低层相关的实现系统的某些部分的细节的简要说明。

    API 设计

    生产者 APIs

    生产者 API 是给两个底层生产者的再封装 -kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.

    [java] view plaincopy

    1. class Producer {  

    2.       

    3.   /* Sends the data, partitioned by key to the topic using either the */  

    4.   /* synchronous or the asynchronous producer */  

    5.   public void send(kafka.javaapi.producer.ProducerData producerData);  

    6.   

    7.   /* Sends a list of data, partitioned by key to the topic using either */  

    8.   /* the synchronous or the asynchronous producer */  

    9.   public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);  

    10.   

    11.   /* Closes the producer and cleans up */     

    12.   public void close();  

    13.   

    14. }   


    该API的目的是将生产者的所有功能通过一个单个的API公开给其使用者(client)。新建的生产者可以:

    • 对多个生产者请求进行排队/缓冲并异步发送批量数据 —— kafka.producer.Producer提供了在将多个生产请求序列化并发送给适当的Kafka代理分区之前,对这些生产请求进行批量处理的能力 (producer.type=async)。批量的大小可以通过一些配置参数进行控制。当事件进入队列时会先放入队列进行缓冲,直到时间到了 queue.time或者批量大小到达batch.size为止,后台线程 (kafka.producer.async.ProducerSendThread)会将这批数据从队列中取出,交给 kafka.producer.EventHandler进行序列化并发送给适当的kafka代理分区。通过event.handler这个配置参数,可 以在系统中插入一个自定义的事件处理器。在该生产者队列管道中的各个不同阶段,为了插入自定义的日志/跟踪代码或者自定义的监视逻辑,如能注入回调函数会 非常有用。通过实现kafka.producer.asyn.CallbackHandler接口并将配置参数callback.handler设置为实 现类就能够实现注入。

    • 使用用户指定的Encoder处理数据的序列化(serialization)

      1 interface Encoder<T> {
      2   public Message toMessage(T data);
      3 }

      Encoder的缺省值是一个什么活都不干的kafka.serializer.DefaultEncoder。

    • 提 供基于zookeeper的代理自动发现功能 —— 通过使用zk.connect配置参数指定zookeeper的连接url,就能够使用基于zookeeper的代理发现和负载均衡功能。在有些应用场 合,可能不太适合于依赖zookeeper。在这种情况下,生产者可以从broker.list这个配置参数中获得一个代理的静态列表,每个生产请求会被 随即的分配给各代理分区。如果相应的代理宕机,那么生产请求就会失败。

    • 通过使用一个可选性的、由用户指定的Partitioner,提供由软件实现的负载均衡功能 —— 数据发送路径选择决策受kafka.producer.Partitioner的影响。

      1 interface Partitioner<T> {
      2    int partition(T key, int numPartitions);
      3 }

      分区API根据相关的键值以及系统中具有的代理分区的数量返回一个分区id。将该id用作索引,在broker_id和partition组成的经过排序 的列表中为相应的生产者请求找出一个代理分区。缺省的分区策略是hash(key)%numPartitions。如果key为null,那就进行随机选 择。使用partitioner.class这个配置参数也可以插入自定义的分区策略。


    使用者API

    我们有两个层次的使用者API。底层比较简单的API维护了一个同单个代理建立的连接,完全同发送给服务器的网络请求相吻合。该API完全是无状态的,每个请求都带有一个偏移量作为参数,从而允许用户以自己选择的任意方式维护该元数据。

    高层API对使用者隐藏了代理的具体细节,让使用者可运行于集群中的机器之上而无需关心底层的拓扑结构。它还维护着数据使用的状态。高层API还提供了订阅同一个过滤表达式(例如,白名单或黑名单的正则表达式)相匹配的多个话题的能力。


    底层API

    class SimpleConsumer {
    	
      /* Send fetch request to a broker and get back a set of messages. */ 
      public ByteBufferMessageSet fetch(FetchRequest request);
    
      /* Send a list of fetch requests to a broker and get back a response set. */ 
      public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    
      /**
       * Get a list of valid offsets (up to maxSize) before the given time.
       * The result is a list of offsets, in descending order.
       * @param time: time in millisecs,
       *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
       *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
       */
      public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    }

    底层API不但用于实现高层API,而且还直接用于我们的离线使用者(比如Hadoop这个使用者),这些使用者还对状态的维护有比较特定的需求。

    高层API

    /* create a connection to the cluster */ 
    ConsumerConnector connector = Consumer.create(consumerConfig);
    
    interface ConsumerConnector {
    	
      /**
       * This method is used to get a list of KafkaStreams, which are iterators over
       * MessageAndMetadata objects from which you can obtain messages and their
       * associated metadata (currently only topic).
       *  Input: a map of <topic, #streams>
       *  Output: a map of <topic, list of message streams>
       */
      public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 
    
      /**
       * You can also obtain a list of KafkaStreams, that iterate over messages
       * from topics that match a TopicFilter. (A TopicFilter encapsulates a
       * whitelist or a blacklist which is a standard Java regex.)
       */
      public List<KafkaStream> createMessageStreamsByFilter(
          TopicFilter topicFilter, int numStreams);
    
      /* Commit the offsets of all messages consumed so far. */
      public commitOffsets()
      
      /* Shut down the connector */
      public shutdown()
    }

    该API的中心是一个由KafkaStream这个类实现的迭代器(iterator)。每个KafkaStream都代表着一个从一个或多个分区到一个 或多个服务器的消息流。每个流都是使用单个线程进行处理的,所以,该API的使用者在该API的创建调用中可以提供所需的任意个数的流。这样,一个流可能 会代表多个服务器分区的合并(同处理线程的数目相同),但每个分区只会把数据发送给一个流中。

    createMessageStreams方法为使用者注册到相应的话题之上,这将导致需要对使用者/代理的分配情况进行重新平衡。为了将重新平衡操作减 少到最小。该API鼓励在一次调用中就创建多个话题流。createMessageStreamsByFilter方法为发现同其过滤条件想匹配的话题 (额外地)注册了多个监视器(watchers)。应该注意,createMessageStreamsByFilter方法所返回的每个流都可能会对多 个话题进行迭代(比如,在满足过滤条件的话题有多个的情况下)。


    网络层

    网络层就是一个特别直截了当的NIO服务器,在此就不进行过于细致的讨论了。sendfile是通过给MessageSet接口添加了一个writeTo 方法实现的。这样就可以让基于文件的消息更加高效地利用transferTo实现,而不是使用线程内缓冲区读写方式。线程模型用的是一个单个的接收器 (acceptor)线程和每个可以处理固定数量网络连接的N个处理器线程。这种设计方案在别处已经经过了非常彻底的检验,发现其实现起来简单、运行起来很快。其中使用的协议一直都非常简单,将来还可以用其它语言实现其客户端。


    消息

    消息由一个固定大小的消息头和一个变长不透明字节数字的有效载荷构成(opaque byte array payload)。消息头包含格式的版本信息和一个用于探测出坏数据和不完整数据的CRC32校验。让有效载荷保持不透明是个非常正确的决策:在用于序列 化的代码库方面现在正在取得非常大的进展,任何特定的选择都不可能适用于所有的使用情况。都不用说,在Kafka的某特定应用中很有可能在它的使用中需要 采用某种特殊的序列化类型。MessageSet接口就是一个使用特殊的方法对NIOChannel进行大宗数据读写(bulk reading and writing to an NIOChannel)的消息迭代器。


    消息的格式

    /** 
    	 * A message. The format of an N byte message is the following: 
    	 * 
    	 * If magic byte is 0 
    	 * 
    	 * 1. 1 byte "magic" identifier to allow format changes 
    	 * 
    	 * 2. 4 byte CRC32 of the payload 
    	 * 
    	 * 3. N - 5 byte payload 
    	 * 
    	 * If magic byte is 1 
    	 * 
    	 * 1. 1 byte "magic" identifier to allow format changes 
    	 * 
    	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
    	 * 
    	 * 3. 4 byte CRC32 of the payload 
    	 * 
    	 * 4. N - 6 byte payload 
    	 * 
    	 */

    日志

    具有两个分区的、名称为"my_topic"的话题的日志由两个目录组成(即:my_topic_0和my_topic_1),目录中存储的是内容为该话 题的消息的数据文件。日志的文件格式是一系列的“日志项”;每条日志项包含一个表示消息长度的4字节整数N,其后接着保存的是N字节的消息。每条消息用一 个64位的整数偏移量进行唯一性标示,该偏移量表示了该消息在那个分区中的那个话题下发送的所有消息组成的消息流中所处的字节位置。每条消息在磁盘上的格 式如下文所示。每个日志文件的以它所包含的第一条消息的偏移量来命名。因此,第一个创建出来的文件的名字将为00000000000.kafka,随后每 个后加的文件的名字将是前一个文件的文件名大约再加S个字节所得的整数,其中,S是配置文件中指定的最大日志文件的大小。


    消息的确切的二进制格式都有版本,它保持为一个标准的接口,让消息集可以根据需要在生产者、代理、和使用者直接进行自由传输而无须重新拷贝或转换。其格式如下所示:

    On-disk format of a message
    
    message length : 4 bytes (value: 1+4+n) 
    "magic" value  : 1 byte
    crc            : 4 bytes
    payload        : n bytes

    将消息的偏移量作为消息的可不常见。我们原先的想法是使用由生产者产生的GUID作为消息id,然后在每个代理上作一个从GUID到偏移量的映射。但是, 既然使用者必须为每个服务器维护一个ID,那么GUID所具有的全局唯一性就失去了价值。更有甚者,维护将从一个随机数到偏移量的映射关系带来的复杂性, 使得我们必须使用一种重量级的索引结构,而且这种结构还必须与磁盘保持同步,这样我们还就必须使用一种完全持久化的、需随机访问的数据结构。如此一来,为 了简化查询结构,我们就决定使用一个简单的依分区的原子计数器(atomic counter),这个计数器可以同分区id以及节点id结合起来唯一的指定一条消息;这种方法使得查询结构简化不少,尽管每次在处理使用者请求时仍有可 能会涉及多次磁盘寻道操作。然而,一旦我们决定使用计数器,跳向直接使用偏移量作为id就非常自然了,毕竟两者都是分区内具有唯一性的、单调增加的整数。 既然偏移量是在使用者API中并不会体现出来,所以这个决策最终还是属于一个实现细节,进而我们就选择了这种更加高效的方式。



    写操作

    日志可以顺序添加,添加的内容总是保存到最后一个文件。当大小超过配置中指定的大小(比如说1G)后,该文件就会换成另外一个新文件。有关日志的配置参数 有两个,一个是M,用于指出写入多少条消息之后就要强制OS将文件刷新到磁盘;另一个是S,用来指定过多少秒就要强制进行一次刷新。这样就可以保证一旦发 生系统崩溃,最多会有M条消息丢失,或者最长会有S秒的数据丢失,


    读操作

    可以通过给出消息的64位逻辑偏移量和S字节的数据块最大的字节数对日志文件进行读取。读取操作返回的是这S个字节中包含的消息的迭代器。S应该要比最长 的单条消息的字节数大,但在出现特别长的消息情况下,可以重复进行多次读取,每次的缓冲区大小都加倍,直到能成功读取出这样长的一条消息。也可以指定一个 最大的消息和缓冲区大小并让服务器拒绝接收比这个大小大一些的消息,这样也能给客户端一个能够读取一条完整消息所需缓冲区的大小的上限。很有可能会出现读 取缓冲区以一个不完整的消息结尾的情况,这个情况用大小界定(size delimiting)很容易就能探知。


    从某偏移量开始进行日志读取的实际过程需要先找出存储所需数据的日志段文件,从全局偏移量计算出文件内偏移量,然后再从该文件偏移量处开始读取。搜索过程通过对每个文件保存在内存中的范围值进行一种变化后的二分查找完成。

    日志提供了获取最新写入的消息的功能,从而允许从“当下”开始消息订阅。这个功能在使用者在SLA规定的天数内没能正常使用数据的情况下也很有用。当使用 者企图从一个并不存在的偏移量开始使用数据时就会出现这种情况,此时使用者会得到一个OutOfRangeException异常,它可以根据具体的使用 情况对自己进行重启或者仅仅失败而退出。


    以下是发送给数据使用者(consumer)的结果的格式。

    MessageSetSend (fetch result)
    
    total length     : 4 bytes
    error code       : 2 bytes
    message 1        : x bytes
    ...
    message n        : x bytes
    MultiMessageSetSend (multiFetch result)
    
    total length       : 4 bytes
    error code         : 2 bytes
    messageSetSend 1
    ...
    messageSetSend n

    删除

    一次只能删除一个日志段的数据。 日志管理器允许通过可加载的删除策略设定删除的文件。 当前策略删除修改事件超过N 天以上的文件,也可以选择保留最后 N GB 的数据。 为了避免删除时的读取锁定冲突,我们可以使用副本写入模式,以便在进行删除的同时对日志段的一个不变的静态快照进行二进制搜索。


    数据正确性保证

    日志功能里有一个配置参数M,可对在强制进行磁盘刷新之前可写入的消息的最大条目数进行控制。在系统启动时会运行一个日志恢复过程,对最新的日志段内所有消息进行迭代,以对每条消息项的有效性进行验证。一条消息项是合法的,仅当其大小加偏移量小于文件的大小并且该消息中有效载荷的CRC32值同该消息中存储的CRC值相等。在探测出有数据损坏的情况下,就要将文件按照最后一个有效的偏移量进行截断。

    要注意,这里有两种必需处理的数据损坏情况:由于系统崩溃造成的未被正常写入的数据块(block)因而需要截断的情况以及由于文件中被加入了毫无意义的 数据块而造成的数据损坏情况。造成数据损坏的原因是,一般来说OS并不能保证文件索引节点(inode)和实际数据块这两者的写入顺序,因此,除了可能会 丢失未刷新的已写入数据之外,在索引节点已经用新的文件大小更新了但在将数据块写入磁盘块之前发生了系统崩溃的情况下,文件就可能会获得一些毫无意义的数 据。CRC值就是用于这种极端情况,避免由此造成整个日志文件的损坏(尽管未得到保存的消息当然是真的找不回来了)。


    分发

    Zookeeper目录

    接下来讨论zookeeper用于在使用者和代理直接进行协调的结构和算法。

    记法

    当一个路径中的元素是用[xyz]这种形式表示的时,其意思是, xyz的值并不固定而且实际上xyz的每种可能的值都有一个zookpeer z节点(znode)。例如,/topics/[topic]表示了一个名为/topics的目录,其中包含的子目录同话题对应,一个话题一个目录并且目 录名即为话题的名称。也可以给出数字范围,例如[0...5],表示的是子目录0、1、2、3、4。箭头->用于给出z节点的内容。例如 /hello -> world表示的是一个名称为/hello的z节点,包含的值为"world"。


    代理节点的注册

    /brokers/ids/[0...N] --> host:port (ephemeral node)

    上面是所有出现的代理节点的列表,列表中每一项都提供了一个具有唯一性的逻辑代理id,用于让使用者能够识别代理的身份(这个必须在配置中给出)。在启动 时,代理节点就要用/brokers/ids下列出的逻辑代理id创建一个z节点,并在自己注册到系统中。使用逻辑代理id的目的是,可以让我们在不影响 数据使用者的情况下就能把一个代理搬到另一台不同的物理机器上。试图用已在使用中的代理id(比如说,两个服务器配置成了同一个代理id)进行注册会导致 发生错误。

    因为代理是以非长久性z节点的方式注册的,所以这个注册过程是动态的,当代理关闭或宕机后注册信息就会消失(至此要数据使用者,该代理不再有效)。

    代理话题的注册

    /brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

    每个代理会都要注册在某话题之下,注册后它会维护并保存该话题的分区总数。


    使用者和使用者小组

    为了对数据的使用进行负载均衡并记录使用者使用的每个代理上的每个分区上的偏移量,所有话题的使用者都要在Zookeeper中进行注册。

    多个使用者可以组成一个小组共同使用一个单个的话题。同一小组内的每个使用者共享同一个给定的group_id。比如说,如果某个使用者负责用三台机器进行某某处理过程,你就可以为这组使用者分配一个叫做“某某”的id。这个小组id是在使用者的配置文件中指定的,并且这就是你告诉使用者它到底属于哪个组的方法。

    小组内的使用者要尽量公正地划分出分区,每个分区仅为小组内的一个使用者所使用。


    使用者ID的注册

    除了小组内的所有使用者都要共享一个group_id之外,每个使用者为了要同其它使用者区别开来,还要有一个非永久性的、具有唯一性的consumer_id(采用hostname:uuid的形式)。 consumer_id要在以下的目录中进行注册。

    /consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

    小组内的每个使用者都要在它所属的小组中进行注册并采用consumer_id创建一个z节点。z节点的值包含了一个<topic, #streams>的map。 consumer_id只是用来识别小组内活跃的每个使用者。使用者建立的z节点是个临时性的节点,因此如果这个使用者进程终止了,注册信息也将随之消失。


    数据使用者偏移追踪

    数据使用者跟踪他们在每个分区中耗用的最大偏移量。这个值被存储在一个Zookeeper(分布式协调系统)目录中。

    /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)

    分区拥有者注册表

    每个代理分区都被分配给了指定使用者小组中的单个数据使用者。数据使用者必须在耗用给定分区前确立对其的所有权。要确立其所有权,数据使用者需要将其 id 写入到特定代理分区中的一个临时节点(ephemeral node)中。

    /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)


    代理节点的注册

    代理节点之间基本上都是相互独立的,因此它们只需要发布它们拥有的信息。当有新的代理加入进来时,它会将自己注册到代理节点注册目录中,写下它的主机名和 端口。代理还要将已有话题的列表和它们的逻辑分区注册到代理话题注册表中。在代理上生成新话题时,需要动态的对话题进行注册。

    使用者注册算法

    当使用者启动时,它要做以下这些事情:

    1. 将自己注册到它属小组下的使用者id注册表。

    2. 注册一个监视使用者id列的表变化情况(有新的使用者加入或者任何现有使用者的离开)的变化监视器。(每个变化都会触发一次对发生变化的使用者所属的小组内的所有使用者进行负载均衡。)

    3. 主次一个监视代理id注册表的变化情况(有新的代理加入或者任何现有的代理的离开)的变化监视器。(每个变化都会触发一次对所有小组内的所有使用者负载均衡。)

    4. 如果使用者使用某话题过滤器创建了一个消息流,它还要注册一个监视代理话题变化情况(添加了新话题)的变化监视器。(每个变化都会触发一次对所有可用话题的评估,以找出话题过滤器过滤出哪些话题。新过滤出来的话题将触发一次对该使用者所在的小组内所有的使用者负载均衡。)

    5. 迫使自己在小组内进行重新负载均衡。


    使用者重新负载均衡的算法

    使用者重新复杂均衡的算法可用让小组内的所有使用者对哪个使用者使用哪些分区达成一致意见。使用者重新负载均衡的动作每次添加或移除代理以及同一小组内的 使用者时被触发。对于一个给定的话题和一个给定的使用者小组,代理分区是在小组内的所有使用者中进行平均划分的。一个分区总是由一个单个的使用者使用。这 种设计方案简化了实施过程。假设我们运行多个使用者以并发的方式同时使用同一个分区,那么在该分区上就会形成争用(contention)的情况,这样一来就需要某种形式的锁定机制。如果使用者的个数比分区多,就会出现有写使用者根本得不到数据的情况。在重新进行负载均衡的过程中,我们按照尽量减少每个使用者需要连接的代理的个数的方式,尝尝试着将分区分配给使用者。

    每个使用者在重新进行负载均衡时需要做下列的事情:

       1. 针对Ci所订阅的每个话题T
       2.   将PT设为生产话题T的所有分区
       3.   将CG设为小组内同Ci 一样使用话题T的所有使用者
       4.   对PT进行排序(让同一个代理上的各分区挨在一起)
       5.   对CG进行排序 
       6.   将i设为Ci在CG中的索引值并让N = size(PT)/size(CG)
       7.   将从i*N到(i+1)*N - 1的分区分配给使用者Ci 
       8.   将Ci当前所拥有的分区从分区拥有者注册表中删除
       9.   将新分配的分区加入到分区拥有者注册表中
            (我们可能需要多次尝试才能让原先的分区拥有者释放其拥有权)

    在触发了一个使用者要重新进行负载均衡时,同一小组内的其它使用者也会几乎在同时被触发重新进行负载均衡。


    转载于:https://my.oschina.net/ppz168/blog/281889

    展开全文
  • 发布订阅消息系统--kafka的解析。

    千次阅读 2018-09-26 14:15:39
    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐...
  • 如何发布Web项目到互联网

    万次阅读 2017-02-20 18:56:23
    比如我们有个项目想要发布到互联网上,我们首先需要购买域名以及主机,主机的话,推荐云主机(本人推荐西部数码或者阿里云),性能好; 我们先在云主机上搭建环境,比如Mysql,Jdk,Tomcat; 然后我们把自己...
  • 目录 一、服务注册中心:注册中心核心功能+实现策略 ... ... ... 三、服务发现与调用 ...服务治理在面临系统存在大量服务时可以解决基本的三大定位问题:提升服务架构的可扩展性;...对服务的有效划分和路由...
  • Redis发布订阅和应用场景

    万次阅读 2018-12-12 12:11:10
    发布订阅-应用场景 Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的...
  • 灰度发布:灰度很简单,发布很复杂

    万次阅读 多人点赞 2018-06-02 20:53:38
    什么是灰度发布,其要点有哪些?最近跟几个聊的来的同行来了一次说聚就聚的晚餐,聊了一下最近的工作情况如何以及未来规划等等,酒足饭饱后我们聊了一个话题“灰度发布”。因为笔者所负责的产品还没有达到他们产品...
  • Qt 程序打包发布总结

    万次阅读 多人点赞 2016-01-05 15:24:10
    1. 概述  当我们用QT写好了一个软件,要把你的...QT开发的程序发布的时候经常采用两种方式: l 静态编译,可生成单一的可执行文件。 l 动态编译,需同时附上需要的dll文件。 2. 发布准备 不管采用哪种方式,首
  • zuul灰度发布功能实现

    万次阅读 2018-01-22 16:59:50
    灰度发布、蓝绿发布、金丝雀发布各是...我们要发布版本了,在不确定正确性的情况下,我们选择先部分节点升级,然后让一些特定的流量进入到这些新节点,完成测试后再全量发布。我们知道,在eureka中注册各个服务后,如果
  • 很多朋友都认为微信小程序申请、部署、发布很难,需要很长时间。 实际上,微信和腾讯云同是腾讯产品,已经提供了10分钟(根据准备资源情况,已完成小程序申请认证)完成小程序开发、部署、发布的方式。当然,实现的...
  • ActiveMQ之发布- 订阅消息模式实现

    万次阅读 2020-05-02 10:50:24
    发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费...
  • 最近,在学习QT5的过程中,想尝试着把自己写的工程程序给打包发布出来,在任何一台windows系统都能运行,这样就不会限于电脑需不需要安装QT安装包了。 首先,先介绍自己使用的环境。我使用的QT版本是。我的电脑...
  • 文章目录全量发布灰度发布 app版本发布,就是app有新的版本发布,需要给用户安装升级使用。 按照app发布的手段来说,大致可以分为两大类:直接全量发布、先灰度发布再全量发布。 全量发布 顾名思义,全量发布就是一...
  • 如何打包和发布Python程序

    千次阅读 2019-05-04 23:54:14
    文章目录如何打包和发布Python程序打包编写setup.py文件编译发布TestPYPI发布PYPI发布附录新书推荐 如何打包和发布Python程序 在使用Python的过程中,我们经常需要做的一件事情就是通过pip来安装第三方的包。那么你...
  • 什么是灰度发布?

    万次阅读 多人点赞 2019-06-05 18:00:25
    # 什么是灰度发布,以及灰度发布A/B测试 在一般情况下,升级服务器端应用,需要将应用源码或程序包上传到服务器,然后停止掉老版本服务,再启动新版本。但是这种简单的发布方式存在两个问题,一方面,在新版本升级...
  • 什么是灰度发布,灰度测试。

    万次阅读 2018-01-10 09:22:09
    什么是灰度发布? 灰度发布,又名金丝雀发布,或者灰度测试,是指在黑与白之间能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B...
  • 很多朋友都认为微信小程序申请、部署、发布很难,需要很长时间。 实际上,微信和腾讯云同是腾讯产品,已经提供了10分钟(根据准备资源情况,已完成小程序申请认证)完成小程序开发、部署、发布的方式。当然,实现的...
  • VS2017发布项目

    千次阅读 2019-04-30 09:51:26
    VS2017发布WEB项目 网站发布 1、右键选择项目 2、点击发布,出现如下页面 3、点击新建配置文件,进入页面之后选择IIS、FTP等 4、选择创建配置文件。进入下方页面,发布方法选择文件系统。选择文件系统之后选择你...
1 2 3 4 5 ... 20
收藏数 2,048,568
精华内容 819,427
关键字:

发布