精华内容
下载资源
问答
  • xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">4.0.0...

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    4.0.0

    com.baidukt

    spark

    1.0-SNAPSHOT

    jar

    1.8

    1.8

    2.11.12

    2.3.1

    2.7.6

    2.0.1

    2.0.1

    UTF-8

    org.scala-lang

    scala-library

    ${scala.version}

    org.apache.spark

    spark-core_2.11

    ${spark.version}

    org.apache.spark

    spark-sql_2.11

    ${spark.version}

    org.apache.spark

    spark-graphx_2.11

    ${spark.version}

    org.apache.spark

    spark-streaming_2.11

    ${spark.version}

    org.apache.spark

    spark-streaming-kafka-0-10_2.11

    ${spark.version}

    org.apache.kafka

    kafka-clients

    ${kafka-clients.version}

    org.apache.kafka

    kafka-streams

    ${kafka-streams.version}

    org.apache.spark

    spark-hive_2.11

    ${spark.version}

    org.apache.hbase

    hbase-common

    2.0.1

    org.apache.hbase

    hbase-client

    2.0.1

    org.apache.hbase

    hbase-server

    1.2.3

    org.apache.hbase

    hbase

    2.0.1

    pom

    org.apache.hadoop

    hadoop-client

    ${hadoop.version}

    org.apache.hadoop

    hadoop-hdfs

    ${hadoop.version}

    org.apache.zookeeper

    zookeeper

    3.4.10

    redis.clients

    jedis

    2.9.0

    log4j

    log4j

    1.2.17

    com.typesafe

    config

    1.3.0

    commons-httpclient

    commons-httpclient

    3.1

    com.alibaba

    fastjson

    1.2.35

    ch.hsr

    geohash

    1.3.0

    mysql

    mysql-connector-java

    5.1.47

    org.apache.maven.plugins

    maven-compiler-plugin

    3.8.0

    1.8

    1.8

    org.apache.maven.plugins

    maven-compiler-plugin

    3.8.0

    compile

    compile

    org.apache.maven.plugins

    maven-shade-plugin

    3.2.1

    package

    shade

    *:*

    META-INF/*.SF

    META-INF/*.DSA

    META-INF/*.RSA

    展开全文
  • <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XML...
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.baidukt</groupId>
        <artifactId>spark</artifactId>
        <version>1.0-SNAPSHOT</version>
    
       <packaging>jar</packaging>
    
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <scala.version>2.11.12</scala.version>
            <spark.version>2.3.1</spark.version>
            <hadoop.version>2.7.6</hadoop.version>
            <kafka-clients.version>2.0.1</kafka-clients.version>
            <kafka-streams.version>2.0.1</kafka-streams.version>
            <encoding>UTF-8</encoding>
        </properties>
    
        <dependencies>
            <!-- 导入scala的依赖 -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!-- 导入spark core的依赖 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- 导入spark sql的依赖 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- 导入spark graphx的依赖 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <!-- spark Streaming 的依赖-->
            <!--start-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--spark 跟kafka 整合的依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--kafka客户端依赖-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka-clients.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${kafka-streams.version}</version>
            </dependency>
            <!--end-->
    
            <!--spark整合hive-->
            <!--start-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>2.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.2.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase</artifactId>
                <version>2.0.1</version>
                <type>pom</type>
            </dependency>
            <!--end-->
    
            <!--hadoop相关依赖-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <!-- 导入zookeeper依赖 -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.10</version>
            </dependency>
            <!-- 导入redis客户端jedis依赖 -->
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>com.typesafe</groupId>
                <artifactId>config</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
                <groupId>commons-httpclient</groupId>
                <artifactId>commons-httpclient</artifactId>
                <version>3.1</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.35</version>
            </dependency>
            <dependency>
                <groupId>ch.hsr</groupId>
                <artifactId>geohash</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.47</version>
            </dependency>
        </dependencies>
    
        <build>
            <pluginManagement>
                <plugins>
                    <!--  编译scala的插件  -->
                    <!--<plugin>-->
                        <!--<groupId>net.alchim31.maven</groupId>-->
                        <!--<artifactId>scala-maven-plugin</artifactId>-->
                        <!--<version>3.2.2</version>-->
                    <!--</plugin>-->
                    <!--  编译java的插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.8.0</version>
                        <!-- 所有的编译都依照JDK1.8来搞 -->
                        <configuration>
                            <source>1.8</source>
                            <target>1.8</target>
                        </configuration>
                    </plugin>
                </plugins>
            </pluginManagement>
    
            <plugins>
                <!--<plugin>-->
                    <!--<groupId>net.alchim31.maven</groupId>-->
                    <!--<artifactId>scala-maven-plugin</artifactId>-->
                    <!--<version>3.2.2</version>-->
                    <!--<executions>-->
                        <!--<execution>-->
                            <!--<id>scala-compile-first</id>-->
                            <!--<phase>process-resources</phase>-->
                            <!--<goals>-->
                                <!--<goal>add-source</goal>-->
                                <!--<goal>compile</goal>-->
                            <!--</goals>-->
                        <!--</execution>-->
                        <!--<execution>-->
                            <!--<id>scala-test-compile</id>-->
                            <!--<phase>process-test-resources</phase>-->
                            <!--<goals>-->
                                <!--<goal>testCompile</goal>-->
                            <!--</goals>-->
                        <!--</execution>-->
                    <!--</executions>-->
                <!--</plugin>-->
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                  <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

     

    转载于:https://my.oschina.net/u/3962987/blog/3098498

    展开全文
  • 本站已不在更新,由于选的spark版本为1.6,是老版本 SPARK 2.4.0 学习笔记分享(持续更新) : https://github.com/opensourceteams/spark-scala-maven-2.4.0 更多资源 微信(技术交流) : thinktothings SPARK 源码...
  • 介绍 最近看了不少delta介绍,最大优点在于使用一个技术即可实现数据...1、在spark社区(http://spark.apache.org)下载spark2.4.2以上pre压缩包,本次下载spark-2.4.4-bin-hadoop2.6.tgz(pre-build for ...

    介绍

    最近看了不少delta的介绍,最大的优点在于使用一个技术即可实现数据实时写入和实时读取的,处理一般的实时流处理业务场景和数据仓库CURD场景。下面简单介绍一下使用过程。

    1、在spark社区(http://spark.apache.org)下载spark2.4.2以上的pre压缩包,本次下载的是 spark-2.4.4-bin-hadoop2.6.tgz(pre-build for apache hadoop2.6)

    在conf路径下将spark-env.sh.tempsparp-default.conf.temp中的.temp后缀去掉。

    在spark-env.sh加入自定义JDK为JAVA1.8:

    export JAVA_HOME=/usr/java/jdk1.8.0_77/

    2、在idea工程中加入maven依赖。

    <dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.11</artifactId>
    <version>0.5.0</version>
    </dependency>

    3、下载delta的jar包,在maven库的路径下找到 delta的包io/delta/delta-core_2.11 0.5.0 ,然后上传到集群主机上。

    如果没有安装maven,需要创建一个maven的仓库目录

    mkdir -p /home/cdhadmin/.m2/repository/io/delta/delta-core_2.11/0.5.0/

    将delta的jar包放进去。

    cp ../jars/delta-core_2.11-0.5.0.jar /home/cdhadmin/.m2/repository/io/delta/delta-core_2.11/0.5.0/

    4、启动spark-shell。

     ./spark-shell  --packages   io.delta:delta-core_2.11:0.5.0

    5、官方的例子:

    1)、写入数据:

    val data = spark.range(0, 5)
    data.write.format("delta").save("/tmp/delta-table")

    hadoop dfs -ls /tmp/delta-table,发现数据保存在hdfs,格式为parquet。

    e07d48cc66ef4fcf44952b337d68820e.png

    2)、读取数据:

    val df = spark.read.format("delta").load("/tmp/delta-table")
    df.show()

    2af2eb2c9b30239f83473aa2c75f912e.png

    3)、数据覆盖:

    val data = spark.range(5,10)data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
    df.show()

    4)、再次读取数据:发现数据已经更新为5-10,查看本地磁盘发现写入了很多新的文件,每次写入的文件不会合并。

    val df = spark.read.format("delta").load("/tmp/delta-table")df.show()

    5c9b8fc56d7d31392477749c4445e0ed.png

    5)、delta支持读取之前每个版本写入的数据,读取第一次写入的数据:

    val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
    df.show()

    2af2eb2c9b30239f83473aa2c75f912e.png

    6)、将数据写入流表,持续写入数据:

    val streamingDf = spark.readStream.format("rate").load()
    val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

    7)、新启动spark-shell,持续读取数据:

    val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

    1112d6d7f32a02647d251f84fbbe5a87.png

    总结:delta本质上是一种数据处理方式,parquet+元数据,底层使用的技术的都是现有的成熟技术,巧妙的在于如何组织数据达到数据实时写入和实时读取的目的。

    delta的官方链接地址:

    Quickstart - Delta Lake Documentationdocs.delta.io
    展开全文
  • spark会从数据存储系统如HDFS读取数据,生成RDD,RDD进行数据分区交由各个节点进行处理,每个节点处理完数据后结果存储在内存,同时可将该结果交由下一个节点继续处理.最后将最终结果写回HDFS/Mysq...

    94fff46911f980afb4d8721b678d2023.png

    转载请私信。禁止无授权转载

    1.spark安装

    利用ambari安装整个集群 参考Ambari及其HDP集群安装及其配置教程

    2.Spark工作原理

    04874ddd02518fea1168a7cba602dcb7.png
    spark工作流程

    提交任务到spark集群后,spark会从数据存储系统如HDFS读取数据,生成RDD,RDD进行数据分区交由各个节点进行处理,每个节点处理完数据后的结果存储在内存中,同时可将该结果交由下一个节点继续处理.最后将最终结果写回HDFS/Mysql/Hbase等数据存储系统中.

    RDD:弹性分布式数据集(Resilient Distributed Dataset)
    一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算.
    RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,转换操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)

    RDD典型的执行过程如下:

    1.RDD读入外部数据源(或者内存中的集合)进行创建;
    2.RDD经过一系列的“转换”操作,每一次都会产生不同的RDD,给下一个“转换”使用;
    3.最后一个RDD经“行动”操作进行处理,并输出到外部数据源

    2e1279a3cba9c00c70b5101970fca9c4.png

    RDD采用了惰性调用,即在RDD的执行过程中(如图2.2所示),真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算.

    2e0500a58ec34ef76ec8305a2dd5dfe0.png
    图2.2

    RDD特征:

    • 高容错性
    • 中间结果持久化到内存
    • 存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化开销

    3.spark-hello world

    本地模式(Java)
    • 创建maven项目spark-note,添加Spark依赖,新建testSpark.java类

    d84013a01d94f05ea168e21330330c55.png
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.jp</groupId>
      <artifactId>spark-note</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>spark-note</name>
      <url>Welcome to Apache Maven</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.2.0</spark.version>
      </properties>
    
      <dependencies>
        <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
        </dependency>
        <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
         </dependency>
      </dependencies>
    </project>
    • 创建words.txt文件
    hello java
    hello scala
    hello python
    hi C++
    hi android
    • 编写代码
    package com.jp.spark;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    
    /**
     * Hello world!
     *
     */
    public class TestSpark {
        public static void main( String[] args ){
    	//设置配置信息
    	SparkConf conf = new SparkConf()
    	                 .setAppName("WordCount")
    	                 .setMaster("local");//本地运行
    
    	//创建JavaSparkContext对象-spark功能入口
    	JavaSparkContext sc = new JavaSparkContext(conf);
    
    	//针对输入源创建初始RDD
    	JavaRDD<String> lines = sc.textFile("D:words.txt");//读取文件
    	
    	//==============计算操作================//
    	//flatMap算子-拆分操作
    	JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    		private static final long serialVersionUID = 1L;
    		public Iterator<String> call(String line) throws Exception {
    			return Arrays.asList(line.split(" ")).iterator();
    		}
    	});
    
    	//每个单词映射为(单词,1)
    	JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    		private static final long serialVersionUID = 1L;
    		public Tuple2<String, Integer> call(String word) throws Exception {
    			return new Tuple2<String, Integer>(word,1);
    		}
    	});
    	
    	//根据Key统计单词出现的次数
    	JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    		private static final long serialVersionUID = 1L;
    		public Integer call(Integer v1, Integer v2) throws Exception {
    			return v1+v2;
    		}
    	});
    
    	//action操作 触发计算
    	wordsCount.foreach(new VoidFunction<Tuple2<String,Integer>>() {
    		private static final long serialVersionUID = 1L;
    		public void call(Tuple2<String, Integer> wordCount) throws Exception {
    			System.out.println(wordCount._1+" apeared " +wordCount._2+" times");
    		}
    	});
    	sc.close();
        }
    }
    • 执行结果

    1bc4bcdfc08ab80fed60dce3a24e7b81.png
    • 错误处理

    1d5a3ce3a75aa6340408b6bce4f83f41.png

    该错误可忽略,本地模式并无hadoop,报错正常不影响程序执行,将程序提交到spark集群上后则不会报错.若在本地模式下清理该错误可在本地安装winutils.exe,参考windows下调试hadoop


    spark集群模式(Java) - 单机集群主机node, ip:192.168.1.64
    • window文件words.txt上传到到Linux

    610fc5f22c38a480dc015fb5ea5b60bb.png

    2a7548d77997f487e2a79cd2643c443e.png
    • 将words.txt上传HDFS系统

    f1b5363347387ab6a137ee4cb2fa724e.png
    • 查看文件是否上传成功

    05b02d38c6c2c8126f4d1793b3d6195e.png
    • 修改部分代码
    SparkConf conf = new SparkConf()
    		 .setAppName("WordCount");
    		  //.setMaster("local");//注释掉
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.textFile("hdfs://node:8020/words.txt");//读取HDFS文件
    
    // hdfs://node:8020  ->登陆ambari查看HDFS core-site配置可知 
    • pom.xml中添加项目打包插件
    <!-- 位置  </dependencies> -->
    <build>  
        <plugins>  
      
         <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                       <artifactId> maven-assembly-plugin </artifactId>
                       <configuration>
                            <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                 <manifest>
                                      <mainClass>com.jp.spark.TestSpark</mainClass>
                                 </manifest>
                            </archive>
                       </configuration>
                       <executions>
                            <execution>
                                 <id>make-assembly</id>
                                 <phase>package</phase>
                                 <goals>
                                      <goal>single</goal>
                                 </goals>
                            </execution>
                       </executions>
                  </plugin>
        </plugins>  
    </build>  
    <!-- 位置 </project> -->

    255ab61cd13fd2fd5912d42c468fc79a.png
    <mainClass>com.jp.spark.TestSpark</mainClass> 程序执行入口
    • 更新

    0efd6c62aea1976aa13ad8f9ae9a3578.png
    • 打包

    61e10d5c22d2d4a6392d08873967bc14.png
    • 打包成功

    695098d636ac2269b8c79fd5bb3a43ef.png

    0d8c08bd7f071c6c7712d9338992ee94.png
    spark-note-0.0.1-SNAPSHOT-jar-with-dependencies.jar:包含依赖的jar包
    • 上传spark-note-0.0.1-SNAPSHOT-jar-with-dependencies.jar到spark集群主机节点上

    0ea2445156c818a373d3ca23dc3b3a5a.png
    • 提交任务执行
    [root@node spark]# spark-submit --class com.jp.spark.TestSpark --num-executors 1  --executor-cores 1 /home/spark/spark-note-0.0.1-SNAPSHOT-jar-with-dependencies.jar 

    d347331533359c8bd24e5d08581b1211.png
    note:教程中使用的集群属于单机集群模式,为伪分布式.若为真正的分布式(多台服务器或者多个虚拟机组成的集群),需加上 --master spark://192.168.1.64:7077 参数 IP地址为集群主节点的IP.若在伪分布式中加上该参数为报内存不足的错误,spark较耗内存.

    通过http://192.168.1.64:4040可查看任务执行进度

    11ec7ea5dd22401d018a60540ffbb045.png

    结果

    2428a1d02cd2d1a4f8b416fb7b9590d0.png

    spark-shell 模式(scala)
    • 启动spark-shell(启动时间较慢)
    [root@node ~]# spark-shell

    383c88d6ddb0e342b315179bcc69f281.png
    • 编写scala wordCount程序
    scala> val lines = sc.textFile("hdfs://node:8020/words.txt")
    scala> val words = lines.flatMap(line => line.split(" "))
    scala> val pairs = words.map(word => (word,1))
    scala> val wordCounts = pairs.reduceByKey(_+_)
    scala> wordCounts.foreach(wordCount => println(wordCount._1 + " apeared " + wordCount._2 + " times"))

    b98735831a64de9a644282948a32e8cb.png

    wordCount执行过程

    dae4da983876e75071d6c840300b7229.png

    4.spark架构原理

    * application:用户编写的Spark应用程序;
    * driver:控制节点 - 提交任务所在的某个集群主机上的进程;
    * master:资源调度和分配 - 集群主节点上的进程
    * worker: 启动executor - 集群子节点上的进程;
    * executor:运行任务,存储数据 - 集群子节点上的进程;
    * task:运行在executor上的工作单元 - executor启动的线程;

    7a3e9c9d0b2cece83d435520045330f0.png

    5.创建RDD

    • 创建RDD的方式
    * 调用SparkContext的parallelize方法,在程序中集合上创建
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    
    public class ParallelizeCollection {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf()
    				        .setAppName("collection")
    				        .setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7);
    		JavaRDD<Integer> numberRdd = sc.parallelize(numbers);
    		int sum = numberRdd.reduce(new Function2<Integer, Integer, Integer>() {
    
    			private static final long serialVersionUID = 1L;
    			@Override
    			public Integer call(Integer v1, Integer v2) throws Exception {
    				return v1+v2;
    			}
    		});
    		sc.close();
    		System.out.println(sum);
    	}
    }

    f424e7d808bd1d25ee75c0f794070475.png
    * 读取外部数据集,如本地文件、HDFS文件系统、HBase、Cassandra、Amazon S3等
    JavaRDD<String> lines = sc.textFile("D:words.txt");
    JavaRDD<String> lines = sc.textFile("hdfs://node:8020/words.txt");

    6.RDD操作 transformation和action

    transformation:针对RDD进行计算处理生成新的RDD —— 定义、记录
    • map

    将RDD中的每个数据项通过map中的函数映射变为一个新的元素

    输入分区与输出分区一对一,即:有多少个输入分区,则有多少个输出分区

    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6);
    JavaRDD<Integer> numerRDD = sc.parallelize(numbers);
    
    //Function<Integer, Integer> 输入类型 返回类型
    JavaRDD<Integer> numAddRDD = numerRDD.map(new Function<Integer, Integer>() {
    	private static final long serialVersionUID = 1L;
    	@Override
    	public Integer call(Integer v1) throws Exception {
    		return v1*2;
    	}
    }); 
    
    numAddRDD.foreach(new VoidFunction<Integer>() {
    	private static final long serialVersionUID = 1L;
    	@Override
    	public void call(Integer t) throws Exception {
    		System.out.println(t);
    	}
    });
    
    sc.close();

    1dd370129d5b8ec5dccb07ddc978b5e6.png
    • filter 返回经过filter中的函数处理后值为true的原元素组成新的数据集
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6);
    JavaRDD<Integer> numerRDD = sc.parallelize(numbers);
    
    //Function<Integer, Integer> 输入类型 返回类型
    JavaRDD<Integer> numAddRDD = numerRDD.filter(new Function<Integer, Boolean>() {
    
    	@Override
    	public Boolean call(Integer v1) throws Exception {
    		
    		return v1 % 2 == 0;
    	}
    }) ;
    
    numAddRDD.foreach(new VoidFunction<Integer>() {
    	private static final long serialVersionUID = 1L;
    	@Override
    	public void call(Integer t) throws Exception {
    		System.out.println(t);
    	}
    });
    
    sc.close();
    
    

    282f98f88d8e9457b33d1ebfde321d2f.png
    • flatMap 将一条rdd数据使用定义的函数给分解成多条rdd数据
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<String> numbers = Arrays.asList("hello world","hello java","hello scala","hi java");
    JavaRDD<String> numerRDD = sc.parallelize(numbers);
    
    //Function<Integer, Integer> 输入类型 返回类型
    JavaRDD<String> numAddRDD = numerRDD.flatMap(new FlatMapFunction<String, String>() {
    	@Override
    	public Iterator<String> call(String t) throws Exception {
    		return Arrays.asList(t.split(" ")).iterator();
    	}
    });
    
    numAddRDD.foreach(new VoidFunction<String>() {
    	@Override
    	public void call(String t) throws Exception {
    		System.out.println(t);
    	}
    });
    
    sc.close();

    774bb381b76f316968b60adf5cfa43e8.png
    • groupByKey 对键值对RDD(PairRDD)通过Key分组
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Tuple2<String, Integer>> scoresList = Arrays.asList(
    				new Tuple2<String, Integer>("A",92),
    				new Tuple2<String, Integer>("B",82),
    				new Tuple2<String, Integer>("A",72),
    				new Tuple2<String, Integer>("B",52),
    				new Tuple2<String, Integer>("A",98));
    
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
    
    JavaPairRDD<String,Iterable<Integer>> groupScores = scores.groupByKey();
    groupScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
    
    	@Override
    	public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
    		System.out.println("name:"+t._1);
    		Iterator<Integer> iterator = t._2.iterator();
    		while(iterator.hasNext()){
    			System.out.println(iterator.next());
    		}
    		System.out.println("===============================");
    	}
    });
    sc.close();

    57371f2983b8d960e186535b14d69db0.png
    • reduceBykey 对键值对RDD通过函数合并每个key的value值
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Tuple2<String, Integer>> scoresList = Arrays.asList(
    				new Tuple2<String, Integer>("A",92),
    				new Tuple2<String, Integer>("B",82),
    				new Tuple2<String, Integer>("A",72),
    				new Tuple2<String, Integer>("B",52),
    				new Tuple2<String, Integer>("A",98));
    
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
    
    JavaPairRDD<String,Integer> groupScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
    	
    	@Override
    	public Integer call(Integer v1, Integer v2) throws Exception {
    		return v1+v2;
    	}
    });
    groupScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {
    	@Override
    	public void call(Tuple2<String, Integer> t) throws Exception {
    		System.out.println(t._1+":"+t._2);
    	}
    });
    sc.close();

    d4c9650e7a6ba13bbbe6bc337581ca33.png
    • sortByKey 对键值对RDD根据key排序
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Tuple2<Integer, String>> scoresList = Arrays.asList(
    				new Tuple2<Integer, String>(92,"A"),
    				new Tuple2<Integer, String>(72,"B"),
    				new Tuple2<Integer, String>(56,"C"),
    				new Tuple2<Integer, String>(100,"D"),
    				new Tuple2<Integer, String>(98,"E"),
    				new Tuple2<Integer, String>(62,"F")
    							);
    
    JavaPairRDD<Integer,String> scores = sc.parallelizePairs(scoresList);
    
    JavaPairRDD<Integer,String> groupScores = scores.sortByKey();//默认升序
    groupScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
    	@Override
    	public void call(Tuple2<Integer, String> t) throws Exception {
    		System.out.println(t._2+" : "+t._1);
    	}
    
    });
    sc.close();

    0745ba99de89eb7a465e8fd276af7d62.png
    • join 对两个需要关联的RDD通过key进行内连接操作
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
    			new Tuple2<Integer, String>(101,"A"),
    			new Tuple2<Integer, String>(102,"B"),
    			new Tuple2<Integer, String>(103,"C"),
    			new Tuple2<Integer, String>(104,"D"),
    			new Tuple2<Integer, String>(105,"E"),
    			new Tuple2<Integer, String>(106,"F")
    							);
    List<Tuple2<Integer, Integer>> stuScores = Arrays.asList(
    			new Tuple2<Integer, Integer>(101,92),
    			new Tuple2<Integer, Integer>(102,72),
    			new Tuple2<Integer, Integer>(103,56),
    			new Tuple2<Integer, Integer>(104,100),
    			new Tuple2<Integer, Integer>(105,98),
    			new Tuple2<Integer, Integer>(106,62)
    							);
    JavaPairRDD<Integer,String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer,Integer> scores = sc.parallelizePairs(stuScores);
    JavaPairRDD<Integer, Tuple2<String, Integer>> stu_score = students.join(scores);
    stu_score.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
    
    	@Override
    	public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
    		System.out.println("student id:"+t._1);
    		System.out.println("student name:"+t._2._1);
    		System.out.println("student score:"+t._2._2);
    		System.out.println("=================================");
    	}
    });
    
    sc.close();

    5206ad5b6c082f771a365542a68d5e2c.png
    • cogroup 对两个键值对RDD,每个RDD中相同key中的元素分别聚合成一个集合
    SparkConf conf = new SparkConf()
             .setMaster("local")
             .setAppName("Transformation");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //构造数据
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
    			new Tuple2<Integer, String>(101,"A"),
    			new Tuple2<Integer, String>(102,"B"),
    			new Tuple2<Integer, String>(103,"C")
    							);
    List<Tuple2<Integer, Integer>> stuScores = Arrays.asList(
    			new Tuple2<Integer, Integer>(101,92),
    			new Tuple2<Integer, Integer>(102,72),
    			new Tuple2<Integer, Integer>(103,56),
    			new Tuple2<Integer, Integer>(103,96),
    			new Tuple2<Integer, Integer>(101,96)
    			
    							);
    JavaPairRDD<Integer,String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer,Integer> scores = sc.parallelizePairs(stuScores);
    JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> stu_score = students.cogroup(scores);
    stu_score.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
    
    	@Override
    	public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
    		System.out.println("student id:"+t._1);
    		System.out.println("student name:"+t._2._1);
    		System.out.println("student score:"+t._2._2);
    		System.out.println("=================================");
    		
    	}
    });
    
    sc.close();

    8500cc79ddaa68fa93be8ca02399cab6.png

    action:对RDD作最后的计算或处理,如遍历、reduce、保存数据、返回结果—触发计算操作

    未完持续..

    展开全文
  • Spark SQL入门Spark基础:数据读写Spark基础:读写ParquetSpark基础:读写HiveSpark基础:读写JDBCSpark基础:Spark SQL优化Spark基础:分布式SQL查询引擎Spark基础:Hive特性兼容Spark基础:集群运行在Spark的bin...
  • 依赖冲突:...依赖冲突表现为在运行出现NoSuchMethodError或者ClassNotFoundException异常或者其他与类加载相关JVM异常。此时,若能确定classpath存在这个包,则错误是因为classpath存在2个不...
  • 来源:传智播客云计算学科 浏览次数:在IDEA编写Spark入门级程序WordCountSpark是用Scala语言开发,目前对Scala语言支持较好是IDEA插件,这里我们编写一个Spark入门级程序,然后用Maven编译成jar包,然后提交...
  • 本文来自社区投稿与征集,作者梁云,转自:https://github.com/lyhue1991/eat_tensorflow2_in_30_days本篇文章介绍在 Spark 调用训练好 TensorFlow 模型进行预测方法。本文内容学习需要一定 Spark 和 ...
  • 依赖冲突:NoSuchMethodError,...依赖冲突表现为在运行出现NoSuchMethodError或者ClassNotFoundException异常或者其他与类加载相关JVM异常。此时,若能确定classpath存在这个包,则错误是因为classpath
  • 因为在日常测试和生产因为某些问题,需要配置很多不同包的依赖来对应不同环境,java的maven的profile可以很好地帮我们解决这一问题 以spark kafka包为例 这里有两个环境,一个local,一个product.我这里需要将...
  • Apache Kafka 是一个可扩展,高性能,低延迟平台,允许我们像消息系统一样读取和写入数据...Spark Streaming 是 Apache Spark 一部分,是一个可扩展、高吞吐、容错实时流处理引擎。虽然是使用 Scala 开发,...
  • IDEA使用Maven开发Spark应用程序

    千次阅读 2018-04-27 20:17:05
    如上图所示点击next创建自己的spark项目; 对maven进行修改 1.2 修改pom.xml &amp;lt;!--依赖的版本--&amp;gt; &amp;lt;properties&amp;gt; &amp;lt;scala.version&...
  • | Linux虚拟机情况下,对Spark RDD程序完美运行,旨在解放初学者应无Linux集群环境、无内存容量支撑情况下运行spark程序,写这篇灵感来源于自己在学习Spark时,看官方文档介绍spark standalone模式部署,...
  • 闲来无事,整理一下如何从零开始构建spark项目的maven依赖.首先一个破解版idea是必须.这里附上一个Mac版本安装地址,留着下次自己试试效果.https://blog.csdn.net/qq_17213067/article/details/81449797 构建...
  • [本文2000字左右,预计阅读需要8-15分钟]在之前一系列从零开始学习大数据系列文章,我们已经学习了很多关于Spark这一组件基础知识,那么从今天开始我们来学习Spark生态圈中Spark SQL这一重要模块。什么是Spark...
  • Maven分离配置、依赖

    2017-08-13 09:25:03
    在用Maven打包项目时,要像Hadoop、Spark、Hive等项目打包之后文件包含bin、lib、conf之类文件夹,同时可以动态修改项目配置参数,需要如下两步: 在 pom.xml 文件引入 maven-assembly 插件; 在 ...
  • Maven创建Spark工程一、创建Maven项目二、添加依赖三、编写Spark源代码 一、创建Maven项目 创建quickstart的maven的项目: 编写组名和项目名: 修改Maven的安装目录 点击完成 二、添加依赖 在pom.xml...
  • idea中用maven打包spark程序pom

    千次阅读 2018-05-04 15:14:37
    首先要安装scala,并且在idea安装scala插件。依赖关系:&lt;?xml version="1.0" encoding="UTF-8"?&gt; &lt;project xmlns="http://maven.apache.org/POM/4.0.0" xmlns...
  • 我在pom.xml设置了Apache Spark maven依赖,如下所示org.apache.sparkspark-core_2.100.9.1但我发现这个依赖使用“hadoop-client-1.0.4.jar”和“hadoop-core-1.0.4.jar”,当我运行我程序时,我收到错误“org....
  • Maven对重复依赖的解决方案

    千次阅读 2017-05-12 13:23:08
    这几天在学习spark-streaming做流式计算,一开始写了一个比较简单测试程序能够顺利运行,但是当在项目pom.xml加入了一些项目公共依赖之后就死活编译不过,或者能编译过但是不能run,根本原因就是如下这些包: ...
  • 最近从eclipse转到IDEA,一开始就遇到问题,以前eclipse运行的MAVEN管理程序,在IDEA,总要报错NoClassDefFoundError比如: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/api/...
  • -----------------scala-spark <dependencies> <!--spark依赖--> <dependency> <groupId>org.apache.spark</groupId> <...
  • maven管理项目优势是:不用到处拷贝jar包,只要将项目依赖的其他项目在pom.xml以添加依赖的方式加进去进行了。所以我用maven管理spark应用,打包后提交给spark集群。本篇文章介绍本地调试运行spark应用,再...
  • Maven的聚合工程就是在一个父模块Pom.xml文件引入所有工程需要的Maven依赖,在不同子模块Pom.xml就可以直接继承父类存在的Maven依赖而不需要重新引入。这样符合模块化开发要求,更容易管理各个模块...
  • 在 Java 开发,如果想把项目打成可直接执行一个 jar 包,需要通过 maven 相关插件把依赖的 jar 包一起放进指定一个 jar 包里;比较典型使用场景是,如果有离线任务要通过 spark 提交相关 jar 包执行,那这...

空空如也

空空如也

1 2 3 4 5 ... 16
收藏数 303
精华内容 121
关键字:

maven中的spark依赖