2017-02-08 15:07:22 yjgithub 阅读数 2136
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12077 人正在学习 去看看 CSDN讲师

需求:根据tomcat日志计算url访问了情况,具体的url如下,
要求:区别统计GET和POST URL访问量
结果为:访问方式、URL、访问量
测试数据集:
在CODE上查看代码片派生到我的代码片
196.168.2.1 - - [03/Jul/2014:23:36:38 +0800] “GET /course/detail/3.htm HTTP/1.0” 200 38435 0.038
182.131.89.195 - - [03/Jul/2014:23:37:43 +0800] “GET /html/notes/20140617/888.html HTTP/1.0” 301 - 0.000
196.168.2.1 - - [03/Jul/2014:23:38:27 +0800] “POST /service/notes/addViewTimes_23.htm HTTP/1.0” 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:39:03 +0800] “GET /html/notes/20140617/779.html HTTP/1.0” 200 69539 0.046
196.168.2.1 - - [03/Jul/2014:23:43:00 +0800] “GET /html/notes/20140318/24.html HTTP/1.0” 200 67171 0.049
196.168.2.1 - - [03/Jul/2014:23:43:59 +0800] “POST /service/notes/addViewTimes_779.htm HTTP/1.0” 200 1 0.003
196.168.2.1 - - [03/Jul/2014:23:45:51 +0800] “GET /html/notes/20140617/888.html HTTP/1.0” 200 70044 0.060
196.168.2.1 - - [03/Jul/2014:23:46:17 +0800] “GET /course/list/73.htm HTTP/1.0” 200 12125 0.010
196.168.2.1 - - [03/Jul/2014:23:46:58 +0800] “GET /html/notes/20140609/542.html HTTP/1.0” 200 94971 0.077
196.168.2.1 - - [03/Jul/2014:23:48:31 +0800] “POST /service/notes/addViewTimes_24.htm HTTP/1.0” 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:48:34 +0800] “POST /service/notes/addViewTimes_542.htm HTTP/1.0” 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:49:31 +0800] “GET /notes/index-top-3.htm HTTP/1.0” 200 53494 0.041
196.168.2.1 - - [03/Jul/2014:23:50:55 +0800] “GET /html/notes/20140609/544.html HTTP/1.0” 200 183694 0.076
196.168.2.1 - - [03/Jul/2014:23:53:32 +0800] “POST /service/notes/addViewTimes_544.htm HTTP/1.0” 200 2 0.004
196.168.2.1 - - [03/Jul/2014:23:54:53 +0800] “GET /service/notes/addViewTimes_900.htm HTTP/1.0” 200 151770 0.054
196.168.2.1 - - [03/Jul/2014:23:57:42 +0800] “GET /html/notes/20140620/872.html HTTP/1.0” 200 52373 0.034
196.168.2.1 - - [03/Jul/2014:23:58:17 +0800] “POST /service/notes/addViewTimes_900.htm HTTP/1.0” 200 2 0.003
196.168.2.1 - - [03/Jul/2014:23:58:51 +0800] “GET /html/notes/20140617/888.html HTTP/1.0” 200 70044 0.057
186.76.76.76 - - [03/Jul/2014:23:48:34 +0800] “POST /service/notes/addViewTimes_542.htm HTTP/1.0” 200 2 0.003
186.76.76.76 - - [03/Jul/2014:23:46:17 +0800] “GET /course/list/73.htm HTTP/1.0” 200 12125 0.010
8.8.8.8 - - [03/Jul/2014:23:46:58 +0800] “GET /html/notes/20140609/542.html HTTP/1.0” 200 94971 0.077

由于Tomcat日志是不规则的,需要先过滤清洗数据。


package ClassicCase

import org.apache.spark.{SparkConf, SparkContext}

/**
  * 业务场景:分析非结构化数据
  * Created by YJ on 2017/2/8.
  */


object case7 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val data = sc.textFile("hdfs://192.168.109.130:8020//user/flume/ClassicCase/case7/*")

    //filter 过滤长度小于0, 过滤不包含GET与POST的URL   
    val filtered = data.filter(_.length() > 0).filter(line => (line.indexOf("GET") > 0 || line.indexOf("POST") > 0))

    //转换成键值对操作  
    val res = filtered.map(line => {
      if (line.indexOf("GET") > 0) {
        //截取 GET 到URL的字符串  
        (line.substring(line.indexOf("GET"), line.indexOf("HTTP/1.0")).trim, 1)
      } else {
        //截取 POST 到URL的字符串  
        (line.substring(line.indexOf("POST"), line.indexOf("HTTP/1.0")).trim, 1)
      } //最后通过reduceByKey求sum  
    }).reduceByKey(_ + _)

    //触发action事件执行  
    res.collect()
  }
}

输出结果
(POST /service/notes/addViewTimes_779.htm,1),
(GET /service/notes/addViewTimes_900.htm,1),
(POST /service/notes/addViewTimes_900.htm,1),
(GET /notes/index-top-3.htm,1),
(GET /html/notes/20140318/24.html,1),
(GET /html/notes/20140609/544.html,1),
(POST /service/notes/addViewTimes_542.htm,2),
(POST /service/notes/addViewTimes_544.htm,1),
(GET /html/notes/20140609/542.html,2),
(POST /service/notes/addViewTimes_23.htm,1),
(GET /html/notes/20140617/888.html,3),
(POST /service/notes/addViewTimes_24.htm,1),
(GET /course/detail/3.htm,1),
(GET /course/list/73.htm,2),
(GET /html/notes/20140617/779.html,1),
(GET /html/notes/20140620/872.html,1)

spark RDD
2017-12-26 10:56:44 qq_36864672 阅读数 603
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12077 人正在学习 去看看 CSDN讲师

RDD是什么?

RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。

如何创建RDD?

RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。

举例:从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中。

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12

举例:读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素

scala> val b = sc.textFile("README.md")
b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

虽然还有别的方式可以创建RDD,但在本文中我们主要使用上述两种方式来创建RDD以说明RDD的API。

map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 
它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
    var res = List[(T, T)]() 
    var pre = iter.next while (iter.hasNext) {
        val cur = iter.next; 
        res .::= (pre, cur) pre = cur;
    } 
    res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 
mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

举例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
  • 第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
  • 第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) 
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect 
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 
举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

flatMapWith

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)

flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

举例

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

reduce

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

举例

scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55

上述例子对RDD中的元素求和。

reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

举例:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> a.reduceByKey((x,y) => x + y).collect
res7: Array[(Int, Int)] = Array((1,2), (3,10))

上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。


1.概念 
RDD(Resilient Distributed Datasets,弹性分布式数据集)是一个分区的只读记录的集合。RDD只能通过在稳定的存储器或其他RDD的数据上的确定性操作来创建。我们把这些操作称作变换以区别其他类型的操作。例如 map,filter和join。 
RDD在任何时候都不需要被”物化”(进行实际的变换并最终写入稳定的存储器上)。实际上,一个RDD有足够的信息描述着其如何从其他稳定的存储器上的数据生成。它有一个强大的特性:从本质上说,若RDD失效且不能重建,程序将不能引用该RDD。而用户可以控制RDD的其他两个方面:持久化和分区。用户可以选择重用哪个RDD,并为其制定存储策略(比如,内存存储)。也可以让RDD中的数据根据记录的key分布到集群的多个机器。 这对位置优化来说是有用的,比如可用来保证两个要jion的数据集都使用了相同的哈希分区方式。

2.spark 编程接口 
对编程人员通过对稳定存储上的数据进行变换操作(如map和filter).而得到一个或多个RDD。然后可以调用这些RDD的actions(动作)类的操作。这类操作的目是返回一个值或是将数据导入到存储系统中。动作类的操作如count(返回数据集的元素数),collect(返回元素本身的集合)和save(输出数据集到存储系统)。spark直到RDD第一次调用一个动作时才真正计算RDD。 
还可以调用RDD的persist(持久化)方法来表明该RDD在后续操作中还会用到。默认情况下,spark会将调用过persist的RDD存在内存中。但若内存不足,也可以将其写入到硬盘上。通过指定persist函数中的参数,用户也可以请求其他持久化策略(如Tachyon)并通过标记来进行persist,比如仅存储到硬盘上或是在各机器之间复制一份。最后,用户可以在每个RDD上设定一个持久化的优先级来指定内存中的哪些数据应该被优先写入到磁盘。 
PS: 
缓存有个缓存管理器,spark里被称作blockmanager。注意,这里还有一个误区是,很多初学的同学认为调用了cache或者persist的那一刻就是在缓存了,这是完全不对的,真正的缓存执行指挥在action被触发。

说了一大堆枯燥的理论,我用一个例子来解释下吧: 
现在数据存储在hdfs上,而数据格式以“;”作为每行数据的分割:

"age";"job";"marital";"education";"default";"balance";"housing";"loan"
30;"unemployed";"married";"primary";"no";1787;"no";"no"
33;"services";"married";"secondary";"no";4789;"yes";"yes"
  • 1
  • 2
  • 3

scala代码如下:

 //1.定义了以一个HDFS文件(由数行文本组成)为基础的RDD
 val lines = sc.textFile("/data/spark/bank/bank.csv")
 //2.因为首行是文件的标题,我们想把首行去掉,返回新RDD是withoutTitleLines
 val withoutTitleLines = lines.filter(!_.contains("age"))
 //3.将每行数据以;分割下,返回名字是lineOfData的新RDD
 val lineOfData = withoutTitleLines.map(_.split(";"))
 //4.将lineOfData缓存到内存到,并设置缓存名称是lineOfData
 lineOfData.setName("lineOfData")
 lineOfData.persist
 //5.获取大于30岁的数据,返回新RDD是gtThirtyYearsData
 val gtThirtyYearsData = lineOfData.filter(line => line(0).toInt > 30)
 //到此,集群上还没有工作被执行。但是,用户现在已经可以在动作(action)中使用RDD。
 //计算大于30岁的有多少人
 gtThirtyYearsData.count
 //返回结果是3027
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

OK,我现在要解释两个概念NO.1 什么是lineage?,NO.2 transformations 和 actions是什么? 
lineage
这里写图片描述

在上面查询大于30岁人查询里,我们最开始得出去掉标题行所对应的RDD lines,即为withTitleLines,接着对withTitleLines进行map操作分割每行数据内容,之后再次进行过滤age大于30岁的人、最后进行count(统计所有记录)。Spark的调度器会对最后的那个两个变换操作流水线化,并发送一组任务给那些保存了lineOfData对应的缓存分区的节点。另外,如果lineOfData的某个分区丢失,Spark将只在该分区对应的那些行上执行原来的split操作即可恢复该分区。 
所以在spark计算时,当前RDD不可用时,可以根据父RDD重新计算当前RDD数据,但如果父RDD不可用时,可以可以父RDD的父RDD重新计算父RDD。

transformations 和 actions

transformations操作理解成一种惰性操作,它只是定义了一个新的RDD,而不是立即计算它。相反,actions操作则是立即计算,并返回结果给程序,或者将结果写入到外存储中。

下面我以示例解释下:

这里写图片描述

先简单介绍这些吧,稍后文章我会详细介绍每个方法的使用,感兴趣可以看spark官方文档

3.RDDs接口5个特性

这里写图片描述

简单概括为:一组分区,他们是数据集的最小分片;一组 依赖关系,指向其父RDD;一个函数,基于父RDD进行计算;以及划分策略和数据位置的元数据。例如:一个表现HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置信息。同时,对RDD进行map操作后具有相同的划分。当计算其元素时,将map函数应用于父RDD的数据。

4.RDDs依赖关系

  1. 在spark中如何表示RDD之间的依赖关系分为两类: 
    窄依赖:每个父RDD的分区都至多被一个子RDD的分区使用,即为OneToOneDependecies; 
    宽依赖:多个子RDD的分区依赖一个父RDD的分区,即为OneToManyDependecies。 
    例如,map操作是一种窄依赖,而join操作是一种宽依赖(除非父RDD已经基于Hash策略被划分过了)
  2. 详细介绍: 
    首先,窄依赖允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元素地依次执行filter操作和map操作。相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。 
    其次,在窄依赖中,节点失败后的恢复更加高效。因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。 
    对于hdfs:HDFS文件作为输入RDD。对于这些RDD,partitions代表文件中每个文件块的分区(包含文件块在每个分区对象中的偏移量),preferredLocations表示文件块所在的节点,而iterator读取这些文件块。 
    对于map:在任何一个RDD上调用map操作将返回一个MappedRDD对象。这个对象与其父对象具有相同的分区以及首选地点(preferredLocations),但在其迭代方法(iterator)中,传递给map的函数会应用到父对象记录。 
    再一个经典的RDDs依赖图吧 
    这里写图片描述

5.作业调度

当用户对一个RDD执行action(如count 或save)操作时, 调度器会根据该RDD的lineage,来构建一个由若干阶段(stage) 组成的一个DAG(有向无环图)以执行程序,如下图所示。 
每个stage都包含尽可能多的连续的窄依赖型转换。各个阶段之间的分界则是宽依赖所需的shuffle操作,或者是DAG中一个经由该分区能更快到达父RDD的已计算分区。之后,调度器运行多个任务来计算各个阶段所缺失的分区,直到最终得出目标RDD。 
调度器向各机器的任务分配采用延时调度机制并根据数据存储位置(本地性)来确定。若一个任务需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给那个节点。否则,如果一个任务处理的某个分区,该分区含有的RDD提供较佳的位置(例如,一个HDFS文件),我们把该任务分配到这些位置。 
“对应宽依赖类的操作 {比如 shuffle依赖),会将中间记录物理化到保存父分区的节点上。这和MapReduce物化Map的输出类似,能简化数据的故障恢复过程。 
对于执行失败的任务,只要它对应stage的父类信息仍然可用,它便会在其他节点上重新执行。如果某些stage变为不可用(例如,因为shuffle在map阶段的某个输出丢失了),则重新提交相应的任务以并行计算丢失的分区。 
若某个任务执行缓慢 (即”落后者”straggler),系统则会在其他节点上执行该任务的拷贝,这与MapReduce做法类似,并取最先得到的结果作为最终的结果。 
这里写图片描述 
实线圆角方框标识的是RDD。阴影背景的矩形是分区,若已存于内存中则用黑色背景标识。RDD G 上一个action的执行将会以宽依赖为分区来构建各个stage,对各stage内部的窄依赖则前后连接构成流水线。在本例中,stage 1 的输出已经存在RAM中,所以直接执行 stage 2 ,然后stage 3。

6.内存管理

Spark提供了三种对持久化RDD的存储策略:未序列化Java对象存于内存中、序列化后的数据存于内存及磁盘存储。第一个选项的性能表现是最优秀的,因为可以直接访问在JAVA虚拟机内存里的RDD对象。在空间有限的情况下,第二种方式可以让用户采用比JAVA对象图更有效的内存组织方式,代价是降低了性能。第三种策略适用于RDD太大难以存储在内存的情形,但每次重新计算该RDD会带来额外的资源开销。

对于有限可用内存,Spark使用以RDD为对象的LRU回收算法来进行管理。当计算得到一个新的RDD分区,但却没有足够空间来存储它时,系统会从最近最少使用的RDD中回收其一个分区的空间。除非该RDD便是新分区对应的RDD,这种情况下,Spark会将旧的分区继续保留在内存,防止同一个RDD的分区被循环调入调出。因为大部分的操作会在一个RDD的所有分区上进行,那么很有可能已经存在内存中的分区将会被再次使用。

7.检查点支持(checkpoint) 
虽然lineage可用于错误后RDD的恢复,但对于很长的lineage的RDD来说,这样的恢复耗时较长。因此,将某些RDD进行检查点操作(Checkpoint)保存到稳定存储上,是有帮助的。 
通常情况下,对于包含宽依赖的长血统的RDD设置检查点操作是非常有用的,在这种情况下,集群中某个节点的故障会使得从各个父RDD得出某些数据丢失,这时就需要完全重算。相反,对于那些窄依赖于稳定存储上数据的RDD来说,对其进行检查点操作就不是有必要的。如果一个节点发生故障,RDD在该节点中丢失的分区数据可以通过并行的方式从其他节点中重新计算出来,计算成本只是复制整个RDD的很小一部分。 
Spark当前提供了为RDD设置检查点(用一个REPLICATE标志来持久化)操作的API,让用户自行决定需要为哪些数据设置检查点操作。 
最后,由于RDD的只读特性使得比常用的共享内存更容易做checkpoint,因为不需要关心一致性的问题,RDD的写出可在后台进行,而不需要程序暂停或进行分布式快照。


概念与特性

RDD是spark最重要的抽象。spark统一建立在抽象的RDD之上。设计一个通用的编程抽象,使得spark可以应对各种场合的大数据情景。RDD模型将不同的组件融合到一起,选用其中的几个/所有,可以应付各种不同的场景。解决了mr的缺陷 
1. 弹性分布式数据集Resilient Distributed Dataset。 
2. 只读分区数据集,final修饰的 
3. 一个分布式的数据集合,是spark中的核心,spark的操作都是围绕RDD展开的。 
4. 真正的运算是在各个计算节点。 
5. 当某个RDD操作丢失的时候,可以很快恢复。

分区

  • 不同分区可能被划分到不同机器上。但是每个分区对应一个数据block
  • 分区是个逻辑概念,新旧分区可能是同一块内存。(重要的优化,节约资源。)。在函数式编程,经常使用常量,但是很费内存,rdd的这种优化非常实用。防止内存的无限性扩充。
  • 只是记录需要做的操作。只有当真正要执行的时候,才具体的执行。

计算

  • 并行计算。计算/处理都是在各分区上,并行计算。并行,提高了效率。
  • 真正的数据处理都是在各个分散的节点上。

依赖

  • 子RDD从父RDD产生,父子RDD之间的关系。 
    • 宽依赖:依赖上级所有的RDD分区。宽依赖一般非常消耗资源,结果一般要缓存下来
    • 窄依赖:依赖上级RDD的部分分区。计算的时候可能都在同一个节点上,节省资源
  • stage以依赖的区别,分成不同的stage
  • 每个父RDD的分区,只能被最多一个字RDD使用,子RDD可以使用任意个父RDD。

RDD的操作

创建

  • 从外部数据集中读取。来源于文件系统(HDFS,HBASE),textFile方法 
    这里的路径要让每个RDD的节点都能访问到
// 从外部文本中创建RDD。可以指定分片的个数
lines =  sc.textFle("路径",3)
// 一个目录下所有的文件。
lines = sc.wholeTextFile("路径")
// 从hadoop系统创建RDD
val rdd = newApiHadoopFile()
// 从hadoop数据创建RDD
val RDD = new ApiHadoopRDD()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 从驱动程序中对一个集合进行并行化 
    在测试的时候用的多,parallelize方法。可以指定分区个数
val lines = sc.parallelize(list["name","age"])
  • 1

转化(Transform)

  • 不进行具体操作,类似scala中的惰性求值
  • 从一个RDD生成另一个RDD的过程。spark用lineage的方式表示各个RDD的依赖关系,链表的表头是textFile
  • 参考fp中的概念,这里只做逻辑运算,接受一个RDD,结果产生一个RDD,没有任何副作用
  • RDD常见的转化操作

map                RDD.map(fun)              将函数应用于每个元素,结果返回一个RDD
flatmap            RDD.flatmap(fun)          同map,返回一个包含所有处理结果的整体。生成的分片数不变,只是在逻辑上成一个整体
filter             RDD.filter(fun)           过滤掉不符合要求的数据 
distinct           RDD.distinct()            去重,需要shuffle代价大
union              RDD.union(RDD1)           两个RDD求并集
intersection       RDD. intersection(RDD1)   两个RDD求交集
substract          RDD.substract(RDD1)       从RDD中移除RDD1的内容
cartesian          RDD.cartesian(RDD1)       生成RDD与RDD1的笛卡尔积
pipe               RDD.pipe("shell命令")      利用linux中的shell语言,对数据操作。
zip                RDD.zip(RDD1)             将RDD和RDD1组成一个kv格式的新RDD
sample             RDD.sample()              随机采样,返回RDD
-----------------------------------------------------------------------------
groupBy            RDD.groubBy(key)          根据key的取值,分组
reduceByKey        RDD.reduceByKey()         根据key的取值聚合
sortByKey          RDD.sortByKey()           根据key的值排序
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

行动(Action)

  • 真正的开始处理和操作,强制执行所有的RDD
  • RDD常见的行动操作
collect()           RDD.collect()             返回RDD中的所有元素。只能计算小规模的RDD,否则要shuffle代价大
count()             RDD.count()               统计RDD中元素的个数
countByVale()       RDD.countByValue()        每个元素在RDD中出现的次数。
take()              RDD.take(n)               返回RDD中的n个元素
top()               RDD.top(N)                返回RDD中的前N个元素
takeOrdered()       RDD.takeOrdered(n)        按照要求的顺序返回前n个元素
takeSample()        RDD.takeSample(n)         从RDD中任意返回n个元素
reduce()            RDD.reduce(fun)           并行整合RDD中所有的元素
fold()              RDD.fold(num)(fun)        提供初始值,的reduce
aggregate()         RDD.aggregate()            ?????????
foreach()           RDD.foreach(fun)          对RDD中的 每个元素使用给定的函数
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

RDD常见操作

过滤

val textRDD = sc.textFile(path)
val resultRDD1 = textRDD.filter(_.split(" ")(0).equals("1"))
val resultRDD2 = resultRDD1.map(s => { | val columns = s.split(",") | val pm = columns(1)  | val host = columns(2).replaceAll("a","b") pm+""+host })
  • 1
  • 2
  • 3
  • 4

去重

val RDD = sc.textFile("path")
val R1 = RDD.map(s => { | val one = s.split(",") | val two = columns(0) | val three = columms(1) | (p2,p1) | })
val R2 = R1.distinct()
  • 1
  • 2
  • 3
  • 4

共享变量

广播变量

副本拷贝的方式获得

累加器

只能进行add操作

持久化

  • 提高了数据的可重用性
  • 把RDD中的结果持久化到内存中。当后续的操作需要用到某些RDD运算结果的时候,持久化到内存可以提高效率。主要有cahce方法和persist方法。
  • cache方法只有一个默认的缓存级别(MEMORY_ONLY),persist可以有许多级别。cache实际上是调用了persist方法
  • 当要缓存的内容太多,用LRU算法淘汰。
  • 保存
RDD.saveAsTextFile("路径")
  • 1
  • 持久化级别:
级别 描述
MEMORY_ONLY 只放在内存,超过的部分丢弃,不存储(默认的级别)
MEMORY_AND_DISK 放在内存,超过的放在磁盘上
MEMORY_ONLY_SER RDD作为序列化的对象存储
MEMORY_AND_DISK_SER 超过的部分放在disk上
DISK_ONLY 只放在disk上

工作流程

  • RDD把操作记录程DAG图,记录各个DAG中的转换关系
  • 无论进行了多少次转换,只有真正遇到action的时候才真正

一、 定义

1、 RDD定义

RDD是弹性分布式数据集(Resilient Distributed Dataset)的简称,其实就是分布式元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有的RDD、调用RDD操作进行求值。

2、 操作类型

RDD有两种类型的操作:Transformation操作、Action操作,Transformation操作和Action操作区别在于Spark计算RDD的方式不同。

  • Transformation操作会由一个RDD生成另一个新的RDD,生成的新的RDD是惰性求值的,只有在Action操作时才会被计算。
  • Action操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者是把结果存储到外部存储系统中。

二、 Spark RDD五大特性

RDD是弹性分布式数据集,是Spark中最关键、最基本的一个抽象,他代表的是不可变的、分区的集合,这个集合可以被并行处理。

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for 
    an HDFS file)

1、 RDD是分区的

一个RDD对应的数据集合是分区的,各个分区分布在各个机器当中,每个RDD是不可变的,每个分区对应一个Task。 
查看rdd的分区数量:

scala> val rdd=sc.textFile("/input.txt")
scala> rdd.getNumPartitions
16/10/18 05:16:24 INFO mapred.FileInputFormat: Total input paths to process : 1
res2: Int = 2
  • 1
  • 2
  • 3
  • 4
  • 5

2、 每个分区都可以运用函数

每个分区上都应用于一个函数,各个分区运行的计算逻辑是一样的,只是数据不同。

3、 每个RDD都有一系列其他依赖

每个RDD依赖于一系列的RDD,因为有DAG,所以能找到依赖的RDD,目的是可以进行容错,当某一个RDD操作失败了,可以找到他的依赖进行重新计算。 
查看某一个rdd的血缘关系:


scala> val rdd=sc.textFile("/input.txt")
scala> var wordcountRdd=rdd.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a,b)=>(a+b))
scala> wordcountRdd.toDebugString
res3: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:29 []
 +-(2) MapPartitionsRDD[3] at map at <console>:29 []
    |  MapPartitionsRDD[2] at flatMap at <console>:29 []
    |  /input.txt MapPartitionsRDD[1] at textFile at <console>:27 []
    |  /input.txt HadoopRDD[0] at textFile at <console>:27 []
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4、 键值对RDD类型可以指定分区方式

对于键值对类型的RDD,可以制定一个分区方式。

5、 数据本地化

Task所运行的机器与处理的数据在同一个机器上,就是数据本地化。

三、 RDD创建方式

1、 并行化集合

scala> val list=List(1,2,3,4,5,6,7)
scala> val rddNum=sc.parallelize(list)
scala> rddNum.count
res0: Long = 7
  • 1
  • 2
  • 3
  • 4
  • 5

2、 从外部数据集读取

可以从HDFS读取文件加载数据或者HBbase加载数据。

scala> val rdd=sc.textFile("/input.txt")
res1: Long = 5
  • 1
  • 2
  • 3

四、 RDD三大Operation

1、 transformation(转换)

从一个RDD变为另外一个RDD的操作叫做transformation操作,transformation的操作都是懒操作,即不会立即执行,只有当进行action操作时才会真正的去执行。 
如:map()、filter()、distinct()

2、 action(执行)

action操作能返回Driver程序一个值或者是导出数据到外部系统。 
比如:count()、reduce()、collect()、take()

3、 Persistence(持久化)

缓存数据到内存、磁盘、外部的存储器。 
persist():缓存数据,可以指定缓存级别。 
cache():就是调用的persist(),缓存级别为内存。

(1) 缓存级别:

  • NONE:都不存储
  • DISK_ONLY:只存储在磁盘
  • DISK_ONLY_2:只存储在磁盘,存储两个副本
  • MEMORY_ONLY:只存放内存中
  • MEMORY_ONLY_2:只存放在内存中,存储两个副本。
  • MEMORY_ONLY_SER:只存储在内存中并序列化到磁盘
  • MEMORY_ONLY_SER_2:只存储在内存中并序列化到磁盘,存储两个副本
  • MEMORY_AND_DISK:优先放入内存,内存不够就放在硬盘
  • MEMORY_AND_DISK_2:优先放入内存,内存不够就放在硬盘,存储两个副本
  • MEMORY_AND_DISK_SER:优先放入内存,内存不够就放在硬盘,并序列化
  • MEMORY_AND_DISK_SER_2:优先放入内存,内存不够就放在硬盘,并序列化,存储两个副本
  • OFF_HEAP:外部存储。

(2) 何时进行缓存

有两种情况下要进行缓存: 
- RDD之后会被使用很多次 
- 某个RDD的数据是经过非常复杂的清洗过滤得到的

五、 Spark Application调度

1、 Spark Application组成结构图

这里写图片描述

(1) 一个Application有多个Job,一个Action操作会产生一个Job。 
(2) 每个Job有多个stage 
(3) 每个stage有多个task,每个task是一个线程 
(4) 每个task业务逻辑相同,数据不同

2、 Spark Application 运行架构

这里写图片描述

(1) Spark应用的运行架构主要分三部分:Driver、Executor、Cluster Manager。 
(2) Driver Program负责提交Job,Spark在Driver Program的main函数中,创建一个SparkContext的实例,使用SparkContext来提交Job。 
(3) Cluster Manager是个外部的服务,主要起管理资源的作用,它可能是standalone manager、Mesos,YARN。 
(4) Application:只指创建在Spark上的用户程序,包括集群上的Driver Program和Executors两部分。 
(5) Application Jar:一个jar包包含了用户的Spark Application,一个jar包可以有很多 Application,Jar包里不能包含Hadoop和Spark的包,Hadoop包和Spark包会在运行时被添加 
(6) Worker节点,是集群中用来运行代码的节点,是实际来干活的机器。 
(7) Executor负责来执行任务,运行在Worker节点上,是一个进程,在Executor上运行很多Task,每一个Task是一个线程。一个Executor只属于一个Application。 
(8) Task是一个工作的单位是一个线程,运行在Executor上。 
(9) Job是一个并行的计算,包含多个task,通常RDD一个Action执行时就触发一个Job。 
(10) Stage:每一个Job会被划分为几个Stage,每个Stage之间是相互依赖的。


2019-11-27 15:53:52 czc999999 阅读数 25
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12077 人正在学习 去看看 CSDN讲师

市面上已经有很优秀的嵌入式KV数据库了,如Berkeley DB。为什么还需要把Sqlite当KV数据库用呢?原因若干。
1,可能是为了好玩或者纯属无聊
2,可结合关系型数据库与KV数据库的优点
3,可利用一些sqlite特性做其他KV数据库不好做的事情
4,事务管理更方便
5,sqlite更可靠,更流行
实现思路
使用json(或pickle)dump数据,并将数据写入有KEY(主键)和VALUE两个字段的SQLITE库表中。参照kv数据库调用办法实现外部接口。
主要功能
1,put:写入key/value数据
2,get:获取某个key的value
3,put_many:批量写入key/value数据
4,keys:获取所有key的列表
5,value:获取所有value的列表
6,limit:利用SQL语句中limit关键字,获取数据库中“前”N条KV数据
7,random:利用SQL语句中random关键字,从数据库中随即获取N条KV数据
8,has_key:某个key是否存在
9,cursor_execute:执行sql自定义语句
10,其他:items,pop,filter,count等

代码(KVSqlite.py):

import os, json, sqlite3
from threading import Lock

PY3 = os.sys.version_info >= (3,)
if PY3:
    ifilter = filter
else:
    from itertools import ifilter

DUMPS = lambda d: json.dumps(d)
LOADS = lambda d: json.loads(d)

class SDB(object):
    _DEFAULT_TABLE = '__KVS_DEFAULT_TABLE__'
    _MEMORY_DB = ":memory:"

    def __init__(self, filename):
        if filename is None \
                or len(filename) < 1 \
                or filename.lower() == self._MEMORY_DB:
            self.filename = self._MEMORY_DB
        else:
            self.filename = filename
        self._lock = Lock()
        self._db_init()

    def _row_factory(self, cursor, row):
        result = []
        for idx, col in enumerate(cursor.description):
            if col[0].lower() in ('k', 'v'):
                result.append(LOADS(row[idx]))
            else:
                result.append(row[idx])
        return result

    def _db_init(self):
        _new_table = "CREATE TABLE IF NOT EXISTS {0} ( k PRIMARY KEY,v)".format(self._DEFAULT_TABLE)
        db = sqlite3.connect(self.filename, timeout=60, check_same_thread=False)
        db.row_factory = self._row_factory
        db.execute(_new_table)
        self._cursor = db.cursor()
        self._db = db

    def _statement_init(self):
        table = self._DEFAULT_TABLE
        return dict(insert="insert or replace into {0}(k,v) values(:1,:2)".format(table),
                    delete="delete from {0} where k=:1".format(table),
                    update="update {0} set v=:1 where k=:2".format(table),
                    clear="delete from {0}".format(table),
                    get="select v from {0} where k=:1".format(table),
                    has_key="select count(1) from {0} where k=:1".format(table),
                    keys="select k from {0}".format(table),
                    values="select v from {0}".format(table),
                    items="select k,v from {0}".format(table),
                    count="select count(*) from {0}".format(table),
                    random="select * from {0} order BY RANDOM() limit :1".format(table),
                    limit="select * from {0} limit :1 offset :2".format(table)
                    )

    _statements = property(_statement_init)
    del _statement_init

    def _insert(self, key, value):
        try:
            self._lock.acquire(True)
            self._cursor.execute(self._statements.get('insert'), (DUMPS(key), DUMPS(value)))
        finally:
            self._lock.release()

    def _update(self, key, value):
        try:
            self._lock.acquire(True)
            self._cursor.execute(self._statements.get('update'), (DUMPS(value), DUMPS(key)))
        finally:
            self._lock.release()

    def _delete(self, key):
        try:
            self._lock.acquire(True)
            self._cursor.execute(self._statements.get('delete'), (DUMPS(key),))
        finally:
            self._lock.release()

    def _clear(self):
        '''
        删除所有数据
        :return:
        '''
        try:
            self._lock.acquire(True)
            self._cursor.execute(self._statements.get('clear'))
        except Exception as e:
            self._db.rollback()
            raise e
        finally:
            self._lock.release()

    def keys(self, sort=False, sort_key=None, reverse=False):
        if sort:
            return sorted(self.iterkeys(), key=sort_key, reverse=reverse)
        return list(self.iterkeys())

    def values(self, sort=False, sort_key=None, reverse=False):
        if sort:
            return sorted(self.itervalues(), key=sort_key, reverse=reverse)
        return list(self.itervalues())

    def iterkeys(self):
        try:
            self._lock.acquire(True)
            for k in self._cursor.execute(self._statements.get('keys')):
                yield k[0]
        finally:
            self._lock.release()

    def itervalues(self):
        try:
            self._lock.acquire(True)
            for v in self._cursor.execute(self._statements.get('values')):
                yield v[0]
        finally:
            self._lock.release()

    def items(self, sort=False, key=None, reverse=False):
        if sort:
            return sorted(self.iteritems(), key=key, reverse=reverse)
        return list(self.iteritems())

    def iteritems(self):
        try:
            self._lock.acquire(True)
            for k, v in self._cursor.execute(self._statements.get('items')):
                yield k, v
        finally:
            self._lock.release()

    def count(self):
        try:
            self._lock.acquire(True)
            return self._cursor.execute(self._statements.get('count')).fetchone()[0]
        finally:
            self._lock.release()

    def has_key(self, key):
        try:
            self._lock.acquire(True)
            r = self._cursor.execute(self._statements.get('has_key'), (DUMPS(key),)).fetchone()[0]
            return r > 0
        finally:
            self._lock.release()

    def get(self, key):
        try:
            self._lock.acquire(True)
            _key = DUMPS(key)
            data = self._cursor.execute(self._statements.get('get'), (_key,)).fetchone()
            if data:
                return data[0]
        finally:
            self._lock.release()

    def put(self, key, value):
        try:
            self._insert(key, value)
            self._db.commit()
        except Exception as e:
            self._db.rollback()
            raise e

    def pop(self, key):
        try:
            value = self.get(key)
            self._delete(key)
            self._db.commit()
            return value
        except Exception as e:
            self._db.rollback()
            raise e

    def put_many(self, rows):
        try:
            self._lock.acquire(True)
            if rows and len(rows) > 0:
                self._cursor.executemany(self._statements.get('insert'),
                                         [(DUMPS(k), DUMPS(v)) for k, v in rows])
                self._db.commit()
        except Exception as e:
            self._db.rollback()
            raise e
        finally:
            if self._lock.locked():
                self._lock.release()

    def limit(self, limit=1, offset=0):
        try:
            self._lock.acquire(True)
            rows = self._cursor.execute(self._statements.get('limit'), (limit, offset))
            if limit == 1:
                return rows.fetchone()
            return rows.fetchall()
        finally:
            self._lock.release()

    def random(self, limit=1):
        try:
            self._lock.acquire(True)
            rows = self._cursor.execute(self._statements.get('random'), (limit,))
            if limit == 1:
                return rows.fetchone()
            return rows.fetchall()
        finally:
            self._lock.release()

    def filter(self, func):
        return list(ifilter(func, self.items()))

    def ifilter(self, func):
        return ifilter(func, self.iteritems())

    def cursor_execute(self, sql, parameters=None):
        '''
        执行SQL语句,如:SELECT K,V FROM __KVS_DEFAULT_TABLE__ WHERE K LIKE 'ABC%'
        '''
        try:
            self._lock.acquire(True)
            return self._cursor.execute(sql=sql, parameters=parameters)
        finally:
            self._lock.release()

    def close(self):
        try:
            self._db.rollback()
            self._cursor.close()
            self._db.close()
        except:
            pass

调用示例:

from KVSqlite import SDB
#打开数据库
db = SDB('test.sqlite')
#写入单条数据
db.put('first','第一条数据')
db.put('second',dict(a=1,b=2,c=[2,3,4]))
#获取数据
db.get('first')
#写入多条数据
db.put_many([[1,2],[3,4],['A','abc']])
#获取key的列表
db.keys()
2018-12-31 23:45:24 qq_43688472 阅读数 94
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12077 人正在学习 去看看 CSDN讲师

一:获取文件

官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

spark本身 有测试文件

[hadoop@hadoop001 resources]$ ls
employees.json  kv1.txt     people.json  user.avsc   users.orc
full_user.avsc  people.csv  people.txt   users.avro  users.parquet

[hadoop@hadoop001 resources]$ cat people.txt
Michael, 29
Andy, 30
Justin, 19
[hadoop@hadoop001 resources]$ pwd
/home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources

在spark上操作:这种事标准写法

scala> spark.read.format("text").load("file:///home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt").show(false)
+-----------+
|value      |
+-----------+
|Michael, 29|
|Andy, 30   |
|Justin, 19 |
+-----------+

二:idea操作

package g5.learning

import org.apache.spark.sql.{SaveMode, SparkSession}

object DataSpurceAPIApp {
  def main(args: Array[String]): Unit = {
    val  sparksession= SparkSession.builder().appName("DataSpurceAPIApp")
      .master("local[2]")
      .getOrCreate()
    //标准读写法(loat也可以传2个路径,逗号隔开就可以了)
    sparksession.read.format("text/json/parquet/jdbc/orc").load("file:///home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt").show(false)
   //简化写法(format这里换成相应的文件)SparkSql默认处理的format就是parquent
    val df =sparksession.read.text("")
//这种写法也是可以的,path是固定写法
    sparksession.read.format("text/json/parquet/jdbc/orc/csv").option("path","file:///home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt").load.show(false)
    //在CSV的时候要注意一些,其中是分隔符和包含头和推导相应数据类型
    sparksession.read.format("csv").option("sep", ";").option("header", "true") .option("inferSchema", "true").load("file:///home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt").show(false)
    //标准写写法(筛选之前文本的name写成json格式,如果存在删除掉之前的,写在这个路径下
    df.select("name").write.format("json").mode("overwrite").save("file:///home/hadoop/tmp/jsonfile")
//写到hive的库里面
    df.select("name").write.saveAsTable("ruoze_saprk")



    sparksession.stop()

  }

}

三:总结

Developer:build libraries for various data sources

Users:easy loading/saving DataFrames

       读:spark.read.format("")

            内置数据源:json,parquet,jdbc,csv(2.x以后有)

            外部数据源:https://spark-packages.org/

            SparkSql默认处理的format就是parquent

         写:people.write.format().save("");

操作Hive表数据:

spark.table(tablename)

df.write.saveAsTable(tableName)
2019-12-09 16:40:50 weixin_43548518 阅读数 8
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12077 人正在学习 去看看 CSDN讲师

kv 类型RDD 在java 显示:JavaPairRDD<String, Integer>
而不是JavaRDD<Tuple2<String, Integer>>
这个很重要不要搞错

kv数据库

阅读数 267

玩具kv数据库

阅读数 929

没有更多推荐了,返回首页