2017-03-09 10:18:26 qq_24831889 阅读数 194
  • Spark初级入门(2):解析Scala集合操作

    Scala基础入门教程,解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。 讲师介绍:周志湖,电子科技大学计算机软件与理论硕士研究生,研究方向为计算机视觉、机器学习,毕业后先后供职于宁波银行、中共浙江省委党校,目前就职于绿城集团,担任数据中心平台架构师、数据开发主管。Scala语言、Hadoop及Spark大数据处理技术爱好者。

    9132 人正在学习 去看看 CSDN讲师

spark

需要安装的软件:virtual box5.1(vmware)、centos6.5/Ubuntu、jdk1.7、Hadoop2.4.1、hive0.13、zookeeper3.4.5、kafka_2.9.2-0.8.1、spark1.5.1、

secureCRT(命令操作)、WinSCP(上传centos通道上传文件)

网络配置

永久配置centos网络:vi /etc/sysconfig/network-scripts/ifcfg-eth0,

                                        ONBOOT=yes                     启动或重启网络时,

                                        BOOTPROTO=static                将ip地址设置为静态

                                        IPADDR=                                    IP地址

                                        NETMASK=                                子网掩码

                                        GATEWAY=                                 默认网关

注:上述地址具体可在Windows控制台用ipconfig命令查看

                                        service network restart,重启网关

                                        ifconfig 查看是否设置成功

在本地文件/etc/hosts中配置本地ip到host(spark)的映射:vi /etc/hosts,添加以下内容:

                                       192.168.14.33(为spark1静态的IP地址) spark1

                                       192.168.14.33(为spark2静态的IP地址) spark2

                                       192.168.14.33(为spark3静态的IP地址) spark3

在spark1上配置好以后,可以通过scp /etc/hosts root@spark2:/etc/hosts、scp /etc/hosts root@spark3:/etc/hosts分别拷贝到spark2和spark3上面,之后可通过在spark1上ping 通spark2

注:拷贝文件夹时:scp -r /usr/local/p1 root@spark2:/ust/local/

      Ctrl+R可以快速搜索之前输入过的命令

防火墙与DNS配置

关闭防火墙:service iptables stop

                   chkconfig iptables stop    启动时自动关闭

                    vi /etc/selinux/config   SELINUX=disabled

配置DNS服务器:vi /etc/resolv.conf   有nameserver则上步已成功配置

                          nameserver=

                          ping www.baidu.com检查能否上网

配置Windows主机上的hosts文件C:\Windows\System32\drivers\etc\hosts,修改spark1、2、3的地址,随后可在Windows控制台ping通spark1、2、3.

securecrt说明:securecrt可实现在Windows环境下对Linux进行操作,

WinSCP说明:可以在Windows下查看Linux下的文件,可以将windows下的文件上传到Linux

yum配置

修改repo:使用WinSCP,CentOS6-Base-163.repo上传到centos的usr\local目录下,

                cd /etc/yum.repos.d/             

               删除centos的所有源(删除当前目录下所有文件:rm -rf *)

               mv我们的repo到/etc/yum.repos.d/目录下(移动文件到当前目录:mv /usr/local/project .),修改repo文件,将所有gpgcheck属性修改为0.

拷贝文件到当前目录:cp /usr/local/

配置yum:yum clean all               清除之前文件

                yum makecache          设置缓存

                yum install talnet           用yum下载talnet

JDK1.7的安装

1.将jdk-7u60-linux-i586.rpm通过WinSCP上传到虚拟机/usr/local下面

2.rpm -ivh jdk-7u60-linux-i586.rpm

3.配置环境变量

vi .bashrc

export JAVA_HOME=/usr/java/latest

export PATH=$PATH:$JAVA_HOME/bin

source bashrc           使编辑生效

4.检测是否安装成功

java -version

配置SSH集群免密码登录

生成公钥:ssh-keygen -t rsa ,默认将公钥存在/root/.ssh目录下,

将公钥复制为authorized_keys文件:cd /root/.ssh

                                                     cp id_rsa.pubauthorized_keys(单个机器内部拷贝使用cp命令,机器之间使用scp命令)

此时单个机器内部不需要密码,在spark1内部使用ssh spark1试验。

在spark1上ssh-copy-id -i spark2将本机(spark1)的公钥拷贝(添加)到指定机器(spark2)的authorized_keys,此时在spark1上使用ssh spark2,只用输入一次密码即可以在spark1上登录spark2,配置成功。

类似的分别在spark1、spark2、spark3上将本机公钥拷贝到其余机器上之后,即可以实现互通。

 

 

2017-11-12 13:15:37 qq_27926875 阅读数 759
  • Spark初级入门(2):解析Scala集合操作

    Scala基础入门教程,解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。 讲师介绍:周志湖,电子科技大学计算机软件与理论硕士研究生,研究方向为计算机视觉、机器学习,毕业后先后供职于宁波银行、中共浙江省委党校,目前就职于绿城集团,担任数据中心平台架构师、数据开发主管。Scala语言、Hadoop及Spark大数据处理技术爱好者。

    9132 人正在学习 去看看 CSDN讲师

发展

这里写图片描述

spark是什么

这里写图片描述

2、一个大一统的软件栈

Spark核心
    计算引擎
        对由很多计算任务组成的、运行在多个工作机器或者是一个计算集群上的应用调度、分发以及监控的计算引擎
        速度快、通用
Spark项目包含多个密切组成的组件
    优点1:软件栈中所有的程序库和高级组件都可以从下层的改进中获益
    优点2:运行整个软件栈的代价变小了
    优点3:能够构建出无缝整合不同处理模型的应用
Spark的各个组件

1

    Spark Core 
        实现了Spark的基本功能
        包含:任务调度、内存管理、错误恢复、与存储系统交互等模块
        包含:对弹性分布式数据集RDD的API定义
            RDD表示
                分布在多个计算机节点上可以并行操作的元素集合
                是Spark的主要编程对象
                SparkCore提供了创建和操作这些集合的多个API
    SparkSQL
        用来操作结构化数据的程序包
        通过它我们可以使用
            SQL or Apache Hive版本的SQL方言(HQL)查询数据
        支持多种数据源
            比如:Hive表、Parquet、JSON等
        为Spark提供了一个SQL接口
            实在Spark1.0中被引用的
    Spark Streaming
        Spark提供的对实时数据进行流式计算的组件
        提供了用来操作数据流的API
        与SparkCore中的RDD API高度对应
        底层设计来看:它支持与Spark Core同级别的容错性、吞吐量以及可伸缩性
    MLlib
        机器学习ML功能的程序库
        提供了很多种机器学习算法
            分类
            回归
            聚类
            协同过滤等
    GraphX
        用来操作图的程序库
        可以进行并行的图计算
        扩展了Spark的RDD API
            用来创建一个顶点和边都包含任意属性的有向图
    集群管理器
        支持在各种集群管理器(cluster manager)上运行
        包括:Hadoop YARN、Apache Mesos、以及Spark自带的独立调器
2018-08-24 20:34:14 Shingle_ 阅读数 371
  • Spark初级入门(2):解析Scala集合操作

    Scala基础入门教程,解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。 讲师介绍:周志湖,电子科技大学计算机软件与理论硕士研究生,研究方向为计算机视觉、机器学习,毕业后先后供职于宁波银行、中共浙江省委党校,目前就职于绿城集团,担任数据中心平台架构师、数据开发主管。Scala语言、Hadoop及Spark大数据处理技术爱好者。

    9132 人正在学习 去看看 CSDN讲师

Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。

Spark的前辈:MPI、MapReduce

特性:迭代式计算、交互式探索、内存缓存计算

Spark软件栈

  • Spark Core:任务调度、内存管理、错误恢复、与存储系统交互,弹性分布式数据集(resilient distributed dataset, RDD)
  • Spark SQL:可与Hive Metastore交互
  • Spark Steaming:能从Flume、Kafka读取数据
  • MLlib:分类、回归、聚类、协同过滤
  • GraphX:并行的图计算

Spark编程模型

每个Spark应用都由一个驱动器程序来发起集群上的各种并行操作,驱动器程序通过一个SparkContext对象来访问Spark。Spark集群是由两类程序构成:一个驱动程序和多个执行程序。本地模式时对所有的处理都运行在同一个JVM中,而在集群模式时包括一个运行Spark单机主程序和驱动程序的主节点和各自运行一个执行程序进程的多个工作节点。

Spark Shell

在Spark Shell中,实际的驱动器程序就是Spark Shell本身,shell启动时就创建了一个SparkContext对象,sc变量。

  • Scala Shell
  • Python Shell

SparkContext类和SparkConf类

Spark也可以在Java、Scala或Python的独立程序中使用,与shell中使用的主要区别在于需要自动初始化SparkContext。

例如:(Python中)

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

弹性分布式数据集(RDD)

在Spark中,通过对分布式数据集的操作来表达意图,这些计算会自动地在集群上并行进行。RDD是Spark对分布式数据和计算的基本抽象。

两种创建RDD的方法:

  • 用SparkContext基于外部数据源**创建**RDD,包括HDFS上的文件、通过JDBC访问的数据库表或Spark Shell中创建的本地对象集合。
  • 在一个或多个已有的RDD上执行转换操作来创建RDD,包括记录过滤、对具有相同键值的记录做汇总,把多个RDD关联在一起。
val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)

val rddFromTextFile = sc.textFile("LICENSE")

RDD操作

  • 转换(transformation):对一个数据集里的所有记录执行某种函数,从而使记录发生改变,如mapfilterfilterNotSpark中的转换操作时延后的!在RDD上调用一个转换操作并不会立即触发相应的计算,这些转换操作会链接起来,在执行操作被调用时才被高效的计算。
val intFromStringRDD = rddFromTextFile.map(line => line.size)
函数名(1个RDD) 目的
map() 将函数应用于RDD的每个元素,将返回值构成新的RDD
flatMap() 将函数应用于RDD的每个元素,将返回的迭代器的所有内容构成新的RDD
filter() 返回一个由通过传给filter()的函数的元素组成的RDD
distinct() 去重
sample(withReplacement, fraction, [seed]) 对RDD采样,以及是否替换
函数名(2个RDD) 目的
union() 生成一个包含两个RDD中所有元素的RDD
intersection() 求两个RDD共同的元素的RDD
subtract() 移除一个RDD中的内容
cartesian() 与另一个RDD的笛卡儿积
  • 执行(action):运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。如firsttakecollect
函数名 目的
collect() 返回RDD中的所有元素
count() RDD中的元素个数
countByValue() 各个元素在RDD中出现的次数
take(num) 从RDD中返回num个元素
top(num) 从RDD中返回最前面的num个元素
takeOrdered(num)(ordering) 从RDD中按照提供的顺寻返回最前面的num个元素
takeSample(w ithReplacement, num, [seed])
reduce(func) 并行整合RDD中所有数据,如sum
fold(zero)(func) 和reduce()一样,但是需要提供初始值
aggregate(zeroValue)(seqOp, combOp) 和reduce()相似,通常返回不同类型的函数
foreach(func) 对RDD中的每个元素使用给定的函数

注: 通常只在需要将结果返回到驱动程序所在节点以供本地处理时,才调用collect函数。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,导致程序崩溃。

pair RDD

函数名(1个RDD) 目的
reduceByKey(func) 合并具有相同键的值
groupByKey() 对具有相同键的值进行分组
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 使用不同的返回类型合并具有相同键的值
mapValues(func) 对pair RDD中的每个值应用一个函数而不改变键
flatMapValues(func) 对pair RDD中每个值应用一个返回迭代器函数,然后对返回的每个元素都生成一个对应原键的键值对记录
keys() 返回一个仅包含键的RDD
values() 返回一个仅包含值的RDD
sortByKey() 返回一个根据键排序的RDD
函数名(2个RDD) 目的
subtractByKey() 删掉RDD中键与other RDD中键相同的元素
join() 对两个RDD进行内连接
rightOuterJoin() 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)
leftOuterJoin() 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接)
cogroup() 将两个RDD中拥有相同键的数据分组到一起

RDD缓存策略\持久化
Spark最强大的功能之一就是能够把数据缓存在集群的内存中。通过rddFromTextFile.cache函数实现。

Spark为持久化RDD定义了几种不同的机制,用不同的StoreageLevel表示。rdd.cache()rdd.persist(StorageLevel.MEMORY)的简写,将RDD存储为未序列化的java对象。存储级别:(MEMORY、MEMORY_SER、MEMORY_AND_DISK、MEMORY_AND_DISK_SER)。什么时候需要缓存数据需要对空间时间进行权衡,还需要考虑垃圾回收的开销。调用unpersist()方法可以手动把持久化的RDD从缓存中移除。

广播变量和累加器

广播变量为只读变量,由运行SparkContext的驱动程序创建后发送给参与计算的节点。

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

累加器也是一种被广播到工作节点的变量,不同于广播变量,累加器可以累加,能保证在全局范围内累加起来的值能被正确的并行计算以及返回驱动程序。

聚合与统计

聚合:对集群数据进行聚合时,一定要时刻记住我们分析的数据是存放在多台机器上的,并且聚合需要连接机器的网络来移动数据(需要考虑数据传输的效率)。

val grouped = mds.groupBy(md => md.matched)

创建直方图:(类别变量)

val matchCounts = parsed.map(md => md.matched).countByValue()
val matchContsSeq = matchCounts.toSeq
matchContsSeq.sortby(_._1).foreach(println)

连续变量的统计

import java.long.Double.isNaN
parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()

val stats = (0 until 9).map(md => md.scores(i)).filter(!isNaN(_)).stats()

Spark编程的核心概念:通过一个驱动器程序创建一个SparkContext和一系列RDD,然后进行并行操作。

  • 从外部数据中创建出输入RDD
  • 使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
  • 告诉Spark对需要被重用的中间结果RDD执行persist()操作
  • 使用行动操作(如count()first())来触发一次并行计算,Spark会对计算进行优化后再执行。

Spark机器学习

见Blog:

Spark学习笔记(二)——Spark机器学习


《Spark快速大数据分析》(《Learning Spark: Lightning-fast Data Analysis》)

《Spark高级数据分析》(《Advanced Analytics with Spark》)

《Spark 机器学习》(《Machine Learning with Spark》)

http://spark.apache.org

2018-11-22 15:56:01 weixin_42663941 阅读数 61
  • Spark初级入门(2):解析Scala集合操作

    Scala基础入门教程,解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。 讲师介绍:周志湖,电子科技大学计算机软件与理论硕士研究生,研究方向为计算机视觉、机器学习,毕业后先后供职于宁波银行、中共浙江省委党校,目前就职于绿城集团,担任数据中心平台架构师、数据开发主管。Scala语言、Hadoop及Spark大数据处理技术爱好者。

    9132 人正在学习 去看看 CSDN讲师

Spark学习—Scala(基础部分)

(一)Scala简介

scala 是一种多范式编程软件,既包括函数式编程又包括面向对象式编程。什么是函数式编程?函数式编程是一种编程模型,将计算机运算看做是数学中函数的计算,并且避免了状态以及变量的概念。(对于函数式编程可以参考我的Spark学习—官方文档学习这篇文章里的代码)

(二)编程实践
Scala编程支持命令行的形式也可以通过idea的Scala插件进行编程。本文代码全部使用idea的Scala编程框架实现。
用idea来写scala
1.添加Scala插件支持。(可以自行百度)
2.建一个java hello world。()
3.添加框架支持。(如下图)
在这里插入图片描述
4.Scala程序。
hello world:

//输出
                object know1 {
                  def main(args: Array[String]): Unit = {
                    println("Hello World")
                  }
                }

数据类型:

//变量
    object know1 {
      def main(args: Array[String]): Unit = {
        var r1 = 100
        var r2: Double = 100.12
        var r3: Float = 1.1f
        var r4: Char = 'a'
        var r5: String = "abc"
        println(r1, r2, r3, r4, r5)
      }
    }
  //常量
        object know1 {
          def main(args: Array[String]): Unit = {
            val l1 = 100
            val l4: Int = 100
            val l2 = "hello world"
            val l3: String = "hello world"
            println(l1 + l2 + l3 + l4)
          }
        }

Math:

//数学运算
object know1 {
  def main(args: Array[String]): Unit = {
    print(math.min(1, 2))
  }
}

输入输出:

  //控制台的输入输出
    import scala.io.StdIn._
    
    object know1 {
      def main(args: Array[String]): Unit = {
        println("整数:")
        var r1 = readInt()
        println(r1)
    
        println("小数:")
        var r2 = readDouble()
        println(r2)
    
        println("String:")
        var r3 = readLine("输入内容:")
        println(r3)
      }
    }

文件输入(输出同java):

//文件的输入
    import scala.io.Source._
    
    object know1 {
      def main(args: Array[String]): Unit = {
        var file = fromFile("wenli.txt")
        val lines = file.getLines()
        for (line <- lines) {
          println(line)
        }
      }
    }

循环:

  //for循环
    object know1 {
      def main(args: Array[String]): Unit = {
        println("案例1")
        for (i <- 1 to 5) {
          println(i)
        }
    
    println("案例2")
    for (i <- 1 to 5 by (2)) {
      println(i)
    }

    println("案例3")
    for (i <- 1 to 5 if (i % 2 == 0)) {
      println(i)
    }

    println("案例4")
    for (i <- 1 to 5; j <- 1 to 5) {
      println(i * j)
    }
    println("案例5")
    for (i <- 1 to 5 if (i % 2 == 0); j <- 1 to 5 if (i != j)) {
      println(i * j)
    }
  }
}

数据结构:(Collection List Set Map Iterator Array Tuple)

//list
object know1 {
  def main(args: Array[String]): Unit = {
    var list = List(1, 2, 3)
    println(list)
    println(list.tail)
    println(list.head)
    println(list.reverse)
    var otherlist = 4 :: list //4在左
    println(list)
    println(otherlist)
    for (i <- otherlist) {
      println(i)
    }
  }
}


//集合
import scala.collection.mutable.Set

object know1 {
  def main(args: Array[String]): Unit = {
    var mySet = Set("hadoop", "spark")
    //添加元素
    mySet = mySet + "scala"
    println(mySet)
    for (i <- mySet) {
      println(i)
    }
  }
}


//映射

object know1 {
  def main(args: Array[String]): Unit = {
    var map = Map("cao" -> "4", "liang" -> 5)
    println(map("cao"))
    println(map)

    map = map + ("zhang" -> 3)
    println(map)

    for ((k, v) <- map) {
      println(k)
      println(v)
    }

    for (k <- map.keys) {
      println(k)
    }
    for (v <- map.values) {
      println(v)
    }
  }
}


//迭代器

object know1 {
  def main(args: Array[String]): Unit = {
    val it = Iterator("Baidu", "Google", "Runoob", "Taobao")
    while (it.hasNext) {
      println(it.next())
    }
  }
}

字符串:

//字符串
object know1 {
  def main(args: Array[String]): Unit = {
    val a: String = "hello world"
    println(a)
    println("a的长度:", a.length)
    var b = new StringBuffer()
    b.append("hello world")
    println(b.toString)
    b.append(" hello world")
    println(b.toString)
    val d = "hello cao wen li "
    val e = d.concat(a)
    println(e)
  }
}

数组:

   //数组
    import scala.collection.mutable.ArrayBuffer
object know1 {
  def main(args: Array[String]): Unit = {
    //构造数组
    val a1 = new Array[Int](3)
    a1(0) = 1
    a1(1) = 2
    a1(2) = 3

    val a2 = new Array[String](3)
    a2(0) = "hello hadoop"
    a2(1) = "hello spark"
    a2(2) = "hello storm"
    
    val a3 = Array("hello world", "hello cao wen li", "hello liang qun qun")

    //遍历数组
    for (i <- a1) {
      println(i)
    }
    for (i <- a2) {
      println(i)
    }
    for (i <- a3) {
      println(i)
    }
    //二维数组
    var a4 = Array.ofDim[Int](3, 4)
    for (i <- 0 to 2) {
      for (j <- 0 to 3) {
        a4(i)(j) = (i + 1) * (j + 1)
      }
    }
    //遍历数组
    for (i <- 0 to 2) {
      for (j <- 0 to 3) {
        println(a4(i)(j))
      }
    }
    //三维数组
    val array5 = Array.ofDim[Int](3, 2, 4)
    //用Array 定义的数组是定长数组,用Arraybuffer定义的数组是可添加的数组
    var buffer_array1 = ArrayBuffer(10, 20, 30)
    buffer_array1 += 40
    buffer_array1.insert(2, 40, 60)
    buffer_array1 -= 40
    buffer_array1.remove(2)
    println(buffer_array1)
    for (i <- 0 to buffer_array1.length - 1) {
      println(buffer_array1(i))
    }
  }
}

//数组的合并

object know1 {
  def main(args: Array[String]): Unit = {
    var myList1 = Array(1.9, 2.9, 3.4, 3.5)
    var myList2 = Array(8.9, 7.9, 0.4, 1.5)
    var myList3 = Array.concat(myList1, myList2)
    // 输出所有数组元素
    for (x <- myList3) {
      println(x)
    }
  }
}

//创建区间数组
object know1 {
  def main(args: Array[String]): Unit = {
    //创建区间数组
    var a = Array.range(10, 20, 2)
    var b = Array.range(1, 20, 5)
    for (i <- a) {
      println(i)
    }
    for (i <- b) {
      println(i)
    }
  }
}

函数式编程:

   //函数式编程
object know1 {
  def add(a: Int, b: Int): Int = { //有返回值
    var sum = a + b
    return sum
  }

  def hello(): Unit = { //没有返回值
    println("helle world")
  }

  def main(args: Array[String]): Unit = {
    println(add(1, 2))
    hello()
  }
}

//匿名函数
//占位符函数
面向对象过程:

//类和对象
//无参数
object know1 {

  class Counter {
    private var value = 0

    def increment(): Unit = {
      value += 1
    }

    def current(): Int = {
      value
    }
  }

  def main(args: Array[String]): Unit = {
    val myCounter = new Counter()
    myCounter.increment()
    println(myCounter.current())
  }
}


//有参数
object know1 {

  class Counter {
    private var value = 0

    def increment(step: Int): Unit = {
      value += step
    }

    def current(): Int = {
      value
    }
  }

  def main(args: Array[String]): Unit = {
    val myCounter = new Counter()
    myCounter.increment(5)
    println(myCounter.current())
  }
}

  
object know1 {


  class Counter {
    var value = 0

    def increment(step: Int): Unit = {
      value += step
    }

    def current(): Int = {
      value
    }
  }

  def main(args: Array[String]): Unit = {
    val myCounter = new Counter()
    println(myCounter.value)
    myCounter.value = 3
    myCounter.increment(5)
    println(myCounter.current())
  }
}


//单例对象
object know1 {
  private var id = 0

  def newpersonid(): Int = {
    id += 1
    return id
  }

  def helloWorld(): Unit = {
    println("Hello World")
  }

  def main(args: Array[String]): Unit = {
    println(newpersonid())
    helloWorld()
  }
}
2019-07-15 22:07:41 get_you_1 阅读数 17
  • Spark初级入门(2):解析Scala集合操作

    Scala基础入门教程,解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。 讲师介绍:周志湖,电子科技大学计算机软件与理论硕士研究生,研究方向为计算机视觉、机器学习,毕业后先后供职于宁波银行、中共浙江省委党校,目前就职于绿城集团,担任数据中心平台架构师、数据开发主管。Scala语言、Hadoop及Spark大数据处理技术爱好者。

    9132 人正在学习 去看看 CSDN讲师

Hadoop可以完成项目的功能实现,spark是hadoop的功能优化实现,spark使用的内存基于内存进行计算,一个jar包中有很多任务,特点是:迭代式计算(后一个job依赖前一个job记过)和交互式数据挖掘(shell)。
spark被看成是一整套的大数据处理的通用处理引擎,是一套大数据的处理方案一个大的软件栈,在各个方面都可以基于此进行实现

spark中的角色:
集群中的角色:

master是集群中的管理节点,worker是在正式工作的节点。master是集群中的资源管理,worker是管理自己的资源的。

程序运行中的角色:

每一个spark项目由一个驱动器程序执行其中的main函数,驱动器程序的工作包括:
将程序分成若干任务,对执行器进行跟踪和调度,创建sparkContext(可以看成是和集群的一个连接,在shell中是一个sc对象,编程时候通过conf对象来生成);
执行器是运行在工作节点的进程,负责真正的作业工作并返回相应的结果,RDD是缓存在执行器进程中的。
driver和executor是对于当前这个程序而言的,程序执行完成那么这对角色也没有了,master和worker则是集群的机器角色。
驱动器程序和执行器程序之间通过集群资源调度管理器(内存和核心),常见的模式有Local只有本身自己一台机器,local-cluster伪分布相似的一台机器多个进程,standalone(master和workers存在于这种模式下的真正的集群模式),yarn,Mesos(专业的集群管理者)等

spark项目的提交方式:

spark-submit的总的格式是 :
spark-submit [options] <app jar | python file > [app options]

options的一些参数:

–class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
–master: 集群的master URL (如 spark://192.168.9.102:7077)
–deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
–name 应用的显示名,会在网页中显示
–jars需要上传并放在CLASSPATH中的第三方应用jar,少量的jar可以这么做
–files 需要放到应用工作目录中的文件的列表,分发到各个节点的数据文件
–conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”. 缺省的Spark配置 application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar.
application-arguments: 传给main()方法的参数
例如:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
examples/jars/spark-examples_2.11-2.1.1.jar  100
master参数的例子:
  • local 本地单线程的跑

  • local[K] 本地k个线程跑

  • spark://HOST:PORT 独立集群standalone中的master

  • mesos://HOST:PORT连接到指定的Mesos集群

  • yarn-client 作为客户端连接到YARN cluster,集群位置由HADOOP_CONF_DIR指定

  • yarn-cluster:以cluster形式连接到YARN cluster,集群位置由HADOOP_CONF_DIR指定,两者差别在driver的运行 位置在哪里。client在提交的机器上,cluster在集群中的一台上。后面的yarn上的可以分开写,master写yarn,deploy-mode写client或者cluster

在运行spark-shell中不指定master的是local[* ],是以本地的模式在运行的。同常测试集群和生产集群两种,端口4040是对当前运行(注意只是当前,历史的查看需要重新配置)application的job状况查看,8080查看集群中的情况,7077是集群提交的端口。

一般spark程序的工作过程:

通过外部数据产生RDD,通过filter这样的转化操作产生新的RDD,通过行动操作触发spark优化后的并行计算,通过persist()实现持久化保存中间的需要进行重用的结果。
RDD操作:行动操作(不产生新的RDD)和转化操作(返回产生的新RDD),filter这个操作虽然产生了新的RDD但是在对旧的RDD来说,在后面的程序中依然可以使用,
创建RDD:通过sc.parallelize产生;通过从外部数据读取(sc.textFile等)
RDD的惰性求值:产生的新的RDD,只有在进行行动操作的时候才会被真正的计算。
由于转化操作的存在,使得RDD之间存在着类似派生的关系,spark使用的系谱图来记录这些RDD之间的关系。常用来进行恢复数据等操作使用

一些重要的RDD操作
普通RDD操作:
  1. 单个RDD的操作:
take(10)  获取指定数目的元素rdd;
filter(x=>x>10) 针对各个元素过滤掉返回值是false的元素;
map(x=>(x,1)) 针对各个元素进行操作;
flatMap(x=>x.split(" ")) 针对各个元素,单个元素可能返回值不止一个,接受所有的返回值作为一个RDD中的元素;
count() 对元素进行计数;
first() 返回第一个元素;
collect() 整个的元素返回驱动器, 消耗巨大;
sample() 对rdd进行取样,结果非确定的;
distinct() 使集合中元素唯一;
reduce((x,y)=>x+y) 对所有的元素进行归约,比如对RDD中所有元素求和;
fold(sum)((x,y)=>x+y),所有元素的归约,但是是存在初始值的;
aggregate((0, 0))( (acc, value) => (acc. _1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val avg = result._1 / result._2.toDouble 分别是初始值(0,0),(acc,value)本地节点运行是遇到value如何操作,(acc1,acc2)在多个累加器之间合并操作;
foreach(x=>x+1) 可以在不返回驱动器的情况下操作每一个元素;
  1. 集合操作:
union(other) 取两个RDD的并集
substract(other) 差集,在前者而不在后者的元素
intersection(other)取两个RDD的交集
cartesian(other) 两个RDD 的笛卡尔积
  1. RDD的持久化

多次重复计算同一个RDD的消耗可以通过持久化的操作减少多次重复计算,持久化有不同的级别,
用法:rdd.persist(StorageLevel.DISK_ONLY),持久化是在将数据存在执行器进程的缓存中的,也可以使用unpersist()解除持久化操作

pairRDD操作:

pairRDD支持所有的普通的RDD的操作,只是在书写时候可以使用
val r3=r2.map(x=>x._1+x._2) val r4=r2.filter{case (x,y) => x>1}
两种。
键值对的RDD可以从普通的RDD转化过来,val pairRdd=r1.map(x=>(x,1))

单个pairRDD 的操作:

reduceByKey((x,y)=>x+y) 对相同key的元素操作
groupBykey() 按照key进行分组
combineByKey() 合并具有相同的key
mapValues() 对所有的values进行相同的操作
flatMapValues()
keys() 获取所有的keys
values() 获取所有的values
sortByKey() 根据key进行排序
lookup(key) 返回给定的键对相应的所有的值
collectAsMap() 结果以映射表的形式返回
countBykey() 对每个键分别计数

针对两个pairRDD的转化操作

subtractBykey() 在key上的差集操作
join() 根据key进行内连接
rightOuterJoin
leftOuterJoin
cogroup 将相同的key进行分组

聚合操作:
reduceByKey((x,y)=>x+y)
combine求key对应的均值val result = input.combineByKey(
 (v) => (v, 1),//新key作v的转化
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),//已经存在的key对于其v的处理
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }//多个分区之间的合并

数据分区

为了减少数据通信的代价,对于多次访问多次扫描基于Key的多次操作的RDD可以考虑进行分区,首先所有的pairRDD都可以针对Key进行分组,spark所有的共同一组的key(通过某种函数获得相同的结果的key是一组的)的数据出现在同一个分区中。数据分区时候需要充分的考虑后续使用的时候重复计算的部分,比如相对较大的表和较小的表在进行连接时候是通过计算key的哈希值来进行匹配的,此时就可以将大一点的表进行分区,创建新的RDD并持久化,这样在后续的哈希计算过程中,就不需要重新计算了,获取新的joined的RDD时候大大减少了计算量。

注意在实现过程中的持久化的位置,不然每次划分之后都重新计算没分区的意义就不存在了。
partitionBy()这个方法需要使用的版本尽量不要特别旧。
使用RDD.partitioner属性来获取分区的信息,返回的结果是一个scala.Option对象,这个对象中的isDefined()检查是否有值,get()方法获取其中的值。
从分区中获益的操作:
一些常见的操作用到分区的信息:
sortByKey 使用范围分区
groupByKey 使用哈希分区
join 使用哈希分区
cogroup 
groupWith
leftOuterJoin
groupByKey
reduceByKey
combineByKey
lookup
二元操作中至少一个RDD是不用做数据混洗的,通常选择那个数据操作量耗时严重的作为不发生数据混洗的那个。

Spark编译打包

阅读数 22

Spark学习记录

阅读数 78

没有更多推荐了,返回首页