  • Spark examples

    2021-09-22 16:24:54

    Apache Spark Examples

    Examples from the official Spark website

    The code is maintained on GitHub

    Simple examples:

    Examples | Apache Spark

    More examples: Basic Spark (Scala examples, Java examples, Python examples); Spark Streaming (Scala examples, Java examples)

  • Spark examples

    2019-05-19 10:13:59

      My purpose in writing this post is simple: a couple of days ago I suddenly felt like revisiting Spark programming in Scala. Knowledge needs regular review to stay fresh; even though I write less of it these days, spending an hour now and then going over it and reading the official docs helps a lot with consolidating and deepening it. So I searched online for Spark getting-started examples and, after half an hour, found that almost everything was WordCount or a few simple API calls, which left me with nowhere to start. So I decided to write some examples myself and upload the code to GitLab: partly to help my own future review, and partly to give everyone a positive (or negative) reference. Everything that follows is the download address of my GitLab repo. (Note: the content itself is not important, learning something is; I am also not in the habit of carefully managing GitLab branches, so everything is simply pushed to the master branch.) This post will be updated as new examples are written:
    Case 1 (getting started with spark-core, spark-sql, spark-streaming):
    #URL
    https: https://gitlab.com/ZZY478086819/zzy_spark_exec.git
    git: git@gitlab.com:ZZY478086819/zzy_spark_exec.git

    Reposted from: https://blog.51cto.com/14048416/2396853

  • Spark cases and experiment tutorial, a fairly easy-to-understand book. Not bad.
  • Spark Case Library V1.0

    1,000+ views 2021-05-24 22:31:26

    Spark Case Library

    Case 1: Word count with Spark RDD

    pom.xml file

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Implementation code

    object SparkWordCount {

        def main(args: Array[String]): Unit = {
            // TODO: Create the SparkContext instance; first build a SparkConf with the basic application settings
            val sc: SparkContext = {
                // First, build the SparkConf object and set the application name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName("SparkWordCount")
                    .setMaster("local[2]")
                // Second, create the SparkContext instance, passing in the SparkConf object
                new SparkContext(sparkConf)
            }

            // TODO: Step 1. Read the file data with sc.textFile and wrap it in an RDD
            val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")

            // TODO: Step 2. Transform the data with RDD higher-order functions: flatMap, map and reduceByKey
            val resultRDD: RDD[(String, Int)] = inputRDD
                // Split each line into words by whitespace
                .flatMap(line => line.split("\\s+"))
                // Turn each word into a tuple, marking one occurrence
                .map(word => word -> 1)
                // Group by word and reduce (sum) within each group
                .reduceByKey((tmp, item) => tmp + item)
            // TODO: Step 3. Print the final result to the console
            resultRDD.foreach(tuple => println(tuple))
            // Application finished, release resources
            sc.stop()
        }
    }
    
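    For reference, a minimal sketch of what the input and output might look like; the file contents below are made up for illustration and are not from the original post.

    /datas/wordcount.data (hypothetical contents):

    spark hadoop spark
    hive spark hadoop
    sql hive

    Console output of resultRDD.foreach(println), order may vary across partitions:

    (spark,3)
    (hadoop,2)
    (hive,2)
    (sql,1)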

    Case 2: WordCount program, sorted by frequency in descending order, taking the Top 3

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation

    object SparkTopKey {

        def main(args: Array[String]): Unit = {
            // TODO: Create the SparkContext instance; first build a SparkConf with the basic application settings
            val sc: SparkContext = {
                // First, build the SparkConf object and set the application name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName("SparkWordCount")
                    .setMaster("local[2]")
                // Second, create the SparkContext instance, passing in the SparkConf object
                new SparkContext(sparkConf)
            }
            // TODO: Step 1. Read the file data with sc.textFile and wrap it in an RDD
            val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
            // TODO: Step 2. Transform the data with RDD higher-order functions: flatMap, map and reduceByKey
            val resultRDD: RDD[(String, Int)] = inputRDD
                // Split each line into words by whitespace
                .flatMap(line => line.split("\\s+"))
                // Turn each word into a tuple, marking one occurrence
                .map(word => word -> 1)
                // Group by word and reduce (sum) within each group
                .reduceByKey((tmp, item) => tmp + item)
            resultRDD
                .sortBy(tuple => tuple._2, ascending = false)
                // Take and print the Top 3
                .take(3)
                .foreach(tuple => println(tuple))
            // Application finished, release resources
            sc.stop()
        }

    }
    
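    A side note, not part of the original case: for a small K, the RDD's built-in top (or takeOrdered) can replace the sortBy + take combination, since it only keeps K elements per partition. A minimal sketch, assuming the same resultRDD as above:

    // Alternative Top-3: top() avoids a separate full sort job
    val top3: Array[(String, Int)] = resultRDD.top(3)(Ordering.by[(String, Int), Int](_._2))
    top3.foreach(println)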

    Case 3: Build an RDD from a Scala Seq via parallelize and run word count

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation

    object _01SparkParallelizeTest {

        def main(args: Array[String]): Unit = {

            val sc: SparkContext = {
                // SparkConf object
                val sparkConf = new SparkConf()
                    // _01SparkParallelizeTest$  ->(.stripSuffix("$"))   ->  _01SparkParallelizeTest
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // SparkContext instance
                SparkContext.getOrCreate(sparkConf)
            }
            // TODO: 1. Store the data in a Scala Seq collection
            val linesSeq: Seq[String] = Seq(
                "hadoop scala hive spark scala sql sql", 
                "hadoop scala spark hdfs hive spark", 
                "spark hdfs spark hdfs scala hive spark"
            )
            // TODO: 2. Parallelize the collection
            val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
            // TODO: 3. Word count
            val resultRDD = inputRDD
                .flatMap(line => line.split("\\s+"))
                .map(word => (word, 1))
                .reduceByKey((tmp, item) => tmp + item)
            // TODO: 4. Output the result
            resultRDD.foreach(println)
            // Application finished, release resources
            sc.stop()
        }
    }
    

    Case 4: Reading small files with wholeTextFiles()

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation

    object _02SparkWholeTextFileTest {

        def main(args: Array[String]): Unit = {
            val sc: SparkContext = {
                // SparkConf object
                val sparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // SparkContext instance
                SparkContext.getOrCreate(sparkConf)
            }

            /*
              def wholeTextFiles(
                  path: String,
                  minPartitions: Int = defaultMinPartitions
              ): RDD[(String, String)]
              Key: the path/name of each small file
              Value: the content of each small file
             */
            val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)

            println(s"RDD partition count = ${inputRDD.getNumPartitions}")

            inputRDD.take(2).foreach(tuple => println(tuple))

            // Application finished, release resources
            sc.stop()

        }
    }
    

    Case 5: RDD caching functions: cache data in memory or on disk, and release the cache

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation

    object _05SparkCacheTest {

        def main(args: Array[String]): Unit = {
            // Create the SparkContext instance, the entry point of the application
            val sc: SparkContext = {
                // 1.a Create the SparkConf object and set the application's configuration
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // 1.b Pass the SparkConf object and build the SparkContext instance
                new SparkContext(sparkConf)
            }
            // Read the text file data
            val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
            // Cache the data: store it in memory
            inputRDD.persist()
            // Trigger the caching with an action
            inputRDD.count()
            // Release the cache
            inputRDD.unpersist()
            // Cache the data again, this time choosing a storage level
            inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
            // Application finished, release resources
            sc.stop()
        }
    }
    
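    A small note for context, not in the original case: RDD.cache() is shorthand for persist() with the MEMORY_ONLY storage level, so the first persist() call above could equally be written as:

    // cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)
    inputRDD.cache()
    // an action is still required to actually materialize the cache
    inputRDD.count()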

    Case 6: Setting a checkpoint on RDD data

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation

    object _06SparkCkptTest {

        def main(args: Array[String]): Unit = {
            // Create the SparkContext instance, the entry point of the application
            val sc: SparkContext = {
                // 1.a Create the SparkConf object and set the application's configuration
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // 1.b Pass the SparkConf object and build the SparkContext instance
                new SparkContext(sparkConf)
            }

            // TODO: Set the checkpoint directory where the RDD data will be saved
            sc.setCheckpointDir("datas/ckpt/")

            // Read the file data
            val datasRDD = sc.textFile("datas/wordcount.data")

            // TODO: Call checkpoint to back up the RDD; an action on the RDD is needed to trigger it
            datasRDD.checkpoint()
            datasRDD.count()

            // TODO: Run count again; this time the data is read from the checkpoint
            println(datasRDD.count())

            // Application finished, release resources
            sc.stop()
        }
    }
    

    Case 7: Broadcast variable and accumulator example

    A WordCount program written in Scala on Spark: filter out punctuation tokens and count how many of them appear

    -a. Filter out the punctuation tokens
    using a broadcast variable
    -b. Count how many times the punctuation tokens appear
    using an accumulator

    Code implementation

    object _05SparkSharedVariableTest {

        def main(args: Array[String]): Unit = {
            // 1. The entry point of a Spark application is SparkContext; it must be created to load data and schedule execution
            val sc: SparkContext = {
                // Create the SparkConf object and set application info such as the name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // Build the SparkContext instance, passing in the SparkConf
                new SparkContext(sparkConf)
            }
            // 2. Step 1. Read the file data from the local FS with sc.textFile and wrap it in an RDD
            val inputRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)

            // TODO: Dictionary data; any token found in it is filtered out: the special characters are stored in a List
            val list: List[String] = List(",", ".", "!", "#", "$", "%")
            // TODO: Publish the dictionary data as a broadcast variable
            val broadcastList: Broadcast[List[String]] = sc.broadcast(list)

            // TODO: Define the counter (accumulator)
            val accumulator: LongAccumulator = sc.longAccumulator("number_accu")

            // 3. Step 2. Transform the data with RDD higher-order functions: flatMap, map and reduceByKey
            val resultRDD: RDD[(String, Int)] = inputRDD
                // Filter out empty lines
                .filter(line => null != line && line.trim.length > 0)
                // Split into words
                .flatMap(line => line.trim.split("\\s+"))
                // TODO: Filter out non-word tokens
                .filter{word =>
                    // Get the value of the broadcast variable
                    val wordsList: List[String] = broadcastList.value
                    // Check whether the token is a non-word character
                    val flag: Boolean = wordsList.contains(word)
                    if(flag){
                        // If it is a non-word character, increment the accumulator
                        accumulator.add(1L)
                    }
                    // Keep only real words
                    ! flag
                }
                // Group by word and aggregate
                .map(word => (word, 1))
                .reduceByKey(_ + _)
            // 4. Step 3. Save the final result RDD to HDFS or print it to the console
            resultRDD.foreach(println)
            // Reading the accumulator's value requires that an RDD action has been triggered first
            println("Accumulator: " + accumulator.value)
            // 5. When the application finishes, release resources
            sc.stop()
        }

    }
    

    Case 8: Saving RDD data to a MySQL table (basic pattern)

    		a. Reduce the number of partitions of the result data
    		b. Operate on each partition separately
    			When inserting a partition's data into the database, create one Connection per partition
    

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    
        <!-- HBase Client dependency -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
    
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.7</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation:

    object _04SparkWriteMySQL {

        def main(args: Array[String]): Unit = {
            // 1. The entry point of a Spark application is SparkContext; it must be created to load data and schedule execution
            val sc: SparkContext = {
                // Create the SparkConf object and set application info such as the name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // Build the SparkContext instance, passing in the SparkConf
                new SparkContext(sparkConf)
            }

            // 2. Step 1. Read the file data from the local FS with sc.textFile and wrap it in an RDD
            val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

            // 3. Step 2. Transform the data with RDD higher-order functions: flatMap, map and reduceByKey
            val resultRDD: RDD[(String, Int)] = inputRDD
                // TODO: Filter
                .filter(line => null != line && line.trim.length > 0 )
                // a. Split each line by the separator
                .flatMap(line => line.trim.split("\\s+"))
                // b. Turn each word into a tuple, marking one occurrence
                .map(word => (word, 1))
                .reduceByKey((temp, item) => temp + item)
            // TODO: Save the result RDD to a MySQL table
            resultRDD
                // Reduce the number of RDD partitions
                .coalesce(1)
                .foreachPartition{iter =>
                    // val xx: Iterator[(String, Int)] = iter
                    // Directly call the method that saves one partition's data to the MySQL table
                    saveToMySQL(iter)
                }
            // 5. When the application finishes, release resources
            sc.stop()
        }
        /**
         * Saves one partition of the RDD data to a MySQL table
         */
        def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
            // step1. Load the driver class
            Class.forName("com.mysql.cj.jdbc.Driver")

            // Declare the variables
            var conn: Connection = null
            var pstmt: PreparedStatement = null

            try{
                // step2. Create the connection
                conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
                    "root",
                    "123456"
                )
                pstmt = conn.prepareStatement("INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
                // step3. Insert the data
                iter.foreach{case (word, count) =>
                    pstmt.setString(1, word)
                    pstmt.setInt(2, count)
                    pstmt.execute()
                }
            }catch {
                case e: Exception => e.printStackTrace()
            }finally {
                // step4. Close the connection
                if(null != pstmt) pstmt.close()
                if(null != conn) conn.close()
            }
        }
    }
    

    Case 9: Saving RDD data to a MySQL table (advanced pattern)

    Requirements:
    a. Reduce the number of partitions of the result data
    b. Operate on each partition separately: when inserting a partition's data into the database, create one Connection per partition
    c. Insert each partition's data in batches (addBatch / executeBatch)
    d. Transactionality: commit the transaction manually, then restore the original commit mode
    e. Handle an existing primary key: if the key exists, update the row; if not, insert it

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    
        <!-- HBase Client dependency -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
    
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.7</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation:

    object _04SparkWriteMySQLV3 {

        def main(args: Array[String]): Unit = {
            // 1. The entry point of a Spark application is SparkContext; it must be created to load data and schedule execution
            val sc: SparkContext = {
                // Create the SparkConf object and set application info such as the name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // Build the SparkContext instance, passing in the SparkConf
                new SparkContext(sparkConf)
            }

            // 2. Step 1. Read the file data from the local FS with sc.textFile and wrap it in an RDD
            val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

            // 3. Step 2. Transform the data with RDD higher-order functions: flatMap, map and reduceByKey
            val resultRDD: RDD[(String, Int)] = inputRDD
                // TODO: Filter
                .filter(line => null != line && line.trim.length > 0 )
                // a. Split each line by the separator
                .flatMap(line => line.trim.split("\\s+"))
                // b. Turn each word into a tuple, marking one occurrence
                .map(word => (word, 1))
                .reduceByKey((temp, item) => temp + item)

            // TODO: Save the result RDD to a MySQL table
            resultRDD.coalesce(1).foreachPartition(saveToMySQL)
            // 4. When the application finishes, release resources
            sc.stop()
        }

        /**
         * Saves one partition of the RDD data to a MySQL table
         */
        def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
            // step1. Load the driver class
            Class.forName("com.mysql.cj.jdbc.Driver")

            // Declare the variables
            var conn: Connection = null
            var pstmt: PreparedStatement = null

            try{
                // step2. Create the connection
                conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
                    "root",
                    "123456"
                )
                pstmt = conn.prepareStatement("REPLACE INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")

                // TODO: For transactionality, either the whole partition is saved or none of it is
                val autoCommit: Boolean = conn.getAutoCommit // remember the connection's default commit mode
                conn.setAutoCommit(false)
                // step3. Insert the data
                iter.foreach{case (word, count) =>
                    pstmt.setString(1, word)
                    pstmt.setInt(2, count)
                    // TODO: Add to the current batch
                    pstmt.addBatch()
                }
                // TODO: Execute the whole batch
                pstmt.executeBatch()
                conn.commit() // commit manually so the batch is inserted in one transaction
                // Restore the connection's original commit mode
                conn.setAutoCommit(autoCommit)
            }catch {
                case e: Exception => e.printStackTrace()
            }finally {
                // step4. Close the connection
                if(null != pstmt) pstmt.close()
                if(null != conn) conn.close()
            }
        }

    }
    
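    The code above uses REPLACE INTO to satisfy requirement e. As an alternative sketch (assuming word is the primary key of db_test.tb_wordcount), MySQL's INSERT ... ON DUPLICATE KEY UPDATE updates the existing row in place instead of the delete-and-insert that REPLACE performs:

    // hypothetical alternative to the REPLACE INTO statement above
    pstmt = conn.prepareStatement(
        "INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?) " +
            "ON DUPLICATE KEY UPDATE count = VALUES(count)"
    )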

    Case 10: Reading data from an HBase table into an RDD

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    
        <!-- HBase Client dependency -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
    
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.7</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation:

    object _03SparkReadHBase {

        def main(args: Array[String]): Unit = {
            // 1. The entry point of a Spark application is SparkContext; it must be created to load data and schedule execution
            val sc: SparkContext = {
                // Create the SparkConf object and set application info such as the name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                    // TODO: Use Kryo serialization
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    // TODO: Register the data types to be serialized
                    .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
                // Build the SparkContext instance, passing in the SparkConf
                new SparkContext(sparkConf)
            }

            // TODO: Read data from the HBase table with the RDD method newAPIHadoopRDD
            val conf: Configuration = HBaseConfiguration.create()
            // Set the Zookeeper connection properties
            conf.set("hbase.zookeeper.quorum", "node1")
            conf.set("hbase.zookeeper.property.clientPort", "2181")
            conf.set("zookeeper.znode.parent", "/hbase")
            // Set the name of the HBase table to read from
            conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
            val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
                conf,
                classOf[TableInputFormat],
                classOf[ImmutableBytesWritable],
                classOf[Result]
            )
            // Print some sample data from the HBase table
            hbaseRDD
                .take(6)
                .foreach{case (rowKey, result) =>
                    result.rawCells().foreach{cell =>
                        println(s"RowKey = ${Bytes.toString(result.getRow)}")
                        println(s"\t${Bytes.toString(CellUtil.cloneFamily(cell))}:" +
                            s"${Bytes.toString(CellUtil.cloneQualifier(cell))} = " +
                            s"${Bytes.toString(CellUtil.cloneValue(cell))}")
                    }
                }
            // 5. When the application finishes, release resources
            sc.stop()
        }

    }
    

    Case 11: Saving RDD data to an HBase table

    pom.xml

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <hbase.version>1.2.0-cdh5.16.2</hbase.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>
    
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    
        <!-- HBase Client dependency -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
    
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.7</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Code implementation:

    object _02SparkWriteHBase {

        def main(args: Array[String]): Unit = {
            // 1. The entry point of a Spark application is SparkContext; it must be created to load data and schedule execution
            val sc: SparkContext = {
                // Create the SparkConf object and set application info such as the name and master
                val sparkConf: SparkConf = new SparkConf()
                    .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                    .setMaster("local[2]")
                // Build the SparkContext instance, passing in the SparkConf
                new SparkContext(sparkConf)
            }
            // 2. Step 1. Read the file data from the local FS with sc.textFile and wrap it in an RDD
            val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

            // 3. Step 2. Transform the data with RDD higher-order functions: flatMap, map and reduceByKey
            val resultRDD: RDD[(String, Int)] = inputRDD
                // Filter
                .filter(line => null != line && line.trim.length > 0 )
                // a. Split each line by the separator
                .flatMap(line => line.trim.split("\\s+"))
                // b. Turn each word into a tuple, marking one occurrence
                .map(word => (word, 1))
                .reduceByKey((temp, item) => temp + item)
            // TODO: step 1. Convert the RDD into RDD[(RowKey, Put)]
            /*
                * HBase table design:
                    * table name: htb_wordcount
                    * RowKey: word
                    * column family: info
                    * column name: count
                create 'htb_wordcount', 'info'
             */
            val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map{case (word, count) =>
                // First, build the RowKey object
                val rowKey: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(word))
                // Second, build the Put object
                val put: Put = new Put(rowKey.get())
                // Set the column value
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count + ""))
                // Third, return the tuple (RowKey, Put)
                rowKey -> put
            }

            // TODO: step2. Call saveAsNewAPIHadoopFile on the RDD to save the data
            val conf: Configuration = HBaseConfiguration.create()
            // Set the Zookeeper connection properties
            conf.set("hbase.zookeeper.quorum", "node1")
            conf.set("hbase.zookeeper.property.clientPort", "2181")
            conf.set("zookeeper.znode.parent", "/hbase")
            // Set the name of the HBase table the data will be saved to
            conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
            putsRDD.saveAsNewAPIHadoopFile(
                "datas/hbase/htb_wordcount/",
                classOf[ImmutableBytesWritable],
                classOf[Put],
                classOf[TableOutputFormat[ImmutableBytesWritable]],
                conf
            )
            // 5. When the application finishes, release resources
            sc.stop()
        }
    }
    
  • Spark cases and experiment tutorial, high-definition, April 2017, shared with everyone who needs it!
  • Spark Cases in Action, Part 2

    1,000+ views 2018-07-25 10:26:16

    Spark Cases in Action, Part 2

    0. When opening a local file, use three '/', e.g. file:///usr/local/spark/mycode/wordcount/word.txt
    01. reduceByKey((a,b) => a+b) adds up the values of the map entries that share the same key
    02. In a cluster environment, to print all results on the driver node you need to use the collect method (see the sketch below)
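    A minimal spark-shell sketch of point 02; the data is made up for illustration:

    val rdd = sc.parallelize(List(("spark", 1), ("hadoop", 1), ("spark", 1)))
    val counted = rdd.reduceByKey((a, b) => a + b)
    // collect() brings every result back to the driver before printing
    counted.collect().foreach(println)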

    1. Ways to create a pair RDD
    01. Read from a file
    02. Create the RDD from a parallelized collection (array)
    val lines = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
    map(word => (word, 1)) applies a function via map to form the key-value pairs
    val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word, 1))
    pairRDD.foreach(println)  // println can even be passed directly as the argument

    2. Common pair RDD operations
    01. reduceByKey(func): merges the values that share the same key using func, accumulating from left to right by default. [Note: it merges the values that have the same key.]
    (hadoop,1)
    (spark,1)
    (hive,1)
    (spark,1)
    Here the entries with the same key are the two spark records.
    02. The parameter types can be inferred automatically, so we do not need to write them when passing the function:
    reduceByKey((a, b) => a + b).foreach(println)

    03. reduceByKey performs a merge operation.
    groupByKey itself cannot take a custom function; call groupByKey first and then apply map().
    [The code above must be run in Spark, not in a plain Scala environment.]

    val words = Array("spark", "hadoop", "scala", "Java", "spark")
    val result1 = sc.parallelize(words).map(word => (word, 1))
    val result2 = result1.reduceByKey(_ + _)
    val result3 = result2.groupByKey().map(t => (t._1, t._2.sum))  // the grouped values are an Iterable, so sum can be applied

    keys, values, sortByKey

    The prerequisite for calling sortByKey is that the RDD must consist of key-value pairs.

    val d1 = sc.parallelize(Array(("c",8),("e",8),("d",3),("a",6),("c",2)))
    d1.reduceByKey(_ + _).sortByKey(false).collect()  // collect is required to gather all results on the driver node

    d1.reduceByKey(_ + _).sortBy(_._2, false).collect()

    What does sc.parallelize mean? (It distributes a local Scala collection across the cluster as an RDD.)

    val result1 = rdd.groupByKey()
    // What is CompactBuffer? (Spark's compact append-only buffer, a memory-efficient ArrayBuffer used to hold the grouped values.)

    1. When uploading files to HDFS, if the directory contains multiple files, the -p parameter is needed; otherwise not all files in the folder are uploaded.
    2. hdfs dfs -put /root/donation/block_*/block* /linkage
    Here block_*/block_* uses the * wildcard twice, which is really handy!
    3. Reading a file from HDFS:
    scala> val rawblocks = sc.textFile("hdfs://localhost:9000/linkage/block_1.csv")
    rawblocks: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/linkage/block_1.csv MapPartitionsRDD[7] at textFile at <console>:24
    4. Taking the first line of the RDD:
    scala> rawblocks.first
    res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

    2. groupByKey()
    Groups the values that share the same key into a list, but does not add them up.
    (hadoop,1)
    (spark,1)
    (hive,1)
    (spark,1)
    The result is: (spark,(1,1)), (hadoop,(1)), (hive,(1))
    The type is [(String, Iterable[Int])]; the Iterable is a list of values.
    Always check what type of RDD is returned.
    The values can be pulled out of the list with an iterator.
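    Putting the two operations side by side, a minimal runnable sketch (spark-shell, made-up data):

    val pairRDD = sc.parallelize(List(("hadoop", 1), ("spark", 1), ("hive", 1), ("spark", 1)))

    // reduceByKey merges the values of each key with the given function
    pairRDD.reduceByKey(_ + _).collect()
    // Array((hadoop,1), (spark,2), (hive,1)) -- order may vary

    // groupByKey only groups; summing still needs a map over the Iterable
    pairRDD.groupByKey().map(t => (t._1, t._2.sum)).collect()
    // same counts as above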

  • Spark Cases in Action, Part 3

    2018-07-27 22:03:44
    Spark Cases in Action, Part 3. I. Simple log analysis. 1. Given log records like the following, extract each status, count it, and sort from low to high. INFO This is a message with content INFO This is some other content INFO Here are more messages ...
  • Spark example: word count in Scala

    1,000+ views 2018-02-17 06:48:37
    Spark example: word count in Scala. Spark RDDs can be used to run word counts on files on the local machine or on a cluster. 1. Create the Scala project SparkScalaWordCount 2. Create a lib directory, add the Spark jars, and add them as a project library 3. In the project root directory...
  • Spark Cases in Action, Part 4

    2018-07-27 22:04:17
    Spark Cases in Action, Part 4. I. Weibo column analysis. 1. Requirement: a microblog site has many columns; each column has tens of millions of users, and each user has many followers. Find the TopN users with the most followers in each column. [Can be implemented with a TreeMap; column: feature, ...
  • A Spark example: high-speed rail demand analysis ("young and in our prime, full of vigor and spirit")
  • Oozie calling Spark example (Oozie 4.3.1). To have oozie call hive, prepare three files: job.properties, testWordCount.sh, workflow.xml. 1. job.properties # if HDFS HA is configured, use the name from the fs.defaultFS parameter, # otherwise write hdfs://deptest1:...
  • Spark example: word count in Python

    1,000+ views 2018-02-17 07:25:12
    Spark example: word count in Python. 1. Python project PythonSparkWordCount 2. The text file test.txt in the input directory 3. Create word_count.py to implement the word count: import os import shutil from pyspark import ...
  • Spark hands-on example: mastering SQL

    2015-10-27 14:45:43
    spark sql study, good good study and day day up!
  • Spark example: word count in Java

    1,000+ views 2018-02-17 07:07:42
    Spark example: word count in Java. 1. Maven project JavaSparkWordCount 2. In pom.xml, add the Spark dependency: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" ...
  • Enterprise Spark case: hands-on hotel data analysis. Data cleaning: filter out records with fields that are too short and convert birth dates to the specified format. Data analysis: compute each user's departure time from the check-in time and total stay length. Data analysis: the 3 hotels with the most stays and their average... Data analysis: each user...
  • Work log: today I planned to use the built-in sparkpi example to test the cluster, mainly to understand the cluster startup process and the machines' load. Unexpectedly there were quite a few problems; thanks to the group members, especially hali, for the support. There were 3 main problems: 1. testing the spark cluster vs local ...
  • Spark Cases in Action, Part 1

    1,000+ views 2018-04-11 20:30:06
    I. Computing the most popular teacher. 1. Requirement: given access logs from a website, compute the most-visited teacher under a given subject. ... 3. The code is as follows: ...import java.net.URL ...import org.apache.spark.rdd.RDD import org...
  • Recommender systems: from getting started to Spark case practice

    1,000+ views 2018-11-26 13:17:09
    The whole course revolves around how to build and understand recommender systems, a common product form, with in-depth explanations built on Spark engineering cases; theory and practice combined to help you improve quickly. About the author: Huang Chongyuan, graduated from Harbin Institute of Technology, 6+ years of big data and...
  • Developing a Spark WordCount example in IntelliJ IDEA

    1,000+ views 2019-03-28 15:37:41
    Complete code: package com.shaonaiyi import org.apache.spark.{SparkConf, SparkContext} /** * @Auther: 邵奈一 * @Date: 2019/03/28 3:16 PM * @Description: Developing a Spark WordCount example in IntelliJ IDEA */ object...
  • Writing a Spark program in IDEA: create an RDD, then operate on it (RDD methods fall into two kinds: Transformations, which are lazy, and Actions, which trigger execution). Methods on RDDs differ from native Scala methods. Write the program, package it, and run it on the cluster...
  • Page statistics do not need to take users into account; just count directly
  • The simplest Spark tutorial ever: writing Spark programs in Java
