2016-10-17 15:22:40 suhanjiao4897 阅读数 2510
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

Spark升级2.0.1版本安装部署教程

 


 

 

0.集群当前环境

Hadoop版本:2.7.1

JDK版本:jdk1.7.0_79

Hbase版本:1.1.2

Spark版本:1.5.0

Scala版本:2.10.4

1.Spark安装

a)   在官网(http://spark.apache.org/downloads.html)上下载与当前hadoop版本适合的Spark版本包。


b)   下载后解压到你要放置的安装目录。我是先解压再重命名移动到指定的文件夹下面。



c)   修改Spark配置文件。由于spark1.5.0版本和spark2.0.1版本的配置文件基本相同,故拷贝了原来的配置文件。


具体其中配置如下:

slaves文件配置子节点hostname(我这边在etc/hosts/中针对每个子节点的ip地址做了解析,所以只写域名)。


spark-defaults.conf配置一些jar包的引用。(注意:spark2.0.1版本不再有lib文件夹,只有jars文件夹,所以需要加上这个jars文件路径)


spark-env.sh配置spark的运行环境(注意,JAK版本至少要是1.7以上,SCALA版本也要控制2.11版本以上,scala路径就是后面你要安装的路径)


至此,Spark安装完成

2.Scala安装

a)   由于spark2.0.1版本只支持2.11版本以上的scala版本,所以需要重装新版本的Scala。在Scala官网下载2.11版本以上的scala安装包。(我安装的是2.11.8版本,http://www.scala-lang.org/download/2.11.8.html)


根据集群的操作系统选择对应的jar包。

下载到指定文件夹后,进行解压缩。


此时注意!要放到/usr/local/文件夹下,必须使用root账号。再拷贝到对应文件夹下。


至此Scala安装完成。

3.修改配置文件

a)   若此时Spark服务正在运行,建议先关闭Spark服务。再修改配置文件。


b)   修改每个账号下面的.bashrc文件


c)   修改完毕之后,source 下.bashrc文件,使之生效。

4.同步文件到子节点

a)   Spark文件夹,Scala文件夹及.bashrc文件到各个子节点,注意:source下.bashrc文件,使之生效。

5.验证是否安装成功

a)   验证scala是否安装成功

输入scala -version


b)   启动Spark服务。进入sbin文件夹下,执行bash -x start-all.sh

c)   使用jps查看进程是否启动成功。


d)   查看Spark页面


e)   执行Spark样例(./bin/run-example SparkPi | grep "Pi is roughly")


f)   测试Spark-shell能否正常使用(spark-shell --executor-memory 1G --total-executor-cores 10)


g)   查看8080页面


至此,Spark新版本升级完成

 

2017-10-31 16:52:02 zylove2010 阅读数 5766
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

编写好的Spark程序一般通过Spark-submit指令的方式提交给Spark集群进行具体的任务计算,Spark-submit指令可以指定一些向集群申请资源的参数(也可直接在Spark程序代码中指定,参数生效的优先级最高),在Linux环境下,可通过spark-submit –help 了解spark-submit指令的各种参数说明,截图如下:
这里写图片描述

案例(Python任务提交):
spark-submit –master spark://192.168.1.10:7077 –name router_app –total-executor-cores 8 –executor-memory 4g router_inout.py

常用的重要参数详解:
1) –master MASTER_URL: 指定要连接的集群模式(集群资源管理器)
standalone模式: spark://host:port, 如:spark://192.168.1.10:7077
Spark On Mesos模式 : mesos://host:port
Spark On YARN模式: yarn://host:port
本地模式:local

2) – deploy-mode DEPLOY_MODE : 指定任务的提交方式(client 和cluster)
client: 本地客户端模式(默认方式),一般会在集群主节点安装客户端
cluster: 集群工作节点模式
任务最终都会提交给主节点处理,所以在指定任务提交方式时,考虑本地客户端和集群工作节点对主节点的网络开销问题即可。

3)–name appName :设置任务的名称,方便在webUI查看

4)–py-files PY_FILES :加载Python外部依赖文件

5)–driver-memory MEM:设置driver的运行内存(占用客户端内存,用于通信及调度开销,默认为1G)

6)–executor-memory MEM:设置每一个executor的运行内存(占用工作节点内存,主要用于执行任务的内存开销),executor代表work节点上的一个进程。

7)–total-executor-cores NUM:设置任务占用的总CPU核数(即任务的并发量),由主节点指定各个工作节点CPU的使用数。
注意:该参数选项只在Spark standalone and Mesos 模式下有效

8)–executor-cores NUM:设置执行任务的每一个executor的CPU核数(yarn模式有效,默认为1)或者工作节点的总CPU核数(standalone模式有效)

9)–num-executors NUM:设置任务的executor进程数(yarn模式下有效)

10)–conf PROP=VALUE:设置Spark的属性参数
–conf spark.default.parallelism=1000 设置RDD分区大小,系统默认为200
–conf spark.storage.memoryFraction=0.5 设置内存分配大小(存储),系统默认为0.6
–conf spark.shuffle.memoryFraction=0.3 设置shuffle上限内存空间,系统默认为0.2

2020-02-25 21:58:10 Strawberry_595 阅读数 70
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

目录

idea安装Spark之前

一:windows端环境设置

二:Idea的配置

三:开发第一个wordcount程序

集群上安装Spark

1:伪分布式集群搭建spark环境

2:完全分布式集群搭建spark环境

 

 


 

idea安装Spark之前

开发环境分为:windows和centos6.5两端。

一:windows端环境设置

1:安装javaJDK1.8

 

2:环境设置

 

2.1:环境变量

3:安装scala2.11.12(注意不要安装最新或最高版本,视你的操作系统的Idea版本,否则会出现版本冲突)

3.1下载安装2.11.12(百度一下,有N多下载地址)

3.3系统变量设置

 

4:安装MAVEN

:注意如是要在本地运行请把在linux端配置的hadoop及spark相应的版本软件在windows下配置如下:(各位参考自己的软件版本及安装位置)

4.1安装maven3.6.1(不要太低,否则后面会有很多软件安装会出异常)

4.2配置maven环境变量(path目录)

5:如果在windows下运行软件

安装即可

二:Idea的配置

1:下载安装Idea软件

这里我安装的是2019.3版本的。算比较新的版本了。

 

1.2配置idea 环境的JDK

 

 

1.3配置Scala

 

 

三:开发第一个wordcount程序

1、新建一个maven工程

2、输入项目信息后下一步:

3、注意选择自己的mvn库

根据你自己之前设置的maven安装处

setting一定要选择正确,不要后面的maven库会让你下到想哭,下到猴年马月。

 

4、关键是配置pom.xml

好人做到底:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wisdom.spark.demo</groupId>
    <artifactId>sparkwordcount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <inceptionYear>2008</inceptionYear>
    <properties>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.3.4</spark.version>
        <jackson.version>2.6.0</jackson.version>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.codahale.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.codahale.metrics</groupId>
            <artifactId>metrics-json</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.10</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.5</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
</project>

等待Maven下载成功,这里如果网速太慢,建议大家更改一下setting中的mirorr代码,换成aliyun的挥着什么会好很多!

Build成功:

6、创建代码:

 

7、打包成功:

查看本地,打包文件: 

 

 

集群上安装Spark

1:伪分布式集群搭建spark环境

1下载:https://www.apache.org/

 

 

2linux上传并解压

[root@hadoop ~]# cd /home/

[root@hadoop home]# cd tools/

[root@hadoop tools]# rz

rz waiting to receive.

Starting zmodem transfer.  Press Ctrl+C to cancel.

Transferring spark-2.3.1-bin-hadoop2.7.tgz...

  100%  220589 KB    2535 KB/sec    00:01:27       0 Errors 

 

[root@hadoop tools]# tar -zxf spark-2.3.1-bin-hadoop2.7.tgz  -C ../softwares/

[root@hadoop tools]#

3配置环境变量

export SPARK_HOME=/home/softwares/spark-2.3.1-bin-hadoop2.7

export PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin:$FINDBUGS_HOME/bin:$SCALA_HOME/bin:$SPA

[root@hadoop spark-2.3.1-bin-hadoop2.7]# source /etc/profile

4:启动scala

[root@hadoop spark-2.3.1-bin-hadoop2.7]# spark-shell

2018-09-11 19:41:41 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Spark context Web UI available at http://hadoop:4040

Spark context available as 'sc' (master = local[*], app id = local-1536720131769).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1

      /_/

        

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)

Type in expressions to have them evaluated.

Type :help for more information.

 

scala>

5:设置spark-shell的显示信息

打开修改

重新打spark-shell

[root@hadoop spark-2.3.1-bin-hadoop2.7]# spark-shell

18/09/11 19:53:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Spark context Web UI available at http://hadoop:4040

Spark context available as 'sc' (master = local[*], app id = local-1536720844018).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1

      /_/

        

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)

Type in expressions to have them evaluated.

Type :help for more information.

 

scala> 5+5

res0: Int = 10

 

scala>

6:spark读取本地文件

 

scala> val tetxFile=sc.textFile("file:/home/softwares/spark-2.3.1-bin-hadoop2.7/README.md")

scala> tetxFile.count

res2: Long = 103 

7:读取HDFS文件

1:启动HDFS

[root@hadoop hadoop-2.9.1]# sbin/start-dfs.sh

Starting namenodes on [hadoop]

hadoop: starting namenode, logging to /home/softwares/hadoop-2.9.1/logs/hadoop-root-namenode-hadoop.out

localhost: starting datanode, logging to /home/softwares/hadoop-2.9.1/logs/hadoop-root-datanode-hadoop.out

Starting secondary namenodes [0.0.0.0]

0.0.0.0: starting secondarynamenode, logging to /home/softwares/hadoop-2.9.1/logs/hadoop-root-secondarynamenode-hadoop.out

[root@hadoop hadoop-2.9.1]# jps

60867 SecondaryNameNode

61061 Jps

60346 SparkSubmit

60730 DataNode

60637 NameNode

[root@hadoop hadoop-2.9.1]#

2:读取文件

scala> val textFile2=sc.textFile("hdfs://hadoop:8020/words")

textFile2: org.apache.spark.rdd.RDD[String] = hdfs://hadoop:8020/words MapPartitionsRDD[3] at textFile at <console>:24

 

scala> textFile2.count

res3: Long = 3                                                                 

 

scala>

8:在Hadoop Yarn上运行Spark-shell

启动hadoop集群

[root@hadoop hadoop-2.9.1]# sbin/start-yarn.sh

[root@hadoop spark-2.3.1-bin-hadoop2.7]# HADOOP_CONF_DIR=/home/softwares/hadoop-2.9.1/etc/hadoop/ spark-shell --master yarn

 

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1

      /_/

        

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)

Type in expressions to have them evaluated.

Type :help for more information.

 

scala>

 

 

 

 

 

 

2:完全分布式集群搭建spark环境

 

(1) 把安装包上传到hadoop01服务器并解压

[hadoop@hadoop01 soft]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C /home/hadoop/apps/

# 解压后如果感觉安装目录的名称太长可以修改一下

[hadoop@hadoop01 /]$ cd /opt/module

[hadoop@hadoop01 module]$ mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0

(2) 修改spark-env.sh配置文件

# 把SPARK_HOME/conf/下的spark-env.sh.template文件复制为spark-env.sh

[hadoop@hadoop01 module]$ cd spark-2.2.0/conf

[hadoop@hadoop01 conf]$ mv spark-env.sh.template spark-env.sh

 

# 修改spark-env.sh配置文件,添加如下内容

[hadoop@hadoop01 conf]$ vim spark-env.sh

# 配置JAVA_HOME,一般来说,不配置也可以,但是可能会出现问题,还是配上吧

export JAVA_HOME=/opt/module/jdk1.8

# 一般来说,spark任务有很大可能性需要去HDFS上读取文件,所以配置上

# 如果说你的spark就读取本地文件,也不需要yarn管理,不用配

export HADOOP_CONF_DIR=/opt/module/hadoop-2.7.4/etc/hadoop



# 设置Master的主机名

export SPARK_MASTER_HOST=hadoop01  #这里建议写IP

# 提交Application的端口,默认就是这个,万一要改呢,改这里

export SPARK_MASTER_PORT=7077

# 每一个Worker最多可以使用的cpu core的个数,我虚拟机就3个...

# 真实服务器如果有32个,你可以设置为32个

export SPARK_WORKER_CORES=3

# 每一个Worker最多可以使用的内存,我的虚拟机就2g

# 真实服务器如果有128G,你可以设置为100G

export SPARK_WORKER_MEMORY=1g


(3) 修改slaves配置文件,添加Worker的主机列表

[hadoop@hadoop01 conf]$ mv slaves.template slaves

[hadoop@hadoop01 conf]$ vim slaves

# 里面的内容原来为localhost
hadoop01

hadoop02

hadoop03

(4) 把SPARK_HOME/sbin下的start-all.sh和stop-all.sh这两个文件重命名

比如分别把这两个文件重命名为start-spark-all.sh和stop-spark-all.sh
原因:
如果集群中也配置HADOOP_HOME,那么在HADOOP_HOME/sbin目录下也有start-all.sh和stop-all.sh这两个文件,当你执行这两个文件,系统不知道是操作hadoop集群还是spark集群。修改后就不会冲突了,当然,不修改的话,你需要进入它们的sbin目录下执行这些文件,这肯定就不会发生冲突了。我们配置SPARK_HOME主要也是为了执行其他spark命令方便。

[hadoop@hadoop01 conf]$ cd ../sbin

[hadoop@hadoop01 sbin]$ mv start-all.sh start-spark-all.sh

[hadoop@hadoop01 sbin]$ mv stop-all.sh stop-spark-all.sh

(5) 把spark安装包分发给其他节点

[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop02:`pwd`

[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop03:`pwd`

[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop04:`pwd`

(6) 在集群所有节点中配置SPARK_HOME环境变量

[hadoop@hadoop01 conf]$ vim ~/.bash_profile

export SPARK_HOME=/opt/module/spark-2.2.0

export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

 

[hadoop@hadoop01 conf]$ source ~/.bash_profile

 

# 其他节点也都配置...这里你可以参考........等我有时间写一下完全分布式安装的步骤,里面有渐变的集体分发文件,和集体命令的方法,会方便很多。前提是你要记住用法,显然我经常忘记..........

 

(7) 在spark master节点启动spark集群

# 注意,如果你没有执行第4步,一定要进入SPARK_HOME/sbin目录下执行这个命令

# 或者你在Master节点分别执行start-master.sh和start-slaves.sh

 

[hadoop@hadoop01 conf]$ start-spark-all.sh

 

(8)完全分布式配置好spark,然后启动。

8.1先启动hdfs和yarn

 

 

8.2然后启动spark

 

这里忽略了上传刚刚打包好的idea的WordCount文件步骤,请自己完成。

8.3上传文件:启动

 

spark-submit

 --class com.spark.demo.WordCount      #代码全包名

--master spark://192.168.91.101:7077 #作为master的端口

sparkwordcount-1.0-SNAPSHOT.jar   #你虚拟机中上传好的jar包

file:/opt/data/test.txt    #输入文件

hdfs://hadoop01:9000/user/hadoop/output  #输出文件

结果显示:

 

 

 

好了。Happy Ending了。下面结束吧。

先停止spark

然后yarn

最后Hdfs

 

2016-01-05 18:05:50 kwu_ganymede 阅读数 6189
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

Hadoop经典案例Spark实现(一)——通过采集的气象数据分析每年的最高温度


1、原始数据分析
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999


数据说明: 
第15-19个字符是year
第45-50位是温度表示,+表示零上 -表示零下,且温度的值不能是9999,9999表示异常数据

第50位值只能是0、1、4、5、9几个数字


2、首先MapReduce实现

1) map 任务

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewMaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int MISSING = 9999;

	@Override
	public void map(LongWritable key, Text value,

	Context context) throws IOException, InterruptedException {

		String line = value.toString();

		System.out.println("key: " + key);

		String year = line.substring(15, 19);

		int airTemperature;

		if (line.charAt(45) == '+') {

			airTemperature = Integer.parseInt(line.substring(46, 50));

		} else {

			airTemperature = Integer.parseInt(line.substring(45, 50));

		}

		String quality = line.substring(50, 51);

		System.out.println("quality: " + quality);

		if (airTemperature != MISSING && quality.matches("[01459]")) {

			context.write(new Text(year), new IntWritable(airTemperature));

		}
	}
}

2)reduce任务

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewMaxTemperatureReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context)
			throws IOException, InterruptedException {
		
		int maxValue = Integer.MIN_VALUE;

		for(IntWritable value: values){

			maxValue = Math.max(maxValue, value.get());

		}


		context.write(key, new IntWritable(maxValue));
	}
	
}

3)Job提交

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

	public static void main(String[] args)throws Exception {
		if (args.length != 2) {

			System.err.print("Usage: MaxTemperature<input path> <output path>");

			System.exit(-1);

		}

		Job job = new Job();

		job.setJarByClass(NewMaxTemperature.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));

		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setMapperClass(NewMaxTemperatureMapper.class);

		job.setReducerClass(NewMaxTemperatureReducer.class);

		job.setOutputKeyClass(Text.class);

		job.setOutputValueClass(IntWritable.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}


3、Spark代码实现Scala版本

val one = sc.textFile("/tmp/hadoop/one")

val yearAndTemp = one.filter(line => {
 val quality = line.substring(50, 51);
 var airTemperature = 0
 if(line.charAt(45)=='+'){
  airTemperature = line.substring(46, 50).toInt
 }else{
  airTemperature = line.substring(45, 50).toInt
 }
 airTemperature != 9999 && quality.matches("[01459]")}).map{
line =>{
 val year = line.substring(15,19)
 var airTemperature = 0

 if(line.charAt(45)=='+'){
  airTemperature = line.substring(46, 50).toInt
 }else{
  airTemperature = line.substring(45, 50).toInt
 }
  (year,airTemperature)
}
}

val res = yearAndTemp.reduceByKey(
 (x,y)=> if(x>y) x else y
)

res.collect.foreach(x=>println("year : " + x._1+", max : "+x._2))


上面为了过滤非法的数据,在map前先做了filter过滤。


mapreduce与spark执行的任务结果是一样的

year : 1949, max : 111
year : 1950, max : 78

spark vs hadoop

阅读数 952

没有更多推荐了,返回首页