精华内容
下载资源
问答
  • 实时计算框架

    2017-06-08 17:40:33
    实时计算框架思路



     实时计算框架思路

    展开全文
  • SparkStreaming是建立在Spark上的实时计算框架,通过它提供的丰富的API、基于内存的高速执行引擎,用户可以结合流式、批处理和交互试查询应用。本文将详细介绍Spark Streaming实时计算框架的原理与特点、适用场景。 ...
  • Spark Streaming实时计算框架 ✎ 学习目标 1.了解什么是实时计算 2.掌握DStream的转换、窗口、输出操作 3.理解Spark Streaming工作原理 4.掌握Spark Streaming和Kafka整合 概要 近年来,在Web应用、网络监控、传感...

    Spark Streaming实时计算框架

    ✎ 学习目标
    1.了解什么是实时计算
    2.掌握DStream的转换、窗口、输出操作
    3.理解Spark Streaming工作原理
    4.掌握Spark Streaming和Kafka整合

    概要

    近年来,在Web应用、网络监控、传感监测、电信金融、生产制造等领域,增强了对数据实时处理的需求,而Spark中的Spark Streaming实时计算框架就是为实现对数据实时处理的需求而设计。在电子商务中,淘宝、京东网站从用户点击的行为和浏览的历史记录中发现用户的购买意图和兴趣,然后通过Spark Streaming实时计算框架的分析处理,为之推荐相关商品,从而有效地提高商品的销售量,同时也增加了用户的满意度,可谓是“一举二得”。

    实时计算概述

    什么是实时计算
    在传统的数据处理流程(离线计算)中,复杂的业务处理流程会造成结果数据密集,结果数据密集则存在数据反馈不及时,若是在实时搜索的应用场景中,需要实时数据做决策,而传统的数据处理方式则并不能很好地解决问题,这就引出了一种新的数据计算——实时计算,它可以针对海量数据进行实时计算,无论是在数据采集还是数据处理中,都可以达到秒级别的处理要求。

    常用的实时计算框架

    1. Apache Spark Streaming
      Apache公司开源的实时计算框架。Apache Spark Streaming主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。
    2. Apache Storm
      Apache公司开源的实时计算框架,它具有简单、高效、可靠地实时处理海量数据,处理数据的速度达到毫秒级别,并将处理后的结果数据保存到持久化介质中(如数据库、HDFS)。
    3. Apache Flink
      Apache公司开源的实时计算框架。Apache Spark Streaming主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。
    4. Yahoo!S4
      Yahoo公司开源的实时计算平台。Yahoo!S4是通用的、分布式的、可扩展的,并且还具有容错和可插拔能力,供开发者轻松地处理源源不断产生的数据。

    Spark Streaming的概述

    Spark Streaming简介
    SparkStreaming是构建在Spark上的实时计算框架,且是对Spark Core API的一个扩展,它能够实现对流数据进行实时处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming具有易用性、容错性及易整合性的显著特点。

    Spark Streaming工作原理
    Spark Streaming支持从多种数据源获取数据,包括Kafka、Flume、Twitter、ZeroMQ、Kinesis及TCP Sockets数据源。当Spark Streaming从数据源获取数据之后,则可以使用诸如map、reduce、join和window等高级函数进行复杂的计算处理,最后将处理结果存储到分布式文件系统、数据库中,最终利用实时Web仪表板进行展示。
    在这里插入图片描述

    Spark的DStream流

    DStream简介
    Spark Streaming提供了一个高级抽象的流,即DStream(离散流)。DStream表示连续的数据流,可以通过Kafka、Flume和Kinesis等数据源创建,也可以通过现有DStream的高级操作来创建。DStream的内部结构是由一系列连续的RDD组成,每个RDD都是一小段时间分隔开来的数据集。对DStream的任何操作,最终都会转变成对底层RDDs的操作。
    在这里插入图片描述
    DStream编程模型
    批处理引擎Spark Core把输入的数据按照一定的时间片(如1s)分成一段一段的数据,每一段数据都会转换成RDD输入到Spark Core中,然后将DStream操作转换为RDD算子的相关操作,即转换操作、窗口操作以及输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。在这里插入图片描述

    DStream转换操作
    Spark Streaming中对DStream的转换操作会转变成对RDD的转换操转换流程如下。
    在这里插入图片描述
    其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。

    DStream API提供的与转换操作相关的方法

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    DStream窗口操作
    在Spark Streaming中,为DStream提供窗口操作,即在DStream流上,将一个可配置的长度设置为窗口,以一个可配置的速率向前移动窗口。根据窗口操作,对窗口内的数据进行计算,每次落在窗口内的RDD数据会被聚合起来计算,生成的RDD会作为Window DStream的一个RDD。在这里插入图片描述
    DStream API提供的与窗口操作相关的方法
    在这里插入图片描述
    DStream API提供的与窗口操作相关的方法在这里插入图片描述

    DStream输出操作
    DStream API提供的与输出操作相关的方法
    在这里插入图片描述
    在这里插入图片描述
    DStream实例——实现网站热词排序
    网站热词排序能够分析出用户对网站内容的喜好程度,以此来增加用户感兴趣的内容,从而提升用户访问网站的流量。利用SparkStreaming计数就可以编程实现热词排序的需求。

    1. 创建数据库和表
    mysql>create database spark;
    mysql>use spark;
    mysql>create table searchKeyWord(insert_time date, keyword varchar(30),
              >search_count integer);
    
    1. 导入依赖
    <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>5.1.38</version>
    </dependency>
    
    1. 创建Scala类,实现热词排序
      在spark_chapter07项目的/src/main/scala/cn.itcast.dstream文件夹下,创建一个名为“HotWordBySort”的Scala类,用于编写Spark Streaming应用程序,实现热词统计排序,
    2. 运行Scala类,并在hadoop01 9999端口输入数据
    [root@hadoop01 servers]# nc -lk 9999
    hadoop,111
    spark,222
    hadoop,222
    hadoop,222
    hive,222
    hive,333
    
    1. 查看数据表searchKeyWord中的数据
      mysql> select * from searchKeyWord;
      ±------------- ±---------±----------------+
      | insert_time | keyword | search_count |
      ±------------- ±---------±----------------+
      | 2018-12-04 | hadoop | 3 |
      | 2018-12-04 | hive | 2 |
      | 2018-12-04 | spark | 1 |
      ±------------- ±--------- ±---------------+

    Spark Streaming整合Kafka实战

    KafkaUtils.createDstream方式
    Kafka作为一个实时的分布式消息队列,实时地生产和消费消息。在大数据计算框架中,可利用Spark Streaming实时读取Kafka中的数据,再进行相关计算。在Spark1.3版本后,KafkaUtils里面提供了两个创建DStream的方式,一种是KafkaUtils.createDstream方式,另一种为KafkaUtils.createDirectStream方式。
    KafkaUtils.createDstream是通过Zookeeper连接Kafka,receivers接收器从Kafka中获取数据,并且所有receivers获取到的数据都会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据。
    在这里插入图片描述

    1. 导入依赖
    # 添加Spark Streaming整合Kafka的依赖
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
    	<version>2.3.2</version>
    </dependency>
    
    1. 创建Scala类,实现词频统计
      在spark_chapter07项目的/src/main/scala/cn.itcast.dstream目录下,创建一个名为“SparkStreaming_Kafka_createDstream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。
    2. 创建Topic,指定消息的类别
    [root@hadoop01~]#kafka-topics.sh --create --topic kafka_spark –partitions 3 
    --replication-factor 1 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic "kafka_spark".
    
    1. 启动Kafka的消息生产者,并观察IDEA控制台输出
    [root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop01:9092
    --topic kafka_spark
    >hadoop spark hbase kafka spark
    >kafka itcast itcast spark kafka spark kafka
    

    在这里插入图片描述
    KafkaUtils.createDirectStream方式
    当接收数据时,它会定期地从Kafka中Topic对应Partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,然后Spark通过调用Kafka简单的消费者API(即低级API)来读取一定范围的数据。在这里插入图片描述

    1. 导入依赖
    # 添加Spark Streaming整合Kafka的依赖
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
    	<version>2.3.2</version>
    </dependency>
    
    1. 创建Scala类,实现词频统计
      在spark_chapter07项目的/src/main/scala/cn.itcast.dstream目录下,创建一个名为“SparkStreaming_Kafka_createDirectStream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。
    2. 创建Topic,指定消息的类别
    [root@hadoop01~]# kafka-topics.sh --create --topic kafka_direct0 -–partitions 3 
    --replication-factor 1--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic "kafka_direct0".
    
    1. 启动Kafka的消息生产者,并观察IDEA控制台输出
    [root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop01:9092
    --topic kafka_direct0
    >hadoop spark hbase kafka spark
    >kafka itcast itcast spark kafka spark kafka
    

    在这里插入图片描述

    展开全文
  • 什么是实时计算、常用的实时计算框架;Spark Streaming 介绍、 工作原理 DStream简介、编程模型、转换操作; DStream API 转换操作; Spark Streaming工作机制、程序的基本步骤、创建 StreamingContext对象


    1、MySQL安装教程

    2、Spark Streaming 实现网站热词排序

    3、Spark Streaming 整合 Kafka,实现交流


    一、实时计算概述

    近年来,在Web应用、网络监控、传感监测、电信金融、生产制造等领域,增强了对数据实时处理的需求,而 Spark 中的 Spark Streaming 实时计算框架就是为实现对数据实时处理的需求而设计。

    在电子商务中,淘宝、京东网站从用户点击的行为和浏览的历史记录中发现用户的购买意图和兴趣,然后通过 Spark Streaming 实时计算框架的分析处理,为之推荐相关商品,从而有效地提高商品的销售量,同时也增加了用户的满意度,可谓是“一举二得”。

    本章主要对 Spark Streaming 实时计算框架相关知识进行介绍。

    1.1 什么是实时计算?

    在传统的数据处理流程(离线计算)中,复杂的业务处理流程会造成结果数据密集,结果数据密集则存在数据反馈不及时,若是在实时搜索的应用场景中,需要实时数据做决策,而传统的数据处理方式则并不能很好地解决问题,这就引出了一种新的数据计算——实时计算,它可以针对海量数据进行实时计算,无论是在数据采集还是数据处理中,都可以达到秒级别的处理要求

    简单来说,实时计算就是在数据采集与数据处理中,都可以达到级别的处理要求。


    1.2 常用的实时计算框架

    1. Apache Spark Streaming
      Apache公司开源的实时计算框架。Apache Spark Streaming主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。
    2. Apache Storm
      Apache公司开源的实时计算框架,它具有简单、高效、可靠地实时处理海量数据,处理数据的速度达到毫秒级别,并将处理后的结果数据保存到持久化介质中(如数据库、HDFS)。
    3. Apache Flink
      Apache公司开源的实时计算框架。Apache Spark Streaming 主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。
    4. Yahoo! s4
      Yahoo公司开源的实时计算平台。Yahoo ! S4是通用的、分布式的、可扩展的,并且还具有容错和可插拔能力,供开发者轻松地处理源源不断产生的数据。

    二、Spark Streaming

    2.1 Spark Streaming 介绍

    Spark Streaming 是构建在Spark上的实时计算框架,且是对Spark Core API的一个扩展,它能够实现对流数据进行实时处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming具有易用性、容错性易整合性的显著特点。

    • 易用性:
    • 容错性
    • 易整合性

    2.2 Spark Streaming 工作原理

    Spark Streaming支持从多种数据源获取数据,包括 Kafka 、 Flume 、Twitter 、ZeroMQ 、Kinesis、TCP Sockets 数据源。当Spark Streaming 从数据源获取数据之后,则可以使用诸如map 、reduce 、join和window等高级函数进行复杂的计算处理,最后将处理结果存储到分布式文件系统、数据库中,最终利用实时计算实现操作。
    在这里插入图片描述

    Spark Streaming 会对输入的数据源进行处理,然后将结果输出,Spark Streaming 在接受实时传入的数据流时,会将数据按批次(batch)进行划分,然后,再将这部分数据交由Spark引擎进行处理,处理完成后将结果输出到外部文件。
    在这里插入图片描述

    2.2 Spark Streaming工作机制

    • 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上;
    • 每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等);
    • Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。

    在这里插入图片描述

    2.3 Spark Streaming程序的基本步骤

    1. 通过创建输入DStream来定义输入源;
    2. 通过对DStream应用转换操作和输出操作来定义流计算;
    3. 用streamingContext.start()来开始接收数据和处理流程;
    4. 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束);
    5. 可以通过streamingContext.stop()来手动结束流计算进程。
    • 运行一个Spark Streaming程序,首先要生成一个 StreamingContext 对象,它是Spark Streaming程序的主入口
    • 可以从一个SparkConf对象创建一个StreamingContext对象
    • 登录Linux系统后,启动spark-shell。进入spark-shell以后,就已经获得了一个默认的SparkConext,也就是sc。

    2.4 创建 StreamingContext对象

    在交互式界面:

    import org.apache.spark.streaming._ # 导包
    val ssc = new StreamingContext(sc, Seconds(1)) # 创建对象,间隔时间1

    在这里插入图片描述
    Spark Streaming程序代码:

    import org.apache.spark._
    import org.apache.spark.streaming._
    val conf = new SparkConf().setAppName("主程序名").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    

    三、Spark Streaming 操作

    3.1 DStream简介

    Spark Streaming 提供了一个高级抽象的流,即DStream (离散流) 。DStream 表示连续的数据流,可以通过 Kafka 、Flume、kinesis 等数据源创建,也可以通过现有DStream的高级操作来创建。DStream 的内部结构是由一系列连续的RDD组成,每个RDD都是一小段时间分隔开来的数据集。对DStream的任何操作,最终都会转变成对底层RDDs的操作
    在这里插入图片描述

    3.2 DStream 编程模型

    批处理引擎 Spark Core 把输入的数据按照一定的时间片(如1s) 分成一段一段的数据,每一段数据都会转换成RDD输入到Spark Core中,然后将 DStream 操作转换为RDD算子的相关操作,即转换操作、窗口操作、输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。
    在这里插入图片描述

    3.3 DStream 转换操作

    Spark Streaming 中对 DStream 的转换操作都要转换为对RDD的转换操作。
    在这里插入图片描述
    其中, lines 表示转换操作前的DStream , words 表示转换操作后生成的 DStream 。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。

    3.4 DStream API 转换操作

    在这里插入图片描述在这里插入图片描述

    3.5 IDEA安装依赖

        <properties>
            <scala.version>2.11.8</scala.version>
            <hadoop.version>2.7.3</hadoop.version>
            <spark.version>2.4.0</spark.version>
            <hbase.version>1.2.4</hbase.version>
        </properties>
      <dependencies>
            <!--Scala-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <!--kafka-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.4.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>2.4.0</version>
            </dependency>
    
            <!--Spark & Kafka-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>
    
            <!--Spark & flume-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-flume_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
    
            <!--Spark-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <!--Hadoop-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <!--Hbase-->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>
        </dependencies>
    

    在之前安装过的只需要安装:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>2.4.0</version>
            </dependency>
    

    3.6 启动服务端监听 Socket 服务

    命令:nc -lk 9999


    3.7 实现 transform() 方法,分割多个单词

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object TransformTest {
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf对象
        val sparkConf: SparkConf = new SparkConf()
          .setAppName("TransformTest").setMaster("local[2]")
        // 2.创建SparkContext对象,它是所有任务计算的源头
        val sc: SparkContext = new SparkContext(sparkConf)
        // 3.设置日志级别
        sc.setLogLevel("WARN")
        // 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
        val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
        // 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
        // 以上是固定搭配结构
        val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
        // 6.使用RDD-to-RDD函数,返回新的DStream对象(即words),并空格切分每行
        val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
        // 7.打印输出结果
        words.print()
        // 8.开启流式计算
        ssc.start()
        // 9.让程序一直运行,除非人为干预停止
        ssc.awaitTermination()
      }
    }
    

    在这里插入图片描述

    3.8 UpdateStateByKeyTest 更新值

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    object UpdateStateByKeyTest {
      //newValues 表示当前批次汇总成的(word,1)中相同单词的所有1
      //runningCount 表示历史的所有相同key的value总和
      def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount =runningCount.getOrElse(0)+newValues.sum
        Some(newCount)
      }
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf对象 设置appName和master地址  local[2] 表示本地采用2个线程运行任务
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
        // 2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
        val sc: SparkContext = new SparkContext(sparkConf)
        // 3.设置日志级别
        sc.setLogLevel("WARN")
        // 4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
        val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
        // 5.配置检查点目录,使用updateStateByKey方法必须配置检查点目录
        ssc.checkpoint("./")
        // 6.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
        val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
        // 7.按空格进行切分每一行,并将切分的单词出现次数记录为1
        val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word =>(word,1))
        // 8.调用updateStateByKey操作,统计单词在全局中出现的次数
        var result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
        // 9.打印输出结果
        result.print()
        // 10.开启流式计算
        ssc.start()
        // 11.让程序一直运行,除非人为干预停止
        ssc.awaitTermination()
      }
    }
    

    在这里插入图片描述

    3.9 Dstream 窗口操作

    • 事先设定一个滑动窗口的长度(也就是窗口的持续时间);
    • 设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动;
    • 每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream;
    • 可以启动对这个小段DStream的计算。

    在这里插入图片描述

    方法名称 相关说明
    window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream;
    countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数;
    reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
    reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数。
    reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的 reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce‖操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。
    countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream;每个key的值都是它们在滑动窗口中出现的频率。
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
    object WindowTest {
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf对象
        val sparkConf: SparkConf = new SparkConf()
          .setAppName("WindowTest ").setMaster("local[2]")
        // 2.创建SparkContext对象,它是所有任务计算的源头
        val sc: SparkContext = new SparkContext(sparkConf)
        // 3.设置日志级别
        sc.setLogLevel("WARN")
        // 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
        val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
        // 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
        val dstream: ReceiverInputDStream[String] = ssc
          .socketTextStream("192.168.142.128",9999)
        // 6.按空格进行切分每一行
        val words: DStream[String] = dstream.flatMap(_.split(" "))
        // 7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
        val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
        // 8.打印输出结果
        windowWords.print()
        // 9.开启流式计算
        ssc.start()
        // 10.让程序一直运行,除非人为干预停止
        ssc.awaitTermination()
      }
    }
    

    在这里插入图片描述

    3.10 DStream 输出操作

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    
    object SaveAsTextFilesTest {
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        //1.创建SparkConf对象 设置appName和master地址  local[2] 表示本地采用2个线程运行任务
        val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
        //2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
        val sc: SparkContext = new SparkContext(sparkConf)
        //3.设置日志级别
        sc.setLogLevel("WARN")
        //4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
        val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
        //5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
        val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
        //6.调用saveAsTextFiles操作,将nc窗口输出的内容保存到HDFS上
        dstream.saveAsTextFiles("hdfs://master:8020//saveAsTextFiles/satf","txt")
        //7.开启流式计算
        ssc.start()
        //8.让程序一直运行,除非人为干预停止
        ssc.awaitTermination()
      }
    }
    

    在这里插入图片描述
    在这里插入图片描述

    展开全文
  • day02 实时计算框架storm基础 day03 实时计算框架storm运行原理 day04 实时计算案例之日志告警系统 day05 实时计算案例之流量日志分析/交易风险控制系统 day06 推荐系统案例 day07 推荐系统数据清洗与存储(Hbase、...

    1、实时计算阶段安排

    • day01 企业消息队列kafka

      • 接收实时产生的数据,用来计算。
    • day02 实时计算框架storm基础

    • day03 实时计算框架storm运行原理

    • day04 实时计算案例之日志告警系统

    • day05 实时计算案例之流量日志分析/交易风险控制系统

    • day06 推荐系统案例

    • day07 推荐系统数据清洗与存储(Hbase、Redis)

    • day08 搜索系统之elasticSearch

    这里写图片描述

    2、大数据课程整体结构

    这里写图片描述

    3、大数据实时存储

    这里写图片描述

    这里写图片描述

    4、Kafka配置文件server.properties

    修改三个地方:

    • broker.id

      • 每个kafka实例都具备一个唯一的id
    • log.dirs

      • Kafka用来存放消息的路径,需要在每台机器上创建
    • zookeeper.connect

      • Kafka信息存放在zookeeper中,需要制定zookeeper地址。

      • 每个Kafka实例启动的时候,都会将自己的信息注册到zookeeper中。

    • 配置文件

    broker.id=0
    
    num.network.threads=3
    
    num.io.threads=8
    
    socket.send.buffer.bytes=102400
    
    socket.receive.buffer.bytes=102400
    
    socket.request.max.bytes=104857600
    
    log.dirs=/export/data/kafka
    
    num.partitions=1
    
    num.recovery.threads.per.data.dir=1
    
    offsets.topic.replication.factor=1
    
    transaction.state.log.replication.factor=1
    
    transaction.state.log.min.isr=1
    
    log.retention.hours=168
    
    log.segment.bytes=1073741824
    
    log.retention.check.interval.ms=300000
    
    zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
    
    zookeeper.connection.timeout.ms=6000
    
    group.initial.rebalance.delay.ms=0
    

    5、Kafka启动需知(一键启动)

    • 启动zookeeper集群

      • 一键启动脚本的环境变量配置
      
      #set onekey env
      
      export OK_HOME=/export/servers/oneKey
      export PATH=${OK_HOME}/zk:$PATH
      export PATH=${OK_HOME}/storm:$PATH
      export PATH=${OK_HOME}/kafka:$PATH
      • 关于黑洞

      • 一键启动的目录信息

      -rw-r--r--. 1 root root  21 Nov 11 03:46 slave
      -rwxr-xr-x. 1 root root 160 Nov 11 03:46 startzk.sh
      -rwxr-xr-x. 1 root root 172 Nov 11 03:47 stopzk.sh

      /export/servers/oneKey/zk

      • startzk.sh文件
      cat /export/servers/oneKey/zk/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;nohup zkServer.sh start >/dev/nul* 2>&1 &"
      }&
      wait
      done 
      • stopzk.sh 停止脚本
      cat /export/servers/oneKey/zk/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;jps |grep QuorumPeerMain |cut -c 1-4 |xargs kill -s 9"
      }&
      wait
      done 
      • 跨服务器运行命令

      ssh hostname “command”

    • 启动Kafka集群

      • 环境变量
      
      #set onekey env
      
      export OK_HOME=/export/servers/oneKey
      export PATH=${OK_HOME}/zk:$PATH
      export PATH=${OK_HOME}/storm:$PATH
      export PATH=${OK_HOME}/kafka:$PATH
      • kafka环境变量配置
      
      #set kafka env
      
      export KAFKA_HOME=/export/servers/kafka
      export PATH=${KAFKA_HOME}/bin:$PATH
      • 启动脚本
      cat /export/servers/oneKey/kafka/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;nohup kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/nul* 2>&1 &"
      }&
      wait
      done
      • 停止脚本
      cat /export/servers/oneKey/kafka/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;jps |grep Kafka |cut -c 1-4 |xargs kill -s 9 "
      }&
      wait
      done 

    6、Kafka配置文件详解

    6.1、 producer端配置文件说明

    #指定kafka节点列表,用于获取metadata,不必全部指定
    metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
    
    # 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
    #partitioner.class=kafka.producer.DefaultPartitioner
    
    # 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
    compression.codec=none
    
    # 指定序列化处理类
    serializer.class=kafka.serializer.DefaultEncoder
    
    # 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
    #compressed.topics=
    
    # 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
    # 0: producer不会等待broker发送ack 
    # 1: 当leader接收到消息之后发送ack 
    # -1: 当所有的follower都同步消息成功后发送ack. 
    request.required.acks=0 
    
    # 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功) 
    request.timeout.ms=10000
    
    # 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
    也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
    producer.type=sync
    
    # 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
    # 此值和batch.num.messages协同工作.
    queue.buffering.max.ms = 5000
    
    # 在async模式下,producer端允许buffer的最大消息量
    # 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
    # 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
    queue.buffering.max.messages=20000
    
    # 如果是异步,指定每次批量发送数据量,默认为200
    batch.num.messages=500
    
    # 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后 
    # 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) 
    # 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 
    # -1: 无阻塞超时限制,消息不会被抛弃 
    # 0:立即清空队列,消息被抛弃 
    queue.enqueue.timeout.ms=-1
    
    
    # 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数 
    # 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失) 
    # 有可能导致broker接收到重复的消息,默认值为3.
    message.send.max.retries=3
    
    # producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况 
    # 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新 
    # (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000 
    topic.metadata.refresh.interval.ms=60000

    6.2、broker端配置文件说明

    #broker的全局唯一编号,不能重复
    broker.id=0
    
    #用来监听链接的端口,producer或consumer将在此端口建立连接
    port=9092
    
    #处理网络请求的线程数量
    num.network.threads=3
    
    #用来处理磁盘IO的现成数量
    num.io.threads=8
    
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    
    #接受套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    
    #kafka运行日志存放的路径
    log.dirs=/export/data/kafka/
    
    #topic在当前broker上的分片个数
    num.partitions=2
    
    #用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    
    #segment文件保留的最长时间,超时将被删除
    log.retention.hours=1
    
    #滚动生成新的segment文件的最大时间
    log.roll.hours=1
    
    #日志文件中每个segment的大小,默认为1G
    log.segment.bytes=1073741824
    
    #周期性检查文件大小的时间
    log.retention.check.interval.ms=300000
    
    #日志清理是否打开
    log.cleaner.enable=true
    
    #broker需要使用zookeeper保存meta数据
    zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
    
    #zookeeper链接超时时间
    zookeeper.connection.timeout.ms=6000
    
    #partion buffer中,消息的条数达到阈值,将触发flush到磁盘
    log.flush.interval.messages=10000
    
    #消息buffer的时间,达到阈值,将触发flush到磁盘
    log.flush.interval.ms=3000
    
    #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
    delete.topic.enable=true
    
    #此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessfu* 错误!
    host.name=kafka01
    
    advertised.host.name=192.168.140.128

    6.3、consumer端配置文件说明

    # zookeeper连接服务器地址
    zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
    
    # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
    zookeeper.session.timeout.ms=5000
    
    #当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
    zookeeper.connection.timeout.ms=10000
    
    # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
    zookeeper.sync.time.ms=2000
    
    #指定消费 
    group.id=itcast
    
    # 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息 
    # 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
    auto.commit.enable=true
    
    # 自动更新时间。默认60 * 1000
    auto.commit.interval.ms=1000
    
    # 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
    conusmer.id=xxx 
    
    # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
    client.id=xxxx
    
    # 最大取多少块缓存到消费者(默认10)
    queued.max.message.chunks=50
    
    # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新  的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数. 
    rebalance.max.retries=5
    
    # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
    fetch.min.bytes=6553600
    
    # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
    fetch.wait.max.ms=5000
    socket.receive.buffer.bytes=655360
    
    # 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
    auto.offset.reset=smallest
    
    # 指定序列化处理类
    derializer.class=kafka.serializer.DefaultDecoder

    7、Kafka整体概念梳理

    • Producer :消息生产者,就是向kafka broker发消息的客户端。
      • 第一个问题:数据分发机制
      • 第二个问题:消息是否会丢失
    • Consumer :消息消费者,向kafka broker取消息的客户端
    • Topic :名称。
      • 一类消息的分类名称,怎么区分一类消息?
    • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
      • 为什么要有Consumer Group? 提供并发和容错。
    • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
      • 用来存放数据
      • Kafka为什么快?
      • pagecache:实时消费数据,针对实时消费,数据其实就在broker内存中。
      • sendfile:消费之前的数据,可以直接通过系统层面将数据发送出去。
    • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

      • 为什么要进行partition?
      • 副本机制
      • segment ,将保存在一个partition文件目录下的数据,切分为多个文件段。 主要方便快速删除和快速查找。
    • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka

    • Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。
      • 副本中谁来当leader。
    • Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。
    • ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。

    8、Kafka细节补充

    8.1、使用Kafka Producer生产数据并观察segment段的变化

    这里写图片描述

    public static void main(String[] args) {
            //1、准备配置文件
            Properties props = new Properties();
            props.put("bootstrap.servers", "node01:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //2、创建KafkaProducer
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
                //3、发送数据
                /**
                 * 在ProducerRecord构造参数中key的情况下,会根据key进行hash取模,得到partition的编号。
                 * 
                 */
            while(true){
                kafkaProducer.send(new ProducerRecord<String, String>("yum02", "我是很多内容……"));
            }

    8.2、使用Kafka Producer生产数据的分发策略

    The default partitioning strategy:
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
     * <li>If no partition or key is present choose a partition in a round-robin fashion
    分发策略:
    1)如果指定了partition,直接使用
    2)如果没有指定partition,但是制定了key,可以使用key做hash取模
    3)如果没有指定partition,又没有指定key,使用轮训的方式
    //在ProducerRecord构造参数中有key的情况下,会根据key进行hash取模,得到partition的编号
    kafkaProducer.send(new ProducerRecord<String, String>("yun02","num","Consumer Group "));
    // 如果沒有key,也沒有partition就會輪訓
    kafkaProducer.send(new ProducerRecord<String, String>("yum02", "afka Web "));
    //如果指定了partition,就會使用partition
    kafkaProducer.send(new ProducerRecord<String, String>("yun02",1,"num","value"));

    8.3、Kafka为什么那么快?

    两个技术:

    • pagecache:数据生产放入pagecache、数据读取从pagecache从读取,针对实时消费的情况
    • sendfile:直接在系统层面将数据发送出去,减少应用层面的数据拷贝,提高效率针对消费历史数据的情况

    8.4、zookeeper可视化工具使用

    9、Storm一個項目究竟設置幾個worker?

    一个项目每个环节(组件)设置多少个并行度?

    //设置应用程序的worker数
    config.setNumWorkers(1);
    //设置组件的并行度
    topologyBuilder.setSpout("SentenceSpout",new SentenceSpout(),1);
    topologyBuilder.setBolt("splitBolt",new SplitBolt(),1).shuffleGrouping("SentenceSpout");
    topologyBuilder.setBolt("WordCountBolt",new WordCountBolt(),1).shuffleGrouping("splitBolt");

    这里写图片描述

    这里写图片描述

    10、实时看板案例实战

    • 项目范围

      • 不同岗位的人
      • 看订单数、订单人数、订单金额
    • 数据从何而来?

      • 如何获取订单相关?

      • 数据库?索引库?消息队列?选择哪一个?

        • 数据库 select * from 表 groupby 分类1,分类2
        • 压力大 ,业务部门不可能让你执行sql
        • 分库分表,一条sql根本写不出来
        • 索引库?
        • 搜索引擎,当数据库的从库使用,可以做。
        • 消息队列——>AMQ
        • 订单系统每创建一个订单,都会将消息传入到amq。
        • 数据部门通过flume或者其他的技术手段,将amq的数据同步到kafka
        • 实时看板从kafka中获取数据,并进行计算。
      • 程序架构

      • 真实场景:订单系统———->AMQ—————Flume————Kafka———Storm————-Redis———–JavaWeb

      • 案例场景:订单系统(producer)———–Kafka———Storm————-Redis———–JavaWeb

      • 数据长什么样?

      • pinyougou: 订单编号、订单金额、商品名称、商品分类、商品。

      • 真实订单:买家信息(手机号、省市县、姓名,账户、收货地址)、商品信息(sku详情挂上)、支付方式、卖家信息(手机号、省市县、姓名,账户、收货地址、公司信息)等等,差不多200多个字段。
      • 伪造订单信息:

      • 准备工作

      • 准备数据格式

      • 在kafka中创建topic (itcast_order,7个partition2个副本)

        • 名称:itcast_order

        • 设置分片数据:

        • 根据数据量的大小,根据条数据

        • 比如:500万订单,每条订单信息占用0.2M 每天100万M====1T
        • 假设有7台机器(Kafka),创建7partition

        • 设置副本

        • 2个就可以了,3个就可以了。

        • 执行创建命令:

        bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 2 --partitions 7 --topic itcast_order

      • 使用kafkaproducer编写生产者发送数据,数据使用json串的方式发送

      package cn.itcast.realtime.kanban.order;
      
      import com.google.gson.Gson;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      import java.util.Properties;
      
      /**
       * 模拟生成订单信息
       * 1)准备一个订单的javaben
       * 2) 发送订单信息到kafka
       */
      public class OrderProducer {
      
          public static void main(String[] args) {
              //1、准备配置文件
              Properties props = new Properties();
              props.put("bootstrap.servers", "node01:9092");
              props.put("acks", "all");
              props.put("retries", 0);
              props.put("batch.size", 16384);
              props.put("linger.ms", 1);
              props.put("buffer.memory", 33554432);
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              //2、创建KafkaProducer
              KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
              while (true) {
                  //3、发送数据
                  PaymentInfo paymentInfo = new PaymentInfo();
                  kafkaProducer.send(new ProducerRecord<String, String>("itcast_order", paymentInfo.random()));
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
      package cn.itcast.realtime.kanban.order;
      
      import com.alibaba.fastjson.JSON;
      import com.google.gson.Gson;
      
      import java.io.Serializable;
      import java.text.ParseException;
      import java.text.SimpleDateFormat;
      import java.util.Date;
      import java.util.Random;
      import java.util.UUID;
      
      public class PaymentInfo implements Serializable {
          private static final long serialVersionUID = -7958315778386204397L;
          private String orderId;//订单编号
          private Date createOrderTime;//订单创建时间
          private String paymentId;//支付编号
          private Date paymentTime;//支付时间
          private String productId;//商品编号
          private String productName;//商品名称
          private long productPrice;//商品价格
          private long promotionPrice;//促销价格
          private String shopId;//商铺编号
          private String shopName;//商铺名称
          private String shopMobile;//商品电话
          private long payPrice;//订单支付价格
          private int num;//订单数量
      
          /**
           * <Province>19</Province>
           * <City>1657</City>
           * <County>4076</County>
           */
          private String province; //省
          private String city; //市
          private String county;//县
      
          //102,144,114
          private String catagorys;
      
          public String getProvince() {
              return province;
          }
      
          public void setProvince(String province) {
              this.province = province;
          }
      
          public String getCity() {
              return city;
          }
      
          public void setCity(String city) {
              this.city = city;
          }
      
          public String getCounty() {
              return county;
          }
      
          public void setCounty(String county) {
              this.county = county;
          }
      
          public String getCatagorys() {
              return catagorys;
          }
      
          public void setCatagorys(String catagorys) {
              this.catagorys = catagorys;
          }
      
          public PaymentInfo() {
          }
      
          public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
              this.orderId = orderId;
              this.createOrderTime = createOrderTime;
              this.paymentId = paymentId;
              this.paymentTime = paymentTime;
              this.productId = productId;
              this.productName = productName;
              this.productPrice = productPrice;
              this.promotionPrice = promotionPrice;
              this.shopId = shopId;
              this.shopName = shopName;
              this.shopMobile = shopMobile;
              this.payPrice = payPrice;
              this.num = num;
          }
      
          public String getOrderId() {
              return orderId;
          }
      
          public void setOrderId(String orderId) {
              this.orderId = orderId;
          }
      
          public Date getCreateOrderTime() {
              return createOrderTime;
          }
      
          public void setCreateOrderTime(Date createOrderTime) {
              this.createOrderTime = createOrderTime;
          }
      
          public String getPaymentId() {
              return paymentId;
          }
      
          public void setPaymentId(String paymentId) {
              this.paymentId = paymentId;
          }
      
          public Date getPaymentTime() {
              return paymentTime;
          }
      
          public void setPaymentTime(Date paymentTime) {
              this.paymentTime = paymentTime;
          }
      
          public String getProductId() {
              return productId;
          }
      
          public void setProductId(String productId) {
              this.productId = productId;
          }
      
          public String getProductName() {
              return productName;
          }
      
          public void setProductName(String productName) {
              this.productName = productName;
          }
      
          public long getProductPrice() {
              return productPrice;
          }
      
          public void setProductPrice(long productPrice) {
              this.productPrice = productPrice;
          }
      
          public long getPromotionPrice() {
              return promotionPrice;
          }
      
          public void setPromotionPrice(long promotionPrice) {
              this.promotionPrice = promotionPrice;
          }
      
          public String getShopId() {
              return shopId;
          }
      
          public void setShopId(String shopId) {
              this.shopId = shopId;
          }
      
          public String getShopName() {
              return shopName;
          }
      
          public void setShopName(String shopName) {
              this.shopName = shopName;
          }
      
          public String getShopMobile() {
              return shopMobile;
          }
      
          public void setShopMobile(String shopMobile) {
              this.shopMobile = shopMobile;
          }
      
          public long getPayPrice() {
              return payPrice;
          }
      
          public void setPayPrice(long payPrice) {
              this.payPrice = payPrice;
          }
      
          public int getNum() {
              return num;
          }
      
          public void setNum(int num) {
              this.num = num;
          }
      
          @Override
          public String toString() {
              return "PaymentInfo{" +
                      "orderId='" + orderId + '\'' +
                      ", createOrderTime=" + createOrderTime +
                      ", paymentId='" + paymentId + '\'' +
                      ", paymentTime=" + paymentTime +
                      ", productId='" + productId + '\'' +
                      ", productName='" + productName + '\'' +
                      ", productPrice=" + productPrice +
                      ", promotionPrice=" + promotionPrice +
                      ", shopId='" + shopId + '\'' +
                      ", shopName='" + shopName + '\'' +
                      ", shopMobile='" + shopMobile + '\'' +
                      ", payPrice=" + payPrice +
                      ", num=" + num +
                      '}';
          }
      
          public String random() {
              this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
              this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
              this.productPrice = new Random().nextInt(1000);
              this.promotionPrice = new Random().nextInt(500);
              this.payPrice = new Random().nextInt(480);
              this.shopId = new Random().nextInt(200000) + "";
      
              this.catagorys = new Random().nextInt(10000) + "," + new Random().nextInt(10000) + "," + new Random().nextInt(10000);
              this.province = new Random().nextInt(23) + "";
              this.city = new Random().nextInt(265) + "";
              this.county = new Random().nextInt(1489) + "";
      
              String date = "2015-11-11 12:22:12";
              SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
              try {
                  this.createOrderTime = simpleDateFormat.parse(date);
              } catch (ParseException e) {
                  e.printStackTrace();
              }
              String jsonString = JSON.toJSONString(this);
              return jsonString;
          }
      }

      • 使用storm消费kafka中的数据(storm-kafka)

      这里写图片描述

      “`java
      public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
      //创建一个TopologyBuilder用来组装任务信息
      TopologyBuilder topologyBuilder = new TopologyBuilder();
      // 设置kafkaspout
      KafkaSpoutConfig.Builder

    附1:Kafka常见问题

    Kafka是什么?

    • 分布式消息队列,类似于JMS。典型的生产者消费者模式。
    • 生产的数据存放到kafka的集群,集群由很多个broker组成
    • kafka集群的元数据保存在zookeeper上。

    针对一个topic为什么要进行分区?

    • 一般情况下,针对海量数据,都会将数据分为很多个部分(分片),单独存放在不同的机器上。
    • 如果一个topic的数据量非常大,我们要提前规划好分区数
    • producer根据分区数进行数据分发(分发策略partitioner)

    针对一个分区,为什么要添加副本?

    • 副本机制的存放时为了保证数据的安全(容错)
    • 添加副本添加多少个好?
      • 添加副本副作用的,数据要同步到不同机器上,有大量的网络传输和磁盘占用。
      • 根据业务对数据容错性,可以设置副本数为N。N=2

    一个分区在broker是以目录的形式存放的,为什么分区下会设置segment段?

    • 消息队列系统,一般都是实时,只能短时间保存数据。比如保存一个小时以内的数据。
    • broker需要对分片的数据进行定时的删除,按照一定的数据量来保存数据,方便根据数据最后修改的时间进行删除。

    producer数据生产不丢失的问题?

    • ACK (Acknowledgement)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
    • 数据生产环节不丢失(ack机制)
      • 如果是同步模式下
      • 将发送状态设置为-1,是最为妥当的。但是,由于-1是让所有的副本都确定收到数据,这个过程会有较长的等待。面对海量的数据,如果每条消息都确认的话,效率会大大降低。
      • 一般做法的做法: 设置让leader接收到数据就确认,就也是1,提高效率,这个方案可能会有丢失的风险。
      • 如果是在异步模式下(也有ack)
      • 生产的数据并不会立即发送给broker,会在produer段有个容器(队列)来临时缓存数据。
      • 针对这个容器,有个阻塞设置。如果设置为0,就是立即丢弃数据。如果这是为-1,就永久阻塞。
        • 如果在producer永久阻塞时,人为关闭producer代码所在进程,会立即清空队列中的数据,导致数据丢失。
    展开全文
  • trt 基于Storm的轻量级实时计算框架
  • 实时计算框架:Spark集群搭建与入门案例。50字50字50字50字50字50字
  • 《分布式实时计算框架 原理及实践案例》
  • 一、大数据实时计算框架 1、什么是实时计算?流式计算? (一)什么是Storm?Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式...
  • 基于Storm的实时计算框架的研究与应用_李川.pdf
  • Light_drtc是一个轻量级分布式实时计算框架,它可以帮助你快速实现自定义的实时计算平台。它主要参考当前流行的实时计算框架Storm的任务分发和Spark Streaming的Mini-Batch处理思想设计,设计目的是为了降低当前...
  •  Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API、基于内存的高速执行引擎,用户可以结合流式、批处理和交互试查询应用。本文将详细介绍Spark Streaming实时计算框架的原理与特点、适用场景...
  • Storm实时计算框架实现WordCount 一、demo说明 spout:自随机生产数据 bolt:SplitBolt(切割行)、FilterBolt(过滤关键字)、CounterBolt(词频统计) 二、引入maven依赖 <dependencies> <dependency...
  • 本文主要介绍当前业界广泛使用的两个分布式实时计算框架flink与storm的性能对比,希望对你有所帮助。 来源:https://tech.meituan.com/2017/11/17/flink-benchmark.html 1. 背景 ApacheFlink 和 Apache Storm 是...
  • 框架地址 ... 框架说明 Light_drtc是一个轻量级分布式实时计算框架,它可以帮助你快速实现自定义的实时计算平台。它主要参考当前流行的实时计算框架Storm的任务分发和Spark Streaming的Min-B...
  • Storm+Kafka实时计算框架搭建

    千次阅读 2015-09-01 22:51:36
    Storm+Kafka实时计算框架搭建标签(空格分隔): storm kafaka 大数据 云计算 本篇文章是本人根据安装、配置storm实时计算框架时的执行过的命令整理出来的,中间没有配插图,不太适合初学者参考配置环境,请勿吐槽...
  • 一、大数据实时计算框架 1、什么是实时计算?流式计算? 举例:自来水厂处理自来水(特点:持续性、流式计算) ) 2、对比:离线计算和流式计算 (*)离线计算:MapReduce和Spark Core, 数据的批量处理...
  • 一、实时计算,流式计算? 实时计算 == 流式计算 自来水厂就是一个典型的实时计算系统: 自来水厂可以简单的理解为由一个水泵(采集水源),多个蓄水池(处理水源:沉淀,过滤,消毒等步骤),管理员构成。 ...
  •  Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API、基于内存的高速执行引擎,用户可以结合流式、批处理和交互试查询应用。本文将详细介绍Spark Streaming实时计算框架的原理与特点、适用场景...
  • 参考文章:实时计算框架 Flink 在教育行业的应用实践 如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如 Flink 等来保障。例如,在 TB 级别数据量的数据库中...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,533
精华内容 1,413
关键字:

实时计算框架