kafka通信 spark_spark连接kafka配置spark-env.sh - CSDN
  • 作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据... 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版

    作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中。 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor、Spark布道者陈超我们了解到 ,在Spark 1.2版本中,Spark Streaming开始支持fully HA模式(选择使用),通过添加一层WAL(Write Ahead Log),每次收到数据后都会存在HDFS上,从而避免了以前版本中的数据丢失情况,但是不可避免的造成了一定的开销,需要开发者自行衡量。

    以下为译文

    作为一个实时大数据处理工具, Spark Sreaming 近日一直被广泛关注,与 Apache Storm 的对比也经常出现。但是依我说,缺少与Kafka整合,任何实时大数据处理工具都是不完整的,因此我将一个示例Spark Streaming应用程序添加到 kafka-storm-starter ,并且示范如何从Kafka读取,以及如何写入到Kafka。在这个过程中,我还使用Avro作为数据格式,以及Twitter Bijection进行数据序列化。

    在本篇文章,我将详细地讲解这个Spark Streaming示例;同时,我还会穿插当下Spark Streaming与Kafka整合的一些焦点话题。免责声明:这是我首次试验Spark Streaming,仅作为参考。

    当下,这个Spark Streaming示例被上传到GitHub,下载访问: kafka-storm-starter。项目的名称或许会让你产生某些误解,不过,不要在意这些细节:)

    什么是Spark Streaming

    Spark Streaming 是Apache Spark的一个子项目。Spark是个类似于Apache Hadoop的开源批处理平台,而Spark Streaming则是个实时处理工具,运行在Spark引擎之上。

    Spark Streaming vs. Apache Storm

    Spark Streaming与Apache Storm有一些相似之处,后者是当下最流行的大数据处理平台。前不久,雅虎的Bobby Evans 和Tom Graves曾发表过一个“ Spark and Storm at Yahoo! ”的演讲,在这个演讲中,他们对比了两个大平台,并提供了一些选择参考。类似的,Hortonworks的P. Taylor Goetz也分享过名为 Apache Storm and Spark Streaming Compared 的讲义。

    这里,我也提供了一个非常简短的对比:对比Spark Streaming,Storm的产业采用更高,生产环境应用也更稳定。但是从另一方面来说,对比Storm,Spark拥有更清晰、等级更高的API,因此Spark使用起来也更加愉快,最起码是在使用Scala编写Spark应用程序的情况(毫无疑问,我更喜欢Spark中的API)。但是,请别这么直接的相信我的话,多看看上面的演讲和讲义。

    不管是Spark还是Storm,它们都是Apache的顶级项目,当下许多大数据平台提供商也已经开始整合这两个框架(或者其中一个)到其商业产品中,比如Hortonworks就同时整合了Spark和Storm,而Cloudera也整合了Spark。

    附录:Spark中的Machines、cores、executors、tasks和receivers 

    本文的后续部分将讲述许多Spark和Kafka中的parallelism问题,因此,你需要掌握一些Spark中的术语以弄懂这些环节。

    • 一个Spark集群必然包含了1个以上的工者作节点,又称为从主机(为了简化架构,这里我们先抛弃开集群管理者不谈)。
    • 一个工作者节点可以运行一个以上的executor
    • Executor是一个用于应用程序或者工作者节点的进程,它们负责处理tasks,并将数据保存到内存或者磁盘中。每个应用程序都有属于自己的executors,一个executor则包含了一定数量的cores(也被称为slots)来运行分配给它的任务。
    • Task是一个工作单元,它将被传送给executor。也就是说,task将是你应用程序的计算内容(或者是一部分)。SparkContext将把这些tasks发送到executors进行执行。每个task都会占用父executor中的一个core(slot)。
    • Receiver( API , 文档 )将作为一个长期运行的task跑在一个executor上。每个receiver都会负责一个所谓的input DStream(比如从Kafka中读取的一个输入流),同时每个receiver( input DStream)占用一个core/slot。
    • input DStream:input DStream是DStream的一个类型,它负责将Spark Streaming连接到外部的数据源,用于读取数据。对于每个外部数据源(比如Kafka)你都需要配置一个input DStream。一个Spark Streaming会通过一个input DStream与一个外部数据源进行连接,任何后续的DStream都会建立标准的DStreams。

    在Spark的执行模型,每个应用程序都会获得自己的executors,它们会支撑应用程序的整个流程,并以多线程的方式运行1个以上的tasks,这种隔离途径非常类似Storm的执行模型。一旦引入类似YARN或者Mesos这样的集群管理器,整个架构将会变得异常复杂,因此这里将不会引入。你可以通过Spark文档中的 Cluster Overview 了解更多细节。

    整合Kafka到Spark Streaming

    概述

    简而言之,Spark是支持Kafka的,但是这里存在许多不完善的地方。

    Spark代码库中的 KafkaWordCount 对于我们来说是个非常好的起点,但是这里仍然存在一些开放式问题。

    特别是我想了解如何去做:

    • 从kafaka中并行读入。在Kafka,一个话题(topic)可以有N个分区。理想的情况下,我们希望在多个分区上并行读取。这也是 Kafka spout in Storm 的工作。
    • 从一个Spark Streaming应用程序向Kafka写入,同样,我们需要并行执行。

    在完成这些操作时,我同样碰到了Spark Streaming和/或Kafka中一些已知的问题,这些问题大部分都已经在Spark mailing list中列出。在下面,我将详细总结Kafka集成到Spark的现状以及一些常见问题。

    Kafka中的话题、分区(partitions)和parallelism

    详情可以查看我之前的博文: Apache Kafka 0.8 Training Deck and Tutorial 和Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node 。

    Kafka将数据存储在话题中,每个话题都包含了一些可配置数量的分区。话题的分区数量对于性能来说非常重要,而这个值一般是消费者parallelism的最大数量:如果一个话题拥有N个分区,那么你的应用程序最大程度上只能进行N个线程的并行,最起码在使用Kafka内置Scala/Java消费者API时是这样的。

    与其说应用程序,不如说Kafka术语中的消费者群(consumer group)。消费者群,通过你选择的字符串识别,它是逻辑消费者应用程序集群范围的识别符。同一个消费者群中的所有消费者将分担从一个指定Kafka话题中的读取任务,同时,同一个消费组中所有消费者从话题中读取的线程数最大值即是N(等同于分区的数量),多余的线程将会闲置。

    多个不同的Kafka消费者群可以并行的运行:毫无疑问,对同一个Kafka话题,你可以运行多个独立的逻辑消费者应用程序。这里,每个逻辑应用程序都会运行自己的消费者线程,使用一个唯一的消费者群id。而每个应用程序通常可以使用不同的read parallelisms(见下文)。当在下文我描述不同的方式配置read parallelisms时,我指的是如何完成这些逻辑消费者应用程序中的一个设置。

    这里有一些简单的例子

    • 你的应用程序使用“terran”消费者群id对一个名为“zerg.hydra”的kafka话题进行读取,这个话题拥有10个分区。如果你的消费者应用程序只配置一个线程对这个话题进行读取,那么这个线程将从10个分区中进行读取。
    • 同上,但是这次你会配置5个线程,那么每个线程都会从2个分区中进行读取。
    • 同上,这次你会配置10个线程,那么每个线程都会负责1个分区的读取。
    • 同上,但是这次你会配置多达14个线程。那么这14个线程中的10个将平分10个分区的读取工作,剩下的4个将会被闲置。

    这里我们不妨看一下现实应用中的复杂性——Kafka中的再平衡事件。在Kafka中,再平衡是个生命周期事件(lifecycle event),在消费者加入或者离开消费者群时都会触发再平衡事件。这里我们不会进行详述,更多再平衡详情可参见我的 Kafka training deck 一文。

    你的应用程序使用消费者群id“terran”,并且从1个线程开始,这个线程将从10个分区中进行读取。在运行时,你逐渐将线程从1个提升到14个。也就是说,在同一个消费者群中,parallelism突然发生了变化。毫无疑问,这将造成Kafka中的再平衡。一旦在平衡结束,你的14个线程中将有10个线程平分10个分区的读取工作,剩余的4个将会被闲置。因此如你想象的一样,初始线程以后只会读取一个分区中的内容,将不会再读取其他分区中的数据。

    现在,我们终于对话题、分区有了一定的理解,而分区的数量将作为从Kafka读取时parallelism的上限。但是对于一个应用程序来说,这种机制会产生一个什么样的影响,比如一个Spark Streaming job或者 Storm topology从Kafka中读取数据作为输入。

    1. Read parallelism: 通常情况下,你期望使用N个线程并行读取Kafka话题中的N个分区。同时,鉴于数据的体积,你期望这些线程跨不同的NIC,也就是跨不同的主机。在Storm中,这可以通过TopologyBuilder#setSpout()设置Kafka spout的parallelism为N来实现。在Spark中,你则需要做更多的事情,在下文我将详述如何实现这一点。

    2. Downstream processing parallelism: 一旦使用Kafka,你希望对数据进行并行处理。鉴于你的用例,这种等级的parallelism必然与read parallelism有所区别。如果你的用例是计算密集型的,举个例子,对比读取线程,你期望拥有更多的处理线程;这可以通过从多个读取线程shuffling或者网路“fanning out”数据到处理线程实现。因此,你通过增长网络通信、序列化开销等将访问交付给更多的cores。在Storm中,你通过shuffle grouping 将Kafka spout shuffling到下游的bolt中。在Spark中,你需要通过DStreams上的 repartition 转换来实现。

    通常情况下,大家都渴望去耦从Kafka的parallelisms读取,并立即处理读取来的数据。在下一节,我将详述使用 Spark Streaming从Kafka中的读取和写入。

    从Kafka中读取

    Spark Streaming中的Read parallelism

    类似Kafka,Read parallelism中也有分区的概念。了解Kafka的per-topic话题与RDDs in Spark 中的分区没有关联非常重要。

    Spark Streaming中的 KafkaInputDStream (又称为Kafka连接器)使用了Kafka的高等级消费者API ,这意味着在Spark中为Kafka设置 read parallelism将拥有两个控制按钮。

    1. Input DStreams的数量。 因为Spark在每个Input DStreams都会运行一个receiver(=task),这就意味着使用多个input DStreams将跨多个节点并行进行读取操作,因此,这里寄希望于多主机和NICs。

    2. Input DStreams上的消费者线程数量。 这里,相同的receiver(=task)将运行多个读取线程。这也就是说,读取操作在每个core/machine/NIC上将并行的进行。

    在实际情况中,第一个选择显然更是大家期望的。

    为什么会这样?首先以及最重要的,从Kafka中读取通常情况下会受到网络/NIC限制,也就是说,在同一个主机上你运行多个线程不会增加读的吞吐量。另一方面来讲,虽然不经常,但是有时候从Kafka中读取也会遭遇CPU瓶颈。其次,如果你选择第二个选项,多个读取线程在将数据推送到blocks时会出现锁竞争(在block生产者实例上,BlockGenerator的“+=”方法真正使用的是“synchronized”方式)。

    input DStreams建立的RDDs分区数量:KafkaInputDStream将储存从Kafka中读取的每个信息到Blocks。从我的理解上,一个新的Block由 spark.streaming.blockInterval在毫秒级别建立,而每个block都会转换成RDD的一个分区,最终由DStream建立。如果我的这种假设成立,那么由KafkaInputDStream建立的RDDs分区数量由batchInterval / spark.streaming.blockInterval决定,而batchInterval则是数据流拆分成batches的时间间隔,它可以通过StreamingContext的一个构造函数参数设置。举个例子,如果你的批时间价格是2秒(默认情况下),而block的时间间隔是200毫秒(默认情况),那么你的RDD将包含10个分区。如果有错误的话,可以提醒我。

    选项1:控制input DStreams的数量

    下面这个例子可以从 Spark Streaming Programming Guide 中获得:

    val ssc: StreamingContext = ??? // ignore for now
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)
    
    val numInputDStreams = 5
    val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }

    在这个例子中,我们建立了5个input DStreams,因此从Kafka中读取的工作将分担到5个核心上,寄希望于5个主机/NICs(之所以说是寄希望于,因为我也不确定Spark Streaming task布局策略是否会将receivers投放到多个主机上)。所有Input Streams都是“terran”消费者群的一部分,而Kafka将保证topic的所有数据可以同时对这5个input DSreams可用。换句话说,这种“collaborating”input DStreams设置可以工作是基于消费者群的行为是由Kafka API提供,通过KafkaInputDStream完成。

    在这个例子中,我没有提到每个input DSream会建立多少个线程。在这里,线程的数量可以通过KafkaUtils.createStream方法的参数设置(同时,input topic的数量也可以通过这个方法的参数指定)。在下一节中,我们将通过实际操作展示。

    但是在开始之前,在这个步骤我先解释几个Spark Streaming中常见的几个问题,其中有些因为当下Spark中存在的一些限制引起,另一方面则是由于当下Kafka input DSreams的一些设置造成:

    当你使用我上文介绍的多输入流途径,而这些消费者都是属于同一个消费者群,它们会给消费者指定负责的分区。这样一来则可能导致syncpartitionrebalance的失败,系统中真正工作的消费者可能只会有几个。为了解决这个问题,你可以把再均衡尝试设置的非常高,从而获得它的帮助。然后,你将会碰到另一个坑——如果你的receiver宕机(OOM,亦或是硬件故障),你将停止从Kafka接收消息。

    Spark用户讨论 markmail.org/message/…

    这里,我们需要对“停止从Kafka中接收”问题 做一些解释 。当下,当你通过ssc.start()开启你的streams应用程序后,处理会开始并一直进行,即使是输入数据源(比如Kafka)变得不可用。也就是说,流不能检测出是否与上游数据源失去链接,因此也不会对丢失做出任何反应,举个例子来说也就是重连或者结束执行。类似的,如果你丢失这个数据源的一个receiver,那么 你的流应用程序可能就会生成一些空的RDDs 。

    这是一个非常糟糕的情况。最简单也是最粗糙的方法就是,在与上游数据源断开连接或者一个receiver失败时,重启你的流应用程序。但是,这种解决方案可能并不会产生实际效果,即使你的应用程序需要将Kafka配置选项auto.offset.reset设置到最小——因为Spark Streaming中一些已知的bug,可能导致你的流应用程序发生一些你意想不到的问题,在下文Spark Streaming中常见问题一节我们将详细的进行介绍。

    选择2:控制每个input DStream上小发着线程的数量

    在这个例子中,我们将建立一个单一的input DStream,它将运行3个消费者线程——在同一个receiver/task,因此是在同一个core/machine/NIC上对Kafka topic “zerg.hydra”进行读取。

    val ssc: StreamingContext = ??? // ignore for now
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
    
    val consumerThreadsPerInputDstream = 3
    val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
    val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

    KafkaUtils.createStream方法被重载,因此这里有一些不同方法的特征。在这里,我们会选择Scala派生以获得最佳的控制。

    结合选项1和选项2

    下面是一个更完整的示例,结合了上述两种技术:

    val ssc: StreamingContext = ???
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
    
    val numDStreams = 5
    val topics = Map("zerg.hydra" -> 1)
    val kafkaDStreams = (1 to numDStreams).map { _ =>
        KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
      }

    我们建立了5个input DStreams,它们每个都会运行一个消费者线程。如果“zerg.hydra”topic拥有5个分区(或者更少),那么这将是进行并行读取的最佳途径,如果你在意系统最大吞吐量的话。

    Spark Streaming中的并行Downstream处理

    在之前的章节中,我们覆盖了从Kafka的并行化读取,那么我们就可以在Spark中进行并行化处理。那么这里,你必须弄清楚Spark本身是如何进行并行化处理的。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关, 通过在每个RDD分区上运行task进行 。在有些文档中,分区仍然被称为“slices”。

    在任何Spark应用程序中,一旦某个Spark Streaming应用程序接收到输入数据,其他处理都与非streaming应用程序相同。也就是说,与普通的Spark数据流应用程序一样,在Spark Streaming应用程序中,你将使用相同的工具和模式。更多详情可见Level of Parallelism in Data Processing 文档。

    因此,我们同样将获得两个控制手段:

    1. input DStreams的数量 ,也就是说,我们在之前章节中read parallelism的数量作为结果。这是我们的立足点,这样一来,我们在下一个步骤中既可以保持原样,也可以进行修改。

    2. DStream转化的重分配 。这里将获得一个全新的DStream,其parallelism等级可能增加、减少,或者保持原样。在DStream中每个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是通过RDD.repartition实现。接下来将对RDD中的所有数据做随机的reshuffles,然后建立或多或少的分区,并进行平衡。同时,数据会在所有网络中进行shuffles。换句话说,DStream.repartition非常类似Storm中的shuffle grouping。

    因此,repartition是从processing parallelism解耦read parallelism的主要途径。在这里,我们可以设置processing tasks的数量,也就是说设置处理过程中所有core的数量。间接上,我们同样设置了投入machines/NICs的数量。

    一个DStream转换相关是 union 。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。通常情况下,你更愿意用StreamingContext的派生。一个union将返回一个由Union RDD支撑的UnionDStream。Union RDD由RDDs统一后的所有分区组成,也就是说,如果10个分区都联合了3个RDDs,那么你的联合RDD实例将包含30个分区。换句话说,union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。你是否使用union依赖于你的用例是否需要从所有Kafka分区进行“in one place”信息获取决定,因此这里大部分都是基于语义需求决定。举个例子,当你需要执行一个不用元素上的(全局)计数。

    注意: RDDs是无序的。因此,当你union RDDs时,那么结果RDD同样不会拥有一个很好的序列。如果你需要在RDD中进行sort。

    你的用例将决定需要使用的方法,以及你需要使用哪个。如果你的用例是CPU密集型的,你希望对zerg.hydra topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。

    val ssc: StreamingContext = ???
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
    val readParallelism = 5
    val topics = Map("zerg.hydra" -> 1)
    val kafkaDStreams = (1 to readParallelism).map { _ =>
      KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
      }
    //> collection of five *input* DStreams = handled by five receivers/tasks
    val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
    //> single DStream
    val processingParallelism = 20
    val processingDStream = unionDStream(processingParallelism)
    //> single DStream but now with 20 partitions
    

    在下一节中,我将把所有部分结合到一起,并且联合实际数据处理进行讲解。

    写入到Kafka

    写入到Kafka需要从foreachRDD输出操作进行:

    通用的输出操作者都包含了一个功能(函数),让每个RDD都由Stream生成。这个函数需要将每个RDD中的数据推送到一个外部系统,比如将RDD保存到文件,或者通过网络将它写入到一个数据库。需要注意的是,这里的功能函数将在驱动中执行,同时其中通常会伴随RDD行为,它将会促使流RDDs的计算。

    注意: 重提“功能函数是在驱动中执行”,也就是Kafka生产者将从驱动中进行,也就是说“功能函数是在驱动中进行评估”。当你使用foreachRDD从驱动中读取Design Patterns时,实际过程将变得更加清晰。

    在这里,建议大家去阅读Spark文档中的 Design Patterns for using foreachRDD一节,它将详细讲解使用foreachRDD读外部系统中的一些常用推荐模式,以及经常出现的一些陷阱。

    在我们这个例子里,我们将按照推荐来重用Kafka生产者实例,通过生产者池跨多个RDDs/batches。 我通过 Apache Commons Pool 实现了这样一个工具,已经上传到GitHub 。这个生产者池本身通过 broadcast variable 提供给tasks。

    最终结果看起来如下:

    val producerPool = {
      // See the full code on GitHub for details on how the pool is created
      val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
      ssc.sparkContext.broadcast(pool)
    }
    
    stream.map { ... }.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // Get a producer from the shared pool
        val p = producerPool.value.borrowObject()
        partitionOfRecords.foreach { case tweet: Tweet =>
          // Convert pojo back into Avro binary format
          val bytes = converter.value.apply(tweet)
          // Send the bytes to Kafka
          p.send(bytes)
        }
        // Returning the producer to the pool also shuts it down
        producerPool.value.returnObject(p)
      })
    })

    需要注意的是, Spark Streaming每分钟都会建立多个RDDs,每个都会包含多个分区,因此你无需为Kafka生产者实例建立新的Kafka生产者,更不用说每个Kafka消息。上面的步骤将最小化Kafka生产者实例的建立数量,同时也会最小化TCP连接的数量(通常由Kafka集群确定)。你可以使用这个池设置来精确地控制对流应用程序可用的Kafka生产者实例数量。如果存在疑惑,尽量用更少的。

    完整示例

    下面的代码是示例Spark Streaming应用程序的要旨(所有代码参见 这里 )。这里,我做一些解释:

    • 并行地从Kafka topic中读取Avro-encoded数据。我们使用了一个最佳的read parallelism,每个Kafka分区都配置了一个单线程 input DStream。
    • 并行化Avro-encoded数据到pojos中,然后将他们并行写到binary,序列化可以通过Twitter Bijection 执行。
    • 通过Kafka生产者池将结果写回一个不同的Kafka topic。
    // Set up the input DStream to read from Kafka (in parallel)
    val kafkaStream = {
      val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
      val kafkaParams = Map(
        "zookeeper.connect" -> "zookeeper1:2181",
        "group.id" -> "spark-streaming-test",
        "zookeeper.connection.timeout.ms" -> "1000")
      val inputTopic = "input-topic"
      val numPartitionsOfInputTopic = 5
      val streams = (1 to numPartitionsOfInputTopic) map { _ =>
        KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
      }
      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.
      unifiedStream.repartition(sparkProcessingParallelism)
    }
    
    // We use accumulators to track global "counters" across the tasks of our streaming app
    val numInputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages consumed")
    val numOutputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages produced")
    // We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.
    val producerPool = {
      val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
      ssc.sparkContext.broadcast(pool)
    }
    // We also use a broadcast variable for our Avro Injection (Twitter Bijection)
    val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])
    
    // Define the actual data flow of the streaming job
    kafkaStream.map { case bytes =>
      numInputMessages += 1
      // Convert Avro binary data to pojo
      converter.value.invert(bytes) match {
        case Success(tweet) => tweet
        case Failure(e) => // ignore if the conversion failed
      }
    }.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val p = producerPool.value.borrowObject()
        partitionOfRecords.foreach { case tweet: Tweet =>
          // Convert pojo back into Avro binary format
          val bytes = converter.value.apply(tweet)
          // Send the bytes to Kafka
          p.send(bytes)
          numOutputMessages += 1
        }
        producerPool.value.returnObject(p)
      })
    })
    
    // Run the streaming job
    ssc.start()
    ssc.awaitTermination()

    更多的细节和解释可以在这里看所有源代码。

    就我自己而言,我非常喜欢 Spark Streaming代码的简洁和表述。在Bobby Evans和 Tom Graves讲话中没有提到的是,Storm中这个功能的等价代码是非常繁琐和低等级的: kafka-storm-starter 中的 KafkaStormSpec 会运行一个Stormtopology来执行相同的计算。同时,规范文件本身只有非常少的代码,当然是除下说明语言,它们能更好的帮助理解;同时,需要注意的是,在Storm的Java API中,你不能使用上文Spark Streaming 示例中所使用的匿名函数,比如map和foreach步骤。取而代之的是,你必须编写完整的类来获得相同的功能,你可以查看 AvroDecoderBolt 。这感觉是将Spark的API转换到Java,在这里使用匿名函数是非常痛苦的。

    最后,我同样也非常喜欢 Spark的说明文档 ,它非常适合初学者查看,甚至还包含了一些 进阶使用 。关于Kafka整合到Spark,上文已经基本介绍完成,但是我们仍然需要浏览mailing list和深挖源代码。这里,我不得不说,维护帮助文档的同学做的实在是太棒了。

    知晓Spark Streaming中的一些已知问题

    你可能已经发现在Spark中仍然有一些尚未解决的问题,下面我描述一些我的发现:

    一方面,在对Kafka进行读写上仍然存在一些含糊不清的问题,你可以在类似Multiple Kafka Receivers and Union 和 How to scale more consumer to Kafka stream mailing list的讨论中发现。

    另一方面,Spark Streaming中一些问题是因为Spark本身的固有问题导致,特别是故障发生时的数据丢失问题。换句话说,这些问题让你不想在生产环境中使用Spark。

    • 在Spark 1.1版本的驱动中,Spark并不会考虑那些已经接收却因为种种原因没有进行处理的元数据( 点击这里查看更多细节 )。因此,在某些情况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,当下已经释放。
    • 1.1版本中的Kafka连接器是基于Kafka的高等级消费者API。这样就会造成一个问题,Spark Streaming不可以依赖其自身的KafkaInputDStream将数据从Kafka中重新发送,从而无法解决下游数据丢失问题(比如Spark服务器发生故障)。
    • 有些人甚至认为这个版本中的Kafka连接器不应该投入生产环境使用,因为它是基于Kafka的高等级消费者API。取而代之,Spark应该使用简单的消费者API(就像Storm中的Kafka spout),它将允许你控制便宜和分区分配确定性。
    • 但是当下Spark社区已经在致力这些方面的改善,比如Dibyendu Bhattacharya的Kafka连接器。后者是Apache Storm Kafka spout的一个端口,它基于Kafka所谓的简单消费者API,它包含了故障发生情景下一个更好的重放机制。
    • 即使拥有如此多志愿者的努力,Spark团队更愿意非特殊情况下的Kafka故障恢复策略,他们的目标是“在所有转换中提供强保证,通用的策略”,这一点非常难以理解。从另一个角度来说,这是浪费Kafka本身的故障恢复策略。这里确实难以抉择。
    • 这种情况同样也出现在写入情况中,很可能会造成数据丢失。
    • Spark的Kafka消费者参数auto.offset.reset的使用同样与Kafka的策略不同。在Kafka中,将auto.offset.reset设置为最小是消费者将自动的将offset设置为最小offset,这通常会发生在两个情况:第一,在ZooKeeper中不存在已有offsets;第二,已存在offset,但是不在范围内。而在Spark中,它会始终删除所有的offsets,并从头开始。这样就代表着,当你使用auto.offset.reset = “smallest”重启你的应用程序时,你的应用程序将完全重新处理你的Kafka应用程序。更多详情可以在下面的两个讨论中发现: 1 和 2 。
    • Spark-1341:用于控制Spark Streaming中的数据传输速度。这个能力可以用于很多情况,当你已经受Kafka引起问题所烦恼时(比如auto.offset.reset所造成的),然后可能让你的应用程序重新处理一些旧数据。但是鉴于这里并没有内置的传输速率控制,这个功能可能会导致你的应用程序过载或者内存不足。

    在这些故障处理策略和Kafka聚焦的问题之外之外,扩展性和稳定性上的关注同样不可忽视。再一次,仔细观看 Bobby和Tom的视频 以获得更多细节。在Spark使用经验上,他们都永远比我更丰富。

    当然,我也有我的 评论 ,在 G1 garbage(在Java 1.7.0u4+中) 上可能也会存在问题。但是,我从来都没碰到这个问题。

    Spark使用技巧和敲门

    在我实现这个示例的代码时,我做了一些重要的笔记。虽然这不是一个全面的指南,但是在你开始Kafka整合时可能发挥一定的作用。它包含了 Spark Streaming programming guide 中的一些信息,也有一些是来自Spark用户的mailing list。

    通用

    • 当你建立你的Spark环境时,对Spark使用的cores数量配置需要特别投入精力。你必须为Spark配置receiver足够使用的cores(见下文),当然实际数据处理所需要的cores的数量也要进行配置。在Spark中,每个receiver都负责一个input DStream。同时,每个receiver(以及每个input DStream) occies一个core,这样做是服务于每个文件流中的读取(详见文档)。举个例子,你的作业需要从两个input streams中读取数据,但是只访问两个cores,这样一来,所有数据都只会被读取而不会被处理。
    • 注意,在一个流应用程序中,你可以建立多个input DStreams来并行接收多个数据流。在上文从Kafka并行读取一节中,我曾演示过这个示例作业。
    • 你可以使用 broadcast variables在不同主机上共享标准、只读参数,相关细节见下文的优化指导。在示例作业中,我使用了broadcast variables共享了两个参数:第一,Kafka生产者池(作业通过它将输出写入Kafka);第二,encoding/decoding Avro数据的注入(从Twitter Bijection中)。 Passing functions to Spark 。
    • 你可以使用累加器参数来跟踪流作业上的所有全局“计数器”,这里可以对照Hadoop作业计数器。在示例作业中,我使用累加器分别计数所有消费的Kafka消息,以及所有对Kafka的写入。如果你对累加器进行命名,它们同样可以在Spark UI上展示。
    • 不要忘记import Spark和Spark Streaming环境:
    // Required to gain access to RDD transformations via implicits.
    import org.apache.spark.SparkContext._
    
    // Required when working on `PairDStreams` to gain access to e.g. `DStream.reduceByKey`
    // (versus `DStream.transform(rddBatch => rddBatch.reduceByKey()`) via implicits.
    //
    // See also http://spark.apache.org/docs/1.1.0/programming-guide.html#working-with-key-value-pairs
    import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

    如果你是 Twitter Algebird的爱好者,你将会喜欢使用Count-Min Sketch和Spark中的一些特性,代表性的,你会使用reduce或者reduceByWindow这样的操作(比如,DStreams上的转换 )。Spark项目包含了 Count-Min Sketch 和 HyperLogLog 的示例介绍。

    如果你需要确定Algebird数据结构的内存介绍,比如Count-Min Sketch、HyperLogLog或者Bloom Filters,你可以使用SparkContext日志进行查看,更多细节参见 Determining Memory Consumption 。

    Kafka整合

    我前文所述的一些增补:

    • 你可能需要修改Spark Streaming中的一些Kafka消费者配置。举个例子,如果你需要从Kafka中读取大型消息,你必须添加fetch.message.max.bytes消费设置。你可以使用KafkaUtils.createStream(…)将这样定制的Kafka参数给Spark Streaming传送。

    测试

    • 首先,确定 已经 在一个finally bloc或者测试框架的teardown method中使用stop()关闭了StreamingContext 和/或 SparkContext,因为在同一个程序(或者JVM?)中Spark不支持并行运行两种环境。
    • 根据我的经验,在使用sbt时,你希望在测试中将你的建立配置到分支JVM中。最起码在kafka-storm-starter中,测试必须并行运行多个线程,比如ZooKeeper、Kafka和Spark的内存实例。开始时,你可以参考 build.sbt 。
    • 同样,如果你使用的是Mac OS X,你可能期望关闭JVM上的IPv6用以阻止DNS相关超时。这个问题与Spark无关,你可以查看 .sbtopts 来获得关闭IPv6的方法。

    性能调优

    • 确定你理解作业中的运行时影响,如果你需要与外部系统通信,比如Kafka。在使用foreachRDD时,你应该阅读中 Spark Streaming programming guide 中的Design Patterns一节。举个例子,我的用例中使用Kafka产生者池来优化 Spark Streaming到Kafka的写入。在这里,优化意味着在多个task中共享同一个生产者,这个操作可以显著地减少由Kafka集群建立的新TCP连接数。
    • 使用Kryo做序列化,取代默认的Java serialization,详情可以访问 Tuning Spark 。我的例子就使用了Kryo和注册器,举个例子,使用Kryo生成的Avro-generated Java类(见 KafkaSparkStreamingRegistrator )。除此之外,在Storm中类似的问题也可以使用Kryo来解决。
    • 通过将spark.streaming.unpersist设置为true将Spark Streaming 作业设置到明确持续的RDDs。这可以显著地减少Spark在RDD上的内存使用,同时也可以改善GC行为。(点击访问 来源 )
    • 通过MEMORY_ONLY_SER开始你的储存级别P&S测试(在这里,RDD被存储到序列化对象,每个分区一个字节)。取代反序列化,这样做更有空间效率,特别是使用Kryo这样的高速序列化工具时,但是会增加读取上的CPU密集操作。这个优化对 Spark Streaming作业也非常有效。对于本地测试来说,你可能并不想使用*_2派生(2=复制因子)。

    总结

    完整的Spark Streaming示例代码可以在 kafka-storm-starter 查看。这个应用包含了Kafka、Zookeeper、Spark,以及上文我讲述的示例。

    总体来说,我对我的初次Spark Streaming体验非常满意。当然,在Spark/Spark Streaming也存在一些需要特别指明的问题,但是我肯定Spark社区终将解决这些问题。在这个过程中,得到了Spark社区积极和热情的帮助,同时我也非常期待Spark 1.2版本的新特性。

    在大型生产环境中,基于Spark还需要一些TLC才能达到Storm能力,这种情况我可能将它投入生产环境中么?大部分情况下应该不会,更准确的说应该是现在不会。那么在当下,我又会使用Spark Streaming做什么样的处理?这里有两个想法,我认为肯定存在更多:

    • 它可以非常快的原型数据流。如果你因为数据流太大而遭遇扩展性问题,你可以运行 Spark Streaming,在一些样本数据或者一部分数据中。
    • 搭配使用Storm和Spark Streaming。举个例子,你可以使用Storm将原始、大规模输入数据处理到易管理等级,然后使用Spark Streaming来做下一步的分析,因为后者可以开箱即用大量有趣的算法、计算指令和用例。

    感谢Spark社区对大数据领域所作出的贡献!

     

    翻译/童阳

    文章出处:推酷-CSDN

    展开全文
  • Kafka 简介 kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信kafka...

    Kafka

    kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。
    在这里插入图片描述

    • producer : 消息生产者

    • consumer : 消息消费之

    • broker : kafka集群的server,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker

    • topic : 消息队列/分类相当于队列,里面有生产者和消费者模型

    • zookeeper : 元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息

    • 1、一个topic分成多个partition

    • 2、每个partition内部消息强有序, 其中的每个消息都有一个序号交offset

    • 3、一个partition 只对应一个broker, 一个broker 可以管理多个partition

    • 4、 消息直接写入文件,并不保存在内存中

    • 5、按照时间策略, 默认一周删除, 而不是消息消费完就删除

    • 6、producer自己决定网那个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略

    在这里插入图片描述

    kafka 的消息消费模型

    • consumer 自己维护消费到哪个offset
    • 每个consumer都有对应的group
    • group 内是queue消费模型
      – 各个consumer消费不同的partition
      – 一个消息在group内只消费一次
    • 各个group各自独立消费,互不影响
      在这里插入图片描述

    kafka 特点

    • 生存者消费模型:FIFO; partition内部是FIFO的, partition之间不是FIFO
    • 高性能:单节点支持上千个客户端,百MB/s 吞吐
    • 持久性:直接持久在普通的磁盘上,性能比较好; 直接append 方式追加到磁盘,数据不会丢
    • 分布式:数据副本冗余,流量负载均衡、可扩展; 数据副本,也就是同一份数据可以到不同的broker上面去,也就是当一份数据, 磁盘坏掉,数据不亏丢失
    • 很灵活: 消息长时间持久化+Cilent维护消费状态; 1、持久花时间长,可以是一周、一天,2、可以自定义消息偏移量

    kafka 安装

    1. https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.1/kafka_2.11-2.0.1.tgz
      下载
    2. 解压压缩包,修改config 文件夹下 server.properties
       	 // 节点编号:(不同节点按0,1,2,3整数来配置)
        	broker.id = 0
        	// 数据存放目录
        	log.dirs = /log
        	// zookeeper 集群配置
        	zookeeper.connect=node1:2181,node2:2181,node3:2181
    
    1. 启动 bin/kafka-server-start.sh config/server.properties

      可以单独配置一个启动文件
      vim start-kafka.sh

      nohup bin/kafka-server-start.sh   config/server.properties > kafka.log 2>&1 &
      

    授权 chmod 755 start-kafka.sh

    kafka基础命令
    创建topic./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic t0315 --partitions 3 --replication-factor 3

    查看topic: ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list

    生产者:./kafka-console-producer.sh --topic t0315 --broker-list node1:9092,node2:9092,node3:9092

    消费者:./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic t0315

    获取描述: ./kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic t0315

    kafka中有一个被称为优先副本(preferred replicas)的概念。如果一个分区有3个副本,且这3个副本的优先级别分别为0,1,2,根据优先副本的概念,0会作为leader 。当0节点的broker挂掉时,会启动1这个节点broker当做leader。当0节点的broker再次启动后,会自动恢复为此partition的leader。不会导致负载不均衡和资源浪费,这就是leader的均衡机制。
    在配置文件conf/ server.properties中配置开启(默认就是开启):auto.leader.rebalance.enable true

    Code 部分

    sparkStreaming 的direact 方式

    <properties>
       <spark.version>2.2.0</spark.version>
     </properties>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
       <!-- <exclusions>
          <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
          </exclusion>
          <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
          </exclusion>
        </exclusions>-->
      </dependency>
    
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
    

    producer 部分:

    import kafka.serializer.StringEncoder;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    /**
     *@Author PL
     *@Date 2018/12/27 10:59
     *@Description TODO
     **/
    public class KafkaProducer {
        public static void main(String[] args) throws InterruptedException {
    
            Properties pro = new Properties();
            pro.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");
            pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            pro.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            //Producer<String,String> producer = new Producer<String, String>(new ProducerConfig(pro));
            //org.apache.kafka.clients.producer.KafkaProducer producer1 = new Kafka
            org.apache.kafka.clients.producer.KafkaProducer<String,String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(pro);
            System.out.println("11");
            String topic = "t0315";
            String msg = "hello word";
            for (int i =0 ;i <100;i++) {
                producer.send(new ProducerRecord<String, String>(topic, "hello", msg));
                System.out.println(msg);
            }
            producer.close();
        }
    }
    

    customer

    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    
    import java.util.*;
    
    /**
     *@Author PL
     *@Date 2018/12/26 13:28
     *@Description TODO
     **/
    public class SparkStreamingForkafka {
        public static void main(String[] args) throws InterruptedException {
            SparkConf sc = new SparkConf().setMaster("local[2]").setAppName("test");
            JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5));
            Map<String,String> kafkaParam = new HashMap<>();
            kafkaParam.put("metadata.broker.list","node1:9092,node2:9092,node3:9092");
            //kafkaParam.put("t0315",1);
            HashSet<String> topic = new HashSet<>();
            topic.add("t0315");
    
            //JavaPairInputDStream<String, String> line = KafkaUtils.createStream(jsc,"node1:9092,node2:9092,node3:9092","wordcountGrop",kafkaParam);
            JavaPairInputDStream<String, String> line = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParam, topic);
            JavaDStream<String> flatLine = line.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterator<String> call(Tuple2<String, String> tuple2) throws Exception {
                    return Arrays.asList(tuple2._2.split(" ")).iterator();
                }
            });
    
            JavaPairDStream<String, Integer> pair = flatLine.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairDStream<String, Integer> count = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
    
            count.print();
    
            jsc.start();
            jsc.awaitTermination();
            jsc.close();;
        }
    }
    

    上述方式为一个SparkStreaming 的消费者, direct方式就是把kafka当成一个存储数据的库,spark 自己维护offset。假设,driver 端宕机了, 之后再重启,会从offset 那一部分开始取?
    所以我们需要将kafka 的offset 保存在文件中, 宕机之后在启动时去恢复文件中的offset 读取数据。

    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    
    import java.util.*;
    
    /**
     *@Author PL
     *@Date 2018/12/26 13:28
     *@Description TODO
     **/
    public class KafkaCheckPoint {
        public static void main(String[] args) throws InterruptedException {
            final String checkPoint = "./checkPoint";
    
            Function0<JavaStreamingContext> scFunction = new Function0<JavaStreamingContext>() {
                @Override
                public JavaStreamingContext call() throws Exception {
                    return createJavaStreamingContext();
                }
            };
            // 如果存在checkport 就恢复数据,不存在就直接运行
            JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkPoint, scFunction);
            jsc.start();
            jsc.awaitTermination();
            jsc.close();;
        }
    
    
        public static  JavaStreamingContext createJavaStreamingContext(){
            System.out.println("初始化");  // 第一次会执行,宕机之后重启执行数据恢复时不执行
            final SparkConf sc = new SparkConf().setMaster("local").setAppName("test");
            JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(5));
            /**
            * checkpoint 保存
            * 	1、 配置信息
            *	2、Dstream 执行逻辑
            *	3、Job 的执行进度
            *	4、offset
            */
            jsc.checkpoint("./checkPoint");
    
            Map<String,String> kafkaParam = new HashMap<>();
            kafkaParam.put("metadata.broker.list","node1:9092,node2:9092,node3:9092");
            HashSet<String> topic = new HashSet<>();
            topic.add("t0315");
    
            JavaPairInputDStream<String, String> line = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParam, topic);
            JavaDStream<String> flatLine = line.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterator<String> call(Tuple2<String, String> tuple2) throws Exception {
                    return Arrays.asList(tuple2._2.split(" ")).iterator();
                }
            });
    
            JavaPairDStream<String, Integer> pair = flatLine.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairDStream<String, Integer> count = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
            count.print();
            return jsc;
        }
    
    }
    

    这次我们启动的时候会发现先从checkpoint中恢复数据, 从上次宕机的数据开始读取并执行。但是,当我们更改功能时,发现新修改的部分没有执行, 还是执行的上次保存的代码。。。。。。。

    这时候可以把offset 保存至zookeeper中

    主方法

    import com.pl.data.offset.getoffset.GetTopicOffsetFromKafkaBroker;
    import com.pl.data.offset.getoffset.GetTopicOffsetFromZookeeper;
    import kafka.common.TopicAndPartition;
    import org.apache.log4j.Logger;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import java.util.Map;
    
    public class UseZookeeperManageOffset {
    	/**
    	 * 使用log4j打印日志,“UseZookeeper.class” 设置日志的产生类
    	 */
    	static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);
    	
    	
    	public static void main(String[] args) throws InterruptedException {
    		
    		/**
    		 * 从kafka集群中得到topic每个分区中生产消息的最大偏移量位置
    		 */
    		Map<TopicAndPartition, Long> topicOffsets = GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "t0315");
    		
    		/**
    		 * 从zookeeper中获取当前topic每个分区 consumer 消费的offset位置
    		 */
    		Map<TopicAndPartition, Long> consumerOffsets = 
    				GetTopicOffsetFromZookeeper.getConsumerOffsets("node1:2181,node2:2181,node3:2181","pl","t0315");
    		
    		/**
    		 * 合并以上得到的两个offset ,
    		 * 	思路是:
    		 * 		如果zookeeper中读取到consumer的消费者偏移量,那么就zookeeper中当前的offset为准。
    		 * 		否则,如果在zookeeper中读取不到当前消费者组消费当前topic的offset,就是当前消费者组第一次消费当前的topic,
    		 * 			offset设置为topic中消息的最大位置。
    		 */
    
    		if(null!=consumerOffsets && consumerOffsets.size()>0){
                topicOffsets.putAll(consumerOffsets);
            }
    		/**
    		 * 如果将下面的代码解开,是将topicOffset 中当前topic对应的每个partition中消费的消息设置为0,就是从头开始。
    		 */
    		/*for(Map.Entry<TopicAndPartition, Long> item:topicOffsets.entrySet()){
              item.setValue(0l);
    		}*/
    		
    		/**
    		 * 构建SparkStreaming程序,从当前的offset消费消息
    		 */
    		JavaStreamingContext jsc = SparkStreamingDirect.getStreamingContext(topicOffsets,"pl");
    		jsc.start();
    		jsc.awaitTermination();
    		jsc.close();
    		
    	}
    }
    

    获取kafka中当前的offset 偏移量(kafka API)

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.cluster.Broker;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    
    /**
     * 测试之前需要启动kafka
     * @author root
     *
     */
    public class GetTopicOffsetFromKafkaBroker {
    	public static void main(String[] args) {
    		
    		Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("node1:9092,node2:9092,node3:9092", "t0315");
    		Set<Entry<TopicAndPartition, Long>> entrySet = topicOffsets.entrySet();
    		for(Entry<TopicAndPartition, Long> entry : entrySet) {
    			TopicAndPartition topicAndPartition = entry.getKey();
    			Long offset = entry.getValue();
    			String topic = topicAndPartition.topic();
    			int partition = topicAndPartition.partition();
    			System.out.println("topic = "+topic+",partition = "+partition+",offset = "+offset);
    		}
    	
    	}
    	
    	/**
    	 * 从kafka集群中得到当前topic,生产者在每个分区中生产消息的偏移量位置
    	 * @param KafkaBrokerServer
    	 * @param topic
    	 * @return
    	 */
    	public static Map<TopicAndPartition,Long> getTopicOffsets(String KafkaBrokerServer, String topic){
    		Map<TopicAndPartition,Long> retVals = new HashMap<TopicAndPartition,Long>();
    		// 遍历kafka集群,并拆分
    		for(String broker:KafkaBrokerServer.split(",")){
    			SimpleConsumer simpleConsumer = new SimpleConsumer(broker.split(":")[0],Integer.valueOf(broker.split(":")[1]), 64*10000,1024,"consumer"); 
    			TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
    			TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
    			List<TopicMetadata> topicMetadataList = topicMetadataResponse.topicsMetadata();
    			// 遍历每个topic下的元数据
    			for (TopicMetadata metadata : topicMetadataList) {
    				// 遍历元数据下的分区
    				for (PartitionMetadata part : metadata.partitionsMetadata()) {
    					Broker leader = part.leader();
    					if (leader != null) { 
    						TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId()); 
    						
    						PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 10000); 
    						OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()); 
    						OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest); 
    						
    						if (!offsetResponse.hasError()) { 
    							long[] offsets = offsetResponse.offsets(topic, part.partitionId()); 
    							retVals.put(topicAndPartition, offsets[0]);
    						}
    					}
    				}
    			}
    			simpleConsumer.close();
    		}
    		return retVals;
    	}
    }
    
    

    获取zookeeper中上次的消费的offset

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryUntilElapsed;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    import kafka.common.TopicAndPartition;
    
    public class GetTopicOffsetFromZookeeper {
       
       public static Map<TopicAndPartition,Long> getConsumerOffsets(String zkServers,String groupID, String topic) { 
       	Map<TopicAndPartition,Long> retVals = new HashMap<TopicAndPartition,Long>();
       	// 连接 zookeeper
       	ObjectMapper objectMapper = new ObjectMapper();
       	CuratorFramework  curatorFramework = CuratorFrameworkFactory.builder()
       			.connectString(zkServers).connectionTimeoutMs(1000)
       			.sessionTimeoutMs(10000).retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
       	curatorFramework.start();
       	
       	try{
       		String nodePath = "/consumers/"+groupID+"/offsets/" + topic;
       		if(curatorFramework.checkExists().forPath(nodePath)!=null){
       			List<String> partitions=curatorFramework.getChildren().forPath(nodePath);
       			for(String partiton:partitions){
       				int partitionL=Integer.valueOf(partiton);
       				Long offset=objectMapper.readValue(curatorFramework.getData().forPath(nodePath+"/"+partiton),Long.class);
       				TopicAndPartition topicAndPartition=new TopicAndPartition(topic,partitionL);
       				retVals.put(topicAndPartition, offset);
       			}
       		}
       	}catch(Exception e){
       		e.printStackTrace();
       	}
       	curatorFramework.close();
       	
       	return retVals;
       } 
       
       
       public static void main(String[] args) {
       	Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("node1:2181,node2:2181,node3:2181","pl","t0315");
       	Set<Entry<TopicAndPartition, Long>> entrySet = consumerOffsets.entrySet();
       	for(Entry<TopicAndPartition, Long> entry : entrySet) {
       		TopicAndPartition topicAndPartition = entry.getKey();
       		String topic = topicAndPartition.topic();
       		int partition = topicAndPartition.partition();
       		Long offset = entry.getValue();
       		System.out.println("topic = "+topic+",partition = "+partition+",offset = "+offset);
       	}
       }
    }
    

    读取kafka中指定offset开始的消息

    import com.fasterxml.jackson.databind.ObjectMapper;
    import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryUntilElapsed;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;
    
    public class SparkStreamingDirect {
    	public static JavaStreamingContext getStreamingContext(Map<TopicAndPartition, Long> topicOffsets,final String groupID){
    		SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingOnKafkaDirect");
    		conf.set("spark.streaming.kafka.maxRatePerPartition", "10");
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
    //        jsc.checkpoint("/checkpoint");
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list","node1:9092,node2:9092,node3:9092");
    //        kafkaParams.put("group.id","MyFirstConsumerGroup");
            for(Map.Entry<TopicAndPartition,Long> entry:topicOffsets.entrySet()){
        		System.out.println(entry.getKey().topic()+"\t"+entry.getKey().partition()+"\t"+entry.getValue());
            }
    
            JavaInputDStream<String> message = KafkaUtils.createDirectStream(
    			jsc,
    	        String.class,
    	        String.class, 
    	        StringDecoder.class,
    	        StringDecoder.class, 
    	        String.class,
    	        kafkaParams,
    	        topicOffsets, 
    	        new Function<MessageAndMetadata<String,String>,String>() {
    				private static final long serialVersionUID = 1L;
    				public String call(MessageAndMetadata<String, String> v1)throws Exception {
    	                return v1.message();
    	            }
    	        }
    		);
            final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
            JavaDStream<String> lines = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    			private static final long serialVersionUID = 1L;
    			@Override
                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                  offsetRanges.set(offsets);
                  return rdd;
                }
              }
            );
            message.foreachRDD(new VoidFunction<JavaRDD<String>>(){
                /**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
                public void call(JavaRDD<String> t) throws Exception {
                    ObjectMapper objectMapper = new ObjectMapper();
                    CuratorFramework  curatorFramework = CuratorFrameworkFactory.builder()
                            .connectString("node1:2181,node2:2181,node3:2181").connectionTimeoutMs(1000)
                            .sessionTimeoutMs(10000).retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
                    curatorFramework.start();
                    for (OffsetRange offsetRange : offsetRanges.get()) {
                    	long fromOffset = offsetRange.fromOffset();
                    	long untilOffset = offsetRange.untilOffset();
                    	final byte[] offsetBytes = objectMapper.writeValueAsBytes(offsetRange.untilOffset());
                        String nodePath = "/consumers/"+groupID+"/offsets/" + offsetRange.topic()+ "/" + offsetRange.partition();
                        System.out.println("nodePath = "+nodePath);
                        System.out.println("fromOffset = "+fromOffset+",untilOffset="+untilOffset);
                        if(curatorFramework.checkExists().forPath(nodePath)!=null){
                            curatorFramework.setData().forPath(nodePath,offsetBytes);
                        }else{
                            curatorFramework.create().creatingParentsIfNeeded().forPath(nodePath, offsetBytes);
                        }
                    }
                    curatorFramework.close();
                }
    
            });
            lines.print();
            return jsc;
        }
    }
    
    展开全文
  • 文章目录单机版环境搭建及相关DEMOFlumeFlume基本介绍与架构Flume安装部署案例实操Kafka环境搭建Kafka控制台的一些命令操作Java API控制KafkaFlume+Kafka配合SparkSpark 简介Spark环境搭建在Spark Shell 中运行代码...

    大数据开发文档

    本文档主要讲述了flume+kafka+spark的单机分布式搭建,由浅入深,介绍了常见大数据流处理流程

    单机版环境搭建及相关DEMO

    Flume

    Flume基本介绍与架构

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

    Flume出生日记

    有很多的服务和系统

    • network devices
    • operating system
    • web servers
    • Applications

    这些系统都会产生很多的日志,那么把这些日志拿出来,用来分析时非常有用的。

    如何解决数据从其他的server上移动到Hadoop上?

    shell cp hadoop集群上的机器上, hadoop fs -put …/ 直接拷贝日志,但是没办法监控,而cp的时效性也不好,容错负载均衡也没办法做

    ======>

    Flume诞生了

    Flume架构

    Flume组成架构如图1-1,所示:

    在这里插入图片描述
    ​ 图1-1 Flume组成架构

    Agent

    Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,是Flume数据传输的基本单元。

    Agent主要有3个部分组成,Source、Channel、Sink。

    Source

    Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

    Channel

    Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

    Flume自带两种Channel:Memory ChannelFile Channel

    Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

    Sink

    Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

    Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。

    Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

    Event

    传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

    Flume拓扑结构

    Flume的拓扑结构如图1-3、1-4、1-5和1-6所示:

    在这里插入图片描述

    ​ 图1-3 Flume Agent连接

    在这里插入图片描述

    ​ 图1-4 单source,多channel、sink

    在这里插入图片描述

    ​ 图1-5 Flume负载均衡

    在这里插入图片描述

    ​ 图1-6 Flume Agent聚合

    Flume安装部署

    Flume的安装相对简单,但是前提是要先下好Java环境JDK,1.8以上即可,JDK安装可以查看Kafka安装流程,这里以Linux下的安装为例

    Flume安装地址

    安装部署

    1. 解压apache-flume-1.7.0-bin.tar.gz到/usr/local/目录下(安装包详见安装包文件夹flume文件夹下的tar.gz压缩包)
    #把下载的包移动到目录
    $ sudo mv apache-flume-1.7.0-bin.tar.gz /usr/local
    #解压
    $ sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz  /usr/local/
    
    1. 修改apache-flume-1.7.0-bin的名称为flume
    $ sudo mv apache-flume-1.7.0-bin flume
    
    1. 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
    $ mv flume-env.sh.template flume-env.sh
    
    $ vi flume-env.sh
    
    export JAVA_HOME=/opt/module/jdk1.8.0_144(这里路径替换为本机JDK安装目录)
    
    

    案例实操

    • 监控端口数据

      • 案例需求:首先,Flume监控本机44444端口,然后通过telnet工具向本机44444端口发送消息,最后Flume将监听的数据实时显示在控制台。

      • 需求分析

    在这里插入图片描述

    • 实现步骤:

      • 安装telnet工具

        在/usr/local目录下创建flume-telnet文件夹。

        $ mkdir flume-telnet
        

        再将rpm软件包(xinetd-2.3.14-40.el6.x86_64.rpm、telnet-0.17-48.el6.x86_64.rpm和telnet-server-0.17-48.el6.x86_64.rpm)拷入/usr/local/flume-telnet文件夹下面。执行RPM软件包安装命令:

        $ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm
        
        $ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm
        
        $ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm
        
        
    • 判断44444端口是否被占用

      判断44444端口是否占用,如果被占用则kill掉或者更换端口

      $ sudo netstat -tunlp | grep 44444
      功能描述:netstat命令是一个监控TCP/IP网络的非常有用的工具,它可以显示路由表、实际的网络连接以及每一个网络接口设备的状态信息。
      
      基本语法:netstat [选项]
      
      选项参数:
      
      -t或--tcp:显示TCP传输协议的连线状况; 
      
      -u或--udp:显示UDP传输协议的连线状况;
      
             -n或--numeric:直接使用ip地址,而不通过域名服务器; 
      
             -l或--listening:显示监控中的服务器的Socket; 
      
             -p或--programs:显示正在使用Socket的程序识别码和程序名称;
      
      
    • 创建Flume Agent配置文件flume-telnet-logger.conf

      在flume目录下创建job文件夹并进入job文件夹

      $ mkdir job
      $ cd job/	
      
    • 在job文件夹下创建Flume Agent配置文件flume-telnet-logger.conf

      $ touch flume-telnet-logger.conf
      # 如果觉得vim上手难度太大,可以使用gedit来进行编辑
      $ vim flume-telnet-logger.conf
      # 在conf文件中加入以下内容
      
      # Name the components on this agent
      
      a1.sources = r1
      
      a1.sinks = k1
      
      a1.channels = c1
      
       
      
      # Describe/configure the source
      
      a1.sources.r1.type = netcat
      
      a1.sources.r1.bind = localhost
      
      a1.sources.r1.port = 44444
      
       
      
      # Describe the sink
      
      a1.sinks.k1.type = logger
      
       
      
      # Use a channel which buffers events in memory
      
      a1.channels.c1.type = memory
      
      a1.channels.c1.capacity = 1000
      
      a1.channels.c1.transactionCapacity = 100
      
       
      
      # Bind the source and sink to the channel
      
      a1.sources.r1.channels = c1
      
      a1.sinks.k1.channel = c1
      

    注:配置文件来源于官方手册

    在这里插入图片描述

    • 先开启flume监听端口

      $ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
      
      参数说明:
      
             --conf conf/  :表示配置文件存储在conf/目录
      
             --name a1       :表示给agent起名为a1
      
             --conf-file job/flume-telnet.conf :flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
      
             -Dflume.root.logger==INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。
      
    • 使用telnet工具向本机的44444端口发送内容

      $ telnet localhost 44444
      
    • 将A服务器上的日志实时采集到B服务器

      一般跨节点都是使用avro sink

      技术选型有两种方案:

      • exec source + memory channel + avro sink

        // Flume的关键就是写配置文件,仍然是在conf文件夹下创建配置文件
        // avro-memory-sink.conf
        
        # Name the components on this agent
        exec-memory-avro.sources = exec-source
        exec-memory-avro.sinks = arvo-sink
        exec-memory-avro.channels = memory-channel
        
        # Describe/configure the source
        exec-memory-avro.sources.exec-source.type = exec
        exec-memory-avro.sources.exec-source.command = tail -F $FLUME_HOME/logs/flume.log
        exec-memory-avro.sources.exec-source.shell = /bin/sh -c
        
        # Describe the sink
        exec-memory-avro.sinks.arvo-sink.type = avro
        exec-memory-avro.sinks.arvo-sink.hostname = localhost
        exec-memory-avro.sinks.arvo-sink.port = 44444
        
        # Use a channel which buffers events in memory
        exec-memory-avro.channels.memory-channel.type = memory
        exec-memory-avro.channels.memory-channel.capacity = 1000
        exec-memory-avro.channels.memory-channel.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        exec-memory-avro.sources.exec-source.channels = memory-channel
        exec-memory-avro.sinks.arvo-sink.channel = memory-channel
        
      • avro source + memory channel + logger sink

        // avro-logger-sink.conf
        # Name the components on this agent
        avro-memory-logger.sources = avro-source
        avro-memory-logger.sinks = logger-sink
        avro-memory-logger.channels = memory-channel
        
        # Describe/configure the source
        avro-memory-logger.sources.avro-source.type = avro
        avro-memory-logger.sources.avro-source.bind = localhost
        avro-memory-logger.sources.avro-source.port = 44444
        
        # Describe the sink
        avro-memory-logger.sinks.logger-sink.type = logger
        
        # Use a channel which buffers events in memory
        avro-memory-logger.channels.memory-channel.type = memory
        avro-memory-logger.channels.memory-channel.capacity = 1000
        avro-memory-logger.channels.memory-channel.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        avro-memory-logger.sources.avro-source.channels = memory-channel
        avro-memory-logger.sinks.logger-sink.channel = memory-channel
        

      接下来启动两个配置

      先启动avro-memory-logger
      
      flume-ng agent \
      
      --name avro-memory-logger \
      
      --conf $FLUME_HOME/conf \
      
      --conf-file $FLUME_HOME/conf/avro-memory-logger.conf \
      
      -Dflume.root.logger=INFO,console
      
      再启动另外一个
      
      flume-ng agent --name exec-memory-avro 
      
      --conf $FLUME_HOME/conf \
      
      --conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
      
      -Dflume.root.logger=INFO,console
      
      

    在这里插入图片描述

    在这里插入图片描述

    一个可能因为手误出现的bug

    log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).
    log4j:WARN Please initialize the log4j system properly.

    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

    出现这个错误是因为路径没有写对

    往监听的日志中输入一段字符串,可以看到我们的logger sink 已经成功接收到信息

    在这里插入图片描述

    上面Flume的基本流程图如下

    在这里插入图片描述


    Kafka

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,[3]这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理

    具体的架构可以查看官网的intro部分

    因为在实际编程中使用kafka_2.11-0.11.00以上版本和使用以下版本的Java API 不一致,所以推荐直接参照官网的文档进行编程。

    环境搭建

    单机单节点

    搭建说明

    需要有一定的Linux操作经验,对于没有权限之类的问题要懂得通过命令解决

    Kafka的安装相比Flume来说更加复杂,因为Kafka依赖于Zookeeper

    环境说明:

    • os:Ubuntu 18.04
    • zookeeper:zookeeper 3.4.9
    • kafka:kafka_2.11-0.11.0.0
    • jdk:jdk 8(kafka启动需要使用到jdk)

    详细说明:

    一、jdk安装

    jdk分为以下几种:jre、openjdk、 oracle jdk,这里我们要安装的是oracle jdk(推荐安装)

    add-apt-repository ppa:webupd8team/java
    apt-get update
    apt-get install oracle-java8-installer
    apt-get install oracle-java8-set-default
    

    测试安装版本:

    img

    二、安装配置zookeeper单机模式

    下载zookeeper 3.4.5,开始安装(软件包详见软件包下的kafka中的压缩包):

    cd /usr/local
    wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
    

    img

    等待安装成功:

    img

    解压:

    tar -zxvf zookeeper-3.4.5.tar.gz
    

    解压后同目录下便存在相同文件夹:

    img

    切换到conf目录下:

    cd zookeeper-3.4.5/conf/
    

    img

    复制zoo_sample.cfg到zoo.cfg:

    cp zoo_sample.cfg zoo.cfg
    

    然后编辑zoo.cfg如下(其它不用管,默认即可):

    initLimit=10
    syncLimit=5
    dataDir=/home/young/zookeeper/data
    clientPort=2181
    

    img

    别忘了新建dataDir目录:

    mkdir /home/young/zookeeper/data
    

    为zookeeper创建环境变量,打开/etc/profile文件,并在最末尾添加如下内容:

    vi /etc/profile
    

    添加内容如下:

    export ZOOKEEPER_HOME=/home/young/zookeeper
    export PATH=.:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
    

    img

    配置完成之后,切换到zookeeper/bin目录下,启动服务:

    img

    关闭服务:

    img

    这里暂时先关闭zookeeper服务,防止下面使用kafka启动时报端口占用错误。

    三、安装配置kafka单机模式

    下载kafka(安装包详见软件包kafka下的压缩包):

    cd /usr/local
    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
    

    解压:

    tar -zxvf kafka_2.11-0.11.0.0.tgz
    

    img

    进入kafka/config目录下:

    img

    以上文件是需要修改的文件,下面一个个修改配置:

    配置server.properties:

    以下为修改的,其他为默认即可:

    #broker.id需改成正整数,单机为1就好
    broker.id=1
    #指定端口号
    port=9092
    #localhost这一项还有其他要修改,详细见下面说明
    host.name=localhost
    #指定kafka的日志目录
    log.dirs=/usr/local/kafka_2.11-0.11.0.0/kafka-logs
    #连接zookeeper配置项,这里指定的是单机,所以只需要配置localhost,若是实际生产环境,需要在这里添加其他ip地址和端口号
    zookeeper.connect=localhost:2181
    

    img

    配置zookeeper.properties:

    #数据目录
    dataDir=/usr/local/kafka_2.11-0.11.0.0/zookeeper/data
    #客户端端口
    clientPort=2181
    host.name=localhost
    

    img

    配置producer.properties:

    zookeeper.connect=localhost:2181
    

    img

    配置consumer.properties:

    zookeeper.connect=localhost:2181
    

    img

    最后还需要拷贝几个jar文件到kafka的libs目录,分别是zookeeper-xxxx.jar、log4j-xxxx.jar、slf4j-simple-xxxx.jar,最后如下:

    img

    四、kafka的使用

    启动zookeeper服务:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    img

    img

    新开一个窗口启动kafka服务:

    bin/kafka-server-start.sh config/server.properties
    

    img

    img

    至此单机服务搭建已经全部完成

    单机多节点

    对于单机单节点只需要使用一个配置文件来启动即可,那么对于单机多节点,只需要建立多个配置文件,并且启动即可。比如我们需要有三个节点。

    在这里插入图片描述

    然后我们的每个server properies里面的端口以及ID要不一致

    server-1.properties

    在这里插入图片描述

    server-2.properties

    在这里插入图片描述

    server-3.properties

    在这里插入图片描述

    当然其对应的log对应目录也要修改,这个就不多说了

    然后在控制台启动

    > bin/kafka-server-start.sh config/server-1.properties &
    > bin/kafka-server-start.sh config/server-2.properties &
    > bin/kafka-server-start.sh config/server-3.properties &
    

    通过jps -m 能看到三个kafka即可(可能以普通用户看不到相应的进程,只是因为没给到权限,可以给权限或者直接sudo su切换到超级用户)

    Kafka控制台的一些命令操作

    控制台中我们可以通过命令建立topic,并且开启一个消费者一个生产者来模拟通信,这些在官网的quickstart中都有详尽的描述

    [外链图片转存失败(img-cCfCODtn-1569486879029)(../%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%9C%80%E7%BB%88%E7%89%88%E6%96%87%E6%A1%A3/kafka%E5%AD%A6%E4%B9%A0/producer.png)]

    在这里插入图片描述

    通过我们的一个叫topic的标签,我们建立了一个生产者和一个消费者,可以明显看到消费者接收到了生产者的消息。其他比较常用的命令,比如describe等可以自行探索。

    Java API控制Kafka

    接下来会说一个简单的在Java中使用Kafka小例子

    这里都是基于2.11_0.11.0.0.0版本以及之后的编程来说明,更低版本相应的API有些许变化,低版本中很多函数已经被替代和废除。

    基本配置

    • 首先在Idea中建立一个新的Maven项目,这里我们选择一个achetype:scala-archetype-simple

    在这里插入图片描述

    • 接下来我们把Maven文件配置好,并且auto import dependencies,这里如果没有选择auto import,我们可以在Pom.xml右键找到maven选项里面有一个reload

      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.test.spark</groupId>
        <artifactId>spark streaming</artifactId>
        <version>1.0</version>
        <inceptionYear>2008</inceptionYear>
        <properties>
          <scala.version>2.7.0</scala.version>
          <kafka.version>0.11.0.0</kafka.version>
        </properties>
      
      
        <dependencies>
          <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
          </dependency>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
          </dependency>
        </dependencies>
      
        <build>
          <sourceDirectory>src/main/scala</sourceDirectory>
          <testSourceDirectory>src/test/scala</testSourceDirectory>
          <plugins>
            <plugin>
              <groupId>org.scala-tools</groupId>
              <artifactId>maven-scala-plugin</artifactId>
              <executions>
                <execution>
                  <goals>
                    <goal>compile</goal>
                    <goal>testCompile</goal>
                  </goals>
                </execution>
              </executions>
              <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
                <args>
                  <arg>-target:jvm-1.5</arg>
                </args>
              </configuration>
            </plugin>
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-eclipse-plugin</artifactId>
              <configuration>
                <downloadSources>true</downloadSources>
                <buildcommands>
                  <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                </buildcommands>
                <additionalProjectnatures>
                  <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                </additionalProjectnatures>
                <classpathContainers>
                  <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                  <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                </classpathContainers>
              </configuration>
            </plugin>
          </plugins>
        </build>
        <reporting>
          <plugins>
            <plugin>
              <groupId>org.scala-tools</groupId>
              <artifactId>maven-scala-plugin</artifactId>
              <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
              </configuration>
            </plugin>
          </plugins>
        </reporting>
      </project>
      
      
      • 因为我们使用Java编程,所以我们在main下面建立一个java文件夹,并且把整个文件夹设为source,如下图

        在这里插入图片描述

    • 然后我们在这个例子会涉及到几个Class,包括启动的Class,消费者,生产者,配置

    在这里插入图片描述

    代码分析

    //KafkaProperties.java
    
    package com.test.spark.kafka;
    
    /**
     * Kafka常用配置文件
     */
    public class KafkaProperties {
    
        public static final String ZK= "211.83.96.204:2181";
        public static final String TOPIC= "test";
        public static final String BROKER_LIST = "211.83.96.204:9092";
        public static final String GROUP_ID = "test_group1";
    
    }
    

    首先看一下配置文件,为了配置能更加全局化好修改,我们直接建立一个配置文件,把可能需要的一些全局参数放进来,方便后续开发。其中有zookeeper的IPTopic名称服务器列表以及group_id

    // KafkaProducerClient.java
    
    package com.test.spark.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    
    /**
     * Kafka 生产者
     */
    public class KafkaProducerClient extends Thread{
    
        private  String topic;
        private  Producer<String, String> producer;
        public KafkaProducerClient(String topic) {
            this.topic = topic;
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers","localhost:9092");
    //        properties.put("serializer.class","kafka.serializer.StringEncoder");
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("request.required.acks","1");
            producer = new KafkaProducer<String, String>(properties);
        }
    
        @Override
        public void run() {
    
            int messageNo = 1;
    
            while(true) {
                String message = "message_" + messageNo;
                producer.send(new ProducerRecord<String, String>(topic, message));
                System.out.println("Sent: " + message);
    
                messageNo ++;
    
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    消费者中我们使用多线程的方式,循环发送消息

    // KafkaConsumerClient.java
    
    package com.test.spark.kafka;
    
    import kafka.consumer.ConsumerConnector$class;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * Kafka消费者
     */
    public class KafkaConsumerClient {
        private String topic;
    
        public KafkaConsumerClient(String topic) {
            this.topic = topic;
    
    
        }
    
        public void start() {
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", KafkaProperties.GROUP_ID);//不同ID 可以同时订阅消息
            props.put("enable.auto.commit", "false");//自动commit
            props.put("auto.commit.interval.ms", "1000");//定时commit的周期
            props.put("session.timeout.ms", "30000");//consumer活性超时时间
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(this.topic));//订阅TOPIC
            try {
                while(true) {//轮询
                    ConsumerRecords<String, String> records =consumer.poll(Long.MAX_VALUE);//超时等待时间
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            System.out.println("receive" + ": " + record.value());
                        }
                        consumer.commitSync();//同步
                    }
                }
            } finally
    
            {
                consumer.close();
            }
        }
    }
    
    

    在消费中我们会轮询消息

    在这里插入图片描述

    Flume+Kafka配合

    把logger sink ===> kafka sink

    sink kafka: producer

    所以启动一个kafka的consumer,直接对接到kafka sink消费掉即可

    //avro-memory-kafka.conf
    
    # Name the components on this agent
    avro-memory-kafka.sources = avro-source
    avro-memory-kafka.sinks = kafka-sink
    avro-memory-kafka.channels = memory-channel
    
    # Describe/configure the source
    avro-memory-kafka.sources.avro-source.type = avro
    avro-memory-kafka.sources.avro-source.bind = localhost
    avro-memory-kafka.sources.avro-source.port = 44444
    
    # Describe the sink
    avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    avro-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
    avro-memory-kafka.sinks.kafka-sink.topic = test
    avro-memory-kafka.sinks.kafka-sink.batchSize = 5
    avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
    
    # Use a channel which buffers events in memory
    avro-memory-kafka.channels.memory-channel.type = memory
    avro-memory-kafka.channels.memory-channel.capacity = 1000
    avro-memory-kafka.channels.memory-channel.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    avro-memory-kafka.sources.avro-source.channels = memory-channel
    avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
    

    [外链图片转存失败(img-o7uizDQ2-1569486879031)(…/%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%9C%80%E7%BB%88%E7%89%88%E6%96%87%E6%A1%A3/kafka%E5%AD%A6%E4%B9%A0/flume/connect_flume_kafka.png)]

    注意这个batchSize,在数据量没有到达设定的阈值时,他会有一个timeout,这之后才会有数据发过来


    Spark

    Spark 简介

    1. 什么是Spark?Spark作为Apache顶级的开源项目,是一个快速、通用的大规模数据处理引擎,和Hadoop的MapReduce计算框架类似,但是相对于MapReduce,Spark凭借其可伸缩、基于内存计算等特点,以及可以直接读写Hadoop上任何格式数据的优势,进行批处理时更加高效,并有更低的延迟。相对于“one stack to rule them all”的目标,实际上,Spark已经成为轻量级大数据快速处理的统一平台,各种不同的应用,如实时流处理、机器学习、交互式查询等,都可以通过Spark建立在不同的存储和运行系统上。
    2. Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
    3. Spark于2009年诞生于加州大学伯克利分校AMPLab。目前,已经成为Apache软件基金会旗下的顶级开源项目。相对于MapReduce上的批量计算、迭代型计算以及基于Hive的SQL查询,Spark可以带来上百倍的性能提升。目前Spark的生态系统日趋完善,Spark SQL的发布、Hive on Spark项目的启动以及大量大数据公司对Spark全栈的支持,让Spark的数据分析范式更加丰富。

    Spark环境搭建

    Hadoop安装(Spark依赖于Hadoop安装)

    参考链接

    Hadoop可以通过HadoopDownloadOne 或者HadoopDownloadTwo 下载,一般选择下载最新的稳定版本,即下载 “stable” 下的hadoop-2.x.y.tar.gz 这个格式的文件(详见安装文件夹中的hadoop-2.7.7)

    $ sudo tar -zxf  hadoop-2.7.7.tar.gz  -C /usr/local    # 解压到/usr/local中
    $ cd /usr/local/
    $ sudo mv ./hadoop-2.6.0/ ./hadoop            # 将文件夹名改为hadoop
    $ sudo chown -R hadoop ./hadoop       # 修改文件权限
    

    Hadoop 解压后即可使用。输入如下命令来检查 Hadoop 是否可用,成功则会显示 Hadoop 版本信息:

    $ cd /usr/local/hadoop
    $ ./bin/hadoop version
    

    Hadoop单机配置及运行测试

    Hadoop 默认模式为非分布式模式(本地模式),无需进行其他配置即可运行。非分布式即单 Java 进程,方便进行调试。

    现在我们可以执行例子来感受下 Hadoop 的运行。Hadoop 附带了丰富的例子(运行 ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar 可以看到所有例子),包括 wordcount、terasort、join、grep 等。

    $ cd /usr/local/hadoop
    $ mkdir ./input
    $ cp ./etc/hadoop/*.xml ./input   # 将配置文件作为输入文件
    $ ./bin/Hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep ./input ./output 'dfs[a-z.]+'
    $ cat ./output/*          # 查看运行结果
    

    注意,Hadoop 默认不会覆盖结果文件,因此再次运行上面实例会提示出错,需要先将 ./output 删除。

    如果中间提示 Error: JAVA_HOME is not set and could not be found. 的错误,则说明之前设置 JAVA_HOME 环境变量那边就没设置好,请按教程先设置好 JAVA_HOME 变量,否则后面的过程都是进行不下去的。如果已经按照前面教程在.bashrc文件中设置了JAVA_HOME,还是出现 Error: JAVA_HOME is not set and could not be found. 的错误,那么,请到hadoop的安装目录修改配置文件“/usr/local/hadoop/etc/hadoop/hadoop-env.sh”,在里面找到“export JAVA_HOME=${JAVA_HOME}”这行,然后,把它修改成JAVA安装路径的具体地址,比如,“export JAVA_HOME=/usr/lib/jvm/default-java”,然后,再次启动Hadoop。

    Spark安装

    此处采用Spark和Hadoop一起安装使用,这样,就可以让Spark使用HDFS存取数据。需要说明的是,当安装好Spark以后,里面就自带了scala环境,不需要额外安装scala。在安装spark之前,需要先安装Java和Hadoop。

    需要的具体运行环境如下:

    Ø Ubuntu16.04以上

    Ø Hadoop 2.7.1以上

    Ø Java JDK 1.8以上

    Ø Spark 2.1.0 以上

    Ø Python 3.4以上

    (此次系统环境使用的Ubuntu16.04,自带Python,不需额外安装)

    Spark官网下载

    由于已经安装了Hadoop,所以在Choose a package type后面需要选择Pre-build with user-provided Hadoop,然后点击Download Spark后面的spark-2.1.0-bin-without-hadoop.tgz下载即可。需要说明的是,Pre-build with user-provided Hadoop:属于“Hadoop free”版,这样下载到的Spark,可应用到任意Hadoop版本。

    Spark部署模式主要有四种:Local模式(单机模式)、Standalone模式(使用Spark自带的简单集群管理器)、YARN模式(使用YARN作为集群管理器)和Mesos模式(使用Mesos作为集群管理器)。

    这里介绍Local模式(单机模式)的 Spark安装。我们选择Spark 2.4.3版本,并且假设当前使用用户名hadoop登录了Linux操作系统。

    $ sudo tar -zxf ~/下载/spark-2.4.3-bin-without-hadoop.tgz -C/usr/local/
    $ sudo mv ./spark-2.4.3-bin-without-hadoop/ ./spark
    $ sudo chown -R hadoop:hadoop ./spark       # 此处的 hadoop 为你的用户名
    
    

    安装后,还需要修改Spark的配置文件spark-env.sh

    $ cd /usr/local/spark
    $ sudo cp conf/spark-env.sh.template conf/spark-env.sh
    $ sudo vim conf/spark-env.sh
    #添加下面的环境变量信息
    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop:classpath)
    

    有了上面的配置信息以后,Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据。如果没有配置上面信息,Spark就只能读写本地数据,无法读写HDFS数据。

    配置完成后就可以直接使用,不需要像Hadoop运行启动命令。通过运行Spark自带的示例,验证Spark是否安装成功。

    $ cd /usr/local/spark
    $ bin/run-example SparkPi
    

    过滤后的运行结果如下图示,可以得到π 的 5 位小数近似值:

    在这里插入图片描述

    Spark不依赖Hadoop安装

    Spark同样也可以不依赖hadoop进行安装,但是仍然需要JDK环境,同样是在Spark官网上,选择spark-2.4.3-bin-hadoop2.7.tgz。我们直接将其解压出来,下面我们开始配置环境变量。我们进入编辑/etc/profile,在最后加上如下代码。

    #Spark
    export SPARK_HOME=/opt/spark-2.4.3
    export PATH=$PATH:$SPARK_HOME/bin
    

    然后进入/spark-2.3.1/bin目录下即可直接运行spark-shell。

    下面配置本地集群环境,首先我们进入刚刚解压的Spark目录,进入/spark-2.2.1/conf/,拷贝一份spark-env.sh。

    $ cp spark-env.sh.template spark-env.sh
    

    然后我们编辑这个文件,添加如下环境设置(按自身环境修改)

    #export SCALA_HOME=/opt/scala-2.13.0
    export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-3.b13.el7_5.x86_64  #这里是你jdk的安装路径
    export SPARK_HOME=/opt/spark-2.4.3
    export SPARK_MASTER_IP=XXX.XX.XX.XXX  #将这里的xxx改为自己的Linux的ip地址
    #export SPARK_EXECUTOR_MEMORY=512M
    #export SPARK_WORKER_MEMORY=1G
    #export master=spark://XXX.XX.XX.XXX:7070
    

    再回到conf目录下,拷贝一份slaves。

    $ cp slaves.template slaves
    

    在slaves最后加上localhost,保存即可。最后想要启动spark,进入安装目录下的sbin文件夹下,运行start-all.sh输入登录密码,master和worker进程就能按照配置文件启动。

    在Spark Shell 中运行代码

    这里介绍Spark Shell的基本使用。Spark shell提供了简单的方式来学习API,并且提供了交互的方式来分析数据。它属于REPL(Read-Eval-Print Loop,交互式解释器),提供了交互式执行环境,表达式计算完成就会输出结果,而不必等到整个程序运行完毕,因此可即时查看中间结果,并对程序进行修改,这样可以在很大程度上提升开发效率。

    Spark Shell支持Scala和Python,本文使用 Scala 来进行介绍。前面已经安装了Hadoop和Spark,如果Spark不使用HDFS和YARN,那么就不用启动Hadoop也可以正常使用Spark。如果在使用Spark的过程中需要用到 HDFS,就要首先启动 Hadoop。

    这里假设不需要用到HDFS,因此,就没有启动Hadoop。现在直接开始使用Spark。Spark-shell命令及其常用的参数如下:

    $ ./bin/spark-shell —master
    

    Spark的运行模式取决于传递给SparkContext的Master URL的值。Master URL可以是以下任一种形式:

    • local 使用一个Worker线程本地化运行SPARK(完全不并行)

    • local[*] 使用逻辑CPU个数数量的线程来本地化运行Spark

    • local[K] 使用K个Worker线程本地化运行Spark(理想情况下,K应该根据运行机器的CPU核数设定)

    • spark://HOST:PORT 连接到指定的Spark standalone master。默认端口是7077.

    • yarn-client 以客户端模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到。

    • yarn-cluster 以集群模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到。

    • mesos://HOST:PORT 连接到指定的Mesos集群。默认接口是5050。

    需要强调的是,本文采用“本地模式”(local)运行Spark,关于如何在集群模式下运行Spark,之后的文章会着重介绍。

    在Spark中采用本地模式启动Spark Shell的命令主要包含以下参数:

    –master:这个参数表示当前的Spark Shell要连接到哪个master,如果是local[*],就是使用本地模式启动spark-shell,其中,中括号内的星号表示需要使用几个CPU核心(core);

    –jars: 这个参数用于把相关的JAR包添加到CLASSPATH中;如果有多个jar包,可以使用逗号分隔符连接它们;

    比如,要采用本地模式,在4个CPU核心上运行spark-shell:

    $ cd /usr/local/spark
    $ /bin/spark-shell —master local[4]
    
    

    或者,可以在CLASSPATH中添加code.jar,命令如下:

    $ cd /usr/local/spark
    $ ./bin/spark-shell -master local[4] --jars code.jar
    

    可以执行spark-shell –help命令,获取完整的选项列表,具体如下:

    $ cd /usr/local/spark
    $ ./bin/spark-shell —help
    
    

    [外链图片转存失败(img-36hzRWo9-1569486879033)(spark-shell.png)]

    上面是命令使用方法介绍,下面正式使用命令进入spark-shell环境,可以通过下面命令启动spark-shell环境:

    scala> 8*2+5
    res0: Int = 21
    
    

    最后,可以使用命令“:quit”退出Spark Shell,如下所示:

    scala>:quit
    

    或者,也可以直接使用“Ctrl+D”组合键,退出Spark Shell

    Scala编写wordCount

    任务需求

    学会了上文基本的安装和执行后,现在练习一个任务:编写一个Spark应用程序,对某个文件中的单词进行词频统计。

    准备工作:进入Linux系统,打开“终端”,进入Shell命令提示符状态,然后,执行如下命令新建目录:

    $ cd /usr/local/spark
    $ mkdir mycode
    $ cd mycode
    $ mkdir wordcount
    $ cd wordcount
    
    

    然后,在/usr/local/spark/mycode/wordcount目录下新建一个包含了一些语句的文本文件word.txt,命令如下:

    $  vim word.txt
    

    首先可以在文本文件中随意输入一些单词,用空格隔开,编写Spark程序对该文件进行单词词频统计。然后,按键盘Esc键退出vim编辑状态,输入“:wq”保存文件并退出vim编辑器。

    在Spark-Shell中执行词频统计

    • 启动Spark-Shell

      首先,登录Linux系统(要注意记住登录采用的用户名,本教程统一采用hadoop用户名进行登录),打开“终端”(可以在Linux系统中使用Ctrl+Alt+T组合键开启终端),进入shell命令提示符状态,然后执行以下命令进入spark-shell:

    $ cd /usr/local/spark
    $ ./bin_spark-shell
    $ …这里省略启动过程显示的一大堆信息
    $ scala>
    

    ​ 启动进入spark-shell需要一点时间,在进入spark-shell后,我们可能还需要到Linux文件系统中对相关目录下的文件进行编辑和操作(比如要查看spark程序执行过程生成的文件),这个无法在park-shell中完成,因此,这里再打开第二个终端,用来在Linux系统的Shell命令提示符下操作。

    • 加载本地文件

      在开始具体词频统计代码之前,需要考虑如何加载文件,文件可能位于本地文件系统中,也有可能存放在分布式文件系统HDFS中,下面先介绍介绍如何加载本地文件,以及如何加载HDFS中的文件。首先,请在第二个终端窗口下操作,用下面命令到达/usr/local/spark/mycode/wordcount目录,查看一下上面已经建好的word.txt的内容:

      $ cd /usr/local/spark/mycode/wordcount
      $ cat word.txt
      
      

      Cat命令会把word.txt文件的内容全部显示到屏幕上。

      现在切换回spark-shell,然后输入下面命令:

      scala> val textFile = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)
      

      上面代码中,val后面的是变量textFile,而sc.textFile()中的这个textFile是sc的一个方法名称,这个方法用来加载文件数据。这两个textFile不是一个东西,不要混淆。实际上,val后面的是变量textFile,你完全可以换个变量名称,比如,val lines = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)。这里使用相同名称,就是有意强调二者的区别。

      注意要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。所以,下面我们执行一条“行动”类型的语句,就可以看到结果:

      scala>textFile.first()
      

      first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量textFile中,并取出第一行文本。屏幕上会显示很多反馈信息,这里不再给出,你可以从这些结果信息中,找到word.txt文件中的第一行的内容。

      正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,spark-shell也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:

       val textFile = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word123.txt”)
      

      上面我们使用了一个根本就不存在的word123.txt,执行上面语句时,spark-shell根本不会报错,因为,没有遇到“行动”类型的first()操作之前,这个加载操作时不会真正执行的。然后,我们执行一个“行动”类型的操作first(),如下:

       scala> textFile.first()
      

      执行上面语句后,会返回错误信息“拒绝连接”,因为这个word123.txt文件根本就不存在。现在我们可以练习一下如何把textFile变量中的内容再次写回到另外一个文本文件wordback.txt中:

      val textFile = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)
      textFile.saveAsTextFile(“file:///usr/local/spark/mycode/wordcount/writeback”)
      
      

      上面的saveAsTextFile()括号里面的参数是保存文件的路径,不是文件名。saveAsTextFile()是一个“行动”(Action)类型的操作,所以马上会执行真正的计算过程,从word.txt中加载数据到变量textFile中,然后,又把textFile中的数据写回到本地文件目录“_usr_local_spark_mycode_wordcount_writeback/”下面,现在让我们切换到Linux Shell命令提示符窗口中,执行下面命令:

      $ cd /usr/local/spark/mycode/wordcount/writeback/
      $ ls
      

      执行结果会显示,有两个文件part-00000和_SUCCESS,我们可以使用cat命令查看一下part-00000文件,会发现结果是和上面word.txt中的内容一样的。

      词频统计

      有了前面的铺垫性介绍,下面我们开始第一个Spark应用程序:WordCount。请切换到spark-shell窗口,输入如下命令:

      scala> val textFile = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)
      scala> val wordCount = textFile.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)
      scala> wordCount.collect()
      

      上面只给出了代码,省略了执行过程中返回的结果信息,因为返回信息很多。下面简单解释一下上面的语句。

      • textFile包含了多行文本内容,textFile.flatMap(line => line.split(” “))会遍历textFile中的每行文本内容,当遍历到其中一行文本内容时,会把文本内容赋值给变量line,并执行Lamda表达式line => line.split(” “)。line => line.split(” “)是一个Lamda表达式,左边表示输入参数,右边表示函数里面执行的处理逻辑,这里执行line.split(” “),也就是针对line中的一行文本内容,采用空格作为分隔符进行单词切分,从一行文本切分得到很多个单词构成的单词集合。这样,对于textFile中的每行文本,都会使用Lamda表达式得到一个单词集合,最终,多行文本,就得到多个单词集合。textFile.flatMap()操作就把这多个单词集合“拍扁”得到一个大的单词集合。

      • 然后,针对这个大的单词集合,执行map()操作,也就是map(word => (word, 1)),这个map操作会遍历这个集合中的每个单词,当遍历到其中一个单词时,就把当前这个单词赋值给变量word,并执行Lamda表达式word => (word, 1),这个Lamda表达式的含义是,word作为函数的输入参数,然后,执行函数处理逻辑,这里会执行(word, 1),也就是针对输入的word,构建得到一个tuple,形式为(word,1),key是word,value是1(表示该单词出现1次)。

      • 程序执行到这里,已经得到一个RDD,这个RDD的每个元素是(key,value)形式的tuple。最后,针对这个RDD,执行reduceByKey((a, b) => a + b)操作,这个操作会把所有RDD元素按照key进行分组,然后使用给定的函数(这里就是Lamda表达式:(a, b) => a + b),对具有相同的key的多个value进行reduce操作,返回reduce后的(key,value),比如(“hadoop”,1)和(“hadoop”,1),具有相同的key,进行reduce以后就得到(“hadoop”,2),这样就计算得到了这个单词的词频。

    编写独立应用程序执行词频统计

    在上面spark-shell编写wordcount后,下面我们编写一个Scala应用程序来实现词频统计。首先登录Linux系统,进入Shell命令提示符状态,然后执行下面命令:

    $ cd /usr/local/spark/mycode/wordcount/
    $ mkdir -p src/main/scala  这里加入-p选项,可以一起创建src目录及其子目录
    
    

    然后在“/usr/local/spark/mycode/wordcount/src/main/scala”目录下新建一个test.scala文件,里面包含如下代码:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext./
    import org.apache.spark.SparkConf
    
    object WordCount {
    def main(args: Array[String]) {
    val inputFile =  “file:///usr/local/spark/mycode/wordcount/word.txt”
    val conf = new SparkConf().setAppName(“WordCount”).setMaster(“local[2]”)
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(inputFile)
    val wordCount = textFile.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCount.foreach(println)
    }
    }
    
    

    注意,SparkConf().setAppName(“WordCount”).setMaster(“local[2]”)这句语句,也可以删除.setMaster(“local[2]”),只保留 val conf = new SparkConf().setAppName(“WordCount”)

    如果test.scala没有调用SparkAPI,则只要使用scalac命令编译后执行即可。此处test.scala程序依赖 Spark API,因此需要通过 sbt 进行编译打包。首先执行如下命令:

    $ cd /usr/local/spark/mycode/wordcount/
    $ vim simple.sbt
    
    

    通过上面代码,新建一个simple.sbt文件,请在该文件中输入下面代码:

    name := “Simple Project”
    version := “1.0”
    scalaVersion := “2.11.8”
    libraryDependencies += “org.apache.spark” %% “spark-core” % “2.1.0”
    
    

    下面我们使用sbt打包Scala程序。为保证sbt能正常运行,先执行如下命令检查整个应用程序的文件结构,应该是类似下面的文件结构:

    $ ./src
    $ ./src/main
    $ ./src/main/scala
    $ ./src/main/scala/test.scala
    $ ./simple.sbt
    $ ./word.txt
    

    接着,我们就可以通过如下代码将整个应用程序打包成 JAR(首次运行同样需要下载依赖包 ):

    $ cd /usr/local/spark/mycode/wordcount/  请一定把这目录设置为当前目录
    $ /usr/local/sbt/sbt package
    
    

    上面执行过程需要消耗几分钟时间,屏幕上会返回一下信息:

    hadoop@dblab-VirtualBox:_usr_local_spark_mycode_wordcount$ /usr_local_sbt_sbt package
    OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
    [info] Set current project to Simple Project (in build file:/usr_local_spark_mycode_wordcount/)
    [info] Updating {file:/usr_local_spark_mycode_wordcount/}wordcount…
    [info] Resolving jline#jline;2.12.1 …
    [info] Done updating.
    [info] Packaging _usr_local_spark_mycode_wordcount_target_scala-2.11_simple-project_2.11-1.0.jar …
    [info] Done packaging.
    [success] Total time: 34 s, completed 2017-2-20 10:13:13
    

    若屏幕上返回上述信息表明打包成功,生成的 jar 包的位置为/usr/local/spark/mycode/wordcount/target/scala-2.11_simple-project_2.11-1.0.jar

    最后通过spark-submit 运行程序。我们就可以将生成的jar包通过spark-submit提交到Spark中运行了,命令如下:

    $ /usr/local/spark/bin/spark-submit —class “WordCount”  /usr/local/spark/mycode/wordcount/target/scala-2.11_simple-project_2.11-1.0.jar
    

    最终得到的词频统计结果类似如下:

    (Spark,1)
    (is,1)
    (than,1)
    (fast,1)
    (love,2)
    (i,1)
    (I,1)
    (hadoop,2)
    

    Flume_Kafka_SparkStreaming实现词频统计

    准备工作

    在做这个project之前,需要预先准备好的环境如下:

    安装kafka(参考第一节)、安装flume(参考第二节)、安装Spark(参考第三节) 。

    做完上面三个工作之后,我们开始进入正式的词频统计Demo。

    Spark准备工作

    要通过Kafka连接Spark来进行Spark Streaming操作,Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。也就是说Spark需要jar包让Kafka和Spark streaming相连。按照我们前面安装好的Spark版本,这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,输入以下命令启动spark-shell:

    $ cd /usr/local/spark
    $ ./bin/spark-shell
    

    启动成功后,在spark-shell中执行下面import语句:

    import org.apache.spark.streaming.kafka._

    程序报错,因为找不到相关jar包。根据Spark官网的说明,对于Spark版本,如果要使用Kafka,则需要下载spark-streaming-kafka相关jar包。Jar包下载地址(注意版本对应关系)。

    在这里插入图片描述

    接下来需要把这个文件复制到Spark目录的jars目录下,输入以下命令:

    $ cd /usr/local/spark/jars
    $ mkdir kafka
    $ cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/kafka
    
    

    下面把Kafka安装目录的libs目录下的所有jar文件复制到/usr/local/spark/jars/kafka目录下输入以下命令:至此,所有环境准备工作已全部完成,下面开始编写代码。

    Project 过程

    • 编写Flume配置文件flume_to_kafka.conf

      输入命令:

      $ cd /usr/local/kafka/libs
      $ ls
      $ cp ./* /usr/local/spark/jars/kafka
      
      

      内容如下:

      a1.sources=r1
      a1.channels=c1
      a1.sinks=k1
      #Describe/configure the source 
      a1.sources.r1.type=netcat
      a1.sources.r1.bind=localhost
      a1.sources.r1.port=33333
      #Describe the sink
      a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink  
      a1.sinks.k1.kafka.topic=test  
      a1.sinks.k1.kafka.bootstrap.servers=localhost:9092  
      a1.sinks.k1.kafka.producer.acks=1  
      a1.sinks.k1.flumeBatchSize=20  
      #Use a channel which buffers events in memory  
      a1.channels.c1.type=memory
      a1.channels.c1.capacity=1000000
      a1.channels.c1.transactionCapacity=1000000
      #Bind the source and sink to the channel
      a1.sources.r1.channels=c1
      a1.sinks.k1.channel=c1
      
    • 编写Spark Streaming程序(进行词频统计的程序)

      首先创建scala代码的目录结构,输入命令:

      $ cd /usr/local/spark/mycode
      $ mkdir flume_to_kafka
      $ cd flume_to_kafka
      $ mkdir -p src/main/scala
      $ cd src/main/scala
      $ vim KafkaWordCounter.scala
      
      

      KafkaWordCounter.scala是用于单词词频统计,它会把从kafka发送过来的单词进行词频统计,代码内容如下:

      reduceByKeyAndWindow函数作用解释如下:

      package org.apache.spark.examples.streaming
      import org.apache.spark._
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming._
      import org.apache.spark.streaming.kafka._
      import org.apache.spark.streaming.StreamingContext._
      import org.apache.spark.streaming.kafka.KafkaUtils
      
      object KafkaWordCounter{
      def main(args:Array[String]){
      StreamingExamples.setStreamingLogLevels()
      val sc=new SparkConf().setAppName("KafkaWordCounter").setMaster("local[2]")
      val ssc=new StreamingContext(sc,Seconds(10))
      ssc.checkpoint("file:///usr/local/spark/mycode/flume_to_kafka/checkpoint") //设置检查点
      val zkQuorum="localhost:2181" //Zookeeper服务器地址
      val group="1"  //topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group" 
      val topics="test" //topics的名称          
      val numThreads=1 //每个topic的分区数
      val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap
      val lineMap=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
      val lines=lineMap.map(_._2)
      val words=lines.flatMap(_.split(" "))
      val pair=words.map(x => (x,1))
      val wordCounts=pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) 
      wordCounts.print
      ssc.start
      ssc.awaitTermination
      }
      }
      
      

      reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);

      此代码中就是一个窗口转换操作reduceByKeyAndWindow,其中,Minutes(2)是滑动窗口长度,Seconds(10)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)。reduceByKeyAndWindow中就使用了加法和减法这两个reduce函数,加法和减法这两种reduce函数都是“可逆的reduce函数”,也就是说,当滑动窗口到达一个新的位置时,原来之前被窗口框住的部分数据离开了窗口,又有新的数据被窗口框住,但是,这时计算窗口内单词的词频时,不需要对当前窗口内的所有单词全部重新执行统计,而是只要把窗口内新增进来的元素,增量加入到统计结果中,把离开窗口的元素从统计结果中减去,这样,就大大提高了统计的效率。尤其对于窗口长度较大时,这种“逆函数”带来的效率的提高是很明显的。

    • 创建StreamingExamples.scala

      继续在当前目录(/usr/local/spark/mycode/flume_to_kafka/src/main/scala)下创建StreamingExamples.scala代码文件,用于设置log4j,输入命令:

      vim StreamingExamples.scala

      package org.apache.spark.examples.streaming
      import org.apache.spark.internal.Logging
      import org.apache.log4j.{Level, Logger}
      //Utility functions for Spark Streaming examples. 
      object StreamingExamples extends Logging {
      //Set reasonable logging levels for streaming if the user has not configured log4j. 
        def setStreamingLogLevels() {
          val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
          if (!log4jInitialized) {
            // We first log something to initialize Spark's default logging, then we override the
            // logging level.
            logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")
            Logger.getRootLogger.setLevel(Level.WARN)
          }
        }
      }
      
      
    • 创建StreamingExamples.scala

      继续在当前目录(/usr/local/spark/mycode/flume_to_kafka/src/main/scala)下创建StreamingExamples.scala代码文件,用于设置log4j,输入命令:

      vim StreamingExamples.scala

      package org.apache.spark.examples.streaming
      import org.apache.spark.internal.Logging
      import org.apache.log4j.{Level, Logger}
      //Utility functions for Spark Streaming examples. 
      object StreamingExamples extends Logging {
      //Set reasonable logging levels for streaming if the user has not configured log4j. 
        def setStreamingLogLevels() {
          val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
          if (!log4jInitialized) {
            // We first log something to initialize Spark's default logging, then we override the
            // logging level.
            logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")
            Logger.getRootLogger.setLevel(Level.WARN)
          }
        }
      }
      
      
    • 打包文件simple.sbt

      输入命令:

      $ cd /usr/local/spark/mycode/flume_to_kafka
      $ vim simple.sbt
      

      内容如下:

      name := "Simple Project"
      version := "1.0"
      scalaVersion := "2.11.8"
      libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
      libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
      libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
      

      要注意版本号一定要设置正确,在/usr/local/spark/mycode/flume_to_kafka目录下输入命令:

      $ cd /usr/local/spark/mycode/flume_to_kafka
      $ find .
      

      打包之前,这条命令用来查看代码结构,目录结构如下所示:

    在这里插入图片描述

    • 打包编译

    一定要在/usr/local/spark/mycode/flume_to_kafka目录下运行打包命令。

    输入命令:

    $ cd /usr/local/spark/mycode/flume_to_kafka
    $ /usr/local/sbt/sbt package
    
    

    第一次打包的过程可能会很慢,请耐心等待几分钟。打包成功后,会看到SUCCESS的提示。

    • 启动zookeeper和kafka
    #启动zookeeper:
    $ cd /usr/local/kafka
    $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
    
    # 新开一个终端,启动Kafka:
    $ cd /usr/local/kafka
    $ bin/kafka-server-start.sh config/server.properties
    
    
    • 运行程序KafkaWordCounter

    打开一个新的终端,我们已经创建过topic,名为test(这是之前在flume_to_kafka.conf中设置的topic名字),端口号2181。在终端运行KafkaWordCounter程序,进行词频统计,由于现在没有启动输入,所以只有提示信息,没有结果。

    输入命令:

    $ cd /usr/local/spark
    $/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCounter" /usr/local/spark/mycode/flume_to_kafka/target/scala-2.11/simple-project_2.11-1.0.jar
    

    其中”/usr/local/spark/jars/“和”/usr/local/spark/jars/kafka/”用来指明引用的jar包,“org.apache.spark.examples.streaming.KafkaWordCounter”代表包名和类名,这是编写KafkaWordCounter.scala里面的包名和类名,最后一个参数用来说明打包文件的位置。

    执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔10秒钟刷新一次信息,用来输出词频统计的结果,此时还只有提示信息,如下所示:

    [外链图片转存失败(img-ZnMkMxIN-1569486879035)(result_one.png)]

    在启动Flume之前,Zookeeper和Kafka要先启动成功,不然启动Flume会报连不上Kafka的错误。

    • 启动flume agent

    打开第四个终端,在这个新的终端中启动Flume Agent

    输入命令:

    $ cd /usr/local/flume
    $ bin/flume-ng agent --conf ./conf --conf-file ./conf/flume_to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
    
    

    启动agent以后,该agent就会一直监听localhost的33333端口,这样,我们下面就可以通过“telnet localhost 33333”命令向Flume Source发送消息。这个终端也不要关闭,让它一直处于监听状态。

    • 发送消息

    打开第五个终端,发送消息。输入命令:

    $ telnet localhost 33333
    

    这个端口33333是在flume conf文件中设置的source

    在这个窗口里面随便敲入若干个字符和若干个回车,这些消息都会被Flume监听到,Flume把消息采集到以后汇集到Sink,然后由Sink发送给Kafka的topic(test)。因为spark Streaming程序不断地在监控topic,在输入终端和前面运行词频统计程序那个终端窗口内看到统计结果。


    分布式环境搭建及相关DEMO

    Flume

    Flume在分布式环境下跟单机下一致,只需要在一台机器上搭建即可。

    Kafka

    搭建高吞吐量Kafka分布式发布订阅消息集群

    • Zookeeper集群: 121.48.163.195:2181 , 113.54.154.68:2181,113.54.159.232:2181

    • kafka 集群: 121.48.163.195 , 113.54.154.68,113.54.159.232

    搭建 kafka 集群

    kafka 集群: 121.48.163.195 , 113.54.154.68,113.54.159.232

    1. 下载kafka和zookeeper

      步骤和前面单机版一致

    2. 修改配置

      $ vim /usr/local/kafka_2.12-0.11.0.0/config/server.properties 
      
      设置broker.id
      第一台为broker.id = 0
      第二台为broker.id = 1
      第三台为broker.id = 2
      注意这个broker.id每台服务器不能重复
      
      然后设置zookeeper的集群地址
      zookeeper.connect=121.48.163.195:2181 , 113.54.154.68:2181,113.54.159.232:2181
      
    3. 修改zookeeper配置文件

      $ vim /usr/local/zookeeper-3.4.5/conf/zoo.cfg
      #添加server.1 server.2 server.3
      
      server.1=121.48.163.195:2888:3888
      server.2=113.54.154.68:2888:3888
      server.3=113.54.159.232:2888:3888
      
      #添加id
      $ sudo echo "1" > /usr/local/zookeeper-3.4.5/data/myid(每台机器的id可以和brokerid保持一致)
      
    4. 启动服务

      # 每台机器运行命令,但是在实际大型集群中可以使用脚本的方式一键启动
      $ bin/kafka-server-start.sh config/server.properties &
      
    5. 创建主题

      $ /usr/local/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --create --zookeeper 121.48.163.195:2181 , 113.54.154.68:2181,113.54.159.232:2181 --replication-factor 2 --partitions 1 --topic ymq 
      
      --replication-factor 2 #复制两份
      
      --partitions 1 #创建1个分区
      
      --topic #主题为ymq
      
      # 运行list topic命令,可以看到该主题:
      
      $ /usr/local/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --list --zookeeper 121.48.163.195:2181 , 113.54.154.68:2181,113.54.159.232:2181
      
      
    6. 其它操作

      其它操作基本语法差不多一致,不再赘述,详情可以参考官网

    7. Kafka Manager

      Yahoo开源Kafka集群管理器Kafka Manager

    Spark

    • 选取三台服务器

      • 121.48.163.195 主节点
      • 113.54.154.68 从节点
      • 113.54.159.232 从节点

      设置三台服务器root用户,之后操作都用root用户进行,便于管理

    • 修改hosts文件

      $ sudo vim /etc/hosts
      # 在上面加上服务器ip
      121.48.163.195 Master
      113.54.154.68  Slave1
      113.54.159.232 Slave2
      

      修改完之后source /etc/hosts

    • SSH无密码验证配置

      • 安装和启动ssh协议

        我们需要两个服务:ssh和rsync。可以通过下面命令查看是否已经安装:

         rpm -qa|grep openssh
        
             rpm -qa|grep rsync
        
          如果没有安装ssh和rsync,可以通过下面命令进行安装:
        
             sudo apt  install ssh (安装ssh协议)
        
             sudo apt  install rsync (rsync是一个远程数据同步工具,可通过LAN/WAN快速同步多台主机间的文件)
        
             service sshd restart (启动服务)
        
      • 配置Master无密码登录所有Slave

        配置Master节点,以下是在Master节点的配置操作。

        • 在Master节点上生成密码对,在Master节点上执行以下命令:

        ssh-keygen -t rsa -P ‘’

        生成的密钥对:id_rsa和id_rsa.pub,默认存储在"/root/.ssh"目录下。

        • 接着在Master节点上做如下配置,把id_rsa.pub追加到授权的key里面去。

        cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

        • 修改ssh配置文件"/etc/ssh/sshd_config"的下列内容,将以下内容的注释去掉,在三台机器上均进行修改:

           RSAAuthentication yes # 启用 RSA 认证
          
               PubkeyAuthentication yes # 启用公钥私钥配对认证方式
          
               AuthorizedKeysFile .ssh/authorized_keys # 公钥文件路径(和上面生成的文件同)
          
          
        • 重启ssh服务,才能使刚才设置有效。

          service sshd restart

        • 验证无密码登录本机是否成功

          ssh localhost

        • 接下来的就是把公钥复制到所有的Slave机器上。使用下面的命令进行复制公钥:

          $ scp /root/.ssh/id_rsa.pub root@Slave1:/root/
          
          $ scp /root/.ssh/id_rsa.pub root@Slave2:/root/
              
          

        接着配置Slave节点,以下是在Slave1节点的配置操作。

        1>在"/root/“下创建”.ssh"文件夹,如果已经存在就不需要创建了。

        mkdir /root/.ssh

        2)将Master的公钥追加到Slave1的授权文件"authorized_keys"中去。

        cat /root/id_rsa.pub >> /root/.ssh/authorized_keys

        3)修改"/etc/ssh/sshd_config",具体步骤参考前面Master设置的第3步和第4步。

        4)用Master使用ssh无密码登录Slave1

        ssh 114.55.246.77

        5)把"/root/"目录下的"id_rsa.pub"文件删除掉。

        rm –r /root/id_rsa.pub

        重复上面的5个步骤把Slave2服务器进行相同的配置。

      • 配置Slave无密码登录Master

        以下是在Slave1节点的配置操作。

        1)创建"Slave1"自己的公钥和私钥,并把自己的公钥追加到"authorized_keys"文件中,执行下面命令:

        ssh-keygen -t rsa -P ‘’

        cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

        2)将Slave1节点的公钥"id_rsa.pub"复制到Master节点的"/root/"目录下。

        scp /root/.ssh/id_rsa.pub root@Master:/root/

        以下是在Master节点的配置操作。

        1)将Slave1的公钥追加到Master的授权文件"authorized_keys"中去。

        cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

        2)删除Slave1复制过来的"id_rsa.pub"文件。

        rm –r /root/id_rsa.pub

        配置完成后测试从Slave1到Master无密码登录。

        ssh 114.55.246.88

        按照上面的步骤把Slave2和Master之间建立起无密码登录。这样,Master能无密码验证登录每个Slave,每个Slave也能无密码验证登录到Master。

    • 安装基础环境(JAVA和SCALA环境)

      这里不再赘述

    • Hadoop2.7.3完全分布式搭建

      以下是在Master节点操作:

      • 下载二进制包hadoop-2.7.7.tar.gz

      • 解压并移动到相应目录,我习惯将软件放到/opt目录下,命令如下:

        $ tar -zxvf hadoop-2.7.3.tar.gz
        
        $ mv hadoop-2.7.7 /opt
        
        
      • 修改对应的配置文件,修改/etc/profile,增加如下内容:

         export HADOOP_HOME=/opt/hadoop-2.7.3/
         export PATH=$PATH:$HADOOP_HOME/bin
         export PATH=$PATH:$HADOOP_HOME/sbin
         export HADOOP_MAPRED_HOME=$HADOOP_HOME
         export HADOOP_COMMON_HOME=$HADOOP_HOME
         export HADOOP_HDFS_HOME=$HADOOP_HOME
         export YARN_HOME=$HADOOP_HOME
         export HADOOP_ROOT_LOGGER=INFO,console
         export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
         export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
        
      • 修改完成后执行$ source /etc/profile

      • 修改$HADOOP_HOME/etc/hadoop/hadoop-env.sh,修改JAVA_HOME 如下:

        export JAVA_HOME=/usr/local/jdk1.8.0_121

      • 修改$HADOOP_HOME/etc/hadoop/slaves,将原来的localhost删除,改成如下内容:

        • Slave1

        • Slave2

      • 修改$HADOOP_HOME/etc/hadoop/core-site.xml

        <configuration>
              <property>
                  <name>fs.defaultFS</name>
                  <value>hdfs://Master:9000</value>
              </property>
              <property>
                 <name>io.file.buffer.size</name>
        		 <value>131072</value>
             </property>
             <property>
                  <name>hadoop.tmp.dir</name>
                  <value>/opt/hadoop-2.7.7/tmp</value>
             </property>
        </configuration>
        
        
      • 修改$HADOOP_HOME/etc/hadoop/hdfs-site.xml

        <configuration>
            <property>
              <name>dfs.namenode.secondary.http-address</name>
              <value>Master:50090</value>
            </property>
            <property>
              <name>dfs.replication</name>
              <value>2</value>
            </property>
            <property>
              <name>dfs.namenode.name.dir</name>
              <value>file:/opt/hadoop-2.7.7/hdfs/name</value>
            </property>
            <property>
              <name>dfs.datanode.data.dir</name>
              <value>file:/opt/hadoop-2.7.7/hdfs/data</value>
            </property>
        </configuration>
        
        
      • cp mapred-site.xml.template mapred-site.xml,并修改$HADOOP_HOME/etc/hadoop/mapred-site.xml

        <configuration>
         <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
          </property>
          <property>
                  <name>mapreduce.jobhistory.address</name>
                  <value>Master:10020</value>
          </property>
          <property>
                  <name>mapreduce.jobhistory.address</name>
                  <value>Master:19888</value>
          </property>
        </configuration>
        
        
      • 修改$HADOOP_HOME/etc/hadoop/yarn-site.xml

        <configuration>
             <property>
                 <name>yarn.nodemanager.aux-services</name>
                 <value>mapreduce_shuffle</value>
             </property>
             <property>
                 <name>yarn.resourcemanager.address</name>
                 <value>Master:8032</value>
             </property>
             <property>
                 <name>yarn.resourcemanager.scheduler.address</name>
                 <value>Master:8030</value>
             </property>
             <property>
                 <name>yarn.resourcemanager.resource-tracker.address</name>
                 <value>Master:8031</value>
             </property>
             <property>
                 <name>yarn.resourcemanager.admin.address</name>
                 <value>Master:8033</value>
             </property>
             <property>
                 <name>yarn.resourcemanager.webapp.address</name>
                 <value>Master:8088</value>
             </property>
        </configuration>
        
      • 复制Master节点的hadoop文件夹到Slave1和Slave2上

      $ scp -r /opt/hadoop-2.7.7 root@Slave1:/opt
      $ scp -r /opt/hadoop-2.7.7 root@Slave2:/opt
      
      • 在Slave1和Slave2上分别修改/etc/profile,过程同Master一样

      • 在Master节点启动集群,启动之前格式化一下namenode:

        • Hadoop namenode -format

        • 启动:/opt/hadoop-2.7.7/sbin/start-all.sh

        • 至此hadoop的完全分布式搭建完毕

      • 查看集群是否启动成功:

        $ jps -m 
      
        Master显示:
      
           SecondaryNameNode
      
           ResourceManager
      
           NameNode
      
        Slave显示:
      
           NodeManager
      
           DataNode
      
      
    • Spark完全分布式环境搭建

      以下操作都在Master节点进行。

      • 下载二进制包spark-2.4.3-bin-hadoop2.7.tgz

      • 解压并移动到相应目录,命令如下:

        $ tar -zxvf spark-2.4.3-bin-hadoop2.7.tgz
        
        $ mv hadoop-2.7.3 /opt
        
        
      • 修改相应的配置文件,修改/etc/profie,增加如下内容:

        export SPARK_HOME=/opt/spark-2.4.3-bin-hadoop2.7/
        export PATH=$PATH:$SPARK_HOME/bin
        
      • 复制spark-env.sh.template成spark-env.sh

        $ cp spark-env.sh.template spark-env.sh
        
      • 修改$SPARK_HOME/conf/spark-env.sh,添加如下内容:

        export   JAVA_HOME=/usr/local/jdk1.8.0_121   
        export   SCALA_HOME=/usr/share/scala   
        export   HADOOP_HOME=/opt/hadoop-2.7.3   
        export   HADOOP_CONF_DIR=/opt/hadoop-2.7.3/etc/hadoop   
        export   SPARK_MASTER_IP=114.55.246.88   
        export   SPARK_MASTER_HOST=114.55.246.88   
        export   SPARK_LOCAL_IP=114.55.246.88   
        export   SPARK_WORKER_MEMORY=1g   
        export   SPARK_WORKER_CORES=2   
        export   SPARK_HOME=/opt/spark-2.4.3-bin-hadoop2.7   
        export   SPARK_DIST_CLASSPATH=$(/opt/hadoop-2.7.3/bin/hadoop classpath)   
        
      • 复制slaves.template成slaves

        $ cp slaves.template slaves
        
      • 修改$SPARK_HOME/conf/slaves,添加如下内容:

        Master
        Slave1
        Slave2
        
      • 将配置好的spark文件复制到Slave1和Slave2节点

        $ scp /opt/spark-2.4.3-bin-hadoop2.7 root@Slave1:/opt
        $ scp /opt/spark-2.4.3-bin-hadoop2.7 root@Slave2:/opt
        
      • 修改Slave1和Slave2配置

        在Slave1和Slave2上分别修改/etc/profile,增加Spark的配置,过程同Master一样。

        在Slave1和Slave2修改$SPARK_HOME/conf/spark-env.sh,将export SPARK_LOCAL_IP=114.55.246.88改成Slave1和Slave2对应节点的IP。

      • 在Master节点启动集群

        /opt/spark-2.4.3-bin-hadoop2.7/sbin/start-all.sh

      • 查看集群是否启动成功

        $ jps -m
        Master在Hadoop的基础上新增了:
        
             Master
        
          Slave在Hadoop的基础上新增了:
        
             Worker
        
        

    在我的博客查看更多
    作者:槐洛文

    展开全文
  • flume+kafka+spark stream+hbase做日志收集 前言 flume+kafka+spark stream 是目前比较常用的一套大数据消息日志收集管理框架,至于最后是入到Hive或者者Hbase需看不同业务场景,下面以HBase为场景简述下整个...

    前言

    flume+kafka+spark stream 是目前比较常用的一套大数据消息日志收集管理框架,至于最后是入到Hive或者者Hbase需看不同业务场景,下面以HBase为场景简述下整个配置与搭建流程以及这些框架如此搭配的优点。

    1. flume 配置

    1.1 flume 简介

    从官网文档 https://flume.apache.org 可以知道Flume的定位是很清晰的,它提供了一个分布式的,高可用的桥梁链接,可以收集、聚合和移动大量的日志数据,从许多不同的数据源到集中式数据存储,大概的结构如下图,流程大致为 从源端(source)接收数据,经过管道(channel)的缓存等等,发送到目标(sink)端。:

    其中source的定义flume提供了很多方式,常用的有以下几种:

    • Http source ,这种方式可以通过监听接口方式来收集log;
    • Exec source ,这种方式可以通过执行一些shell命令来收集log,例如通过 tail -f 文件 来监听文件追加的日志;
    • Spooling source,这种方式可以监听某个目录的log,当有新的log产生时即会被发送出去;
    • 还有很多其他的方式,例如可以以kafka作为source,这样flume就充当了kafka的消费者,当然还有很多如 Avro sourceThrift sourceTCP类的等等,具体参考官网文档更加相应场景配置即可。

    channel同样flume提供了很多方式,memory channel这种方式已经不太建议了,原因也很明显,不够安全,当出现任何机器问题时数据就会丢失,file channelkafka channel是比较推荐的做法,特别是当需要比较高的并发时,kafka channel是一个不错的选择。

    sink同样flume提供了很多方式,常用的有以下几种:

    • HDFS/Hive/Hbase/ElasticSearch sink,直接写入hdfs/Hive/Hbase/ElasticSearch,这种方式适合那些比较无需做ETL的场景。
    • kafka sink,直接充当kafka的生产者,可以看到kafka可以在整个flume生命周期里可以自由穿插。
    • Http sink,直接通过post方法将数据发送到目标api。
    • 其他的一些详细见官网文档即可。

    1.2 flume 配置

    下面以Spooling Directory Source -> file channel -> kafka sink为例:

    一份样例配置参数:

    # Name the components on this agent
    agent.sources = dir-src
    agent.sinks = kafka-sink
    agent.channels = file-channel
    
    # Describe/configure the source
    agent.sources.dir-src.type = spooldir
    agent.sources.dir-src.spoolDir = #监听目录
    agent.sources.dir-src.fileHeader = true
    agent.sources.dir-src.deserializer.maxLineLength=1000000
    
    # Describe the sink
    agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafka-sink.kafka.topic = test
    agent.sinks.kafka-sink.kafka.bootstrap.servers = #kafka boostrapServer
    
    # Use a channel which buffers events in file
    agent.channels.file-channel.type = file
    agent.channels.file-channel.checkpointDir = # checkpoint目录
    agent.channels.file-channel.dataDirs = # 缓存的数据目录
    
    # Bind the source and sink to the channel
    agent.sources.dir-src.channels = file-channel
    agent.sinks.kafka-sink.channel = file-channel

    配置详解:

    • 首先每个flume配置表可以存在多个agent,多个source,多个channel,多个sink,所以可以根据相应业务场景进行组合。
    • 对于每个agent,必须配置相应的source/channel/sink,通过agent.source = ???,agent.channel = ???,agent.sink = ???来指定。
    • 对于具体的source/channel/sink,通过 agent.{source/channel/sink}.???.属性 = ... 来具体配置 source/channel/sink 的属性。
    • 配置完source/channel/sink相应的属性后,需把相应的组件串联一起,如: agent.sources.dir-src.channels = file-channel其中dir-src这个source指定了其channel为我们定义好的file-channel.

    一些Tips:

    • flume在收集log的时候经常会出现Line length exceeds max (2048), truncating line!,这个一般情况对于一些log的存储没影响,但是遇到需要解析log的情况就有问题了,有时一个json或者其他格式的log被截断了,解析也会出问题,所以在source的属性配置里可以通过参数deserializer.maxLineLength调高默认的2048。
    • flume在监听相应的目录时,如果有重名的文件,或者直接在监听目录下修改相应正在读取的文件时,都会报错,而且flume-ng目前没有这种容错机制,报错只能重启了,还有一个比较大的问题,flume-ng没有提供相应的kill脚本,只能通过shell直接ps -aux | grep flume找到相应的PID,然后手动kill。
    • flume在监听相应目录时,如果目录下的文件是通过HTTP或者scp传输过来的,小文件的话没问题,但是当文件大小超过网络传输速率,就会造成flume读取文件时报错直接显示文件大小正在变化,这点也是比较麻烦的,所以建议是现有个临时目录先存放文件,等文件传输完成后再通过shell的mv命令直接发送到监听目录。
    • 有时候我们的log文件是以压缩的方式传输过来,但是如果我们想解析后才发送出去的话,可以将当前的Spooling Directory Source的改为Exec Source,可以指定改source的command参数里写shell解析命令。

    flume的启动:

    flume-ng agnet --conf "配置文件文件目录" --conf-file "配置文件" --name "配置文件里agent的名字"

    2. kafka 配置

    2.1 kafka简介

    kafka的官网 https://kafka.apache.org 同样对kafka的定位做了一个清晰阐述,分布式的消息流平台,与传统的MQ架构类似,kafka解耦了生产者,中间层与消费者三个组件,乍一听似乎与其他的MQ框架没有太大的区别,于是对比了很久,各个框架间并没有表现出显著性的区别以致某一方是不可替代的,但是其中仍有一些值得细细推敲的地方,具体可见下表(以rabbit MQ 为例):

    属性 rabbit MQ Kafka
    多语言支持 支持,语言无关 支持,语言无关
    消息延迟 微妙级 毫秒级
    负载均衡 miror queue 多broker,多replication
    协议问题 遵从AMQP协议, broker由Exchange,Binding,queue组成,客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费, rabbitMQ以broker为中心,有消息的确认机制。 遵从一般的MQ结构,producer,broker,consumer,consumer从broker上批量pull数据,通过提交offset来做相应消息拉取管控。
    集群扩展 支持 原生支持
    事务支持 原生支持 支持

    除上述所列各点外,还有几点需单独拿出讨论的:

    • kafka利用zookeeper做均衡管理,最新的kafka版本在消费者消费完信息后会将offset保存在kafka本身服务上,而不是zookeeper上,这在很大程度保证了消息队列被消费不会出现缺失与重复,但是要保证0重复0丢失,对于consumer提交offset的设计仍有比较大的考验。
    • kafka在创建topic时一般都是分区存储,如此带来的问题是每个分区间的消息顺序是很难保证全局性,只能在单个分区下保证,因此kafka在日志这个领域会更加的吻合和焕发光芒。

    2.2 kafka配置

    同样,下面是一份单broker的kafka配置方案:

    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    
    delete.topic.enable = true

    有几点配置需要注意:

    • broker.id 必须是全局唯一的,多个broker尽量部署在不同的集群上,通过指定相同的zookeeper.connect 进行统一管理。
    • listeners 是监听相应的IP:Port,如果kafka已经部署在集群上,会通过java.net.InetAddress.getCanonicalHostName()自动获取到相应的地址。
    • num.partitions 是为每个topic保留的默认分区,如果创建topic时不指定即采用默认1。
    • 其他的一些配置参数看注释既可以,delete.topic.enable = true可以让topic的删除什么更加方便。

    kafka的启动:

    kafka-server-start server.properties

    创建无备份,分区为1的topic:

    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic test

    删除topic:

    kafka-topics  --delete --zookeeper localhost:2181 --topic test
    zookeeper-client 
    rmr /brokers/topics/test

    3. spark stream 消费者

    3.1 spark stream 简介

    Spart Stream 是 Spark 框架下一个流处理的子项目,其基础数据DStream封装的是spark的RDD,通过轮询不断地从源端拉取数据,spark stream支持多种源端数据的拉取,同时基于spark的核心计算模块,使得其在实时性和大数据方面有着很强的优势,其流程结构大概如下图所示:

    3.2 spark stream 写 Kafka 消费者

    spark stream 写 kafka 消费者,官方提供了相应的示例,这里再稍微简述下:

    首先sbt引入spark stream/Kafka相关依赖

    libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0" % Provided
    libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0" % Provided
    libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0" % Provided
    libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % Provided
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"  % Provided
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0" 

    其次定义好kafka参数:

    val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "192.168.1.23:9093,gz19:9092,gz21:9092,gz24:9092,gz18:9092,gz89:9092,bigdata.zuzuche.cn:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "kafka_consumer_tantu",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

    订阅相应的topic:

    val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics, kafkaParams)
            )

    接下来就可以对stream做进一步的处理,跟spark rdd的处理类似。

    同样在写spark stream的时候有一些细节需要注意:

    • spark stream的轮询时间最小可以达到500ms, 但是如此带来的集群资源消耗也会更大,轮询的时间间隔应根据具体的场景设定。
    • spark stream本质上仍为spark的任务,只是添加了轮询机制使其一直挂在后台,当spark-submit提交spark stream的时候若设定的excutor大于kafka topic创建时设定的分区,多出来的部分会处于空闲,所以两者的配置要互相参考。

    4. HBase 存储

    4.1 HBase 简介

    HBase是NoSql中的一个代表,是一个面向列的数据库,支持亿级别的行*百万级别的列,若要定位到某个字段的值,通常需要限定如下:表名 -> rowid -> column family:column name -> timestamp,其中rowid为全局唯一的行键,行键的设计会影响到列的同个列下的排序,column family为列簇,其含义接近于HIve中的分区,通过column family的限定,其下相应的column会被集中存放,不同column family的column会分开存放,这样当需要索引少量的列时,无需遍历全部字段,当然,column family也不是越多越好,而且官方文档似乎也不支持过多的列簇,关于HBase的表结构,参考如下图:

    4.2 spark stream 写入 HBase (以HBase 1.2.0 为例)

    引入HBase相关依赖:

    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0"

    将数据存储为HBase对应的格式:

    // 随机产生某个uuid为行键
    val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
    // 将列簇,列明,列值添加进对应结构
    put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_name"), Bytes.toBytes("column_value"))

    插入HBase:

    // 表名
    val tablename = "table_name"
    // 创建初始配置
    val hbaseconf = HBaseConfiguration.create()
    // 创建链接
    val conn = ConnectionFactory.createConnection(hbaseconf)
    // 指定表
    val table: HTable = new HTable(hbaseconf, Bytes.toBytes(tablename))
    // 提交事务,插入数据
    table.put(put)

    5. Hive 外部表关联 HBase, impala 映射查询

    Hive做HBase的外部关联,需提前定义好列字段,而通常HBase的列都是无限扩展的,所以通过Hive外部映射HBase,只能处理一些日常的查询需求。

    5.1 Hive 外部银映射 HBase:

    CREATE EXTERNAL TABLE hive_external_HBase(
    key string,
    time string,
    `_track_id` string,
    )    
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'    
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,search:time,search:_track_id")    
    TBLPROPERTIES("hbase.table.name" = "HBase_table_name");

    语法与创建Hive基本一致,需要注意的是hive字段不支持特殊字符如$_*&等开头,需加转义符。

    最后,Hive有时候查询的速度并不能达到我们的想象,再做以不impala映射,用impala的查询引擎,会明显快很多:

    INVALIDATE METADATA;

    总结

    flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:

    • 为什么不用kafka直接接收源数据,而用flume作为Kafka的源?

    从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。

    • 为什么用spark stream作为kafka的消费者而不是其他?

    就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。

    • 为什么是入到hbase而不是其他Nosql?

    无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。

    展开全文
  • SparkStreaming接收Kafka数据的两种方式一、SparkStreaming + Kafka Receiver模式二、SparkStreaming + Kafka Direct模式三、Direct模式与Receiver模式比较四、SparkStreaming+Kafka维护消费者offset 一、Spark...
  • Storm、KafkaSpark

    2019-03-24 09:40:11
    。 MAPREDUCE 实战编程案例:通过一个实战案例来熟悉复杂MAPREDUCE程序的开发。该程序是从nginx服务器产生的访问服务器中计算出每个访客的访问次数及每次访问的时长。原始数据样例如下: ...
  • kafka+Sparkstreaming环境搭建与配置说明以及相关的测试代码的编写
  • 摘要:目次 1.Flume先容. 2 1.1 Flume数据源和输出体式格局.... 3 1.5 启动flume4 2.Kafka先容. 4 ] 目录 1.Flume介绍. 2 1.1 Flume数据源以及输出方式. 2 1.2 Flume的核心概念. 2 1.3 Flume
  • 转载自https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/ 引言 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一...
  • 本文实现kafkaSpark Streaming之间的通信,其中Kafka端producer实现使用Java,Spark Streaming端Consumer使用Python实现。 首先安装kafkaspark streaming环境,kafka测试连通测试参考上文,本文的实验环境都为...
  • sparkkafka的介绍 一 spark是什么 hadoop MapReduce:从集群中读取数据,分片读取 进行一次处理,将结果写到集群,从集群中读取更新后的数据,进行下一次的处理,将结果写到集群 Spark :从集群中读取数据,把数据...
  • 在分布式环境下,Spark集群采用的是主从架构。如下图所示,在一个Spark集群中,有一个节点负责中央协调,调度各个分布式工作节点,这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器...
  • sparkkafka的消息消费--...当我们使用spark做准实时计算的时候,很大场景都是和kafka通信,总结下spark使用kafka的注意事项,下面上代码 package com.aura.bigdata.spark.scala.streaming.p1 impor...
  • 原 [KafkaSpark集成系列四] Spark运行结构https://blog.csdn.net/u013256816/article/details/82082146版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 ...
  • 效果:SparkStreaming中的Receivers,恰好Kafka有发布/订阅 ,然而:此种方式企业不常用,说明有BUG,不符合企业需求。因为:接收到的数据存储在Executor的内存,会出现数据漏处理或者多处理状...
1 2 3 4 5 ... 20
收藏数 6,432
精华内容 2,572
关键字:

kafka通信 spark