• 1.spark-shell的本地模式和集群模式 1.1 local本地模式 直接启动spark-shell命令窗口 脚本启动后,会生成一个SparkContext的上下文对象sc。并且启动的是本地模式(local)。如图: 1.1.1 加载本地数据 sc.t...

    原文:https://blog.csdn.net/qq_33689414/article/details/80232605

    1.spark-shell的本地模式和集群模式

    1.1 local本地模式

    直接启动spark-shell命令窗口

    脚本启动后,会生成一个SparkContext的上下文对象sc。并且启动的是本地模式(local)。如图:

    1.1.1 加载本地数据

    sc.textFile("file:///home/hadoop/words.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).sortBy(_._2,true).collect()

       

    1.1.2 加载hdfs数据

    # 因为在spark-env.sh中配置了HADOOP_CONF_DIR目录,所以默认使用hdfs文件系统。
    # /spark/words.txt:表示hdfs文件系统目录 
    sc.textFile("/spark/words.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).sortBy(_._2,true).collect()

    1.2 集群模式

    1.2.1 启动集群模式的spark-shell窗口

    spark-shell --master spark://server01:7077 --total-executor-cores 3 --executor-memory 1g

      参数介绍:

    --master spark://server01:7077:指定master进程的机器
    
    --total-executor-cores 3:指定executor的核数(worker数量)
    
    --executor-memory 1g:指定executor执行的内存大小

    1.2.2 代码执行

    sc.textFile("/spark/words.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).sortBy(_._2).collect()

    1.2.3 结果查看

    通过集群模式提交的任务,在web页面上是有展示的

    本地模式(local)和集群模式的区别

    1.本地模式不运行在集群上,运行在当前执行的机器上

    2.本地模式的任务不会在web页面显示

    3.本地模式是采用线程来模拟集群的worker进程

    2. scala api实现的本地模式和集群模式

    2.1 local本地模式

    object WordCountLocal {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
            val sc = new SparkContext(conf)
            sc.textFile("file:///d:/a.txt").flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2).saveAsTextFile("file:///d:/out")
        }
    }

     

    因为本地安装了spark,所以可以直接在本地运行,在本地运行。

    setMaster("local[2]")中local表示本地运行,[2]表示是使用2个线程。

    2.2 生成jar提交到集群

    代码:

    object WordCountMaster {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("WordCountMaster")
            val sparkContext = new SparkContext(conf)
            sparkContext.textFile(args(0)).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2).saveAsTextFile(args(1))
        }
    }

     

    提交到集群:

    spark-submit \ 
    --master spark://server01:7077 \
    --executor-memory 1g \
    --total-executor-cores 2 \
    --class com.yundoku.spark.WordCountMaster \
    /home/hadoop/sparkscalawordcount.jar \ 
    /spark/words.txt \ 
    /spark/scala_wordcount_out
    

     

    参数讲解:

    --master:指定集群的master
    --executor-memory:executor的内存大小(worker)
    --total-executor-cores 2:executor核数为2(worker的执行个数)
    --class com.yundoku.spark.WordCountMaster:包含main的类,程序的入口
    
    /home/hadoop/sparkscalawordcount.jar :jar文件
    /spark/words.txt:输入参数1
    /spark/scala_wordcount_out:输入参数2
    

     

    运行结果:

    这里会生成part-000000和part-0000012个结果文件,表示有2个分区。

    原因是,在spark的读取文件时默认是使用的最小分区为2

    defaultMinPartitions的值初始化,如下图所示

    3.Spark集群三种部署模式的区别

    目前Apache Spark支持三种分布式部署方式,分别是standalone、spark on mesos和 spark on YARN,其中,第一种类似于MapReduce 1.0所采用的模式,内部实现了容错性和资源管理,后两种则是未来发展的趋势,部分容错性和资源管理交由统一的资源管理系统完成:让Spark运行在一个通用的资源管理系统之上,这样可以与其他计算框架,比如MapReduce,公用一个集群资源,最大的好处是降低运维成本和提高资源利用率(资源按需分配)。本文将介绍这三种部署方式,并比较其优缺点。

    1. Standalone模式

    即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础。借鉴Spark开发模式,我们可以得到一种开发新型计算框架的一般思路:先设计出它的standalone模式,为了快速开发,起初不需要考虑服务(比如master/slave)的容错性,之后再开发相应的wrapper,将stanlone模式下的服务原封不动的部署到资源管理系统yarn或者mesos上,由资源管理系统负责服务本身的容错。目前Spark在standalone模式下是没有任何单点故障问题的,这是借助zookeeper实现的,思想类似于Hbase master单点故障解决方案。将Spark standalone与MapReduce比较,会发现它们两个在架构上是完全一致的: 

    1)  都是由master/slaves服务组成的,且起初master均存在单点故障,后来均通过zookeeper解决(Apache MRv1的JobTracker仍存在单点问题,但CDH版本得到了解决); 
    2) 各个节点上的资源被抽象成粗粒度的slot,有多少slot就能同时运行多少task。不同的是,MapReduce将slot分为map slot和reduce slot,它们分别只能供Map Task和Reduce Task使用,而不能共享,这是MapReduce资源利率低效的原因之一,而Spark则更优化一些,它不区分slot类型,只有一种slot,可以供各种类型的Task使用,这种方式可以提高资源利用率,但是不够灵活,不能为不同类型的Task定制slot资源。总之,这两种方式各有优缺点。

    2. Spark On Mesos模式

    这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序(可参考Andrew Xia的“Mesos Scheduling Mode on Spark”): 

    1)   粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。 

    2)   细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。

    3. Spark On YARN模式

    这是一种很有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode)。这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划中了。 

    spark on yarn 的支持两种模式: 
    1) yarn-cluster:适用于生产环境; 
    2) yarn-client:适用于交互、调试,希望立即看到app的输出 

    yarn-cluster和yarn-client的区别在于yarn appMaster,每个yarn app实例有一个appMaster进程,是为app启动的第一个container;负责从ResourceManager请求资源,获取到资源后,告诉NodeManager为其启动container。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。

    总结: 

    这三种分布式部署方式各有利弊,通常需要根据实际情况决定采用哪种方案。进行方案选择时,往往要考虑公司的技术路线(采用Hadoop生态系统还是其他生态系统)、相关技术人才储备等。上面涉及到Spark的许多部署模式,究竟哪种模式好这个很难说,需要根据你的需求,如果你只是测试Spark Application,你可以选择local模式。而如果你数据量不是很多,Standalone 是个不错的选择。当你需要统一管理集群资源(Hadoop、Spark等),那么你可以选择Yarn或者mesos,但是这样维护成本就会变高。 
    · 从对比上看,mesos似乎是Spark更好的选择,也是被官方推荐的 
    · 但如果你同时运行hadoop和Spark,从兼容性上考虑,Yarn是更好的选择。 · 如果你不仅运行了hadoop,spark。还在资源管理上运行了docker,Mesos更加通用。 
    · Standalone对于小规模计算集群更适合!

    展开全文
  • Spark本地模式运行

    2019-05-29 01:09:18
    Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。在本地运行模式中,master和worker都运行在一个jvm进程中,通过该模式,可以快速的测试Spark的...

    Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。在本地运行模式中,master和worker都运行在一个jvm进程中,通过该模式,可以快速的测试Spark的功能。

    下载 Spark

    下载地址为http://spark.apache.org/downloads.html,根据页面提示选择一个合适的版本下载,这里我下载的是 spark-1.3.0-bin-cdh4.tgz。下载之后解压:

     cd ~
     wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.0/spark-1.3.0-bin-cdh4.tgz
     tar -xf spark-1.3.0-bin-cdh4.tgz
     cd spark-1.3.0-bin-cdh4
    

    下载之后的目录为:

    ⇒  tree -L 1
    .
    ├── CHANGES.txt
    ├── LICENSE
    ├── NOTICE
    ├── README.md
    ├── RELEASE
    ├── bin
    ├── conf
    ├── data
    ├── ec2
    ├── examples
    ├── lib
    ├── python
    └── sbin
    

    运行 spark-shell

    本地模式运行spark-shell非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

    $ MASTER=local 
    $ bin/spark-shell
    

    MASTER=local就是表明当前运行在单机模式。如果一切顺利,将看到下面的提示信息:

    Created spark context..
    Spark context available as sc.
    

    这表明spark-shell中已经内置了Spark context的变量,名称为sc,我们可以直接使用该变量进行后续的操作。

    spark-shell 后面设置 master 参数,可以支持更多的模式,请参考 http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

    我们在sparkshell中运行一下最简单的例子,统计在README.md中含有Spark的行数有多少,在spark-shell中输入如下代码:

    scala>sc.textFile("README.md").filter(_.contains("Spark")).count
    

    如果你觉得输出的日志太多,你可以从模板文件创建 conf/log4j.properties :

    $ mv conf/log4j.properties.template conf/log4j.properties
    

    然后修改日志输出级别为WARN

    log4j.rootCategory=WARN, console
    

    如果你设置的 log4j 日志等级为 INFO,则你可以看到这样的一行日志 INFO SparkUI: Started SparkUI at http://10.9.4.165:4040,意思是 Spark 启动了一个 web 服务器,你可以通过浏览器访问http://10.9.4.165:4040来查看 Spark 的任务运行状态等信息。

    pyspark

    运行 bin/pyspark 的输出为:

    $ bin/pyspark
    Python 2.7.6 (default, Sep  9 2014, 15:04:36)
    [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
    15/03/30 15:19:07 WARN Utils: Your hostname, june-mac resolves to a loopback address: 127.0.0.1; using 10.9.4.165 instead (on interface utun0)
    15/03/30 15:19:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    15/03/30 15:19:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ / __/  _/
       /__ / .__/\_,_/_/ /_/\_\   version 1.3.0
          /_/
    
    Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
    SparkContext available as sc, HiveContext available as sqlCtx.
    

    你也可以使用 IPython 来运行 Spark:

    IPYTHON=1  ./bin/pyspark
    

    如果要使用 IPython NoteBook,则运行:

    IPYTHON_OPTS="notebook"  ./bin/pyspark
    

    从日志可以看到,不管是 bin/pyspark 还是 bin/spark-shell,他们都有两个内置的变量:sc 和 sqlCtx。

    SparkContext available as sc, HiveContext available as sqlCtx
    

    sc 代表着 Spark 的上下文,通过该变量可以执行 Spark 的一些操作,而 sqlCtx 代表着 HiveContext 的上下文。

    spark-submit

    在Spark1.0之后提供了一个统一的脚本spark-submit来提交任务。

    对于 python 程序,我们可以直接使用 spark-submit:

    $ mkdir -p /usr/lib/spark/examples/python
    $ tar zxvf /usr/lib/spark/lib/python.tar.gz -C /usr/lib/spark/examples/python
    
    $ ./bin/spark-submit examples/python/pi.py 10
    

    对于 Java 程序,我们需要先编译代码然后打包运行:

    $ spark-submit --class "SimpleApp" --master local[4] simple-project-1.0.jar
    

    测试 RDD

    在 Spark 中,我们操作的集合被称为 RDD,他们被并行拷贝到集群各个节点上。我们可以通过 sc 来创建 RDD 。

    创建 RDD 有两种方式:

    • sc.parallelize()
    • sc.textFile()

    使用 Scala 对 RDD 的一些操作:

    val rdd1=sc.parallelize(List(1,2,3,3))
    val rdd2=sc.parallelize(List(3,4,5))
    
    //转换操作
    rdd1.map(2*).collect //等同于:rdd1.map(t=>2*t).collect
    //Array[Int] = Array(2, 4, 6, 6)
    
    rdd1.filter(_>2).collect
    //Array[Int] = Array(3, 3)
    
    rdd1.flatMap(_ to 4).collect
    //Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 3, 4, 3, 4)
    
    rdd1.sample(false, 0.3, 4).collect
    //Array[Int] = Array(3, 3)
    
    rdd1.sample(true, 0.3, 4).collect
    //Array[Int] = Array(3)
    
    rdd1.union(rdd2).collect
    //Array[Int] = Array(1, 2, 3, 3, 3, 4, 5)
    
    rdd1.distinct().collect
    //Array[Int] = Array(1, 2, 3)
    
    rdd1.map(i=>(i,i)).groupByKey.collect
    //Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1)), (2,CompactBuffer(2)), (3,CompactBuffer(3, 3)))
    
    rdd1.map(i=>(i,i)).reduceByKey(_ + _).collect
    //Array[(Int, Int)] = Array((1,1), (2,2), (3,6))
    
    rdd1.map(i=>(i,i)).sortByKey(false).collect
    //Array[(Int, Int)] = Array((3,3), (3,3), (2,2), (1,1))
    
    rdd1.map(i=>(i,i)).join(rdd2.map(i=>(i,i))).collect
    //Array[(Int, (Int, Int))] = Array((3,(3,3)), (3,(3,3)))
    
    rdd1.map(i=>(i,i)).cogroup(rdd2.map(i=>(i,i))).collect
    //Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(4))), (1,(CompactBuffer(1),CompactBuffer())), (5,(CompactBuffer(),CompactBuffer(5))), (2,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(3, 3),CompactBuffer(3))))
    
    rdd1.cartesian(rdd2).collect()
    //Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5), (3,3), (3,4), (3,5))
    
    rdd1.pipe("head -n 1").collect
    //Array[String] = Array(1, 2, 3, 3)
    
    //动作操作
    rdd1.reduce(_ + _)
    //Int = 9
    
    rdd1.collect
    //Array[Int] = Array(1, 2, 3, 3)
    
    rdd1.first()
    //Int = 1
    
    rdd1.take(2)
    //Array[Int] = Array(1, 2)
    
    rdd1.top(2)
    //Array[Int] = Array(3, 3)
    
    rdd1.takeOrdered(2)
    //Array[Int] = Array(1, 2)
    
    rdd1.map(i=>(i,i)).countByKey()
    //scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
    
    rdd1.countByValue()
    //scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
    
    rdd1.intersection(rdd2).collect()
    //Array[Int] = Array(3)
    
    rdd1.subtract(rdd2).collect()
    //Array[Int] = Array(1, 2)
    
    rdd1.foreach(println)
    //3
    //2
    //3
    //1
    
    rdd1.foreachPartition(x => println(x.reduce(_ + _)))
    

    更多例子,参考http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

    展开全文
  • 在做Kaggle比赛的时候,中间处理之后的训练数据有5000多万条数据,结果Spark 本地模式运行的时候,一直报出out of memory 问题。我在程序中使用了DataFrame.rdd.collect()方法。RDD的Collect()方法把RDD的数据全部放...

    在做Kaggle比赛的时候,中间处理之后的训练数据有5000多万条数据,结果Spark 本地模式运行的时候,一直报出out of memory 问题。我在程序中使用了DataFrame.rdd.collect()方法。RDD的Collect()方法把RDD的数据全部放入到数组中进行返回,5000多万条数据全部放入到数组当中进行返回。当然会内存溢出。

    通过打印GC日志发现,Eden区域和老年代的空间使用全部都是100%。没办法,电脑8g内存,我通过调大堆的内存,发现还是内存溢出,内存还是不够存下这么多条数据。我想了一下解决方法。

    解决方法1

    这是一种简单暴力的方法,自己可以再去买一个8G内存条,然后把堆得空间通过-Xms??M -Xmx??M -Xmn??M来设置尽可能大一点。尽量设置为最大的空间。这样看看能不能解决,如果还是不能解决的话,说明你的数据太大了。

    解决方法2

    我调用RDD重新分区函数将RDD重新分为了20个分区,然后通过RDD的saveAsTextFile方法将它们存到了20个文件当中去了。然后,跑程序的时候,我一个文件一个文件的来进行读取。然后一行一行的使用数据。结果是程序可以跑了,但是非常慢。没钱买内存条,只用这种方法。

    解决方法2运行到3000多万条数据的时候,发现程序运行的很慢,通过jstat命令发现新生代的内存用完了。这块我还在考虑,等我之后想到了完美的办法,在来完善这个问题。

    磁盘空间不足问题

    另外我在Spark上面进行微博数据处理的时候,突然报出了这样的错误:磁盘空间不足。我处理的微博数据大概有5.3G。在这里我上网查了一下,原来Spark把中间计算的结果放在了本地磁盘,默认的位子是在C盘下面,而我自己的C盘下面的磁盘空间非常少了,所以导致了中间计算结果存不下来。所以报出了磁盘空间不足的错误,这里我们只要设置一下中间结果保存的路径即可.在sparkConf中设置参数spark.local.dir的值即可。下面是解决方法:

    //后面的值是你要存放的路径
    sparkConf.set(“spark.local.dir”,”S:\TempData”)

    这里就解决了磁盘空间不足的问题,当然这里是Windows Spark本地模式的磁盘空间不足解决方法,估计来Linux里面也可以采用相似的方法来解决。

    展开全文
  • spark java版本地(local模式运行词频统计,WordCount

    1 背景

    之前工作都是把spark程序打包好,上传到公司的集群环境后进行代码调试!一个spark程序用maven打包后往往有100M左右,上传调试很不方便。spark可以local单机运行,所以就在eclipse上搭建了一个单机调试环境。

    2 环境配置

      首先需要配置java(版本:1.8)以及maven(版本:3.3.9)环境,版本不要求一致,也可使用其他java和maven版本配置开发环境。这两者配置在此不做介绍。
    1. Window 7 下装hadoop 2.6.4,需要下载两个文件:
      Hadoop:hadoop-2.6.4.tar.gz
      下载地址:http://hadoop.apache.org/releases.html
      Hadoop-Common:hadoop2.6(x64)V0.2.zip(2.4以后)、(hadoop-common-2.2.0-bin-master.zip)
      下载地址:http://download.csdn.net/detail/myamor/8393459
    2. 配置hadoop_home环境变量
      把hadoop-2.6.4.tar.gz解压到某个路径,如:D:\hadoop-2.6.4。新建HADOOP_HOME,并把%HADOOP_HOME%\bin加到path中。环境变量配置流程,如下图。
    环境变量配置流程
    环境变量配置流程
    3.拷贝Hadoop-Common 下的文件
      官方下载的Apache hadoop 2.6.4的压缩包里,缺少windows下运行的链接库(hadoop.dll,winutils.exe,libwinutils.lib等)。下载Hadoop-Common后直接解压,把里面的文件全部拷贝到官方hadoop目录下的bin目录即可。
      缺少winutils.exe,否则运行会报如下错误:
      java.io.IOException: Could not locate executable D:\hadoop-2.6.4\bin\winutils.exe in the Hadoop binaries.

    3 maven pom 依赖

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}\lib\tools.jar</systemPath>
        </dependency>
        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
             <version>1.6.1</version>
         </dependency>

    4 代码

    经典案例,词频统计。

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    public class WordCount {
    
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordCountTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String outputDir=args[0];
    
        List<String> list=new ArrayList<String>();
        list.add("1 1 2 a b");
        list.add("a b 1 2 3");
        JavaRDD<String> RddList=sc.parallelize(list);
    
        //先切分为单词,扁平化处理
        JavaRDD<String> flatMapRdd = RddList.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String str) {
                return Arrays.asList(str.split(" "));
            }
        });
    
        //再转化为键值对
        JavaPairRDD<String, Integer> pairRdd = flatMapRdd.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
    
        //对每个词语进行计数
        JavaPairRDD<String, Integer> countRdd = pairRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        System.out.println("结果:"+countRdd.collect());
        countRdd.saveAsTextFile(outputDir);
        sc.close();
    }
    }

    5 运行以及结果

    1. 运行:Eclipse 下Run Configuration输入参数out。注意,输出目录必须不存在,否则报错。
      这里写图片描述
    2. 结果
      结果1:控制台正常输出,且不报错。
      这里写图片描述
      结果2:生成一个out文件
      这里写图片描述
      其中part-00000内容为:
      (a,2)
      (b,2)
      (2,2)
      (3,1)
      (1,3)

    6 下载

    这是jdk 1.8的工程项目,如果不是jdk1.8,需要在pom.xml 中进行修改成对应版本,然后执行 maven –>update project 命令,进行更新
    http://download.csdn.net/download/xsdxs/9602880

    展开全文
  • 本文将介绍spark在windows下本地模式的搭建 Spark运行模式基本可以分为两种: 本地模式 即Driver程序只在本机运行 集群模式 即Dirver程序会在集群中运行,具体到集群模式,又可以分为spark集群、MESOS、YARN等。 ...

    本文将介绍spark在windows下本地模式的搭建

    Spark的运行模式基本可以分为两种:

    本地模式 即Driver程序只在本机运行

    集群模式 即Dirver程序会在集群中运行,具体到集群模式,又可以分为spark集群、MESOS、YARN等。

    作为初学者入坑,自然是本地模式调通最方便。Spark在Mac、Linux下的安装步骤不必说,基本没有额外的问题,但windows下的安装还是要注意一下的。

    基本步骤:

    1.到spark官网 spark.apache.org/downloads.h… 下载with hadoop版本的

    这里要注意的是,下载with hadoop版本的,即本地不需要再进行hadoop集群的安装部署。

    下载后解压,到bin目录下执行spark-shell.cmd,此时会报

    java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState': ........

    Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveExternalCatalog':.................

    错误原因是因为没有下载Hadoop windows可执行文件。

    2.下载对应版本的hadoop binary文件

    下载链接 github.com/steveloughr…

    下载后将对应版本的HADOOP_HOME添加到环境变量,bin也加到path里

    3.这时会报一个The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-rw-rw- (on Windows)

    这个是/tmp/hive的权限错误,这时到步骤2中下载的hadoop/bin下面,执行 winutils.exe chmod 777 E:\tmp\hive

    这里的前缀E:\表示我的spark程序解压在E盘了。\tmp\hive这个是固定的文件夹,即已经创建好了。

    这个文件夹比较明显就是spark的临时文件目录了。

    至此,LOCAL模式搭建完毕。spark-shell.cmd运行没有错误后,IDE用代码的方式运行spark程序,也不会报错了。

    Spark的成功运行标志:

    展开全文
  • import org.apache.spark.{SparkConf, SparkContext} /** * Created by root on 2016/5/16. */ object ForeachDemo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Foreach
  • Spark本地运行代码

    2019-07-12 21:07:45
    object demo{ def main(args:Array[string]){ val conf =new SparkConf().setAppName("demo").setMaster("local") //设置本地运行模式 local, ...
  • 最后,我突然知道的本地也是可以运行spark的,,,,,唉 费虚拟机呢么大劲干嘛。。。。 SO,如果你只是学习spark的基础知识,本地完全够用。还快速!!!! 搭建Windows的spark环境 下载hadoop 地址:...
  • Spark本地模式,在eclipse 或 IDEA中开发spark程序要用local模式,本地模式,多用于测试,下面我分别用java和 scala语言举出一个local模式的例子,这里我们以WordCount为例。 eclipse中代码如下: import org....
  • windows下可以通过简单设置,搭建本地运行环境。1、下载spark预编译版本,spark运行环境依赖jdk,scala,这里下载的最新spark版本是2.3,对应jdk1.8+scala2.11.8。java -version java version "1.8.0_151" ...
  • spark 本地调试运行

    2018-08-10 10:13:54
    spark本地调试
  • Spark与Hadoop一样,是一种开源的集群计算环境,但在...下面作者将通过单节点本地模式搭建Spark运行环境 前言: Spark本身用scala写的,运行在JVM之上。 JAVA版本:java 6 /higher edition
  • spark本地提交集群运行踩过的坑 1.本地提交,集群跑spark程序设置(scala) val conf = new SparkConf().setAppName("SparkWordCount") conf.setMaster("spark://hadoop-01:7077") conf.setJars(Array("D:\\hadoop\...
  • 一、Standalone模式 1、使用SparkSubmit提交任务的时候(包括Eclipse或者其它开发工具使用new SparkConf()来运行任务的时候),Driver运行在Client;使用SparkShell提交的任务的时候,Driver是运行在Master上 2、使用...
  • 在做Spark开发时,一般会在windows下进行Spark本地模式程序调试,在本地调试好了再打包运行在Spark集群上。因此需要在windows上进行Spark开发配置。本文将给出三种开发工具的配置:1、使用eclipse java api开发;2、...
  • Ubuntu下spark开发(Local模式)1、下载scala、spark、jdk并解压至/opt/路径下 scala下载地址: http://www.scala-lang.org/ 解压路径:/opt/scala spark下载地址:http://spark.apache.org/downloads.html ...
  • spark本地模式类似于hadoop的单机模式,是为了方便我们调试或入门的。 1.先去官网下载下来http://spark.apache.org/downloads.html,不要下错了,下载pre-built(这是已经编译好了,产生了二进制文件的)for 你的...
  • 【问题】Spark在windows能跑集群模式吗? 我认为是可以的,但是需要详细了解cmd命令行的写法。目前win下跑spark的单机模式是没有问题的。 【关键点】spark启动机制容易被windows的命令行cmd坑  1、带空格、奇怪...
1 2 3 4 5 ... 20
收藏数 20,640
精华内容 8,256