精华内容
下载资源
问答
  • 问:Spark Streaming什么软件栈中的流计算?A:Spark,B:Flume,C:Storm,D:Hive正确答案:解析:问:Spark Streaming什么软件栈中的流计算?A:Spark,B:Flume,C:Storm,D:Hive相关问题:民用建筑按使用功能可以分为哪两...

    问:Spark Streaming是什么软件栈中的流计算?

    A:Spark,B:Flume,C:Storm,D:Hive

    正确答案:

    解析:

    问:Spark Streaming是什么软件栈中的流计算?A:Spark,B:Flume,C:Storm,D:Hive

    相关问题:

    民用建筑按使用功能可以分为哪两类( )?

    A:公共建筑B:园林建筑C:居住建筑D:特种建筑

    仓储合同是指保管人储存存货人交付的仓储物,存货人支付仓储费的合同。

    A:错B:对

    双侧性视网膜母细胞瘤是遗传性疾病,一般属于常染色体显性遗传。( )

    A:错B:对

    土壤pH在( )时,各种营养元素的有效度都较高。

    A:6.5B:7.5C:7.0D:6.0

    >1岁儿童人工呼吸的要点正确的是( )

    A:每次吹气为1秒B:无需胸廓回缩连续吹气两次C:罩住口和鼻D:胸廓回缩后再吹气E:罩住口

    系统方法是指将事物当做一个整体系统来研究,分析系统各组成部分之间、组成部分与系统之间的有机联系及系统与外界环境之间的关系、综合地、准确地、全面地考察事物、处理问题的一种方法。( )

    A:错,B:对

    关于人骨骼肌描述,哪项正确?

    A:肌浆网是储存钙的部位B:肌浆网是肌纤维内的滑面内质网C:形成横纹的结构基础是肌原纤维D:横小管和两侧终池形成三联体E:横小管是肌膜在Z线水平向内凹陷形成

    分化是指将初代培养获得的培养物,接种到新培养基上的过程。

    A:对B:错

    I hope you can spare some time to visit our company, inspect our products and discuss your particular ______ with us.

    A:requirementsB:regenerationC:insureD:objectives

    展开全文
  • SparkStreaming简单示例

    2020-12-21 16:42:54
    实现SparkStream类:两个版本...import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkStreamDemo { def main.

    实现SparkStream类:两个版本二选一即可

    实现SparkStream类(Scala版)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SparkStreamDemo {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStream")
        
        // 采集周期,指定的3秒为每次采集的时间间隔
        val streamingContext = new StreamingContext(conf,Seconds(3))
        
        // 指定采集的端口
        val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.**.**",7777)
        //指定采集本地目录
        //val socketLineStream: DStream[String] = streamingContext.textFileStream("file:///D:/ideaProject/SparkStream/in")
    
        // 将采集的信息进行处理,统计数据(wordcount)
        val wordStream: DStream[String] = socketLineStream.flatMap(line=>line.split("\\s+"))
        val mapStream: DStream[(String, Int)] = wordStream.map(x=>(x,1))
        val wordcountStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)
    
        // 打印
        wordcountStream.print()
        // 启动采集器
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    
    

    实现SparkStream类(Java版)

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class SparkStreamJava {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreamjava");
            
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(3));
            JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.**.**", 7777);
            
            JavaDStream<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    String[] split = s.split("\\s+");
                    return Arrays.asList(split).iterator();
                }
            });
            JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
            JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
            reduceByKey.print();
            jsc.start();
            try {
                jsc.awaitTermination();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    
        }
    }
    
    

    启动SparkStreaming

    在这里插入图片描述
    启用端口

    • 启用命令:nc -lk 7777
    • 输入数据
    hello world
    hello java
    hello spark
    

    在这里插入图片描述
    查看Streaming输出内容

    在这里插入图片描述

    展开全文
  • 案例一:WordCount(scala版本) ...import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.ReceiverInputDStream object Sp

    导入 spark 和 spark-streaming 依赖包

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    

    案例一:SparkStreaming接受socket数据,实现单词计数WordCount

    spark版本

    从本机的7777端口源源不断地收到以换行符分隔的文本数据流,并计算单词个数

    package cn.kgc.kb09.Spark
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    
    object SparkStreamDemo1 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkStreamDemo1").setMaster("local[*]")
    
        // 采集周期,指定的3秒为每次采集的时间间隔
        val streamingContext = new StreamingContext(conf,Seconds(3))
    
        // 指定采集的方法
        val socketLineStream: ReceiverInputDStream[String] =
          streamingContext.socketTextStream("192.168.247.201",7777)
    
        // 将采集来的信息进行处理,统计数据(wordcount)
        val wordStream = socketLineStream.flatMap(line => line.split("\\s+"))
        val mapStream = wordStream.map(x => (x,1))
        val wordcountStream = mapStream.reduceByKey(_+_)
    
        // 打印
        wordcountStream.print()
    
        // 启动采集器
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    

    这时候在Linux中输入的内容会在控制台打印wordcount单词统计:
    根据指定的采集周期,每次采集的时间间隔3秒。spark streaming的本质是微批处理。

    Spark Streaming拥有两类数据源

    • (1)基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等。

    • (2)高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。

    基本数据源输入源码:
    SparkStream 对于外部的数据输入源,一共有下面几种:

    • (1)用户自定义的数据源:receiverStream

    • (2)根据TCP协议的数据源: socketTextStream、socketStream

    • (3)网络数据源:rawSocketStream

    • (4)hadoop文件系统输入源:fileStream、textFileStream、binaryRecordsStream

    • (5)其他输入源(队列形式的RDD):queueStream

    Java版本

    package cn.kgc.kb09.Spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
    * @Qianchun
    * @Date 2020/12/18
    * @Description
    */
    public class SparkStreamJavaDemo1 {
        public static void main(String[] args) throws InterruptedException {
            // 第一步:配置SparkConf
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamJavaDemo1");
    
            // 第二步:创建SparkStreamingContext
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(3));
    
            /**
             * 第三步:创建Spark Streaming 输入数据来源 input Stream
             * 1、数据输入来源可以基于 File,HDFS,Flume,Kafka,Socket等
             * 2.在这里我们制定数据来源于网络 Socket端口,Spark Streaming链接上改端口并在运行的时候一直监听该端口的数据(当然该端口服务首先必须存在),
             *  并且在后续会根据业务需要不断的有数据产生(当然对于Spark Streaming 引用程序的运行而言,有无数据其处理流程都是一样的)
             * 3.如果经常在每隔 5 秒钟没有数据的话不断的启动空的 Job 其实是会造成调度资源的浪费,因为彬没有数据需要发生计算;
             * 真实的企业级生产环境的代码在具体提交 Job 前会判断是否有数据,如果没有的话,不再提交 Job;
             */
            JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.247.201", 7777);
    
            /**第四步:接下来就是对于 Rdd编程一样基于 DStream进行编程
             * 原因是DStream是RDD产生的模板(或者说类), 在 Saprk Stream发生计算前,其实质是把每个 Batch的DStream的操作翻译成为 Rdd 的操作!!!
             */
            JavaDStream<String> flatmap = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    String[] split = s.split("\\s+");
                    return Arrays.asList(split).iterator();
                }
            });
    
            JavaPairDStream<String, Integer> mapToPair = flatmap.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            /**
             * 此处print并不会直接触发 job 的执行,因为现在的一切都是在 Spark Streaming 框架的控制之下的,对于 Spark Streaming 而言具体是否触发真正的 job 运行
             * 是基设置的  Duration 时间间隔触发
             * 一定要注意的是 Spark Streaming应用程序要想执行具体的Job,对DStream就必须有 output Stream操作
             * output Stream有很多类型的函数触发,类print,saveAsTextFile,saveAsHadoopFile等,最为重要的一个方法是 foreachRDD,因为Spark Streaming处理的结果一般都会放在 Redis,DB,
             * DashBoard等上面,foreachRDD主要就是用来完成这些功能的,而且可以随意的自定义具体数据到底放在那里
             */
            reduceByKey.print();
    
            /**
             * Spark Streaming 执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息接收应用程序本身或者 Executor 中的消息;
             */
            jsc.start();
            jsc.awaitTermination();
        }
    }
    

    运行代码,在虚拟机上输入nc -lk 7777 代表向7777号端口输入数据,来进行测试,会计算出每三秒中每个词出现的次数

    案例二:自定义采集器Receiver(自定义数据源)

    还是以wordcount为例,自定义Receiver实现一下

    声明一个receiver类,通常需要继承原有的基类,在这里需要继承自Receiver,该基类有两个方法需要重写分别是:

    • 1、 onstart() 接收器开始运行时触发方法,在该方法内需要启动一个线程,用来接收数据。

    • 2、onstop() 接收器结束运行时触发的方法,在该方法内需要确保停止接收数据。
      当然在接收数据流过程中也可能会发生终止接收数据的情况,这时候onstart内可以通过isStoped()来判断 ,是否应该停止接收数据

    数据存储:
    一旦接收完数据,则必须要进行数据的存储,并交由SparkStreaming 来处理,Spark以store(data)方法来支持此流程。由于数据格式的不同,当然store方法必须要支持各种类型的数据存储。store方法是以一次存储一条记录或者一次性收集全部的序列化对象。

    代码实现:
    采集端口内输入内容,接收到“end”停止

    package cn.kgc.kb09.Spark
    import java.io.{BufferedReader, InputStreamReader}
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    // 自定义采集器
    // 1)继承Receiver
    // 2)重写方法 onStart(), onStop()
    class MyReceiver(host:String, port:Int) extends Receiver [String](StorageLevel.MEMORY_ONLY) {
    
      // 接收socket数据
      var socket: java.net.Socket = null
    
      def receive: Unit = {
        socket = new java.net.Socket(host, port)
        // 通过BufferedReader ,将输入流转换为字符串
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, "UTF-8")
        )
        var line: String = null
    
        // 将采集的数存储到采集器的内部进行转换
        while ((line=reader.readLine()) != null) {
          if (line.equals("end")) {
            return
          } else {
            this.store(line)
          }
        }
      }
    
      override def onStart(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            receive
          }
        }).start()
      }
    
      override def onStop(): Unit = {
        if (socket != null) {
          socket.close()
          socket = null
        }
      }
    }
    
    
    object MyReceiverStream {
      def main(args: Array[String]): Unit = {
        // spark的配置对象
        val conf = new SparkConf().setMaster("local[*]").setAppName("myReceiverStream")
    
        // 实时分析的环境对象
        // 采集周期:以指定的时间为周期采集实时数据
        val streamingContext = new StreamingContext(conf, Seconds(5))
    
        // 在这里转换成自定义的采集器
        val receiverStream =
          streamingContext.receiverStream(new MyReceiver("192.168.247.201", 7777))
    
        // 将采集的数据进行分割
        val lineStream = receiverStream.flatMap(line => line.split("\\s+"))
    
        // 将数据进行结构的转变进行统计分析
        val mapStream = lineStream.map((_, 1))
    
        // 将转换结构后的数据进行聚合处理
        val sumStream = mapStream.reduceByKey(_ + _)
    
        // 将结果打印
        sumStream.print()
    
        // 启动采集器
        streamingContext.start()
        // Driver等待采集器的执行
        streamingContext.awaitTermination()
      }
    }
    

    运行代码,启动nc

    nc-lk 7777
    

    案例三:Spark Streaming处理文件系统数据(local/hdfs)

    • textFileStream路径如果是hdfs的路径 你直接hdfs dfs -put到你的监测路径就可以

    • 如果是本地目录如E:\\qianchun\\Kafka\\kafkaStream\\in\\test,你不能直接在目录里创建文件或移动文件到这个目录,必须用流的形式写入到这个目录形成文件才能被监测到。可在其它地方创建一个文件然后另存到此本地目录下可以完成此项测试。

    package cn.kgc.kb09.Spark
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SparkStreamFileDataSourceDemo2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FileDataSource")
    
        // 创建StreamingContext对象
        val streamingContext = new StreamingContext(conf,Seconds(5))
    
        // 文件为HDFS文件 
    //    val fileDStream: DStream[String] = streamingContext.textFileStream("hdfs://hadoopwei:9000/kb09file")
        // 文件为本地Windows文件
        val inputDir = "E:\\qianchun\\Kafka\\kafkaStream\\in\\test"
    
        // 对StreamingContext对象调用 .textFileStream()方法生成一个文件流类型的InputStream
        val fileDStream = streamingContext.textFileStream(inputDir)
    
        // 编写流计算过程
        val wordStream = fileDStream.flatMap(line => line.split("\\s+"))
        val mapStream = wordStream.map((_,1))
        val sumStream = mapStream.reduceByKey(_+_)
    
        // 打印结果
        sumStream.print()
    
        // 启动流计算
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    

    案例四:Spark Streaming读取Kafka数据

    (1) 版本选型

    • ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。

    • DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

    (2)Kafka 0-10 Direct模式

    需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

    导入spark-streaming-kafka依赖包

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    

    出现报错,因为有一个依赖版本过高:
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.6
    需要添加依赖:

    <!-- 版本降维 -->
    <dependency> 
        <groupId>com.fasterxml.jackson.core</groupId> 
        <artifactId>jackson-databind</artifactId> 
        <version>2.6.6</version> 
    </dependency>
    

    代码部分

    package cn.kgc.kb09.Spark
    
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SparkStreamKafkaSource {
      def main(args: Array[String]): Unit = {
        // 创建SparkConf
        val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo")
        // 创建StreamingContext
        val streamingContext = new StreamingContext(conf,Seconds(5))
    
        // 设置checkpoint目录
        streamingContext.checkpoint("checkpoint")
    
        // 配置Kafka相关参数
        val kafkaParams: Map[String, String] = Map(
          // kafka集群有几台机器就写几台
          (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.247.201:9092"),
          // 因为是消费topic,所以需要K-V反序列化
          (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
          (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
          // 定义消费者组别
          (ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup1")
        )
    
        // 通过 KafkaUtils.createDirectStream接受kafka数据,这里采用是kafka低级api偏移量不受zk管理
        val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
          KafkaUtils.createDirectStream(
            streamingContext,  //不再是直接从streamingContext点出来的基本源,而是作为参数生成InputDStream
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)  //sparkKafkaDemo是生产者的topic
          )
    
        // 对获取的数据进行处理
        val wordStream = kafkaStream.flatMap(v => v.value().toString.split("\\s+"))
    
        val mapStream = wordStream.map((_,1))
    
    
        // 无状态
    //    val sumStream = mapStream.reduceByKey(_+_)
    //    sumStream.print()
    
    
        // 有状态  hello,2   再输入hello,则返回(2,1)
        // 前提条件:需要设置checkpoint
        val stateSumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
          case (seq, buffer) => {
            println(seq, seq.sum, buffer.getOrElse(0))
            val sum = buffer.getOrElse(0) + seq.sum
            Option(sum)
          }
        }
    
        // 打印结果
        stateSumStream.print()
    
        // 启动流计算
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    

    通过 KafkaUtils.createDirectStream接受kafka数据,这里采用是kafka低级api偏移量不受zk管理

    • LocationStrategies:本地策略。为提升性能,可指定Kafka Topic Partition的消费者所在的Executor。

    • LocationStrategies.PreferConsistent:一致性策略。一般情况下用这个策略就OK。将分区尽可能分配给所有可用Executor。

    • LocationStrategies.PreferBrokers:特殊情况,如果Executor和Kafka Broker在同一主机,则可使用此策略。

    • LocationStrategies.PreferFixed:特殊情况,当Kafka Topic Partition负荷倾斜,可用此策略,手动指定Executor来消费特定的Partition.

    • ConsumerStrategies:消费策略。

    • ConsumerStrategies.Subscribe/SubscribePattern:可订阅一类Topic,且当新Topic加入时,会自动订阅。一般情况下,用这个就OK。

    • ConsumerStrategies.Assign:可指定要消费的Topic-Partition,以及从指定Offset开始消费。

    特点:

    • 1、不需要使用单独的Receiver线程从Kafka获取数据

    • 2、使用Kafka简单消费者API,不需要ZooKeeper参与,直接从Kafka Broker获取数据。

    • 3、执行过程:Spark Streaming Batch Job触发时,Driver端确定要读取的Topic-Partition的OffsetRange,然后由Executor并行从Kafka各Partition读取数据并计算。

    • 4、为保证整个应用EOS, Offset管理一般需要借助外部存储实现。如Mysql、HBase等。

    • 5、由于不需要WAL,且Spark Streaming会创建和Kafka Topic Partition一样多的RDD Partition,且一一对应,这样,就可以并行读取,大大提高了性能。

    • 6、Spark Streaming应用启动后,自己通过内部currentOffsets变量跟踪Offset,避免了基于Receiver的方式中Spark Streaming和Zookeeper中的Offset不一致问题。

    参考文献:
    https://www.cnblogs.com/redhat0019/p/10817597.html
    https://blog.csdn.net/timicai/article/details/111485113
    https://blog.csdn.net/wangpei1949/article/details/89419691
    https://www.cnblogs.com/upupfeng/p/12325201.html

    展开全文
  • SparkStreaming

    2021-11-15 22:40:28
    一、SparkStreaming概述 1. 数据处理类型分类 - 静态数据 ...- sparkstreaming什么 - 一句话总结:微批处理的流式(数据)实时计算框架 - 原理:是把输入数据以某一时间间隔批量的处理,当批处理...

    一、SparkStreaming概述

    1. 数据处理类型分类

    - 静态数据
      - 数据源是不变的、有限的、显式离散的
      - 多适用于批量计算、离线计算
    - 流数据
      - 数据是变动的、无限的、连续的
      - 多适用于实时计算,能在秒级、秒内处理完成
        - 实时数据分类
          - 小时级
          - 分钟级
          - 秒级
    - sparkstreaming是什么
      - 一句话总结:微批处理的流式(数据)实时计算框架
      - 原理:是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,即可用于处理实时数据流。
      - 优点
        - 可以和spark core、sparksql等无缝集成
        - 支持从多种数据源获取数据,包括Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP sockets,然后可以使用诸如map、reduce、join等高级函数进行复杂算法的处理,最后可以将处理结果存储到HDFS文件系统,数据库等。
      - 重要概念说明
        - StreamingContext
          - 类比于SparkContext,SparkSqlContext
          - 流计算框架中的中枢类,负责各种环境信息、分发调度等任务。
        - 数据源
          - 简称:Source,意为DataSource的缩写
          - 指流数据的来源是哪里,如文件,Socket输入、Kafka等。
        - 离散流
          - 英文称Discretized Stream,简称DStream,即为sparkstreaming微批处理当中的数据抽象单位。
          - 是继spark core的RDD、spark sql的DataFrame和DataSet后又一基础的数据类型,是spark streaming特有的数据类型。
        - 输入离散流
          - 英文简称:Input DStream
          - 将Spark Streaming连接到一个外部Source数据源来读取数据的统称
        - 批处理
          - 英文称Batch Data
          - 连续数据离散化的步骤:将流式实时连续的数据整体转化成以时间片为单位进行分批,即将流式数据转化成时间片为单位数据进行批数据处理,随着时间推移,这些处理结果即形成结果数据流,即流处理引擎形成。
        - 时间片或批处理时间间隔
          - 英文称 batch interval
          - 人为对流数据进行定量的标准,以时间片作为拆分流数据的依据。
          - 一个时间片的数据对应一个RDD实例。
        - 窗口长度
          - 英文称window length
          - 一个窗口覆盖的流数据的时间长度,必须是批处理时间间隔的倍数。
          - 窗口分类
            - 滑动窗口
            - 滚动窗口
          - 滑动窗口时间间隔
            - 滑动窗口:简称Sliding window
            - 前一个窗口到后一个窗口所经过的时间长度间隔。必须是批处理时间间隔的倍数

    2. 处理流程图示说明

    - 框架处理总流程图![](F:\大数据笔记\图片\sparkstreaming框架处理总流程图.png)

    - 框架内部工作流程图
      - Spark Streaming接收实时输入数据流并将数据分成批处理,然后由SparkCore引擎处理,以批量生成最终结果流。![](F:\大数据笔记\图片\sparkstreaming框架内部工作流程图.png)

    二、scala快速构建sparkstreaming应用

    1. 在maven中添加spark-streaming依赖

       ```xml
              <!-- spark-streaming依赖 -->
             <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-streaming_${scala.compile.version}</artifactId>
               <version>2.3.2</version>
               <scope>provided</scope>
            </dependency>
       ```

    2. 以StreamingContext为起点,面向DStream编程

       ```scala
       package com.tl.job002.streaming
       import org.apache.spark._
       import org.apache.spark.streaming._
       //SparkStreaming测试类
       object SparkStreamingTest {
         def main(args: Array[String]): Unit = {
           //    要初始化Spark Streaming程序,必须创建一个StreamingContext对象,
           //它是所有Spark Streaming功能的主要入口点。
           //一切都从SparkConf开始
          val conf = new SparkConf().setMaster("local[2]")
       .setAppName("NetworkWordCount")
           //指定时间间隔的ssc初始化
           val ssc = new StreamingContext(conf, Seconds(1))
          
           //ssc指定来自TCP源作为输入数据源,即链接一个指定主机的已打开的TCP端口,从该端口中读取文本数据,每行以”\n”作为每行的结尾。
           val lines = ssc.socketTextStream("localhost", 9999)
          
           //将DStream进行打平处理,实际是对其内部的离散的rdd进行打平处理
           val words = lines.flatMap(_.split("\\s+"))
          
           // 将单列的word转化为双列的kv结构,用于后边的wc操作
           val pairs = words.map(word => (word, 1))
          
           //对kv的输出进行wc计算
           val wordCounts = pairs.reduceByKey(_ + _)
           //打印wc结果到控制台
           wordCounts.print()
          
           //正式开始计算
           ssc.start()
           //等待计算结束,一般流式计算没有异常或人为干预是一直保持运行状态的
           ssc.awaitTermination()
         }
       }
       
       ```

    3. SparkStreaming实现WordCount的流程抽象说明(经典流程)

    - SparkStreaming代码的开发流程
      - 初始化StreamingContext
      - 通过创建输入DStreams来定义输入源。
      - 通过将转换和输出操作应用于DStream来定义流式计算。
      - 开始接收数据并使用它进行处理streamingContext.start()。
      - 等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()。
      - 可以使用手动停止处理streamingContext.stop()。
    - SparkStreaming代码开发注意
      - 一旦启动(start)了上下文,就不能设置或添加新的流式计算。
      - 上下文停止后,无法重新启动。
      - 在JVM中只能同时激活一个StreamingContext。
      - StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将stop()的可选参数设置stopSparkContext为false。
      - 只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext),就可以重复使用SparkContext来创建多个StreamingContexts。

    三、SparkStreaming常见问题说明

    1. 输入DStream和Receivers

    - 输入DStream:即为从数据流源接受的输入数据流的DStream
      - 内置两类流媒体源
        - 基本流数据源:包括本地或是hdfs文件系统、Socket套接字链接、Akka actor等。
        - 高级流数据源:包括Kafka、Flume、Kinesis、ZeroMQ等数据源,可以通过引入第三方工具库来使用该数据源。
      - Receivers:每个输入DStream(除文件流之外)均与Receiver相对关联,该对象负责从流源接收数据并将其存储在Spark的内存中进行处理。

    2. 关于SparkStreaming本地运行时线程数量的设置

    - 在本地运行Spark Streaming程序时,不能使用“local”或“local [1]”作为主URL。
    - 该设置即只有一个线程将用于本地运行任务。如果正使用基于接收器的输入DStream,则必须使用单个线程来运行接收器,而无法留下用于处理接收数据的线程。故本地运行时,始终使用“local [ n ]”作为主URL,其中n >要运行的接收器数量,即至少从2起

    3. 关于SparkStreaming在集群运行时CPU逻辑核心数设置

    - 一个逻辑CPU的资源相当于可以开启一个线程的能力。
    - 在集群上运行时,分配给SparkStreaming应用程序的核心数必须大于接收器数。否则系统将只能接收数据,但无法处理数据。

    展开全文
  • 每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。此外,还需要有可用的 CPU 核心来处理数据。 解决: 必须至少有和接收器数目相同的核心数,还要加上用来完成...
  • Spark Streaming 简介

    2021-08-30 20:05:48
    file1.txt I love Hadoop I love Spark import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds object Test { def main(args:...
  • Spark Streaming入门概述应用场景集成Spark生态系统的使用Spark Streaming发展史 词频统计 概述 Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从像卡夫卡,室壁...
  • SparkStreaming是基于spark core 的实时架构,虽然SparkStreaming可以进行实时计算,但它并不是一个纯实时计算框架。StreamingContext的批次间隔决定了每隔多久计算一次。 SparkStreaming提供了窗口的计算 ,窗口...
  • Spark Streaming 是微批(Micro-Batch)的模型。 下面我们就分几个方面介绍两个框架的主要区别: 1. 架构模型:Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要...
  • Spark Streaming 实时写入Hive

    千次阅读 2021-02-01 17:06:08
    所以使用Spark Streaming替代Flume实现入库Hive功能。 二、流程图 Created with Raphaël 2.2.0kafkaSpark StreamingETLhive 三、代码实现 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project ...
  • Spark Streaming 在指标分析实践Spark Streaming 介绍 众所周知 Spark 是批处理框架,而 Spark Streaming 借鉴批处理的理念实现的准实时算框架,通过将数据按时间分批处理,实际应用中根据延迟要求合理设置分批间隔...
  • 本文对Spark Streaming和Structured Streaming在流模型、API使用、时延性能以及和Kafka对接等方面进行了对比
  • 这里写目录标题一、Spark Streaming 简介二、简单的例子Spark Streaming相关核心类 一、Spark Streaming 简介 Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从...
  • Java-Spark系列7-Spark streaming介绍

    万次阅读 2021-09-27 15:49:25
    文章目录一.Spark streaming介绍1.1 Spark streaming简介1.2 Spark 与storm区别1.3 一个简单的例子二.Spark Streaming的组件介绍2.1 Streaming Context2.2 Dstream(离散流)2.1 Receiver2.2 数据源2.3 可靠性2.4 ...
  • Flink 与 Spark Streaming 区别 这个问题是一个非常宏观的问题,因为两个框架的不同点非常之多。但是在面试时有非常重要的一点一定要回答出来: Flink 是标准的实时处理引擎,基于事件驱动。 而 Spark Streaming ...
  • SparkCore 1.一句话介绍Hadoop 和 Sparkspark是基于内存的分布式计算框架。 ​ hadoop是一个分布式计算开源框架,包含分布式文件系统HDFS、 MapReduce分布式计算的软件架构和Yarn资源管理调度系统。 2.Spark和MR...
  • SparkStreaming实时ETL

    2021-03-18 16:15:11
    SparkStreaming实时在线ETL 描述:使用Scala和Java编程 1.和kafka整合,通过kafka生产数据,然后把数据传入sparkstreaming进行在线 ETL; 2.把ETL后的数据在传入kafka,通过kafka消费数据 3.通过Redis进行...
  • SparkStreaming的窗口

    2021-01-05 15:19:12
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming....
  • ​ 这篇博客来源于一个惨痛的线上事故经历,我们编写好SparkStreaming程序清洗行为数据,然后每十分钟往Hive写一次,大家都以为任务正常的运行,不会出什么问题,然而问题正在后台默默的产生了,到了第二天,所有...
  • $SPARK_HOME/bin/spark-submit --num-executors 1 --executor-memory 1g -...--conf spark.streaming.backpressure.enabled=true --conf spark.streaming.backpressure.initialRate=1000 --conf spark.streaming.stopG.
  • sparkstreaming和flink的区别 参考 https://blog.csdn.net/b6ecl1k7BS8O/article/details/81350587 – 组件: sparkstreaming: Master:主要负责整体集群资源的管理和应用程序调度; Worker:负责单个节点的资源管理...
  • Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通 过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操 作。现在就来看看,如何将DStream中的...
  • Spark Streaming 在很多实时数据处理的场景中,都需要用到流式处理(Stream Process)框架,Spark也包含了两个完整的流式处理框架Spark Streaming和Structured StreamingSpark 2.0出现),先阐述流式处理框架...
  • Spark Streaming核心核心概念1. StreamingContextTransformationsOutput Operations实战案例 核心概念 1. StreamingContext Transformations Output Operations 实战案例
  • Spark Streaming

    2021-03-07 03:37:17
    大纲:Spark Streaming简介Spark Streaming的原理和架构Spark Streaming之基础抽象DStreamDStream相关操作Spark Streaming与flume整合Spark Streaming与kafka整合第一章 Spark Streaming概述1.1 Spark Streaming简介...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 53,394
精华内容 21,357
关键字:

sparkstreaming是什么