精华内容
下载资源
问答
  • 参考:spark开发环境搭建(基于idea 和maven) 安装JDK 从这里下载Java 8的JDK 设置JAVA_HOME环境变量,在Mac上它大概会是/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/这个样子 我选择的是...

    参考:spark开发环境搭建(基于idea 和maven)

    安装JDK

    1. 这里下载Java 8的JDK
    2. 设置JAVA_HOME环境变量,在Mac上它大概会是/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/这个样子
      我选择的是在~/.bash_profile文件里添加一句:
      export JAVA_HOME=/path/to/JDK,路径换成自己的JDK路径

    安装Maven

    Mac下用Brew安装即可:

    brew install maven

    Intellij IDEA

    以下基本跟spark开发环境搭建(基于idea 和maven)一样,详细可以去这里看

    下载和安装

    这个……去官网下载即可,学生可以申请学生账号,挺方便的

    配置Intellij IDEA

    1. 安装scala插件: 打开Intellij IDEA右下角的Configure,然后选择Plugins,搜索scala插件并安装,安装完之后重启IDE。
    2. 设置全局JDK: 在我的IDEA版本里是选择Structure for New Projects,在弹出的界面依次选择Project-New-JDK,把目录设置成跟JAVA_HOME一样的目录
    3. 设置全局Scala SDK 还是刚刚Structure for New Projects的界面,在Global Libraries选项卡下点+后选择Scala SDK,然后会弹出一个界面如下:
      1715450-20190615132848762-98069366.png

      我不像那位Po主一样有System选项,所以需要选择Download去下载一个SDK。

      !!!!!!!!这里有一个要点!!!!!!!!
      注意一下你的Spark是什么版本的,比如我用的Spark 2.1.2,在官网上有这么一句:
      1715450-20190615132929925-158536591.png

      所以下载的时候选择2.11版本的scala即可,太高和太低都有可能出问题= =,我之前下了2.13出现跟这里一样的错误。

      下载完之后Global Libraries里就有这个sdk啦,然后在这个sdk上右键,点击Copy to Project Libraries

    新建Maven项目

    点新建项目,然后选择Maven项目,在右侧Project SDK看看是不是成功配置了我们的JDK,然后设置好GroupIdArtifactIdVersion

    运行scala程序

    可以把main\java, main\resourcestest 都暂时删掉,它们的作用见这篇文章

    将Scala的框架添加到这个项目中,方法是在左侧栏中的项目名称上右键菜单中点击Add Framework Support…,然后在打开的对话框左侧边栏中,勾选Scala前面的复选框,然后点击确定即可。

    我是不用单独再导入这个框架,直接做下一步就可以了

    main文件夹中建立一个名为scala 的文件夹,并右键点击scala文件夹,选择Make Directory as,然后选择Sources Root,这里主要意思是将 scala 文件夹标记为一个源文件的根目录,然后在其内的所有代码中的 package ,其路径就从这个根目录下开始算起。

    这一步非常重要!!!!! 不做这个下一步可能凉凉

    在已经标记好为源文件根目录的 scala 文件夹 上,右键选择 New,然后选择 Scala Class,随后设置好程序的名称,并且记得将其设置为一个 Object(类似于Java中含有静态成员的静态类),正常的话,将会打开这个 Object 代码界面,并且可以看到IntelliJ IDEA自动添加了一些最基本的信息;
    Object元素内 输入:

    def main(args: Array[String]):Unit = {
       println("Hello World!")
    }

    然后就可以Run它,会输出结果。

    调试Spark

    首先要修改pom.xml里导入相关依赖,注意scala和spark的版本要写对。每次修改完pom.xml之后记得Import Changes。点击右上角的Edit Configurations:
    1715450-20190615132921448-2132640320.png

    设置一下VM参数:
    1715450-20190615132923459-637469373.png

    然后就可以正常Run了,理论上应该不会报错,如果有问题可以试试在terminal里输入mvn cleanmvn install安装需要的依赖。

    转载于:https://www.cnblogs.com/milliele/p/11027327.html

    展开全文
  • main下面新建一个scala,刷新 1.8 1.8 2.12.10 3.0.1 UTF-8 org.scala-lang scala-library ${scala.version} provided org.apache.spark spark-core_2.12 ${spark.version} net.alchim31.maven scala-maven-plugin ...

    面试题: MapReduce 和 Spark 的本质区别MR:  

            1. 只能在离线计算

            2. 复杂计算逻辑,一个MR无法完成,需要多个MR按先 后顺序串联

            3. 多个MR就要进行频繁的读写HDFS,效率低

            4. 编程API不够灵活,只能在map和reduce自己实现逻辑

    Spark: 1. 即可以做离线计算,又可以做实时计算

               2. 提供了抽象的数据集(RDD Dataset DataFrame DStream)

               3. 高度封装的API,算子丰富

               4. 使用了先进的DAG有向无环图,切分Stage,优化pipeline

               5. 数据可以cache复用

               6. 数据可checkpoint保证安全

    注意: MR和Spark在Shufile 时数据都落在本地磁盘

    standalone角色及执行过程

    client模式 和 Cluster模式 :

    Master   负责管理 Worker,接收客户端提交的任务 以及资源调度​ Worker 负责管理所在的节点,启动executor并监控executor的状态​ executor 线程池, 相当于一个Java进程,负责运行Driver端生成的Task,将Task放入线程中执行

    SparkSubmit   是一个Java进程,负责向Master提交任务

    Driver   是很多类的统称,可以认为SparkContext就是Driver,负责将用户编写的代码转成Tasks,然后调度到 Executor中执行,并监控Task的状态和执行进度。

    ​ client模式Driver运行在SparkSubmit进程中

    ​ cluster模式单独运行在一个进程中

    Task   分发到Executor上的工作任务,是Spark的最小执行单元  相当于类的实例

        1. Mater启动后定时检测超时的Worker
        2. Worker启动和master获取连接 , Master保存Worker的信息(内存,cpu等)后 返回注册成功的消息
        3. Worker定期向Master发送心跳
        4. SparkSubmit通过rpc通信提交任务给Master,并向Master申请资源, Master将符合条件的Worker筛选     出来(内存和核数等          资源符合条件),通过RPC通信,让Worker 启动 executor 并监控executor的状态
        5. executor启动后向Driver 反向注册(位置信息由 Master -- Worker -- executor),
        6. driver生成TaskSet 后将Task(计算逻辑)发送给对应的executor执行

    Spark集群安装

    1. 下载安装包,下载地址: https://spark.apache.org/downloads.html
    apache所有的安装包源码,外网速度慢  http://archive.apache.org/dist/
    可以去阿里或者清华大学源码站 速度快 
    2. 压缩包上传解压
    目录: /opt/apps
    解压 tar -zxvf 包名 /
    3. 进入到spark包目录将conf中目录下的spark-env.sh.template重命名为spark-env.sh,再添加
    export JAVA_HOME=/opt/apps/jdk.1.8.0_251
    export SPARK_MASTER_HOST=linux01

    4. 将conf目录下的slaves.template重命名为slaves并修改,指定Worker的所在节点
    linux01
    linux02
    linux03

    5. 将配置好的spark拷贝到其他节点
    scp -r spark-3.0.1-bin-hadoop3.2 linux02:$PWD
    scp -r spark-3.0.1-bin-hadoop3.2 linux03:$PWD

    快速方法: for i in {2..3}; do scp -r /opt/apps/spark-3.0.1-bin-hadoop3.2/  linux0$i:$PWD; done;

    6. 在spark的安装目录下执行启动脚本
    sbin/start-all.sh
    7. 执行jps命令查看java进程
    jps     在linux01上可以看见Master和Worker进程,在其他节点上可以看到Worker进程
    8. 访问Master的web管理界面,端口: 如果8080被占用可用8081 8082...   linux01:8080 

    9. 如果部署出错,查看日志: /opt/apps/spark-3.0.1-bin-hadoop3.2/log   使用命令查看: less 加日志文件名

    10. 配置Cores和Memory根据实际情况分配 ,进入/opt/apps/spark/conf/ , 最后复制到linux02和03上

    export SPARK_WORKER_CORES=4        #指定worker可用的逻辑核数
    export SPARK_WORKER_MEMORY=1g   #指定worker可用的内存大小

    启动交互式命令行客户端

    /opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-shell --master spark://linux01:7077 --executor-memory 1g --total-executor-cores 6          #内存1g     逻辑核(线程数) 6

    或者: /opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-shell --master spark://linux01:7077                                                                           #不指定内存大小和线程数,默认核数全部 内存1g

    RDD简介

    抽象弹性分布式数据集,弹性可复原,可以认为是一个代理,你对RDD进行操作,
    相当于在Dirver端显示记录下计算的描述信息,然后生成Task,将Task调度到Exector端才执行
    神奇的大集合(RDD)里面不装真正要计算的数据,而是装数据的描述信息,描述以后从哪里读数据,调用了什么方法,传入了什么函数,以及依赖关系
    你只要对RDD进行编程就行了,不用关心底层的细节,只要关系具体的计算逻辑就可以了

    特点:1.有多个分区,分区数量决定任务并行度
            1).从HDFS中读取 
                (1)如果是从HDFS中读取数据,分区的数量hdfs中数据的输入切片数量决定
                (2)sc.textFile可以指定rdd的最小分区数量,默认是2
                (3)如果一个大文件,一个小文件,大文件的大小大于goalsize的1.1倍,大文件会有多个输入切片
                (4)大于文件block的数量,多个Task可以读取一个block,当分区的数量小于切片的数量,RDD分区的数量由切片数量决定

            2).将Dirver端集合并行化成RDD)
                (1)RDD默认分区的数量由total-exector-cores决定
                (2)可以在sc.parrllelize(arr,6)指定分区的数量
                (3)makeRDD底层调用的是parallelize方法

         2.一个功能函数作用在分区上,函数决定计算逻辑

         3.RDD和RDD存在依赖关系,可以根据依赖关系恢复失败的任务和划分Stage

         4.若果要发生Shuffe,要使用分区器,默认使用HashPartitioner,分区器决定数据到下游哪个分区

         5.最优位置,即将Executor调度到数据所在的节点上,要求Worker和DataNode部署在同一节点或OnYarn,  通过访问NameNode获取数据块位置

    什么叫做shuffle

    在分布式计算当中,将我们的数据按照一定的规律,通过网路传输到不同的节点上,就叫做shuffle,
    有没有shuffle,取决有数据有没有打散 不看有几个分区,是合并还是分区,只要数据打散就是shuffle

    maven项目初始化         

    1. 新建一个project , 指定setting

    2. 导入依赖

    3. main下面新建一个scala,刷新

    <!-- 定义了一些常量 -->
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <scala.version>2.12.10</scala.version>
            <spark.version>3.0.1</spark.version>
            <encoding>UTF-8</encoding>
        </properties>
    
        <dependencies>
            <!-- 导入scala的依赖 -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <!--编译时引入,打包不打入jar中-->
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <pluginManagement>
                <plugins>
                    <!-- 编译scala的插件 -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                    </plugin>
                    <!-- 编译java的插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.5.1</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

    WordCount

    1. 打包

    2. rz上传

    3. 执行

    /opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-submit  --master spark://linux01:7077 --class cn.doit.spark.day01.demo01.WordCount  /opt/apps/WordCount.jar hdfs://linux01:8020/words hdfs://linux01:8020/wordcount

     使用scala编写程序

    package cn.doit.spark.day01.demo01
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
      def main(args: Array[String]): Unit = {
    
        //创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
        //setAppName每个程序都要有个名字(标识)  setMaster设置为本地模式
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        //创建RDD:指定[以后]从HDFS中读取数据创建RDD
        //读取数据  args(0)main方法运行的时候可以进行传参
        val lines: RDD[String] = sc.textFile(args(0))
    
        //切分压平
        val words: RDD[String] = lines.flatMap(_.split(" "))
    
        //将单词组合
        val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    
        //分组聚合(先在分区内局部聚合,再全局聚合)
        val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    
        //排序
        val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    
        //调用Action将计算好的结果保存到HDFS中
        sorted.saveAsTextFile(args(1))
    
        //释放资源
        sc.stop()
      }
    }
    

    使用java编写程序

    package cn.doit.day01.demo01;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class WordCount {
        public static void main(String[] args) {
    
            //创建JavaSparkContext
            SparkConf conf = new SparkConf().setAppName("WordCount");
    
            //javaSparkContext是对SparkContext的包装
            JavaSparkContext jsc = new JavaSparkContext(conf);
    
            //javaSparkContext可以创建JavaRDD
            //javaRDD包装了RDD
            JavaRDD<String> lines = jsc.textFile(args[0]);
    
            //切分压平
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.stream(line.split(" ")).iterator();
                }
            });
    
            //将单词合一组合
            JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return Tuple2.apply(word, 1);
                }
            });
    
            //聚合
            JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            //交换位置,Int作为key才可以使用sortByKey排序   swap: k作为v v作为k
            JavaPairRDD<Integer, String> tup = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                    return tp.swap();
                }
            });
    
            //排序
            JavaPairRDD<Integer, String> sorted = tup.sortByKey();
    
            //交换位置
            JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> work) throws Exception {
                    return work.swap();
                }
            });
    
            //调用Action将计算好的结果保存到HDFS中
            result.saveAsTextFile(args[1]);
    
            //关闭资源
            jsc.stop();
        }
    }
    
    

    使用lamda编写程序

    package cn.doit.day01.demo01;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.rdd.RDD;
    import scala.Tuple2;
    
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class WordCount02 {
        public static void main(String[] args) {
    
            //获取SparkConf信息并程序命名
            SparkConf conf = new SparkConf().setAppName("WordCount02");
            
            //创建JavaSparkContext
            JavaSparkContext context = new JavaSparkContext(conf);
            
            //创建RDD , 读取文件
            JavaRDD<String> lines = context.textFile(args[0]);
    
            //切割压平
            JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
    
            //拼接成k v   Tuple元组
            JavaPairRDD<String, Integer> tup = words.mapToPair(word -> Tuple2.apply(word, 1));
    
            //聚合
    //        JavaPairRDD<String, Integer> reduced = tup.reduceByKey((t1, t2) -> t1 + t2);
            JavaPairRDD<String, Integer> reduced = tup.reduceByKey(Integer::sum);
    
            //交换位置,Int作为key才可以使用sortByKey排序   swap: k作为v v作为k
    //        JavaPairRDD<Integer, String> tup2 = reduced.mapToPair(r -> r.swap());
            JavaPairRDD<Integer, String> tup2 = reduced.mapToPair(Tuple2::swap);
    
            //排序
            JavaPairRDD<Integer, String> sorted = tup2.sortByKey();
    
            //再次交换回来位置
    //        JavaPairRDD<String, Integer> result = sorted.mapToPair(m -> m.swap());
            JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);
    
            //调用Action将计算好的结果保存到HDFS
            result.saveAsTextFile(args[1]);
    
            //释放资源
            context.stop();
        }
    }
    

    spark程序本地调试

    setMaster("local[*]"): 可以设置为本地模式,访问windows和linux中的文件

    System.setProperty("HADOOP_USER_NAME" , "root") :  伪装,访问hdfs需要权限

    package cn.doit.spark.day01.demo01
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
      def main(args: Array[String]): Unit = {
    
        //伪装 可以访问hdfs文件  需要访问权限 固定写法
        System.setProperty("HADOOP_USER_NAME" , "root")
    
        //创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
        //setAppName每个程序都要有个名字(标识)  setMaster设置为本地模式
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        //创建RDD:指定[以后]从HDFS中读取数据创建RDD
        //读取数据  args(0)main方法运行的时候可以进行传参
        val lines: RDD[String] = sc.textFile(args(0))
    
        //切分压平
        val words: RDD[String] = lines.flatMap(_.split(" "))
    
        //将单词组合
        val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    
        //分组聚合(先在分区内局部聚合,再全局聚合)
        val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    
        //排序
        //val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    
        //调用Action 将计算好的结果保存到HDFS中
        reduced.saveAsTextFile(args(1))
    
        //关闭资源
        sc.stop()
      }
    }
    

     

    展开全文
  • Spark Master资源调试--worker向master注册(文档详解): https://github.com/opensourceteams/spark-scala-maven/blob/master/md/MasterScheduler_workerRegisterMaster.md Spark Master资源调度--SparkContext...
  • 4.scala 2.11.8 5.maven 3.5.2 在开发和搭环境时必须注意版本兼容的问题,不然会出现很多莫名其妙的问题 1.启动master进程 ./sbin/start-master.sh 2.启动worker进程 ./bin/s...
    本人的开发环境:
    1.虚拟机centos 6.5
    2.jdk 1.8
    3.spark2.2.0
    4.scala 2.11.8
    5.maven 3.5.2
        在开发和搭环境时必须注意版本兼容的问题,不然会出现很多莫名其妙的问题
     
    1.启动master进程
    ./sbin/start-master.sh
     
    2.启动worker进程
    ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://hadoop000:7077
    【注意,spark://hadoop000:7077,是在启动master进程后,通过localhost:8080登陆到spark WebUI上查看的。】
     
    第一第二点是运行环境的前提条件,下面是开发环境。
     
    1.idea结合maven开发spark,下面以NetWorldCount为例子
    package com.spark
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
     
    /**
      * Spark Streaming处理Socket数据
      * 测试: nc
      */
    object NetworkWordCount {
     
      def main(args: Array[String]): Unit = {
     
        val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWordCount")
     
        /**
          * 创建StreamingContext需要两个参数:SparkConf和batch interval
          */
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        val lines = ssc.socketTextStream("localhost", 6789)
     
        val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
     
        result.print()
     
        ssc.start()
        ssc.awaitTermination()
      }
    }

     

    2.生成jar包

     

     

     3.上传jar包

     

     

    4.提交任务前先启动监听端口,在终端输入以下命令
    nc -lk 6789
     
     
    5.提交任务
    ./spark-submit  --master local[2] --class com.spark.NetworkWordCount --name NetworkWordCount  /home/hadoop/tmp/spark.jar

     

    运行程序,出现下面的错误:

    a.local这里出错。原因简单来说,local模式下只开启一条线程,reciver占用一条线程后,没有资源用来计算处理数据了。
    解决办法:local--->local[2]
     
    b.缺少com.fasterxml.jackson.scala这个方法
    解决办法:
    1.查看这个类的版本:view--->maven project--->--->.然后在pom.xml增加对应的dependency
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>
    重新reimport,再次运行。出现以下错误

     

     

     去maven reposition查找对应的依赖: 

     

     

    在这里,使用1.3.0版本的。 

     

    在pom.xml添加以下的 dependency
    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>
     
    重新reimport,再次运行。这次程序正常运行。
     
    输入数据:

     

    接受数据:

     

    至此,windows下,idea结合maven开发spark+调试过程 完整跑了一遍。
    下面分析 
     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 

     在本地调试中,输入源除了 fileStream外,必须local[n], n >= 2 。
     在spark中,输入源除了 fileStream ,其他的都继承自 ReceiverInputDStream ,因此其他都需要至少两条线程(针对local模式)以上来供程序使用。
    def fileStream[
      K: ClassTag,
      V: ClassTag,
      F <: NewInputFormat[K, V]: ClassTag
    ] (directory: String): InputDStream[(K, V)] = {
      new FileInputDStream[K, V, F](this, directory)
    }
     
    例如本例子中使用的 socketTextStream
    def socketTextStream(
        hostname: String,
        port: Int,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
      socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }

     

     

     

    转载于:https://www.cnblogs.com/liangjf/p/7899282.html

    展开全文
  • 下载IDEA插件 ...引入编译Scala对应的maven插件 <build> <pluginManagement> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>sca

    下载IDEA插件

    下载Scala插件
    在这里插入图片描述
    下载完成后重启IDEA

    设置SDK

    在这里插入图片描述

    引入编译Scala对应的maven插件

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    编写Scala代码

    注意项目目录结构

    在这里插入图片描述

    引入Spark依赖

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    

    注意这里的_2.12是Scala语言版本,版本必须对齐,否则会报类型错误。

    object WordCount {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCountTest").setMaster("local[4]")
        val sc = new SparkContext(sparkConf)
        val lineRdd: RDD[String] = sc.parallelize(List("Wap","Xu","Wap"))
        val teacherAndOne: RDD[(String, Int)] = lineRdd.map(line => {
          (line, 1)
        })
        val reduced: RDD[(String, Int)] = teacherAndOne.reduceByKey(_+_)
        val resultArray: Array[(String, Int)] = reduced.collect()
        print(resultArray.toList)
        sc.stop()
      }
    }
    

    最后run或者debug,在本地执行Spark程序。

    展开全文
  • Maven(打包Scala程序打不进class文件和依赖的解决办法),新建一个Maven项目,开开心心地写完scala程序,在本地调试没什么问题,要打包部署在spark集群上运行的时候却出错了,说找不到主类。 java.lang....
  • 1.应用安装 首先安装好idea2017 java8 scalaJDK spark hadoop(注意scala和spark的版本要匹配) 2.打开idea,创建建maven项目,如图所示 项目创建好后,记得勾选maven auto upate选项,这个动作会触发idea自动...
  • 新建一个Maven项目,开开心心地写完scala程序,在本地调试没什么问题,要打包部署在spark集群上运行的时候却出错了,说找不到主类java.lang.ClassNotFoundException: neu.WordCount at java.net.URLClassLoader....
  • spark源码本地调试

    2018-12-02 18:31:00
    1、前提条件: 1)安装jdk ... 2)安装scala 版本: 3)安装sbt 版本: 4)安装maven 5)安装git 版本: 6)安装idea,并配置好sbt、git、maven 2、从github上下载源码sp...
  • Spark的1.X和2.X版本源码编译与本地阅读调试环境搭建。前置工作: - JDK 安装和环境变量的配置 - Scala 安装和环境变量配置 - Maven 安装和环境变量配置 - Git 客户端安装和 SSH 配置 源码编译完成,进行简单...
  • 本地单机调试和 Spark-Submit 提交集群运行,本地可以执行,发布集群未必能成功运行) 目录: 集成方式 Maven Pom 文件介绍 资源配置介绍 Scala 启动 Spring-Boot 方法介绍 Maven 打包方法介绍 ...
  • scala2.10.4 maven 3.3..9 idea 15.042.下载spark1.5.2源码 https://github.com/apache/spark 进release3.编译安装:mvn clean package -DskipTests查看详细错误:mvn clean package -DskipTests -X 遇到的问题 ...
  • 在这里,我们以hadoop的wordcount为例,编写Scala程序,以本地模式和Yarn模式分别测试程序。Spark程序在开发的时候,使用IDEA编写程序及调试过程如下: 一、项目创建 1、创建ScalaMaven项目,pom.xml文件如下...
  • 一、目的 在远程电脑的windows系统上,部署远程spark代码开发环境,从而提升效率。 二、环境 ...(3)选择本地maven仓库对应配置文件settings.xml (4)填写项目名称,确认建立项目 (5)修改p...
  • Eclipse下开发调试环境的配置该小节中使用的各项工具分别为:mac (Windows7)+EclipseJavaEE4.4.2+Scala2.10.4+Sbt0.13.8+Maven3.3.3,测试的Spark版本为1.4.0。1.配置IDE:选择菜单项Help->Installnewsoftware,...
  • 第一步:在idea编写scala程序,并且要打包(pom文件的build标签中配置好maven打包代码,可以定义主类也可以在提交的时候再定义){补充:可以在spark本地调试程序,新建一个application,添加代码主类,program ...
  • 后面发现在开发环境中使用了maven来构建项目,所以配置的spark版本是2.1.0,scala版本是2.11.11,hadoop是2.7.2,kafka是2.1版本的,本地调试时能正常消费kafka生成的数据进行计算。 但是使用spark-submit提交到...
  • 通常来讲,任何一门大数据框架在实际生产环境中都是以集群的形式运行,而我们调试代码大多数会在本地搭建一个模板工程,Flink 也不例外。 Flink 一个以 Java 及 Scala 作为开发语言的开源大数据项目,通常我们推荐...
  • Spring Boot 2.0.0.M7 如何进行远程调试 Spring Boot 生产准备-基于HTTP的监控 Spring Boot 集成 Druid springboot思维导图 springboot activemq安装 spring-boot单元测试 springboot使用hibernate validator校验 ...
  • 《实战maven私有仓库三部曲之三:Docker下搭建maven私有仓库》 《修改gradle脚本,加速spring4.1源码编译构建速度》 《Docker与Jib(maven插件版)实战》 《Jib使用小结(Maven插件版)》 《Jib构建镜像问题从定位到深入...
  • maven和Gradle是java构建工具的不三之选,这两个优选一个。ANT是之前的构建工具,不需要详细掌握,了解即可。 1.3 虚拟化工具 Docker的横空出世,打造了一个新的虚拟化时代,凭借其优异的性能和资源占用率,Docker...
  • 提供全冗余方式的缓存,自动在每个节点间同步缓存数据,而每个节点都仅从本地内存中获取缓存数据,从而提供高效的执行效率,并且当部分节点宕机时仍旧能正常提供服务。当然,也允许使用Redis提供统一的中心节点缓存...

空空如也

空空如也

1 2
收藏数 22
精华内容 8
关键字:

mavenscala本地调试