2017-01-26 20:06:35 zkq_1986 阅读数 1536
  • Android底层技术:Java层系统服务(Android Service)

    为什么我们需要学习撰写系统服务呢? 底层系统服务是Android 框架裡接近Linux/Driver 的部分。为了充分发挥硬件设备的差異化特性,系统服务是让上层Java 应用程序來使用Driver/HW Device 特色的重要管道。于此,先仔细解析Java层系统服务的架构,然后阐述这系统服务的代码开发要点,以及其API设计方法。

    32697 人正在学习 去看看 高煥堂

Spark工作机制

Client

Driver程序

Spark Context

RDD DAG

DAGSchedular

TaskSchedular

 

SparkEnv

 

Worker Node

Executor

Task

Task

Cache

Worker Node

 

Executor

Task

Task

Cache

Cluster Manager

图 Spark架构图

4.1应用程序执行流程

应用程序的执行流程为:

1)写好的应用程序,打包成jar文件。然后通过客户端上传到集群。根据Driver的配置模式,要么运行在客户端,要么由master指定worker启动driver进程,并对整个应用程序进行监控和管理。接着,配置一些上下文环境。然后顺序执行代码。

2)RDD的算子包括两大类:一是转换算子,二是行动算子。只有Action算子才会触发Job的提交,也就是说,Spark采用的是惰性机制,在碰到行动算子的时候,才提交作业。接着生成RDD有向无环图DAG,由DAG调度器DAGScheduler转化为阶段Stage DAG,每个阶段Stage中产生相应的任务Task集合,任务调度器TaskScheduler将任务分发到worker上的Executor执行。每个任务对应一个数据块,使用用户定义的函数处理数据块。如图:

 

4.1.1 应用提交与执行方式

Driver配置(deploy-mode)模式包含以下两种方式。

·Driver进程运行在客户端,对应用进行管理监控。(为默认项)

·主节点指定某个Worker节点启动Driver,对应用进行监控管理。

    

 

图4-4 Spark Driver位于Client   图4-5 Spark Driver位于Worker节点的应用提交与执行

4.2 Spark任务调度模式

Spark有多种运行模式,如单机(Local)模式、Standalone模式、YARN模式、Mesos模式。

4.2.1 Spark应用程序之间的调度

一个Executor在一个时间段内只能给一个应用使用。

4.2.2 作业调度

不同线程提交的作业Job可以并行运行。一个作业分为多个Stage。整个RDD DAG为一个Job。action算子中的本质是调用Spark上下文(SparkContext)中的runJob提交了Job。

作业的调度主要有FIFO和FAIR两种模式。

FIFO模式

FIFO(先进先出)。

fair模式

在fair共享模式调度下,多个作业以轮询(round robin)方式为分配资源。考虑到长任务和短任务问题,这样长任务在前,短任务在后,短任务也可以获得不错的响应时间。

4.2.3 阶段(Stage)调度

Action算子触发作业的提交,并形成RDD DAG。DAG Scheduler(调度器)负责将RDD DAG转化为Stage(阶段)DAG。Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage并执行,如果提交的Stage仍有未完成的父母Stage,则Stage需要等待其父Stage执行完才能执行。

waitingStages中记录仍有未执行的父母Stage,防止过早执行。runningStages中保存正在执行的Stage,防止重复执行。failedStages中保存执行失败的Stage,需要重新执行,这里的设计是出于容错的考虑。

4.2.4 任务(Task)调度

一个应用只有一个任务调度器(TaskScheduler)。所有TaskSetManager都是由这个TaskScheduler调度。一个Stage对也只有一个TaskSetManager。TaskSetManager通过一定次序放入调度池pool中。在调度池中,这些TaskSetMananger又会根据Job ID排序,先提交的Job的TaskSetManager优先调度,然后一个Job内的TaskSetManager ID小的先调度。

在执行任务时,任务分配规则:

按照“尽量将任务分配到数据块所存储的位置”原则分配任务。数据块的存储位置请见4.3.3节。

执行地点的选取:

1)  如果是调用过cache()方法的RDD,则读取内存缓存中分区的数据。

2)  如果在磁盘中,通常最开始的RDD会有相应信息,例如,从HDFS上读取的数据,HDFS分区就是最好的执行地点。

3)  如果不是上面两种情况,将遍历RDD DAG获取第一个窄依赖的父亲RDD对应分区的执行地点。

4.3 Spark I/O机制

4.3.1 序列化

序列化是将对象转换为字节流,本质上可以理解为将链表存储的非连续空间的数据存储转化为连续空间存储的数组中。这样就可以将数据进行流式传输或者块存储。

4.3.2 压缩

当大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,数据适合进行压缩。数组或者对象序列化后的数据块可以考虑压缩。所以序列化后的数据可以压缩,使数据紧缩,减少空间开销。

Snappy提供了更高的压缩速度,LZF提供了更高的压缩比,用户可以根据具体需求选择压缩方式。

4.3.3 Spark存储系统

可以从以下几个维度理解整个存储系统:类接口、数据读写流程和数据通信。

(1)     类接口。

所有外部类都通过块管理器接口(BlockManager)对存储模块(storage)进行操作。

(2)     数据读写流程

数据存储分为3个层次:内存、本地磁盘和远程磁盘。在diskManager中,存储块ID(blockId)和文件路径映射。

·数据读取流程

在RDD类中,通过compute方法调用迭代器(iterator)读取某个分区(Partition)的数据。分区是逻辑概念。一个分区对应物理上的一个块(block)。一个Executor负责若干个分区。查看数据存储位置的优先级是:

1)  内存;

2)  Tachyon;

3)  本地磁盘;

4)  远程磁盘

在获取远程数据时,先得到远程数据路径,然后通过块管理器工作机创建通信管理器,并从远程读取数据。

·数据写入流程

数据写入流程主要分为以下几个步骤。

1)在RDD类中,通过调用compute方法计算要写到哪个分区

2)然后通过缓存管理器(CacheManager调用块管理器(BlockManager),判断数据是否已经写入,如果未写则写入。

3)块管理器(BlockManager)根据指定的存储层次向相应块写入数据。并向主节点汇报存储状态。

·MemoryStore:提供Block在内存中的Block读写功能。

·DiskStore:提供Block在磁盘上以文件形式读写的功能。

·BlockManagerWorker:对远端数据的异步传输进行管理。

·ConnectionManager:提供本地机器和远端节点进行网络传输Block的功能。

(3)     数据通信

主节点和从节点之间通过Actor传送信息。

·BlockManagerMasterActor:在主节点创建,所有从节点都用于这个Actor引用,并通过这个Actor的引用向主节点传递信息。

·BlockManagerSlaveActor:在从节点创建,主节点拥有所有从节点的这个Actor引用,通过这个Actor引用向从节点传递控制信息(命令)。

块管理器(BlockManager)在内部封装块管理器Master(BlockManagerMaster),并通过BlockManagerMaster对Actor通信进行管理。各从节点的块管理器(BlockManager)对象在Spark上下文环境中(SparkEnv)中创建。在SparkEnv中也会创建其他管理组件,例如connectionManager、broadcastManager、cacheManager等。

4.3.3.2 Spark的数据存储

图3-2 RDD数据管理模型

在物理上,RDD对象实质上是一个元数据结构,存储着块、节点(Block、Node)等的映射关系,以及其他的元数据信息。数据块Block对应一个分区,若干个分区组成一个RDD。

分区是逻辑概念,变换前后的分区在物理上可能处在同一块内存。这是很重要的优化,以防止函数式数据不变性(immutable导致的内存需求无限扩张。如果要重复使用数据(机器学习中多次迭代),可以调用cache()方法缓存数据。图3-2为RDD的数据存储模型。

4.4 Spark通信模块

下面介绍分布式通信的几种方式。

(1)       RPC(Remote Produce Call)

RPC是远程过程调用协议,基于C/S模型调用。过程大致可以理解为本地分布式对象向本机发请求,不用自己编写底层通信本机。通过网络向服务器发送请求,服务器对象接收参数后,进行处理,再把处理后的结果发送回客户端。

(2)       RMI(Remote MethodInvocation)

RMI(远程方法调用)和RPC一样都是调用远程的方法,可以把RMI看做是用Java语言实现了RPC协议。RPC不支持对象通信,支持对象传输,这也是RMI相比于RPC的优越之处。

(3)       JMS(Java Message Service)

JMS, java消息服务是Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。其支持P2P和pub/stub两种消息模型,即点对点和发布订阅两种模型。其优点在于:支持异步通信、消息生产者和消费者耦合度低。应用程序通过读写队列消息(针对应用程序的数据)来通信,而无须专用连接来连接它们。

4.4.1 通信框架AKKA Spark

Spark模块间通信使用AKKA框架。AKKA是用Scala开发的一个库,用于编写Actor模型应用。Actor是一些包含状态和行为的对象。每一个actor拥有自己的属性和方法,从而使得Actor模型容易并发执行。

Actor通过消息邮件队列通信。发送端通过“!”符号发送消息,接收端通过receive方法中的case模式匹配接收消息,并进行相应处理。这些通信是异步的。

通常一个Actor系统是一个重量级结构。它会分配多个线程。所以对于每一个应用,一般只要一个Actor系统

AKKAActor树形结构

一个Actor会创建多个可子Actor,并负责监督这些子Actor,让这些子Actor完成小的任务。同时,子Actor又可下分为多个子Actor。

4.4.2 Client、Master和Worker间的通信

在Standalone模式下,存在三个角色: client、master、worker。

·Client:提交作业。

·Master:负责接收作业,并启动Driver,管理Worker和Executor。

·Worker:期性地通过beatheart向Master发送状态信息。当master向它传来启动executor命令的时候,它就启动Executor进行计算。

4.5 容错机制

一般来说,分布式数据集的容错性有两种方式:数据检查点记录数据的更新。数据检查点操作成本很高,因此,Spark选择记录更新的方式。RDD只支持粗粒度转换,是对全局数据做同样的重做进而恢复数据。

4.6 Shuffle机制

Shuffle的本义是洗牌、混洗,即把一组有一定规则的数据打散重新组合转换成一组无规则随机数据分区。Spark中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据,Spark中的Shuffle和MapReduce中的Shuffle思想相同。

4.7 union, aggregate, join, concatenation区别

·union合并,例:

1,2

3,4  

union  

4,2

1,3  =》                               1,2

                                                          3,4

                                                          4,2

1,2

·aggregate 聚集

1,2

3,4  

union  

4,2

1,3  =》                               1,2

1,2

 

                                                          3,4

                                                          4,2

与union合并相比多了一个排序

·join 联接,类似数据库的联接操作(通过关键词联接)

数据1:

1,23,23

2,23,12

3,333,112

数据2:

1,  we,asd

2,  sd,asd

3,  llksd,asd

数据1 join 数据2:

1,23,23,we,asd

2,23,12,sd,asd

3,333,112,llksd,asd

·concatenation结合,连接(英语,共同迎合)

2014-10-15 21:35:51 kongdavid 阅读数 718
  • Android底层技术:Java层系统服务(Android Service)

    为什么我们需要学习撰写系统服务呢? 底层系统服务是Android 框架裡接近Linux/Driver 的部分。为了充分发挥硬件设备的差異化特性,系统服务是让上层Java 应用程序來使用Driver/HW Device 特色的重要管道。于此,先仔细解析Java层系统服务的架构,然后阐述这系统服务的代码开发要点,以及其API设计方法。

    32697 人正在学习 去看看 高煥堂
本篇文章很重要,也是spark为什么是Spark原因:
1.Spark的核心是什么?
2.RDD在内存不足时,是怎么处理的?
3.如何创建RDD,有几种方式
4.Spark编程支持几种语言

5.是否能够写出一个Driver程序


Spark核心概念Resilient Distributed Dataset (RDD)弹性分布数据集
  • RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
  • RDD的特点:

    • 它是在集群节点上的不可变的、已分区的集合对象。
    • 通过并行转换的方式来创建如(map, filter, join, etc)。
    • 失败自动重建。
    • 可以控制存储级别(内存、磁盘等)来进行重用。
    • 必须是可序列化的。
    • 是静态类型的。
  • RDD的好处

    • RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
    • RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
    • RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
    • RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
  • RDD的存储与分区

    • 用户可以选择不同的存储级别存储RDD以便重用。
    • 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
    • RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
  • RDD的内部表示
    在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

    • 分区列表(数据块列表)
    • 计算每个分片的函数(根据父RDD计算出此RDD)
    • 对父RDD的依赖列表
    • 对key-value RDD的Partitioner【可选】
    • 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
  • RDD的存储级别
    RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

    val NONE = new StorageLevel(false, false, false) 
    val DISK_ONLY = new StorageLevel(true, false, false) 
    val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) 
    val MEMORY_ONLY = new StorageLevel(false, true, true) 
    val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) 
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false) 
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) 
    val MEMORY_AND_DISK = new StorageLevel(true, true, true) 
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) 
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) 
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)  

  • RDD的好处

    • RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
    • RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
    • RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
    • RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
  • RDD的存储与分区

    • 用户可以选择不同的存储级别存储RDD以便重用。
    • 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
    • RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
  • RDD的内部表示
    在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

    • 分区列表(数据块列表)
    • 计算每个分片的函数(根据父RDD计算出此RDD)
    • 对父RDD的依赖列表
    • 对key-value RDD的Partitioner【可选】
    • 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
  • RDD的存储级别
    RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

    val NONE = new StorageLevel(false, false, false) 
    val DISK_ONLY = new StorageLevel(true, false, false) 
    val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) 
    val MEMORY_ONLY = new StorageLevel(false, true, true) 
    val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) 
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false) 
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) 
    val MEMORY_AND_DISK = new StorageLevel(true, true, true) 
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) 
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) 
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) 


  • RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

RDD的生成
  • RDD有两种创建方式:
    1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。
    2、从父RDD转换得到新RDD。
  • 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:
    // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像 
    // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 
    def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { 
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
        classOf[Text], minSplits) .map(pair => pair._2.toString) }

    // 根据Hadoop配置,及InputFormat等创建HadoopRDD  
    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)  


  • 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:
   // 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 
    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)

    val key: K = reader.createKey()
    val value: V = reader.createValue()

    //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。
    override def getNext() = {
    try {
      finished = !reader.next(key, value)
    } catch {
      case eof: EOFException =>
        finished = true
    }
      (key, value)
    }


RDD的转换与操作
  • 对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。
  • 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
  • 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
  • 下面使用一个例子来示例说明Transformations与Actions在Spark的使用。
  val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), 
        Seq(System.getenv("SPARK_TEST_JAR")))

    val rdd_A = sc.textFile(hdfs://.....)
    val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1))

    val rdd_C = sc.textFile(hdfs://.....)
    val rdd_D = rdd_C.map(line => (line.substring(10), 1))
    val rdd_E = rdd_D.reduceByKey((a, b) => a + b)

    val rdd_F = rdd_B.jion(rdd_E)

    rdd_F.saveAsSequenceFile(hdfs://....)


 







Lineage(血统)
  • 利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
  • RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。
容错
  • 在RDD计算,通过checkpint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。
资源管理与作业调度
  • Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易,Spark on Yarn的大致框架图。



 


  • 让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。


编程接口
  • Spark通过与编程语言集成的方式暴露RDD的操作,类似于DryadLINQ和FlumeJava,每个数据集都表示为RDD对象,对数据集的操作就表示成对RDD对象的操作。Spark主要的编程语言是Scala,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能(JVM上的静态强类型语言)。
  • Spark和Hadoop MapReduce类似,由Master(类似于MapReduce的Jobtracker)和Workers(Spark的Slave工作节点)组成。用户编写的Spark程序被称为Driver程序,Dirver程序会连接master并定义了对各RDD的转换与操作,而对RDD的转换与操作通过Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操作发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对RDD的操作时,根据数据分片信息进行本地化数据操作,生成新的数据分片、返回结果或把RDD写入存储系统。  
Scala
  • Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:
    val sc = new SparkContext(master, appName, [sparkHome], [jars]) 
    val textFile = sc.textFile("hdfs://.....") 
    textFile.map(....).filter(.....).....


Java
  • Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:

    JavaSparkContext sc = new JavaSparkContext(...);  
    JavaRDD lines = ctx.textFile("hdfs://..."); 
    JavaRDD words = lines.flatMap( 
      new FlatMapFunction<String, String>() { 
         public Iterable call(String s) { 
            return Arrays.asList(s.split(" ")); 
         } 
       } 
    );


Python
  • 现在Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操作,从而实现使用python编写Spark程序。Spark也同样提供了pyspark,一个Spark的python shell,可以以交互式的方式使用Python编写Spark程序。 如:

    from pyspark import SparkContext 
    sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 
    words = sc.textFile("/usr/share/dict/words") 
    words.filter(lambda w: w.startswith("spar")).take(5)


使用示例Standalone模式
  • 为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是非常好的设计,但是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark因此提供了Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很相似,就连集群启动方式都几乎是一样。
  • 以Standalone模式运行Spark集群

    • 下载Scala2.9.3,并配置SCALA_HOME
    • 下载Spark代码(可以使用源码编译也可以下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
    • 解压spark-0.7.3-prebuilt-cdh4.tgz安装包
    • 修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。

SCALA_HOME=/home/spark/scala-2.9.3 
JAVA_HOME=/home/spark/jdk1.6.0_45 
SPARK_MASTER_IP=spark1             
SPARK_MASTER_PORT=30111 
SPARK_MASTER_WEBUI_PORT=30118 
SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g 
SPARK_WORKER_PORT=30333 
SPARK_WORKER_WEBUI_PORT=30119 
SPARK_WORKER_INSTANCES=1


  • 把Hadoop配置copy到conf目录下
  • 在master主机上对其它机器做ssh无密码登录
  • 把配置好的Spark程序使用scp copy到其它机器
  • 在master启动集群

$SPARK_HOME/start-all.sh


yarn模式
  • Spark-shell现在还不支持Yarn模式,使用Yarn模式运行,需要把Spark程序全部打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn。
  • 以Yarn模式运行Spark

    • 下载Spark代码.


git clone git://github.com/mesos/spark


  • 切换到branch-0.8


cd spark 
git checkout -b yarn --track origin/yarn 


  • 使用sbt编译Spark并


$SPARK_HOME/sbt/sbt 
> package 
> assembly


  • 把Hadoop yarn配置copy到conf目录下
  • 运行测试


SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ 
./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ 
--class spark.examples.SparkPi --args yarn-standalone


使用Spark-shell
  • Spark-shell使用很简单,当Spark以Standalon模式运行后,使用$SPARK_HOME/spark-shell进入shell即可,在Spark-shell中SparkContext已经创建好了,实例名为sc可以直接使用,还有一个需要注意的是,在Standalone模式下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell作为一个Spark程序一直运行在Spark上,其它的Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。
  • 在Spark-shell上写程序非常简单,就像在Scala Shell上写程序一样。


    scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") 
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

    scala> textFile.count() // Number of items in this RDD
    res0: Long = 21374

    scala> textFile.first() // First item in this RDD
    res1: String = # Spark


编写Driver程序
  • 在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。如WorkCount程序如下:


import spark.SparkContext
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length ==0 ){
      println("usage is org.test.WordCount <master>")
    }
    println("the args: ")
    args.foreach(println)

    val hdfsPath = "hdfs://hadoop1:8020"

    // create the SparkContext, args(0)由yarn传入appMaster地址
    val sc = new SparkContext(args(0), "WrodCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

    val textFile = sc.textFile(hdfsPath + args(1))

    val result = textFile.flatMap(line => line.split("\\s+"))
        .map(word => (word, 1)).reduceByKey(_ + _)

    result.saveAsTextFile(hdfsPath + args(2))
  }
}





2017-04-12 22:05:59 dsl200970 阅读数 4396
  • Android底层技术:Java层系统服务(Android Service)

    为什么我们需要学习撰写系统服务呢? 底层系统服务是Android 框架裡接近Linux/Driver 的部分。为了充分发挥硬件设备的差異化特性,系统服务是让上层Java 应用程序來使用Driver/HW Device 特色的重要管道。于此,先仔细解析Java层系统服务的架构,然后阐述这系统服务的代码开发要点,以及其API设计方法。

    32697 人正在学习 去看看 高煥堂
1.驱动器节点(Driver)
Spark的驱动器是执行开发程序中的 main方法的进程。它负责开发人员编写的用来创建SparkContext、创建 RDD,以及进行 RDD 的转化操作和行动操作代码的执行。如果你是用spark shell,那么当你启动 Spark shell的时候,系统后台自启了一个 Spark 驱动器程序,就是在Spark shell 中预加载的一个叫作 sc 的 SparkContext 对象。如果驱动器程序终止,那么Spark 应用也就结束了。
Driver在spark作业执行时主要负责以下操作:
1)把用户程序转为任务
Driver程序负责把用户程序转为多个物理执行的单元,这些单元也被称为任务(task)。从上层来看,spark程序的流程是这样的:读取或者转化数据创建一系列 RDD,然后使用转化操作生成新的RDD,最后使用行动操作得到结果或者将数据存储到文件存储系统中。Spark 程序其实是隐式地创建出了一个由上述操作组成的逻辑上的有向无环图。当Driver序运行时,它会把这个逻辑图转为物理执行计划。
Spark 会对逻辑执行计划作一些优化,比如将连续的映射转为流水线化执行,将多个操作合并到一个步骤中等。这样 Spark 就把逻辑计划转为一系列步骤(stage)。而每个stage又由多个task组成。这些task会被打包并送到集群中。task是 Spark 中最小的执行单元,用户程序通常要启动成百上千的独立任务。
2)跟踪Executor的运行状况
有了物理执行计划之后,Driver程序必须在各个Executor进程间协调任务的调度。Executor进程启动后,会向Driver进程注册自己。因此,Driver进程就可以跟踪应用中所有的Executor节点的运行信息。
3)为执行器节点调度任务
Driver程序会根据当前的Executor节点集合,尝试把所有Task基于数据所在位置分配给合适的Executor进程。当Task执行时,Executor进程会把缓存数据存储起来,而Driver进程同样会跟踪这些缓存数据的位置,并且利用这些位置信息来调度以后的任务,以尽量减少数据的网络传输。
4)UI展示应用运行状况
Driver程序会将一些 Spark 应用的运行时的信息通过网页界面呈现出来,默认在端口4040 上。比如,在本地模式下,访问 http://localhost:4040 就可以看到这个网页了。
2.执行器节点(Executor)
Spark Executor节点是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
执行器进程有两大作用:
1、它们负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
2、它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
执行器程序通常都运行在专用的进程中。
2019-02-21 17:43:30 qq_36344346 阅读数 134
  • Android底层技术:Java层系统服务(Android Service)

    为什么我们需要学习撰写系统服务呢? 底层系统服务是Android 框架裡接近Linux/Driver 的部分。为了充分发挥硬件设备的差異化特性,系统服务是让上层Java 应用程序來使用Driver/HW Device 特色的重要管道。于此,先仔细解析Java层系统服务的架构,然后阐述这系统服务的代码开发要点,以及其API设计方法。

    32697 人正在学习 去看看 高煥堂

在集群之外搭建一个节点用于提交spark程序到spark集群
说明:用于提交程序的节点ip: 192.168.1.188 spark集群Master节点ip:192.168.1.73(spark集群和hadoop集群是在一起的)

1.保证该节点和集群的master节点是互通的,在该节点安装和集群同样版本的spark和hadoop程序
不需要启动,只用于提交作业时在driver端用于获取集群信息
2.配置文件 core-site.xml 修改
ip都改成spark集群Master节点ip(重要),不能写成 主机名,识别不到
(下面的某些配置可能不是必须的)

<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://192.168.1.73:9000</value>
        </property>
        <property>
         <name>io.file.buffer.size</name>
         <value>131072</value>
       </property>
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/usr/hadoop-2.7.7/tmp</value>
        </property>
</configuration>

3.配置文件 hdfs-site.xml 修改

<configuration>
    <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>192.168.1.73:50070</value>
    </property>
    <property>
      <name>dfs.replication</name>
      <value>2</value>
    </property>
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:/usr/hadoop-2.7.7/hdfs/name</value>
    </property>
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>file:/usr/hadoop-2.7.7/hdfs/data</value>
    </property>
</configuration>

4. yarn-site.xml

<property>
          <name>yarn.nodemanager.aux-services</name>
          <value>mapreduce_shuffle</value>
     </property>
     <property>
           <name>yarn.resourcemanager.address</name>
           <value>192.168.1.73:8032</value>
     </property>
     <property>
          <name>yarn.resourcemanager.scheduler.address</name>
          <value>192.168.1.73:8030</value>
      </property>
     <property>
         <name>yarn.resourcemanager.resource-tracker.address</name>
         <value>192.168.1.73:8031</value>
     </property>
     <property>
         <name>yarn.resourcemanager.admin.address</name>
         <value>192.168.1.73:8033</value>
     </property>
     <property>
         <name>yarn.resourcemanager.webapp.address</name>
         <value>192.168.1.73:8088</value>
     </property>
</configuration>

5.将 core-site.xml和 hdfs-site.xml复制到 spark的conf目录下,
6.配置hadoop环境变量

export JAVA_HOME=/usr/local/jdk1.8.0_191
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

export SPARK_HOME=/usr/spark-2.3.2-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

export HADOOP_HOME=/usr/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

7.无需启动hadoop和spark,此时在我们搭建好的这台节点上打入命令 hadoop fs -ls / 即是 hadoop集群上的hdfs目录
8.spark-submit提交程序 on yarn:

spark-submit --class com.cs.spark.MLXG.NewWordCount \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 1 \
    --driver-memory 512m \
    --executor-memory 512m \
    --total-executor-cores 1 \
    /usr/lib/MLXG-0.0.1-jar-with-dependencies.jar

9.有可能还需要提交程序的节点和spark master节点进行免秘钥登录处理

2018-11-21 21:08:37 zhuiqiuuuu 阅读数 93
  • Android底层技术:Java层系统服务(Android Service)

    为什么我们需要学习撰写系统服务呢? 底层系统服务是Android 框架裡接近Linux/Driver 的部分。为了充分发挥硬件设备的差異化特性,系统服务是让上层Java 应用程序來使用Driver/HW Device 特色的重要管道。于此,先仔细解析Java层系统服务的架构,然后阐述这系统服务的代码开发要点,以及其API设计方法。

    32697 人正在学习 去看看 高煥堂

 spark架构

spark任务的两种运行模式 

spark submit --deploy cluster/client

当为client模式时,本地提交时,driver程序在堡垒机上运行,所以堡垒机上能看到自己打印的一些日志;线上时,driver程序在客户端节点上执行。客户端节点的资源,决定了提交到集群的任务的并发数,一版为队列中状态的是在客户端节点上,执行中状态才是在集群上的;当选择cluster模式时,driver程序在集群上运行, 查看日志只能通过集群代理查看。

Spark任务两种运行模式 

 

Spark概述

阅读数 1024

Spark Driver的启动

阅读数 653

没有更多推荐了,返回首页