2018-09-07 21:21:02 Jantelope 阅读数 7537
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2469 人正在学习 去看看 肖滨

简单了解一下Kafka:是一种高吞吐量的分布式发布订阅消息系统。依赖Zookeeper,因此搭建Kafka的时候需要事先搭建好Zookeeper。体系结构如下:

当SparkStreaming与Kafka做集成的时候Kafka成了Streaming的高级数据源,由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程

pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ZDemo5</groupId>
    <artifactId>ZDemo5</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

</project>

启动kafka:bin/kafka-server-start.sh config/server.properties &       ---后台方式启动

创建topic:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic mydemo2

发布消息:bin/kafka-console-producer.sh --broker-list bigdata111:9092 --topic mydemo2

SparkStreaming读取Kafka数据源由两种模式: 

模式一:Receiver 的方式,Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。

这种方法吞吐量不高

代码实现:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel

object KafkaRecciver {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://bigdata111:9000/checkpoint")
    //创建kafka对象   生产者 和消费者 
    //模式1 采取的是 receiver 方式  reciver 每次只能读取一条记录
    val topic = Map("mydemo2" -> 1)
    //直接读取的方式  由于kafka 是分布式消息系统需要依赖Zookeeper
    val data = KafkaUtils.createStream(ssc, "192.168.128.111:2181", "mygroup", topic, StorageLevel.MEMORY_AND_DISK)
    //数据累计计算
    val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
    //进行数据统计当前值加上之前的值
    var total = curVal.sum
    //最初的值应该是0
    var previous = preVal.getOrElse(0)
    //Some 代表最终的返回值
    Some(total+previous)
  }
   val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print()
   //启动ssc
   ssc.start()
   ssc.awaitTermination()
    
  }
}

模式二:与基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。这种模式可以有效提高系统的吞吐量

代码实现:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object KafkaDirector {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //构建conf ssc 对象
    val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(3))
    //设置数据检查点进行累计统计单词
    ssc.checkpoint("hdfs://192.168.128.111:9000/checkpoint")
    //kafka 需要Zookeeper  需要消费者组
    val topics = Set("mydemo2")
    //                                     broker的原信息                                  ip地址以及端口号
    val kafkaPrams = Map[String,String]("metadata.broker.list" -> "192.168.128.111:9092")
    //                                          数据的输入了类型    数据的解码类型
    val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaPrams, topics)
    val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
      //进行数据统计当前值加上之前的值
      var total = curVal.sum
      //最初的值应该是0
      var previous = preVal.getOrElse(0)
      //Some 代表最终的但会值
      Some(total+previous)
    }
    //统计结果
    val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print() 
    //启动程序
    ssc.start()
    ssc.awaitTermination()
    
  }
}

结果展示:

-------------------------------------------
Time: 1536325032000 ms
-------------------------------------------
(love,1)
(Beijing,1)
(I,1)

-------------------------------------------
Time: 1536325035000 ms
-------------------------------------------
(love,2)
(Beijing,1)
(I,2)
(Shanghai,1)

-------------------------------------------
Time: 1536325038000 ms
-------------------------------------------
(love,2)
(Beijing,1)
(I,2)
(Shanghai,1)

-------------------------------------------
Time: 1536325041000 ms
-------------------------------------------
(love,2)
(Beijing,1)
(I,2)
(Shanghai,1)

 

2019-07-05 21:19:57 sdyuy 阅读数 34
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2469 人正在学习 去看看 肖滨

有关Kafka群集体系结构,请看下面的结构图。 它显示了Kafka的集群图。
Kafka群集体系结构

下表描述了上图中显示的每个组件。

  • Broker – Kafka集群通常由多个代理组成,以保持负载平衡。 Kafka经纪人是无状态的,所以他们使用ZooKeeper维护他们的集群状态。 一个Kafka代理实例可以处理每秒数十万次的读写操作,每个Broker都可以处理TB消息,而不会影响性能。 Kafka经纪人的领导人选举可以由ZooKeeper完成。
  • ZooKeeper – ZooKeeper用于管理和协调Kafka经纪人。 ZooKeeper服务主要用于通知生产者和消费者关于Kafka系统中任何新经纪人的存在或Kafka系统中经纪人的失败。 根据Zookeeper收到的有关经纪人存在或失败的通知,生产者和消费者就会做出决定,并开始与其他经纪人协调他们的任务。
  • Producers – 生产者将数据推送给经纪人。 新代理启动后,所有生产者都会搜索它并自动向该新代理发送消息。 Kafka生产者不会等待经纪人的确认,并且可以像经纪人能够处理的那样快地发送消息。
  • Consumers – 由于Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移量来维护消耗了多少消息。 如果消费者确认特定的消息偏移量,则意味着消费者已经消费了所有先前的消息。 消费者向代理发出一个异步拉取请求,以准备消耗字节缓冲区。 消费者可以简单地通过提供偏移值来倒回或跳到分区中的任何点。 消费者补偿值由ZooKeeper通知。
  • 推荐学习目录:Kafka群集体系结构
2018-12-07 09:34:13 weixin_41907511 阅读数 3070
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2469 人正在学习 去看看 肖滨

spark streaming流式处理kafka中的数据,首先是把数据接收过来,然后转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据;2.直接从kafka读取数据。

基于Receiver的方式(旧方法)

流程:
此方法使用Receiver接收数据。Receiver是使用Kafka高阶API接口实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。
在这里插入图片描述
问题:
在默认配置下,此方法可能会在失败时丢失数据。为确保零数据丢失,必须在Spark Streaming中另外启用预写日志(Write Ahead Logs)。这将同步保存所有收到的Kafka数据到分布式文件系统(例如HDFS)上,以便在发生故障时可以恢复所有数据。

注意点:
在Receiver的方式中,Kafka中的topic partition与Spark Streaming中生成的RDD partition无关。所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

直接读取方式( Direct Stream方法)

流程:
这种方法不使用接收器(Receiver)来接收数据,而是定期向Kafka查询每个主题的每个分区中的最新偏移量(offsets),并相应地定义要在每个批次(batch)中处理的偏移量范围。当Spark Streaming启动处理数据的作业时,利用Kafka的低阶API读取Kafka定义的偏移范围的数据。
在这里插入图片描述
优点:
这种方法相较于Receiver方式的优势在于:
简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值(偏移量),这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

缺点:
Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本

/**
  * Kafka 0.10的Spark Streaming集成(spark获取kafka数据的最新方式)
  */
object KafkaDirectStream {

  def main(args: Array[String]): Unit = {
    //创建SparkConf,如果将任务提交到集群中,那么要去掉.setMaster("local[2]")

    val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //创建一个StreamingContext,其里面包含了一个SparkContext
    val streamingContext = new StreamingContext(sc, Seconds(5))

    //配置kafka的参数
    /**
      * Kafka服务监听端口
      * 指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
      * 指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
      * 消费者ID,随意指定
      * 指定从latest(最新)还是smallest(最早)处开始读取数据
      * 如果true,consumer定期地往zookeeper写入每个分区的offset
      */
    val kafkaParams = Map[String, Object](

      "bootstrap.servers" -> "192.168.2.210:9092",    //kafka机器IP:端口
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //要监听的Topic,可以同时监听多个
    val topics = Array("test")

    //在Kafka中记录读取偏移量
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      //位置策略(可用的Executor上均匀分配分区)
      LocationStrategies.PreferConsistent,
      //消费策略(订阅固定的主题集合)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )


    //迭代DStream中的RDD(KafkaRDD),将每一个时间点对应的RDD取出来
    stream.foreachRDD { rdd =>
      //获取该RDD对应的偏移量
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //取出对应的数据
      rdd.foreach{ line =>
        println(line.key() + " " + line.value())
      }

      //异步更新偏移量到kafka中
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
2019-12-05 11:19:07 lbh199466 阅读数 69
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2469 人正在学习 去看看 肖滨

kafka通过向zookeeper注册节点来进行集群的管理,那么,kafka在zookeeper中的存放了哪些数据呢,存储结构又是什么样子的呢

1、首先,进入kafka的config/server.properties
有一个配置项:
zookeeper.connect=localhost:2185/kafka
这个配置项的结构为 zookeeper地址:端口号/路径
即kafka在zk上的注册信息都以/kafka为根目录

2、利用zk可视化工具zookeeper-visualizer

在这里插入图片描述
3、下面介绍每个目录的作用

cluster

kafka集群唯一标识

controller_epoch

初始为0,每次重新选举后+1,用于防止broker脑裂,无视epoch更低的请求

controller

集群中第一个启动的broker通过在zk创建一个临时节点/controller让自己成为控制器。如果控制器被关闭或者与zk断开连接,这个节点会消失

brokers

在这里插入图片描述

kafka-manager

kafka-manager/configs
kafka-manager/mutex
kafka-manager/deleteClusters
kafka-manager/clusters

admin

admin/delete_topics 删除的主题

isr_change_notification

consumers

消费者信息,新的kafka将消费者的消费信息(offset)存储在kafka的__consumer_offsets主题下

config

config/changes
config/clients
config/topics 所有的主题

log_dir_event_notification

latest_producer_id_block

用于幂等producer
集群中所有broker启动时都会启动一个叫TransactionCoordinator的组件,该组件能够执行预分配PID块和分配PID的工作,而所有broker都使用/latest_producer_id_block节点来保存PID块
可以参考:关于Kafka幂等producer的讨论

2019-02-13 15:21:44 Beat_Boxer 阅读数 594
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2469 人正在学习 去看看 肖滨

Kafka 是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated )、基于ZooKeeper 协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比, Kafka能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。

 

Kafka定位是一个分布式流处理平台,他具有以下三个特性:

①:能够允许发布和订阅数据。更像是一个消息队列或者消息系统

②:存储流数据时提供相应的容错机制

③:当流数据到达时能够及时处理

 

 

kafka中的基本概念

topic(主题):一个主题其实就是消息的一个分类,也就是将一组特定的消息归纳为一个主题,生产者需要将消息发送到特定的主题,消费者通过订阅主题主题的某些分区进行消费。

message(消息):消息是kafka通信的基本单位,消息由一个固定的消息头一个可变长的消息体构成。在老版本中,没一条消息称为message,在Java实现的客户端中,没一条消息被称为record。

分区(partition):Kafka将一组消息归纳为一个主题,而每个主题被分为一个或多个分区(partition),每个分区由一系列有序、不可变的消息组成,是一个有序队列。每个分区在物理上对应为一个文件夹,分区的命名:“主题名—分区编号”,分区编号从0开始。每个主题对应的分区数可以在kafka启动时所加载的配置文件中进行配置,也可以在在创建主题时指定其分区数,当然客户端还可以在主题创建后修改主题的分区数。作用:分区数越多,一般吞吐量会越高,分区也使得Kafka进行顺序消费以及负载均衡。kafka只能保证一个分区之内消息的有序性,因为每条消息被追加到相应的分区,并且是顺序写磁盘,因此效率比较高;

副本(Replica):每个分区由一个或多个副本,分布在集群的不同代理上,以便提高可用性,从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志对象(Log),分区的副本与日志对象是一一对应的;

 

Leader副本和Follower副本:由于kafka副本的存在,就需要保证一个分区的多个副本之间数据的一致性,Kafka会选择该分区的一个副本作为leader副本,而分区的其他副本作为Follower副本,只有leader副本处理客户端读、写请求,Follower副本从leader副本同步消息,假如没有Leader副本就需要(n*n)条通路来同步数据,有了leader后,只需要(n-1)条通路。

 

偏移量:任何发布到分区中的消息都会被直接追加到日志文件中(分区目录下以“.log”)为文件名后缀的数据文件的尾部,每条消息在日志文件中的位置都会对应一个按序递增的偏移量,偏移量是一个分区下严格有序的逻辑值。

 

日志段:一个日志对应多个日志段(LogSegment),一个日志段对应于磁盘上一个具体的日志文件(以“.log”结尾的数据文件)和两个索引文件(“.index”消息偏移量索引文件和“.timeindex”消息时间戳索引文件)。

 

代理(broker):kafka集群一般是由一个或多个Kafka实例构成,kafka实例即为一个代理,称为broker,每一个kafka实例都有唯一标识的id,这个id是一个非负整数,这个Id被称为brokerId

 

生产者(Producer):生产者负责将消息发送给kafka,也就是向Kafka代理(broker)发送消息的客户端

 

消费者(Consumer)和消费组(ConsumerGroup):消费者以拉取pull方式拉取数据,他是消费的客户端。在kafka中,每一个消费者属于一个特定的消费组,用groupId代表消费组名称,通过group.id进行配置,如果不指定消费组,则默认为消费组test-consumer-group.。同时每一个消费者也有一个全局唯一的Id,通过client.id进行配置,如果为配置,则会默认生成(${groupId}-${hostName}-${timestamp}-${UUID});

 

同步副本列表(ISR):kafka在ZooKeeper中动态维护了一个ISR(In-sync-Replica),此中保存了同步的副本列表,即为所有与Leader副本保持消息同步的所有Follower副本对应的代理节点id,如果某一个follower副本宕机,则该Follower副本节点id将从ISR列表中移除

 

kafka集群结构:

 

kafka对消息的处理:kafka不会删除立即被消费掉的消息,由于磁盘大小的有限,也不会一直存储消息;所以就有了Kafka对消息的处理,其可以通过配置文件进行配置,一是基于消息已存储的时间长度,二是基于分区的大小进行处理。

 

没有更多推荐了,返回首页