sparkapi_sparkapi- - CSDN
  • Spark API介绍

    2019-01-02 14:54:45
    Spark机器学习,API浏览 Spark官方API http://spark.apache.org/docs/1.6.2/api/java/index.html http://spark.apache.org/docs/2.2.0/api/java/index.html 1 RDD的支持,是Spark的基础,2根据需求来查看API 一...
    Spark机器学习,API浏览
    Spark官方API
    http://spark.apache.org/docs/1.6.2/api/java/index.html
    http://spark.apache.org/docs/2.2.0/api/java/index.html
    
    1 RDD的支持,是Spark的基础,2根据需求来查看API
    
    一Spark的功能模块
    SparkSQL 
    SparkGraphx
    SparkScreaming
    SparkML
    SparkMLLIb
    
    二常用的机器学习的API
    ml 输入采用DataFrame(输入来源于SparkSQL)
    mllib 输入参数是普通的RDD(输入来自于hdfs)
    
    
    例子userId(用户ID),productId(产品ID),评分,来推荐给用户
    
    协同过滤来找到用户对其它产品感兴趣
    常用算法:ALS算法(最小二乘法)
    org.apache.spark.ml.recommendation ALS
    
    监督分类: org.apache.spark.mllib.classification,
    预先给用户打上标签
    
    非监督分类mllib.clustering 里面也是一样的方法
    KMeans
    
    决策树 mllib.tree
    
    图形计算org.apache.spark.graphx
    org.apache.spark.sql : 我们把数据导入到mysql中,如何放入到spark中来,然后进行机器学习进行预测统计分析,然后放入到hdfs中去 
    
    四API扩展
    可以从mysql,oracle中读取数据
    org.apache.spark.sql
    org.apache.spark.sql.api.java
    org.apache.spark.sql.expressions
    org.apache.spark.sql.hive
    org.apache.spark.sql.hive.execution
    org.apache.spark.sql.jdbc
    org.apache.spark.sql.sources
    org.apache.spark.sql.types
    org.apache.spark.sql.util
    
    org.apache.spark.straming相当于我们的流式计算,
    org.apache.spark.streaming.flume
    org.apache.spark.streaming.kafka
    org.apache.spark.streaming.kinesis
    org.apache.spark.streaming.mqtt
    org.apache.spark.streaming.receiver
    org.apache.spark.streaming.scheduler
    org.apache.spark.streaming.twitter
    org.apache.spark.streaming.util
    org.apache.spark.streaming.zeromq
    
    ml 输入采用DataFrame(输入来源于SparkSQL)
    org.apache.spark.ml
    org.apache.spark.ml.attribute
    org.apache.spark.ml.classification
    org.apache.spark.ml.clustering
    org.apache.spark.ml.evaluation
    org.apache.spark.ml.feature
    org.apache.spark.ml.param
    org.apache.spark.ml.recommendation
    org.apache.spark.ml.regression
    org.apache.spark.ml.source.libsvm
    org.apache.spark.ml.tree
    org.apache.spark.ml.tuning
    org.apache.spark.ml.util
    
    mllib 输入参数是普通的RDD(输入来自于hdfs)
    org.apache.spark.mllib.classification
    org.apache.spark.mllib.clustering
    org.apache.spark.mllib.evaluation
    org.apache.spark.mllib.feature
    org.apache.spark.mllib.fpm
    org.apache.spark.mllib.linalg
    org.apache.spark.mllib.linalg.distributed
    org.apache.spark.mllib.optimization
    org.apache.spark.mllib.pmml
    org.apache.spark.mllib.random
    org.apache.spark.mllib.rdd
    org.apache.spark.mllib.recommendation
    org.apache.spark.mllib.regression
    org.apache.spark.mllib.stat
    org.apache.spark.mllib.stat.distribution
    org.apache.spark.mllib.stat.test
    org.apache.spark.mllib.tree
    org.apache.spark.mllib.tree.configuration
    org.apache.spark.mllib.tree.impurity
    org.apache.spark.mllib.tree.loss
    org.apache.spark.mllib.tree.model
    org.apache.spark.mllib.util
    
    
    展开全文
  • spark官方文档中文版

    2020-07-30 23:32:04
    Spark Streaming编程指南 中文PDF版,Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。它可以接受来自Kafka, Flume, Twitter, ZeroMQ和kinesis的数据,也可以通过高阶函数map, reduce,...
  • spark_API文档

    2020-07-30 23:32:12
    为了方便学习,wget了sparkapi,有需要的可下载参考。
  • Spark API

    2019-05-03 14:46:29
    连接到spark集群,入口点. [HadoopRDD] 读取hadoop上的数据, [MapPartitionsRDD] 针对父RDD的每个分区提供了函数构成的新类型RDD. [PairRDDFunctions] 对偶RDD函数类。 可用于KV类型RDD的附加...

    [SparkContext]
            连接到spark集群,入口点.

        [HadoopRDD]
            读取hadoop上的数据,

        [MapPartitionsRDD]
            针对父RDD的每个分区提供了函数构成的新类型RDD.

        [PairRDDFunctions]
            对偶RDD函数类。
            可用于KV类型RDD的附加函数。可以通过隐式转化得到.

        [ShuffleRDD]
            从Shuffle中计算结果的RDD.

        [RDD]
            是分区的集合.
            弹性分布式数据集.
            不可变的数据分区集合.
            基本操作(map filter , persist)
            分区列表                    //数据
            应用给每个切片的计算函数    //行为
            到其他RDD的依赖列表            //依赖关系
            (可选)针对kv类型RDD的分区类
            (可选)首选位置列表
        
        [DAGScheduler]
            高级调度器层面,实现按照阶段(stage),shuffle按照.
            对每个JOB的各阶段计算有向无环图(DAG),并且跟踪RDD和每个阶段的输出。
            找出最小调度运行作业,将Stage对象以TaskSet方式提交给底层的调度器。
            底层调度器实现TaskScheduler,进而在cluster上运行job.
            TaskSet已经包含了全部的单独的task,这些Task都能够基于cluster的数据进行
            正确运行。

            Stage通过在需要shuffle的边界处将RDD打碎来创建Stage对象。
            具有'窄依赖'的RDD操作(比如map /filter)被管道化至一个taskset中.
            而具有shuffle依赖的操作则包含多个Stage(一个进行输出,另一个进行输入)
            最会,每个stage都有一个针对其他stage的shuffle依赖,可以计算多个操作。
        
            Dag调度器检测首选位置来运行rask,通过基于当前的缓存状态,并传递给底层的
            task调度器来实现。根据shuffle的输出是否丢失处理故障问题。

            不是由stage内因为丢失文件引发的故障有task调度处理。在取消整个stage之前,
            task会进行少量次数的重试操作。

            为了容错,同一stage可能会运行多次,称之为"attemp",如果task调度器报告了一个故障(该
            故障是由于上一个stage丢失输出文件而导致的)DAG调度就会重新提交丢失的stage。这个通过
            具有 FetchFailed的CompletionEvent对象或者ExecutorLost进行检测的。
            DAG调度器会等待一段时间看其他节点或task是否失败,然后对丢失的stage重新提交taskset,
            计算丢失的task。

    术语介绍

     

    [job]
                提交给调度的顶层的工作项目,由ActiveJob表示。
                是Stage集合。

            [Stage]
                是task的集合,计算job中的中间结果。同一RDD的每个分区都会应用相同的计算函数。
                在shuffle的边界处进行隔离(因此引入了隔断,需要上一个stage完成后,才能得到output结果)
                有两种类型的stage:1)ResultStage,用于执行action动作的最终stage。2)ShuffleMapStage,
                对shuffle进行输出文件的写操作的。如果job重用了同一个rdd的话,stage通常可以跨越多个
                job实现共享。

                并行任务的集合,都会计算同一函数。所有task有着同样的shuffle依赖,调度器运行的task DAG
                在shuffle边界处划分成不同阶段。调度器以拓扑顺序执行.

                每个stage可以shuffleMapStage,该阶段下输出是下一个stage的输入,也可以是resultStage,该阶段
                task直接执行spark action。对于shuffleMapStage,需要跟踪每个输出分区所在的节点。

                每个stage都有FirstJobId,区分于首次提交的id
                
                [ShuffleMapStage]
                    产生输出数据,在每次shuffle之前发生。内部含有shuffleDep字段,有相关字段记录产生多少输出
                    以及多少输出可用。
                    DAGScheduler.submitMapStage()方法可以单独提交ubmitMapStage().

                [ResultStage]
                    该阶段在RDD的一些分区中应用函数来计算Action的结果。有些stage并不会在所有分区上执行。
                    例如first(),lookup();

            [Task]
                单独的工作单元,每个发送给一台主机。

            [Cache tracking]
                Dag调度器找出哪些RDD被缓存,避免不必要的重复计算,同时,也会记住哪些shuffleMap已经输出了
                结果,避免map端shuffle的重复处理。

            [Preferred locations]
                dag调度器根据rdd的中首选位置属性计算task在哪里运行。

            [Cleanup]
                运行的job如果完成就会清楚数据结构避免内存泄漏,主要是针对耗时应用。

            
            [ActiveJob]
                在Dag调度器中运行job。作业分为两种类型,1)result job,计算ResultStage来执行action.
                2)map-state job,为shuffleMapState结算计算输出结果以供下游stage使用。
                主要使用finalStage字段进行类型划分。

                job只跟踪客户端提交的"leaf" stage,通过调用Dag调度器的submitjob或者submitMapStage()方法实现.
                job类型引发之前stage的执行,而且多个job可以共享之前的stage。这些依赖关系由DAG调度器内部管理。

            [LiveListenerBus]
                异步传输spark监听事件到监听器事件集合中。

            [EventLoop]
                从caller接受事件,在单独的事件线程中处理所有事件,该类的唯一子类是DAGSchedulerEventProcessLoop。

            [LiveListenerBus]
                监听器总线,存放Spark监听器事件的队列。用于监控。
            
            [OutputCommitCoordinator]
                输出提交协调器.决定提交的输出是否进入hdfs。

            
            [TaskScheduler]
                底层的调度器,唯一实现TaskSchedulerImpl。可插拔,同Dag调度器接受task,发送给cluster,
                运行任务,失败重试,返回事件给DAG调度器。
            
            [TaskSchedulerImpl]
                TaskScheduler调度器的唯一实现,通过BackendScheduler(后台调度器)实现各种类型集群的任务调度。
            

            [SchedulerBackend]
                可插拔的后台调度系统,本地调度,mesos调度,。。。
                在任务调度器下方,
                实现有三种
                1.LocalSchedulerBackend
                    本地后台调度器
                    启动task.
                
                2.StandaloneSchedulerBackend
                    独立后台调度器

                3.CoarseGrainedSchedulerBackend
                    粗粒度后台调度器

            [Executor]
                spark程序执行者,通过线程池执行任务。

    展开全文
  • Spark常用API(五)

    2019-08-11 19:57:45
    spark集群搭建2. 初步认识Spark3. 理解spark的RDD4. 使用shell方式操作Spark,熟悉RDD的基本操作5. 使用jupyter连接集群的pyspark6. 理解Spark的shuffle过程7. 学会使用SparkStreaming8. 说一说take,collect,first...


    【Task5】Spark常用API
    spark集群搭建
    初步认识Spark (解决什么问题,为什么比Hadoop快,基本组件及架构Driver/)
    理解spark的RDD
    使用shell方式操作Spark,熟悉RDD的基本操作
    使用jupyter连接集群的pyspark
    理解Spark的shuffle过程
    学会使用SparkStreaming
    说一说take,collect,first的区别,为什么不建议使用collect?
    向集群提交Spark程序
    使用spark计算《The man of property》中共出现过多少不重复的单词,以及出现次数最多的10个单词。
    计算出movielen数据集中,平均评分最高的五个电影。
    计算出movielen中,每个用户最喜欢的前5部电影
    学会阅读Spark源码,整理Spark任务submit过程

    参考资料:

    远程连接jupyter

    【没有jblas库解决办法】

    下载jblas包 :https://pan.baidu.com/s/1o8w6Wem

    运行spark-shell时添加jar:spark-shell --jars [jblas path] /jblas-1.2.4.jar

    1. spark集群搭建

    使用jupyter远程连接Spark集群

    下载,解压缩并重命名

    //解压spark
    tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz
    //将压缩包删掉(可选)
    rm -rf spark-2.1.1-bin-hadoop2.7.tgz
    //改名为spark
    mv spark-2.1.1-bin-hadoop2.7 hadoop
    

    前期基本的spark集群我们已经搭建好了,现在是要使用jupyter远程连接Spark集群,默认Centos7中是有python安装的,但是2.7版本,我们需要安装py3。我们去看一下默认的py2.7在哪里。

    修改配置文件

    vi ~/.bashrc
    //下方是修改内容,添加SPARK_HOME,在PATH里添加路径
    export SPARK_HOME=/usr/local/spark
    export PATH=$PATH:$JAVA_HOME/bin:$HIVE_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$SPARK_HOME/bin
    //使得配置生效
    source ~/.bashrc
    

    cd到 /spark/conf目录下,执行如下操作

    //复制spark-env.sh.template成spark-env.sh
    cp spark-env.sh.template spark-env.sh
    
    //修改spark-env.sh,添加如下内容
    export HADOOP_HOME=/usr/local/hadoop
    export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
    export JAVA_HOME=/usr
    # 设置Master的主机名
    export SPARK_MASTER_IP=192.168.1.105
    export SPARK_LOCAL_IP=192.168.1.105
    export SPARK_MASTER_HOST=192.168.1.105
    # 提交Application的端口,默认就是这个,万一要改呢,改这里
    export SPARK_MASTER_PORT=7077
    # 每一个Worker最多可以使用的cpu core的个数,我虚拟机就一个...
    # 真实服务器如果有32个,你可以设置为32个
    export SPARK_WORKER_CORES=1
    # 每一个Worker最多可以使用的内存,我的虚拟机就2g
    # 真实服务器如果有128G,你可以设置为100G
    export SPARK_WORKER_MEMORY=1g
    export SCALA_HOME=/usr/local/scala
    export SPARK_HOME=/usr/local/spark
    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
    

    复制slaves.template成slaves,并修改内容

    cp slaves.template slaves
    # 修改slaves,添加如下内容
    spark1
    spark2
    spark3
    这里同时将里面的localhost删去
    

    将配置好的spark文件复制到Slave1和Slave2节点。

    scp -r /usr/local/spark root@spark2:/usr/local
    scp -r /usr/local/spark root@spark3:/usr/local
    

    修改spark2/spark3的配置

    在CP2和CP3上分别修改~/.bashrc,增加Spark的配置,过程同Master一样。
    在CP2和CP3修改spark-env.sh,将export SPARK_LOCAL_IP=xxx.xxx.xxx.xxx改成spark1和spark2对应节点的IP
    

    在Master节点启动集群,(start-all.sh会与hadoop冲突,所以这里加上路径)

    /usr/local/spark/sbin/start-all.sh
    

    查看集群是否启动成功

    jps
    

    spark1在Hadoop的基础上新增了:Master和Worker:

    10066 NameNode
    17269 Kafka
    30694 Worker
    10248 SecondaryNameNode
    10392 ResourceManager
    30680 Worker
    30936 Jps
    16589 QuorumPeerMain
    30575 Master
    

    spark2、spark3在Hadoop的基础上新增了:Worker

    2. 初步认识Spark

    (解决什么问题,为什么比Hadoop快,基本组件及架构Driver/)

    1.背景:

    在spark出现之前,hadoop的迅速发展,hadoop分布式集群,把编程简化为自动提供 位置感知性调度,容错,以及负载均衡的一种模式,用户就可以在普通的PC机上运行超大集群运算,hadoop有一个非常大的问题:hadoop是基于流处理的,hadoop会从(物理存储)hdfs中加载数据,然后处理之后再返回给物理存储hdfs中,这样不断的读取与写入,占用了大量的IO,后来hadoop出现了非循环的数据流模型,也就是DAG,但是其中任然出现了两个重大的问题:

    1.任然是不断的重复写入和读取磁盘。每次操作都要完成这两步,太浪费了。

    3.交互式数据查询。比如:用户不断查询具体的一个用户的子集。

    2.比如,机器学习,图计算,数据挖掘方面不适用,现在要做大量的重复操作,并且下一次的开始,要依据前面计算的结果,这样对于hadoop来说就要重新的计算,从而浪费大量的资源。

    2.Spark到底解决了什么根本性的技术问题?

    基于上述:

    spak提出了分布式的内存抽象,RDD(弹性分布式数据集)支持工作集的应用,也具有数据流模型的特点,例如,自动容错,位置感知,可伸缩性和可扩展性,并且RDD支持多个查询时,显示的将工作集缓存到内存中,后续查询时能够重用工作集的结果。这样与hadoop相比,就极大的提高了速度。

    RDD提供了共享内存模型,RDD本身只记录分区的集合,只能通过其他的RDD通过转换例如,map,join等操作来创建新的RDD,而RDD并不需要检查点操作,为什么?因为前后之间的RDD是有”血统”关系的,其核心原因是,每个RDD包含了从其他RDD计算出分区的所有内容,并且这个计算不是从头开始计算,而是仅仅指的是从上一步开始计算得到即可,这也就实现了工作集的复用。

    Spark周围的SQL,机器学习,图计算都是基于此构建出来的,使得Spark成为一体化的大数据平台,不仅降低了各个开发,运维的成本,也提高了性能。

    参考:https://blog.csdn.net/snail_gesture/article/details/49688569

    https://blog.csdn.net/zisheng_wang_DATA/article/details/50555860

    https://blog.csdn.net/dashujuedu/article/details/53487199

    Spark为什么快?

    1、消除了冗余的HDFS读写

    Hadoop每次shuffle操作后,必须写到磁盘,而Spark在shuffle后不一定落盘,可以cache到内存中,以便迭代时使用。如果操作复杂,很多的shufle操作,那么Hadoop的读写IO时间会大大增加。

    2、消除了冗余的MapReduce阶段

    Hadoop的shuffle操作一定连着完整的MapReduce操作,冗余繁琐。而Spark基于RDD提供了丰富的算子操作,且reduce操作产生shuffle数据,可以缓存在内存中。

    3、JVM的优化

    Hadoop每次MapReduce操作,启动一个Task便会启动一次JVM,基于进程的操作。而Spark每次MapReduce操作是基于线程的,只在启动Executor是启动一次JVM,内存的Task操作是在线程复用的。

    每次启动JVM的时间可能就需要几秒甚至十几秒,那么当Task多了,这个时间Hadoop不知道比Spark慢了多少。

    考虑一种极端查询:Select month_id,sum(sales) from T group by month_id;

    这个查询只有一次shuffle操作,此时,也许Hive HQL的运行时间也许比Spark还快。

    结论:Spark快不是绝对的,但是绝大多数,Spark都比Hadoop计算要快。这主要得益于其对mapreduce操作的优化以及对JVM使用的优化。

    参考:https://blog.csdn.net/lmalds/article/details/51189924

    spark的四大核心组件

    在这里插入图片描述
    相对于第一代的大数据生态系统Hadoop中的MapReduce,Spark 无论是在性能还是在方案的统一性方面,都有着极大的优势。Spark框架包含了多个紧密集成的组件,如图4所示。位于底层的是Spark Core,其实现了Spark的作业调度、内存管理、容错、与存储系统交互等基本功能,并针对弹性分布式数据集提供了丰富的操作。在Spark Core的基础上,Spark提供了一系列面向不同应用需求的组件,主要有Spark SQL、Spark Streaming、MLlib、GraphX。

    1.Spark SQL

    Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发者将SQL语句融入到Spark应用程序开发过程中,无论是使用Python、Java还是Scala,用户可以在单个的应用中同时进行SQL查询和复杂的数据分析。由于能够与Spark所提供的丰富的计算环境紧密结合,Spark SQL得以从其他开源数据仓库工具中脱颖而出。Spark SQL在Spark l.0中被首次引入。在Spark SQL之前,美国加州大学伯克利分校曾经尝试修改Apache Hive以使其运行在Spark上,进而提出了组件Shark。然而随着Spark SQL的提出与发展,其与Spark引擎和API结合得更加紧密,使得Shark已经被Spark SQL所取代。

    2.Spark Streaming

    众多应用领域对实时数据的流式计算有着强烈的需求,例如网络环境中的网页服务器日志或是由用户提交的状态更新组成的消息队列等,这些都是实时数据流。Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。由于这些API与Spark Core中的基本操作相对应,因此开发者在熟知Spark核心概念与编程方法之后,编写Spark Streaming应用程序会更加得心应手。从底层设计来看,Spark Streaming支持与Spark Core同级别的容错性、吞吐量以及可伸缩性。

    3.MLlib

    MLlib是Spark提供的一个机器学习算法库,其中包含了多种经典、常见的机器学习算法,主要有分类、回归、聚类、协同过滤等。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语,包括一个通用的梯度下降优化基础算法。所有这些方法都被设计为可以在集群上轻松伸缩的架构。

    4.GraphX

    GraphX是Spark面向图计算提供的框架与算法库。GraphX中提出了弹性分布式属性图的概念,并在此基础上实现了图视图与表视图的有机结合与统一;同时针对图数据处理提供了丰富的操作,例如取子图操作subgraph、顶点属性操作mapVertices、边属性操作mapEdges等。GraphX还实现了与Pregel的结合,可以直接使用一些常用图算法,如PageRank、三角形计数等。

    上述这些Spark核心组件都以jar包的形式提供给用户,这意味着在使用这些组件时,与MapReduce上的Hive、Mahout、Pig等组件不同,无需进行复杂烦琐的学习、部署、维护和测试等一系列工作,用户只要搭建好Spark平台便可以直接使用这些组件,从而节省了大量的系统开发与运维成本。将这些组件放在一起,就构成了一个Spark软件栈。基于这个软件栈,Spark提出并实现了大数据处理的一种理念——“一栈式解决方案(one stack to rule them all)”,即Spark可同时对大数据进行批处理、流式处理和交互式查询,如图5所示。借助于这一软件栈用户可以简单而低耗地把各种处理流程综合在一起,充分体现了Spark的通用性。
    在这里插入图片描述
    参考:https://blog.csdn.net/c36qUCnS2zuqF6/article/details/81518150

    https://blog.csdn.net/mys_35088/article/details/81020542

    3. 理解spark的RDD

    参考:https://blog.csdn.net/u011094454/article/details/78992293

    https://blog.csdn.net/guohecang/article/details/51736572

    4. 使用shell方式操作Spark,熟悉RDD的基本操作

    RDD的操作分为两种,一种是转化操作,一种是执行操作,转化操作并不会立即执行,而是到了执行操作才会被执行。

    转化操作

    map() 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD

    flatMap() 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD

    filter() 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD

    distinct() 没有参数,将RDD里的元素进行去重操作

    union() 参数是RDD,生成包含两个RDD所有元素的新RDD

    intersection() 参数是RDD,求出两个RDD的共同元素

    subtract() 参数是RDD,将原RDD里和参数RDD里相同的元素去掉

    cartesian() 参数是RDD,求两个RDD的笛卡儿积

    行动操作

    collect() 返回RDD所有元素

    count() RDD里元素个数

    countByValue() 各元素在RDD中出现次数

    reduce() 并行整合所有RDD数据,例如求和操作

    fold(0)(func) 和reduce功能一样,不过fold带有初始值

    aggregate(0)(seqOp,combop) 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样

    foreach(func) 对RDD每个元素都是使用特定函数

    行动操作每次的调用时不会存储前面的计算结果的,若果想要存储前面的操作结果需要把结果加载需要在需要缓存中间结果的RDD调用cache(),cache()方法是把中间结果缓存到内存中,也可以指定缓存到磁盘中(也可以只用persisit())

    参考:https://blog.csdn.net/HANLIPENGHANLIPENG/article/details/53508746

    输入spark-shell进入spark交互式环境中。RDDs可以使用Hadoop InputFormats(例如HDFS文件)创建,也可以从其他的RDDs转换。让我们在Spark源代码目录里从README.md文本文件中创建一个新的RDD。

    Spark Shell启动时遇到:14: error: not found: value spark import spark.implicits._ :14: error: not found: value spark import spark.sql错误的解决办法(图文详解)

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
          /_/
             
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")
    textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24
    

    file:///usr/local/spark/README.md,首部的file代表本地目录,注意file:后有三个斜杠(/);中间加粗部分是我的spark安装目录。

    RDD的actions从RDD中返回值,transformations可以转换成一个新RDD并返回它的引用。下面展示几个action:

    scala> textFile.count()
    res0: Long = 104
    
    scala> textFile.first()
    res1: String = # Apache Spark
    

    其中,count代表RDD中的总数据条数;first代表RDD中的第一行数据。

    下面使用一个transformation,我们将使用filter函数对textFile这个RDD进行过滤,取出包含字符串"Spark"的行,并返回一个新的RDD:

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26
    

    当然也可以把actions和transformations连接在一起使用:
    上面这条语句表示有多少行包括字符串"Spark"。

    更多RDD操作

    RDD actions和transformations能被用在更多的复杂计算中。比如想要找到一行中最多的单词数量:

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res3: Int = 22
    

    首先将行映射成一个整型数值产生一个新的RDD。在这个新的RDD上调用reduce找到行中最大的单词数个数。map和reduce的参数是Scala的函数串(闭包),并且可以使用任何语言特性或者Scala/Java类库。例如,我们可以很方便地调用其他的函数声明。我们使用Math.max()函数让代码更容易理解:

    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res4: Int = 22
    

    大家都知道,Hadoop流行的一个通用数据流模式是MapReduce。Spark能够很容易地实现MapReduce:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:27
    

    这里,我们结合了flatMap、map和reduceByKey来计算文件里每个单词出现的数量,它的结果是包含一组(String, Int)键值对的RDD。我们可以使用collect操作收集单词的数量:

    scala> wordCounts.collect()
    res5: Array[(String, Int)] = Array((package,1), (For,3), (Programs,1), (processing.,1), (Because,1), (The,1), 
    (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (than,1), (APIs,1), 
    (have,1), (Try,1), (computation,1), (through,1), (several,1), (This,2), (graph,1), (Hive,2), (storage,1), 
    (["Specifying,1), (To,2), ("yarn",1), (Once,1), (prefer,1), (SparkPi,2), (engine,1), (version,1), (file,1), 
    (documentation,,1), (processing,,1), (the,24), (are,1), (systems.,1), (params,1), (not,1), (different,1), (refer,2), 
    (Interactive,2), (R,,1), (given.,1), (if,4), (build,4), (when,1), (be,2), (Tests,1), (Apache,1), (thread,1), (programs,,1), 
    (including,4), (./bin/run-example,2), (Spark.,1), (package.,1), (1000).count(),1), (Versions,1), (HDFS,1), (Data.,1), (>>>,...
    

    缓存

    Spark支持把数据集缓存到内存之中,当要重复访问时,这是非常有用的。举一个简单的例子:

    scala> linesWithSpark.cache()
    res6: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:26
    
    scala> linesWithSpark.count()
    res7: Long = 20
    
    scala> linesWithSpark.count()
    res8: Long = 20
    
    scala> linesWithSpark.count()
    res9: Long = 20
    

    先缓存linesWithSpark数据集,然后重复访问count函数返回的值。当然,我们并不能察觉明显的查询速度变化,但是当在大型的数据集中使用缓存,将会非常显著的提升相应的迭代操作速度。

    参考:https://blog.csdn.net/universe_ant/article/details/52014068

    5. 使用jupyter连接集群的pyspark

    Spark伪分布式环境搭建 + jupyter连接spark集群
    备份Python2.7

    [root@spark1 ~]# cd /usr/
    [root@spark1 usr]# cd bin
    [root@spark1 bin]# ls python*
    

    python python2 python2.7

    三个显示结果中最后一个是python2.7,实际上这几个文件之间是有依赖关系的。在 ls 后面加个 -al参数,如下:

    [root@spark1 bin]# ls -al  python*
    

    lrwxrwxrwx. 1 root root 7 Jul 24 21:39 python -> python2
    lrwxrwxrwx. 1 root root 9 Jul 24 21:39 python2 -> python2.7
    -rwxr-xr-x. 1 root root 7216 Oct 30 2018 python2.7

    依赖关系很明显就可以看到。我们要安装版本3,首先要把刚才显示的三个python文件中的第一个python给备份一下(不保留源文件,仅保留备份文件就可以)

    [root@spark1 bin]# mv python python.bak
    

    安装python3
    先在官网下载python3的压缩包,然后在指定的文件夹里进行解压。

    cd /usr/bin/
    tar -xvzf  Python-3.6.9.tgz
    cd Python-3.6.9
    ./configure --prefix=/usr/local/python3.6.9
    make
    make install
    

    我在这里安装过程中遇到一些错误,主要是两点,其一是要在 /usr/bin/yum 的文件里,将第一行的python版本进行修改,这里我直接改为python2.7,yum默认是python2的;其二是要把下载器中的python也修改对应的版本。

    vi /usr/libexec/urlgrabber-ext-down
    
    将文件头部的
    #!/usr/bin/python
    
    改成
    #!/usr/bin/python2.7
    

    下面是遇到错误后我找的一些解决方法。

    Linux下切换Python版本

    Python3.6安装报错 configure: error: no acceptable C compiler found in $PATH

    centos7 yum报错Error downloading packages:

    yum出现Error downloading packages错误

    Python安装常见问题(1):zipimport.ZipImportError: can’t decompress data

    yum -y install zlib*
    vim Module/Setup
    
    找到下面这一行
    #zlib zlibmodule.c -I$(prefix)/include -L$(exec_prefix)/lib -lz
    去掉注释
         zlib zlibmodule.c -I$(prefix)/include -L$(exec_prefix)/lib -lz
    
    cd ..
    make && make install
    

    终于安装成功!

    Anaconda安装

    Linux Centos7安装anaconda3和jupyter
    清华大学开源软件镜像站

    6. 理解Spark的shuffle过程

    MapReduce中的shuffle机制

    在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。

    Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

    什么是Spark Shuffle?

    比如: reduceByKey是一个聚合类的函数(reduceByKey包括groupByKey和reduce),当使用reduceByKey时会产生Shuffle过程,reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成 一个value,然后生成一个新的RDD,元素类型是对的形 式,这样每一个key对应一个聚合起来的value。

    Spark中的Shuffle是把一组无规则的数据尽量转换成一组具有一定规则的数据。

    Spark Shuffle产生的问题

    每一个key对应的value不一定都是在一个partition中 ,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,它的partition极有可能分布在各个节点上。

    既然出现如上的问题,那么Spark如何进行聚合?

    – Shuffle Write:上一个stage的每个map task就必须保证将自己处理 的当前分区中的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。

    – Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。

    参考:Spark学习笔记——spark shuffle过程

    https://blog.csdn.net/databatman/article/details/53023818

    7. 学会使用SparkStreaming

    什么是Spark Streaming?

    首先,什么是流(streaming)?数据流是连续到达的无穷序列。流处理将不断流动的输入数据分成独立的单元进行处理。流处理是对流数据的低延迟处理和分析。Spark Streaming是Spark API核心的扩展,可实现实时数据的快速扩展,高吞吐量,高容错处理。Spark Streaming适用于大量数据的快速处理。

    Spark Streaming支持如HDFS目录,TCP套接字,Kafka,Flume,Twitter等数据源。数据流可以用Spark 的核心API,DataFrames SQL,或机器学习的API进行处理,并且可以被保存到HDFS,databases或Hadoop OutputFormat提供的任何文件系统中去。

    Spark Straming如何工作

    Spark Streaming将数据流每X秒分作一个集合,称为Dstreams,它在内部是一系列RDD。您的Spark应用程序使用Spark API处理RDD,并且批量返回RDD操作的结果。

    编写 Spark Streaming 程序的基本步骤是

    通过创建输入DStream来定义输入源;
    通过对 DStream 应用的 转换操作 和 输出操作 来定义流计算;
    用streamingContext.start()来开始接收数据和处理流程;
    通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束);
    可以通过streamingContext.stop()来手动结束流计算进程。
    本例是创建一个实时的wordcount程序,程序监听TCP套接字的数据服务器获取文本数据,然后计算文本中包含的单词数。本例是在spark-shell中实现的,如果想编写独立的应用程序会稍有不同。在编写程序之前我们要先运行Netcat作为数据服务器,比如你可以在master节点运行以下命令:

    nc –lk 9999
    

    如果你没有安装nc,需要运行下列命令安装nc:

    # Removes the old package
    yum erase nc
    
    # Manually downloads the working package from the Official Repository
    wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm
    
    # Installs the package
    rpm -iUv nc-1.84-22.el6.x86_64.rpm
    

    如果你跟我一样找不到yum(原因是之前把python2的软链接改了),那么:

    vi /usr/bin/yum
    # 将第一行的/usr/bin/python改成/usr/bin/python2
    

    然后安装nc,并运行nc –lk 9999命令。
    时下面会有一个光标在闪动。

    然后开第二个终端,在spark-shell中输入 :paste 回车,冒号也要输入,进入粘贴模式。

    然后复制下面的代码:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("master", 9999)
     
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
     
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    

    按下ctrl+D执行这段代码。这时会不断打印出处理结果,每1s打印一次。

    这时我们在第一个终端输入hello word,回车。看到第二个终端结果中出现(hello,1),(world,1),成功。

    使用的过程中遇到错误,参考启动 ./spark-shell 命令报错解决了,不过我不敢删除文件夹,就把文件夹重命名了。

    参考:腾讯云技术社区:Spark Streaming入门

    使用 Spark Streaming 进行实时流计算(一)

    https://blog.csdn.net/u013468917/article/details/51249711

    8. 说一说take,collect,first的区别,为什么不建议使用collect?

    first:返回第一个元素

    scala> val rdd = sc.parallelize(List(1,2,3,3))
    
    scala> rdd.first()
    res1: Int = 1
    

    take:rdd.take(n)返回第n个元素

    scala> val rdd = sc.parallelize(List(1,2,3,3))
    
    scala> rdd.take(2)
    res3: Array[Int] = Array(1, 2)
    

    collect:rdd.collect() 返回 RDD 中的所有元素

    scala> val rdd = sc.parallelize(List(1,2,3,3))
    
    scala> rdd.collect()
    res4: Array[Int] = Array(1, 2, 3, 3)
    

    Spark内有collect方法,是Action操作里边的一个算子,这个方法可以将RDD类型的数据转化为数组,同时会从远程集群是拉取数据到driver端。

    首先,collect是Action里边的,根据RDD的惰性机制,真正的计算发生在RDD的Action操作。那么,一次collect就会导致一次Shuffle,而一次Shuffle调度一次stage,然而一次stage包含很多个已分解的任务碎片Task。这么一来,会导致程序运行时间大大增加,属于比较耗时的操作,即使是在local模式下也同样耗时。

    其次,从环境上来讲,本机local模式下运行并无太大区别,可若放在分布式环境下运行,一次collect操作会将分布式各个节点上的数据汇聚到一个driver节点上,而这么一来,后续所执行的运算和操作就会脱离这个分布式环境而相当于单机环境下运行,这也与Spark的分布式理念不合。

    最后,将大量数据汇集到一个driver节点上,并且像这样val arr = data.collect(),将数据用数组存放,占用了jvm堆内存,可想而知,是有多么轻松就会内存溢出。

    官方说法:

    打印一个弹性分布式数据集元素,使用时要注意不要导致内存溢出!

    建议使用 take(): rdd.take(100).foreach(println),

    而不使用rdd.collect().foreach(println)。

    因为后者会导致内存溢出!!

    (spark collect操作的特点是从远程集群是拉取数据到本地,经过网络传输,如果数据量的话,会给网络造成很大的压力,和foreach的区别是,foreach是在远程集群上遍历rdd中的元素,如果是在本地的话,差别不大。建议使用foreach,不要用collect.)

    因此:

    若需要遍历RDD中元素,大可不必使用collect,可以使用foreach语句;

    若需要打印RDD中元素,可用take语句,返回数据集前n个元素

    参考:https://blog.csdn.net/T1DMzks/article/details/70667011

    https://blog.csdn.net/high2011/article/details/53138279

    https://blog.csdn.net/chaoshengmingyue/article/details/82021746

    9. 向集群提交Spark程序

    (1)在集群中运行应用程序JAR包

    向独立集群管理器提交应用,需要把spark://master:7077作为主节点参数递给spark-submit。下面我们可以运行Spark安装好以后自带的样例程序SparkPi,它的功能是计算得到pi的值(3.1415926)。

    到spark的安装目录下的examples/jars路径下查看自带的样例程序SparkPi:

    [root@CP1 jars]# ll
    total 2052
    -rw-r--r--. 1 500 500  121970 Apr 25  2017 scopt_2.11-3.3.0.jar
    -rw-r--r--. 1 500 500 1975967 Apr 25  2017 spark-examples_2.11-2.1.1.jar
    

    然后回到spark目录下,运行下列代码:

    bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 examples/jars/spark-examples_2.11-2.1.1.jar 100 2>&1 | grep "Pi is roughly"
    

    2)在集群中运行pyspark

    也可以用spark-shell连接到独立集群管理器上。

    首先做一点准备工作,把一个README.md文件拷贝到HDFS上,用于后面的测试。

    cd /usr/local/hadoop/
    # 下面这条命令中,我们把spark安装目录下的README.md文件上传到分布式文件系统HDFS的根目录下
    bin/hadoop fs -put /usr/local/spark/README.md /
    

    在Shell中输入如下命令启动进入pyspark:

    cd /usr/local/spark/
    bin/pyspark --master spark://master:7077
    

    当然,我运行到这就进入jupyter notebook了,因为之前配置过。如果你在pyspark交互式界面,可以输入以下命令测试(还记得嘛,我在scala界面测试过):

    >>> textFile = sc.textFile("hdfs://master:9000/README.md")
    >>> textFile.count()
    99                                                                 
    >>> textFile.first()
    # Apache Spark
    

    Hadoop YARN管理器

    (1)在集群中运行应用程序JAR包
    向Hadoop YARN集群管理器提交应用,需要把yarn-cluster作为主节点参数递给spark-submit。
    请登录Linux系统,打开一个终端,在Shell中输入如下命令:

    bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster examples/jars/spark-examples_2.11-2.1.1.jar
    

    然后会出现一个tracking URL,复制该URL到浏览器,点击查看Logs,再点击stdout,即可查看结果

    参考:

    在集群上运行Spark应用程序(Python版)_厦大数据库实验室博客

    10. 使用spark进行计算

    《The man of property》中共出现过多少不重复的单词,以及出现次数最多的10个单词。

    # 设置数据的路径
    textData = sc.textFile("file:///usr/local/liling/TheManOfProperty.txt")
    
    # 将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合
    splitData = textData.flatMap(lambda line:line.split(" "))
    
    # 处理合并后的集合中的元素,每个元素的值为1,返回一个元组(key,value)
    # 其中key为单词,value这里是1,即该单词出现一次
    flagData = splitData.map(lambda word:(word,1))
    
    # reduceByKey会将textSplitFlag中的key相同的放在一起处理
    # 传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1
    countData = flagData.reduceByKey(lambda x,y:x+y)
    
    countData.count()
    # 输出:18344
    

    因为之前配置过了,进入pyspark后已经有sc环境,所以直接使用就可以。可以看到不重复的单词有18344个,应该是对的吧。

    出现次数最多的10个单词

    result = countData.sortBy(lambda x: x[1], False).take(10)
    result
    

    输出:

    [('the', 5229),
     ('of', 3496),
     ('', 3189),
     ('to', 2824),
     ('and', 2593),
     ('a', 2512),
     ('he', 2069),
     ('his', 1902),
     ('in', 1733),
     ('was', 1695)]
    

    参考:

    https://blog.csdn.net/proplume/article/details/79798289

    python spark 通过key来统计不同values个数

    pyspark进行词频统计并返回topN - Sea_Sky - 博客园

    11. 计算出movielen数据集中,平均评分最高的五个电影

    12. 计算出movielen中,每个用户最喜欢的前5部电影

    13. 学会阅读Spark源码,整理Spark任务submit过程

    展开全文
  • Spark 核心API

    2019-06-15 12:14:59
    Spark配置对象,设置各种参数,使用kv类型。 2.SparkContext spark主要入口点,代表到spark集群的连接,可以创建 rdd、累加器和广播变量。 每个JVM中只能有一个SparkContext,启动新的SparkContext必须stop的...

    1.SparkConf
        Spark配置对象,设置各种参数,使用kv类型。
    2.SparkContext
        spark主要入口点,代表到spark集群的连接,可以创建
        rdd、累加器和广播变量。

        每个JVM中只能有一个SparkContext,启动新的SparkContext必须stop的原来的。 

        val rdd1 = sc.textFile()

    3.RDD
        rdd有依赖列表.
        弹性分布式数据库,是spark的基本抽象,表示为不可变的、分区化的集合,可用于并行计算。
        该类包含基本操作,map、filter、persist。
        对于kv类型的rdd,方法封装在PairRDDFunction类中。
        轻量的集合,里面没有数据。

        内部有5大属性:
        1.分区列表
        2.计算每个切片的函数
        3.到其他RDD的依赖列表
        4.(可选)针对kv类型RDD的分区类
        5.(可选)计算每个的首选的位置列表。

    RDD常见操作
    ------------------
        rdd都是延迟计算的,只有调用action方法时,才会触发job的提交。
        1.变换
            只要返回新的RDD就是transform。
            map
            filter
            flatMap
            mapPartitons                //对每个分区进行变换处理
            sample
            union
            distinct
            intersection
            groupByKey                    //没有combine过程,可以改变v类型
            reduceByKey                    //有combine过程,不能改变v类型
            join
        2.action
            2.1)collect
            2.2)foreachPartition        //迭代每个分区,

    4.Dependency
        依赖,
        指的是子RDD的每个分区和父RDD的分区之间数量的对应关系。
        Dependency
            |
            |---NarrowDependency(窄依赖)
                |----OneToOne依赖(一对一依赖)
                |----Range依赖(范围依赖)
                |----Prune依赖(修剪依赖)

            |---ShuffleDependency(宽依赖)

    4.Stage
        阶段是并行任务的集合,由调度器运行DAG图根据shuffle进行划分成若干stage。
        阶段分两种类型:ShuffleMapStage和ResultStage
        1.ShuffleMapStage
            该阶段的输出是下一个阶段的输入,跟踪每个节点的输出情况。
            一个阶段会重试执行多次处于容错考虑。
            由多个ShuffleMapTask构成。

        2.ResultStage
            在某些分区上应用计算函数,有些操作例如take(n)/first()没必要在所有分区上执行的。
            结果阶段的输出结果回传给driver.
            由多个ResultTask构成。

        
    5.Task
        Spark执行的最小单位,有两种类型,和Stage相对。
        1.ShuffleMapTask
            
        2.ResultTask
            执行任务,并将结果回传给driver。

    6.job
        每个action是一个job。

    7.Application
        一个应用有多个job,对应一个SparkContext。

    展开全文
  • Spark Java API(一)

    2018-08-19 12:35:30
    一、Spark简介  1、什么是Spark  发源于AMPLab实验室的分布式内存计算平台,它克服了MapReduce在迭代式计算和交互式计算方面的不足。  相比于MapReduce,Spark能充分利用内存资源提高计算效率。  2、Spark...

    原文引自:http://blog.csdn.net/u011497897/article/details/71440323

    一、Spark简介

      1、什么是Spark

        发源于AMPLab实验室的分布式内存计算平台,它克服了MapReduce在迭代式计算和交互式计算方面的不足。

        相比于MapReduce,Spark能充分利用内存资源提高计算效率。

      2、Spark计算框架

        Driver程序启动很多workers,然后workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),最后对RDD在内存中进行缓存和计算

    3、为什么Spark计算速度快

        (1)内存计算

        (2)优化执行计划

       4、Spark Api语言支持

        (1)Scala

        (2)Java

        (3)Python

      5、怎么运行Spark

        Local本地模式、Spark独立集群、Mesos、Yarn-Standalone、Yarn-Client

    二、编程模型

      1、RDD(弹性分布式数据集)是什么

        只读的、分块的数据记录集合

        可以通过读取来不同存储类型的数据进行创建、或者通过RDD操作生成(map、filter操作等)

        使用者只能控制RDD的缓存或者分区方式

        RDD的数据可以有多种类型存储方式(可(序列化)存在内存或硬盘中) 

    2、RDD 存储类型 

        RDD可以设置不同类型存储方式,只存硬盘、只存内存等。

     

     

    3、RDD操作

        Transformation:根据已有RDD创建新的RDD数据集build

        Action:在RDD数据集运行计算后,返回一个值或者将结果写入外部存储

     

    4、RDD如何创建

        

       首先创建JavaSparkContext对象实例sc

            JavaSparkContext  sc = new JavaSparkContext("local","SparkTest");

        接受2个参数:

          第一个参数表示运行方式(local、yarn-client、yarn-standalone等)

          第二个参数表示应用名字       直接从集合转化 sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))

      从HDFS文件转化 sc.textFile("hdfs://")

      从本地文件转化 sc.textFile("file:/")

      下面例子中list2就是根据data2List生成的一个RDD

     

     

    根据文件或者集合生成RDD后,接着就可以通过RDD的Transformation操作来完成对数据的各种转化操作

      常用的map、flatMap、filter操作都是对单元素的操作

      常用的groupByKey、join都是对(key、value)类型元素操作

    5、RDD操作例子Java Api

      (1)map

        map操作对数据集每行数据执行函数里面操作

        list1数据集("a,b,c,d,e"),("1,2,3,4,5"); 

    执行结果:对list1数据集每行数据用","进行切分

    2)flatMap

          flatMap相比于map操作,它对每行数据操作后会生成多行数据,而map操作只会生成一行。

    执行结果:对list1数据集每行数据用","进行切分

    3)filter

         filter对每行数据执行过滤操作,返回true则保留,返回false则过滤该行数据

    执行结果:过滤list1数据集中包含‘a’字符的行

     

    4)union

          union操作对两个RDD数据进行合并。与SQL中的union一样

          list2数据集("11,22,33,44,55"),("aa,bb,cc,dd,ee"); 

    执行结果:合并list1与list2数据集

    (5)groupByKey

          groupByKey对pair中的key进行group by操作

          pair1RDD数据集("a,1"),("b,2"),("a,3"),("b,4")

    执行结果:对pair1RDD数据集按key进行group by

     

    (6)reduceByKey

          reduceByKey对pair中的key先进行group by操作,然后根据函数对聚合数据后的数据操作

    执行结果:先group by操作后进行concat

     

    (7)mapValues

          mapValues操作对pair中的value部分执行函数里面的操作

    执行结果:对pair1RDD中value部分加上test字符串

     

    8)join

          join与sql中join含义一致,将两个RDD中key一致的进行join连接操作

          pair2RDD数据集("a,11"),("b,22"),("a,13"),("c,4")

    执行结果:对pair1RDD与pair2RDD按key进行join

    (9)cogroup

          cogroup对两个RDD数据集按key进行group by,并对每个RDD的value进行单独group by

    执行结果:对pair1RDD与pair2RDD按key进行cogroup

    6、RDD数据如何输出

        使用RDD的Transformation对数据操作后,需要再使用Action操作才能将结果数据输出

        可以分别使用count、collect、save等操作来输出或统计RDD结果

      

    7、RDD Action实例

    执行结果:

          count:统计输出数据行数

    collect:输出所有输出数据

    save:保存输出数据至外部存储

     

    7、WordCount实例   

     执行结果:

    8、广播变量& 累加器

        Broadcast variables(广播变量) 

          广播变量,类似于hadoop中的distribute cache,将同一份数据分发至每台机器。

        Accumulators(累加器)

          类似于MapReduce中的counter,用于计数

     

    三、调度机制

      1、DAG Scheduler

        为每个job分割stage,同时会决定最佳路径,并且DAG Scheduler会记录哪个RDD或者stage的数据被checkpoint,从而找到最优调度方案                                         (transformations是延迟执行的原因)

     

    2、DAG Scheduler优化

        单个Stage内Pipeline执行

        基于分区选择合适的join算法最小化shuffle

        重用已经cache过的数据

      3、窄依赖& 宽依赖

        窄依赖:每个子分区只依赖有限数目的父分区 

        宽依赖:每个子分区只依赖所有的父分区

    4、Stage

        调度器会在产生宽依赖的地方形成一个stage,同一个stage内的RDD操作会流式执行,不会发生数据迁移。

    rdd join操作属于宽依赖,从spark产生的日志可以看出需要分3个stage执行

    rdd flatMap、Map操作属于窄依赖,从spark产生的日志可以看出需要分1个stage执行

    5、Shuffle

        每个RDD都可以选择Partitioner进行shuffle操作

        任何在两个RDD上的shuffle操作,将选择其中一个RDD的Partitioner作为标准。如果两个RDD都没有设置Partitioner的话,就使用默认的HashPartitioner

        shuffle需要在node之间移动数据,会影响spark执行效率,应该尽量避免RDD操作中发生shuffle。

    展开全文
  • Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和collect Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Scala ...
  • spark API RDD

    2020-06-03 23:30:10
    spark API RDD pdf版的..........对初学者应该有所帮助
  • 开发自己的spark API

    2015-11-29 12:06:03
    spark提供的API函数能满足大多数场景的应用,但是有时候也需要根据实际数据,自己开发API,以进一步提高性能,看下面一个小例子val rdd=sc.parallelize(Array(1,2,3,4,5,6,Double.NaN,7,8,9),2)现在要对其进行统计...
  • Spark API Java编程使用方法如何使用map Use lambda syntax JavaDStream<String> lines = messages.map(s -> s.substring(0, 5)) Implement the Function interfaces // Function[T1, R] JavaDStream<String> lines ...
  • Spark亚太研究院系列丛书——Spark实战高手之路 从零开始》本书通过Spark的shell测试Spark的工作;使用Spark的cache机制观察一下效率的提升构建Spark的IDE开发环境;通过Spark的IDE搭建Spark开发环境;测试Spark ...
  • 这次 我们以指定executor-memory参数的方式来启动spark-shell: 启动成功了 在命令行中我们指定了spark-shell运行暂用的每个机器上的executor的内存为1g大小,启动成功后参看web页面: 从...
  • 创建一个Scala IDEA工程: 点击“Next”: 点击“Finish”完成工程的创建: 修改项目的属性: 首先修改Modules选项: ...因为要开发Spark程序,所以需要把Spark的开发需要的jar包导
  • 初识SparkSpark API

    2018-01-18 17:35:46
    API应用可以通过使用Spark提供的库获得Spark集群的计算能力,这些库都是Scala编写的,但是Spark提供了面向各种语言的API,例如Scala、Python、Java等,所以可以使用以上语言进行Spark应用开发。 SparkAPI主要由两...
  • 我们现在需要监控datapre0这个任务每一次执行的进度,操作如下: 1. 如图所示,打开spark管理页面,找到对应的任务,点击任务名datapre0 2. 进去之后,获得对应IP和端口 ...4. 其他API说明(对应spar...
  • 在YARN上运行Spark API

    2018-12-18 11:43:13
    启动命令格式: $ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] &lt;app jar&...$ ./bin/spark-submit --class org.apache.spark.exam...
  • 代码片段1: package ... import org.apache.spark._ import org.eclipse.jetty.client.ContentExchange import org.eclipse.jetty.client.HttpClient object BasicMapPartition
  • 本节中所用到的内容是来自搜狗实验室,网址为:...我们使用的是迷你版本的tar.gz格式的文件,其大小为87K,下载后如下所示: 上传到服务器后,解压并查看: ...访问时间 \t 用户ID \t 查询词 \t 该URL在返
  • spark api学习网站

    2016-10-11 14:25:55
    推荐一个spark api学习网站 http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
1 2 3 4 5 ... 20
收藏数 51,963
精华内容 20,785
热门标签
关键字:

sparkapi