2019-01-16 21:02:58 weixin_39702831 阅读数 152
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2466 人正在学习 去看看 肖滨

Kafka简介

首先,我们还是先从官网入手吧:kafka.apache.org
那么Kafka是什么呢?官网上面是说:Apache Kafka® is a distributed streaming platform 这个是改版后的介绍了,以前就是一个消息中间件。
那么Kafka有什么用呢?

  1. 发布与订阅(就是读写数据)
  2. 处理数据(当数据读取到Kafka里面后现在可以在Kafka里面直接对一些数据进行处理了)
  3. 存储,因为Flume也是可以将数据从一端传送到另外一端去的,但是Flume的数据一般是走memory-channel的,我们很少走磁盘type的。当读写数据的数据出现严重差异的时候就需要有一个地方来缓存一下数据了,而Kafka就很好的胜任这一个任务了。
    其次,Kafka的缓存数据不是在内存里面的,而是落盘到Linux的文件里面,这里就引出了一个概念就是partition,每一个partition对应一个Linux的文件夹,里面以多个文件来存储我们的流式数据
    接着,Kafka是使用一个叫做topic的概念来区分不同业务的数据的。假如现在我们有3台机器部署了Kafka,我们分别从订单系统和货物系统上接收数据,这些数据都是经过我们Kafka所部署的3台机器,那么我们怎么区分不同的数据呢?我们在创建一个不同的topic来代表不同的数据流生产线。
    订单系统发送的数据路线:omn–>Flume–>Kafka–>SparkStreaming.1
    wms–>Flume–>Kafka–>SparkStreaming.2
    这样不同topic的数据之间就是相互独立的,大家不会拿错数据了。在生产当中我们可以创建多个topic来构建不同的业务线来获取数据,而不同的业务线也可以公用这些Kafka机器了。

Kafka核心概念

名词 解释
producer 生产者,就是数据的发布者,也就是将数据写入到Kafka里面的源
consumer 消费者,就是数据的订阅者,也就是将Kafka里面的数据读取出去的目标
topic 不同数据的区分,不同的topic的数据将相互独立,所以我们创建一条数据线时都要定义唯一的topic
partition 分区,当数据缓存在Kafka里面时我们可以为一个topic构建多个partition,如果partition越多,数据写入和读取的并行度越高,速度越快。但多个partition的时候,数据就不在是有序的了,相同partition里面的数据是有序的,不同的时候就无序了
broker 缓存代理,Kafka集群中的一台或多台服务器统称broker,所以当producer要传入数据时要指定好broker-list
consumer group 消费者组,里面有多个消费者,相同的partition不能由相同消费者组里面的同一个消费者消费

ConsumerGroup:

  1. 一个组内的消费者共享一个GroupID
  2. 组内的所有消费者协调一起消费topic里面的所有partition里面到数据
  3. 每个分区只能被一个组内的一个消费者消费
  4. 当一个组内的一个消费者挂了,还能有其他消费者顶上,体现容错性
    请记清楚这副图:
    在这里插入图片描述

Partition

假设我们现在只用一个topic名为test,而这个topic下面了有3个partition,这个每次生产者生产的数据都会按照一个规则或者随机地落入到一个partition里面(这里的落入其实是追加,就是数据会有序地从后面加入到一个partition里面,所以说每个partition里面的数据都是有序地,按时间来排序,但是多个partition里面的数据就是无序的了)
那么在实际中如何体现一个partition的存在呢?一般一个partition对应一个文件夹,如果你为你这个topic创建了3个partition,那么就会产生3个文件夹,分别为:test-0,test-1,test-2。然后我们的数据就存储在这个3个文件夹里面的文件了。
每个partition里面存储数据的模式都是:00000000000xxxx.index和 00000000000xxxx.log。
这里,我们要引入另外一个概念offset(偏移量),而这个offset又分为:相对偏移量和绝对偏移量
假设我们一个partition里面存储了6000个数据,那么假设这个partition里面有3个文件

00000000000000.index
00000000000000.log
00000000002000.index
00000000002000.log
00000000004000.index
00000000004000.log

每一个数据都有自己唯一的偏移量,这里有6000个,那么第一个数据就是0001,第二个就是0002…最后一个为6000。所以每一个文件的名称就是前一个文件的里面最后一条数据的偏移量。所以第二个文件的名称就是00000000002000.index和
00000000002000.log,而2000这个数据是存储在00000000000000.log里面的。
由于数据量较多,我们要查找一个数据的时候不可能遍历一遍它所在的文件,这个时候我们就需要一个存储索引的文件来协助系统快速查找了。
那么这个00000000000xxxx.index文件是怎么存储索引的呢,因为每一条数据在一个partition里面有唯一的偏移量,但是不同的数据有落在了不同的文件里面,那么每一个文件里面也有一个相对偏移量和字节偏移量来确定这条数据在这个文件里面的位置。
现在我们要寻找绝对偏移量为2800的数据,按照2分法来查找,我们可以确定这条数据是在00000000002000.log这个文件里面的,然后我们用2800-2000=800,我们知道这条数据在00000000002000.log里面是第800条数据(800是相对偏移量),然后我们在00000000002000.index这个文件里面查找是否有800这个偏移量序号(因为00000000002000.index是将00000000002000.log里面的数据的相对偏移量以稀松方式存储的,所以其并不是将所有相对偏移量都存储下来的),如果没有的话就找其最近的小于它的相对偏移量出来,得到这条数据在00000000002000.log里面的字节偏移量,快速定位到那里,从那个位置开始遍历后面的数据来查找2800偏移量的这条数据。
所以00000000000xxxx.index里面时候以稀松法存储一些数据在00000000000xxxx.log里面的相对偏移量和字节偏移量的。

Kafka基础概念

消费语义

  1. at most once 最多一次消费,数据要么及时消费掉,那么就丢掉了
  2. at least once 至少消费一次,可以重复消费
  3. exactly once 全部数据都消费,没有丢掉且不重复消费 需要外部存储offset,如:Zk,Hbase,Redis等

生产有序

单分区有序,全局无序
数据在每一个partition里面都是按生产时间来排序的,消费也将是按照这个顺序来消费的,就是先到先出。但是当多个partitions的时候,不同partition里面的数据将没有顺序性。所以一些公司需要数据有序的话,就只能单partition了。
全局有序
如果想全局有序,就要按照数据的特性来选取不同的partition来存储了。一般的做法是将数据构建成key-value对,其中key为其特征,value就是这个数据的真实值。然后对key取模来选取对应的partition存储数据了,这个需要改Kafka的源码了,这里不多讲了。

调参与监控

Kafka的参数是在http://kafka.apache.org/documentation/#configuration网址下的,小伙伴们可以自行去查看。而监控的话,我们生产上一般是从CDH上面监控的,只要生产者和消费者的趋势一致就说明消费没有延时,具体的我们到CDH部分来细说。

2016-09-27 11:43:51 z1093776496 阅读数 843
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2466 人正在学习 去看看 肖滨
传输超过大数据时所需的配置项:

  • Topic配置
          创建Topic后,在console端使用kafka-topics命令进行配置:

kafka-topics --alter --zookeeper localhost:2181/kafka --topic mytopic --config max.message.bytes=23000013
  • Producer端配置 
props.set(“max.request.size”,“130004070”)
  • Consumer端配置
props.set(“fetch.message.max.bytes”,“23000013”) 


2016-08-21 18:08:42 xiaoyu_BD 阅读数 11154
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2466 人正在学习 去看看 肖滨

我们用Flume采集日志信息后,数据进入Kafka。在Kafka之后的选择我们有很多,最典型的就是Flume-ng------Kafka------Storm。当然也可以是MongoElasticsearchHbase等等。不过不管去哪,Kafka的数据默认只存7天(可以配置),7天之后就会自动删除了,所以说数据持久化的问题就来了,数据持久化最好的选择就是进入Hadoop。所以本文介绍如何将Kafka中的数据持久化到hdfs

我这里只是提供一个思路,而且这个思路跟官网并不一致,由于我们的Kafka数据会同时进Mongo、进Elasticsearch,所以进hdfs并不需要实时同步,只是做个永久保存。所有的一切都源于需求,我们都是根据需求去搞事情。先介绍一下写作背景,本人是在北京某公司给某银行做一个大数据有关的项目,基本上银行内部数据量比较大的信息都存在我们平台,目前是300多台服务器,预计今年会扩容到500台左右。目前正处于项目三期开发阶段,引入了Kafka技术。

我做的是定期把每台Broker上的log文件同步到hdfs上,我使用Linux自带的cron服务实现定时执行一个脚本,通过脚本把log文件同步到hdfs。我们定义的log文件满1G就会重新生成一个文件,所以我们只要把满1G大小的log传到hdfs即可。传上去的时候统一放到制定topic下的指定分区,便于日后恢复数据。这里还有一个小问题就是hdfs是不支持文件的续写和替换的,所以对于已存在的文件是不能传到hdfs的,当然这里已存在的文件也没有必要传上去了,所以我们要写一个判断。简单说就是我们只把每天新生成的数据同步到hdfs

下面是我写的脚本,是依赖cron服务定时执行的

#!/bin/sh

a=1

b=1

filelistcount=`cat filelist.txt |wc -l`

filenamecount=`find  /tmp/kafka-logs/ -name "*.log" | grep /0 | awk -F '/' '{print $NF}'|wc -l`

echo $filenamecount

while [ $a -le $filenamecount ]

do

    

    filenamelog=`find /tmp/kafka-logs/ -name "*.log" | awk -F '/' '{print $NF}'|sed -n $a"P" `   

    filenameindex=`echo $filenamelog|awk -F '.' '{print $1}'`.index

    topicpatition=`find /tmp/kafka-logs/ -name "*.log"| awk -F '/' '{print $4}'|sed -n $a"P" `

    filesize=`du -ah /tmp/kafka-logs/$topicpatition/$filenamelog|awk -F ' ' '{print $1}'`  

    echo $filesize $topicpatition/$filenamelog and $filenameindex

    echo $filesize

    if test "$filesize" = "684K"

    then

        while [ $b -le $filelistcount ]

        do

           if test "$topicpatition/$filenamelog" =  "`cat filelist.txt |sed -n $b"p"`"

           then    

               echo "$topicpatition/$filename:File exists"

               break

           else

                let b++

           fi

           

        done

        if test "$b" -gt "$filelistcount"

        then

             hdfs dfs -put /tmp/kafka-logs/$topicpatition/$filenamelog  /test/

             hdfs dfs -put /tmp/kafka-logs/$topicpatition/$filenameindex  /test/

             echo "send hdfs"

             echo $topicpatition/$filenamelog >> filelist.txt

        fi

        b=1

    fi

    let a++

    

done

 

这个脚本我还没有写完,不过大体框架已经很清晰了。后期可以根据需求再进行完善。

2018-08-13 14:20:01 lijie930106 阅读数 434
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2466 人正在学习 去看看 肖滨

大数据时代来临,如果你还不知道Kafka那你就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP10银行,8家TOP10保险公司,9家TOP10电信公司等等。

LinkedIn,Microsoft和Netflix每天都用Kafka处理万亿级的信息。Kafka主要应用于实时信息流的大数据收集或者实时分析(或者两者兼有)。Kafka既可以为内存微服务提供持久性服务,也可以用于向复杂事件流系统和IoT/IFTTT式自动化系统反馈事件。

为什么是Kafka?

Kafka常用于实时流数据结构的实时分析。由于Kafka是一种快速、可扩展、可持久和高容错的发布-订阅消息系统(publish-subscribe messaging system),所以Kafka对于一些Use Case(有大数据量和高响应需求)的支持远好于JMS、RabbitMQ和AMQP。相比于那些工具,Kafka支持更高的吞吐量,更高的稳定性和副本(replication)特性。这使得它比传统的MOM更加适合跟踪服务调用(可以跟踪每次调用)或跟踪IoT传感器数据。

Kafka可以与Flume/Flafka、Spark Streaming、Storm、HBase、Flink以及Spark配合使用,用于实时获取、分析和处理流数据。Kafka可以为Hadoop大数据湖(Hadoop BigData lake)提供数据流。Kafka Broker支持在Hadoop或Spark中低延时地处理和分析海量信息流。此外,Kafka子项目KafkaStreaming可用于实时分析。

什么是Kafka Use Case?

简而言之,Kafka用于流处理、网站活动跟踪、度量收集和监视、日志聚合、实时分析、CEP、将数据注入Spark和Hadoop、CQRS、重放消息、错误恢复以及分布式提交内存计算(微服务)的日志。

谁在使用Kafka?

许多需要快速处理大量数据的大公司都在使用Kafka。Kafka最初是由LinkedIn开发,用它来跟踪活动数据和运营指标。Twitter把它作为Storm的一部分来作为流处理的基础。Square把Kafka当作总线,将所有系统事件(日志,自定义事件,指标等)传输到各个Square数据中心,或者输出到Splunk,或者应用于Graphite(仪表板),或者实现Esper-like/ CEP警报系统。Spotify,Uber,Tumbler,Goldman Sachs,PayPal,Box,Cisco,CloudFlare和Netflix等公司也都在使用它。

为什么Kafka这么流行

首先最主要的原因是Kafka具有极佳的性能表现。它非常稳定,能提供稳定的持久化,具有灵活的订阅-发布消息队列,可与N个消费者群组进行良好扩展,具有强大的复制功能,为生产者提供可调整的一致性保证,并在碎片级别提供保留排序(即Kafka主题分区)。其次,Kafka可以很好地兼容需要数据流处理的系统,并将这些系统融合、转换并加载到其他存储。另外,Kafka操作(配置和使用)都非常简单,而且Kafka的工作原理也很好理解。当然了,如果Kafka处理数据很慢,有再多其他优点都是没有意义的,所以,“多快好省”就是Kafka的最大优势。

为什么Kafka这么快

Kafka基于zero copy原则,深度依靠操作系统内核实现快速移动数据。Kafka能将数据记录分批处理。这些批次数据可以通过端到端的方式从生产者到文件系统(Kafka主题日志)再到消费者。批处理能实现更高效的数据压缩并减少I / O延迟。Kafka将不可变的提交日志写入连续磁盘,从而避免了随机磁盘访问和磁盘寻道速度慢的问题。Kafka支持增加分区进行横向扩展。它将主题日志分成几百个(可能有数千个)分区分布到数千个服务器。这种方式可以让Kafka承载海量负载。

Kafka Streaming

Kafka最常用于将数据实时传输到其他系统。Kafka作为一个中间层来解耦不同的实时数据管道。Kafka核心并不适合入数据聚合(data aggregation)或CEP等的直接计算。Kafka Streaming作为Kafka生态系统的一部分,提供了进行实时分析的能力。Kafka可以为Storm,Flink,Spark Streaming以及你的服务和CEP系统提供快速通道系统(实时操作数据系统)。Kafka也用于流数据批量数据分析。它将数据传输到大数据平台或RDBMS,Cassandra,Spark甚至S3中用于未来的数据分析。这些数据存储通常支持数据分析,报告,数据科学分析,合规性审计和备份。

说了那么多,让我们来讨论一个终极命题:

到底什么是Kafka?

Kafka是一个分布式流平台,用于发布和订阅记录流。Kafka可以用于容错存储。Kafka将主题日志分区复制到多个服务器。Kafka的设计目的是为了让你的应用能在记录生成后立即就能处理。Kafka的处理速度很快,通过批处理和压缩记录有效地使用IO。Kafka会对数据流进行解耦。Kafka用于将数据流到数据湖、应用和实时流分析系统中。

Kafka支持多语言

客户端和服务器之间的Kafka通信使用基于TCP的线路协议,该协议是版本化和文档化的。Kafka承诺保持对老客户端的向后兼容性,并支持多种语言,包括C#,Java,C,Python,Ruby等多种语言。Kafka生态系统还提供REST代理,可通过HTTP和JSON轻松集成。Kafka还通过Kafka的融合模式注册(ConfluentSchema Registry)支持Avro模式。Avro和模式注册允许客户以多种编程语言制作和读取复杂的记录,并允许记录的变化。

Kafka的用途

Kafka支持构建实时流数据管道。Kafka支持内存微服务(比如actors,Akka,Baratine.io,QBit,reactors,reactive,,Vert.x,RxJava,Spring Reactor)。Kafka支持构建实时流应用程序,进行实时数据分析,转换,响应,聚合、加入实时数据流以及执行CEP。

Kafka可以用来协助收集度量标准或KPI,从多个来源收集统计信息并实现eventsourcing(将应用状态的所有更改捕获为事件序列)。可以将它与内存微服务和actor系统一起使用,以实现内中服务(分布式系统的外部提交日志)。

Kafka可以用来在节点之间复制数据,为节点重新同步以及恢复状态。虽然Kafka主要用于实时数据分析和流处理,但也可以将其用于日志聚合,消息传递,跟踪点击流,审计跟踪等等。

Kafka可扩展的消息存储

Kafka是一个很好的记录或信息存储系统。Kafka就像一个提交日志存储和复制的高速文件系统。这些特点使Kafka适用于各种应用场合。写入Kafka主题的记录会持久保存到磁盘并复制到其他服务器以实现容错。由于现在磁盘速度快而且相当大,所以这种方式非常有用。Kafka生产者可以等待确认,所以消息是持久的,因为生产者在复制完成之前不会完成写入操作。Kafka磁盘结构可以很好地扩展。磁盘在大批量流式传输时具有非常高的吞吐量。

此外,Kafka客户端和消费者可以控制读取位置(偏移量),这允许在出现重要错误(即修复错误和重放)时重播日志等用例。而且,由于偏移量是按照每个消费者群体进行跟踪的,所以消费者可以非常灵活地重播日志。

Kafka的记录保留
Kafka集群保留所有公布的记录。如果没有设置限制,它将保留所有记录直到磁盘空间不足。可以设置基于时间的限制(可配置的保留期限),也可以基于空间的限制(可根据存储空间进行配置)或精简(保留最新版本的记录)。除非被时间,空间或精简等策略删除,主题日志中的记录一直处于可用状态。由于Kafka总是在主题日志的末尾写入,所以它的消费速度不会受到大小的影响。

2019-05-22 22:00:00 aa541505 阅读数 1127
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2466 人正在学习 去看看 肖滨

如果你恰好在学大数据,想要通过本篇文章就学好大数据,我建议你可以把页面关闭掉,大数据是入门学容易,达到高薪是绝对需要系统学习的,当然如果你想着通过大数据提高你的收入,可以详细阅读我推荐的文章

推荐阅读文章

大数据工程师在阿里面试流程是什么?

学习大数据需要具备怎么样基础?

年薪30K的大数据开发工程师的工作经验总结?

Kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

1.前言
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。

 1.1  Kafka的特性:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写

1.2   Kafka的使用场景:
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
- 事件源

1.3  Kakfa的设计思想
- Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。
这里曾经发生过一个bug,TalkingData使用Kafka0.8.1的时候,kafka controller在Zookeeper上注册成功后,它和Zookeeper通信的timeout时间是6s,也就是如果kafka controller如果有6s中没有和Zookeeper做心跳,那么Zookeeper就认为这个kafka controller已经死了,就会在Zookeeper上把这个临时节点删掉,那么其他Kafka就会认为controller已经没了,就会再次抢着注册临时节点,注册成功的那个kafka broker成为controller,然后,之前的那个kafka controller就需要各种shut down去关闭各种节点和事件的监听。但是当kafka的读写流量都非常巨大的时候,TalkingData的一个bug是,由于网络等原因,kafka controller和Zookeeper有6s中没有通信,于是重新选举出了一个新的kafka controller,但是原来的controller在shut down的时候总是不成功,这个时候producer进来的message由于Kafka集群中存在两个kafka controller而无法落地。导致数据淤积。
这里曾经还有一个bug,TalkingData使用Kafka0.8.1的时候,当ack=0的时候,表示producer发送出去message,只要对应的kafka broker topic partition leader接收到的这条message,producer就返回成功,不管partition leader 是否真的成功把message真正存到kafka。当ack=1的时候,表示producer发送出去message,同步的把message存到对应topic的partition的leader上,然后producer就返回成功,partition leader异步的把message同步到其他partition replica上。当ack=all或-1,表示producer发送出去message,同步的把message存到对应topic的partition的leader和对应的replica上之后,才返回成功。但是如果某个kafka controller 切换的时候,会导致partition leader的切换(老的 kafka controller上面的partition leader会选举到其他的kafka broker上),但是这样就会导致丢数据。
-  Consumergroup:各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。所以如果想同时对一个topic做消费的话,启动多个consumer group就可以了,但是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。它不能像AMQ那样可以多个BET作为consumer去互斥的(for update悲观锁)并发处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许同一个consumer group下的一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。如果想多个不同的业务都需要这个topic的数据,起多个consumer group就好了,大家都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
    当启动一个consumer group去消费一个topic的时候,无论topic里面有多个少个partition,无论我们consumer group里面配置了多少个consumer thread,这个consumer group下面的所有consumer thread一定会消费全部的partition;即便这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费所有的partition。因此,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
    同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不能够一个consumer group的多个consumer同时消费一个partition。
    一个consumer group下,无论有多少个consumer,这个consumer group一定回去把这个topic下所有的partition都消费了。当consumer group里面的consumer数量小于这个topic下的partition数量的时候,如下图groupA,groupB,就会出现一个conusmer thread消费多个partition的情况,总之是这个topic下的partition都会被消费。如果consumer group里面的consumer数量等于这个topic下的partition数量的时候,如下图groupC,此时效率是最高的,每个partition都有一个consumer thread去消费。当consumer group里面的consumer数量大于这个topic下的partition数量的时候,如下图GroupD,就会有一个consumer thread空闲。因此,我们在设定consumer group的时候,只需要指明里面有几个consumer数量即可,无需指定对应的消费partition序号,consumer会自动进行rebalance。
    多个Consumer Group下的consumer可以消费同一条message,但是这种消费也是以o(1)的方式顺序的读取message去消费,,所以一定会重复消费这批message的,不能向AMQ那样多个BET作为consumer消费(对message加锁,消费的时候不能重复消费message)
- Consumer Rebalance的触发条件:(1)Consumer增加或删除会触发 Consumer Group的Rebalance(2)Broker的增加或者减少都会触发 Consumer Rebalance
- Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。所以必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由自己维护。一般来说都是使用high level api的。Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也可以配置成读完消息处理再commit,这种情况下consumer端的响应就会比较慢的,需要等处理完才行。
一般情况下,一定是一个consumer group处理一个topic的message。Best Practice是这个consumer group里面consumer的数量等于topic里面partition的数量,这样效率是最高的,一个consumer thread处理一个partition。如果这个consumer group里面consumer的数量小于topic里面partition的数量,就会有consumer thread同时处理多个partition(这个是kafka自动的机制,我们不用指定),但是总之这个topic里面的所有partition都会被处理到的。。如果这个consumer group里面consumer的数量大于topic里面partition的数量,多出的consumer thread就会闲着啥也不干,剩下的是一个consumer thread处理一个partition,这就造成了资源的浪费,因为一个partition不可能被两个consumer thread去处理。所以我们线上的分布式多个service服务,每个service里面的kafka consumer数量都小于对应的topic的partition数量,但是所有服务的consumer数量只和等于partition的数量,这是因为分布式service服务的所有consumer都来自一个consumer group,如果来自不同的consumer group就会处理重复的message了(同一个consumer group下的consumer不能处理同一个partition,不同的consumer group可以处理同一个topic,那么都是顺序处理message,一定会处理重复的。一般这种情况都是两个不同的业务逻辑,才会启动两个consumer group来处理一个topic)。

在深入学习Kafka之前,需要先了解topics, brokers, producersconsumers等几个主要术语。 下面说明了主要术语的详细描述和组件。

在上图中,主题(topic)被配置为三个分区。 分区1(Partition 1)具有两个偏移因子01。分区2(Partition 2)具有四个偏移因子0,1,23,分区3(Partition 3)具有一个偏移因子0replica 的id与托管它的服务器的id相同。

假设,如果该主题的复制因子设置为3,则Kafka将为每个分区创建3个相同的副本,并将它们放入群集中以使其可用于其所有操作。 为了平衡集群中的负载,每个代理存储一个或多个这些分区。 多个生产者和消费者可以同时发布和检索消息。

  • Topics - 属于特定类别的消息流被称为主题(Topics),数据存储在主题中。主题分为多个分区。 对于每个主题,Kafka都保留一个分区的最小范围。 每个这样的分区都以不可变的有序顺序包含消息。 分区被实现为一组相同大小的段文件。
  • Partition - 主题可能有很多分区,所以它可以处理任意数量的数据。
  • Partition offset - 每个分区消息都有一个称为偏移量的唯一序列标识。
  • Replicas of partition - 副本只是分区的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
  • Brokers

    • 经纪人(Brokers)是简单的系统,负责维护公布的数据。 每个代理可能每个主题有零个或多个分区。 假设,如果一个主题和N个代理中有N个分区,则每个代理将有一个分区。
    • 假设某个主题中有N个分区并且N个代理(n + m)多于N个,则第一个N代理将拥有一个分区,下一个M代理将不会拥有该特定主题的任何分区。
    • 假设某个主题中有N个分区且N个代理(n-m)少于N个代理,则每个代理将拥有一个或多个分区共享。 由于经纪人之间的负载分配不均衡,不推荐这种情况。
  • Kafka Cluster - Kafka拥有多个经纪人称为Kafka集群。 Kafka集群可以在无需停机的情况下进行扩展。 这些集群用于管理消息数据的持久性和复制。

  • Producers - 生产者(Producer)是一个或多个Kafka主题的发布者。 生产者向Kafka经纪人发送数据。 每当生产者向经纪人发布消息时,经纪人只需将消息附加到最后一个段文件。 实际上,该消息将被附加到分区。 生产者也可以将消息发送到他们选择的分区。
  • Consumers - 消费者从经纪人那里读取数据。 消费者通过从经纪人处获取数据来订阅一个或多个主题并消费发布的消息。
  • Leader - Leader是负责所有分区读写的节点。 每个分区都有一台服务器充当领导者。
  • Follower - 遵循领导者(Leader)指示的节点称为追随者(Follower)。 如果领导失败,其中一个追随者将自动成为新领导。 追随者扮演正常的消费者角色,拉动消息并更新自己的数据存储。

Kafka工具包装在org.apache.kafka.tools.*下。 工具分为系统工具和复制工具。

系统工具

系统工具可以使用run class脚本从命令行运行。 语法如下 -

bin/kafka-run-class.sh package.class -- options

Shell

下面提到了一些系统工具 -

  • Kafka迁移工具 - 此工具用于将代理从一个版本迁移到另一个版本。
  • Mirror Maker - 此工具用于将一个Kafka集群镜像到另一个。
  • 消费者偏移量检查器 - 此工具显示指定的一组主题和使用者组的消费者组,主题,分区,偏移量,日志大小,所有者。

复制工具

Kafka复制是一个高层次的设计工具。 添加复制工具的目的是提供更强的耐用性和更高的可用性。 下面提到了一些复制工具 -

  • 创建主题工具 - 这会创建一个包含默认分区数量,复制因子的主题,并使用Kafka的默认方案执行副本分配。

  • 列表主题工具 - 此工具列出给定主题列表的信息。 如果在命令行中没有提供主题,该工具将查询Zookeeper以获取所有主题并列出它们的信息。 该工具显示的字段是主题名称,分区,领导,副本,isr。

  • 添加分区工具 - 创建主题时,必须指定主题的分区数量。 稍后,当话题量增加时,话题可能需要更多的分区。 此工具有助于为特定主题添加更多分区,还可以手动添加分区的副本分配。

  • 推荐阅读文章

    大数据工程师在阿里面试流程是什么?

    学习大数据需要具备怎么样基础?

    年薪30K的大数据开发工程师的工作经验总结?


 

java消费kafka的数据

阅读数 642

Kafka

阅读数 63

没有更多推荐了,返回首页