精华内容
下载资源
问答
  • spark使用

    千次阅读 2016-05-31 10:36:04
    一、基础概念:  RDD对象:  spark的核心对象,  文件等加载均转化为RDD对象(SparkContext.textFile(input_file) )  RDD对象属性、方法: ...二、使用 1.声明: from pyspark import SparkContext, Spark

    一、基础概念:

     RDD对象:

            spark的核心对象,

           文件等加载均转化为RDD对象(SparkContext.textFile(input_file)  )

     RDD对象属性、方法:

    map、reduce、flatmap、reducebykey
    

    二、使用

    1.声明:

    from pyspark import SparkContext, SparkConf  #pyspark ,python-spark支持


    appName = "程序名"   #也可以用于web上监控
    master = "spark://服务器名:端口" #服务器名可以使用ip

    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)


    说明:

    val sc = new SparkContext(master, appName, [sparkHome], [jars])

    参数master指明集群的地址,是字符串,master可以是"local"--在本地单机运行,也可以是Spark或者Mesos集群的URL。
    参数appName是Spark应用的名称,会在集群的web界面里显示出来。
    参数sparkHome是spark的安装目录,注意,集群内所有节点的Spark必须安装在同样的目录下。
    参数jars是打包后的Spark应用,是本地目录,这些Jar包会被复制发送到集群内的所有节点执行。


    2.文件加载及处理:

    a.处理:

    """odflow.py"""
    from pyspark import SparkContext
    
    fileDir = "/TripChain3_Demo.txt"
    # sc = SparkContext("local", "ODFlow")
    sc = SparkContext("spark://ITS-Hadoop10:7077", "ODFlow")
    lines = sc.textFile(fileDir)
    
    # python不能直接写多行的lambda表达式,所以要封装在函数中
    def toKV(line):
        arr = line.split(",")
        t = arr[5].split(" ")[1].split(":")
        return (t[0]+t[1]+","+arr[11]+","+arr[18],1)
    
    r1 = lines.map( lambda line : toKV(line) ).reduceByKey(lambda a,b: a+b)
    # 排序并且存入一个(repartition)文件中
    r1.sortByKey(False).saveAsTextFile("/pythontest/output")

    b.发布

    spark-submit \
      --master spark://ITS-Hadoop10:7077 \
      odflow.py

    2.1 单个文件处理

    2.1.1 无配置文件情况

        data_file = sc.textFile(input_file)  #sc为SparkContext对象
        data_file.map(handle_one_line).filter(lambda x: len(x.split("\t"))==7).saveAsTextFile(output_file)


    2.1.2 有配置文件情况


    2.2 两个文件(多个文件)处理


    展开全文
  • spark使用文档

    千次阅读 2018-01-26 14:19:52
    最近在看了一些spark使用文档,以及官网简介,自己总结了一点使用文档,记录一下 快速启动spark 关于这一部分 Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab ...

    最近在看了一些spark的使用文档,以及官网简介,自己总结了一点使用文档,记录一下

    快速启动spark

    关于这一部分

    Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
    在Spark 2.0之前,Spark的主要编程接口是具有弹性的分布式数据集(RDD)。在Spark 2.0之后,RDDs被数据集取代,数据集就像RDD一样强类型,但在底层有更丰富的优化。

    Spark Shell的交互分析

    Spark的shell提供了一种学习API的简单方法,同时还提供了一种强大的工具,可以交互式地分析数据。它可以在Scala中(在JavaVM上运行,因此是使用现有Java库的好方法)或Python。启动它,在Spark目录中运行以下内容

    ./bin/spark-shell

    Spark的主要抽象是一个称为数据集的项目的分布式集合。数据集可以从Hadoop inputformat(比如HDFS文件)中创建,也可以通过转换其他数据集来创建。让我们从Spark源目录中的README文件的文本中创建一个新的数据集:

    scala> val textFile = spark.read.textFile("README.md")

    您可以通过调用一些操作来直接从数据集获得值,或者转换数据集以获得一个新的值:

    scala> textFile.count() //这个数据集中的项目数量
    scala> textFile.first()//这个数据集的第一项

    现在让我们将这个数据集转换为一个新的数据集。我们调用filter来返回一个新的数据集,其中有文件中条目的子集。

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

    我们可以将转换和动作链接在一起:

    scala> textFile.filter(line => line.contains("Spark")).count()

    spark其他的数据操作

    数据集动作和转换可以用于更复杂的计算。假设我们想要找到最符合要求的一行:

    scala> textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b)

    这首先将一条线映射到一个整型值,创建一个新的数据集。在该数据集上调用reduce,以找到最大的单词计数。映射和减少的参数是Scala函数字面量(闭包),可以使用任何语言特性或Scala/java库。例如,我们可以很容易地调用其他地方声明的函数。我们将使用math.max()函数使这段代码更容易理解:

    scala> import java.lang.Math
    import java.lang.Math
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

    一个常见的数据流模式是MapReduce,它是由Hadoop普及的。Spark可以很容易地实现MapReduce流:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

    在这里,我们调用flatMap,将行数据集转换为单词的数据集,然后将groupByKey和count组合起来,计算文件中的每个单词计数,作为一个(字符串,长)对的数据集。在我们的shell中收集单词计数,我们可以调用收集:

    scala> wordCounts.collect()

    spark缓存

    Spark还支持将数据集拉到集群范围内的内存缓存中。当数据被多次访问时,这是非常有用的,例如在查询一个小的“hot”数据集或者运行像PageRank这样的迭代算法时。作为一个简单的例子,让我们标记一下我们的linesWithSpark数据集来缓存:

    scala> linesWithSpark.cache()
    scala> linesWithSpark.count()
    scala> linesWithSpark.count()

    spark RDD编程

    概述:

    在高层次上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的主要功能,并在集群上执行各种并行操作。主要的抽象火花提供了一个有弹性的分布式数据集(RDD),它是一个可以并行操作的集群节点上划分的元素集合。RDDs是通过在Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件开始创建的,或者在驱动程序中使用现有的Scala集合,并对其进行转换。用户还可以要求Spark在内存中持久化一个RDD,这样就可以在并行操作中有效地重用它。最后,RDDs会自动从节点故障中恢复。
    Spark的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当Spark在不同的节点上并行地运行一个函数时,它会将函数中使用的每个变量的副本都复制到每个任务中。有时,需要在任务之间共享一个变量,或者在任务和驱动程序之间共享一个变量。Spark支持两种类型的共享变量:广播变量,它可以用于在所有节点上缓存一个值,以及累计变量,这些变量只被“添加”到计数器和求和等变量中。

    spark shell使用

    在Spark shell中,在名为sc的变量中已经为您创建了一个特殊的解释器感知的SparkContext,使您自己的SparkContext无法工作。您可以设置上下文连接使用-主参数的主上下文,您可以通过将一个逗号分隔的列表传递给–jars参数,将jar添加到类路径中。您还可以通过向–packages参数提供一个逗号分隔的Maven坐标列表,任何依赖项可能存在的附加存储库(例如Sonatype)都可以传递给–repositores参数。例如,要在运行/spark-shell,请使用:

    $ ./bin/spark-shell --master local[4]

    或者, 添加 code.jar 包在使用路径, 请使用:

    $ ./bin/spark-shell --master local[4] --jars code.jar

    使用Maven坐标包含依赖项:

    $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

    注意:对于一个完整的选项列表,运行spark-shell –help。

    spark 外部数据集

    Spark可以从Hadoop支持的任何存储源中创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等等。Spark支持文本文件、序列文件和任何其他Hadoop InputFormat。
    可以使用SparkContext的textFile方法创建文本文件RDDs。该方法获取文件的URI(或者是机器上的本地路径,或者是hdfs://、s3n://等等URI),并将其作为行的集合读取。下面是一个示例调用:

    scala> val distFile = sc.textFile("data.txt")

    一旦创建,distFile就可以由数据集操作来执行。例如,我们可以使用映射来添加所有行的大小,并减少操作,如下:

    distFile.map(s => s.length).reduce((a, b) => a + b).

    spark RDD操作

    RDDs支持两种类型的操作:转换,它从现有的一个中创建一个新的数据集,以及操作,它在数据集上运行一个计算之后,将一个值返回给驱动程序。例如,map是一个转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD。另一方面,reduce是一个使用某个函数聚合所有RDD元素的动作,并将最终结果返回给驱动程序(尽管也有一个并行的还原ebykey返回一个分布式数据集)。
    Spark中的所有转换都是惰性的,因为它们不会立即计算结果。相反,它们只记住应用于一些基本数据集(例如一个文件)的转换。只有当一个动作需要返回到驱动程序的结果时,才会计算转换。这种设计使Spark能够更高效地运行。例如,我们可以认识到,通过map创建的数据集将被用于减少,只返回到驱动程序的结果,而不是更大的映射数据集。
    Spark的API严重依赖于驱动程序中的传递函数来在集群上运行。有两种推荐的方法:

    1. 匿名函数语法,它可以用于简短的代码。
    2. 全局单例对象中的静态方法。例如,您可以定义对象my函数,然后传递my函数。func1,如下所示:
    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    myRdd.map(MyFunctions.func1)

    3.请注意,虽然可以在类实例中传递对方法的引用(而不是单例对象),但这需要发送包含该类的对象和方法。例如,考虑:

    class MyClass {
      def func1(s: String): String = { ... }
      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
    }

    RDD的基础数据类型

    并行集合

    并行集合(Parallelized collections)的创建是通过一个已有的集合(Scala Seq)上调用SparkContext的parallelize方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中,例如:这里演示了如何一个包含1到5数组种创建并行集合:

    Val data=Array(1,2,3,4,5)
    Val distData=sc.parallelize(data)

    一旦创建完成,这个分布式数据集(distData)就可以被并行操作,例如:我们可以调用distData.reduce(a,b)=>a+b,将这个数组中的元素相加,我们以后在描述在分布式上的一些操作。
    并行集合一个很重要的参数是切片书(slices),表示一个数据集切分的分数,Spark会在集群上运行一个任务。你可以在集群上为每个CPU设置2-4个切片(slices),正常情况下,Spark会试着基于你的集群状况自动地设置切片的数目,然而,你也可以通过parallelize的第二个参数手动地设置(例如:sc.parallelize(data,10))。

    外部数据集

    Spark可以从任何一个Hadoop支持的存储源创建分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等,Spark支持文本文件(text files),SequenceFiles和其他Hadoop InputFormat。
    文本文件RDDs可以使用SparkContext的textFile方法创建,在这个方法里传入文件的URI(机器上的本地路径或hdfs://,s3n://等),然后他会将文件读取成一个行集合,这里是一个调用例子:

    Scala> val distFile=sc.textFile(“data.txt”)

    RDD使用

    RDD actions和tranformations能被用在更多的复杂计算中,比方说,我们想要找到一行中最多的单词数量:

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

    首先履行映射成一个整形数值产生一个新RDD上调用reduce找到行中最大的个数,map和reduce的参数是Scala的函数串(闭包),并且可以使用任何语言特性或者Scala/Java类库,例如,我们可以很方便的调用其他的函数声明,我们使用Math.max()函数让代码更容易理解:

    scala> import java.lang.Math
    import java.lang.Math
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res5:Int=15

    Hadoop流行的一个通用的数据流模式是MapReduce。Spark能够很容易的实现MapReduce

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

    这里,我们结合flatMap,map和reduceByKey来计算文件里每个单词出现的数量,他的结果是包含一组(String,int)键值对的RDD,我们可以使用[collect]操作在我们的shell中手机单词的数量:

    scala> wordCounts.collect()

    spark SQL使用

    概述

    Spark SQL是结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口提供了关于数据的结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息执行额外的优化。有几种方法可以与Spark SQL进行交互,其中包括SQL和Dataset API。当计算结果时,使用相同的执行引擎,独立于您使用的api/语言来表示计算。这种统一意味着开发人员可以很容易地在不同的api之间来回切换,这提供了最自然的方式来表达给定的转换。

    Starting Point: SparkSession

    Spark中所有功能的入口点是SparkSession类。要创建一个基本的SparkSession,只需使用SparkSession.builder():

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //对于隐式转换,比如将rdd转换为DataFrames
    import spark.implicits._

    创建DataFrames

    通过一个SparkSession,应用程序可以从一个现有的RDD中创建DataFrames,从一个Hive表,或者从Spark数据源。
    作为一个示例,下面创建一个基于JSON文件内容的DataFrame:

    val df = spark.read.json("examples/src/main/resources/people.json")
    // Displays the content of the DataFrame to stdout
    df.show()

    DataFrames为在Scala、Java、Python和r中的结构化数据操作提供了一种领域特有的语言。
    如前所述,在Spark 2.0中,DataFrames只是Scala和Java API中的行数据集。这些操作也被称为“非类型转换”,与“类型转换”形成鲜明对比的是强类型的scala/java数据集。

    这里我们包括一些使用数据集的结构化数据处理的基本示例:

    import spark.implicits._
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy("age").count().show()
    

    除了简单的列引用和表达式之外,Datasets还有一个丰富的函数库,包括字符串操作、日期算术、常见的数学运算等等。完整的列表在DataFrame函数引用中可用。

    以编程方式运行SQL查询

    SparkSession中的sql函数使应用程序能够以编程的方式运行sql查询,并将结果作为DataFrame返回。

    //将DataFrame注册为一个SQL临时视图
    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()

    全局临时视图

    Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,则会消失。如果您想要在所有会话中共享一个临时视图,并在Spark应用程序终止之前保持活力,您可以创建一个全局临时视图。全局临时视图绑定到一个系统保存的数据库globaltemp,我们必须使用限定名来引用它,e.g. SELECT * FROM global_temp.view1.

    //将DataFrame注册为全局临时视图
    df.createGlobalTempView("people")
    //全局临时视图绑定到一个系统保存的数据库globaltemp
    spark.sql("SELECT * FROM global_temp.people").show()
    //全局临时视图是交叉会话
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    

    创建数据集

    然而,数据集与RDDs类似,而不是使用Java序列化或Kryo,它们使用专门的编码器来序列化对象,以便在网络上进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并且使用一种格式,允许Spark执行许多操作,如过滤、排序和散列,而不将字节反序列化为对象。

    //您可以使用实现产品接口的自定义类
    case class Person(name: String, age: Long)
    //为案例类创建编码器
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    //大多数常见类型的编码器是通过引入spark.implicits自动提供的._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    //通过提供一个类,可以将DataFrames转换为数据集。映射将通过名称进行
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()

    使用反射推断模式

    Spark SQL的Scala接口支持自动将包含case类的RDD转换为DataFrame。case类定义了表的模式。对case类的参数的名称是使用反射读取的,并成为列的名称。Case类也可以嵌套或包含复杂的类型,例如Seqs或数组。这个RDD可以隐式地转换为一个DataFrame,然后将其注册为一个表。表可以在后续的SQL语句中使用。

        import spark.implicits._
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    //将DataFrame注册为一个临时视图
    peopleDF.createOrReplaceTempView("people")
    //可以使用Spark提供的SQL方法来运行SQL语句
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    //结果中的列的列可以通过字段索引来访问
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    //基本类型和case类也可以定义为
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

    以编程的方式指定模式

    当case类不能提前定义时(例如,记录的结构是在一个字符串中编码的,或者文本数据集将被解析,不同的用户将对字段进行不同的预测),一个DataFrame可以用3个步骤来创建。
    1. 从原始的RDD中创建一个行;
    2. 创建在第1步中创建的RDD中所表示的结构类型的结构类型。
    3. 通过SparkSession提供的createDataFrame方法将模式应用到行的RDD中。

    import org.apache.spark.sql.types._
    // 创建一个RDD
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    // 模式是用字符串编码的
    val schemaString = "name age"
    // 根据模式的字符串生成模式
    val fields = schemaString.split(" ")
     .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // 将RDD(人员)的记录转换为行
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    // 将该模式应用于RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    // 使用DataFrame创建一个临时视图
    peopleDF.createOrReplaceTempView("people")
    // 可以通过使用DataFrames创建的临时视图来运行SQL
    val results = spark.sql("SELECT name FROM people")
    // SQL查询的结果是DataFrames并支持所有常规的RDD操作
    // 结果中的列的列可以通过字段索引或字段名来访问。
    results.map(attributes => "Name: " + attributes(0)).show()

    聚合函数

    内置的DataFrames函数提供了诸如count()、countDistinct ()、avg()、max()、min()等公共聚合,而这些函数都是为DataFrames设计的,Spark SQL也为Scala和Java中的一些提供了类型安全的版本,用于与强类型数据集一起工作。此外,用户不仅限于预定义的聚合函数,而且可以创建自己的聚合函数。

    无类型定义的聚合函数

    用户必须扩展UserDefinedAggregateFunction抽象类实现一个自定义的无类型的聚合函数。例如,用户定义的平均值可以是这样的:

    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    object MyAverage extends UserDefinedAggregateFunction {
      // 这个聚合函数的输入参数的数据类型
      def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
      // 聚合缓冲区中的值的数据类型
      def bufferSchema: StructType = {
        StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
      }
      // 返回值的数据类型  
    def dataType: DataType = DoubleType
      // 这个函数是否总是在相同的输入上返回相同的输出
      def deterministic: Boolean = true
      // 初始化给定的聚合缓冲区。缓冲区本身是一个行,除了
      // 标准方法,例如检索索引中的值(例如get()、getBoolean())
      // 更新它的价值的机会。注意,缓冲区内的数组和映射仍然是  
    // 不变的.
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
      // 使用来自输入的新输入数据更新给定的聚合缓冲区缓冲区  
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getLong(0) + input.getLong(0)
          buffer(1) = buffer.getLong(1) + 1
        }
      }
      // 合并两个聚合缓冲区,并将更新后的缓冲区值存储到buffer1
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      // 计算出最终结果  
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }
    // 注册该函数以访问它
    spark.udf.register("myAverage", MyAverage)
    val df = spark.read.json("examples/src/main/resources/employees.json")
    df.createOrReplaceTempView("employees")
    df.show()
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
    

    类型安全的用户定义的聚合函数

    对于强类型数据集的用户定义聚合是围绕聚集器抽象类进行的。例如,类型安全的用户定义的平均值可以是这样的:

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.SparkSession
    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)
    object MyAverage extends Aggregator[Employee, Average, Double] {
      // 这个聚合的零值。应该满足任意b+0=b的性质
      def zero: Average = Average(0L, 0L)
      // 结合两个值来生成一个新值。对于性能,函数可以修改缓冲区 
     // 然后返回,而不是新建一个对象
      def reduce(buffer: Average, employee: Employee): Average = {
        buffer.sum += employee.salary
        buffer.count += 1
        buffer
      }
      // 合并两个中间值
      def merge(b1: Average, b2: Average): Average = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
      }
      // 转换输出的输出
      def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
      // 指定中间值类型的编码器
      def bufferEncoder: Encoder[Average] = Encoders.product
      // 指定最终输出值类型的编码器  
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
    val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
    ds.show()
    // 将函数转换为TypedColumn并给它起一个名字
    val averageSalary = MyAverage.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show()

    数据源

    Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用关系转换操作,也可以用来创建临时视图。将一个DataFrame注册为一个临时视图允许您在其数据上运行SQL查询。本节描述使用Spark数据源加载和保存数据的一般方法,然后进入用于内置数据源的特定选项。

    通用的加载/保存功能

    在最简单的形式中,默认数据源(由spark.sql.sources.default拼花,除非另有配置)将用于所有操作。

    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

    手动指定选项

    您还可以手动指定将使用的数据源,以及您想要传递给数据源的任何额外选项。数据源由其完全限定的名称(即:org.apache.spark.sql.parquet),但对于内置的来源还可以使用短名称(json、jdbc、csv、文本)。从任何数据源类型加载的DataFrames都可以使用这种语法转换为其他类型。

    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

    直接在文件上运行SQL

    您可以使用SQL来直接查询该文件,而不是使用read API将文件加载到DataFrame并查询它。

    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

    Bucketing, Sorting and Partitioning

    对于基于文件的数据源,还可以对输出进行存储、排序或分区。嵌接和排序仅适用于持久表:

    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

    在使用数据集api时,可以同时使用分区和save。

    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

    对于单个表,可以使用分区和嵌接:

    peopleDF
      .write
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed")

    分区创建一个目录结构,正如在分区发现节中所描述的那样。因此,它对具有高基数的列的适用性有限。相反,bucketa将数据分布在固定数量的桶中,当许多惟一值是无界的时,可以使用它。

    结构化流编程

    结构化流是一个可伸缩的、容错的流处理引擎,构建在Spark SQL引擎上。您可以用相同的方式表示流计算,就像在静态数据上表示批处理一样。Spark SQL引擎将会逐渐地、持续地运行它,并在流数据继续到达时更新最终结果。您可以使用Scala、Java、Python或R中的dataset/dataframe API来表示流媒体聚合、事件时间窗口、流到批连接等等,这些计算都是在同一个优化的Spark SQL引擎上执行的。最后,系统确保了端到端的端对端容错保证,通过检查点和写前面的日志。简而言之,结构化流提供了快速、可伸缩的、容错的、端到端的即时处理,而无需用户去考虑流媒体。

    简单的例子

    假设您想要维护从一个数据服务器接收到的一个正在运行的文本数据,而这个数据服务器是通过TCP套接字监听的。让我们看看如何使用结构化的流媒体来表达这个问题。如果你下载Spark,你可以直接运行这个例子。在任何情况下,让我们一步一步地了解这个示例,并了解它是如何工作的。首先,我们必须导入必要的类并创建一个局部SparkSession,这是与Spark相关的所有功能的起点。

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()
    import spark.implicits._

    接下来,让我们创建一个流数据的DataFrame,它表示从服务器接收到的服务器上接收的文本数据,并将DataFrame转换为计算单词计数。

    //创建来自连接到localhost:9999的输入行流的DataFrame
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    现在,我们已经对流数据进行了查询。剩下的就是开始接收数据并计算计数。要做到这一点,我们设置它来打印完整的计数集(outputMode(“complete”)指定,每次更新时都要输出到控制台。然后使用start()启动流计算。

    // 开始运行将运行计数打印到控制台的查询
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()

    在执行该代码之后,流计算将在后台启动。查询对象是一个对该活动流查询的句柄,我们已经决定使用瓦伊终止()等待查询的终止,以防止在查询处于活动状态时退出进程。
    要实际执行这个示例代码,您可以在自己的Spark应用程序中编译代码,也可以在下载Spark后运行这个示例。我们展示的是后者。您首先需要运行Netcat(在大多数类unix系统中发现的一个小型实用程序)作为数据服务器

    $ nc -lk 9999

    然后,在一个不同的终端中,您可以通过使用

    $ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

    然后,在运行netcat服务器的终端中输入的任何行都将被计数并在屏幕上打印。

    spark streaming编程

    Spark流是核心Spark API的一个扩展,它支持可伸缩、高吞吐量、容错的实时数据流处理。数据可以从许多来源中摄取,比如卡夫卡、Flume、kin遥感或TCP套接字,并且可以使用复杂的算法来处理,这些算法使用高级功能,如map、reduce、join和window。最后,处理的数据可以被推送到文件系统、数据库和实时指示板。实际上,您可以在数据流上应用Spark的机器学习和图形处理算法。

    image

    在内部,它是这样工作的。Spark流接收实时的输入数据流,并将数据分成几批,然后由Spark引擎处理,以批量生成最终的结果流。

    image

    Spark流提供了一个称为离散流或DStream的高级抽象,它代表了连续的数据流。DStreams可以从诸如卡夫卡、Flume和kinor等来源的输入数据流中创建,也可以通过在其他DStreams上应用高级操作来创建。在内部,DStream被表示为一个RDDs的序列。

    一个简单的例子

    首先,我们导入了Spark流类的名称,以及从StreamingContext到我们的环境中的一些隐式转换,以便向我们需要的其他类添加有用的方法(如DStream)。StreamingContext是所有流媒体功能的主要入口点。我们使用两个执行线程创建一个本地流上下文,一个批处理间隔为1秒。

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // 创建一个本地流媒体上下文,使用两个工作线程和一个1秒的批处理间隔。.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    使用这个上下文,我们可以创建一个DStream,它表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999)。

    val lines = ssc.socketTextStream("localhost", 9999)

    这行DStream表示将从数据服务器接收到的数据流。这个DStream中的每个记录都是一行文本。接下来,我们要将空格字符分割成单词。

    val words = lines.flatMap(_.split(" "))

    flatMap是一对多的DStream操作,它通过在源DStream中的每个记录生成多个新记录来创建一个新的DStream。在这种情况下,每一行将被分割成多个单词,而单词流则表示为DStream。接下来,我们要计算这些单词。

    import org.apache.spark.streaming.StreamingContext._ 
    // 在每批中计算每个单词
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // 将这个DStream中生成的每个RDD的前10个元素打印到控制台
    wordCounts.print()

    当这些行被执行时,Spark流只会设置它在启动时将执行的计算,并且还没有开始真正的处理。在所有转换设置之后开始处理,我们最后调用

    ssc.start()             // 开始计算
    ssc.awaitTermination()  // 等待计算终止

    完整的代码可以在Spark流示例网络wordcount中找到。
    如果您已经下载并构建了Spark,那么您可以像下面这样运行这个示例。您首先需要运行Netcat(在大多数类unix系统中发现的一个小型实用程序)作为数据服务器

    $ nc -lk 9999

    然后,在一个不同的终端中,您可以通过使用

    $ ./bin/run-example streaming.NetworkWordCount localhost 9999

    然后,在运行netcat服务器的终端中输入的任何行都将被计数并在屏幕上打印。

    展开全文
  • Spark使用explode展开嵌套的JSON数据

    万次阅读 2016-08-17 08:47:08
    Spark使用explode展开嵌套的JSON数据

    在使用Spark的人中,估计很多人都会使用DataFrame及SQLContext,而在众多的数据格式中,很可能会遇到JSON数据,此数据还可能包含嵌套关系,比如像如下的JSON数据:

    {"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
    {"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
    {"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
    

    此时,如果我们直接用DataFrame的show方法可以看到:

    +---+--------------------+-------+
    |age|             myScore|   name|
    +---+--------------------+-------+
    | 25|  [[23,19], [50,58]]|Michael|
    | 30|[[33,29], [52,38]...|   Andy|
    | 19|  [[43,39], [53,28]]| Justin|
    +---+--------------------+-------+
    
    root
     |-- age: long (nullable = true)
     |-- myScore: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- score2: long (nullable = true)
     |    |    |-- score1: long (nullable = true)
     |-- name: string (nullable = true)

    由于myScore是一个数组,所以,在上述show得到的表中,我们不能直接使用sql来查询或聚合,那么如何才能将myScore的数组类型展开呢?
    我们可以考虑使用explode函数,如下

    val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
    df.show()
    df.printSchema()
    val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
    val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
    dfScore.show()

    此时,会得到如下结果,这个时候的表,就跟我们平时看到的关系型数据庫的表是一样的了,接下来,我们就可以执行相关的sql查询了。

    +-------+-----------------+------------------+
    |   name|           score1|            score2|
    +-------+-----------------+------------------+
    |Michael|               19|                23|
    |Michael|               58|                50|
    |   Andy|               29|                33|
    |   Andy|               38|                52|
    |   Andy|               88|                71|
    | Justin|               39|                43|
    | Justin|               28|                53|
    +-------+-----------------+------------------+

    完整的代码如下:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.functions._
    import org.junit.{After, Before, Test}
    import org.junit.Assert.assertEquals
    
    /**
      * Created by yang on 8/16/16.
      */
    class Test {
    
      @transient var sc: SparkContext = _
    
      @transient var sqlContext:SQLContext = _
    
      @Before
      def init(): Unit ={
        val conf = new SparkConf().setAppName("Test").setMaster("spark://master:7077")
        sc = new SparkContext(conf)
        sqlContext = new org.apache.spark.sql.SQLContext(sc)
      }
    
      @Test
      def TestMapFun(): Unit ={
        val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
        df.show()
        df.printSchema()
    
        val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
        val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
        dfMyScore.show()
    
        dfMyScore.registerTempTable("table1")
        val result = sqlContext.sql("select name,avg(hwScore_Std),avg(exScore_Std) from table1")
        assertEquals(7,dfMyScore.count())
      }
    }  

    以上代码需要一些包,我是用sbt构建的,内容如下:

    name := "Test"
    
    version := "1.0"
    
    scalaVersion := "2.10.5"
    
    libraryDependencies += "junit" % "junit" % "4.12" % "test"
    
    libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
    
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10
    libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
    展开全文
  • spark使用总结

    万次阅读 2016-10-15 23:40:44
    如果要缓存的数据太多,内存中放不下,spark会自动利用最近最少使用(LRU)策略把最老的分区从内存中移除。对于仅放在内存中的缓存级别,下次要用到已被移除的分区时,这些分区就需要重新计算。对于使用内存与磁盘的...

    弹性分布式数据集(RDD)是分布式处理的一个数据集的抽象, RDD是只读的,在RDD之上的操作都是并行的 。实际上,RDD只是一个逻辑实体,其中存储了分布式数据集的一些信息,并没有包含所谓的“物理数据”,“物理数据”只有在RDD被计算并持久化之后才存在于内存或磁盘中。RDD的重要内部属性有:

    • 计算RDD分区的函数。
    • 所依赖的直接父RDD列表。
    • RDD分区及其地址列表。
    • RDD分区器。
    • RDD分区优先位置。

    RDD操作起来与Scala集合类型没有太大差别,这就是Spark追求的目标:像编写单机程序一样编写分布式程序,但它们的数据和运行模型有很大的不同,用户需要具备更强的系统把控能力和分布式系统知识。

    Transformation与Action

    RDD提供了两种类型的操作: transformation操作 (转化操作)和 action操作 (行动操作)。transformation操作是得到一个新的RDD ,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD。action操作则是得到其他数据类型 的结果。

    所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,spark在内部只是用新的RDD记录这些transformation操作并形成RDD对象的有向无环图(DAG),计算只有在action被提交的时候才被触发。实际上,我们不应该把RDD看作存放着特定数据的数据集,而最好把每个RDD当作我们通过transformation操作构建出来的、记录如何计算数据的指令列表。

    RDD的action算子会触发一个新的job,spark会在DAG中寻找是否有cached或者persisted的中间结果,如果没有找到,那么就会重新执行这些中间过程以重新计算该RDD。因此,如果想在多个action操作中重用同一个RDD,那么最好使用cache() /persist() 将RDD缓存在内存中,但如果RDD过大,那么最好使用 persist(StorageLevel.MEMORY_AND_DISK) 代替。注意cache/persist仅仅是设置RDD的存储等级,因此你应该在第一次调用action之前调用cache/persist。cache/persist使得中间计算结果存在内存中,这个才是说为啥Spark是内存计算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允许你把中间结果放内存里。

    在spark程序中打印日志时,尤其需要注意打印日志的代码很有可能使用到了action算子,如果没有缓存中间RDD就可能导致程序的效率大大降低。另外,如果一个RDD的计算过程中有抽样、随机值或者其他形式的变化,那么一定要缓存中间结果,否则程序执行结果可能都是不准确的!

    RDD持久化(缓存)

    正如在转化和行动操作部分所说的一样,为了避免在一个RDD上多次调用action操作从而可能导致的重新计算,我们应该将该RDD在第一次调用action之前进行持久化。对RDD进行持久化对于迭代式和交互式应用非常有好处,好处大大滴有。

    持久化可以使用 cache() 或者 persist() 。默认情况下的缓存级别为 MEMORY_ONLY ,spark会将对象直接缓存在JVM的堆空间中,而不经过序列化处理。我们可以给persist()传递持久化级别参数以指定的方式持久化RDD。MEMORY_AND_DISK 持久化级别尽量将RDD缓存在内存中,如果内存缓存不下了,就将剩余分区缓存在磁盘中。MEMORY_ONLY_SER 将RDD进行序列化处理(每个分区序列化为一个字节数组)然后缓存在内存中。还有MEMORY_AND_DISK_SER 等等很多选项。选择持久化级别的原则是:尽量选择缓存在内存中,如果内存不够,则首选序列化内存方式,除非RDD分区重算开销比缓存到磁盘来的更大(很多时候,重算RDD分区会比从磁盘中读取要快)或者序列化之后内存还是不够用,否则不推荐缓存到磁盘上。

    如果要缓存的数据太多,内存中放不下,spark会自动利用最近最少使用(LRU)策略把最老的分区从内存中移除。对于仅放在内存中的缓存级别,下次要用到已被移除的分区时,这些分区就需要重新计算。对于使用内存与磁盘的缓存级别,被移除的分区都会被写入磁盘。

    另外,RDD还有一个 unpersist() 方法,用于手动把持久化的RDD从缓存中移除。

    环境变量 SPARK_LOCAL_DIRS 用来设置RDD持久化到磁盘的目录,它同时也是shuffle的缓存目录。

    各种RDD与RDD操作

    基本RDD

    抽象类 RDD 包含了各种数据类型的RDD都适用的通用操作。下面对基本类型RDD的操作进行分门别类地介绍。

    针对各个元素的转化操作:

    • map : 对各个元素进行映射操作。
    • flatMap : 对各个元素进行映射操作,并将最后结果展平。
    • filter : 过滤不满足条件的元素。filter操作可能会引起数据倾斜,甚至可能导致空分区,新形成的RDD将会包含这些可能生成的空分区。所有这些都可能会导致问题,要想解决它们,最好在filter之后重新分区。

    伪集合操作:

    尽管RDD不是严格意义上的集合,但它支持许多数学上的集合操作。注意:这些操作都要求操作的RDD是相同的数据类型的。

    • distinct : 对RDD中的元素进行去重处理。需要注意的是,distinct操作开销很大,因为它需要shuffle所有数据,以确保每一个元素都只有一份。
    • union : 返回一个包含两个或多个RDD中所有元素的RDD。spark的union并不会去重,这点与数学上的不同。
    • intersection : 返回两个RDD中都有的元素。intersection会在运行时除去所有重复的元素,因此它也需要shuffle,性能要差一些。
    • subtract : 返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。它也需要shuffle。
    • cartesian : 计算两个RDD的笛卡尔积。需要注意的是,求大规模RDD的笛卡尔积开销巨大。
    • sample : 对RDD进行采样,返回一个采样RDD。

    基于分区的转化操作:

    • glom : 将每个分区中的所有元素都形成一个数组。如果在处理当前元素时需要使用前后的元素,该操作将会非常有用,不过有时我们可能还需要将分区边界的数据收集起来并广播到各节点以备使用。
    • mapPartitions : 基于分区的map,spark会为操作分区的函数该分区的元素的迭代器。
    • mapPartitionsWithIndex : 与mapPartitions不同之处在于带有分区的序号。

    管道(pipe)操作:

    spark在RDD上提供了 pipe() 方法。通过pipe(),你可以使用任意语言将RDD中的各元素从标准输入流中以字符串形式读出,并将这些元素执行任何你需要的操作,然后把结果以字符串形式写入标准输出,这个过程就是RDD的转化操作过程。

    使用pipe()的方法很简单,假如我们有一个用其他语言写成的从标准输入接收数据并将处理结果写入标准输出的可执行脚本,我们只需要将该脚本分发到各个节点相同路径下,并将其路径作为pipe()的参数传入即可。

    行动操作:

    • foreach : 对每个元素进行操作,并不会返回结果。
    • foreachPartition : 基于分区的foreach操作,操作分区元素的迭代器,并不会返回结果。
    • reduce : 对RDD中所有元素进行规约,最终得到一个规约结果。reduce接收的规约函数要求其返回值类型与RDD中元素类型相同。
    • fold : 与reduce类似,不同的是,它接受一个“初始值”来作为每个分区第一次调用时的结果。fold同样要求规约函数返回值类型与RDD元素类型相同。
    • aggregate : 与reduce和fold类似,但它把我们从返回值类型必须与所操作的RDD元素类型相同的限制中解放出来。
    • count : 返回RDD元素个数。
    • collect : 收集RDD的元素到driver节点,如果数据有序,那么collect得到的数据也会是有序的。大数据量最好不要使用RDD的collect,因为它会在本机上生成一个新的Array,以存储来自各个节点的所有数据,此时更好的办法是将数据存储在HDFS等分布式持久化层上。
    • take : 返回指定数量的元素到driver节点。它会尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。需要注意的是,该操作返回元素的顺序与你预期的可能不一样。
    • top : 如果为元素定义了顺序,就可以使用top返回前几个元素。
    • takeSample : 返回采样数据。

    键值对RDD

    PairRDDFunctions 封装了用于操作键值对RDD的一些功能函数。一些文件读取操作( sc.sequenceFile() 等)会直接返回RDD[(K, V)]类型。在RDD上使用map操作也可以将一个RDD转换为RDD[(K, V)]类型。在用Scala书写的Spark程序中,RDD[(K, V)]类型到PairRDDFunctions类型的转换一般由隐式转换函数完成。

    基本类型RDD的操作同样适用于键值对RDD。下面对键值对类型RDD特有的操作进行分门别类地介绍。

    针对各个元素的转化操作:

    • mapValues : 对各个键值对的值进行映射。该操作会保留RDD的分区信息。
    • flatMapValues : 对各个键值对的值进行映射,并将最后结果展平。该操作会保留RDD的分区信息。

    聚合操作:

    • reduceByKey : 与reduce相当类似,它们都接收一个函数,并使用该函数对值进行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是对键相同的值进行规约,并最终形成RDD[(K, V)],而不像reduce那样返回单独一个“值”。
    • foldByKey : 与fold类似,就像reduceByKey之于reduce那样。熟悉MapReduce中的合并器(combiner)概念的你可能已经注意到,reduceByKey和foldByKey会在为每个键计算全局的总结果之前先自动在每台机器上进行本地合并。用户不需要指定合并器。更泛化的combineByKey可以让你自定义合并的行为。
    • combineByKey : 是最常用的基于键进行聚合的函数,大多数基于键聚合的函数都是用它实现的。与aggregate一样,combineByKey可以让用户返回与输入数据的类型不同的返回值。combineByKey的内部实现分为三步来完成:首先根据是否需要在map端进行combine操作决定是否对RDD先进行一次mapPartitions操作(利用createCombiner、mergeValue、mergeCombiners三个函数)来达到减少shuffle数据量的作用。第二步根据partitioner对MapPartitionsRDD进行shuffle操作。最后在reduce端对shuffle的结果再进行一次combine操作。

    数据分组:

    • groupBy : 根据自定义的东东进行分组。groupBy是基本RDD就有的操作。
    • groupByKey : 根据键对数据进行分组。虽然 groupByKey + reduce 也可以实现reduceByKey 一样的效果,但是请你记住:groupByKey是低效的,而reduceByKey会在本地先进行聚合,然后再通过网络传输求得最终结果。

    在执行聚合或分组操作时,可以指定分区数以对并行度进行调优。

    连接:

    • cogroup : 可以对多个RDD进行连接、分组、甚至求键的交集。其他的连接操作都是基于cogroup实现的。
    • join : 对数据进行内连接,也即当两个键值对RDD中都存在对应键时才输出。当一个输入对应的某个键有多个值时,生成的键值对RDD会包含来自两个输入RDD的每一组相对应的记录,也即笛卡尔积。
    • leftOuterJoin : 即左外连接,源RDD的每一个键都有对应的记录,第二个RDD的值可能缺失,因此用Option表示。
    • rightOuterJoin : 即右外连接,与左外连接相反。
    • fullOuterJoin : 即全外连接,它是是左右外连接的并集。

    如果一个RDD需要在多次连接操作中使用,对该RDD分区并持久化分区后的RDD是有益的,它可以避免不必要的shuffle。

    数据排序:

    在基本类型RDD中, sortBy() 可以用来排序, max()min() 则可以用来方便地获取最大值和最小值。另外,在OrderedRDDFunctions中,存在一个sortByKey() 可以方便地对键值对RDD进行排序,通过spark提供的隐式转换函数可以将RDD自动地转换为OrderedRDDFunctions,并随意地使用它的排序功能。

    行动操作:

    键值对RDD提供了一些额外的行动操作供我们随意使用。如下:

    • countByKey : 对每个键对应的元素分别计数。
    • collectAsMap : 将结果以Map的形式返回,以便查询。
    • lookup : 返回给定键对应的所有值。

    数值RDD

    DoubleRDDFunctions 为包含数值数据的RDD提供了一些描述性的统计操作,RDD可以通过隐式转换方便地使用这些方便的功能。

    这些数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以StatCounter 对象返回。如果你只想计算这些统计数据中的一个,也可以直接对RDD调用对应的方法。更多信息参见Spark API。

    RDD依赖、窄宽依赖

    RDD依赖与DAG

    一系列转化操作形成RDD的有向无环图(DAG),行动操作触发作业的提交与执行。每个RDD维护了其对 直接父RDD (一个或多个)的依赖,其中包含了父RDD的引用和依赖类型信息,通过dependencies() 我们可以获取对应RDD的依赖,其返回一个依赖列表。

    通过RDD的父RDD引用就可以从DAG上向前回溯找到其所有的祖先RDD。spark提供了 toDebugString 方法来查看RDD的谱系。对于如下一段简单的代码:

    val input = sc.parallelize(1 to 10)
    val repartitioned = input.repartition(2)
    val sum = repartitioned.sum
    

    我们就可以通过在RDD上调用toDebugString来查看其依赖以及转化关系,结果如下:

    // input.toDebugString
    res0: String = (4) ParallelCollectionRDD[0] at parallelize at <console>:21 []
    
    // repartitioned.toDebugString
    res1: String =
    (2) MapPartitionsRDD[4] at repartition at <console>:23 []
     |  CoalescedRDD[3] at repartition at <console>:23 []
     |  ShuffledRDD[2] at repartition at <console>:23 []
     +-(4) MapPartitionsRDD[1] at repartition at <console>:23 []
        |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
    

    上述 repartitioned 的依赖链存在两个缩进等级。同一缩进等级的转化操作构成一个Stage(阶段),它们不需要混洗(shuffle)数据,并可以流水线执行(pipelining)。

    窄依赖和宽依赖

    spark中RDD之间的依赖分为 窄(Narrow)依赖宽(Wide)依赖 两种。我们先放出一张示意图:

    窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。图中,map/filter和union属于第一类,对输入进行协同划分(co-partitioned)的join属于第二类。

    宽依赖指子RDD的分区依赖于父RDD的多个或所有分区,这是因为 shuffle 类操作,如图中的groupByKey和未经协同划分的join。

    窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier(路障)): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使 是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线(pipeline)优化。关于流水线优化,从MapPartitionsRDD中compute()的实现就可以看出端倪,该compute方法只是对迭代器进行复合,复合就是嵌套,因此数据处理过程就是对每条记录进行同样的嵌套处理直接得出所需结果,而没有中间计算结果,同时也要注意:依赖过长将导致嵌套过深,从而可能导致栈溢出。

    转换算子序列一碰上shuffle类操作,宽依赖就发生了,流水线优化终止。在具体实现 中,DAGScheduler从当前算子往前回溯依赖图,一碰到宽依赖,就生成一个stage来容纳已遍历的算子序列。在这个stage里,可以安全地实施流水线优化。然后,又从那个宽依赖开始继续回溯,生成下一个stage。

    另外,宽窄依赖的划分对spark的容错也具有重要作用,参见本文容错机制部分。

    DAG到任务的划分

    用户代码定义RDD的有向无环图,行动操作把DAG转译为执行计划,进一步生成任务在集群中调度执行。

    具体地说,RDD的一系列转化操作形成RDD的DAG,在RDD上调用行动操作将触发一个Job(作业)的运行,Job根据DAG中RDD之间的依赖关系(宽依赖/窄依赖,也即是否发生shuffle)的不同将DAG划分为多个Stage(阶段),一个Stage对应DAG中的一个或多个RDD,一个Stage对应多个RDD是因为发生了流水线执行(pipelining),一旦Stage划分出来,Task(任务)就会被创建出来并发给内部的调度器,进而分发到各个executor执行,一个Stage会启动很多Task,每个Task都是在不同的数据分区上做同样的事情(即执行同样的代码段),Stage是按照依赖顺序处理的,而Task则是独立地启动来计算出RDD的一部分,一旦Job的最后一个Stage结束,一个行动操作也就执行完毕了。

    Stage分为两种: ShuffleMapStageResultStage ShuffleMapStage 是非最终stage,后面还有其他的stage,所以它的输出一定是需要shuffle并作为后续stage的输入。ShuffleMapStage的最后Task就是ShuffleMapTaskResultStage 是一个Job的最后一个Stage,直接生成结果或存储。ResultStage的最后Task就是ResultTask 。一个Job含有一个或多个Stage,最后一个为ResultTask,其他都为ShuffleMapStage。

    RDD不能嵌套

    RDD嵌套是不被支持的,也即不能在一个RDD操作的内部再使用RDD。如果在一个RDD的操作中,需要访问另一个RDD的内容,你可以尝试join操作,或者将数据量较小的那个RDD广播(broadcast)出去。

    你同时也应该注意到:join操作可能是低效的,将其中一个较小的RDD广播出去然后再join可以避免不必要的shuffle,俗称“小表广播”。

    使用其他分区数据

    由于RDD不能嵌套,这使得“在计算一个分区时,访问另一个分区的数据”成为一件困难的事情。那么有什么好的解决办法吗?请继续看。

    spark依赖于RDD这种抽象模型进行粗粒度的并行计算,一般情况下每个节点的每次计算都是针对单一记录,当然也可以使用 RDD.mapPartition 来对分区进行处理,但都限制在一个分区内(当然更是一个节点内)。

    spark的worker节点相互之间不能直接进行通信,如果在一个节点的计算中需要使用到另一个分区的数据,那么还是有一定的困难的。

    你可以将整个RDD的数据全部广播(如果数据集很大,这可不是好办法),或者广播一些其他辅助信息;也可以从所有节点均可以访问到的文件(hdfs文件)或者数据库(关系型数据库或者hbase)中读取;更进一步或许你应该修改你的并行方案,使之满足“可针对拆分得到的小数据块进行并行的独立的计算,然后归并得到大数据块的计算结果”的MapReduce准则,在“划分大的数据,并行独立计算,归并得到结果”的过程中可能存在数据冗余之类的,但它可以解决一次性没法计算的大数据,并最终提高计算效率,hadoop和spark都依赖于MapReduce准则。

    对RDD进行分区

    何时进行分区?

    spark程序可以通过控制RDD分区方式来减少通信开销。分区并不是对所有应用都是有好处的,如果给定RDD只需要被扫描一次,我们完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助,同时记得将分区得到的新RDD持久化哦。

    更多的分区意味着更多的并行任务(Task)数。对于shuffle过程,如果分区中数据量过大可能会引起OOM,这时可以将RDD划分为更多的分区,这同时也将导致更多的并行任务。spark通过线程池的方式复用executor JVM进程,每个Task作为一个线程存在于线程池中,这样就减少了线程的启动开销,可以高效地支持单个executor内的多任务执行,这样你就可以放心地将任务数量设置成比该应用分配到的CPU cores还要多的数量了。

    如何分区与分区信息

    在创建RDD的时候,可以指定分区的个数,如果没有指定,则分区个数是系统默认值,即该程序所分配到的CPU核心数。在Java/Scala中,你可以使用 rdd.getNumPartitions (1.6.0+)或 rdd.partitions.size() 来获取分区个数。

    对基本类型RDD进行重新分区,可以通过 repartition() 函数,只需要指定重分区的分区数即可。repartition操作会引起shuffle,因此spark提供了一个优化版的repartition,叫做coalesce() ,它允许你指定是否需要shuffle。在使用coalesce时,需要注意以下几个问题:

    • coalesce默认shuffle为false,这将形成窄依赖,例如我们将1000个分区重新分到100个中时,并不会引起shuffle,而是原来的10个分区合并形成1个分区。
    • 但是对于从很多个(比如1000个)分区重新分到很少的(比如1个)分区这种极端情况,数据将会分布到很少节点(对于从1000到1的重新分区,则是1个节点)上运行,完全无法开掘集群的并行能力,为了规避这个问题,可以设置shuffle为true。由于shuffle可以分隔stage,这就保证了上一阶段stage中的任务仍是很多个分区在并行计算,不这样设置的话,则两个上下游的任务将合并成一个stage进行计算,这个stage便会在很少的分区中进行计算。
    • 如果当前每个分区的数据量过大,需要将分区数量增加,以利于充分利用并行,这时我们可以设置shuffle为true。对于数据分布不均而需要重分区的情况也是如此。spark默认使用hash分区器将数据重新分区。

    对RDD进行预置的hash分区,需将RDD转换为RDD[(key,value)]类型,然后就可以通过隐式转换为PairRDDFunctions,进而可以通过如下形式将RDD哈希分区,HashPartitioner 会根据RDD中每个(key,value)中的key得出该记录对应的新的分区号:

    PairRDDFunctions.partitionBy(new HashPartitioner(n))
    

    另外,spark还提供了一个范围分区器,叫做 RangePartitioner 。范围分区器争取将所有的分区尽可能分配得到相同多的数据,并且所有分区内数据的上界是有序的。

    一个RDD可能存在分区器也可能没有,我们可以通过RDD的 partitioner 属性来获取其分区器,它返回一个Option对象。

    如何进行自定义分区

    spark允许你通过提供一个自定义的Partitioner对象来控制RDD的分区方式,这可以让你利用领域知识进一步减少通信开销。

    要实现自定义的分区器,你需要继承 Partitioner 类,并实现下面三个方法即可:

    • numPartitions : 返回创建出来的分区数。
    • getPartition : 返回给定键的分区编号(0到numPartitions-1)。
    • equals : Java判断相等性的标准方法。这个方法的实现非常重要,spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样spark才可以判断两个RDD的分区方式是否相同。

    影响分区方式的操作

    spark内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果RDD自动设置为对应的分区器。

    不过转化操作的结果并不一定会按照已知的分区方式分区,这时输出的RDD可能就会丢失分区信息。例如,由于 map() flatMap() 函数理论上可以改变元素的键,因此当你对一个哈希分区的键值对RDD调用map/flatMap时,结果RDD就不会再有分区方式信息。不过,spark提供了另外两个操作mapValues()flatMapValues() 作为替代方法,它们可以保证每个二元组的键保持不变。

    这里列出了所有会为生成的结果RDD设好分区方式的操作: cogroup()join() leftOuterJoin()rightOuterJoin()fullOuterJoin()groupWith()groupByKey()reduceByKey()combineByKey()partitionBy()sortBy()sortByKey()mapValues() (如果父RDD有分区方式的话)、 flatMapValues() (如果父RDD有分区方式的话)、filter() (如果父RDD有分区方式的话) 等。其他所有操作生成的结果都不会存在特定的分区方式。

    最后,对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。不过,如果其中一个父RDD已经设置过分区方式,那么结果就会采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD会采用第一个父RDD的分区方式。

    从分区中获益的操作

    spark的许多操作都引入了将数据根据键跨节点进行shuffle的过程。所有这些操作都会从数据分区中获益。这些操作主要有: cogroup()join()leftOuterJoin()rightOuterJoin()fullOuterJoin()groupWith()groupByKey()reduceByKey()combineByKey()lookup() 等。

    RDD分区优先位置

    RDD分区优先位置与spark的调度有关,在spark进行任务调度的时候,会尽可能将任务分配到数据块所存储的节点。我们可以通过RDD的 preferredLocations() 来获取指定分区的优先位置,返回值是该分区的优先位置列表。

    数据加载与保存

    从程序中的集合生成

    sc.parallelize() 可用于从程序中的集合产生RDD。 sc.makeRDD() 也是在程序中生成RDD,不过其还允许指定每一个RDD分区的优先位置。

    以上这些方式一般用于原型开发和测试,因为它们需要把你的整个数据集先放在一台机器(driver节点)的内存中,从而限制了只能用较小的数据量。

    从文本文件加载数据

    sc.textFile() 默认从hdfs中读取文件,在路径前面加上 hdfs:// 可显式表示从hdfs中读取文件,在路径前面加上file:// 表示从本地文件系统读。给sc.textFile()传递的文件路径可以是如下几种情形:

    • 一个文件路径,这时候只装载指定的文件。
    • 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件)。
    • 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件。

    如果想一次性读取一个目录下面的多个文件并想知道数据来自哪个文件,可以使用 sc.wholeTextFiles 。它会返回一个键值对RDD,其中键是输入文件的文件名。由于该函数会将一个文件作为RDD的一个元素进行读取,因此所读取的文件不能太大,以便其可以在一个机器上装得下。

    同其他transform算子一样,文本读取操作也是惰性的并由action算子触发,如果发生重新计算,那么读取数据的操作也可能会被再次执行。另外,在spark中超出内存大小的文件同样是可以被处理的,因为spark并不是将数据一次性全部装入内存,而是边装入边计算。

    从数据库加载数据

    spark中可以使用 JdbcRDD 从数据库中加载数据。spark会将数据从数据库中拷贝到集群各个节点,因此使用JdbcRDD会有初始的拷贝数据的开销。也可以考虑使用sqoop将数据从数据库中迁移到hdfs中,然后从hdfs中读取数据。

    将结果写入文本文件

    rdd.saveAsTextFile() 用于将RDD写入文本文件。spark会将传入该函数的路径参数作为目录对待,默认情况下会在对应目录输出多个文件,这取决于并行度。如果要将结果写入hdfs的一个 文件中,可以这样:

    rdd.coalesce(1).saveAsTextFile("filename")
    

    而不要使用repartition,因为repartition会引起shuffle,而coalesce在默认情况下会避免shuffle。

    关于文件系统

    spark支持读写很多文件系统,包括本地文件系统、HDFS、Amazon S3等等很多。

    spark在本地文件系统中读取文件时,它要求文件在集群中所有节点的相同路径下都可以找到。我们可以通过 sc.addFile() 来将文件弄到所有节点同路径下面,并在各计算节点中通过SparkFiles.get() 来获取对应文件在该节点上的绝对路径。

    sc.addFile() 的输入文件路径不仅可以是本地文件系统的,还可以是HDFS等spark所支持的所有文件系统,甚至还可以是来自网络的,如HTTP、HTTPS、FTP。

    关于并行

    慎用可变数据

    当可变数据用于并发/并行/分布式程序时,都有可能出现问题,因此对于会并发执行的代码段不要使用可变数据。

    尤其要注意不要在scala的object中使用var变量!其实scala的object单例对象只是对java中静态的一种封装而已,在class文件层面,object单例对象就是用java中静态(static)来实现的,而java静态成员变量不会被序列化!在编写并行计算程序时,不要在scala的object中使用var变量,如果确实需要使用var变量,请写在class中。

    另外,在分布式执行的spark代码段中使用可变的闭包变量也可能会出现不同步问题,因此请谨慎使用。

    闭包 vs 广播变量

    有两种方式将你的数据从driver节点发送到worker节点:通过 闭包 和通过 广播变量 。闭包是随着task的组装和分发自动进行的,而广播变量则是需要程序猿手动操作的,具体地可以通过如下方式操作广播变量(假设scSparkContext 类型的对象, bcBroadcast 类型的对象):

    • 可通过 sc.broadcast(xxx) 创建广播变量。
    • 可在各计算节点中(闭包代码中)通过 bc.value 来引用广播的数据。
    • bc.unpersist() 可将各executor中缓存的广播变量删除,后续再使用时数据将被重新发送。
    • bc.destroy() 可将广播变量的数据和元数据一同销毁,销毁之后就不能再使用了。

    任务闭包包含了任务所需要的代码和数据,如果一个executor数量小于RDD partition的数量,那么每个executor就会得到多个同样的任务闭包,这通常是低效的。而广播变量则只会将数据发送到每个executor一次,并且可以在多个计算操作中共享该广播变量,而且广播变量使用了类似于p2p形式的非常高效的广播算法,大大提高了效率。另外,广播变量由spark存储管理模块进行管理,并以MEMORY_AND_DISK级别进行持久化存储。

    什么时候用闭包自动分发数据?情况有几种:

    • 数据比较小的时候。
    • 数据已在driver程序中可用。典型用例是常量或者配置参数。

    什么时候用广播变量分发数据?情况有几种:

    • 数据比较大的时候(实际上,spark支持非常大的广播变量,甚至广播变量中的元素数超过java/scala中Array的最大长度限制(2G,约21.5亿)都是可以的)。
    • 数据是某种分布式计算结果。典型用例是训练模型等中间计算结果。

    当数据或者变量很小的时候,我们可以在Spark程序中直接使用它们,而无需使用广播变量。

    对于大的广播变量,序列化优化可以大大提高网络传输效率,参见本文序列化优化部分。

    巧用累加器

    累加器提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。可以通过 sc.accumulator(xxx) 来创建一个累加器,并在各计算节点中(闭包代码中)直接写该累加器。

    累加器只能在驱动程序中被读取,对于计算节点(闭包代码)是只写的,这大大精简了累加器的设计。

    使用累加器时,我们要注意的是:对于在RDD转化操作中使用的累加器,如果发生了重新计算(这可能在很多种情况下发生),那么累加器就会被重复更新,这会导致问题。而在行动操作(如foreach)中使用累加器却不会出现这种情况。因此,在转化操作中,累加器通常只用于调试目的。尽管将来版本的spark可能会改善这一问题,但在spark 1.2.0中确实存在这个问题。

    关于shuffle

    在经典的MapReduce中,shuffle(混洗)是连接map阶段和reduce阶段的桥梁(注意这里的术语跟spark的map和reduce操作没有直接关系),它是将各个map的输出结果重新组合作为下阶段各个reduce的输入这样的一个过程,由于这一过程涉及各个节点相互之间的数据传输,故此而名“混洗”。下面这幅图清晰地描述了MapReduce算法的整个流程,其中shuffle阶段是介于map阶段和reduce阶段之间。


    Spark的shuffle过程类似于经典的MapReduce,但是有所改进。spark中的shuffle在实现上主要分为 shuffle writeshuffle fetch 这两个大的阶段。如下图所示,shuffle过程大致可以描述为:

    • 首先每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。
    • 其次Mapper产生的结果会根据设置的partition算法填充到每个bucket中去。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中去。
    • 当Reducer启动时,它会根据自己task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket作为Reducer的输入进行处理。

    spark的shuffle实现随着spark版本的迭代正在逐步完善和成熟,这中间曾出现过多种优化实现,关于spark shuffle的演进过程和具体实现参见后面的参考链接。

    shuffle(具体地是shuffle write阶段)会引起数据缓存到本地磁盘文件,从spark 1.3开始,这些缓存的shuffle文件只有在相应RDD不再被使用时才会被清除,这样在lineage重算的时候shuffle文件就不需要重新创建了,从而加快了重算效率(请注意这里的缓存并保留shuffle数据这一行为与RDD持久化和检查点机制是不同的,缓存并保留shuffle数据只是省去了重算时重建shuffle文件的开销,因此我们才有理由在shuffle(宽依赖)之后对形成的RDD进行持久化)。在standalone模式下,我们可以在spark-env.sh 中通过环境变量 SPARK_LOCAL_DIRS 来设置shuffle数据的本地磁盘缓存目录。为了优化效率,本地shuffle缓存目录的设置都应该使用由单个逗号隔开的目录列表,并且这些目录分布在不同的磁盘上,写操作会被均衡地分配到所有提供的目录中,磁盘越多,可以提供的总吞吐量就越高。另外,SPARK_LOCAL_DIRS 也是RDD持久化到磁盘的目录。

    序列化优化

    在spark中,序列化通常出现在跨节点的数据传输(如广播变量、shuffle等)和数据持久化过程中。序列化和反序列化的速度、序列化之后数据大小等都影响着集群的计算效率。

    spark默认使用Java序列化库,它对于除基本类型的数组以外的任何对象都比较低效。为了优化序列化效率,你可以在spark配置文件中通过 spark.serializer 属性来设置你想使用的序列化库,一般情况下,你可以使用这个序列化库:org.apache.spark.serializer.KryoSerializer

    为了获得最佳性能,你还应该向Kryo注册你想要序列化的类,注册类可以让Kryo避免把每个对象的完整类名写下来,成千上万条记录累计节省的空间相当可观。如果你想强制要求这种注册,可以把spark.kryo.registrationRequired 设置为true,这样Kryo会在遇到未注册的类时抛出错误。使用Kryo序列化库并注册所需类的示例如下:

    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true")
    conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))
    

    Spark调度

    应用调度

    应用是指用户提交的spark应用程序。spark应用程序之间的调度关系,不一定由spark所管理。

    在YARN和Mesos模式下,底层资源的调度策略由YARN和Mesos集群资源管理器所决定。

    只有在standalone模式下,spark master按照当前集群资源是否满足等待列表中的spark应用对资源的需求,而决定是否创建一个SparkContext对应的driver,进而完成spark应用的启动过程,这个过程可以粗略地认为是一种粗颗粒度的有条件的FIFO (先进先出)调度策略。

    作业调度

    作业是指spark应用程序内部的由action算子触发并提交的Job。在给定的spark应用中,不同线程的多个job可以并发执行,并且这个调度是线程安全的,这使得一个spark应用可以处理多个请求。

    默认地,spark作业调度是 FIFO 的,在多线程的情况下,某些线程提交的job可能被大大推迟执行。

    不过我们可以通过配置 FAIR (公平)调度器来使spark在作业之间轮询调度,这样所有的作业都能得到一个大致公平的共享的集群资源。这就意味着即使有一个很长的作业在运行,较短的作业在提交之后也能够得到不错的响应。要启用一个FAIR作业调度,需在创建SparkContext之前配置一下spark.scheduler.modeFAIR

    // 假设conf是你的SparkConf变量
    conf.set("spark.scheduler.mode", "FAIR")
    

    公平调度还支持在池中将工作分组(这样就形成两级调度池),而不同的池可以设置不同的调度选项(如权重)。这种方式允许更重要的job配置在高优先级池中优先调度。如果没有设置,新提交的job将进入默认池 中,我们可以通过在对应线程中给SparkContext设置本地属性spark.scheduler.pool 来设置该线程对应的pool:

    // 假设sc是你的SparkContext变量
    sc.setLocalProperty("spark.scheduler.pool", "pool1")
    

    在设置了本地属性之后,所有在这个线程中提交的job都将会使用这个调度池的名字。如果你想清除该线程相关的pool,只需调用如下代码:

    sc.setLocalProperty("spark.scheduler.pool", null)
    

    在默认情况下,每个调度池拥有相同的优先级来共享整个应用所分得的集群资源。同样的, 默认池中的每个job也拥有同样的调度优先级,但是在用户创建的每个池中,job是通过FIFO方式进行调度的 。

    关于公平调度池的详细配置,请参见官方文档: Spark Job Scheduling

    如果你想阅读相关实现代码,可以观看 Schedulable.scalaSchedulingAlgorithm.scala 以及SchedulableBuilder.scala 等相关文件。

    容错机制与检查点

    spark容错机制是粗粒度并且是轻量级的,主要依赖于RDD的依赖链( lineage )。spark能够通过lineage获取足够的信息来重新计算和恢复丢失的数据分区。这样的基于lineage的容错机制可以理解为粗粒度的重做日志(redo log)。

    鉴于spark的基于lineage的容错机制,RDD DAG中宽窄依赖的划分对容错也有很重要的作用。如果一个节点宕机了,而且运算是窄依赖,那只要把丢失的父RDD分区重算即可,跟其他节点没有依赖。而宽依赖需要父RDD的所有分区都存在,重算代价就很高了。可以这样理解为什么窄依赖开销小而宽依赖开销大:在窄依赖中,在子RDD分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算;而在宽依赖中,丢失一个子RDD分区将导致其每个父RDD的多个甚至所有分区的重算,而重算的结果并不都是给当前丢失的子RDD分区用的,这样就存在了冗余计算。

    不过我们可以通过 检查点 ( checkpoint )机制解决上述问题,通过在RDD上做检查点可以将物理RDD数据存储到持久层(HDFS、S3等)中。在RDD上做检查点的方法是在调用action算子之前调用checkpoint() ,并且RDD最好是缓存在内存中的,否则可能导致重算(参见API注释)。示例如下:

    // 假设rdd是你的RDD变量
    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd.checkpoint()
    val count = rdd.count()
    

    在RDD上做检查点会切断RDD依赖,具体地spark会清空该RDD的父RDD依赖列表。并且由于检查点机制是将RDD存储在外部存储系统上,所以它可以被其他应用重用。

    过长的lineage(如在pagerank、spark streaming等中)也将导致过大的重算代价,而且还会占用很多系统资源。因此, 在遇到宽依赖或者lineage足够长时,我们都应该考虑做检查点 。

    集群监控与运行日志

    spark在应用执行时记录详细的进度信息和性能指标。这些内容可以在两个地方找到:spark的网页用户界面以及driver进程和executor进程生成的日志文件中。

    网页用户界面

    在浏览器中打开 http://master:8080 页面,你可以看到集群概况,包括:集群节点、可用的和已用的资源、已运行的和正在运行的应用等。

    http://master:4040 页面用来监控正在运行的应用(默认端口为4040,如果有多个应用在运行,那么端口顺延,如4041、4042),包括其执行进度、构成Job的Stage的执行情况、Stage详情、已缓存RDD的信息、各executor的信息、spark配置项以及应用依赖信息等,该页面经常用来发现应用的效率瓶颈并辅助优化,不过该页面只有在有spark应用运行时才可以被访问到。

    上述404x端口可用于查看正在运行的应用的执行详情,但是应用运行结束之后该页面就不可以访问了。要想查看已经执行结束的应用的执行详情,则需开启事件日志机制,具体地设置如下两个选项:

    • spark.eventLog.enabled: 设置为true时开启事件日志机制。这样已完成的spark作业就可以通过历史服务器查看。

    • spark.eventLog.dir: 开启事件日志机制时的事件日志文件存储位置。如果要在历史服务器中查看事件日志,需要将该值设置为一个全局可见的文件系统路径,比如HDFS中。最后,请确保目录以 ‘/‘ 结束,否则可能会出现如下错误:

      Application history not found ... No event logs found for application ...
      Did you specify the correct logging directory?
      

    在配置好上述选项之后,我们就可以查看新提交的应用的详细执行信息了。在不同的部署模式中,查看的方式不同。在standalone模式中,可以直接在master节点的UI界面(上述8080端口对应的页面)中直接单击已完成应用以查看详细执行信息。在YARN/Mesos模式中,就要开启历史服务器了,此处略去。

    Metrics系统

    spark在其内部拥有一个可配置的度量系统(Metrics),它能够将spark的内部状态通过HTTP、JMX、CSV等多种不同形式呈现给用户。同时,用户也可以定义自己的数据源(Metrics Source)和数据输出方式(Metrics Sink),从而获取自己所需的数据。此处略去详情,可参考下面的链接进一步阅读。

    参考链接及进一步阅读:

    查看日志文件

    spark日志文件的具体位置取决于具体的部署模式。在standalone模式中,日志默认存储于各个工作节点的spark目录下的 work 目录中,此时所有日志还可以直接通过主节点的网页用户界面进行查看。

    默认情况下,spark输出的日志包含的信息量比较合适。我们可以自定义日志行为,改变日志等级或存储位置。spark日志系统使用 log4j 实现,我们只需将conf目录下的log4j.properties.template复制一个并命名为log4j.properties,然后自定义修改即可。

    SparkConf与配置

    spark中最主要的配置机制是通过 SparkConf 类对spark进行配置。当创建出一个SparkContext时,就需要创建出一个SparkConf的实例作为参数。

    SparkConf实例包含用户要重载的配置选项的键值对,spark中的每个配置选项都是基于字符串形式的键值对。你可以调用SparkConf的 set() 或者setXxx() 来设置对应选项。

    另外,spark-submit脚本可以动态设置配置项。当应用被spark-submit脚本启动时,脚本会把这些配置项设置到运行环境中。当一个新的SparkConf被创建出来时,这些环境变量会被检测出来并且自动配到SparkConf中。这样在使用spark-submit时,用户应用通常只需创建一个“空”的SparkConf,并直接传递给SparkContext的构造方法即可。

    spark-submit为常用的spark配置选项提供了专用的标记,还有一个通用标记 --conf 来接收任意spark配置项的值,形如--conf 属性名=属性值

    spark-submit也支持从文件中读取配置项的值。默认情况下,spark-submit会在spark安装目录中找到 conf/spark-defaults.conf 文件,读取该文件中以空格隔开的键值对数据。你也可以通过spark-submit的--properties-File 选项来自定义该文件的路径。

    spark-defaults.conf的作用范围要搞清楚,编辑driver所在机器上的spark-defaults.conf,该文件会影响到driver所提交运行的application,及专门为该application提供计算资源的executor的启动参数。

    spark有特定的优先级顺序来选择实际配置。优先级最高的是在用户代码中显式调用set()方法设置的选项。其次是通过spark-submit传递的参数。再次是写在配置文件中的值。最后是系统默认值。如果你想知道应用中实际生效的配置,可以在应用的网页用户界面中查看。

    下面列出一些常用的配置项,完整的配置项列表可以参见 官方配置文档

    选项默认值描述
    spark.master(none)表示要连接的集群管理器。
    spark.app.name(none)应用名,将出现在UI和日志中。
    spark.driver.memory1g为driver进程分配的内存。注意:在客户端模式中,不能在SparkConf中直接配置该项,因为driver JVM进程已经启动了。
    spark.executor.memory1g为每个executor进程分配的内存。
    spark.executor.coresall/1每个executor可用的核心数。针对standalone和YARN模式。更多参见官方文档。
    spark.cores.max(not set)设置standalone和Mesos模式下应用程序的核心数上限。
    spark.speculationfalse设置为true时开启任务预测执行机制。当出现比较慢的任务时,这种机制会在另外的节点上也尝试执行该任务的一个副本。打开此选项会帮助减少大规模集群中个别较慢的任务带来的影响。
    spark.driver.extraJavaOptions(none)设置driver节点的JVM启动参数。
    spark.executor.extraJavaOptions(none)设置executor节点的JVM启动参数。
    spark.serializerJavaSerializer指定用来进行序列化的类库,包括通过网络传输数据或缓存数据时的序列化。为了速度,推荐使用KryoSerializer。
    spark.eventLog.enabledfalse设置为true时开启事件日志机制。这样已完成的spark作业就可以通过历史服务器查看。
    spark.eventLog.dirfile:///tmp/spark-events开启事件日志机制时的事件日志文件存储位置。如果要在历史服务器中查看事件日志,需要将该值设置为一个全局可见的文件系统路径,比如HDFS中。最后,请确保目录以 ‘/‘ 结束,否则可能会出现错误,参见本文集群监控部分。

    一些问题的解决办法

    /tmp目录写满

    由于Spark在计算的时候会将中间结果存储到/tmp目录,而目前linux又都支持tmpfs,其实说白了就是将/tmp目录挂载到内存当中。那么这里就存在一个问题,中间结果过多导致/tmp目录写满而出现如下错误:

    No Space Left on the device
    

    解决办法就是针对tmp目录不启用tmpfs,修改/etc/fstab。

    无法创建进程

    有时可能会遇到如下错误,即无法创建进程:

    java.lang.OutOfMemory, unable to create new native thread
    

    导致这种错误的原因比较多。有一种情况并非真的是内存不足引起的,而是由于超出了允许的最大文件句柄数或最大进程数。

    排查的步骤就是查看一下允许打开的文件句柄数和最大进程数,如果数值过低,使用ulimit将其调高之后,再试试问题是否已经解决。

    不可序列化

    Task not serializable: java.io.NotSerializableException
    

    作为RDD操作算子参数的匿名函数使用外部变量从而形成闭包。为了效率,spark并不是将所有东东都序列化以分发到各个executor。spark会先对该匿名函数进行ClosureCleaner.clean()处理(将该匿名函数涉及到的$outer中的与闭包无关的变量移除),然后将该匿名函数对象及闭包涉及到的对象序列化并包装成task分发到各个executor。

    看到这里,你或许就发现了一个问题,那就是不管怎样,spark需要序列化的对象必须都可以被序列化! Task not serializable: java.io.NotSerializableException 错误就是由于相应的对象不能被序列化造成的!

    为了解决这个问题,首先你可以使用 -Dsun.io.serialization.extendedDebugInfo=true java选项来让jvm打印出更多的关于序列化的信息,以便了解哪些对象不可以被序列化。然后就是使这些对象对应的类可序列化,或者将这些对象定义在RDD操作算子的参数(匿名函数)中以取消闭包。

    缺少winutils.exe

    在windows上进行spark程序测试时,你可能会碰到如下几个问题:

    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    
    java.lang.NullPointerException
      at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
    

    原因就是缺少 hadoop 的 winutils.exe 这个文件。解决方法是:下载一个(注意是32位还是64位),新建一个文件夹 D:\hadoop\bin\ 并将 winutils.exe 放入其中,并保证winutils.exe双击运行没有报*.dll缺失的错误,然后在程序中设置一下hadoop目录即可,如下:

    System.setProperty("hadoop.home.dir", "D:\hadoop\")
    
    展开全文
  • spark 使用hive metastore

    千次阅读 2016-10-12 09:55:48
    有3,4个月没接触hadoop和spark了,有些生疏,实习时用的是nodejs+python,今天休假,在新电脑跑跑大数据(真不敢相信我以前使用赛扬双核内存4G + 核显跑几个虚拟机来运行hadoop和spark的,跑个任务或者编译android源码有时...
  • Spark中加载本地(或者hdfs)文件以及 spark使用SparkContext实例的textFile读取多个文件夹(嵌套)下的多个数据文件 textFile的参数是一个path,这个path可以是: 1. 一个文件路径,这时候只装载指定的文件 3. 通过...
  •  在Spark使用hql方法执行hive语句时,由于其在查询过程中调用的是Hive的获取元数据信息、SQL解析,并且使用Cglib等进行序列化反序列化,中间可能产生较多的class文件,导致JVM中的持久代使用较多,如果配置不当,...
  • Spring 整合 spark 使用

    千次阅读 2017-09-08 11:03:47
    -- spark 使用 -->   <groupId>org.apache.spark <artifactId>spark-core_2.11 <version>1.6.0 <groupId>org.apache.spark <artifactId>spark-mllib_2.11 <version>1.6.0 ...
  • 2)速度更快:从使用 spark sql 操作普通文件 CSV 和 parquet 文件速度对比上看,绝大多数情况会比使用 csv 等普通文件速度提升10倍左右,在一些普通文件系统无法在 spark上成功运行的情况下,使用 parquet 很多时候...
  • Spark使用get_json_object的问题

    千次阅读 2019-09-18 12:24:46
    一、问题现象:使用spark sql调用get_json_object函数后,报如下错误:yarn 容器被kill,导致任务失败,查看日志:Container killed by YARN for exceeding memory limits 使用spark命令: /opt/software/spark-...
  • spark使用hive时,数据仓库位置指定

    千次阅读 2019-01-09 23:24:01
    spark 2.0.1 中,--hiveconf "hive.metastore.warehouse" 参数已经不再生效,用户应该使用  --conf spark.sql.warehouse.dir=hdfs://HOSTNAME:9000/user/hive/warehouse 命令进行代替
  • Spark 使用JDBC进行select查询

    千次阅读 2018-08-24 14:34:07
    spark 可以 通过jdbc直接将数据库中的一整张表直接读取出来作为一个DataFram,这样会出现几个问题: - 1. 表格太大,导致出现OOM; - 2. 读出来很多不需要的行,在join的时候,造成性能的浪费 这里记录通过JDBC...
  • spark使用独立的Python环境提交任务

    千次阅读 2019-07-30 10:31:09
    需要注意版本,不同的spark版本会有些不同,当前我的spark版本是2.2.1,如果以下的方式不生效,记得先看看版本; 由于公司平台的环境是离线的,pip down下载的包是需要和硬件架构匹配的,我在mac上pip down的包拿到...
  • Spark入门(Python版) Spark1.0.0 多语言编程之python实现 Spark编程指南(python...配置master参数,使用4个Worker线程本地化运行Spark(local[k]应该根据运行机器的CPU核数确定) ./bin/pyspark –master local
  • spark使用hive出错,添加以下配置

    千次阅读 2016-05-12 10:40:45
    错误:Failed to start database 'metastore_db' with class loader org.apache.spark.sql SPARK_HOME/CONF/spark-env.sh中配置: export HIVE_CONF_DIR=/opt/modules/hive-1.0.1/conf export CLASSPATH=...
  • spark使用insertInto存入hive分区表中

    千次阅读 2019-08-01 18:53:07
    spark的处理结果存入hive分区表中,可以直接在sql中设定分区即可,可以使用withColumn算子执行 ss.sql("SELECT merchant_id,platform," + "case when trim(first_channel_id) = '' or first_channel_id is ...
  • Spark使用Java读取mysql数据和保存数据到mysql

    千次阅读 热门讨论 2017-11-07 20:17:39
    基于java应用需要利用Spark读取mysql数据进行数据分析,然后将分析结果保存到mysql中。
  • 根据Spark官网所述,Spark SQL实现了Thrift JDBC/ODBC server: The Thrift JDBC/ODBC server implemented here corresponds to the HiveServer2 in Hive 1.2.1 You can test the JDBC server with the beeline ...
  • 本篇文章的呢主要写的使用spark ml 中的lda算法提取文档的主题的方法思路,不牵扯到lda的 算法原理。至于算法请参照http://www.aboutyun.com/thread-20130-1-1.html 这篇文章 使用lda算法对中文文本聚类并提取主题...
  • spark使用parallelize方法创建RDD

    万次阅读 2015-11-09 14:54:02
    Spark将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个CPU上分布2-4个slices. 一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。然而,你也可以通过传递给parallelize的第二个参数来...
  • Spark使用start-slave.sh启动Worker

    千次阅读 2017-12-21 16:20:29
    start-slave.sh: 52: /home/user/spark/spark-2.2.1-bin-hadoop2.7/bin/load-spark-env.sh: [[: not found start-slave.sh: 68: start-slave.sh: function: not found start-slave.sh: 70: shift: can't shift ...
  • Spark使用小结:Java版的GroupByKey示例

    千次阅读 2016-04-22 20:44:47
    Spark Java版的GroupByKey示例 感觉reduceByKey只能完成一些满足交换率,结合律的运算,如果想把某些数据聚合到一些做一些操作,得换groupbykey 比如下面:我想把相同key对应的value收集到一起,完成一些运算...
  •  在Spark使用hql方法执行hive语句时,由于其在查询过程中调用的是Hive的获取元数据信息、SQL解析,并且使用Cglib等进行序列化反序列化,中间可能产生较多的class文件,导致JVM中的持久代使用较多,如果配置不当,...
  • 使用binaryFile加载二进制文件之后,得到的是包含[String,PortableDataStream]的pair的RDD。我继续对得到的RDD使用map,使用PortableDataStream的toArray方法得到二进制文件转换之后的Byte数组。有如下几个问题: ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 177,768
精华内容 71,107
关键字:

spark的使用