spark开发案例

2019-07-06 11:00:10 sdyuy 阅读数 223

本节将介绍如何实际动手进行 RDD 的转换与操作,以及如何编写、编译、打包和运行 Spark 应用程序。

启动 Spark Shell

Spark 的交互式脚本是一种学习 API 的简单途径,也是分析数据集交互的有力工具。Spark 包含多种运行模式,可使用单机模式,也可以使用分布式模式。为简单起见,本节采用单机模式运行 Spark。

无论采用哪种模式,只要启动完成后,就初始化了一个 SparkContext 对象(SC),同时也创建了一个 SparkSQL 对象用于 SparkSQL 操作。进入 Scala 的交互界面中,就可以进行 RDD 的转换和行动操作。

进入目录 SPARK_HOME/bin 下,执行如下命令启动 Spark Shell。

$./spark-shell

Spark Shell 使用

假定本地文件系统中,文件 home/hadoop/SparkData/WordCount/text1 的内容如下。

hello world
hello My name is john I love Hadoop programming

下面我们基于该文件进行 Spark Shell 操作。

1)利用本地文件系统的一个文本文件创建一个新 RDD。

scala>var textFile = sc.textFile(“file://home/Hadoop/SparkData/WordCount/text1”);
textFile:org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:12

2)执行动作操作,计算文档中有多少行。

scala>textFile.count() //RDD中有多少行
17/05/17 22:59:07 INFO spark.SparkContext:Job finished:count at<console>:15, took 5.654325469 s
resl:Long = 2

返回结果表明文档中有“2”行。

3)执行动作操作,获取文档中的第一行内容。

scala>textFile.first() // RDD 第一行的内容
17/05/17 23:01:25 INFO spark.SparkContext:Job finished:first at <console>:15,took

返回结果表明文档的第一行内容是“hello world”。

4)转换操作会将一个 RDD 转换成一个新的 RDD。获取包含“hello”的行的代码如下

scala>var newRDD = textFile.filter (line => line.contains(“hello”)) //有多少行含有 hello
scala>newRDD.ount() // 有多少行含 hello
17/05/17 23:06:33 INFO spark.SparkContext:Job finished:count at <console>:15,took 0.867975549 s
res4:Long = 2

这段代码首先通过转换操作 filter 形成一个只包括含有“hello”的行的 RDD,然后再通过 count 计算有多少行。

5)Spark Shell 的 WordCount 实现

scala> val file = sc.textFile (“file://home/hendoop/SparkData/WordCount/text1”));
scala> val count = file.flatMap(line=>line.split(“”)).map(word => (word,1)).reduceByKey(_+_)
scala> count.collect()
17/05/17 23:11:46 INFO spark.SparkContext:Job finished: collect at<console>:17,
took 1.624248037 s
res5: Array[(String, Int)] = Array((hello,2),(world,1),(My,1),(is,1),(love,1),(I,1),(John,1),(hadoop,1),(name,1),(programming,1))

  1. 使用 sparkContext 类中的 textFile() 读取本地文件,并生成 MappedBJDD。
  2. 使用 flatMap() 方法将文件内容按照空格拆分单词,拆分形成 FlatMappedRDD。
  3. 使用 map(word=>(word,1)) 将拆分的单词形成 <单词,1> 数据对,此时生成 MappedBJDD。
  4. 使用 reduceByKey() 方法对单词的频度进行统计,由此生成 ShuffledRDD,并由 collect 运行作业得出结果。

编写Java应用程序

1. 安装 maven

手动安装 maven,可以访问 maven 官方下载 apache-maven-3.3.9-bin.zip。选择安装目录为 /usr/local/maven。

sudo unzip ~/下载/apache-maven-3.3.9-bin.zip -d/usr/local
cd /usr/local
sudo mv apache-maven-3.3.9/ ./maven
sudo chown -R hadoop ./maven

2. 编写 Java 应用程序代码

在终端执行以下命令创建一个文件夹 sparkapp2,作为应用程序根目录。

cd~#进入用户主文件夹
mkdir -p ./sparkapp2/src/main/java

使用 vim./sparkapp2/src/main/java/SimpleApp.java 建立一个名为 SimpleApp.java 的文件,代码如下。

  1. /*** SimpleApp.java ***/
  2. import org.apache.spark.api.java.*;
  3. import org.apache.spark.api.java.function.Function;
  4.  
  5. public class SimpleApp {
  6. public static void main(String[] args) {
  7. String logFile = “file:///usr/local/spark/README.md”; // Should be some file on your system
  8. JavaSparkContext sc = new JavaSparkContext(“local”, “Simple App”,
  9. “file:///usr/local/spark/”,new String[] {“target/simple-project-1.0.jar”});
  10.  
  11. JavaRDD<String> logData = sc.textFile(logFile).cache();
  12. long numAs = logData.filter(new Function<String, Boolean>(){
  13. public Boolean call(String s) {
  14. return s.contains (“a”);
  15. }
  16. }).count();
  17.  
  18. long numBs = logData.filter(new Function<String,Boolean>(){
  19. public Boolean call(String s) {
  20. return s.contains(“b”);
  21. }
  22. }).count();
  23. System.out.printIn (“Lines with a:”+ numAs +“,lines with b:”+ numBs);
  24. }
  25. }

该程序依赖 Spark Java API,因此我们需要通过 maven 进行编译打包。在 ./sparkapp2 中新建文件 pom.xml(vim./sparkapp2/pom.xml),并声明该独立应用程序的信息及与 Spark 的依赖关系,代码如下。

  1. <project>
  2. <groupld>edu.berkeley</groupId>
  3. <artifactId>simple-project</artifactId>
  4. <modelVersion>4.0.0</modelVersion>
  5. <name>Simple Project</name>
  6. <packaging>jar</packaging>
  7. <version>l.0</version>
  8. <repositories>
  9. <repository>
  10. <id>Akka repository</id>
  11. <url>http://repo.akka.io/releases</url>
  12. </repository>
  13. </repositories>
  14.  
  15. <dependencies>
  16. <dependency> <!–Spark dependency –>
  17. <groupId>org.apache.spark<groupId>
  18. <artifactId>spark-core_2.11</artifactId>
  19. <version>2.1.0</version>
  20. </dependency>
  21. </dependencies>
  22. </project>

3. 使用 maven 打包 Java 程序

为了保证 maven 能够正常运行,先执行以下命令检查整个应用程序的文件结构。

cd ~/sparkapp2
find

文件结构如图 1 所示。

SimpleApp.java的文件结构
图 1  SimpleApp.java的文件结构

接着,可以通过以下代码将这整个应用程序打包成 Jar。

/usr/local/maven/bin/mvn package

如果运行以上命令后出现类似下面的信息,说明 Jar 包生成成功。

[INFO] ———————————————
[INFO] BUILD SUCCESS
[INFO] ———————————————
[INFO] Total time: 6.583 s
[INFO] Finished at: 2017-02-19T15:52:08+08:00
[INFO] Final Memory: 15M/121M
[INFO]———————————————-

4. 通过 spark-submit 运行程序

最后,可以将生成的 Jar 包通过 spark-submit 提交到 Spark 中运行,命令如下。

/usr/local/spark/bin/spark-submit –class “SimpleApp” ~/sparkapp2/target/simple-project-1.0.jar

最后得到的结果如下。

Lines with a: 62,Lines with b: 30

推荐学习目录:Spark DStream相关操作

2018-08-16 16:34:28 u010675669 阅读数 22499

转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/

搭建开发环境

  1. 安装 Scala IDE

    搭建 Scala 语言开发环境很容易,Scala IDE 官网 下载合适的版本并解压就可以完成安装,本文使用的版本是 4.1.0。

  2. 安装 Scala 语言包

    如果下载的 Scala IDE 自带的 Scala 语言包与 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不一致,那么就需要下载和本文所使用的 Spark 所匹配的版本,以确保实现的 Scala 程序不会因为版本问题而运行失败。

    请下载并安装 Scala 2.10.5 版本

  3. 安装 JDK

    如果您的机器上没有安装 JDK,请下载并安装 1.6 版本以上的 JDK。

  4. 创建并配置 Spark 工程

    打开 Scala IDE,创建一个名称为 spark-exercise 的 Scala 工程。

图 1. 创建 scala 工程

图 1. 创建 scala 工程

在工程目录下创建一个 lib 文件夹,并且把您的 Spark 安装包下的 spark-assembly jar 包拷贝到 lib 目录下。

图 2. Spark 开发 jar 包

图 2. Spark 开发 jar 包

并且添加该 jar 包到工程的 classpath 并配置工程使用刚刚安装的 Scala 2.10.5 版本.,工程目录结构如下。

图 3. 添加 jar 包到 classpath

图 3. 添加 jar 包到 classpath

回页首

运行环境介绍

为了避免读者对本文案例运行环境产生困惑,本节会对本文用到的集群环境的基本情况做个简单介绍。

  • 本文所有实例数据存储的环境是一个 8 个机器的 Hadoop 集群,文件系统总容量是 1.12T,NameNode 叫 hadoop036166, 服务端口是 9000。读者可以不关心具体的节点分布,因为这个不会影响到您阅读后面的文章。
  • 本文运行实例程序使用的 Spark 集群是一个包含四个节点的 Standalone 模式的集群, 其中包含一个 Master 节点 (监听端口 7077) 和三个 Worker 节点,具体分布如下:
Server Name Role
hadoop036166 Master
hadoop036187 Worker
hadoop036188 Worker
hadoop036227 Worker
  • Spark 提供一个 Web UI 去查看集群信息并且监控执行结果,默认地址是:http://<spark_master_ip>:8080 ,对于该实例提交后我们也可以到 web 页面上去查看执行结果,当然也可以通过查看日志去找到执行结果。

图 4. Spark 的 web console

图 4. Spark 的 web console

回页首

案例分析与编程实现

案例一

a. 案例描述

提起 Word Count(词频数统计),相信大家都不陌生,就是统计一个或者多个文件中单词出现的次数。本文将此作为一个入门级案例,由浅入深的开启使用 Scala 编写 Spark 大数据处理程序的大门。

b.案例分析

对于词频数统计,用 Spark 提供的算子来实现,我们首先需要将文本文件中的每一行转化成一个个的单词, 其次是对每一个出现的单词进行记一次数,最后就是把所有相同单词的计数相加得到最终的结果。

对于第一步我们自然的想到使用 flatMap 算子把一行文本 split 成多个单词,然后对于第二步我们需要使用 map 算子把单个的单词转化成一个有计数的 Key-Value 对,即 word -> (word,1). 对于最后一步统计相同单词的出现次数,我们需要使用 reduceByKey 算子把相同单词的计数相加得到最终结果。
c. 编程实现

清单 1.SparkWordCount 类源码

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkWordCount {
 def FILE_NAME:String = "word_count_results_";
 def main(args:Array[String]) {
 if (args.length < 1) {
 println("Usage:SparkWordCount FileName");
 System.exit(1);
 }
 val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
 val sc = new SparkContext(conf);
 val textFile = sc.textFile(args(0));
 val wordCounts = textFile.flatMap(line => line.split(" ")).map(
                                        word => (word, 1)).reduceByKey((a, b) => a + b)
 //print the results,for debug use.
 //println("Word Count program running results:");
 //wordCounts.collect().foreach(e => {
 //val (k,v) = e
 //println(k+"="+v)
 //});
 wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
 println("Word Count program running results are successfully saved.");
 }
}

d. 提交到集群执行

本实例中, 我们将统计 HDFS 文件系统中/user/fams 目录下所有 txt 文件中词频数。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,这个 jar 包执行时会被上传到目标服务器的/home/fams 目录下。运行此实例的具体命令如下:

清单 2.SparkWordCount 类执行命令

 ./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt

e. 监控执行状态

该实例把最终的结果存储在了 HDFS 上,那么如果程序运行正常我们可以在 HDFS 上找到生成的文件信息

图 5. 案例一输出结果

图 5. 案例一输出结果

打开 Spark 集群的 Web UI, 可以看到刚才提交的 job 的执行结果。

图 6. 案例一完成状态

图 6. 案例一完成状态

如果程序还没运行完成,那么我们可以在 Running Applications 列表里找到它。

案例二

a. 案例描述

该案例中,我们将假设我们需要统计一个 1000 万人口的所有人的平均年龄,当然如果您想测试 Spark 对于大数据的处理能力,您可以把人口数放的更大,比如 1 亿人口,当然这个取决于测试所用集群的存储容量。假设这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是 ID,第二列是年龄。

图 7. 案例二测试数据格式预览

图 7. 案例二测试数据格式预览

现在我们需要用 Scala 写一个生成 1000 万人口年龄数据的文件,源程序如下:

清单 3. 年龄信息文件生成类源码

 import java.io.FileWriter
 import java.io.File
 import scala.util.Random

 object SampleDataFileGenerator {
 
 def main(args:Array[String]) {
 val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false)
 val rand = new Random()
 for ( i <- 1 to 10000000) {
 writer.write( i + " " + rand.nextInt(100))
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 writer.close()
 }
 }

b. 案例分析

要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD,其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。

对于第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,很显然需要对在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。

由于本例输出结果很简单,所以只打印在控制台即可。

c. 编程实现

清单 4.AvgAgeCalculator 类源码

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
 def main(args:Array[String]) {
 if (args.length < 1){
 println("Usage:AvgAgeCalculator datafile")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
 val sc = new SparkContext(conf)
 val dataFile = sc.textFile(args(0), 5);
 val count = dataFile.count()
 val ageData = dataFile.map(line => line.split(" ")(1))
 val totalAge = ageData.map(age => Integer.parseInt(
                                String.valueOf(age))).collect().reduce((a,b) => a+b)
 println("Total Age:" + totalAge + ";Number of People:" + count )
 val avgAge : Double = totalAge.toDouble / count.toDouble
 println("Average Age is " + avgAge)
 }
}

d. 提交到集群执行

要执行本实例的程序,需要将刚刚生成的年龄信息文件上传到 HDFS 上,假设您刚才已经在目标机器上执行生成年龄信息文件的 Scala 类,并且文件被生成到了/home/fams 目录下。

那么您需要运行一下 HDFS 命令把文件拷贝到 HDFS 的/user/fams 目录。

清单 5. 年龄信息文件拷贝到 HDFS 目录的命令

hdfs dfs –copyFromLocal /home/fams /user/fams

清单 6.AvgAgeCalculator 类的执行命令

 ./spark-submit \
 --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt

e. 监控执行状态

在控制台您可以看到如下所示信息:

图 8. 案例二输出结果

图 8. 案例二输出结果

我们也可以到 Spark Web Console 去查看 Job 的执行状态

图 9. 案例二完成状态

图 9. 案例二完成状态

案例三

a. 案例描述

本案例假设我们需要对某个省的人口 (1 亿) 性别还有身高进行统计,需要计算出男女人数,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的源文件有以下格式, 三列分别是 ID,性别,身高 (cm)。

图 10. 案例三测试数据格式预览

图 10. 案例三测试数据格式预览

我们将用以下 Scala 程序生成这个文件,源码如下:

清单 7. 人口信息生成类源码

import java.io.FileWriter
import java.io.File
import scala.util.Random

object PeopleInfoFileGenerator {
 def main(args:Array[String]) {
 val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false)
 val rand = new Random()
 for ( i <- 1 to 100000000) {
 var height = rand.nextInt(220)
 if (height < 50) {
 height = height + 50
 }
 var gender = getRandomGender
 if (height < 100 && gender == "M")
 height = height + 100
 if (height < 100 && gender == "F")
 height = height + 50
 writer.write( i + " " + getRandomGender + " " + height)
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 writer.close()
 println("People Information File generated successfully.")
 }
 
 def getRandomGender() :String = {
 val rand = new Random()
 val randNum = rand.nextInt(2) + 1
 if (randNum % 2 == 0) {
 "M"
 } else {
 "F"
 }
 }
}

b. 案例分析

对于这个案例,我们要分别统计男女的信息,那么很自然的想到首先需要对于男女信息从源文件的对应的 RDD 中进行分离,这样会产生两个新的 RDD,分别包含男女信息;其次是分别对男女信息对应的 RDD 的数据进行进一步映射,使其只包含身高数据,这样我们又得到两个 RDD,分别对应男性身高和女性身高;最后需要对这两个 RDD 进行排序,进而得到最高和最低的男性或女性身高。

对于第一步,也就是分离男女信息,我们需要使用 filter 算子,过滤条件就是包含”M” 的行是男性,包含”F”的行是女性;第二步我们需要使用 map 算子把男女各自的身高数据从 RDD 中分离出来;第三步我们需要使用 sortBy 算子对男女身高数据进行排序。

c. 编程实现

在实现上,有一个需要注意的点是在 RDD 转化的过程中需要把身高数据转换成整数,否则 sortBy 算子会把它视为字符串,那么排序结果就会受到影响,例如 身高数据如果是:123,110,84,72,100,那么升序排序结果将会是 100,110,123,72,84,显然这是不对的。

清单 8.PeopleInfoCalculator 类源码

object PeopleInfoCalculator {
 def main(args:Array[String]) {
 if (args.length < 1){
 println("Usage:PeopleInfoCalculator datafile")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
 val sc = new SparkContext(conf)
 val dataFile = sc.textFile(args(0), 5);
 val maleData = dataFile.filter(line => line.contains("M")).map(
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))
 val femaleData = dataFile.filter(line => line.contains("F")).map(
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))
 //for debug use
 //maleData.collect().foreach { x => println(x)}
 //femaleData.collect().foreach { x => println(x)}
 val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
 val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
 //for debug use
 //maleHeightData.collect().foreach { x => println(x)}
 //femaleHeightData.collect().foreach { x => println(x)}
 val lowestMale = maleHeightData.sortBy(x => x,true).first()
 val lowestFemale = femaleHeightData.sortBy(x => x,true).first()
 //for debug use
 //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
 //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
 val highestMale = maleHeightData.sortBy(x => x, false).first()
 val highestFemale = femaleHeightData.sortBy(x => x, false).first()
 println("Number of Male Peole:" + maleData.count())
 println("Number of Female Peole:" + femaleData.count())
 println("Lowest Male:" + lowestMale)
 println("Lowest Female:" + lowestFemale)
 println("Highest Male:" + highestMale)
 println("Highest Female:" + highestFemale)
 }
}

d. 提交到集群执行

在提交该程序到集群执行之前,我们需要将刚才生成的人口信息数据文件上传到 HDFS 集群,具体命令可以参照上文。

清单 9.PeopleInfoCalculator 类的执行命令

 ./spark-submit \
 --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 3g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt

e. 监控执行状态

对于该实例,如程序中打印的一样,会在控制台显示如下信息:

图 11. 案例三输出结果

图 11. 案例三输出结果

在 Spark Web Console 里可以看到具体的执行状态信息

图 12. 案例三完成状态

图 12. 案例三完成状态

案例四

a. 案例描述

该案例中我们假设某搜索引擎公司要统计过去一年搜索频率最高的 K 个科技关键词或词组,为了简化问题,我们假设关键词组已经被整理到一个或者多个文本文件中,并且文档具有以下格式。

图 13. 案例四测试数据格式预览

图 13. 案例四测试数据格式预览

我们可以看到一个关键词或者词组可能出现多次,并且大小写格式可能不一致。

b. 案例分析

要解决这个问题,首先我们需要对每个关键词出现的次数进行计算,在这个过程中需要识别不同大小写的相同单词或者词组,如”Spark”和“spark” 需要被认定为一个单词。对于出现次数统计的过程和 word count 案例类似;其次我们需要对关键词或者词组按照出现的次数进行降序排序,在排序前需要把 RDD 数据元素从 (k,v) 转化成 (v,k);最后取排在最前面的 K 个单词或者词组。

对于第一步,我们需要使用 map 算子对源数据对应的 RDD 数据进行全小写转化并且给词组记一次数,然后调用 reduceByKey 算子计算相同词组的出现次数;第二步我们需要对第一步产生的 RDD 的数据元素用 sortByKey 算子进行降序排序;第三步再对排好序的 RDD 数据使用 take 算子获取前 K 个数据元素。

c. 编程实现

清单 10.TopKSearchKeyWords 类源码

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TopKSearchKeyWords {
 def main(args:Array[String]){
 if (args.length < 2) {
 println("Usage:TopKSearchKeyWords KeyWordsFile K");
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
 val sc = new SparkContext(conf)
 val srcData = sc.textFile(args(0))
 val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)
 //for debug use
 //countedData.foreach(x => println(x))
 val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)
 val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }
 topKData.foreach(println)
 }
}

d. 提交到集群执行

清单 11.TopKSearchKeyWords 类的执行命令

 ./spark-submit \
 --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt

e. 监控执行状态

如果程序成功执行,我们将在控制台看到以下信息。当然读者也可以仿照案例二和案例三那样,自己尝试使用 Scala 写一段小程序生成此案例需要的源数据文件,可以根据您的 HDFS 集群的容量,生成尽可能大的文件,用来测试本案例提供的程序。

图 14. 案例四输出结果

图 14. 案例四输出结果

图 15. 案例四完成状态

图 15. 案例四完成状态

回页首

Spark job 的执行流程简介

我们可以发现,Spark 应用程序在提交执行后,控制台会打印很多日志信息,这些信息看起来是杂乱无章的,但是却在一定程度上体现了一个被提交的 Spark job 在集群中是如何被调度执行的,那么在这一节,将会向大家介绍一个典型的 Spark job 是如何被调度执行的。

我们先来了解以下几个概念:

DAG: 即 Directed Acyclic Graph,有向无环图,这是一个图论中的概念。如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。

Job:我们知道,Spark 的计算操作是 lazy 执行的,只有当碰到一个动作 (Action) 算子时才会触发真正的计算。一个 Job 就是由动作算子而产生包含一个或多个 Stage 的计算作业。

Stage:Job 被确定后,Spark 的调度器 (DAGScheduler) 会根据该计算作业的计算步骤把作业划分成一个或者多个 Stage。Stage 又分为 ShuffleMapStage 和 ResultStage,前者以 shuffle 为输出边界,后者会直接输出结果,其边界可以是获取外部数据,也可以是以一个 ShuffleMapStage 的输出为边界。每一个 Stage 将包含一个 TaskSet。

TaskSet: 代表一组相关联的没有 shuffle 依赖关系的任务组成任务集。一组任务会被一起提交到更加底层的 TaskScheduler。

Task:代表单个数据分区上的最小处理单元。分为 ShuffleMapTask 和 ResultTask。ShuffleMapTask 执行任务并把任务的输出划分到 (基于 task 的对应的数据分区) 多个 bucket(ArrayBuffer) 中,ResultTask 执行任务并把任务的输出发送给驱动程序。

Spark 的作业任务调度是复杂的,需要结合源码来进行较为详尽的分析,但是这已经超过本文的范围,所以这一节我们只是对大致的流程进行分析。

Spark 应用程序被提交后,当某个动作算子触发了计算操作时,SparkContext 会向 DAGScheduler 提交一个作业,接着 DAGScheduler 会根据 RDD 生成的依赖关系划分 Stage,并决定各个 Stage 之间的依赖关系,Stage 之间的依赖关系就形成了 DAG。Stage 的划分是以 ShuffleDependency 为依据的,也就是说当某个 RDD 的运算需要将数据进行 Shuffle 时,这个包含了 Shuffle 依赖关系的 RDD 将被用来作为输入信息,进而构建一个新的 Stage。我们可以看到用这样的方式划分 Stage,能够保证有依赖关系的数据可以以正确的顺序执行。根据每个 Stage 所依赖的 RDD 数据的 partition 的分布,会产生出与 partition 数量相等的 Task,这些 Task 根据 partition 的位置进行分布。其次对于 finalStage 或是 mapStage 会产生不同的 Task,最后所有的 Task 会封装到 TaskSet 内提交到 TaskScheduler 去执行。有兴趣的读者可以通过阅读 DAGScheduler 和 TaskScheduler 的源码获取更详细的执行流程。

回页首

结束语

通过本文,相信读者对如何使用 Scala 编写 Spark 应用程序处理大数据已经有了较为深入的了解。当然在处理实际问题时,情况可能比本文举得例子复杂很多,但是解决问题的基本思想是一致的。在碰到实际问题的时候,首先要对源数据结构格式等进行分析,然后确定如何去使用 Spark 提供的算子对数据进行转化,最终根据实际需求选择合适的算子操作数据并计算结果。本文并未介绍其它 Spark 模块的知识,显然这不是一篇文章所能完成的,希望以后会有机会总结更多的 Spark 应用程序开发以及性能调优方面的知识,写成文章与更多的 Spark 技术爱好者分享,一起进步。由于时间仓促并且本人知识水平有限,文章难免有未考虑周全的地方甚至是错误,希望各位朋友不吝赐教。有任何问题,都可以在文末留下您的评论,我会及时回复。

2018-05-11 22:23:06 JENREY 阅读数 3170

1.案例:通过网络监听端口的方式,实现SparkStreaming的单词计数功能,弊端就是不能全局累加,只能累加同一批的数据

创建Maven项目:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aura.spark</groupId>
    <artifactId>1711spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.7.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>




        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>

        </dependency>




    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

创建Scala代码 NetWordCount.scala  (注意是object)

千万注意local[2]最少是2,否则什么事也不会做,因为有Recevier 启动起来就是一个task任务,就需要一个线程,只写1个线程(去接收数据)就没有线程处理数据了,所以在本地最少要写2

而且代码是微批处理并不是非常准确的实时计算,每Seconds(2)两秒运行一次本批数据

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Create NetWordCount.scala by jenrey on 2018/5/11 21:58
  */
object NetWordCount {
  def main(args: Array[String]): Unit = {
    /**
      * 初始化程序入口
      */
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(2))

    /*StreamingContext源码会createNewSparkContext,所以可以省略创建SparkContext对象。
    val ssc = new StreamingContext(conf,Seconds(2))*/

    /**
      * 通过程序入口获取DStream
      * 我们通过监听的方式获取数据,监听主机名和端口。只要一有数据就可以获取到,相当于通过Socket的方式
      */
      //ReceiverInputDStream就是个DStream,继承InputDStream继承DStream(就是一个抽象类,其实就是个HashMap(Time,RDD[T])一个时间点对应一个RDD )
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
      * 对DStream流进行操作
      */
      //下面都是Transformation操作
    val wordCountDStream: DStream[(String, Int)] = dstream.flatMap(line => line.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
    //output的操作类似于RDD的Action
    wordCountDStream.print()  //把数据打印出来
    /**
      * 启动应用程序(固定操作)
      */
    //启动我们的程序
    ssc.start();
    //等待结束
    ssc.awaitTermination();
    //如果结束就释放资源
    ssc.stop();
  }
}

在hadoop04几点上使用下面的命令:

[hadoop@hadoop04 ~]$ nc -lk 9999

================================================

(安装nc:)

[hadoop@hadoop04 ~]$ sudo yum install nc


输入nc命令查看是否安装成功


==================================================

先让代码跑起来。

再在hadoop04 的 nc下输入hadoop,hadoop


IDEA控制台的输出结果:



案例2:在HDFS的WordCount程序

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 用SparkStreaming在HA模式下的HDFS跑WordCount程序
  */
object HDFSWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(2))
    /**
      * 数据的输入
      */
      //监控的是一个目录即文件夹,新增文件就可以接收到了
    val fileDStream: DStream[String] = ssc.textFileStream("hdfs://myha01/streaming")
    /**
      * 数据的处理
      */
    val wordCountDStream: DStream[(String, Int)] = fileDStream.flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    /**
      * 数据的输出
      */
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

}


1)在hdfs上新建一个文件夹用来监听文件夹,一旦往监听的文件夹下面放入文件就能监听的到数据

注意:如果用的是HA模式,一定要把两个配置(core-site.xml和hdfs-site.xml)文件放到resources下面


在hdfs创建一个空的用来测试代码的文件夹streaming

[hadoop@hadoop04 ~]$ hadoop fs -mkdir /streaming

把IDEA里面的代码跑起来!!!!

然后vim hello.txt写入

you,jump

i,jump

然后上传到streaming文件夹下面

[hadoop@hadoop04 ~]$ hadoop fs -put hello.txt /streaming




案例3:updateStateByKey的WordCount程序

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Create by jenrey on 2018/5/13 15:09
  */
object UpdateStateByKeyWordCount {
  def main(args: Array[String]): Unit = {
    /**
      * 初始化程序入口
      */
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    //注意一定要设置checkpoint目录,否则程序报错,但是这个HDFS目录一定要有权限,这个目录不用提前创建,自动创建
    ssc.checkpoint("hdfs://myha01/StreamingCheckPoint")
    /**
      * 数据的输入
      */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
    /**
      * 数据的处理
      */
    val wordCountDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
      .map((_, 1))
      //updateStateByKey(updateFunc: (Seq[V], Option[S]) => Option[S]) 注意里面是一个函数
      //Option:Some:有值,None:没值
      //ByKey:操作就是分组
      //you,1
      //you,1     => you,{1,1}和jump,{1}
      //jump,1
      //下面这个函数每一个key都会调用一次这个函数
      //所以values:Seq[Int]代表List{1,1}  state:Option[Int]代表上一次这个单词出现了多少次,如果上一次没出现过就是None,如果出现过就是Some该1次就1次该2次就2次
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      val current: Int = values.sum
      //上一次出现多少次如果有就有,没有就是0
      val lastCount: Int = state.getOrElse(0)
      //既然这个单词能调用这个方法,那么这个单词必然最少出现了一次就是Some,所以当前+上一次的就是这个单词一共出现多少次
      Some(current + lastCount)
    })

    /**
      * 数据的输出
      */
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

运行代码:

注意容易报错的两个点:

1.没有设置checkpoint导致报错

2.没有权限,报错,如下图


需要去设置权限

[hadoop@hadoop04 ~]$ hdfs dfs -chmod 777 /

然后运行代码:

在hadoop04节点进行发送数据:

[hadoop@hadoop04 ~]$ nc -lk 9999




案例4:程序运行后停止再运行接着上次的结果继续计算的WordCount程序

普通的为什么就不能停止后接着运行呢?因为又一次创建了程序入口(new StreamingContext),是完全两个不同的程序入口,说白了就是关心Driver服务。两个Driver服务不同。

checkpoint可以把Driver服务里面的信息存到这个目录里,那么下一次我们启动的时候通过checkpoint里面的数据把Driver服务恢复成跟上一次一样的,那么再计算的时候就相当于对上一次的结果进行累加了

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverHAWordCount {
  def main(args: Array[String]): Unit = {
    //注意本程序需要执行一次输入点数据,然后关闭再次执行就可以接着上次进行累加了
    val checkpointDirectory: String = "hdfs://myha01/StreamingCheckPoint3";

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("DriverHAWordCount")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint(checkpointDirectory)
      val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
      val wordCountDStream = dstream.flatMap(_.split(","))
        .map((_, 1))
        .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.sum
          val lastCount = state.getOrElse(0)
          Some(currentCount + lastCount)
        })

      wordCountDStream.print()

      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
      //最后一行代码就是返回的
      ssc
    }

    //从里面获取一个程序入口,如果checkpointDirectory目录里面有程序入口就用这个,如果没有就新new一个程序入口(或者说一个Driver服务)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }

}

运行程序:


关闭代码程序,再次运行,等待一会再输入数据




案例5:单词黑名单

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordBlack {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordBlack")
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    /**
      * 数据的输入
      */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
      * 自己模拟一个黑名单(正常是用mysql,hbase,redis数据库读取出来的
      */
    //直接转化为RDD
    val wordBlackList: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List("?", "!", "*"))
      .map(param => (param, true))
    /**
      * (?,true)
      * (!,true)
      * (*,true)
      */
    val balckList: Array[(String, Boolean)] = wordBlackList.collect()
    //broadcast广播出去
    val blackListBroadcast: Broadcast[Array[(String, Boolean)]] = ssc.sparkContext.broadcast(balckList)

    /**
      * 数据的处理
      */
    val wordOneDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
      .map((_, 1))
    //transform把DStream转换成RDD,需要又返回值,并且类型为RDD
    val wordCountDStream: DStream[(String, Int)] = wordOneDStream.transform(rdd => {
      val filterRDD: RDD[(String, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      val resultRDD: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD)

      /**
        * String, (Int, Option[Boolean])
        * String:word
        * Int:1
        * Option:有可能join上也有可能join不上
        *
        * 思路:我们要的是join不上的,说白了要的是Option[Boolean]=None
        * filter:
        * true代表我们要
        */
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._1)
    }).map((_, 1)).reduceByKey(_ + _)

    /**
      * 数据的输出
      */
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

先运行代码,然后再hadoop04传送数据




案例6:窗口操作Window Operations

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 需求:每隔4秒统计最近6秒的单词计数的情况
  * reduceByKeyAndWindow
  */
object WindowOperatorTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordBlack")
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    /**
      * 数据的输入
      * 到目前为止这个地方还没有跟生产进行对接。
      */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
      * 数据的处理
      */
    val resultWordCountDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
      .map((_, 1))

      /**
        * reduceFunc: (V, V) => V,  匿名函数-达到对单词次数进行累加的效果
        * windowDuration: Duration, 统计多少秒以内的数据-窗口的大小
        * slideDuration: Duration,  每隔多少时间-滑动的大小
        * //numPartitions: Int  指定分区数,要么跟核数有关要么和指定分区数有关
        * 注意:这两个数一定要是Seconds(2)的倍数
        */
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(6), Seconds(4))


    /**
      * 数据的输出
      */
    resultWordCountDStream.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}


foreachRDD的使用

注意foreachRDD下面的代码是执行在Driver端的

1.下图会报错

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

connection对象是需要发到executor上执行的。这就需要网络的传输了。就需要序列化。但是这个对象是不支持序列化的。所以就发送不过去,会报错无法序列化。


2.不报错,缺陷每次都需要创建connection对象,还有就是每次处理的是一条数据,频繁的创建和销毁对数据库连接,对数据库影响很大

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

3.不报错,一次拿到一个partition数据,并创建一个connection对象。再把每个分区的数据一条一条发送到数据库。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

调优:我们可以弄个连接池,再弄个批处理,每一百条数据提交一次到数据库

4.不报错,使用连接池

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

注意:从连接池中获取,使用完把连接丢回到连接池,少了创建的步骤,但是还是一条数据一条数据提交。所以我们要批处理,就要考虑到事务。

下面是操作实战:

现在mysql建立一张表:


写一个连接池的scala文件:

import java.sql.{Connection, DriverManager}

object ConnectionPool {
  private val max=8 ;//连接池的连接总数
  private val connectionNum=10;//每次产生的连接数
  private var conNum=0;//当前连接池已经产生的连接数

  import java.util
  private val pool=new util.LinkedList[Connection]();//连接池

  {
    Class.forName("com.mysql.jdbc.Driver")
  }
  /**
    * 释放连接
    */
  def returnConnection(conn:Connection):Unit={
    pool.push(conn);
  }
  /**
    * 获取连接
    */
  def getConnection():Connection={
    //同步代码块
    AnyRef.synchronized({
      if(pool.isEmpty()){
        for( i <- 1 to connectionNum){
          val conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/aura","root","root");
          pool.push(conn);
          conNum+1;
        }
      }
      pool.poll();
    })

  }
}

下面的代码:

/**
  * Create by jenrey on 2018/5/13 20:27
  */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

/**
  * 接收nc的数据,并把数据存到mysql表中
  */
object OutputTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OutputTest")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(2))
    ssc.checkpoint("hdfs://myha01/StreamingCheckPoint3")
    /**
      * 数据的输入
      */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04",9999)


    val wordCountDStream = dstream.flatMap(_.split(","))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.sum
        val lastCount = state.getOrElse(0)
        Some(currentCount + lastCount)
      })

    /**
     * 数据的输出
     */
    wordCountDStream.foreachRDD( rdd=>{
      rdd.foreachPartition( paritition =>{
        //从连接池中获取连接
        val connection = ConnectionPool.getConnection()
        //获取Statement对象(用来发送sql指令)
        val statement = connection.createStatement()
        paritition.foreach{
          case (word,count) =>{
            val sql=s"insert into aura.1711wordcount values(now(),'$word',$count)"
            print(sql)
            //借助于Statement发送sql指令
            statement.execute(sql)
          }
        }
        //把connection对象再还回给连接池
        ConnectionPool.returnConnection(connection)
      } )
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

}

先运行程序再发送nc数据:



缺点就是把每一次的记录都打印出来了。

如果想要最新的数据就存在Hbase上面,Hbase会自动进行覆盖。


下面是将结果保存到Mysql的代码全集:

import java.sql.DriverManager

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * WordCount程序,Spark Streaming消费TCP Server发过来的实时数据的例子:
  *
  * 1、在master服务器上启动一个Netcat server
  * `$ nc -lk 9998` (如果nc命令无效的话,我们可以用yum install -y nc来安装nc)
  *
  *
  * create table wordcount(ts bigint, word varchar(50), count int);
  *
  * spark-shell --total-executor-cores 4 --executor-cores 2 --master spark://master:7077 --jars mysql-connector-java-5.1.44-bin.jar,c3p0-0.9.1.2.jar,spark-streaming-basic-1.0-SNAPSHOT.jar
  *
  *
  */
object NetworkWordCountForeachRDD {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCountForeachRDD")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    //创建一个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理
    val lines = ssc.socketTextStream("hadoop1", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    //处理的逻辑,就是简单的进行word count
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    //将结果保存到Mysql(一)
    /**
      *
      * 这个代码会报错的!!!
      */
    wordCounts.foreachRDD { (rdd, time) =>
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
      val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
      rdd.foreach { record =>
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
      }
      statement.close()
      conn.close()
    }
    //启动Streaming处理流
    ssc.start()

    ssc.stop(false)


    //将结果保存到Mysql(二)
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreach { record =>
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
        statement.close()
        conn.close()
      }
    }

    //将结果保存到Mysql(三)
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        conn.close()
      }
    }

    //将结果保存到Mysql(四),使用连接池
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    //将结果保存到Mysql(五),批处理
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }


    //将结果保存到Mysql(六),批处理引入事务
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
	//把自动提交改为false
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }


    //将结果保存到Mysql(七),控制批处理的量,每500条提交一次
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
          if (index != 0 && index % 500 == 0) {
            statement.executeBatch()
            conn.commit()
          }
        }
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    //等待Streaming程序终止
    ssc.awaitTermination()
  }
}

案例7:SparkStreaming的数据源来自kafka

首先在pom.xml中写入下面的代码


编写下面的代码:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaTest")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(2))
    //使用kafka是需要下面的目录的,因为SparkStreaming自己要维护一些东西的,要持久化,存到内存是易丢失的。
    ssc.checkpoint("hdfs://myha01/streamingkafka")
    /**
      * 数据的输入:KafkaUtils.createDirectStream
      *
      * def createDirectStream[K: ClassTag,V: ClassTag,KD <:Decoder[K]: ClassTag,VD <:Decoder[V]: ClassTag] (
      * 下面是三个参数:
      * ssc: StreamingContext,
      * kafkaParams: Map[String, String],
      * topics: Set[String])  可以一下子读多个topics,但是我们这里读一个topics就行了
      */
      //指定kafka broker的机器,也就是kafka的地址
    val kafkaParams = Map("metadata.broker.list" -> "hadoop03:9092")
    val topics = Set("aura")
    //kafka读出来数据是kv的形式[String代表k的数据类型(k可就是偏移位置的信息, String代表v的数据类型(kafka内每一条数据), StringDecoder代表的就是解码器, StringDecoder]
    //原来直接返回的是InputDStream[(String,String)]的KV数据类型,因为偏移位置的信息对我们是没有用的所以我们要.map(_._2)
    val kafkaDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    /**
      * 数据的处理
      * 也已经比较正式了
      */
    kafkaDStream.flatMap(_.split(","))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}


启动kafka高可用集群:

注意先启动ZK

[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

[hadoop@hadoop04 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

[hadoop@hadoop05 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

创建topic

[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic aura


创建生产者

[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 -topic aura


然后运行代码:

然后再hadoop03输入下面的内容





案例8:数据黑名单过滤

import java.sql.Date
import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * timestamp:
  * 时间戳,用户点击广告的时间
  * province:
  * 省份,用户在哪个省份点击的广告
  * city:
  * 城市,用户在哪个城市点击的广告
  * userid:
  * 用户的唯一标识
  * advid:
  * 被点击的广告id
  */
        object AdvApplicationTest {

          def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setMaster("local")
            conf.setAppName("AdvApplicationTest")
            conf.set("","")  //序列化

            val sc = new SparkContext(conf)

            val ssc = new StreamingContext(sc,Seconds(5))
            //getOrCreate():有就拿过来,没有就创建,类似于单例模式:
            val spark: SparkSession = SparkSession.builder()
              .config(conf).getOrCreate()

            /**
              * 第一步:从kafka获取数据(direct  方式)
              *   K: ClassTag,
                  V: ClassTag,
                  KD <: Decoder[K]: ClassTag,
                  VD <: Decoder[V]: ClassTag] (
              ssc: StreamingContext,
              kafkaParams: Map[String, String],
              topics: Set[String]
              */
            val kafkaParams = Map("metadata.broker.list" -> "hadoop3:9092")
            val topics = Set("aura")
            val logDstream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
              ssc, kafkaParams, topics).map(_._2)

            /**
              * 第二步:进行黑名单过滤
              */
            val filterLogDStream: DStream[String] = blackListFilter(logDstream,ssc)


            /**
              * 【一个用户】【一天内】对【某个广告】点击的次数超过了【100次】,这样的用户属于黑名单用户
              *
              *
              * zhangsan:
              *          A:50  B:60
              * lisi:
              *          A:50   A:20  A:40   这就是黑名单用户
              * 如果一个用户今天是黑名单用户,那么明天还是黑名单用户吗?
              * 这个看业务而定。
              *
              * 第三步:动态生成黑名单  实时生成黑名单
              */
            DynamicGenerationBlacklists(filterLogDStream,spark)

            /**
              * 第四步:
              *        实时统计每天各省各城市广告点击量
              */
            val dateProvinceCityAdvClick_Count = ProvinceCityAdvClick_Count(filterLogDStream)
            /**
              * 第五步:
              *       实时统计每天各省热门广告
              *        分组求TopN
              *
              *   transform  froeachRDD
              *   rdd   => dataframe
              *   SparkSQL:
              *     SQL
              */


            /**
              * 第六步:
              *     实时统计每天每个广告在最近一小时的滑动窗口的点击趋势
              */

            ssc.start()
            ssc.awaitTermination()
            ssc.stop()
          }

          /**
            * 对黑名单数据进行过滤
            * logDstream  从kafka读取数据
            * 返回的就是进行黑名单过滤以后的数据
            */
          def blackListFilter(logDstream: DStream[String],ssc:StreamingContext):DStream[String]={

            /**
              * 这个地方应该是去数据库里面去读取数据
              * 三个常用的数据库:Redis,HBase,Mysql
              * black_list
              */

            val blackList = List((1L,true),(2L,true),(3L,true))
            val blackListRDD = ssc.sparkContext.parallelize(blackList)
            val balckListBroadcast = ssc.sparkContext.broadcast(blackListRDD.collect())

            /**
              * 这个地方的黑名单,应该是从我们的持久化的数据库里面读取的:有三个数据库是我们常用的:
              * 1)Reids   自己去百度一下
              * 2) HBase  自己去百度一下
              * 3) Mysql  上课演示过
              * SparkCore的方式读取的
              * SparkSQL  -> dataframe -> rdd
              */

            logDstream.transform( rdd =>{
             val user_lineRDD=rdd.map( line =>{
               val fields = line.split(",")
               (fields(3).toLong,line)
             })
               val blackRDD = rdd.sparkContext.parallelize(balckListBroadcast.value)
              //只有keyValue的形式才能进行join,所以需要上面的操作
              val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
              resultRDD.filter( tuple =>{
                tuple._2._2.isEmpty
              }).map(_._2._1)

            })

          }

          /**
            * 动然生成黑名单
            * @param filterLogDStream  黑名单过滤万了以后的数据
            * 【一个用户】【一天内】对【某个广告】点击的次数超过了【100次】,这样的用户属于黑名单用户
            *
            * 梳理一下思路:
            *   这个需求 跟 我们单词计数很像,无非不就是实时统计每个单词出现了多少次
            *   如果发现某个单词出现了一个100,那么他就是黑名单单词
            *   方式一:
            *   (date_userid_advid,v)=map
            *    实时统计出来每个单词出现了多少次=updateStateBykey (对内存的要求高一点)
            *    张三 A 80
            *    李四 B 99
            *         100
            *    fitler  过滤出来次数 一百以上 把它写入 MySQL,Reids,HBase 数据库
            *   方式二:
            *   (date_userid_advid,v)=map
            *    每次处理的是本批次的数据 reduceBykey(对内存的要求低一点)
            *    HBase:
            *        rowkey:  date_userid_advid  2
            *          本批次  3
            *            5
            *    Redis
            *   方式三:
            *        MySQL的方式
            */
          def DynamicGenerationBlacklists(filterLogDStream: DStream[String],spark:SparkSession):Unit={

            val date_userid_advid_ds=filterLogDStream.map( line =>{
              val fields = line.split(",")
             val time = new Date( fields(0).toLong)
              val date = DateUtils.formatDateKey(time)
              val userid = fields(3)
              val advid = fields(4)
               //20180512_
              (date+"_"+userid+"_"+advid,1L)
            }).reduceByKey(_+_)

            date_userid_advid_ds.foreachRDD( rdd =>{
              rdd.foreachPartition( partition =>{
                val connection = ConnectionPool.getConnection()
                val statement = connection.createStatement()
                partition.foreach{
                  case(date_userid_advid,count) =>{
                    val fields = date_userid_advid.split("_")
                    val date = fields(0)
                    val userid = fields(1).toLong
                    val advid = fields(2).toLong
                    val sql=s"insert into aura.tmp_advclick_count values($date,$userid,$advid,$count)";
                    statement.execute(sql);
                  }
                }
                ConnectionPool.returnConnection(connection)

              })
            })

            /**
              *生成黑名单
              */

            val df: DataFrame = spark.read.format("jdbc")
              .option("url", "jdbc:mysql://localhost:3306/aura")
              .option("user", "aura")
              .option("password", "aura")
              .option("dbtable", "tmp_advclick_count")
              .load()

            df.createOrReplaceTempView("tmp_advclick_count")

            val sql=
              """
                 SELECT
                      userid
                 FROM
                 (
                 SELECT
                      date,userid,advid,sum(click_count) c_count
                      FROM
                      tmp_advclick_count
                 GROUP BY
                      date,userid,advid
                 ) t
                      WHERE
                      t.c_count > 100
              """

            //统计出来黑名单
            val blacklistdf = spark.sql(sql).distinct()
              val properties = new Properties()
            properties.put("user","aura")
            properties.put("password","aura")
            blacklistdf.write.mode(SaveMode.Append)
                .jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)
          }

          /**
            * 实时统计每天各省各城市广告点击量
            * @param filterLogDStream
            */
          def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]):DStream[(String,Long)]={
            /**
              * 思路
              * map  => (k,v)  => date+province+city+advid  1
              *                updateStateBykey
              */
            var f=(input:Seq[Long],state:Option[Long]) =>{
              val current_count = input.sum
              val last_count = state.getOrElse(0)
              Some(current_count+last_count)
            }

            filterLogDStream.map( line =>{
              val fields = line.split(",")
              val time = fields(0).toLong
              val mydate = new Date(time)
              val date = DateUtils.formatDateKey(mydate)
              val province = fields(1)
              val city = fields(2)
              val advid = fields(4)
              (date+"_"+province+"_"+city+"_"+advid,1L)
            }).updateStateByKey(f)
            /**
              * 如果开发有需求的话,可以把这些数据库写入 MySQL数据库 ,Hbase
              */
          }

          /**
            * 实时统计 各省热门广告
            *
            * transform : rdd  -> datafram  -> table -> sql
            * @param date_province_city_advid_count
            */
          def ProvinceAdvClick_Count(date_province_city_advid_count:DStream[(String,Long)],spark:SparkSession): Unit ={
            date_province_city_advid_count.transform( rdd =>{
            var date_province_advid_count=  rdd.map{
                case(date_province_city_advid,count) =>{
                  val fields = date_province_city_advid.split("_")
                  val date = fields(0)
                  val province = fields(1)
                  val advid = fields(3)


                  (date+"_"+province+"_"+advid,count)
                }
              }.reduceByKey(_+_)

             val rowRDD=date_province_advid_count.map( tuple =>{
                val fields = tuple._1.split("_")
                val date = fields(0)
                val provnice = fields(1)
                val advid = fields(2).toLong
                val count = tuple._2
                Row(date,provnice,advid,count)
              })

              val schema=StructType(
                StructField("date",StringType,true)::
                  StructField("province",StringType,true)::
                  StructField("advid",LongType,true)::
                  StructField("count",LongType,true):: Nil

              )

              val df = spark.createDataFrame(rowRDD,schema)

              df.createOrReplaceTempView("temp_date_province_adv_count")

              val sql=
                """
                   select
                        *
                   from
                   (
                   select
                        date,province,advid,count,row_number() over(partition by province ordr by count desc) rank
                   from
                        temp_date_province_adv_count
                   ) temp
                   where temp.rank < 10

                """

              /**
                * 把结果持久化到数据库
                */
              spark.sql(sql)

             rdd

            })
          }
        }
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * DateTime Utils
 *
 * Created by XuanYu on 2016/5/31.
 */
public class DateUtils {

    public static final SimpleDateFormat TIME_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static final SimpleDateFormat DATE_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd");
    public static final SimpleDateFormat DATEKEY_FORMAT =
            new SimpleDateFormat("yyyyMMdd");

    /**
     * 判断一个时间是否在另一个时间之前
     * @param time1 第一个时间
     * @param time2 第二个时间
     * @return 判断结果
     */
    public static boolean before(String time1, String time2) {
        try {
            Date dateTime1 = TIME_FORMAT.parse(time1);
            Date dateTime2 = TIME_FORMAT.parse(time2);

            if(dateTime1.before(dateTime2)) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 判断一个时间是否在另一个时间之后
     * @param time1 第一个时间
     * @param time2 第二个时间
     * @return 判断结果
     */
    public static boolean after(String time1, String time2) {
        try {
            Date dateTime1 = TIME_FORMAT.parse(time1);
            Date dateTime2 = TIME_FORMAT.parse(time2);

            if(dateTime1.after(dateTime2)) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 计算时间差值(单位为秒)
     * @param time1 时间1
     * @param time2 时间2
     * @return 差值
     */
    public static int minus(String time1, String time2) {
        try {
            Date datetime1 = TIME_FORMAT.parse(time1);
            Date datetime2 = TIME_FORMAT.parse(time2);

            long millisecond = datetime1.getTime() - datetime2.getTime();

            return Integer.valueOf(String.valueOf(millisecond / 1000));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    /**
     * 获取年月日和小时
     * @param datetime 时间(yyyy-MM-dd HH:mm:ss)
     * @return 结果(yyyy-MM-dd_HH)
     */
    public static String getDateHour(String datetime) {
        String date = datetime.split(" ")[0];
        String hourMinuteSecond = datetime.split(" ")[1];
        String hour = hourMinuteSecond.split(":")[0];
        return date + "_" + hour;
    }

    /**
     * 获取当天日期(yyyy-MM-dd)
     * @return 当天日期
     */
    public static String getTodayDate() {
        return DATE_FORMAT.format(new Date());
    }

    /**
     * 获取昨天的日期(yyyy-MM-dd)
     * @return 昨天的日期
     */
    public static String getYesterdayDate() {
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DAY_OF_YEAR, -1);

        Date date = cal.getTime();

        return DATE_FORMAT.format(date);
    }

    /**
     * 格式化日期(yyyy-MM-dd)
     * @param date Date对象
     * @return 格式化后的日期
     */
    public static String formatDate(Date date) {
        return DATE_FORMAT.format(date);
    }

    /**
     * 格式化时间(yyyy-MM-dd HH:mm:ss)
     * @param date Date对象
     * @return 格式化后的时间
     */
    public static String formatTime(Date date) {
        return TIME_FORMAT.format(date);
    }

    /**
     * 解析时间字符串
     * @param time 时间字符串
     * @return Date
     */
    public static Date parseTime(String time) {
        try {
            return TIME_FORMAT.parse(time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 格式化日期key
     * @param date
     * @return
     */
    public static String formatDateKey(Date date) {
        return DATEKEY_FORMAT.format(date);
    }

    /**
     * 格式化日期key
     * @param datekey
     * @return
     */
    public static Date parseDateKey(String datekey) {
        try {
            return DATEKEY_FORMAT.parse(datekey);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 格式化时间,保留到分钟级别
     * yyyyMMddHHmm
     * @param date
     * @return
     */
    public static String formatTimeMinute(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
        return sdf.format(date);
    }
 
}
import java.sql.{Connection, DriverManager}
object ConnectionPool {
  private val max=8 ;//连接池的连接总数
  private val connectionNum=10;//每次产生的连接数
  private var conNum=0;//当前连接池已经产生的连接数

  import java.util
  private val pool=new util.LinkedList[Connection]();//连接池

  {

    Class.forName("com.mysql.jdbc.Driver")
  }
  /**
   * 释放连接
   */
  def returnConnection(conn:Connection):Unit={
    pool.push(conn);
  }
  /**
   * 获取连接
   */
  def getConnection():Connection={
    //同步代码块
    AnyRef.synchronized({
      if(pool.isEmpty()){
        for( i <- 1 to connectionNum){
          val conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/aura","root","root");
          pool.push(conn);
          conNum+1;
        }
      }

      pool.poll();
    })

  }
}







2019-06-29 17:45:37 dsdaasaaa 阅读数 2047

本节将介绍如何实际动手进行 RDD 的转换与操作,以及如何编写、编译、打包和运行 Spark 应用程序。

启动 Spark Shell

Spark 的交互式脚本是一种学习 API 的简单途径,也是分析数据集交互的有力工具。Spark 包含多种运行模式,可使用单机模式,也可以使用分布式模式。为简单起见,本节采用单机模式运行 Spark。

无论采用哪种模式,只要启动完成后,就初始化了一个 SparkContext 对象(SC),同时也创建了一个 SparkSQL 对象用于 SparkSQL 操作。进入 Scala 的交互界面中,就可以进行 RDD 的转换和行动操作。

进入目录 SPARK_HOME/bin 下,执行如下命令启动 Spark Shell。

$./spark-shell

Spark Shell 使用

假定本地文件系统中,文件 home/hadoop/SparkData/WordCount/text1 的内容如下。

hello world
hello My name is john I love Hadoop programming

下面我们基于该文件进行 Spark Shell 操作。

1)利用本地文件系统的一个文本文件创建一个新 RDD。

scala>var textFile = sc.textFile(“file://home/Hadoop/SparkData/WordCount/text1”);
textFile:org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:12

2)执行动作操作,计算文档中有多少行。

scala>textFile.count() //RDD中有多少行
17/05/17 22:59:07 INFO spark.SparkContext:Job finished:count at<console>:15, took 5.654325469 s
resl:Long = 2

返回结果表明文档中有“2”行。

3)执行动作操作,获取文档中的第一行内容。

scala>textFile.first() // RDD 第一行的内容
17/05/17 23:01:25 INFO spark.SparkContext:Job finished:first at <console>:15,took

返回结果表明文档的第一行内容是“hello world”。

4)转换操作会将一个 RDD 转换成一个新的 RDD。获取包含“hello”的行的代码如下

scala>var newRDD = textFile.filter (line => line.contains(“hello”)) //有多少行含有 hello
scala>newRDD.ount() // 有多少行含 hello
17/05/17 23:06:33 INFO spark.SparkContext:Job finished:count at <console>:15,took 0.867975549 s
res4:Long = 2

这段代码首先通过转换操作 filter 形成一个只包括含有“hello”的行的 RDD,然后再通过 count 计算有多少行。

5)Spark Shell 的 WordCount 实现

scala> val file = sc.textFile (“file://home/hendoop/SparkData/WordCount/text1”));
scala> val count = file.flatMap(line=>line.split(“”)).map(word => (word,1)).reduceByKey(_+_)
scala> count.collect()
17/05/17 23:11:46 INFO spark.SparkContext:Job finished: collect at<console>:17,
took 1.624248037 s
res5: Array[(String, Int)] = Array((hello,2),(world,1),(My,1),(is,1),(love,1),(I,1),(John,1),(hadoop,1),(name,1),(programming,1))

  1. 使用 sparkContext 类中的 textFile() 读取本地文件,并生成 MappedBJDD。
  2. 使用 flatMap() 方法将文件内容按照空格拆分单词,拆分形成 FlatMappedRDD。
  3. 使用 map(word=>(word,1)) 将拆分的单词形成 <单词,1> 数据对,此时生成 MappedBJDD。
  4. 使用 reduceByKey() 方法对单词的频度进行统计,由此生成 ShuffledRDD,并由 collect 运行作业得出结果。

编写Java应用程序

1. 安装 maven

手动安装 maven,可以访问 maven 官方下载 apache-maven-3.3.9-bin.zip。选择安装目录为 /usr/local/maven。

sudo unzip ~/下载/apache-maven-3.3.9-bin.zip -d/usr/local
cd /usr/local
sudo mv apache-maven-3.3.9/ ./maven
sudo chown -R hadoop ./maven

2. 编写 Java 应用程序代码

在终端执行以下命令创建一个文件夹 sparkapp2,作为应用程序根目录。

cd~#进入用户主文件夹
mkdir -p ./sparkapp2/src/main/java

使用 vim./sparkapp2/src/main/java/SimpleApp.java 建立一个名为 SimpleApp.java 的文件,代码如下。

  1. /*** SimpleApp.java ***/
  2. import org.apache.spark.api.java.*;
  3. import org.apache.spark.api.java.function.Function;
  4.  
  5. public class SimpleApp {
  6. public static void main(String[] args) {
  7. String logFile = “file:///usr/local/spark/README.md”; // Should be some file on your system
  8. JavaSparkContext sc = new JavaSparkContext(“local”, “Simple App”,
  9. “file:///usr/local/spark/”,new String[] {“target/simple-project-1.0.jar”});
  10.  
  11. JavaRDD<String> logData = sc.textFile(logFile).cache();
  12. long numAs = logData.filter(new Function<String, Boolean>(){
  13. public Boolean call(String s) {
  14. return s.contains (“a”);
  15. }
  16. }).count();
  17.  
  18. long numBs = logData.filter(new Function<String,Boolean>(){
  19. public Boolean call(String s) {
  20. return s.contains(“b”);
  21. }
  22. }).count();
  23. System.out.printIn (“Lines with a:”+ numAs +“,lines with b:”+ numBs);
  24. }
  25. }

该程序依赖 Spark Java API,因此我们需要通过 maven 进行编译打包。在 ./sparkapp2 中新建文件 pom.xml(vim./sparkapp2/pom.xml),并声明该独立应用程序的信息及与 Spark 的依赖关系,代码如下。

  1. <project>
  2. <groupld>edu.berkeley</groupId>
  3. <artifactId>simple-project</artifactId>
  4. <modelVersion>4.0.0</modelVersion>
  5. <name>Simple Project</name>
  6. <packaging>jar</packaging>
  7. <version>l.0</version>
  8. <repositories>
  9. <repository>
  10. <id>Akka repository</id>
  11. <url>http://repo.akka.io/releases</url>
  12. </repository>
  13. </repositories>
  14.  
  15. <dependencies>
  16. <dependency> <!–Spark dependency –>
  17. <groupId>org.apache.spark<groupId>
  18. <artifactId>spark-core_2.11</artifactId>
  19. <version>2.1.0</version>
  20. </dependency>
  21. </dependencies>
  22. </project>

3. 使用 maven 打包 Java 程序

为了保证 maven 能够正常运行,先执行以下命令检查整个应用程序的文件结构。

cd ~/sparkapp2
find

文件结构如图 1 所示。

SimpleApp.java的文件结构
图 1  SimpleApp.java的文件结构

接着,可以通过以下代码将这整个应用程序打包成 Jar。

/usr/local/maven/bin/mvn package

如果运行以上命令后出现类似下面的信息,说明 Jar 包生成成功。

[INFO] ———————————————
[INFO] BUILD SUCCESS
[INFO] ———————————————
[INFO] Total time: 6.583 s
[INFO] Finished at: 2017-02-19T15:52:08+08:00
[INFO] Final Memory: 15M/121M
[INFO]———————————————-

4. 通过 spark-submit 运行程序

最后,可以将生成的 Jar 包通过 spark-submit 提交到 Spark 中运行,命令如下。

/usr/local/spark/bin/spark-submit –class “SimpleApp” ~/sparkapp2/target/simple-project-1.0.jar

最后得到的结果如下。

Lines with a: 62,Lines with b: 30

推荐学习目录:

2017-01-11 23:31:51 duan_zhihua 阅读数 1992

《Spark商业案例与性能调优实战100课》第6课:商业案例之通过Spark SQL实现大数据电影用户行为分析


package com.dt.spark.sparksql


import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Movie_Users_Analyzer_DateFrame {


  def main(args: Array[String]): Unit = {
    var masterUrl = "local[4]"
    var dataPath = "data/movielens/medium/"
    if (args.length > 0) {
      masterUrl = args(0)
    } else if (args.length > 1) {
      dataPath = args(1)
    }
    val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("Movie_Users_Analyzer_DateFrame")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    import spark.implicits._
    // For implicit conversions like converting RDDs to DataFrames

    val sc = spark.sparkContext

    // val sc = new SparkContext(new SparkConf().setMaster(masterUrl).setAppName("Movie_Users_Analyzer"))
    val usersRDD = sc.textFile(dataPath + "users.dat")
    val moviessRDD = sc.textFile(dataPath + "movies.dat")
    val occupationsRDD = sc.textFile(dataPath + "occupation.dat")
    val ratingsRDD = sc.textFile(dataPath + "ratings.dat")
    //统计不同性别,不同年龄的电影观看人数

    val schemaforusers = StructType("userID::Gender::Age::Occupation::Zip-code".split("::")
      .map(column => StructField(column, StringType, true)))
    val userRDDRows = usersRDD.map(_.split("::")).map(line => Row(line(0).trim, line(1).trim,
      line(2).trim, line(3).trim, line(4).trim))

    val usersDataFrame = spark.createDataFrame(userRDDRows, schemaforusers)


    //ratings.dat  UserID::MovieID::Rating::Timestamp
    val schemaforrating = StructType("UserID::MovieID::Rating::Timestamp".split("::")
      .map(column => StructField(column, StringType, true)))
    val ratinsRDDRows = ratingsRDD.map(_.split("::")).map(line => Row(line(0).trim, line(1).trim,
      line(2).trim, line(3).trim))

    val ratinsDataFrame = spark.createDataFrame(ratinsRDDRows, schemaforrating)
    ratinsDataFrame.filter(s" MovieID = 1193 ")
      .join(usersDataFrame, "UserID")
      .select("Gender", "Age")
      .groupBy("Gender", "Age")
      .count()
      .show(10)


    while (true) {}
    sc.stop()

  }
}


17/01/11 23:28:14 INFO CodeGenerator: Code generated in 15.558235 ms
+------+---+-----+
|Gender|Age|count|
+------+---+-----+
|     F| 45|   55|
|     M| 50|  102|
|     M|  1|   26|
|     F| 56|   39|
|     F| 50|   43|
|     F| 18|   57|
|     F|  1|   10|
|     M| 18|  192|
|     F| 25|  140|
|     M| 45|  136|
+------+---+-----+
only showing top 10 rows