2014-06-08 10:48:37 iteye_11982 阅读数 122
  • Spark初级入门(6):Scala 模式匹配

    Spark源码视频教程,Scala模式匹配精讲,2014年至2015年,Spark 经历了高速发展,无可争议地成为大数据领域内活跃的开源项目。除此之外,已经有超过200个公司为Spark奉献过源代码,使 Spark社区成为迄今为止开发人员参与多的社区。

    5645 人正在学习 去看看 CSDN讲师

转载说明出处

lcs_168@163.com

 

Spark支持多种运行模式:

    分布式部署:运行在Cluster集群中,底层资源调度可以使用Mesos或者Hadoop YARN,也可以使用Spark自带的Standalone模式

    伪分布式部署

    本地模式运行

为了入门方便并且考虑到个人学习成本(笔记本资源有限!!!),本篇介绍如何在local模式下运行Spark,接下来我们走起!!!

 

NO.1资源准备

1、  VMware10.0.1 build-1379776(我从网上下的,教程问度娘或者谷老师)

2、  CentOS6.5(给个地址,上面的资源还是蛮全的)

3、  JDK1.7CentOS自带的是OPENJDK,大家懂得,还是用正规军的好)

4、  spark-0.9.0-incubating-bin-hadoop2(我采用的是0.9.0版本---当时的最高版本,现在已经是1.0了,建议,如果是研究的话,可以下最新版本的,放地址

 

NO.2 环境搭建

1、  安装VMware

2、  安装CentOS—建议采用桥接模式,方便省心,后期做个ftpssh都是方便的不得了

3、  安装JDK1.7,安装完后务必记得配置下环境变量,否则还是使用自带的OPENJDK

4、  spark-0.9.0-incubating-bin-hadoop2.tgz上传到我们的linux环境下

 

NO.3 SPARK框架环境

1、  解压:tar –xvf spark-0.9.0-incubating-bin-hadoop2.tgz

2、  跳转到SPARKhome目录:cd spark-0.9.0-incubating-bin-hadoop2

3、  执行sbt命令 ./sbt/sbt assembly  (网速好[我家10M光纤]的话,大概半个小时完成)

4、  修改hosts文件,例如:vi /etc/hosts   加上  192.168.1.53 CentOS

5、  OK执行完成以上命令,我们的SPARK就可以在本地运行了

NO.4 环境验证

1、  进入~/ spark-0.9.0-incubating-bin-hadoop2/bin目录

 

36 [root@CentOS bin]# ./spark-shell 
37 14/06/08 06:27:47 INFO HttpServer: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
38 14/06/08 06:27:47 INFO HttpServer: Starting HTTP Server 
39 Welcome to 
40 ____ __ 
41 / __/__ ___ _____/ /__ 
42 _\ \/ _ \/ _ `/ __/ '_/ 
43 /___/ .__/\_,_/_/ /_/\_\ version 0.9.0 
44 /_/ 
45 
46 Using Scala version 2.10.3 (Java HotSpot(TM) Client VM, Java 1.7.0_51) 
47 Type in expressions to have them evaluated. 
48 Type :help for more information. 
49 14/06/08 06:27:51 INFO Slf4jLogger: Slf4jLogger started 
50 14/06/08 06:27:51 INFO Remoting: Starting remoting 
51 14/06/08 06:27:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@CentOS:38659] 
52 14/06/08 06:27:51 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@CentOS:38659] 
53 14/06/08 06:27:51 INFO SparkEnv: Registering BlockManagerMaster 
54 14/06/08 06:27:51 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140608062751-301e 
55 14/06/08 06:27:51 INFO MemoryStore: MemoryStore started with capacity 297.0 MB. 
56 14/06/08 06:27:51 INFO ConnectionManager: Bound socket to port 55885 with id = ConnectionManagerId(CentOS,55885) 
57 14/06/08 06:27:51 INFO BlockManagerMaster: Trying to register BlockManager 
58 14/06/08 06:27:51 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager CentOS:55885 with 297.0 MB RAM 
59 14/06/08 06:27:51 INFO BlockManagerMaster: Registered BlockManager 
60 14/06/08 06:27:51 INFO HttpServer: Starting HTTP Server 
61 14/06/08 06:27:51 INFO HttpBroadcast: Broadcast server started at http://192.168.1.53:47324 
62 14/06/08 06:27:51 INFO SparkEnv: Registering MapOutputTracker 
63 14/06/08 06:27:51 INFO HttpFileServer: HTTP File server directory is /tmp/spark-d4a4b013-6a2c-4bb2-b3e6-f680cec875e7 
64 14/06/08 06:27:51 INFO HttpServer: Starting HTTP Server 
65 14/06/08 06:27:52 INFO SparkUI: Started Spark Web UI at http://CentOS:4040 
66 14/06/08 06:27:53 INFO Executor: Using REPL class URI: http://192.168.1.53:38442 
67 14/06/08 06:27:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
68 Created spark context.. 
69 Spark context available as sc. 
70 
71 scala> println("hello,World!!") 
72 hello,World!!

 

 

NO.5 DEMO验证

 

1 [root@CentOS bin]# ./run-example org.apache.spark.examples.SparkLR local[2] 
2 SLF4J: Class path contains multiple SLF4J bindings. 
3 SLF4J: Found binding in [jar:file:/root/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
4 .......................省略................... 
5 4883 [spark-akka.actor.default-dispatcher-4] INFO org.apache.spark.scheduler.DAGScheduler - Completed ResultTask(4, 0) 
6 4883 [spark-akka.actor.default-dispatcher-4] INFO org.apache.spark.scheduler.DAGScheduler - Stage 4 (reduce at SparkLR.scala:64) finished in 0.075 s 
7 4884 [main] INFO org.apache.spark.SparkContext - Job finished: reduce at SparkLR.scala:64, took 0.098657134 s 
8 Final w: (5816.075967498865, 5222.008066011391, 5754.751978607454, 3853.1772062206846, 5593.565827145932, 5282.387874201054, 3662.9216051953435, 4890.78210340607, 4223.371512250292, 5767.368579668863) 
9 [root@CentOS bin]# 

 

2016-04-08 13:14:44 weixin_34281537 阅读数 223
  • Spark初级入门(6):Scala 模式匹配

    Spark源码视频教程,Scala模式匹配精讲,2014年至2015年,Spark 经历了高速发展,无可争议地成为大数据领域内活跃的开源项目。除此之外,已经有超过200个公司为Spark奉献过源代码,使 Spark社区成为迄今为止开发人员参与多的社区。

    5645 人正在学习 去看看 CSDN讲师

Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。在本地运行模式中,master和worker都运行在一个jvm进程中,通过该模式,可以快速的测试Spark的功能。

下载 Spark

下载地址为http://spark.apache.org/downloads.html,根据页面提示选择一个合适的版本下载,这里我下载的是 spark-1.3.0-bin-cdh4.tgz。下载之后解压:

 cd ~
 wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.0/spark-1.3.0-bin-cdh4.tgz
 tar -xf spark-1.3.0-bin-cdh4.tgz
 cd spark-1.3.0-bin-cdh4

下载之后的目录为:

⇒  tree -L 1
.
├── CHANGES.txt
├── LICENSE
├── NOTICE
├── README.md
├── RELEASE
├── bin
├── conf
├── data
├── ec2
├── examples
├── lib
├── python
└── sbin

运行 spark-shell

本地模式运行spark-shell非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

$ MASTER=local 
$ bin/spark-shell

MASTER=local就是表明当前运行在单机模式。如果一切顺利,将看到下面的提示信息:

Created spark context..
Spark context available as sc.

这表明spark-shell中已经内置了Spark context的变量,名称为sc,我们可以直接使用该变量进行后续的操作。

spark-shell 后面设置 master 参数,可以支持更多的模式,请参考 http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

我们在sparkshell中运行一下最简单的例子,统计在README.md中含有Spark的行数有多少,在spark-shell中输入如下代码:

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

如果你觉得输出的日志太多,你可以从模板文件创建 conf/log4j.properties :

$ mv conf/log4j.properties.template conf/log4j.properties

然后修改日志输出级别为WARN

log4j.rootCategory=WARN, console

如果你设置的 log4j 日志等级为 INFO,则你可以看到这样的一行日志 INFO SparkUI: Started SparkUI at http://10.9.4.165:4040,意思是 Spark 启动了一个 web 服务器,你可以通过浏览器访问http://10.9.4.165:4040来查看 Spark 的任务运行状态等信息。

pyspark

运行 bin/pyspark 的输出为:

$ bin/pyspark
Python 2.7.6 (default, Sep  9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
15/03/30 15:19:07 WARN Utils: Your hostname, june-mac resolves to a loopback address: 127.0.0.1; using 10.9.4.165 instead (on interface utun0)
15/03/30 15:19:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/03/30 15:19:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ / __/  _/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc, HiveContext available as sqlCtx.

你也可以使用 IPython 来运行 Spark:

IPYTHON=1  ./bin/pyspark

如果要使用 IPython NoteBook,则运行:

IPYTHON_OPTS="notebook"  ./bin/pyspark

从日志可以看到,不管是 bin/pyspark 还是 bin/spark-shell,他们都有两个内置的变量:sc 和 sqlCtx。

SparkContext available as sc, HiveContext available as sqlCtx

sc 代表着 Spark 的上下文,通过该变量可以执行 Spark 的一些操作,而 sqlCtx 代表着 HiveContext 的上下文。

spark-submit

在Spark1.0之后提供了一个统一的脚本spark-submit来提交任务。

对于 python 程序,我们可以直接使用 spark-submit:

$ mkdir -p /usr/lib/spark/examples/python
$ tar zxvf /usr/lib/spark/lib/python.tar.gz -C /usr/lib/spark/examples/python

$ ./bin/spark-submit examples/python/pi.py 10

对于 Java 程序,我们需要先编译代码然后打包运行:

$ spark-submit --class "SimpleApp" --master local[4] simple-project-1.0.jar

测试 RDD

在 Spark 中,我们操作的集合被称为 RDD,他们被并行拷贝到集群各个节点上。我们可以通过 sc 来创建 RDD 。

创建 RDD 有两种方式:

  • sc.parallelize()
  • sc.textFile()

使用 Scala 对 RDD 的一些操作:

val rdd1=sc.parallelize(List(1,2,3,3))
val rdd2=sc.parallelize(List(3,4,5))

//转换操作
rdd1.map(2*).collect //等同于:rdd1.map(t=>2*t).collect
//Array[Int] = Array(2, 4, 6, 6)

rdd1.filter(_>2).collect
//Array[Int] = Array(3, 3)

rdd1.flatMap(_ to 4).collect
//Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 3, 4, 3, 4)

rdd1.sample(false, 0.3, 4).collect
//Array[Int] = Array(3, 3)

rdd1.sample(true, 0.3, 4).collect
//Array[Int] = Array(3)

rdd1.union(rdd2).collect
//Array[Int] = Array(1, 2, 3, 3, 3, 4, 5)

rdd1.distinct().collect
//Array[Int] = Array(1, 2, 3)

rdd1.map(i=>(i,i)).groupByKey.collect
//Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1)), (2,CompactBuffer(2)), (3,CompactBuffer(3, 3)))

rdd1.map(i=>(i,i)).reduceByKey(_ + _).collect
//Array[(Int, Int)] = Array((1,1), (2,2), (3,6))

rdd1.map(i=>(i,i)).sortByKey(false).collect
//Array[(Int, Int)] = Array((3,3), (3,3), (2,2), (1,1))

rdd1.map(i=>(i,i)).join(rdd2.map(i=>(i,i))).collect
//Array[(Int, (Int, Int))] = Array((3,(3,3)), (3,(3,3)))

rdd1.map(i=>(i,i)).cogroup(rdd2.map(i=>(i,i))).collect
//Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(4))), (1,(CompactBuffer(1),CompactBuffer())), (5,(CompactBuffer(),CompactBuffer(5))), (2,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(3, 3),CompactBuffer(3))))

rdd1.cartesian(rdd2).collect()
//Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5), (3,3), (3,4), (3,5))

rdd1.pipe("head -n 1").collect
//Array[String] = Array(1, 2, 3, 3)

//动作操作
rdd1.reduce(_ + _)
//Int = 9

rdd1.collect
//Array[Int] = Array(1, 2, 3, 3)

rdd1.first()
//Int = 1

rdd1.take(2)
//Array[Int] = Array(1, 2)

rdd1.top(2)
//Array[Int] = Array(3, 3)

rdd1.takeOrdered(2)
//Array[Int] = Array(1, 2)

rdd1.map(i=>(i,i)).countByKey()
//scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

rdd1.countByValue()
//scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

rdd1.intersection(rdd2).collect()
//Array[Int] = Array(3)

rdd1.subtract(rdd2).collect()
//Array[Int] = Array(1, 2)

rdd1.foreach(println)
//3
//2
//3
//1

rdd1.foreachPartition(x => println(x.reduce(_ + _)))

更多例子,参考http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

2018-08-25 23:36:56 cold_wolfie 阅读数 107
  • Spark初级入门(6):Scala 模式匹配

    Spark源码视频教程,Scala模式匹配精讲,2014年至2015年,Spark 经历了高速发展,无可争议地成为大数据领域内活跃的开源项目。除此之外,已经有超过200个公司为Spark奉献过源代码,使 Spark社区成为迄今为止开发人员参与多的社区。

    5645 人正在学习 去看看 CSDN讲师

spark支持的运行模式:本地模式、本地集群模式、standalone模式、yarn模式及mesos模式。

spark运行模式

本地模式

local、local[N]或local[N,maxRetries]。主要用于代码调试和跟踪。不具备容错能力,不适用于生产环境。

本地模式只有Driver,没有Master和Worker。执行任务的Executor与Driver在同一个JVM进程中。

本地集群模式

local-cluster[N,cores,memory]。也主要用于代码调试和测试,是源码学习常用的模式。不具备容错能力,不能用于生产环境。

Driver、Master与Worker运行在同一个JVM进程中。每个Worker可启动多个Executor,每个Executor都是一个JVM进程。

Standalone模式

spark://。具备容错能力并且支持分布式部署运行。

Driver在集群之外,可以是任意的客户端程序。Master部署于单独的进程,甚至在单独的机器上,可以有多个,但只能有一个处于激活状态。Worker部署于单独的进程,推荐在单独的机器上部署。

YARN模式

yarn模式是将任务管理与资源调度功能交给YARN框架进行处理的模式。分为yarn-client和yarn-cluster两种模式。

yarn-client适用于交互、调试,希望立即看到应用的输出;yarn-cluster适用于生产环境。

yarn-cluster模式下,driver运行在AM(ApplicationMaster)中,负责向YARN申请资源并监控作业的运行状况。当用户提交完作业后,就可以关闭client,作业会继续在YARN上运行。

yarn-cluster模式不适合运行交互类型的作业。而在yarn-client模式下,AM(ApplicationMaster)仅仅向YARN请求executor,client会和请求的executor通信来调度工作,client不能离开。

yarn-client模式

yarn-cluster模式

Mesos模式

运行模式类似于YARN,分为client和cluster两种模式。资源调度器分为粗粒度(默认)和细粒度(不推荐)。

2017-04-10 17:12:00 weixin_33739541 阅读数 2
  • Spark初级入门(6):Scala 模式匹配

    Spark源码视频教程,Scala模式匹配精讲,2014年至2015年,Spark 经历了高速发展,无可争议地成为大数据领域内活跃的开源项目。除此之外,已经有超过200个公司为Spark奉献过源代码,使 Spark社区成为迄今为止开发人员参与多的社区。

    5645 人正在学习 去看看 CSDN讲师

之前搞过STORM知道本地模式非常的方便,特意查询学习SPARK本地DEBUG模式开发

Pom.xml

        <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>

2017年4月10日最新Spark

Java示例代码

经典WorldCount

参考文档:http://blog.csdn.net/xsdxs/article/details/52203922

package com.chuibilong.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName(
                "wordCountTest");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> list = new ArrayList<String>();
        list.add("1 1 2 a b");
        list.add("a b 1 2 3");
        JavaRDD<String> RddList = sc.parallelize(list);
        // 先切分为单词,扁平化处理
        JavaRDD<String> flatMapRdd = RddList
                .flatMap(new FlatMapFunction<String, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<String> call(String str) {
                        System.out.println(str);
                        return Arrays.asList(str.split(" ")).iterator();
                    }
                });
        // 再转化为键值对
        JavaPairRDD<String, Integer> pairRdd = flatMapRdd
                .mapToPair(new PairFunction<String, String, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    public Tuple2<String, Integer> call(String word)
                        throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

        // 对每个词语进行计数
        JavaPairRDD<String, Integer> countRdd = pairRdd
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        System.out.println("结果:" + countRdd.collect());
        sc.close();
    }
}

DAY DAY UP

后面准备写个 SPARK STREAMING 的DEMO(预想是怒Spark 的github)

直接参考:http://blog.csdn.net/jacklin929/article/details/53689365

//注意本地调试,master必须为local[n],n>1,表示一个线程接收数据,n-1个线程处理数据
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//设置日志运行级别
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
//创建一个将要连接到
JavaReceiverInputDStream<String> lines = hostname:port 的离散流
ssc.socketTextStream("master1", 9999); 
JavaPairDStream<String, Integer> counts = 
        lines.flatMap(x->Arrays.asList(x.split(" ")).iterator())
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((x, y) -> x + y);

// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
counts.print();
// 启动计算
ssc.start();
ssc.awaitTermination();

建立服务端 
找台Linux服务器,运行netcat小工具: 
nc -lk 9999 
也就是上面代码里socketTextStream的参数.

转载于:https://my.oschina.net/chuibilong/blog/876256

2018-05-21 17:29:36 Java_Soldier 阅读数 1491
  • Spark初级入门(6):Scala 模式匹配

    Spark源码视频教程,Scala模式匹配精讲,2014年至2015年,Spark 经历了高速发展,无可争议地成为大数据领域内活跃的开源项目。除此之外,已经有超过200个公司为Spark奉献过源代码,使 Spark社区成为迄今为止开发人员参与多的社区。

    5645 人正在学习 去看看 CSDN讲师

1.local 本地模式:不需要hadoop(除非用到),不需要启动Master,Worker
spark-shell(spark-shell –master local[n])
spark-submit (spark-submit –master local[n])

2.local cluster 模式:不需要hadoop(除非用到),不需要启动Master,Worker
spark-submit –master local-cluster[x,y,z]
x :executor 数量 ,y: 每个executor核数,z:每个executor内存

3.standalone 模式:spark自带clusterManager的方式(不需要hadoop(除非用到))
①启动Master,Worker
spark-submit –master spark://wl1:7077 –deploy-mode client
4.standalone 模式:spark自带clusterManager的方式(不需要hadoop(除非用到))
①启动Master,Worker
spark-submit –master spark://wl1:6066 –deploy-mode cluster

5.基于yarn的client模式,无需启动Master,Worker,需要启动Hadoop
基于yarn的模式,不需要启动Master,Worker,yarn充当cluster Manager
spark-submit –master yarn –deploy-mode client

6.基于yarn的cluster模式,无需启动Master,Workder,需要启动Hadoop
spark-submit –master yarn –deploy-mode cluster

Spark运行模式

阅读数 301

没有更多推荐了,返回首页