2017-01-27 14:07:23 lisi1129 阅读数 5947
  • Spark多数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6063 人正在学习 去看看 CSDN讲师

一:spark的组件构成

      

     1:每一个application有自己的executor的进程,它们相互隔离,每个executor中可以有多个task线程。这样可以很好的隔离各个applications,各个spark applications 不能分享数据,除非把数据写到外部系统。

      2:SparkContext对象可以视为Spark应用程序的入口,主程序被称为driver program,SparkContext可以与不同种类的集群资源管理器(Cluster Manager),例如Hadoop Yarn、Mesos等 进行通信,从而分配到程序运行所需的资源,获取到集群运行所需的资源后,SparkContext将得到集群中其它工作节点(Worker Node) 上对应的Executors (不同的Spark应用程序有不同的Executor,它们之间也是独立的进程,Executor为应用程序提供分布式计算及数据存储功能),之后SparkContext将应用程序代码分发到各Executors,最后将任务(Task)分配给executors执行。

 二:spark相关概念

          Application                      运行在集群上的用户程序,包含集群上的driver program 和多个executor线程组成;

          Driver program               application运行的main方法,并生成sparkcontext;

          Cluster manager             集群资源管理器 ;

          Deploy mode                   部署模式 用于区别driver program的运行方式:集群模式(cluter mode),driver在集群内部启动;客户端模式(client mode),driver进程从集群外部启动;   

          Worker node                    工作节点,运行application的节点

          Executor                           work node的上进程,运行task并保持数据交互,每一个application有自己的executor

          Task                                   运行于Executor中的任务单元,Spark应用程序最终被划分为经过优化后的多个任务的集合

          Job                                     由多个转变构建的并行计算任务,具体为Spark中的action操作, 一个action就为一个job

  三:Resilient Distributed Datasets (RDDs)         

           spark 涉及的核心概念就是resilient distributed dataset (RDD),rdd是具有容错性的数据集合,并可以并行数据计算。有两种方法可以创建rdd,第一种就是parallelizing 方法:序列化存在driver program 中的集合,见下方代码
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
parallelize 方法中可以指定数据分区参数,并每个分区对应一个task 如下面代码
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data,10)
         

     RDD 可以抽象的认为是一个数组,这个数组分布在集群上,RDD可以进行逻辑上的分区,每个分区叫一个partition。在spark application运行过程中,RDD经过一个个transformtion 转换算子后,最后通过Action算计触发操作;RDD是懒加载的,前面的转化并不发生实际的操作,这个转化为记录在DAG图中,只有触发action后,才实际就行操作。逻辑上每经历一个变化,RDD就会转化为新的RDD,rdd之间通过lineage关系,这个关系在容错中起到至关重要的作用。

RDD的源码中标注了5个性质:

  1. 一组分片(partition),即数据集的基本组成单位
  2. 每个分片都可以计算
  3. 对parent RDD的依赖,这个依赖描述了RDD之间的lineage
  4. 对于key-value的RDD,一个Partitioner
  5. 一个列表,存储存取每个partition的preferred位置。对于一个HDFS文件来说,存储每个partition所在的块的位置。

     四:RDD的依赖
          RDD在每次转化时候,会生成一个新的RDD,但新的RDD和旧的RDD之间保持着关系,就是依赖,依据依赖的样式,可以划分为窄依赖和宽依赖;
      
    
       窄依赖是指每个父RDD的Partition最多被子RDD的一个Partition所使用,例如map、filter,见上左图
       宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey等
       
   五:RDD的持久化
          RDD的持久化是spark的一个重要的特性,当你把RDD持久化,每个Node会存储RDD的分区在内存,在其他action中用到此rdd的时候,就不用从头转化,而是直接使用。你可以用persist或者cache方法持久化rdd,Spark的 缓 存是一个容 错 的技 术 -如果RDD的任何一个分区 丢 失,它 可以通 过 原有的 转换 ( transformations )操作自 动 的重复 计 算并且 创 建出 这 个分区。另外,每一个RDD可以选择不同的持久化级别:
     MEMORY_ONLY     把RDD非序列化为Java对象存在jvm中,如果RDD不适合持久化在内存中,RDD的一些分区可能不能持久化,让此RDD需要的时候,此丢失的RDD分区会重新计算获取;
     MEMORY_AND_DISK   将RDD作为非序列化的Java对象存储在jvm中。如何RDD不适合存在内存中,将这些不适合存在内存的分区存在磁盘中
     MEMORY_ONLY_SER     把RDD序列化为Java对象存在jvm中,如果RDD不适合持久化在内存中,RDD的一些分区可能不能持久化,让此RDD需要的时候,此丢失的RDD分区会重新计算获取;
     DISK_ONLY  仅仅存在磁盘中
  Spark也会自动持久化一些shuffle操作(如 reduceByKey )中的中间数据,即使用户没有调用 persist 方法。 这样 的好 处 是避免了在shuffle出 错 情况下,需要重复 计 算整个 输 入

     

             


          

Spark
2019-05-30 23:09:22 ifenggege 阅读数 760
  • Spark多数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6063 人正在学习 去看看 CSDN讲师

简介

Spark是使用Scala语言编写、基于内存运算的大数据计算框架。

以Spark core为核心,提供了Spark SQL、Spark Streaming、MLlib几大功能组件

中文文档:https://spark.apachecn.org/#/

github地址:https://github.com/apache/spark

Spark Core

Spark提供了多种资源调度框架,基于内存计算、提供了DAG的执行流程管理以及RDD的血缘关系来保证计算的快速和高容错性。RDD是Spark的核心概念

Spark SQL

SparkSQL基于Spark Core来优化sql查询,将sql的查询转为对应的RDD(DateFrame),并进行优化,简化了开发,提高了数据清洗的效率

Spark Streaming

SparkStreaming是基于SparkCore实现的流处理框架,通过微批的概念实现了流处理(DStream),可以将数据的延迟保证为最少500ms,是一个高吞吐高容错的流式处理框架。


从今天开始写一些关于Spark的文章,让自己每天都有所收获。

不只要作为使用者,更要去了解它。

2019-04-01 22:49:23 ddbbff2005 阅读数 109
  • Spark多数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6063 人正在学习 去看看 CSDN讲师

定于:Spark Shell(是一个交互式的命令行,里面可以写spark程序,方便学习和测试,他也是一个客户端,用于提交spark应用程序)
启动(本地单机版-非集群):

./bin/spark-shell

上面的方式没有指定master的地址,即用的是spark的local模式运行的(模拟的spark集群用心的过程)
./bin/spark-shell --master spark://hdp02:7077,hdp01:7077 –executor-memory 512mb --total-executor-cores 5
第二个指定了master 在集群上运行
这里需要注意,集群上运行spark shell 也必须指定调度资源,不然可能会出现下图,cup0的情况在这里插入图片描述
在这里插入图片描述
启动hadoop集群:star-dfs.sh
vi一个work.txt,放到hds上,等会做workcount的基础数据

vi work.txt 
hello guowei
hello yjz
hello rzp
hello zxb
hello word
#把work.txt放到hdfs上,创建一个文件夹,把他移动到文件夹下
hadoop fs -put /home/hdp01/work.txt /
hadoop fs -mkdir /sparktest
hadoop fs -mv /work.txt /sparktest

.
执行wordcount,计算

sc.textFile("hdfs://hdp01:9000/sparktest").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).collect

在这里插入图片描述

2016-03-08 15:06:42 chenyuangege 阅读数 3779
  • Spark多数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6063 人正在学习 去看看 CSDN讲师

在按照王家林的文档安装完scala,spark集群和idea-IC开发工具后,用spark自带的示例SparkPi测试scala和spark集群

1、按照王家林文档中的方法把spark自带的SparkPi示例的代码复制到idea-IC开发工具中创建的项目中后,把SparkPi代码的修改为:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("spark://192.168.91.128:7077")//改成自己master主机的ip地址
    val spark = new SparkContext(conf)
    spark.addJar("/home/cy/workspace/untitled/out/artifacts/sparkpi_jar/untitled.jar")//把写的代码打包成jar包,存放打包后jar包的路径
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

2、点击idea-IC开发工具左上角的File->Project Structure->Artifcats

3、添加jar包:


4、选择Main Class,点击OK:


5、修改jar包的名字、保存的路径和类型,注意上述代码中jar包的路径要与这边jar包的路径一致:


6、完成上述的操作后,先通过start-all.sh启动hadoop集群,再进入spark安装目录的sbin目录下,通过./start-all.sh命令启动spark集群

7、运行SparkPi代码,出现以下结果就说明成功了:


8、要关掉集群的时候,先进入spark安装目录下的sbin目录,通过./stop-all.sh命令先关掉spark集群,再通过stop-all.sh关掉hadoop集群

2019-07-27 23:03:21 weixin_43241054 阅读数 71
  • Spark多数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6063 人正在学习 去看看 CSDN讲师

Spark Streaming简介  

       Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

       和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此 得名“离散化”)。

       DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

 

Spark Streaming简介
1、SPark Streaming是Spark中一个组件,基于Spark Core进行构建,用于对流式进行处理,类似于Storm。
2、Spark Streaming能够和Spark Core、Spark SQL来进行混合编程。
3、Spark Streaming主要关注:
      1、Spark Streaming 能接受什么数据? kafka、flume、HDFS、Twitter等。
      2、Spark Streaming 能怎么处理数据? 无状态的转换(前面处理的数据和后面处理的数据没啥关系)、有转换转换(前面处理的数据和后面处理的数据是有关系的,比如叠加关系)

Spark Streaming实现
1、Spark Streaming 采用“微批次”架构。
2、对于整个流式计算来说,数据流你可以想象成水流,微批次架构的意思就是将水流按照用户设定的时间间隔分割为多个水流段。一个段的水会在Spark中转换成为一个RDD,所以对水流的操作也就是对这些分割后的RDD进行单独的操作。每一个RDD的操作都可以认为是一个小的批处理(也就是离线处理)。

Spark Streaming DStream简介
1、DStream是类似于RDD和DataFrame的针对流式计算的抽象类。在源码中DStream是通过HashMap来保存他所管理的数据流的。K是RDD中数据流的时间,V是包含数据流的RDD。
2、对于DStream的操作也就是对于DStream他所包含的所有以时间序列排序的RDD的操作。

Spark Streaming 用法
1、通过StreamingContext来进入Spark Streaming。可以通过已经创建好的SparkContext来创建SparkStreaming。

scala> val ssc = new StreamingContext(sc, Seconds(1))

Spark Streaming 的输入

1、 文件数据源
     1、Spark Streaming通过streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) 这个方法提供了对目录下文件数据源的支持。
      2、如果文件是比较简单的文本文件,可以使用 streamingContext.textFileStream(dataDirectory)  来代替。

scala> val lines = ssc.textFileStream("hdfs://master01:9000/data/")

        3、文件数据源目前不支持嵌套目录:
                     1、 文件需要有相同的数据格式
         2、文件进入 dataDirectory的方式需要通过移动或者重命名来实现
         3、一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据。
 
2、自定义Receiver
    1、需要新建一个Class去继承Receiver,并给Receiver传入一个类型参数,该类型参数是你需要接收的数据的类型。
    2、需要去复写Receiver的方法: onStart方法(在Receiver启动的时候调用的方法)、onStop方法(在Receiver正常停止的情况下调用的方法)
    3、可以在程序中通过streamingContext.receiverStream( new CustomeReceiver)来调用你定制化的Receiver。

3、RDD数据源
     1、可以通过StreamingContext.queueStream(rddQueue)这个方法来监控一个RDD的队列,所有加入到这个RDD队列中的新的RDD,都会被Streaming去处理。


4、Spark Streaming和Kafka的集成

Spark的简介

阅读数 670

没有更多推荐了,返回首页