
1 Starting the Stream Processing Engine

1.1 Initializing the StreamingContext

First, the StreamingContext must be initialized. During initialization, the DStreamGraph, JobScheduler, and related components are created. The DStreamGraph is analogous to the RDD DAG: it holds the directed acyclic graph of dependencies between DStreams. The JobScheduler periodically inspects the DStreamGraph and generates jobs to run based on the incoming data.
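For orientation, here is a minimal user-side sketch of this step (the app name, master URL, and 5-second batch interval are placeholder values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Creating the StreamingContext is what builds the DStreamGraph and JobScheduler internally
val conf = new SparkConf().setAppName("StreamingDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))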

 

1.2 Creating an InputDStream

Depending on which data source you use, a different kind of input DStream is created.
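For instance, a sketch of two common cases (host, port, and directory are placeholders; ssc is the StreamingContext created above):

// A socket source returns a ReceiverInputDStream backed by a Receiver
val lines = ssc.socketTextStream("localhost", 9999)
// A file source returns an InputDStream that monitors a directory, with no Receiver involved
val files = ssc.textFileStream("hdfs://namenode:8020/streaming/in")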

 

1.3 Starting the JobScheduler

After the InputDStream has been created, calling StreamingContext's start method starts the application, which in turn starts the JobScheduler. When the JobScheduler starts, it instantiates the ReceiverTracker and the JobGenerator.

def start(): Unit = synchronized {
  // Exit if the JobScheduler has already been started
  if (eventLoop != null) return

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // Get the input DStreams and attach their rate controllers as streaming listeners
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  // Build the ReceiverTracker and the InputInfoTracker
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  // Start the ReceiverTracker
  receiverTracker.start()
  // Start the JobGenerator
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}

 

1.4 Starting the JobGenerator

Starting the JobGenerator first checks whether this is the first run. If it is not (i.e. a checkpoint exists), the previous checkpoint is restored; if it is the first run, startFirstTime is called. That method computes the timer's start time, then starts the DStreamGraph and the timer.

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

 

The timer's getStartTime method computes when the next period expires: the current time rounded up to the next multiple of the batch interval, i.e. (floor(currentTime / period) + 1) * period.
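A sketch of that computation (the parameter names are mine, not the Spark source):

// Next period boundary: the current time rounded up to the next multiple of the batch interval
def nextPeriodStart(currentTimeMillis: Long, periodMillis: Long): Long =
  (math.floor(currentTimeMillis.toDouble / periodMillis) + 1).toLong * periodMillis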

 

2 Receiving and Storing Stream Data

2.1 Starting the ReceiverTracker

When the ReceiverTracker starts, if the set of receiver input streams is not empty it calls launchReceivers, which sends a StartAllReceivers message to the ReceiverTrackerEndpoint to start all the Receivers.

private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }
  runDummySparkJob()
  // Send the message that starts all the receivers
  endpoint.send(StartAllReceivers(receivers))
}

 

case StartAllReceivers(receivers) =>
  // Use the receiver scheduling policy to compute the candidate executors for each receiver
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  // For each receiver: look up its candidate executors, update its scheduled
  // location (executor) info, and start it
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    // Record the Receiver's preferred location
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // Start each Receiver
    startReceiver(receiver, executors)
  }

Finally, a ReceiverSupervisor is created and started; as part of its startup it starts the Receiver.
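Condensed, the supervisor's start sequence looks roughly like this (a simplified sketch of ReceiverSupervisor.start, not the full source):

def start(): Unit = {
  onStart()        // e.g. ReceiverSupervisorImpl starts its registered BlockGenerators here
  startReceiver()  // marks the supervisor as started and calls receiver.onStart()
}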

2.2 The Receiver Starts and Receives Data

When a Receiver starts, the onStart method of the concrete subclass is called, and that is where data is received. Taking Kafka as an example, onStart creates a connection from the supplied configuration, obtains the message streams, builds a thread pool, and assigns one thread per topic partition to process the data.

def onStart() {
  // Get the Kafka connection parameters
  val props = new Properties()
  kafkaParams.foreach(param => props.put(param._1, param._2))

  val zkConnect = kafkaParams("zookeeper.connect")
  // Create the connection to the cluster
  logInfo("Connecting to Zookeeper: " + zkConnect)
  // Build the consumer configuration
  val consumerConfig = new ConsumerConfig(props)
  // Create the consumer connection from the consumer configuration
  consumerConnector = Consumer.create(consumerConfig)
  logInfo("Connected to " + zkConnect)
  // Build the keyDecoder and valueDecoder
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[K]]
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[V]]

  // Create threads for each topic/message Stream we are listening
  // Create the message streams
  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)
  // Build the thread pool
  val executorPool =
    ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
  try {
    // Start handling each partition's data
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    }
  } finally {
    executorPool.shutdown() // Just causes threads to terminate after work is done
  }
}

 

2.3 Starting the BlockGenerator to Generate Blocks

In ReceiverSupervisorImpl's onStart method, BlockGenerator.start is called to start the registered BlockGenerators.

override protected def onStart() {
  registeredBlockGenerators.asScala.foreach { _.start() }
}

At startup it first changes its state to Active and then starts two threads:

blockIntervalTimer: periodically closes off the records buffered during the current interval and turns them into a block

blockPushingThread: pushes the generated blocks to the block manager

def start(): Unit = synchronized {
  if (state == Initialized) {
    // Change the state
    state = Active
    // Start a timer that periodically wraps the buffered data into blocks
    blockIntervalTimer.start()
    // Start a thread that keeps pushing the generated blocks to the BlockManager
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}

 

private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      // If the buffer currently holding data is non-empty
      if (currentBuffer.nonEmpty) {
        // Hand it off as a new block buffer and reset currentBuffer
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        // Build a blockId
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        // Build the block
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    // If the new block is non-null, put it into the push queue; another thread
    // pushes the queued blocks to the block manager
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

 

2.4 Data Storage

The Receiver stores the data it receives. If individual records are small, multiple records are accumulated into a data block before block storage; if the data is already block-sized, it is stored directly. For the accumulate-into-a-block case, Receiver.store calls ReceiverSupervisor.pushSingle. pushSingle first keeps the record in memory; the BlockGenerator's timer thread blockIntervalTimer later wraps the buffered records into a block, queues it, and ReceiverSupervisor.pushArrayBuffer is called to handle it.
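From the user side, a custom Receiver only ever calls store(); below is a hedged sketch of the single-record path (the class name, host, and port are made up for illustration):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Each store(line) call hands one record to the ReceiverSupervisor (pushSingle),
// which buffers it until the BlockGenerator turns the buffer into a block.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("Line Receiver") {
      override def run(): Unit = {
        val socket = new Socket(host, port)
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line = reader.readLine()
        while (!isStopped && line != null) {
          store(line) // single-record path: ReceiverSupervisor.pushSingle
          line = reader.readLine()
        }
        reader.close()
        socket.close()
      }
    }.start()
  }

  override def onStop(): Unit = {}
}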

Both paths ultimately call pushAndReportBlock, which calls ReceivedBlockHandler's storeBlock method to save the data (writing a write-ahead log if so configured), and then reports the stored block to the driver:

def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  // Get a blockId
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // Store the block
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  // Number of records stored
  val numRecords = blockStoreResult.numRecords
  // Build the ReceivedBlockInfo
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // Send an AddBlock message to the ReceiverTrackerEndpoint
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

 

 

 

3 Data Processing

We know that a DStream triggers jobs when an output (action) operation is applied to it. Take the saveAsTextFiles method as an example:

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  // Wrap a save function that internally calls RDD.saveAsTextFile
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  // Call foreachRDD to apply the function to each RDD
  this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}

 

foreachRDD: it creates a ForEachDStream around the current DStream and registers it with the DStreamGraph

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  // Create a ForEachDStream around this DStream and register it with the DStreamGraph
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

 

register: registers with the DStreamGraph, i.e. adds this stream to the DStreamGraph as an output stream

private[streaming] def register(): DStream[T] = {
  // Add this DStream to the DStreamGraph as an output stream
  ssc.graph.addOutputStream(this)
  this
}

 

When the JobGenerator is initialized, it constructs a recurring timer:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

 

The timer starts a background thread that repeatedly calls triggerActionForNextInterval; for each batch interval this posts a GenerateJobs event, which the event loop dispatches through processEvent:

 

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

 

JobGenerator#generateJobs

It calls DStreamGraph.generateJobs to produce the jobs for the batch and then submits the resulting job set through the JobScheduler.

private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to make sure their lineage is
  // truncated periodically; otherwise an overly long lineage causes a stack overflow
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    // Allocate received blocks to this batch; one batch may contain multiple blocks
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // The DStreamGraph generates the job set for this batch time, using the allocated blocks
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // On success, submit the JobSet
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  // Trigger a checkpoint
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

# DStreamGraph.generateJobs produces the job set for the given batch time

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  // Create the job set from the DStreamGraph's output streams
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // Call the DStream's generateJob method to produce a job
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

 

# Then DStream.generateJob is called to produce the job

private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

 

# Finally, the job set is submitted

When the job set is submitted, each job is wrapped in a JobHandler. JobHandler is a Runnable; in its run method it posts a JobStarted event to the JobScheduler and then actually runs the job.

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    val oldProps = ssc.sparkContext.getLocalProperties
    try {
      ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run() // actually run the job here
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}

 

private def handleJobStart(job: Job, startTime: Long) {
  // Get the jobSet for this batch time
  val jobSet = jobSets.get(job.time)
  // Check whether the job set has already started
  val isFirstJobOfJobSet = !jobSet.hasStarted
  // Update the jobSet's start time
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}

 


Test-Running the Spark Cluster

Now let's test-run Spark.
1) On the Master host, start Hadoop and then Spark:

cd /usr/local/hadoop/sbin
./start-all.sh
cd /usr/local/spark/sbin
./start-all.sh

**2) Check that the Master and Worker processes have started on their respective nodes. On the Master host, run the jps command,** as shown in Figure 1-5.

Figure 1-5: Running jps on the Master host
On a Worker node (taking Worker1 as an example), run the jps command, as shown in Figure 1-6.

Figure 1-6: Running jps on a Worker node
From Figure 1-6 you can clearly see that the Master process and the Worker and related processes are running on their respective nodes, so Hadoop and Spark are running normally.
3) Check the cluster status through the Spark Web UI. Enter the Master's IP and port in a browser to open the Spark Web UI, as shown in Figure 1-7.

Figure 1-7: The Spark Web UI
From Figure 1-7 you can see that when the cluster has only one Worker node, the Spark Web UI shows that node as Alive, with 1 CPU core and 1GB of memory. This page lists all the started Worker nodes and applications in the cluster.
4) Run a sample. Spark ships with sample programs for test runs. Under the Spark root directory, the examples/src/main folder holds samples written in Scala, Java, Python, and R,
and you can run any one of them
with bin/run-example [class] [params] from the Spark root directory.
For example, you can execute the following on the Master host command line:
bin/run-example SparkPi 10
You can then see the application's output, and the application's status and other information can also be viewed on the Spark Web UI.


Developing and Running Spark Programs with the Spark Shell

The Spark Shell is a simple way to learn the API, and a powerful tool for interactive data analysis.
Although this chapter has not yet covered Spark's technical details, in general,
there are two ways to create a Spark resilient distributed dataset (RDD), sketched below:
❑ From input in a file system (such as HDFS).
❑ By transforming an existing RDD into a new RDD.
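A tiny sketch of the two creation paths (the HDFS path is a placeholder; sc is the SparkContext provided by the shell):

// 1) from a file system such as HDFS
val fromFile = sc.textFile("hdfs://localhost:50040/somefile")
// 2) by transforming an existing RDD into a new one
val fromExisting = fromFile.map(line => line.toUpperCase)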

Now, starting from RDDs, let's use the Spark Shell to show how to write and run a simple Spark program.
The classic word count example is used below.
1) Start the Spark shell: cd into SPARK_HOME/bin and run:

./spark-shell

2) At the Scala prompt, run the following commands:

scala> val file = sc.textFile("hdfs://localhost:50040/hellosparkshell")
scala> val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
scala> count.collect()

This first reads the file hellosparkshell (here from HDFS), then parses it, and finally counts the words and prints output like the following:

15/09/29  16:11:46  INFO  spark.SparkContext:  Job  finished:  collect  at<console>:17, took 1.624248037 s
res5: Array[(String, Int)] = Array((hello,12), (spark,12), (shell,12), (this,1),
(is,1), (chapter,1), (three,1)

Environment: CentOS 6.3, Hadoop 1.1.2, JDK 1.6, Spark 1.0.0, Scala 2.10.3, Eclipse (Hadoop is optional)

First, be clear about one thing: Spark itself is a Scala project, i.e. Spark is written in Scala.

1. Make sure you have already installed Spark correctly and run it; if not, follow the link below to install it:

    http://blog.csdn.net/zlcd1988/article/details/21177187

2. Create a new Scala project

  Follow the blog post below to create the Scala project:

     http://blog.csdn.net/zlcd1988/article/details/31366589

3. Import the Spark jar so that you can use the Spark API (this jar is the one produced by the build in step 2)

    In the imported project, right-click and choose Build Path -> Configure Build Path.

    

4. Configure the sbt build file (similar to Maven's pom.xml)

In the project root directory, create a build.sbt file and add the following content:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"


5. Write a Spark application

package spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SimpleApp {

  def main(args: Array[String]) = {

    val logFile = "/usr/lib/spark/README.md"
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache
    val numAs = logData.filter(line => line.contains("a")).count
    val numBs = logData.filter(line => line.contains("b")).count
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }

}


6. Package and deploy

Go to the project root directory:

$ cd scala
$ sbt package
[info] Loading global plugins from /home/hadoop/.sbt/0.13/plugins
[info] Set current project to Simple Project (in build file:/home/hadoop/workspace/scala/)
[info] Compiling 1 Scala source to /home/hadoop/workspace/scala/target/scala-2.10/classes...
[info] Packaging /home/hadoop/workspace/scala/target/scala-2.10/simple-project_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 5 s, completed Jun 15, 2014 10:32:48 PM

7. Run

$ $SPARK_HOME/bin/spark-submit --class "spark.SimpleApp" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar

Output:

Lines with a: 73, Lines with b: 35
14/06/15 22:34:05 INFO TaskSetManager: Finished TID 3 in 24 ms on localhost (progress: 2/2)
14/06/15 22:34:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool


How to Run a Spark Program

This article mainly covers how to upload a Spark application to a cluster, run it, and write the submission shell scripts.

Parameters used when writing the script files

In a Linux environment, running the spark-submit command with no arguments prints the following usage:

[hadoop@hadoop01 MyShell]$ spark-submit
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).

  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.
************************************* The options above this line are shared by local and the other modes; the options below are specific to standalone mode ***********************
 Spark standalone with cluster deploy mode only:
  --driver-cores NUM          Cores for driver (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)
**************************************** The options below are specific to YARN **************************************************************
 YARN-only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.

1. Running in local mode

The code is as follows:

package com.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
  * Word count in Scala, reading data from HDFS.
  * Note: because an HA nameservice is used here, core-site.xml and hdfs-site.xml must be
  * on the classpath; putting them under the resources directory is enough.
  *
  */
object _03wordCountHDFS {
  def main(args: Array[String]): Unit = {
    // Create the configuration object
    val conf: SparkConf = new SparkConf()
      .setAppName(s"${_03wordCountHDFS.getClass.getSimpleName}")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Create the RDD
    val textRDD: RDD[String] = sc.textFile("hdfs://bd1901/wordcount/in/word.txt")
    // Apply transformations and an action
    val retRDD: RDD[(String, Int)] = textRDD.flatMap(_.split("\\.")).map((_,1)).reduceByKey(_+_)
    // Write out the final results
   // retRDD.foreach(x=>println(x._1+"------->"+x._2))
    retRDD.saveAsTextFile("hdfs://bd1901/sparkwordout")
    // Release resources
    sc.stop()
  }
}

Package the project into a jar and upload it to a local directory on the Linux server.
Then write the shell script:

#!/bin/sh


SPARK_BIN=/home/hadoop/apps/spark/bin

# --class:            fully qualified name of the class to run
# --master local:     run locally
# --deploy-mode:      client mode
# --executor-memory:  600m of memory
# last argument:      path to the application jar
${SPARK_BIN}/spark-submit \
--class com.wordcount._03wordCountHDFS \
--master local \
--deploy-mode client \
--executor-memory 600m \
/home/hadoop/jars/spark-core-1.0-SNAPSHOT.jar

2. Running in standalone mode

This runs the application on Spark's own standalone cluster manager.
The master URL is written as follows:
master: spark://bigdata01:7077, or spark://bigdata01:7077,bigdata02:7077

  • Single-master mode: spark://bigdata01:7077

  • HA cluster: spark://bigdata01:7077,bigdata02:7077

deploy-mode: either cluster or client
The difference between the two modes:

  • client: the driver starts locally; the machine that submits the Spark job is the driver, and the SparkContext is created on that machine
  • cluster: the driver does not start locally; it is launched inside the Spark cluster, on one of the worker nodes

The code is as follows:

package com.wordcount

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
  * Word count in Scala, reading data from HDFS.
  * Note: because an HA nameservice is used here, core-site.xml and hdfs-site.xml must be
  * on the classpath; putting them under the resources directory is enough.
  *
  */
object _04wordCountHDFSjar{
  def main(args: Array[String]): Unit = {
    // Create the configuration object
    val conf: SparkConf = new SparkConf()
      .setAppName(s"${_04wordCountHDFSjar.getClass.getSimpleName}")
  //.set("HADOOP_USER_NAME","hadoop").set("fs.defaultFS", "hdfs://bd1901:8020")
    val sc = new SparkContext(conf)

    // Create the RDD
    val textRDD: RDD[String] = sc.textFile("hdfs://bd1901/wordcount/in/word.txt")
    // Apply transformations and an action
    val retRDD: RDD[(String, Int)] = textRDD.flatMap(_.split("\\.")).map((_,1)).reduceByKey(_+_)
    // Write out the final results
    retRDD.saveAsTextFile("hdfs://bd1901/sparkwordout")
    // Release resources
    sc.stop()
  }
}

Package the code into a jar and upload it to the cluster to run. In this mode the jar should be placed on HDFS so that every node can access it.
The scripts are written as follows.
Client-mode script:

#!/bin/sh


SPARK_BIN=/home/hadoop/apps/spark/bin

${SPARK_BIN}/spark-submit \
--class com.wordcount._04wordCountHDFSjar \
--master spark://hadoop01:7077,hadoop02:7077 \
--deploy-mode client \
--executor-memory 1000m \
--total-executor-cores 1 \
--executor-cores 1 \
hdfs://bd1901/jars/spark-core-1.0-SNAPSHOT.jar

Cluster-mode script:

#!/bin/sh


SPARK_BIN=/home/hadoop/apps/spark/bin

${SPARK_BIN}/spark-submit \
--class com.wordcount._04wordCountHDFSjar \
--master spark://hadoop01:7077,hadoop02:7077 \
--deploy-mode cluster \
--executor-memory 1000M \
--total-executor-cores 1 \
--executor-cores 1 \
hdfs://bd1901/jars/spark-core-1.0-SNAPSHOT.jar

Run the corresponding script.

3. Running on YARN

The master is written as: master: yarn
deploy-mode:

client: the driver starts locally; the machine that submits the Spark job is the driver, and the SparkContext is created on that machine

cluster: the driver does not start locally; it is launched inside the YARN cluster, on one of the NodeManager nodes
The code is as follows:

package com.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
  * Word count in Scala, reading data from HDFS.
  * Note: because an HA nameservice is used here, core-site.xml and hdfs-site.xml must be
  * on the classpath; putting them under the resources directory is enough.
  * This version takes its parameters from the command line.
  */
object _05wordCountHDFSjarargs{
  def main(args: Array[String]): Unit = {
    if(args==null||args.length<3){
      println(
            """
              |Parameter errors! Usage: <inputpath> <sleep> <output>
              |inputpath:   input path of the program
              |sleep:       sleep time in milliseconds
              |output:      output path of the program
            """.stripMargin
      )
      // Exit early when the required arguments are missing
      System.exit(-1)
    }
    val Array(inputpath,sleep,outputpath)=args // pattern matching
    // Create the configuration object
    val conf: SparkConf = new SparkConf()
      .setAppName(s"${_04wordCountHDFSjar.getClass.getSimpleName}")
    val sc = new SparkContext(conf)

    // Create the RDD
    val textRDD: RDD[String] = sc.textFile(inputpath)
    // Apply transformations and an action
    val retRDD: RDD[(String, Int)] = textRDD.flatMap(_.split("\\.")).map((_,1)).reduceByKey(_+_)
    // Write out the final results
    retRDD.saveAsTextFile(outputpath)
    Thread.sleep(sleep.toLong)
    // Release resources
    sc.stop()
  }
}

Package the code into a jar and upload it to HDFS so that it can be shared across nodes.
Write the shell scripts.

Client-mode script:

#!/bin/sh


SPARK_BIN=/home/hadoop/apps/spark/bin
OUT_PATH=hdfs://bd1901/out/spark/wc
`hdfs dfs -test -z ${OUT_PATH}`

if [ $? -eq 0 ]
then
  echo 1111
  `hdfs dfs -rm -R ${OUT_PATH}`
fi

${SPARK_BIN}/spark-submit \
--class com.wordcount._05wordCountHDFSjarargs \
--master yarn \
--deploy-mode client \
--executor-memory 1000m \
--num-executors 1 \
--executor-cores 1 \
hdfs://bd1901/jars/spark-core-1.0-SNAPSHOT.jar \
hdfs://bd1901/wordcount/in/word.txt 1000 ${OUT_PATH}

Cluster-mode script:

#!/bin/sh


SPARK_BIN=/home/hadoop/apps/spark/bin
# Output path
OUT_PATH=hdfs://bd1901/out/spark/wc
# Check whether the output path already exists
`hdfs dfs -test -z ${OUT_PATH}`

# If it exists, delete it to avoid an "output path already exists" error
if [ $? -eq 0 ]
then
  echo 1111
  `hdfs dfs -rm -R ${OUT_PATH}`
fi

# --class:            fully qualified name of the class to run
# --master yarn:      YARN mode
# --deploy-mode:      cluster mode
# --executor-memory:  1000m of memory
# last two lines:     jar path on HDFS, then the program arguments (input, sleep time, output)
${SPARK_BIN}/spark-submit \
--class com.wordcount._05wordCountHDFSjarargs \
--master yarn \
--deploy-mode cluster \
--executor-memory 1000m \
--num-executors 1 \
--executor-cores 1 \
hdfs://bd1901/jars/spark-core-1.0-SNAPSHOT.jar \
hdfs://bd1901/wordcount/in/word.txt 1000 ${OUT_PATH}

Run the corresponding script. Note that the script passes the required arguments into the program, which makes the program more robust.

Note:
When building the jar, check afterwards that it actually contains the class you want to run; if it does not, do not upload it, or the job will fail. If classes are missing, the Maven packaging plugins are probably not configured correctly;
you can add the following plugins:

<build>
	<pluginManagement>
		<plugins>
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.2.2</version>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
			</plugin>
		</plugins>
	</pluginManagement>
	<plugins>
		<plugin>
			<groupId>net.alchim31.maven</groupId>
			<artifactId>scala-maven-plugin</artifactId>
			<executions>
				<execution>
					<id>scala-compile-first</id>
					<phase>process-resources</phase>
					<goals>
						<goal>add-source</goal>
						<goal>compile</goal>
					</goals>
				</execution>
				<execution>
					<id>scala-test-compile</id>
					<phase>process-test-resources</phase>
					<goals>
						<goal>testCompile</goal>
					</goals>
				</execution>
			</executions>
		</plugin>

		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<executions>
				<execution>
					<phase>compile</phase>
					<goals>
						<goal>compile</goal>
					</goals>
				</execution>
			</executions>
		</plugin>

		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.4.3</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>
