精华内容
下载资源
问答
  • 常用spark命令行
    2021-07-02 15:52:40

    常见命令

    函数名功能实例
    parallelize
    makeRDD
    range
    rdd创建sc.parallelize(Array(1,2,3,4))
    sc.parallelize(1 to 100)
    sc.makeRDD(List(1,2,3,4,5))
    sc.makeRDD(1 to 100)
    sc.range(1,100,3)
    getNumPartitions检查rdd分区数rdd.getNumPartitions
    rdd.partitions.length
    textFile文件系统加载数据sc.textFile(“file:///root/data/bigdata.dat”)
    map(func)对数据集中地每个元素都使用func->RDD
    filter(dunc)对每个元素使用func->func为true的蒜素构成的RDD
    flatMap(func)与map类似,每个输入元素被映射为0、多个输出元素rdd.flatMap(_.split("\s+"))
    mapPartitions(func)func作用在分区的所有元素调用分区数rdd.mapPartitions{iter=>Iterator(s"${iter.toList}")}
    mapPartitionsWithIndex根据分区索引生成rdd.mapPartitionsWithIndex{(idx,iter)=>Iterator(s" i d x : idx: idx:{iter.toArray.mkString("-")}")}
    groupBy(func)按照传入函数的返回值进行分组
    glom()将一个分区形成一个数组,形成新的RDD类型RDD[Array[T]]
    sample(withReplacement,fraction,seed)采样算子,seed->随机抽烟出数量为fraction的数据,withReplacement表示抽出的数据从是否放回
    distinct([numTasks])对RDD去重
    coalesce(numPartitions)缩减分区数,无shuffle
    repartitions(numPartitions)增加、减少分区有shuffle
    sortBy(func,[ascending],[numTasks])使用func处理后的结果排序
    sortWith制定规则进行升降序排序reduce1RDD.map{case ((provice, adid), count) => (provice, (adid, count))}.groupByKey().mapValues(buf => buf.toList.sortWith(_._2 > _.2).take(N).map(._1).mkString("😊).foreach(println)
    sliding(size,step)滑动窗口size表示每个array的长度step表示每次位移的长度
    glom将一个分区的所有元素合并到一个Array中sc.parallelize(1 to 100).glom.collect
    scala.util.Random产生随机数val random = scala.util.Random
    val arr = (1 to 20).map(x=>random.nextInt(10))
    intersection(otherRDD)交集取两个RDD交集
    union并集取两个RDD并集
    subtract差集取两个RDD之间的差集
    take(num):Array[T]算子获取RDD中的前num个元素
    collectAsMap:scala.collection.Map转换成key:value 后面相同的keyvalue覆盖之前的
    stats统计rdd的相关数据count,mean,stdev,max,min
    count统计个数
    mean统计平均数
    stdev统计方差
    reduce(func)聚合操作(要求元素同类型)rdd.reduce(+)第一个_表示reduce的返回值第二个_表示rdd的元素
    fold(func)聚合操作(要求元素同类型)rdd.fold(0,+),0表示第一次调用的结果,加法为0,乘法为1
    scala> rdd.fold(1)((x,y)=>{
    aggregate(func)先聚合每个分区的元素,在整合所有分区的结果
    first()返回第一个element对于这个RDD
    top(n)按照默认降序或者指定排序规则返回前num个元素
    takeSample(withReplacement,num,[seed])返回采样的数据
    saveAsTextFile(path)
    saveAsSequenceFile(path)
    saveAsObjectFile(path)
    mapValues操作map的第二个元素,map的简化操作a.mapValues(x=>1 to x)
    a.map(x=>(x._1,1 to x._2))
    flatMapValues将value的值压平a.flatMapValues(x=>1 to x) Array((1,2), (3,4), (5,6))->Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5…
    keys
    values
    groupByKey将二元组相同的key对应的value合并到一个CompactBuffer中rdd.groupByKey().map(x=>(x._1,x._2.sum.toDouble/x._2.size)).collect
    rdd.groupByKey.mapValues(v=>v.sum.toDouble/v.size).collect
    reduceByKey操作元祖第二个元素结构为(x,y)进行合并rdd.mapValues((_,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1.toDouble/x._2)).collect()
    foldByKey与上面一样rdd.mapValues((_,1)).foldByKey((0,0))((x,y)=>{(x._1+y._1,x._2+y._2)}).mapValues(x=>x._1.toDouble/x._2).collect
    cattesian笛卡尔积
    combinations将一个集合分解成n个集合,集合的容量为n,如果n大于原集合count那么返回None
    collectionAccumulator集合累加器val accum = sc.longAccumulator(“my accu”)
    sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))
    accum.value
    res2:long=10
    更多相关内容
  • Spark命令参数

    2022-02-07 09:57:20
    提交运行Spark Application时,有些基本参数需要传递值,如下所示: 动态加载Spark Applicaiton运行时的参数,通过–conf进行指定,如下使用方式: 三、Driver Program 参数配置 每个Spark Application运行时都...

    一、应用提交语法

    在这里插入图片描述

    二、基本命令参数

    • 提交运行Spark Application时,有些基本参数需要传递值,如下所示:

    在这里插入图片描述

    • 动态加载Spark Applicaiton运行时的参数,通过–conf进行指定,如下使用方式:

    在这里插入图片描述

    三、Driver Program 参数配置

    每个Spark Application运行时都有一个Driver Program,属于一个JVM Process进程,可以设置内存Memory和CPU Core核数。

    在这里插入图片描述

    四、Executor 参数配置

    每个Spark Application运行时,需要启动Executor运行任务Task,需要指定Executor个数及每个Executor资源信息(内存Memory和CPU Core核数)。
    在这里插入图片描述

    五、官方样例

    # Run application locally on 8 cores
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master local[8] \
      /path/to/examples.jar \
      100
    
    # Run on a Spark standalone cluster in client deploy mode
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --executor-memory 20G \
      --total-executor-cores 100 \
      /path/to/examples.jar \
      1000
    
    # Run a Python application on a Spark standalone cluster
    ./bin/spark-submit \
      --master spark://207.184.161.138:7077 \
      examples/src/main/python/pi.py \
      1000
    

    展开全文
  • 一、spark所在目录cd usr/local/spark 二、启动spark/usr/local/spark/sbin/start-all.sh 启动Hadoop以及Spark:bash ./starths.sh 浏览器查看:172.16.31.17:8080 停止Hadoop以及Sparkbash ./stophs.sh三、基础使用...

    一、spark所在目录

    cd usr/local/spark

    a415383455814d4a0f2067e7c262f00c.png

    二、启动spark

    /usr/local/spark/sbin/start-all.sh

    9a87664359be64dd70159e3fb172208f.png

    启动Hadoop以及Spark:

    bash ./starths.sh

    da0ae90a27ce30fa8070708667838360.png

    浏览器查看:

    172.16.31.17:8080

    152a676957fdff5dd9e98c0ae2340f92.png

    停止Hadoop以及Spark

    bash ./stophs.sh

    三、基础使用

    1.运行Spark示例(SparkPi)

    在 ./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。

    c6e684c64d6a50cc1c1e094d16910d23.png

    我们可以先运行一个示例程序 SparkPi(即计算 π 的近似值),执行如下命令:

    ./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

    35c121880ce0e8852f0899b416ecbe80.png

    执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中)

    Python 版本的 SparkPi 则需要通过 spark-submit 运行:

    ./bin/spark-submit examples/src/main/python/pi.py

    f71a4407eec5d782291ed3e045a206fc.png

    2.通过Spark-shell进行交互分析

    Spark Shell 支持 Scala 和 Python

    Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。

    Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。 使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。

    2.1 启动Spark Shell

    ./bin/spark-shell

    a0b2010c212fdb8004f605bcfa5da3c2.png

    2.2 基础操作

    Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。

    2.2.1 RDD创建方式

    参考链接:

    (1)使用本地文件创建RDD:

    val textFile = sc.textFile("file:///usr/local/spark/README.md")

    bc4eb2ada6c9314e9a15de63403d0026.png

    代码中通过 “file://” 前缀指定读取本地文件。

    (2)使用hdfs文件创建RDD:

    val textfile = sc.textFile("/winnie/htest/test01.txt")

    7678ef0197e92afb7ddfa7d617245f3f.png

    Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/local/spark/README.md”的错误。

    74a1d6237ab668d34a5eaca0180b0d00.png

    (3)使用集合创建RDD:

    从集合中创建RDD主要有下面两个方法:makeRDD 和 parallelize

    makeRDD示例:

    sc.makeRDD(1 to 50)

    8baf3aa7ec21d57bccc46b2d41eef013.png

    sc.makeRDD(Array("1", "2", "3"))

    0a1395dd55833813773f3ece7726921f.png

    sc.makeRDD(List(1,3,5))

    b42a6c4d58988638b985f7f1cfdb7c3e.png

    parallelize示例:

    同上

    2.2.2 RDD两种操作

    (1)RDDs支持两种类型的操作:

    actions: 执行操作,在数据集上运行计算后返回值

    transformations: 转化操作,从现有数据集创建一个新的数据集

    转化操作并不会立即执行,而是到了执行操作才会被执行

    转化操作:

    map() 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD

    flatMap() 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD

    filter() 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD

    distinct() 没有参数,将RDD里的元素进行去重操作

    union() 参数是RDD,生成包含两个RDD所有元素的新RDD

    intersection() 参数是RDD,求出两个RDD的共同元素

    subtract() 参数是RDD,将原RDD里和参数RDD里相同的元素去掉

    cartesian() 参数是RDD,求两个RDD的笛卡儿积

    行动操作:

    collect() 返回RDD所有元素

    count() RDD里元素个数,对于文本文件,为总行数

    first()RRD中的第一个item,对于文本文件,为首行内容

    countByValue() 各元素在RDD中出现次数

    reduce() 并行整合所有RDD数据,例如求和操作

    fold(0)(func) 和reduce功能一样,不过fold带有初始值

    aggregate(0)(seqOp,combop) 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样

    foreach(func) 对RDD每个元素都是使用特定函数

    行动操作每次的调用是不会存储前面的计算结果的,若想要存储前面的操作结果,要把需要缓存中间结果的RDD调用cache(),cache()方法是把中间结果缓存到内存中,也可以指定缓存到磁盘中(也可以只用persisit())

    (2)实例

    textFile.count()

    textFile.first()

    e96d07021f3d3a78f605c88d06e3e1db.png

    val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行

    linesWithSpark.count() // 统计行数

    7a572227c43ae065fb55026ff6a1f407.png

    action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:

    textFile.filter(line => line.contains("Spark")).count()

    找到包含单词最多的那一行内容共有几个单词:

    textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

    e824ef1908d91c98cdd61ca1617f07ad.png

    代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。

    Hadoop MapReduce 是常见的数据流模式,在 Spark 中同样可以实现(下面这个例子也就是 WordCount):

    val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 实现单词统计

    wordCounts.collect() // 输出单词统计结果

    053c4d400c30b5f28bccedb15a0f6cae.png

    2.2.3 Spark shell退出

    :quit

    3.Spark SQL 和 DataFrames

    Spark SQL 是 Spark 内嵌的模块,用于结构化数据。

    在 Spark 程序中可以使用 SQL 查询语句或 DataFrame API。

    DataFrames 和 SQL 提供了通用的方式来连接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,并且可以在多种数据源之间执行 join 操作。

    3.1 Spark SQL 基本操作

    Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。

    在 Spark shell 启动时,输出日志的最后有这么几条信息:

    2f9ab086b797ad4c97e340c176c1347c.png

    这些信息表明 SparkContent 和 SQLContext 都已经初始化好了,可通过对应的 sc、sqlContext 变量直接进行访问。

    使用 SQLContext 可以从现有的 RDD 或数据源创建 DataFrames。

    通过 Spark 提供的 JSON 格式的数据源文件 ./examples/src/main/resources/people.json 来进行演示,该数据源内容如下:

    bcee23abc211343eb1ee2ab717140d3e.png

    【补充:xshell上传文件】

    rz

    【补充:xshell下载文件】

    sz

    展开全文
  • Spark Streaming 基本操作

    2022-07-14 09:54:19
    Spark Streaming 基本操作

    一、案例引入

    这里先引入一个基本的案例来演示流的创建:获取指定端口上的数据并进行词频统计。项目依赖和代码实现如下:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>2.4.3</version>
    </dependency>
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object NetworkWordCount {
    
      def main(args: Array[String]) {
    
        /*指定时间间隔为 5s*/
        val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
    
        /*创建文本输入流,并进行词频统计*/
        val lines = ssc.socketTextStream("hadoop001", 9999)
        lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
    
        /*启动服务*/
        ssc.start()
        /*等待服务结束*/
        ssc.awaitTermination()
      }
    }
    

    使用本地模式启动 Spark 程序,然后使用 nc -lk 9999 打开端口并输入测试数据:

    [root@hadoop001 ~]#  nc -lk 9999
    hello world hello spark hive hive hadoop
    storm storm flink azkaban
    

    此时控制台输出如下,可以看到已经接收到数据并按行进行了词频统计。

    在这里插入图片描述


    下面针对示例代码进行讲解:

    3.1 StreamingContext

    Spark Streaming 编程的入口类是 StreamingContext,在创建时候需要指明 sparkConfbatchDuration(批次时间),Spark 流处理本质是将流数据拆分为一个个批次,然后进行微批处理,batchDuration 就是批次拆分的时间间隔。这个时间可以根据业务需求和服务器性能进行指定,如果业务要求低延迟并且服务器性能也允许,则这个时间可以指定得很短。

    这里需要注意的是:示例代码使用的是本地模式,配置为 local[2],这里不能配置为 local[1]。这是因为对于流数据的处理,Spark 必须有一个独立的 Executor 来接收数据,然后再由其他的 Executors 来处理,所以为了保证数据能够被处理,至少要有 2 个 Executors。这里我们的程序只有一个数据流,在并行读取多个数据流的时候,也需要保证有足够的 Executors 来接收和处理数据。

    3.2 数据源

    在示例代码中使用的是 socketTextStream 来创建基于 Socket 的数据流,实际上 Spark 还支持多种数据源,分为以下两类:

    • 基本数据源:包括文件系统、Socket 连接等;
    • 高级数据源:包括 Kafka,Flume,Kinesis 等。

    在基本数据源中,Spark 支持监听 HDFS 上指定目录,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下:

    // 对于文本文件,指明监听目录即可
    streamingContext.textFileStream(dataDirectory)
    // 对于其他文件,需要指明目录,以及键的类型、值的类型、和输入格式
    streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    

    被监听的目录可以是具体目录,如 hdfs://host:8040/logs/;也可以使用通配符,如 hdfs://host:8040/logs/2017/*

    关于高级数据源的整合单独整理至:Spark Streaming 整合 FlumeSpark Streaming 整合 Kafka

    3.3 服务的启动与停止

    在示例代码中,使用 streamingContext.start() 代表启动服务,此时还要使用 streamingContext.awaitTermination() 使服务处于等待和可用的状态,直到发生异常或者手动使用 streamingContext.stop() 进行终止。

    二、Transformation

    2.1 DStream与RDDs

    DStream 是 Spark Streaming 提供的基本抽象。它表示连续的数据流。在内部,DStream 由一系列连续的 RDD 表示。所以从本质上而言,应用于 DStream 的任何操作都会转换为底层 RDD 上的操作。例如,在示例代码中 flatMap 算子的操作实际上是作用在每个 RDDs 上 (如下图)。因为这个原因,所以 DStream 能够支持 RDD 大部分的transformation算子。

    在这里插入图片描述

    2.2 updateStateByKey

    除了能够支持 RDD 的算子外,DStream 还有部分独有的transformation算子,这当中比较常用的是 updateStateByKey。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用 updateStateByKey 算子。代码如下:

    object NetworkWordCountV2 {
    
    
      def main(args: Array[String]) {
    
        /*
         * 本地测试时最好指定 hadoop 用户名,否则会默认使用本地电脑的用户名,
         * 此时在 HDFS 上创建目录时可能会抛出权限不足的异常
         */
        System.setProperty("HADOOP_USER_NAME", "root")
          
        val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        /*必须要设置检查点*/
        ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
        val lines = ssc.socketTextStream("hadoop001", 9999)
        lines.flatMap(_.split(" ")).map(x => (x, 1))
          .updateStateByKey[Int](updateFunction _)   //updateStateByKey 算子
          .print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
        * 累计求和
        *
        * @param currentValues 当前的数据
        * @param preValues     之前的数据
        * @return 相加后的数据
        */
      def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
        val current = currentValues.sum
        val pre = preValues.getOrElse(0)
        Some(current + pre)
      }
    }
    

    使用 updateStateByKey 算子,你必须使用 ssc.checkpoint() 设置检查点,这样当使用 updateStateByKey 算子时,它会去检查点中取出上一次保存的信息,并使用自定义的 updateFunction 函数将上一次的数据和本次数据进行相加,然后返回。

    2.3 启动测试

    在监听端口输入如下测试数据:

    [root@hadoop001 ~]#  nc -lk 9999
    hello world hello spark hive hive hadoop
    storm storm flink azkaban
    hello world hello spark hive hive hadoop
    storm storm flink azkaban
    

    此时控制台输出如下,所有输入都被进行了词频累计:

    在这里插入图片描述

    同时在输出日志中还可以看到检查点操作的相关信息:

    # 保存检查点信息
    19/05/27 16:21:05 INFO CheckpointWriter: Saving checkpoint for time 1558945265000 ms 
    to file 'hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000'
    
    # 删除已经无用的检查点信息
    19/05/27 16:21:30 INFO CheckpointWriter: 
    Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000
    

    三、输出操作

    3.1 输出API

    Spark Streaming 支持以下输出操作:

    Output OperationMeaning
    print()在运行流应用程序的 driver 节点上打印 DStream 中每个批次的前十个元素。用于开发调试。
    saveAsTextFiles(prefix, [suffix])将 DStream 的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。
    saveAsObjectFiles(prefix, [suffix])将 DStream 的内容序列化为 Java 对象,并保存到 SequenceFiles。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。
    saveAsHadoopFiles(prefix, [suffix])将 DStream 的内容保存为 Hadoop 文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。
    foreachRDD(func)最通用的输出方式,它将函数 func 应用于从流生成的每个 RDD。此函数应将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或通过网络将其写入数据库。

    前面的四个 API 都是直接调用即可,下面主要讲解通用的输出方式 foreachRDD(func),通过该 API 你可以将数据保存到任何你需要的数据源。

    3.1 foreachRDD

    这里我们使用 Redis 作为客户端,对文章开头示例程序进行改变,把每一次词频统计的结果写入到 Redis,并利用 Redis 的 HINCRBY 命令来进行词频统计。这里需要导入 Jedis 依赖:

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    

    具体实现代码如下:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import redis.clients.jedis.Jedis
    
    object NetworkWordCountToRedis {
      
        def main(args: Array[String]) {
    
        val sparkConf = new SparkConf().setAppName("NetworkWordCountToRedis").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
    
        /*创建文本输入流,并进行词频统计*/
        val lines = ssc.socketTextStream("hadoop001", 9999)
        val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
         /*保存数据到 Redis*/
        pairs.foreachRDD { rdd =>
          rdd.foreachPartition { partitionOfRecords =>
            var jedis: Jedis = null
            try {
              jedis = JedisPoolUtil.getConnection
              partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
            } catch {
              case ex: Exception =>
                ex.printStackTrace()
            } finally {
              if (jedis != null) jedis.close()
            }
          }
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    

    其中 JedisPoolUtil 的代码如下:

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class JedisPoolUtil {
    
        /* 声明为 volatile 防止指令重排序 */
        private static volatile JedisPool jedisPool = null;
        private static final String HOST = "localhost";
        private static final int PORT = 6379;
    
        /* 双重检查锁实现懒汉式单例 */
        public static Jedis getConnection() {
            if (jedisPool == null) {
                synchronized (JedisPoolUtil.class) {
                    if (jedisPool == null) {
                        JedisPoolConfig config = new JedisPoolConfig();
                        config.setMaxTotal(30);
                        config.setMaxIdle(10);
                        jedisPool = new JedisPool(config, HOST, PORT);
                    }
                }
            }
            return jedisPool.getResource();
        }
    }
    

    3.3 代码说明

    这里将上面保存到 Redis 的代码单独抽取出来,并去除异常判断的部分。精简后的代码如下:

    pairs.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val jedis = JedisPoolUtil.getConnection
        partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
        jedis.close()
      }
    }
    

    这里可以看到一共使用了三次循环,分别是循环 RDD,循环分区,循环每条记录,上面我们的代码是在循环分区的时候获取连接,也就是为每一个分区获取一个连接。但是这里大家可能会有疑问:为什么不在循环 RDD 的时候,为每一个 RDD 获取一个连接,这样所需要的连接数会更少。实际上这是不可行的,如果按照这种情况进行改写,如下:

    pairs.foreachRDD { rdd =>
        val jedis = JedisPoolUtil.getConnection
        rdd.foreachPartition { partitionOfRecords =>
            partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
        }
        jedis.close()
    }
    

    此时在执行时候就会抛出 Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis,这是因为在实际计算时,Spark 会将对 RDD 操作分解为多个 Task,Task 运行在具体的 Worker Node 上。在执行之前,Spark 会对任务进行闭包,之后闭包被序列化并发送给每个 Executor,而 Jedis 显然是不能被序列化的,所以会抛出异常。

    第二个需要注意的是 ConnectionPool 最好是一个静态,惰性初始化连接池 。这是因为 Spark 的转换操作本身就是惰性的,且没有数据流时不会触发写出操作,所以出于性能考虑,连接池应该是惰性的,因此上面 JedisPool 在初始化时采用了懒汉式单例进行惰性初始化。

    3.4 启动测试

    在监听端口输入如下测试数据:

    [root@hadoop001 ~]#  nc -lk 9999
    hello world hello spark hive hive hadoop
    storm storm flink azkaban
    hello world hello spark hive hive hadoop
    storm storm flink azkaban
    

    使用 Redis Manager 查看写入结果 (如下图),可以看到与使用 updateStateByKey 算子得到的计算结果相同。

    在这里插入图片描述


    展开全文
  • SparkSQL是spark用来处理结构化... 注:本文所有操作是基于ambari工具,搭建好了 hdfs yarn hive spark mapReduce等大数据常用的组件 一、进入spark命令窗口 输入命令 spark-shell 以上是各种报错的部分...
  • Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。 1.Hive and SparkSQL SparkSQL 的前身是 Shark,给熟悉 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具。Hive 是早期唯一运行在...
  • conda常用命令行

    2022-06-21 20:07:26
    1)使用zip打包,必须切换到anaconda3的虚拟环境目录下 cd anaconda3/envs zip -r -9 -q py36spark.zip py36spark -9是指最高压缩,默认是-6. 2)使用conda pack打包 先在虚拟环境外部安装,使用conda安装贼慢 pip ...
  • hive命令行常用配置

    千次阅读 2022-03-06 15:51:35
    set spark.yarn.queue=fcqueue; set hive.cli.print.header=true; -- 动态分区 set hive.exec.dynamic.partition=true; -- 非严格模式 set hive.exec.dynamic.partition.mode=nonstrict; set spark.executor.memory=...
  • 快速入门pyspark教程 ...不适用jupyter时,运行spark.stop() 关闭spark 1.代码运行时,通过Spark UI 网页查看当前进行程度 2.读写parquet文件为DataFrame Pyspark将Parquet文件读入DataFrame ...
  • 本文针对在YARN上运行Spark常用配置参数进行讲解 1. 在yarn上启动spark application 确保HADOOP_CONF_DIR或YARN_CONF_DIR指向包含Hadoop集群(客户端)配置文件的目录。 这些configs用于写入HDFS并连接YARN ...
  • Spark之——Spark Submit提交应用程序

    万次阅读 2018-06-19 21:44:36
    本部分来源,也可以到spark官网查看英文版。 spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有...
  • Spark对程序提供了非常灵活的配置方式,可以使用环境变量、配置文件、命令行参数,还可以直接在Spark程序中指定,不同的配置方式有不同的优先级,可以相互覆盖。而且这些配置的属性在Web界面中可以直接看到,非常...
  • 今天看了王知无-大数据技术与架构老师的 Spark源码分析之Spark Shell 不由感慨 这不就是标准的模板脚本吗,今天我主要对启动过程中涉及到的一些shell脚本涉及的基础命令进行总结,在这里也非常感谢 老师兢兢业业的...
  • 12款最佳Linux命令行终端工具

    千次阅读 2021-05-10 10:11:52
    12款最佳Linux命令行终端工具如果你跟我一样,整天要花大量的时间使用Linux命令行,而且正在寻找一些可替代系统自带的老旧且乏味的终端软件,那你真是找对了文章。我这里搜集了一些非常有趣的终端软件,可以用来替代...
  • spark-shell中运行代码 编写Spark独立应用程序 在集群上运行Spark应用程序
  • Spark REPL

    2020-11-22 14:53:09
    文章目录一.Scala REPL二.Spark REPL三.总结 一.Scala REPL scala repl(“Read-Evaluate-Print-Loop”) 是一个交互式命令行解释器,它提供了一个测试scala代码的环境。ILoop和IMain是其核心实现。 属性 有用的REPL...
  • 77.SparkSQL中提供的spark-sql命令行的使用 78.SparkSQL中ThriftServer配置使用详解 79.SparkSQL性能优化及DataFrame是什么 80.SparkSQL读取多种数据数据实例 81.DataFrame实现多数据源数据的关联分析 82....
  • 总结命令行06:Spark

    2017-08-30 16:47:20
    启动命令行:spark-shell 启动spark:sbin/start-all.sh -> start-master.sh -> start-slaves.sh spark提交任务的过程 bin/spark-submit --class ...
  • 《深入理解Spark:核心思想与源码分析》一书前言的...核心思想与源码分析》一书第二章的内容请看链接《第2章SPARK设计理念与基本架构》Utils是Spark中最常用的工具类之一,如果不关心其实现,也不会对理解Spark有...
  • 实践数据湖icerberg专栏 本专栏记录iceberg菜鸟到专家的过程,记录踩坑,填坑,iceberg特征的了解与原理的认知
  • 原标题:Spark原理与实战(六)-- Spark任务提交与运行 导读:前面我们介绍了关于spark编程的基础知识以及三个简单示例,那么应该怎么去调试或者提交运行spark程序呢?作者:小舰 中国人民大学计算机硕士来源:DLab...
  • Spark

    2021-07-25 15:19:30
    Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点...
  • spark 部署安装

    千次阅读 2021-11-19 20:24:01
    安装笔Spark Local 模式搭建文档 在本地使用单机多线程模拟Spark集群中的各个角色 安装包下载 目前Spark最新稳定版本:课程中使用目前Spark最新稳定版本:3.1.x系列 ...
  • start-hbase.sh 2、停止 stop-hbase.sh 3、验证 jps jps -m web页面: http://bigdata01:16010/ 4、HBase的shell命令行 hbase shell 六、Scala 无常用命令 七、Spark 1、standalone模式 (1)启动 注意,这里的start...
  • Spark远程调试+页面监控-用最熟悉的方式开发Spark应用
  • Spark入门

    2022-04-16 12:51:19
    Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。 一、Spark运行模式 部署Spark集群大体上分为两种模式:单机模式与集群模式 大多数分布式框架都支持单机模式,...(国内常用) 3)YARN模式:Spar
  • 比较常用的方式就是在spark的UI界面上查看。 一般的spark 因为driver所在的node是固定的,ip也就是固定的,所以能够通过ip或者域名映射后即可很方便的进行访问。 spark on k8s比较特别的是 driver是一个在k8s集群中...
  • 所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境 Spark的Standalone模式体现了经典的master-slave模式。 yarn模式就是将spark计算所需要的资源等的调度工作交由Yarn
  • Spark 操作 Hive

    千次阅读 2021-11-15 21:35:58
    文章目录内置Hive外部的 Hive代码操作 Hive运行 Spark SQL CLI运行 Spark beeline Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 6,243
精华内容 2,497
热门标签
关键字:

常用spark命令行