2019-02-17 09:33:59 boling_cavalry 阅读数 1992
  • IDEA开发工具+Maven使用详解视频课程(适合初学者的...

    本课程从IDEA开发工具的安装及配置使用讲起,详细讲解Maven项目管理工具,适合初学者的教程,让你少走弯路! 1.Maven简介及安装 2.配置本地仓库和镜像仓库、项目的目录结构 3.pom.xml详解 4.继承、聚合、依赖 5.使用Nexus搭建私服 6.Maven综合实战应用 教学全程采用笔记+代码案例的形式讲解,通俗易懂!!!

    3257 人正在学习 去看看 汤小洋

Scala语言在函数式编程方面的优势适合Spark应用开发,IDEA是我们常用的IDE工具,今天就来实战IDEA开发Scala版的Spark应用;

版本信息

以下是开发环境:

  1. 操作系统:win10;
  2. JDK:1.8.0_191;
  3. IntelliJ IDEA:2018.2.4 (Ultimate Edition)

以下是运行环境:

  1. Spark:2.3.3;
  2. Scala:2.11.12;
  3. Hadoop:2.7.7;

如果您想在几分钟之内搭建好Spark集群环境,请参考《docker下,极速搭建spark集群(含hdfs集群)》

下载Spark安装包

  1. 去spark官网下载spark安装包,里面有开发时所需的库,如下图,地址是:http://spark.apache.org/downloads.html
    在这里插入图片描述
  2. 将下载好的文件解压,例如我这里解压后所在目录是:C:\software\spark-2.3.3-bin-hadoop2.7

IDEA安装scala插件

  1. 打开IDEA,选择"Configure"->“Plugins”,如下图:
    在这里插入图片描述
  2. 如下图,在红框1输入"scala",点击红框2,开始在中央仓库说搜索:
    在这里插入图片描述
  3. 在搜索结果中选中"scala",再点击右侧的"Install",如下:
    在这里插入图片描述
  4. 等待在线安装成功后,点击"Restart IntelliJ IDEA",如下:
    在这里插入图片描述

新建scala工程

  1. 点击下图红框,创建一个新工程:
    在这里插入图片描述
  2. 在弹出窗口中选择"Scala"->“IDEA”,如下图:
    在这里插入图片描述
  3. 如下图,在红框1中输入项目名称,点击红框2,选择Scala SDK:
    在这里插入图片描述
  4. 在弹出的窗口选择"2.11.12"版本,如下图:
    在这里插入图片描述
  5. 点击下图红中的"Finish",完成工程创建:
    在这里插入图片描述
  6. 工程创建成功了,接下来是添加spark库,点击"File"->“Project Structure”,如下图:
    在这里插入图片描述
  7. 在弹出窗口选择新增一个jar库,如下图:
    在这里插入图片描述
  8. 在弹出窗口选择前面安装的spark-2.3.3-bin-hadoop2.7文件夹下面的jar文件夹,如下:
    在这里插入图片描述
  9. 如下图,弹出的窗口用来选择模块,就选工程目录即可:
    在这里插入图片描述
  10. 至此,整个spark开发环境已经设置好了,现在写一个demo试试,创建一个object,源码如下:
package com.bolingcavalry.sparkscalademo.app

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Description: 第一个scala语言的spark应用
  * @author: willzhao E-mail: zq2599@gmail.com
  * @date: 2019/2/16 20:23
  */
object FirstDemo {
  def main(args: Array[String]): Unit={
    val conf = new SparkConf()
      .setAppName("first spark app(scala)")
      .setMaster("local[1]");

    new SparkContext(conf)
      .parallelize(List(1,2,3,4,5,6))
      .map(x=>x*x)
      .filter(_>10)
      .collect()
      .foreach(println);
  }
}

以上代码的功能很简单:创建用一个数组,将每个元素做平方运算,再丢弃小于10的元素,然后逐个打印出来;
11. 代码完成后,点击右键选择"Run FirstDemo",即可立即在本机运行,如下图:
在这里插入图片描述
12. 由于windows环境并没有做hadoop相关配置,因此控制台上会有错误堆栈输出,但这些信息并不影响程序运行(本例没有用到hadoop),输出如下,可见结果已经被打印出来(16、25、36):

2019-02-17 09:04:21 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool 
2019-02-17 09:04:21 INFO  DAGScheduler:54 - ResultStage 0 (collect at FirstDemo.scala:20) finished in 0.276 s
2019-02-17 09:04:21 INFO  DAGScheduler:54 - Job 0 finished: collect at FirstDemo.scala:20, took 0.328611 s
16
25
36
2019-02-17 09:04:21 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2019-02-17 09:04:21 INFO  AbstractConnector:318 - Stopped Spark@452ba1db{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-02-17 09:04:21 INFO  SparkUI:54 - Stopped Spark web UI at http://DESKTOP-82CCEBN:4040
2019-02-17 09:04:21 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-02-17 09:04:21 INFO  MemoryStore:54 - MemoryStore cleared
2019-02-17 09:04:21 INFO  BlockManager:54 - BlockManager stopped
2019-02-17 09:04:21 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-02-17 09:04:21 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-02-17 09:04:21 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-02-17 09:04:21 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-02-17 09:04:21 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\12167\AppData\Local\Temp\spark-4bbb584a-c7c2-4dc8-9c7e-473de7f8c326

Process finished with exit code 0

构建打包,提交到spark环境运行

  1. 生产环境下一般是将应用构建打包成jar,放到spark集群中运行,所以我们来操作构建打包;
  2. 在菜单上选择"File"->“Project Structure”,弹出窗口点击"Artifacts",选择新增jar,如下图:
    在这里插入图片描述
  3. 如下图,在弹出的窗口中,红框1位置输入要运行的class,红框2选择的是单选框的第二个"copy to the output …":
    在这里插入图片描述
  4. 在菜单上选择"Build"->“Build Artifacts…”,如下图:
    在这里插入图片描述
  5. 在弹出的菜单中选择"sparkscalademo:jar"->“Rebuild”,如下:
    在这里插入图片描述
  6. 如果编译成功,在项目的out\artifacts目录下就会生成文件sparkscalademo.jar,如下:
    在这里插入图片描述
  7. 将文件上传到spark服务器上,执行提交命令:
spark-submit --class com.bolingcavalry.sparkscalademo.app.FirstDemo /root/jars/sparkscalademo.jar
  1. 控制台会显示运行信息和结果,如下图:
    在这里插入图片描述
    至此,idea开发spark应用实战就完成了,希望在您配置开发环境的时候本文能够提供一些参考;

欢迎关注我的公众号:程序员欣宸

在这里插入图片描述

2019-06-06 22:18:42 liweihope 阅读数 1600
  • IDEA开发工具+Maven使用详解视频课程(适合初学者的...

    本课程从IDEA开发工具的安装及配置使用讲起,详细讲解Maven项目管理工具,适合初学者的教程,让你少走弯路! 1.Maven简介及安装 2.配置本地仓库和镜像仓库、项目的目录结构 3.pom.xml详解 4.继承、聚合、依赖 5.使用Nexus搭建私服 6.Maven综合实战应用 教学全程采用笔记+代码案例的形式讲解,通俗易懂!!!

    3257 人正在学习 去看看 汤小洋


spark-shell适合做测试
如果是开发的话是用IDEA+Maven+Scala来开发Spark应用程序

创建一个Maven项目,添加依赖

前面创建的过程省略。
创建了一个Scala的Maven项目后,需要添加依赖什么的。
我这边pom.xml文件添加代码如下:

 <properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <spark.version>2.4.0</spark.version>
  </properties>

  <dependencies>
    <!--添加Spark依赖-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!--添加Scala依赖-->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!--添加Hadoop依赖-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>

添加之后,点击Reimport,重新导入一下
在这里插入图片描述
但是导入之后,Hadoop的有些没有导进来,红色波浪线,报错了:
在这里插入图片描述
需要加一个仓库repository:
http://repository.cloudera.com/artifactory/cloudera-repos/
在这里插入图片描述
继续点击到如下目录:
http://repository.cloudera.com/artifactory/cloudera-repos/org/apache/hadoop/hadoop-client/2.6.0-cdh5.7.0/
这个就是上面我们添加的2.6.0-cdh5.7.0/
如果写的是2.6.0-cdh5.7.0,就要加这个仓库,如果写的是2.6.0,不说cdh的就不用加这个仓库。
在这里插入图片描述
然后再pom.xml里添加一下

  <repositories>
    <repository>
      <id>cloudera</id>
      <name>cloudera</name>
      <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

再重新导入一下,Reimport,发现还是红色波浪线,报错
clean一下:
在这里插入图片描述
在这里插入图片描述
发现BUILD SUCCESS,没问题,说明没有影响,可能是其它的一些jar包没有导进来,不过没有影响的。
好了,有了上面的操作,看源码就很方便了,直接搜索就可以看了。
比如,想看Hadoop的入口类:FileSystem的源码
在这里插入图片描述
点击一下,就可以看到源码了:
在这里插入图片描述

词频统计案例开发及上传jar包到服务器并准备测试数据

建个包:
在这里插入图片描述
建个scala对象
在这里插入图片描述
在这里插入图片描述
开发代码:
一开始的三步骤:首先构建一个sparkConf,再构建一个sc,然后关掉sc

package com.ruozedata.spark.com.ruozedata.spark.core
import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    //业务逻辑
    sc.stop()
  }
}

开发业务逻辑代码:wordcount代码

package com.ruozedata.spark.com.ruozedata.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
	
	//args(0)表示传进来的第一个参数
    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_,1)).reduceByKey(_+_)
	
	//打印出来
    wc.collect().foreach(println)

    sc.stop()
  }
}

开发完成,代码打包
在这里插入图片描述
等一会,就打包成功了:
在这里插入图片描述
jjar包在这个目录:spark-train\target\spark-train-1.0.jar
然后把jar上传到虚拟机或者服务器上,

[hadoop@hadoop001 lib]$ pwd
/home/hadoop/lib
[hadoop@hadoop001 lib]$ ls
spark-train-1.0.jar

wordcount数据准备:

[hadoop@hadoop001 data]$ hdfs dfs -text /data/wordcount.txt
19/06/06 12:06:00 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
19/06/06 12:06:00 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
world   world   hello
China   hello
people  person
love

jar包提交Spark应用程序运行

如何提交?看官网:
在这里插入图片描述
如何使用spark-submit,看帮助,spark-submit --help 看一下。
在这里插入图片描述
–class的获得,如下:
在这里插入图片描述
得到:–class com.ruozedata.spark.com.ruozedata.spark.core.WordCountApp
–master 为loacl[2]
application-jar的位置:/home/hadoop/lib/spark-train-1.0.jar
[application-arguments] 参数:hdfs://hadoop001:9000/data/wordcount.txt
汇总一下,如下:

./spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.WordCountApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  hdfs://hadoop001:9000/data/wordcount.txt

运行结果:
在这里插入图片描述

词频统计案例迭代之输出结果到HDFS

加一个 wc.saveAsTextFile(args(1)):

package com.ruozedata.spark.com.ruozedata.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_,1)).reduceByKey(_+_)

    //wc.collect().foreach(println)
    wc.saveAsTextFile(args(1))

    sc.stop()
  }
}

再打包上传,替换上面的jar包,再运行代码:

./spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.WordCountApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  hdfs://hadoop001:9000/data/wordcount.txt \
  hdfs://hadoop001:9000/data/wcoutput/

结果:
在这里插入图片描述

词频统计案例迭代之处理多个输入文件

多个文件如下:

[hadoop@hadoop001 lib]$ hdfs dfs -ls /data/wordcount/
Found 5 items
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount1.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount2.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount3.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount4.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount5.txt

执行代码:

./spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.WordCountApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  hdfs://hadoop001:9000/data/wordcount/ \
  hdfs://hadoop001:9000/data/wcoutput2/

执行结果如下:
在这里插入图片描述

词频统计案例迭代之输入文件规则匹配

文件如下:

[hadoop@hadoop001 lib]$ hdfs dfs -ls /data/wordcount/
Found 6 items
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/test
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount1.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount2.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount3.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount4.txt
-rw-r--r--   1 hadoop supergroup         49 2019-0 /data/wordcount/wordcount5.txt

执行代码

./spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.WordCountApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  hdfs://hadoop001:9000/data/wordcount/*.txt \      //这里是文件规则匹配
  hdfs://hadoop001:9000/data/wcoutput3/

结果如下:
还是5个,说明上面的那个test文件并没有作为输入。
在这里插入图片描述

带排序的词频统计案例开发及运行过程深度剖析

代码如下:

package com.ruozedata.spark.com.ruozedata.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_,1)).reduceByKey(_+_)
    val sorted = wc.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
//这个在上一节已经有详细介绍
    //wc.collect().foreach(println)
    sorted.saveAsTextFile(args(1))

    sc.stop()
  }
}

打包上传,替换原有jar包,再执行代码:

./spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.WordCountApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  hdfs://hadoop001:9000/data/wordcount/*.txt \
  hdfs://hadoop001:9000/data/wcoutput4/

执行结果如下:
降序:
在这里插入图片描述

求用户访问量的TopN的Hive实现以及Spark Core实现过程分析

先用hive来实现
创建一张表:

create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
) row format delimited fields terminated by '\t';

加载数据进来:

load data local inpath '/home/hadoop/data/page_views.dat' overwrite into table page_views;

查询结果:

hive (g6)> select end_user_id,count(*) count  from page_views group by end_user_id order by count desc limit 5;
.......省略....
end_user_id     count
NULL    60871
123626648       40
116191447       38
122764680       34
85252419        30
Time taken: 43.963 seconds, Fetched: 5 row(s)
hive (g6)> 

可以看到每个用户的id以及相应的访问数量。

再用Spark Core实现:求用户访问量的 top5
拿到需求,进行分析,然后功能拆解(中文描述出来,详细设计说明书),最后是代码的开发实现。
首先拿到一个文件,里面有很多行,每行好几个字段,用户的id在第六个字段,每个字段之间按照\t键进行分割。
所以需要拿到用户id,拿到了之后,后面就跟wordcount案例一样了,首先每个用户赋值为1,然后根据用户id分组求总次数,就是根据相同的key进行相加,然后反转,降序,最后再反转,取top5输出。
代码实现:

package com.ruozedata.spark.com.ruozedata.spark.core
import org.apache.spark.{SparkConf, SparkContext}

object PageViewsApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val pageViews = sc.textFile(args(0))

    //获取用户id
    //按照tab键分割,分割之后拿到第六个字段(从0开始),
    //然后每个元素赋值1,形成tuple元组
    val userids = pageViews.map(x => (x.split("\t")(5),1))

	//和wordcount一样
    val result = userids.reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(5).foreach(println)

    sc.stop()
  }
}

然后打包上传,运行代码:

spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.PageViewsApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  /home/hadoop/data/page_views.dat

运行结果,和上面hive运行的结果一样:
在这里插入图片描述
就像上面一样,工作中很多场景都可以看到wordcount的影子。

求平均年龄的Spark Core实现

数据格式为:ID + " " + 年龄,如下:

[hadoop@hadoop001 data]$ cat agedata.txt
1 12
2 34
3 54
4 3
5 33
6 23
7 12
8 54
9 45
10 28

分析步骤:
①取出年龄
②求出人数
③年龄相加/人数
开发代码:

package com.ruozedata.spark.com.ruozedata.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object AvgAgeCalculatorApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val dataFile = sc.textFile(args(0))

    val ageData = dataFile.map(x => x.split(" ")(1))
    val totalAge = ageData.map(x => x.toInt).reduce(_+_)
    val count = ageData.count()
    val avgAge = totalAge/count
    println(avgAge)

    sc.stop()
  }
}

打包上传,执行命令:

spark-submit \
  --class com.ruozedata.spark.com.ruozedata.spark.core.AvgAgeCalculatorApp \
  --master local[2] \
  /home/hadoop/lib/spark-train-1.0.jar \
  /home/hadoop/data/agedata.txt

即可得到结果。

求男女人数以及最高和最低身高

需求:
1)统计男女人数
2)男性中最高身高和最低身高
3)女性中最高身高和最低身高
数据格式为:ID + " " + 性别 + " " + 身高
如下:

[hadoop@hadoop001 data]$ cat people_info.txt 
1 M 178
2 M 156
3 F 165
4 M 178
5 F 154
6 M 167
7 M 189
8 F 210
9 F 209
10 F 190
11 M 176
12 M 165
13 F 159
14 M 155
15 M 164

代码如下:

package com.ruozedata.spark.com.ruozedata.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object peopleApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val peopleData = sc.textFile(args(0))

    //第一步分别取出 男性+身高  以及  女性+身高
    val mRdd = peopleData.map(x =>x.split(" ")).map(line => (line(1),line(2))).filter(_._1 == "M")
    val fRdd = peopleData.map(x =>x.split(" ")).map(line => (line(1),line(2))).filter(_._1 == "F")

    //求男女人数
    val mCount = mRdd.count()
    val fCount = fRdd.count()

    //求男性中最高身高和最低身高
    val mMaxHeight =mRdd.map(x => (x._2).toInt)max()
    //也可以这样求:peopleData.map(line =>line.split(" ")).filter(line => line(1)=="M").sortBy(x => x(2),false).take(1)
    //peopleData.map(line =>line.split(" ")).filter(line => line(1)=="M").map(x =>x(2).toInt).max
    val mMinHeight = mRdd.map(x => (x._2).toInt).min()

    //求女性中最高身高和最低身高
    val fMaxHeight = fRdd.map(x => (x._2).toInt).max()
    //也可以这样求:peopleData.map(line =>line.split(" ")).filter(line => line(1)=="F").sortBy(x => x(2),false).take(1)
    //peopleData.map(line =>line.split(" ")).filter(line => line(1)=="F").map(x =>x(2).toInt).max
    val fMinHeight = fRdd.map(x => (x._2).toInt).min()

    println("男性人数:" + mCount + "" + "女性人数:" + fCount)
    println("男性中最高身高为:" + mMaxHeight + "男性中最低身高为:" + mMinHeight)
    println("女性中最高身高为:" + fMaxHeight + "女性中最低身高为:" + fMinHeight)

    sc.stop()
  }
}

下图只是部分测试结果,可参考:
在这里插入图片描述

2016-11-14 16:18:31 HelloJFS 阅读数 340
  • IDEA开发工具+Maven使用详解视频课程(适合初学者的...

    本课程从IDEA开发工具的安装及配置使用讲起,详细讲解Maven项目管理工具,适合初学者的教程,让你少走弯路! 1.Maven简介及安装 2.配置本地仓库和镜像仓库、项目的目录结构 3.pom.xml详解 4.继承、聚合、依赖 5.使用Nexus搭建私服 6.Maven综合实战应用 教学全程采用笔记+代码案例的形式讲解,通俗易懂!!!

    3257 人正在学习 去看看 汤小洋

使用IDEA开发spark程序 (windows)

  1. 安装JDK

    配置环境变量(计算机属性 - 高级设置 - 环境变量 - 新建 ):

    JAVA_HOM= E:\Java\jdk1.8.0_101
    PATH=E:\Java\jdk1.8.0_101\bin
    CLASSPATH=E:\Java\jdk1.8.0_101\lib 
    
  2. 安装scala

    这个直接安装就可以了。但是要特别注意版本(会出现spark不兼容的情况),最好选用2.10的版本,我用的是2.10.4

  3. 配置Hadoop

    HADOOP_HOME=E:\hadoop\deploy\hadoop-1.2.1
    PATH=E:\hadoop\deploy\hadoop-1.2.1\bin
    

注意:向Hadoop的bin目录下添加winutils.exe文件。否则会报错:
没有winutile.exe

附下载地址:链接: https://pan.baidu.com/s/1c01FEe 密码: 6e4k

4.配置spark应用开发环境

4.1 安装Intellij IDEA,在 “Configure” -> “Plugin” -> “Browse repositories” -> 输入scala ; (这里如果下载很慢或者失败可以直接去官网下载插件,然后这里选择本地文件)

4.2 用户在Intellij IDEA 中创建Scala Project, SparkTest

4.3 在菜单栏 “File” -> “project structure” -> “Libraries” 命定,单击 “+“,导入
spark-assembly-1.3.0-SNAPSHOT-hadoop2.5.0-cdh5.3.0.jar 包;

附下载地址:

5.试运行SparkPi程序

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2016/11/14.
  */
object Main {
  def main(args: Array[String]):Unit= {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local")
                            //在本地运行设置Maser为local或者local[N]
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = Math.random * 2 - 1
      val y = Math.random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is rounghly " + 4.0 * count / n)
    spark.stop()
  }
}

运行结果为:

SparkPi运行结果:

注意:不要在Edit configurations里面设置 “Program argument” 为 local,否则会出错;

错误为:

set.Master

2017-01-18 12:49:28 a11a2233445566 阅读数 286
  • IDEA开发工具+Maven使用详解视频课程(适合初学者的...

    本课程从IDEA开发工具的安装及配置使用讲起,详细讲解Maven项目管理工具,适合初学者的教程,让你少走弯路! 1.Maven简介及安装 2.配置本地仓库和镜像仓库、项目的目录结构 3.pom.xml详解 4.继承、聚合、依赖 5.使用Nexus搭建私服 6.Maven综合实战应用 教学全程采用笔记+代码案例的形式讲解,通俗易懂!!!

    3257 人正在学习 去看看 汤小洋


 配置开发环境

1. 要在本地安装好javascala

由于spark1.6需要scala 2.10.X版本推荐 2.10.4java版本最好是1.8所以提前我们要需要安装好java和scala 并在环境变量中配置好

2. 下载 IDEA 社区版本选择windows 版本并按照配置。

安装完成以后启动IDEA进行配置,默认即可然后点击ok以后,设置ui风格 然后点击next 会出现插件的选择页面,默认不需求修改,点击next选择安装scala语言,点击 install按钮(非常重要,以为要开发spark程序所以必须安装),等安装完成以后点击start启动 IDEA

3. 创建scala项目

点击  create new project ,然后填写project name 为“Wordcount” 选择项目的保存地址 project location。

然后设置project sdk 即java 的安装目录。点击右侧的new 按钮,选择jdk,然后选择java安装路径即可 

然后选择scala sdk 。点击右侧的create ,默认出现时2.10.x版本scala ,点击ok即可。然后点击finish

4. 设置 spark的jar 依赖

点击file->project structure 来设置工程的libraries核心是添加sparkjar依赖。 选择 Libraries ,点击右侧的加号,选择java选择spark1.6.0 spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar 点击ok稍等片刻后然后点击ok (Libraries作用WordCount),然后点击apply,点击ok(这一步很重要,如果没有无法编写spark的代码)

 编写代码

1. 在src 下建立spark程序工程包

在src上右击 new ->package 填入package 的name为com.dt.spark 。

2. 创建scala的入口

包的名字上右击 选择 new ->scala class 。在弹出框中 填写Name ,并制定kind 为object ,点击ok。

3. 编写local代码

package com.dt.spark

 

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

/**

 * 使用Scala开发本地测试的Spark WordCount程序

 * @author DT大数据梦工厂

 * 新浪微博:http://weibo.com/ilovepains/

 */

object WordCount {

    def main(args: Array[String]){

      /**

       * 1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,

       * 例如说通过setMaster来设置程序要链接的Spark集群的MasterURL,如果设置

       * local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如

       * 只有1G的内存)的初学者       *

       */

      val conf = new SparkConf() //创建SparkConf对象

      conf.setAppName("Wow,My First Spark in IDEA!") //设置应用程序的名称,在程序运行的监控界面可以看到名称

      conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群

      

      /**

       * 2步:创建SparkContext对象

       * SparkContextSpark程序所有功能的唯一入口,无论是采用ScalaJavaPythonR等都必须有一个SparkContext

       * SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGSchedulerTaskSchedulerSchedulerBackend

       * 同时还会负责Spark程序往Master注册程序等

       * SparkContext是整个Spark应用程序中最为至关重要的一个对象

       */

      val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息

      

      /**

       * 3步:根据具体的数据来源(HDFSHBaseLocal FSDBS3等)通过SparkContext来创建RDD

       * RDD的创建基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其它的RDD操作

       * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴

       */

      //val lines: RDD[String] = sc.textFile("D://Big_Data_Software//spark-1.6.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置为一个Partion

      val lines = sc.textFile("D://Big_Data_Software//spark-1.6.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置为一个Partion

      /**

       * 4步:对初始的RDD进行Transformation级别的处理,例如mapfilter等高阶函数等的编程,来进行具体的数据计算

       * 4.1步:讲每一行的字符串拆分成单个的单词

       */

      

      val words = lines.flatMap { line => line.split(" ")} //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合

      

      /**

       * 4步:对初始的RDD进行Transformation级别的处理,例如mapfilter等高阶函数等的编程,来进行具体的数据计算

       * 4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)

       */

      val pairs = words.map { word => (word, 1) }

      

      /**

       * 4步:对初始的RDD进行Transformation级别的处理,例如mapfilter等高阶函数等的编程,来进行具体的数据计算

       * 4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数

       */

      val wordCounts = pairs.reduceByKey(_+_) //对相同的Key,进行Value的累计(包括LocalReducer级别同时Reduce

      

      wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))

      

      sc.stop()

      

    }

}

 代码去右击 选择点击 run”wordCount” 来运行程序。生成环境下肯定是写自动化shell脚本自动提交程序的

4. 编写Cluster 模式代码

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

/**

 * 使用Scala开发集群运行的Spark WordCount程序

 * @author DT大数据梦工厂

 * 新浪微博:http://weibo.com/ilovepains/

 */

object WordCount_Cluster {

    def main(args: Array[String]){

      /**

       * 1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,

       * 例如说通过setMaster来设置程序要链接的Spark集群的MasterURL,如果设置

       * local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如

       * 只有1G的内存)的初学者       *

       */

      val conf = new SparkConf() //创建SparkConf对象

      conf.setAppName("Wow,My First Spark App!") //设置应用程序的名称,在程序运行的监控界面可以看到名称

//      conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

      

      /**

       * 2步:创建SparkContext对象

       * SparkContextSpark程序所有功能的唯一入口,无论是采用ScalaJavaPythonR等都必须有一个SparkContext

       * SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGSchedulerTaskSchedulerSchedulerBackend

       * 同时还会负责Spark程序往Master注册程序等

       * SparkContext是整个Spark应用程序中最为至关重要的一个对象

       */

      val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息

      

      /**

       * 3步:根据具体的数据来源(HDFSHBaseLocal FSDBS3等)通过SparkContext来创建RDD

       * RDD的创建基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其它的RDD操作

       * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴

       */

      

      //val lines = sc.textFile("hdfs://Master:9000/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions

      val lines = sc.textFile("/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions

      /**

       * 4步:对初始的RDD进行Transformation级别的处理,例如mapfilter等高阶函数等的编程,来进行具体的数据计算

       * 4.1步:讲每一行的字符串拆分成单个的单词

       */

      

      val words = lines.flatMap { line => line.split(" ")} //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合

      

      /**

       * 4步:对初始的RDD进行Transformation级别的处理,例如mapfilter等高阶函数等的编程,来进行具体的数据计算

       * 4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)

       */

      val pairs = words.map { word => (word, 1) }

      

      /**

       * 4步:对初始的RDD进行Transformation级别的处理,例如mapfilter等高阶函数等的编程,来进行具体的数据计算

       * 4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数

       */

     val wordCountsorderd = pairs.reduceByKey(_+_).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair =>(pair._1.pair._2)) //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)

wordCountsorderd.collect.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))

 

      

      sc.stop()

      

    }

}

 

程序达成jar 包

点击 file-> project structure, 在弹出的页面点击 Artifacts,点击右侧的+”,选择jar –> from modules with dependencies,弹出的页面中,设置main class 然后点击ok在弹出页面修改Name(系统生成的name不规范)导出位置删除 scalasparkjar(因为集群环境中已经存在)点击ok 。然后菜单栏中点击build –> Artifacts ,在弹出按钮中,点击bulid,会自动开始打包。

 

在hadoop中执行wordcount方法

将jar 放到linux 系统某个目录中。执行

 ./spark-submit

--class  com.dt.spark. WordCount_Cluster

--master spark://master:7077

/root/documents/sparkapps/wordcount.jar

 

注意事项:

为什么不能再ide开发环境中,直接发布spark程序到spark集群

1. 开发机器的内存和cores的限制,默认情况情况下,spark程序的dirver提交spark程序的机器上, 如果idea中提交程序的话,那idea机器就必须非常强大

2. Dirver指挥workers的运行并频繁的发生同学,如果开发环境和spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种必要的问题。

3. 这是不安全的。

 

 

 

2016-07-05 16:41:52 kwu_ganymede 阅读数 26236
  • IDEA开发工具+Maven使用详解视频课程(适合初学者的...

    本课程从IDEA开发工具的安装及配置使用讲起,详细讲解Maven项目管理工具,适合初学者的教程,让你少走弯路! 1.Maven简介及安装 2.配置本地仓库和镜像仓库、项目的目录结构 3.pom.xml详解 4.继承、聚合、依赖 5.使用Nexus搭建私服 6.Maven综合实战应用 教学全程采用笔记+代码案例的形式讲解,通俗易懂!!!

    3257 人正在学习 去看看 汤小洋

基于IntelliJ IDEA开发Spark的Maven项目——Scala语言


1、Maven管理项目在JavaEE普遍使用,开发Spark项目也不例外,而Scala语言开发Spark项目的首选。因此需要构建Maven-Scala项目来开发Spark项目,本文采用的工具是IntelliJ IDEA 2016,IDEA工具越来越被大家认可,开发java, python ,scala 支持都非常好

下载链接 : https://www.jetbrains.com/idea/download/

安装直接下一步即可


2、安装scala插件,File->Settings->Editor->Plugins,搜索scala即可安装



可能由于网络的原因下载不了,可以采取离线安装的方式,例如:


提示下载失败后,根据提示的地址下载离线安装包 http://plugins.jetbrains.com/files/631/24825/python-145.86.zip

在界面选择离线安装即可:



3、创建Maven工程,File->New Project->Maven

选择相应的JDK版本,直接下一步


设定Maven项目的GroupId及ArifactId


创建项目的工程名称,点击完成即可

创建Maven工程完毕,默认是Java的,没关系后面我们再添加scala与spark的依赖



4、修改Maven项目的pom.xml文件,增加scala与spark的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ganymede</groupId>
    <artifactId>sparkplatformstudy</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>1.6.0</spark.version>
        <scala.version>2.10</scala.version>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <!-- maven官方 http://repo1.maven.org/maven2/  或 http://repo2.maven.org/maven2/ (延迟低一些) -->
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Repository Switchboard</name>
            <layout>default</layout>
            <url>http://repo2.maven.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <plugins>
            <plugin>
                <!-- MAVEN 编译使用的JDK版本 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>




5、删除项目的java目录,新建scala并设置源文件夹


添加scala的SDK


添加scala的SDK成功


6、开发Spark实例


测试案例来自spark官网的mllib例子 http://spark.apache.org/docs/latest/mllib-data-types.html

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by wuke on 2016/7/5.
  */
object LoadLibSVMFile extends App{
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.rdd.RDD

  val conf = new SparkConf().setAppName("LogisticRegressionMail").setMaster("local")

  val sc = new SparkContext(conf)
  val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

  println(examples.first)
}

测试通过


7、打包编译,线上发布



注意选择依赖包



没有更多推荐了,返回首页