精华内容
下载资源
问答
  • Spark实战

    万次阅读 2015-06-24 16:07:05
    01.Spark简介(Spark VS MapReduce) 02.Spark生态系统 03.Scala集合简介 04.spark的关键组件 05.核心概念:弹性分布式数据集 06.RDD的操作(转换(transformation)动作(actions)) 07.RDD依赖 08.Wordcount例子 09...

                                                                                   1.Spark简介

    什么是Spark?

      Spark是UC BerkeleyAmp实验室开源的类Hadoop MapReduce的通用并行计算框架

                                                              Spark    VS   MapReduce

    MapReduce            

                                     ①.缺少对迭代计算以及DAG运算的支持

                                     .Shuffle过程多次排序和落地,MR之间的数据需要落Hdfs文件系统

    Spark                        

                                     ①.提供了一套支持DAG图的分布式并行计算的编程框架,减少多次计算之间中间结果写到hdfs的开销

                                     .提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销

                                     .使用多线程池模型来减少task启动开稍,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作

                                     .广泛的数据集操作类型(map,groupby,count,filter)

                                     ⑤.Spark通过提供丰富的Scala, Java,PythonAPI及交互式Shell来提高可用性

                                     ⑥.RDD之间维护了血统关系,一旦RDDfail掉了,能通过父RDD自动重建,保证了容错性。 采用容错的、高可伸缩性的akka作为通讯框架

     



                                                     2.Spark生态系统


                                     



                                                                          3.Scala集合简介


    vallist2 = List(1,2,3,4,5)

    list2.map{x=>x +8}     //{9,10,11,12,13}

    list2.filter{x=>x > 3}      //{4,5}

    list2.reduce(_ + _)

    更多scala学习网址:http://twitter.github.io/scala_school/zh_cn/collections.html


                                                                          4.spark的关键组件


    Master

    Worker

    SparkContext(客户端)



                                                                          5.核心概念:弹性分布式数据集

      Spark围绕的概念是弹性分布式数据集(RDD),这是一个有容错机制并可以被并行操作的元素集合。

    RDD的特点:

    失败自动重建。对于丢失部分数据分区只需根据它的lineage(见文章最后介绍)就可重新计算出来,而不需要做特定的Checkpoint

    可以控制存储级别(内存、磁盘等)来进行重用。默认是存储于内存,但当内存不足时,RDD会spill到disk

    必须是可序列化的。

    目前RDD有两种创建方式:并行集合(ParallelizedCollections):接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集(HadoopDatasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。这两种类型的RDD都可以通过相同的方式进行操作。

    1.并行集合(Parallelized Collections)

    并行集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的输出,演示了如何从一个数组创建一个并行集合:


    scala> val data = Array(1, 2, 3, 4, 5)

    scala> val distData =sc.parallelize(data)

    一旦分布式数据集(distData)被创建好,它们将可以被并行操作。例如,我们可以调用distData.reduce(_+_ )来将数组的元素相加

    2.Hadoop数据集(Hadoop Datasets)

    Spark可以从存储在HDFS,或者Hadoop支持的其它文件系统(包括本地文件,HBase等等)上的文件创建分布式数据集。

    Text file的RDDs可以通过SparkContext’stextFile的方式创建,

    scala> val distFile =sc.textFile("data.txt")


    并行集合的一个重要参数是slices,表示数据集切分的份数。Spark将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个CPU上分布2-4个slices.一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。然而,你也可以通过传递给parallelize的第二个参数来进行手动设置。(例如:sc.parallelize(data,10)).

    textFile方法也可以通过输入一个可选的第二参数,来控制文件的分片数目。默认情况下,Spark为每一块文件创建一个分片(HDFS默认的块大小为64MB),但是你也可以通过传入一个更大的值,来指定一个更高的片值。注意,你不能指定一个比块数更小的片值(和Map数不能小于Block数一样,但是可以比它多)


                                                                            6.RDD的操作


    RDD支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。

                                                                                                                           转换(transformation)

     转换

    含义

    map(func)

    返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成

    filter(func)

    返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成

    flatMap(func)

    类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)

    distinct([numTasks]))

    返回一个包含源数据集中所有不重复元素的新数据集

    groupByKey([numTasks])

    在一个(K,V)对的数据集上调用,返回一个(KSeq[V])对的数据集注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它

    reduceByKey(func[numTasks])

    在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的

    sortByKey([ascending[numTasks])

    在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定

    join(otherDataset[numTasks])

    在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集


    动作(actions)

     动作

    含义

    reduce(func)

    通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。

    collect()

    在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。

    count()

    返回数据集的元素的个数。

    first()

    返回数据集的第一个元素(类似于take(1))

    take(n)

    返回一个由数据集的前n个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素

    saveAsTextFile(path)

    将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行

    countByKey()

    对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数

    foreach(func)

    在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase


                                                                                                                                                                                                                                                 

                                                                            7. RDD依赖


    转换操作,最主要的操作,是Spark生成DAG图的对象,转换操作并不立即执行,在触发行动操作后再提交给driver处理,生成DAG图--> Stage --> Task  --> Worker执行。按转化操作在DAG图中作用,可以分成两种:

    窄依赖

    »输入输出一对一的操作,且结果RDD的分区结构不变,主要是map、flatMap;

    »输入输出一对一,但结果RDD的分区结构发生了变化,如union等;

    »从输入中选择部分元素的操作,如filter、distinct、subtract、sample。


    宽依赖,宽依赖会涉及shuffle类,在DAG图解析时以此为边界产生Stage,如图所示。

    »对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;

    »对两个RDD基于key进行join和重组,如join等。



    Stage的划分

    在RDD的论文中有详细的介绍,简单的说是以shuffle和result这两种类型来划分。在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x=> (x, 1)).reduceByKey(_ + _).foreach(println),这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。



                                                                            8.Wordcount例子


    输入文件例子:由空格分隔的

    aaabbbccc

    ccc bbbddd

    计算过程:读入文件,把每行数据,按空格分成单个的单词。对每个单词记数

        val  ssc = newSparkContext().setAppName("WordCount")

        val lines =ssc.textFile(args(1))//输入

         val words =

         lines.flatMap(x=>x.split(" "))

         words.cache()//缓存

         valwordCounts =

         words.map(x=>(x, 1) )

         val red =wordCounts.reduceByKey( (a,b)=>{a + b})

        red.saveAsTextFile(“/root/Desktop/out”) //行动


    蓝色的部分,生成相关的上下文,负责和Masterexutor通信,请求资源,搜集task执行的进度等

    绿色的部分,仅仅是在定义相关的运算规则(也就是画一张有向无环图),没有执行实际的计算

    当红色的部分(action rdd)被调用的时候,才会真正的向spark集群去提交,Dag。。。根据之前代码(也就是绿色的部分)生成rdd链,在根据分区算法生成partition,每个partition对应一个Task,把这些task,交给Excutor去执行



                                                                          9. 提交job


    ./bin/spark-submit \

     --class org.apache.spark.examples.SparkPi \

     --master spark://hangzhou-jishuan-DDS0258.dratio.puppet:7077 \

     --executor-memory 2G \

     --total-executor-cores 3 \

     /opt/spark-1.0.2-bin-hadoop1/lib/spark-examples-1.0.2-hadoop1.0.4.jar \

     10

    更详细的参数说明参见:http://blog.csdn.net/book_mmicky/article/details/25714545


                                                                          10.  编程接口


    Scala

    Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写HadoopMapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。参见:http://spark.apache.org/docs/latest/quick-start.html#tab_scala_3
    如:

        val sc = new SparkContext(master, appName,[sparkHome], [jars])

        val textFile =sc.textFile("hdfs://.....")

        textFile.map(....).filter(.....).....

    Java

        JavaSparkContext sc = newJavaSparkContext(...); 

        JavaRDD lines =ctx.textFile("hdfs://...");

        JavaRDD words = lines.flatMap(

          new FlatMapFunction<String,String>() {

             public Iterable call(String s) {

                return Arrays.asList(s.split("")); } } );

    Python

        from pyspark import SparkContext

        sc = SparkContext("local","Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])

        words =sc.textFile("/usr/share/dict/words")

        words.filter(lambda w:w.startswith("spar")).take(5)


                                                                            11. Spark运行架构

     Sparkon YARN 运行过程(cluster模式)

    1.用户通过bin/spark-submit或bin/spark-class 向YARN提交Application

    2.RM为Application分配第一个container,并在指定节点的container上启动SparkContext。

    3.SparkContext向RM申请资源以运行Executor

    4.RM分配Container给SparkContext,SparkContext和相关的NM通讯,在获得的Container上启动 StandaloneExecutorBackend,StandaloneExecutorBackend启动后,开始向SparkContext注册并申请  Task

    5.SparkContext分配Task给StandaloneExecutorBackend执行

    6.StandaloneExecutorBackend执行Task并向SparkContext汇报运行状况

    7.Task运行完毕,SparkContext归还资源给NM,并注销退出。



                                                                            12.Spark SQL

    Spark SQL是一个即席查询系统,其前身是shark,不过代码几乎都重写了,但利用了shark的最好部分内容。SparkSQL可以通过SQL表达式、HiveQL或者Scala DSL在Spark上执行查询。目前Spark SQL还是一个alpha版本。


                                                                             13.SparkStreaming

         SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP套接字)进行类似map、reduce、join、window等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。



    SparkStreaming流式处理系统特点有:

       将流式计算分解成一系列短小的(按秒)批处理作业

       将失败或者执行较慢的任务在其它节点上并行执行

       较强的容错能力(checkpoint等)

       使用和RDD一样的语义

                                                            

    ./bin/run-exampleorg.apache.spark.examples.streaming.NetworkWordCount localhost 9999

    nc-lk 9999



                                                                             14. 练习题

    有一批ip,找出出现次数最多的前50个?

    10.129.41.91

    61.172.251.20

    10.150.9.240

    ...

    答案:

    data.map(word=>(word,1)).reduceByKey(_+_).map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1)).take(50)



                                                                           15.延伸

    Lineage(血统)

    Spark处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用血统关系(Lineage)方案。RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制RDDLineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

    RDDLineage依赖方面分为两种NarrowDependenciesWideDependencies用来解决数据容错的高效性。NarrowDependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。WideDependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与WideDependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),NarrowDependencies对于数据的重算开销要远小于WideDependencies的数据重算开销。

    容错

    RDD计算,通过checkpint进行容错,做checkpoint有两种方式,一个是checkpointdata,一个是loggingthe updates。用户可以控制采用哪种方式来实现容错,默认是loggingthe updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDDlineage(血统)来重新计算生成丢失的分区数据。







    展开全文
  • Spark实战高手之路

    2015-09-22 07:30:07
    Spark实战
  • Spark实战开发

    2018-04-08 10:47:57
    Apache Spark是大型数据处理的快速和通用引擎,此为Spark实战开发教程 ,适合初学者 ,简单易学。
  • 大数据Spark实战视频教程

    万人学习 2016-11-10 14:26:54
    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室...
  • Spark 实战开发教程

    2016-03-29 09:42:23
    1、《Spark实战高手之路-从零开始》 2、《Spark开发环境配置及流程(Intellij_IDEA)》 3、《spark官方文档中文版》 4、《Spark 入门之 Scala 语言解释及示例讲解》 5、《Scala编码规范》 总结: Hadoop ...
  • spark实战138讲.rar

    2019-10-29 13:11:33
    spark实战讲义内容,有需要的可以下载下来看看,一起学习。
  • Spark实战高手之路.rar

    2017-04-15 17:36:43
    Spark实战高手之路.rar
  • Spark实战高手之路-第6章Spark SQL编程动手实战(1)
  • Spark实战高手之路-第5章Spark API编程动手实战(1)
  • Spark实战高手之路-第5章Spark API编程动手实战(2)
  • Spark实战高手之路-第5章Spark API编程动手实战(3).
  • Spark实战教程

    千人学习 2020-03-30 21:58:47
    本课程内容包括Spark简介、安装、快速入门、RDD编程、独立模式、集群模式、Spark SQL入门、Spark SQL数据源、Spark Streaming,MLlib机器学习、实例(淘宝双11大数据分析)等等。
  • Spark实战高手之路 【Spark亚太研究院系列丛书】《Spark机器学习库(v1.2.0)》-王宇舟 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(1) 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(2) ...
  • Spark实战高手之路 【Spark亚太研究院系列丛书】《Spark机器学习库(v1.2.0)》-王宇舟 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(1) 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(2) ...
  • Spark实战高手之路,共5章,主要内容为Spark集群、架构、内核及编程实战。
  • Spark实战高手之路 【Spark亚太研究院系列丛书】《Spark机器学习库(v1.2.0)》-王宇舟 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(1) 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(2) ...
  • Spark实战高手之路 【Spark亚太研究院系列丛书】《Spark机器学习库(v1.2.0)》-王宇舟 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(1) 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(2) ...
  • Apache Hadoop spark 实战技术分享.pptx
  • Spark系列一:Spark系列目录索引背景面向的读者系列目录索引[Spark基础1:Spark2基本介绍](https://www.baidu.com/)[Spark基础2:Spark基本概念](https://www.baidu.com/)[Spark实战1:parquet文件的动态生成]...

    背景

    在本系列中,将和大家分享或介绍如下一些知识点:

    1. spark2的一些基本知识点。比如:spark2中的DataSet,Dataframe,算子,shuffle等等。
    2. 在使用spark中出现的一些误区。
    3. spark性能调优:代码级调优、submit配置调优、gc调优
    4. spark实战:本部分主要根据博主接手的一个spark项目在面对各个问题是的一个总结。

    面向的读者

    1. 本系列将假定读者已经对编程语言scala有一定的了解和实际使用经验。
    2. 本系列将假定读者已经对spark1或者spark2有了一定的实际使用经验。
    3. 本系列将假定读者已经对hadoop生态圈中的一些组件有一定的使用使用。比如:HDFS,yarn,impala,lzo,parquet等等。

    系列目录索引

    Spark基础1:Spark2基本介绍

    Spark基础2:Spark基本概念

    Spark实战1:parquet文件的动态生成

    Spark实战2:实现impala的分桶查询

    Spark实战3:使用的设计模式分享

    Spark实战6:Spark sql 临时表加载的改进

    Spark实战5:结果数据oracle的输出改进

    Spark实战6:Spark Submit时间周期的改进

    展开全文
  • Spark实战之读写HBase

    万次阅读 2017-05-19 19:18:06
    Spark实战之Hbase读写

    1 配置

    1.1 开发环境:

    • HBase:hbase-1.0.0-cdh5.4.5.tar.gz
    • Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
    • ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
    • Spark:spark-2.1.0-bin-hadoop2.6

    1.2 Spark的配置

    • Jar包:需要HBase的Jar如下(经过测试,正常运行,但是是否存在冗余的Jar并未证实,若发现多余的jar可自行进行删除)

    jars

    • spark-env.sh
      添加以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
      注:如果使用spark-shell的yarn模式进行测试的话,那么最好每个NodeManager节点都有配置jars和hbase-site.xml
    • spark-default.sh
    spark.yarn.historyServer.address=slave11:18080
    spark.history.ui.port=18080
    spark.eventLog.enabled=true
    spark.eventLog.dir=hdfs:///tmp/spark/events
    spark.history.fs.logDirectory=hdfs:///tmp/spark/events
    spark.driver.memory=1g
    spark.serializer=org.apache.spark.serializer.KryoSerializer

    1.3 数据

    1)格式: barCode@item@value@standardValue@upperLimit@lowerLimit

    01055HAXMTXG10100001@KEY_VOLTAGE_TEC_PWR@1.60@1.62@1.75@1.55
    01055HAXMTXG10100001@KEY_VOLTAGE_T_C_PWR@1.22@1.24@1.45@0.8
    01055HAXMTXG10100001@KEY_VOLTAGE_T_BC_PWR@1.16@1.25@1.45@0.8
    01055HAXMTXG10100001@KEY_VOLTAGE_11@1.32@1.25@1.45@0.8
    01055HAXMTXG10100001@KEY_VOLTAGE_T_RC_PWR@1.24@1.25@1.45@0.8
    01055HAXMTXG10100001@KEY_VOLTAGE_T_VCC_5V@1.93@1.90@1.95@1.65
    01055HAXMTXG10100001@KEY_VOLTAGE_T_VDD3V3@1.59@1.62@1.75@1.55

    2 代码演示

    2.1 准备动作

    1)既然是与HBase相关,那么首先需要使用hbase shell来创建一个表

    创建表格:create ‘data’,’v’,create ‘data1’,’v’

    2)使用spark-shell进行操作,命令如下:

    bin/spark-shell –master yarn –deploy-mode client –num-executors 5 –executor-memory 1g –executor-cores 2

    代码演示环境

    3)import 各种类

    import org.apache.spark._
    import org.apache.spark.rdd.NewHadoopRDD
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.client.HTable
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64,Bytes}
    import org.apache.hadoop.hbase.KeyValue
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
    import org.apache.hadoop.hbase.HColumnDescriptor
    import org.apache.commons.codec.digest.DigestUtils

    2.2 代码实战

    创建conf和table

    val conf= HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE,"data1")
    val table = new HTable(conf,"data1")

    2.2.1 数据写入

    格式:

    val put = new Put(Bytes.toBytes("rowKey"))
    put.add("cf","q","value")

    使用for来插入5条数据

    for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}

    到hbase shell中查看结果

    hbase_data1表中的数据

    2.2.2 数据读取

    val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

    1)take

    hbaseRdd take 1

    take_result

    2)scan

    var scan = new Scan();
    scan.addFamily(Bytes.toBytes(“v”));
    var proto = ProtobufUtil.toScan(scan)
    var scanToString = Base64.encodeBytes(proto.toByteArray());
    conf.set(TableInputFormat.SCAN,scanToString)
    
    val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))

    scan_result

    2.3 批量插入

    2.3.1 普通插入

    1)代码

    val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
    val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))}
    val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create();conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => {
    var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}

    2)执行时间如下:7.6 min

    执行时间

    2.3.2 Bulkload

    1) 代码:

    val conf = HBaseConfiguration.create();
    val tableName = "data1"
    val table = new HTable(conf,tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
    
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat.configureIncrementalLoad(job,table)
    
    val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}
    
    rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)

    2) 执行时间:7s

    执行时间_BulkLoad
    3)执行结果:
    到hbase shell 中查看 list “data1”

    结果查询

    通过对比我们可以发现bulkload批量导入所用时间远远少于普通导入,速度提升了60多倍,当然我没有使用更大的数据量测试,但是我相信导入速度的提升是非常显著的,强烈建议使用BulkLoad批量导入数据到HBase中。

    关于Spark与Hbase之间操作就写到这里,如果有什么地方写得不对或者运行不了,欢迎指出,谢谢

    展开全文
  • Spark实战教程 大强老师 大华软件学院 技术总监 / 高级讲师 曾就...

    扫码下载「CSDN程序员学院APP」,1000+技术好课免费看

    APP订阅课程,领取优惠,最少立减5元 ↓↓↓

    订阅后:请点击此处观看视频课程

     

    视频教程-Spark实战教程-Spark

    学习有效期:永久观看

    学习时长:1300分钟

    学习计划:22天

    难度:

     

    口碑讲师带队学习,让你的问题不过夜」

    讲师姓名:刘宏强

    CTO/CIO/技术副总裁/总工程师

    讲师介绍:大强老师 大华软件学院 技术总监 / 高级讲师 曾就职于中软国际华为业务线,具有十五年软件开发和培训经验。

    ☛点击立即跟老师学习☚

     

    「你将学到什么?」

    本课程内容包括Spark简介、安装、快速入门、RDD编程、独立模式、集群模式、Spark SQL入门、Spark SQL数据源、Spark Streaming,MLlib机器学习、实例(淘宝双11大数据分析)等等。


     

    「课程学习目录」

    第1章:Spark快速入门
    1.Spark 简介
    2.Spark 安装
    3.Spark 示例
    4.Spark 入门例子
    5.Spark 独立应用例子
    第2章:Spark RDD 编程
    1.Spark RDD 编程1
    2.Spark RDD 编程2
    3.Spark RDD 编程3
    4.Spark RDD 编程4
    第3章:Spark集群
    1.Spark 集群模式概述
    2.CentOS安装
    3.Spark 独立模式环境安装
    4.Spark 独立模式启动程序
    5.Spark 独立模式提交作业
    6.Spark 独立模式提交作业2
    7.Spark 在YARN上运行 环境安装配置
    8.Spark 在YARN上运行
    第4章:Spark SQL1
    1.Spark SQL DataFrames
    2.Spark SQL入门 起点
    3.Spark SQL入门 创建DataFrame
    4.Spark SQL入门 以编程方式运行
    5.Spark SQL入门 全局临时视图
    6.Spark SQL入门 创建Dataset
    第5章:Spark SQL2
    1.Spark SQL入门 与RDD互操作1
    2.Spark SQL入门 与RDD互操作2
    3.Spark SQL入门 答疑
    4.Spark SQL入门 起点
    5.Spark SQL入门 用户自定义聚合函数
    6.Spark SQL入门 用户自定义聚合函数2
    第6章:Spark SQL数据源1
    1.Spark SQL数据源 通用加载保存功能1
    2.Spark SQL数据源 通用加载保存功能2
    3.Spark SQL数据源 复习答疑
    4.Spark SQL数据源 通用加载保存功能3
    5.Spark SQL数据源 通用加载保存功能4
    6.Spark SQL数据源 通用加载保存功能5
    7.Spark SQL数据源 通用加载保存功能6
    8.Spark SQL数据源 通用加载保存功能7
    第7章:Spark SQL数据源2
    1.Spark SQL数据源 Parquet文件1
    2.Spark SQL数据源 Parquet文件2
    3.Spark SQL数据源 Parquet文件3
    4.Spark SQL数据源 Parquet文件4
    5.Spark SQL数据源 ORC JSON文件
    6.Spark SQL数据源 Hive表
    7.Spark SQL数据源 JDBC
    8.Spark SQL数据源 Avro
    第8章:Spark Streaming1
    1.Spark Streaming 总览
    2.Spark Streaming 例子1
    3.Spark Streaming 例子2
    4.Spark Streaming 例子3
    5.Spark Streaming 基本概念 连结
    第9章:Spark Streaming2
    1.Streaming 基本概念 初始化
    2.Streaming 基本概念 离散流
    3.Streaming 输入DStreams1
    4.Streaming 输入DStreams2
    5.Streaming DStreams上的转换1
    6.Streaming DStreams上的转换2
    7.Streaming DStreams上的转换3
    8.Streaming DStreams上的转换4
    第10章:Spark Streaming3
    1.Streaming DStream输出操作1
    2.Streaming DStream输出操作2
    3.Streaming DStream输出操作3
    4.Streaming DStream输出操作4
    5.Streaming DStream输出操作5
    6.Streaming DStream输出操作6
    7.Streaming DataFrame操作
    8.Spark Streaming 检查点等
    第11章:Spark MLlib 机器学习1
    1.Spark 机器学习
    2.Spark 机器学习库(MLlib)指南
    3.Spark MLlib 机器学习概念1
    4.Spark MLlib 机器学习概念2
    5.Spark MLlib 机器学习概念3
    6.Spark MLlib 机器学习 复习
    第12章:Spark MLlib 机器学习2
    1.Spark MLlib 基本统计1
    2.Spark MLlib 基本统计2
    3.Spark MLlib 基本统计3
    4.Spark MLlib 数据源
    5.Spark MLlib 管道
    6.Spark MLlib 特征提取
    第13章:Spark 《淘宝双11大数据分析》项目案例
    1.Spark 《淘宝双11大数据分析》项目案例1
    2.Spark 《淘宝双11大数据分析》项目案例2
    3.Spark 《淘宝双11大数据分析》项目案例3
    4.Spark 《淘宝双11大数据分析》项目案例4
    5.Spark 《淘宝双11大数据分析》项目案例5
    6.Spark 《淘宝双11大数据分析》项目案例6
    7.Spark 《淘宝双11大数据分析》项目案例7
    8.Spark 《淘宝双11大数据分析》项目案例8
    9.Spark 《淘宝双11大数据分析》项目案例9

     

    7项超值权益,保障学习质量」

    • 大咖讲解

    技术专家系统讲解传授编程思路与实战。

    • 答疑服务

    专属社群随时沟通与讲师答疑,扫清学习障碍,自学编程不再难。

    • 课程资料+课件

    超实用资料,覆盖核心知识,关键编程技能,方便练习巩固。(部分讲师考虑到版权问题,暂未上传附件,敬请谅解)

    • 常用开发实战

    企业常见开发实战案例,带你掌握Python在工作中的不同运用场景。

    • 大牛技术大会视频

    2019Python开发者大会视频免费观看,送你一个近距离感受互联网大佬的机会。

    • APP+PC随时随地学习

    满足不同场景,开发编程语言系统学习需求,不受空间、地域限制。

     

    「什么样的技术人适合学习?」

    • 想进入互联网技术行业,但是面对多门编程语言不知如何选择,0基础的你
    • 掌握开发、编程技术单一、冷门,迫切希望能够转型的你
    • 想进入大厂,但是编程经验不够丰富,没有竞争力,程序员找工作难。

     

    「悉心打造精品好课,22天学到大牛3年项目经验」

    【完善的技术体系】

    技术成长循序渐进,帮助用户轻松掌握

    掌握Spark知识,扎实编码能力

    【清晰的课程脉络】

    浓缩大牛多年经验,全方位构建出系统化的技术知识脉络,同时注重实战操作。

    【仿佛在大厂实习般的课程设计】

    课程内容全面提升技术能力,系统学习大厂技术方法论,可复用在日后工作中。

     

    「你可以收获什么?」

    帮助学员全面深入的理解Spark的核心概念,掌握常用的用法和操作。

    帮助学员全面深入的理解Spark SQL的核心概念,掌握常用的用法和操作。

    帮助学员全面深入的理解Spark Streaming的核心概念,掌握常用的用法和操作。

    帮助学员全面深入的理解MLlib等模块的核心概念,掌握常用的用法和操作。

     

    展开全文
  • 伴随着大数据相关技术和产业的逐步成熟,继Hadoop之后,Spark技术以其无可比拟的优势,发展迅速...Spark实战高手之路.part1.rar Spark实战高手之路.part2.rar Spark实战高手之路.part3.rar Spark实战高手之路.part4.rar
  • 伴随着大数据相关技术和产业的逐步成熟,继Hadoop之后,Spark技术以其无可比拟的优势,发展迅速...Spark实战高手之路.part1.rar Spark实战高手之路.part2.rar Spark实战高手之路.part3.rar Spark实战高手之路.part4.rar
  • 伴随着大数据相关技术和产业的逐步成熟,继Hadoop之后,Spark技术以其无可比拟的优势,发展迅速...Spark实战高手之路.part1.rar Spark实战高手之路.part2.rar Spark实战高手之路.part3.rar Spark实战高手之路.part4.rar
  • Spark实战高手之路-从零开始
  • Spark实战高手之路-第3章Spark架构设计与编程(1)
  • 大数据Spark实战视频教程 张长志技术全才、擅长领域:区块链、大数据、Ja...
  • Spark实战高手之路-第3章Spark架构设计与编程模型(4).
  • Spark实战高手之路-第3章Spark架构设计与编程模型(3).

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 26,014
精华内容 10,405
关键字:

spark实战