spark 订阅
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。 展开全文
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
信息
基    于
MapReduce算法实现的分布式计算
最新版本
2.4.0
外文名
Spark
SPARK基本介绍
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 [1]  。现在形成一个高速发展应用广泛的生态系统。
收起全文
精华内容
下载资源
问答
  • spark
    万次阅读
    2021-09-26 18:05:55

    一.Spark SQL的概述

    1.1 Spark SQL 来源

    Hive是目前大数据领域,事实上的数据仓库标准。
    image.png

    Hive与RDBMS的SQL模型比较类似,容易掌握。 Hive的主要缺陷在于它的底层是基于MapReduce的,执行比较慢。

    在Spark 0.x版的时候推出了Shark,Shark与Hive是紧密关联的,Shark底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块,底层使用Spark的基于内存的计算模型,性能上比Hive提升了很多倍。

    在Spark 1.x的时候Shark被淘汰。在2014 年7月1日的Spark Summit 上, Databricks宣布终止对Shark的开发,将重点放到 Spark SQL 上。

    Shark终止以后,产生了两个分支:

    1). Hive on Spark
    hive社区的,源码在hive中

    2). Spark SQL(Spark on Hive)
    Spark社区,源码在Spark中,支持多种数据源,多种优化技术,扩展性好很多;

    Spark SQL的源码在Spark中,而且新增了许多的优化代码,如果追求速度,例如数据分析的时候,可以使用Hive on Spark,如果追求性能,例如生产的定时报表的时候,应该使用Spark SQL。

    1.2 从代码看Spark SQL的特点

    我们来对比Spark RDD、Dataframe、SQL代码实现wordcount:
    image.png

    我们可以看到,Spark SQL代码看起来与关系型数据库是一致的,从上图可以看到Spark SQL的特点:
    1). 集成
    通过Spark SQL或DataFrame API运行Spark 程序,操作更加简单、快速.

    image.png

    从上图可以看到,Spark SQL和DataFrame底层其实就是调用RDD

    2). 统一的数据访问
    DataFrame 和SQL提供了访问各种数据源的通用方式,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以跨这些数据源连接数据。

    image.png

    3). Hive集成
    在现有的数据仓库上运行SQL或HiveQL查询。

    4). 标准的连接
    服务器模式为业务智能工具提供行业标准的JDBC和ODBC连接。

    1.3 从代码运行速度看来看Spark SQL

    ;

    从上图我们可以看到:
    1). Python操作RDD比Java/Scala慢一倍以上
    2). 无论是那种语言操作DataFrame,性能几乎一致

    那么为什么Python用RDD这么慢?
    为什么用Python写的RDD比Scala慢一倍以上,两种不同的语言的执行引擎,上下文切换、数据传输。

    Spark SQL其实底层调用的也是RDD执行,其实中间的执行计划进行了优化,而且是在Spark的优化引擎里面,所以无论是那种语言操作DataFrame,性能几乎一致

    二.Spark SQL数据抽象

    Spark SQL提供了两个新的抽象,分别是DataFrame 和Dataset;

    Dataset是数据的分布式集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDDs的优点(强类型、使用强大lambda函数的能力)以及Spark SQL优化的执行引擎的优点。可以从JVM对象构造数据集,然后使用函数转换(map、flatMap、filter等)操作数据集。数据集API可以在Scala和Java中使用。Python不支持Dataset API。但是由于Python的动态特性,Dataset API的许多优点已经可以使用了(例如,您可以通过名称natural row. columnname访问行字段)。R的情况也是类似的。

    DataFrame 是组织成命名列的Dataset。它在概念上相当于关系数据库中的表或R/Python中的数据框架,但在底层有更丰富的优化。数据框架可以从各种各样的数据源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有的rdd。DataFrame API可以在Scala、Java、Python和r中使用。在Scala和Java中,DataFrame是由行数据集表示的。在Scala API中,DataFrame只是Dataset[Row]的类型别名。而在Java API中,用户需要使用Dataset来表示DataFrame。

    2.1 DataFrame

    DataFrame的前身是SchemaRDD。Spark1.3更名为DataFrame。不继承RDD,自己实现RDD的大部分功能。

    与RDD类似,DataFrame也是一个分布式数据集
    1). DataFrame可以看做分布式Row对象的集合,提供了由列组成的详细模式信息,使其可以得到优化,DataFrame不仅有比RDD更多的算子,还可以进行执行计划的优化

    2). DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema

    3). DataFrame也支持嵌套数据类型(struct、array和Map)

    4). DataFrame API提供的是一套高层的关系操作,比函数式RDD API更加优化,门槛低

    5). DataFrame的劣势在于在编译期缺少类型安全检查,导致运行时出错。

    image.png

    2.2 Dataset

    Dataset时在Spark1.6中添加的新接口;与RDD相比,可以保存更多的描述信息,概念上等同于关系型数据库中的二维表。与DataFrame相比,保存了类型信息,是强类型,提供了编译时检查。

    调用Dataset的方法会生成逻辑计划,然后Spark的优化器进行优化,最终胜出无力计划,然后提交到集群中运行。

    Dataset包含了DataFrame的功能,在Spark2.0中两者得到了统一,DataFrame表示为Dataset[Row],即Dataset的子集.

    image.png

    三.Spark SQL 操作数据库

    3.1 Spark SQL操作Hive数据库

    Spark中所有功能的入口点都是SparkSession类。要创建一个基本的SparkSession,只需使用SparkSession.builder():

    import org.apache.spark.sql.SparkSession;
    
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate();
    

    在Spark repo的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”中可以找到完整的示例代码。

    Spark 2.0中的SparkSession提供了对Hive特性的内置支持,包括使用HiveQL编写查询,访问Hive udf,以及从Hive表中读取数据的能力。要使用这些特性,您不需要有一个现有的Hive设置。

    3.1.1 创建DataFrames

    通过SparkSession,应用程序可以从现有的RDD、Hive表或Spark数据源中创建DataFrames。

    下面是一个基于text文件内容的DataFrame示例:
    代码:

    package org.example;
    
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    
    public class SparkSQLTest1 {
        public static void main(String[] args){
            SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkSQLTest1")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
            Dataset<Row> df = spark.read().text("file:///home/pyspark/idcard.txt");
            df.show();
            
            spark.stop();
        }
    }
    
    

    测试记录:

    [root@hp2 javaspark]# spark-submit \
    >   --class org.example.SparkSQLTest1 \
    >   --master local[2] \
    >   /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
    21/08/10 14:41:54 INFO spark.SparkContext: Running Spark version 2.4.0-cdh6.3.1
    21/08/10 14:41:54 INFO logging.DriverLogger: Added a local log appender at: /tmp/spark-7bdf3b0f-d374-4b47-b78f-bdea057849cf/__driver_logs__/driver.log
    21/08/10 14:41:54 INFO spark.SparkContext: Submitted application: SparkSQLTest1
    21/08/10 14:41:54 INFO spark.SecurityManager: Changing view acls to: root
    21/08/10 14:41:54 INFO spark.SecurityManager: Changing modify acls to: root
    21/08/10 14:41:54 INFO spark.SecurityManager: Changing view acls groups to: 
    21/08/10 14:41:54 INFO spark.SecurityManager: Changing modify acls groups to: 
    21/08/10 14:41:54 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    21/08/10 14:41:55 INFO util.Utils: Successfully started service 'sparkDriver' on port 36352.
    21/08/10 14:41:55 INFO spark.SparkEnv: Registering MapOutputTracker
    21/08/10 14:41:55 INFO spark.SparkEnv: Registering BlockManagerMaster
    21/08/10 14:41:55 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    21/08/10 14:41:55 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    21/08/10 14:41:55 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-59f07625-d934-48f5-a4d8-30f14fba5274
    21/08/10 14:41:55 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
    21/08/10 14:41:55 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    21/08/10 14:41:55 INFO util.log: Logging initialized @1548ms
    21/08/10 14:41:55 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-09-05T05:11:46+08:00, git hash: 3ce520221d0240229c862b122d2b06c12a625732
    21/08/10 14:41:55 INFO server.Server: Started @1623ms
    21/08/10 14:41:55 INFO server.AbstractConnector: Started ServerConnector@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    21/08/10 14:41:55 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@173b9122{/jobs,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@649f2009{/jobs/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@14bb2297{/jobs/job,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a15b789{/jobs/job/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57f791c6{/stages,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51650883{/stages/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c4f9535{/stages/stage,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@499b2a5c{/stages/stage/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@596df867{/stages/pool,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c1fca1e{/stages/pool/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@241a53ef{/storage,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@344344fa{/storage/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2db2cd5{/storage/rdd,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@70e659aa{/storage/rdd/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@615f972{/environment,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@285f09de{/environment/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@73393584{/executors,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@31500940{/executors/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1827a871{/executors/threadDump,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@48e64352{/executors/threadDump/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7249dadf{/static,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5be82d43{/,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@600b0b7{/api,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@473b3b7a{/jobs/job/kill,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1734f68{/stages/stage/kill,null,AVAILABLE,@Spark}
    21/08/10 14:41:55 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hp2:4040
    21/08/10 14:41:55 INFO spark.SparkContext: Added JAR file:/home/javaspark/SparkStudy-1.0-SNAPSHOT.jar at spark://hp2:36352/jars/SparkStudy-1.0-SNAPSHOT.jar with timestamp 1628577715383
    21/08/10 14:41:55 INFO executor.Executor: Starting executor ID driver on host localhost
    21/08/10 14:41:55 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40459.
    21/08/10 14:41:55 INFO netty.NettyBlockTransferService: Server created on hp2:40459
    21/08/10 14:41:55 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    21/08/10 14:41:55 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hp2, 40459, None)
    21/08/10 14:41:55 INFO storage.BlockManagerMasterEndpoint: Registering block manager hp2:40459 with 366.3 MB RAM, BlockManagerId(driver, hp2, 40459, None)
    21/08/10 14:41:55 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hp2, 40459, None)
    21/08/10 14:41:55 INFO storage.BlockManager: external shuffle service port = 7337
    21/08/10 14:41:55 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, hp2, 40459, None)
    21/08/10 14:41:55 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@245a060f{/metrics/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:56 INFO scheduler.EventLoggingListener: Logging events to hdfs://nameservice1/user/spark/applicationHistory/local-1628577715422
    21/08/10 14:41:56 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
    21/08/10 14:41:56 INFO logging.DriverLogger$DfsAsyncWriter: Started driver log file sync to: /user/spark/driverLogs/local-1628577715422_driver.log
    21/08/10 14:41:56 INFO internal.SharedState: loading hive config file: file:/etc/hive/conf.cloudera.hive/hive-site.xml
    21/08/10 14:41:56 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
    21/08/10 14:41:56 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
    21/08/10 14:41:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5922d3e9{/SQL,null,AVAILABLE,@Spark}
    21/08/10 14:41:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7d57dbb5{/SQL/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5f95f1e1{/SQL/execution,null,AVAILABLE,@Spark}
    21/08/10 14:41:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@672a1c62{/SQL/execution/json,null,AVAILABLE,@Spark}
    21/08/10 14:41:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6015a4a5{/static/sql,null,AVAILABLE,@Spark}
    21/08/10 14:41:57 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    21/08/10 14:41:58 INFO datasources.FileSourceStrategy: Pruning directories with: 
    21/08/10 14:41:58 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
    21/08/10 14:41:58 INFO datasources.FileSourceStrategy: Output Data Schema: struct<value: string>
    21/08/10 14:41:58 INFO execution.FileSourceScanExec: Pushed Filters: 
    21/08/10 14:41:58 INFO codegen.CodeGenerator: Code generated in 185.453035 ms
    21/08/10 14:41:59 INFO codegen.CodeGenerator: Code generated in 11.048328 ms
    21/08/10 14:41:59 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 342.0 KB, free 366.0 MB)
    21/08/10 14:41:59 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 32.3 KB, free 365.9 MB)
    21/08/10 14:41:59 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hp2:40459 (size: 32.3 KB, free: 366.3 MB)
    21/08/10 14:41:59 INFO spark.SparkContext: Created broadcast 0 from show at SparkSQLTest1.java:17
    21/08/10 14:41:59 INFO execution.FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
    21/08/10 14:41:59 INFO spark.SparkContext: Starting job: show at SparkSQLTest1.java:17
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Got job 0 (show at SparkSQLTest1.java:17) with 1 output partitions
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (show at SparkSQLTest1.java:17)
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Parents of final stage: List()
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Missing parents: List()
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at show at SparkSQLTest1.java:17), which has no missing parents
    21/08/10 14:41:59 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.4 KB, free 365.9 MB)
    21/08/10 14:41:59 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.9 KB, free 365.9 MB)
    21/08/10 14:41:59 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on hp2:40459 (size: 3.9 KB, free: 366.3 MB)
    21/08/10 14:41:59 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1164
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at show at SparkSQLTest1.java:17) (first 15 tasks are for partitions Vector(0))
    21/08/10 14:41:59 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    21/08/10 14:41:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 8309 bytes)
    21/08/10 14:41:59 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
    21/08/10 14:41:59 INFO executor.Executor: Fetching spark://hp2:36352/jars/SparkStudy-1.0-SNAPSHOT.jar with timestamp 1628577715383
    21/08/10 14:41:59 INFO client.TransportClientFactory: Successfully created connection to hp2/10.31.1.124:36352 after 35 ms (0 ms spent in bootstraps)
    21/08/10 14:41:59 INFO util.Utils: Fetching spark://hp2:36352/jars/SparkStudy-1.0-SNAPSHOT.jar to /tmp/spark-7bdf3b0f-d374-4b47-b78f-bdea057849cf/userFiles-ddafbf49-ca6a-4c28-83fe-9e4ff2765c5e/fetchFileTemp442678599817678402.tmp
    21/08/10 14:41:59 INFO executor.Executor: Adding file:/tmp/spark-7bdf3b0f-d374-4b47-b78f-bdea057849cf/userFiles-ddafbf49-ca6a-4c28-83fe-9e4ff2765c5e/SparkStudy-1.0-SNAPSHOT.jar to class loader
    21/08/10 14:41:59 INFO datasources.FileScanRDD: Reading File path: file:///home/pyspark/idcard.txt, range: 0-209, partition values: [empty row]
    21/08/10 14:41:59 INFO codegen.CodeGenerator: Code generated in 9.418721 ms
    21/08/10 14:41:59 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1441 bytes result sent to driver
    21/08/10 14:41:59 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 256 ms on localhost (executor driver) (1/1)
    21/08/10 14:41:59 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: ResultStage 0 (show at SparkSQLTest1.java:17) finished in 0.366 s
    21/08/10 14:41:59 INFO scheduler.DAGScheduler: Job 0 finished: show at SparkSQLTest1.java:17, took 0.425680 s
    21/08/10 14:41:59 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml
    21/08/10 14:41:59 INFO hive.HiveUtils: Initializing HiveMetastoreConnection version 2.1 using Spark classes.
    21/08/10 14:42:00 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml
    21/08/10 14:42:00 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/1be0271b-776b-42c6-9660-d8f73c14ef2f
    21/08/10 14:42:00 INFO session.SessionState: Created local directory: /tmp/root/1be0271b-776b-42c6-9660-d8f73c14ef2f
    21/08/10 14:42:00 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/1be0271b-776b-42c6-9660-d8f73c14ef2f/_tmp_space.db
    21/08/10 14:42:00 INFO client.HiveClientImpl: Warehouse location for Hive client (version 2.1.1) is /user/hive/warehouse
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 15
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 16
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 23
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 20
    21/08/10 14:42:00 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on hp2:40459 in memory (size: 3.9 KB, free: 366.3 MB)
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 12
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 6
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 22
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 10
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 5
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 19
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 28
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 26
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 27
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 7
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 11
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 17
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 24
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 29
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 30
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 18
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 25
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 14
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 13
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 8
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 21
    21/08/10 14:42:00 INFO spark.ContextCleaner: Cleaned accumulator 9
    21/08/10 14:42:00 INFO hive.metastore: HMS client filtering is enabled.
    21/08/10 14:42:00 INFO hive.metastore: Trying to connect to metastore with URI thrift://hp1:9083
    21/08/10 14:42:00 INFO hive.metastore: Opened a connection to metastore, current connections: 1
    21/08/10 14:42:00 INFO hive.metastore: Connected to metastore.
    21/08/10 14:42:01 INFO metadata.Hive: Registering function getdegree myUdf.getDegree
    +------------------+
    |             value|
    +------------------+
    |440528*******63016|
    |350525*******60813|
    |120102*******10789|
    |452123*******30416|
    |440301*******22322|
    |441421*******54614|
    |440301*******55416|
    |232721*******40630|
    |362204*******88412|
    |430281*******91015|
    |420117*******88355|
    +------------------+
    
    21/08/10 14:42:01 INFO spark.SparkContext: Invoking stop() from shutdown hook
    21/08/10 14:42:01 INFO server.AbstractConnector: Stopped Spark@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    21/08/10 14:42:01 INFO ui.SparkUI: Stopped Spark web UI at http://hp2:4040
    21/08/10 14:42:01 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    21/08/10 14:42:01 INFO memory.MemoryStore: MemoryStore cleared
    21/08/10 14:42:01 INFO storage.BlockManager: BlockManager stopped
    21/08/10 14:42:01 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    21/08/10 14:42:01 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    21/08/10 14:42:01 INFO spark.SparkContext: Successfully stopped SparkContext
    21/08/10 14:42:01 INFO util.ShutdownHookManager: Shutdown hook called
    21/08/10 14:42:01 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-45aa512d-c4a1-44f4-8152-2273f1e78bda
    21/08/10 14:42:01 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-7bdf3b0f-d374-4b47-b78f-bdea057849cf
    [root@hp2 javaspark]# 
    

    3.1.2 以编程方式运行SQL查询

    代码:

    package org.example;
    
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    public class SparkSQLTest2 {
        public static void main(String[] args){
            SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkSQLTest2")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
            Dataset<Row> sqlDF = spark.sql("SELECT * FROM test.ods_fact_sale limit 100");
            sqlDF.show();
            
           spark.stop();
        }
    
    }
    
    

    测试记录:

    [14:49:40] [root@hp2 javaspark]# spark-submit \
    [14:49:40] >   --class org.example.SparkSQLTest2 \
    [14:49:40] >   --master local[2] \
    [14:49:41] >   /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
    [14:49:42] 21/08/10 14:49:46 INFO spark.SparkContext: Running Spark version 2.4.0-cdh6.3.1
    [14:49:42] 21/08/10 14:49:46 INFO logging.DriverLogger: Added a local log appender at: /tmp/spark-7c425e78-967b-4dbc-9552-1917c8d38b2f/__driver_logs__/driver.log
    [14:49:42] 21/08/10 14:49:46 INFO spark.SparkContext: Submitted application: SparkSQLTest2
    [14:49:42] 21/08/10 14:49:46 INFO spark.SecurityManager: Changing view acls to: root
    [14:49:42] 21/08/10 14:49:46 INFO spark.SecurityManager: Changing modify acls to: root
    [14:49:42] 21/08/10 14:49:46 INFO spark.SecurityManager: Changing view acls groups to: 
    [14:49:42] 21/08/10 14:49:46 INFO spark.SecurityManager: Changing modify acls groups to: 
    ....snip....
    21/08/10 14:49:56 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 237) in 53 ms on localhost (executor driver) (1/1)
    21/08/10 14:49:56 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
    21/08/10 14:49:56 INFO scheduler.DAGScheduler: ResultStage 1 (show at SparkSQLTest2.java:16) finished in 0.064 s
    21/08/10 14:49:56 INFO scheduler.DAGScheduler: Job 0 finished: show at SparkSQLTest2.java:16, took 3.417211 s
    +---+--------------------+---------+---------+
    | id|           sale_date|prod_name|sale_nums|
    +---+--------------------+---------+---------+
    |  1|2011-08-16 00:00:...|    PROD4|       28|
    |  2|2011-11-06 00:00:...|    PROD6|       19|
    |  3|2011-04-25 00:00:...|    PROD8|       29|
    |  4|2011-09-12 00:00:...|    PROD2|       88|
    |  5|2011-05-15 00:00:...|    PROD5|       76|
    |  6|2011-02-23 00:00:...|    PROD6|       64|
    |  7|2012-09-26 00:00:...|    PROD2|       38|
    |  8|2012-02-14 00:00:...|    PROD6|       45|
    |  9|2010-04-22 00:00:...|    PROD8|       57|
    | 10|2010-10-31 00:00:...|    PROD5|       65|
    | 11|2010-10-24 00:00:...|    PROD2|       33|
    | 12|2011-02-11 00:00:...|    PROD9|       27|
    | 13|2012-07-10 00:00:...|    PROD8|       48|
    | 14|2011-02-23 00:00:...|    PROD6|       46|
    | 15|2010-08-10 00:00:...|    PROD4|       50|
    | 16|2011-05-02 00:00:...|    PROD6|       22|
    | 17|2012-07-20 00:00:...|    PROD2|       56|
    | 18|2012-07-12 00:00:...|    PROD9|       57|
    | 19|2011-11-18 00:00:...|    PROD6|       58|
    | 20|2010-04-22 00:00:...|    PROD4|        7|
    +---+--------------------+---------+---------+
    only showing top 20 rows
    
    21/08/10 14:49:56 INFO spark.SparkContext: Invoking stop() from shutdown hook
    21/08/10 14:49:56 INFO server.AbstractConnector: Stopped Spark@51768776{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    21/08/10 14:49:56 INFO ui.SparkUI: Stopped Spark web UI at http://hp2:4040
    21/08/10 14:49:56 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    21/08/10 14:49:56 INFO memory.MemoryStore: MemoryStore cleared
    21/08/10 14:49:56 INFO storage.BlockManager: BlockManager stopped
    21/08/10 14:49:56 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    21/08/10 14:49:56 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    21/08/10 14:49:56 INFO spark.SparkContext: Successfully stopped SparkContext
    21/08/10 14:49:56 INFO util.ShutdownHookManager: Shutdown hook called
    21/08/10 14:49:56 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-0d720459-d9a4-4bb5-b9ce-521f524c87c8
    21/08/10 14:49:56 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-7c425e78-967b-4dbc-9552-1917c8d38b2f
    [root@hp2 javaspark]# 
    

    3.2 Spark SQL操作MySQL数据库

    Spark SQL不仅可以操作Hive数据库,也可以操作远程的MySQL数据库

    代码:

    package org.example;
    
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    public class SparkSQLTest3 {
        public static void main(String[] args){
            SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkSQLTest3")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
            Dataset<Row> jdbcDF = spark.read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://10.31.1.123:3306/test")
                    .option("dbtable", "(SELECT * FROM EMP) tmp")
                    .option("user", "root")
                    .option("password", "abc123")
                    .load();
    
            jdbcDF.printSchema();
            jdbcDF.show();
    
            spark.stop();
        }
    
    }
    
    

    测试记录:

    [root@hp2 javaspark]# spark-submit \
    >   --class org.example.SparkSQLTest3 \
    >   --master local[2] \
    >   /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
    21/08/10 15:04:01 INFO spark.SparkContext: Running Spark version 2.4.0-cdh6.3.1
    21/08/10 15:04:01 INFO logging.DriverLogger: Added a local log appender at: /tmp/spark-5fdcedf0-eeb8-4089-961f-42a1048761c6/__driver_logs__/driver.log
    21/08/10 15:04:01 INFO spark.SparkContext: Submitted application: SparkSQLTest3
    21/08/10 15:04:01 INFO spark.SecurityManager: Changing view acls to: root
    21/08/10 15:04:01 INFO spark.SecurityManager: Changing modify acls to: root
    21/08/10 15:04:01 INFO spark.SecurityManager: Changing view acls groups to: 
    21/08/10 15:04:01 INFO spark.SecurityManager: Changing modify acls groups to: 
    21/08/10 15:04:01 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    21/08/10 15:04:02 INFO util.Utils: Successfully started service 'sparkDriver' on port 44266.
    21/08/10 15:04:02 INFO spark.SparkEnv: Registering MapOutputTracker
    21/08/10 15:04:02 INFO spark.SparkEnv: Registering BlockManagerMaster
    21/08/10 15:04:02 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    21/08/10 15:04:02 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    21/08/10 15:04:02 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-1781c08b-9d73-427d-9bc0-2e7e092a2f16
    21/08/10 15:04:02 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
    21/08/10 15:04:02 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    21/08/10 15:04:02 INFO util.log: Logging initialized @1556ms
    21/08/10 15:04:02 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-09-05T05:11:46+08:00, git hash: 3ce520221d0240229c862b122d2b06c12a625732
    21/08/10 15:04:02 INFO server.Server: Started @1633ms
    21/08/10 15:04:02 INFO server.AbstractConnector: Started ServerConnector@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    21/08/10 15:04:02 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@173b9122{/jobs,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@649f2009{/jobs/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@14bb2297{/jobs/job,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a15b789{/jobs/job/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57f791c6{/stages,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51650883{/stages/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c4f9535{/stages/stage,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@499b2a5c{/stages/stage/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@596df867{/stages/pool,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c1fca1e{/stages/pool/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@241a53ef{/storage,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@344344fa{/storage/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2db2cd5{/storage/rdd,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@70e659aa{/storage/rdd/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@615f972{/environment,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@285f09de{/environment/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@73393584{/executors,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@31500940{/executors/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1827a871{/executors/threadDump,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@48e64352{/executors/threadDump/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7249dadf{/static,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5be82d43{/,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@600b0b7{/api,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@473b3b7a{/jobs/job/kill,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1734f68{/stages/stage/kill,null,AVAILABLE,@Spark}
    21/08/10 15:04:02 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hp2:4040
    21/08/10 15:04:02 INFO spark.SparkContext: Added JAR file:/home/javaspark/SparkStudy-1.0-SNAPSHOT.jar at spark://hp2:44266/jars/SparkStudy-1.0-SNAPSHOT.jar with timestamp 1628579042508
    21/08/10 15:04:02 INFO executor.Executor: Starting executor ID driver on host localhost
    21/08/10 15:04:02 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43617.
    21/08/10 15:04:02 INFO netty.NettyBlockTransferService: Server created on hp2:43617
    21/08/10 15:04:02 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    21/08/10 15:04:02 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hp2, 43617, None)
    21/08/10 15:04:02 INFO storage.BlockManagerMasterEndpoint: Registering block manager hp2:43617 with 366.3 MB RAM, BlockManagerId(driver, hp2, 43617, None)
    21/08/10 15:04:02 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hp2, 43617, None)
    21/08/10 15:04:02 INFO storage.BlockManager: external shuffle service port = 7337
    21/08/10 15:04:02 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, hp2, 43617, None)
    21/08/10 15:04:02 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@245a060f{/metrics/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:03 INFO scheduler.EventLoggingListener: Logging events to hdfs://nameservice1/user/spark/applicationHistory/local-1628579042554
    21/08/10 15:04:03 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
    21/08/10 15:04:03 INFO logging.DriverLogger$DfsAsyncWriter: Started driver log file sync to: /user/spark/driverLogs/local-1628579042554_driver.log
    21/08/10 15:04:03 INFO internal.SharedState: loading hive config file: file:/etc/hive/conf.cloudera.hive/hive-site.xml
    21/08/10 15:04:03 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
    21/08/10 15:04:03 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
    21/08/10 15:04:03 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5922d3e9{/SQL,null,AVAILABLE,@Spark}
    21/08/10 15:04:03 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7d57dbb5{/SQL/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:03 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5f95f1e1{/SQL/execution,null,AVAILABLE,@Spark}
    21/08/10 15:04:03 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@672a1c62{/SQL/execution/json,null,AVAILABLE,@Spark}
    21/08/10 15:04:03 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6015a4a5{/static/sql,null,AVAILABLE,@Spark}
    21/08/10 15:04:04 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    root
     |-- empno: integer (nullable = true)
     |-- ename: string (nullable = true)
     |-- job: string (nullable = true)
     |-- mgr: integer (nullable = true)
     |-- hiredate: date (nullable = true)
     |-- sal: decimal(7,2) (nullable = true)
     |-- comm: decimal(7,2) (nullable = true)
     |-- deptno: integer (nullable = true)
    
    21/08/10 15:04:06 INFO codegen.CodeGenerator: Code generated in 194.297923 ms
    21/08/10 15:04:06 INFO codegen.CodeGenerator: Code generated in 32.1523 ms
    21/08/10 15:04:06 INFO spark.SparkContext: Starting job: show at SparkSQLTest3.java:24
    21/08/10 15:04:06 INFO scheduler.DAGScheduler: Got job 0 (show at SparkSQLTest3.java:24) with 1 output partitions
    21/08/10 15:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (show at SparkSQLTest3.java:24)
    21/08/10 15:04:06 INFO scheduler.DAGScheduler: Parents of final stage: List()
    21/08/10 15:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
    21/08/10 15:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at show at SparkSQLTest3.java:24), which has no missing parents
    21/08/10 15:04:06 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 10.9 KB, free 366.3 MB)
    21/08/10 15:04:06 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.1 KB, free 366.3 MB)
    21/08/10 15:04:06 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hp2:43617 (size: 5.1 KB, free: 366.3 MB)
    21/08/10 15:04:06 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1164
    21/08/10 15:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at show at SparkSQLTest3.java:24) (first 15 tasks are for partitions Vector(0))
    21/08/10 15:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    21/08/10 15:04:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7690 bytes)
    21/08/10 15:04:07 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
    21/08/10 15:04:07 INFO executor.Executor: Fetching spark://hp2:44266/jars/SparkStudy-1.0-SNAPSHOT.jar with timestamp 1628579042508
    21/08/10 15:04:07 INFO client.TransportClientFactory: Successfully created connection to hp2/10.31.1.124:44266 after 37 ms (0 ms spent in bootstraps)
    21/08/10 15:04:07 INFO util.Utils: Fetching spark://hp2:44266/jars/SparkStudy-1.0-SNAPSHOT.jar to /tmp/spark-5fdcedf0-eeb8-4089-961f-42a1048761c6/userFiles-651b34b1-fee3-4d6b-b332-9246d1f6e35b/fetchFileTemp859077269082454607.tmp
    21/08/10 15:04:07 INFO executor.Executor: Adding file:/tmp/spark-5fdcedf0-eeb8-4089-961f-42a1048761c6/userFiles-651b34b1-fee3-4d6b-b332-9246d1f6e35b/SparkStudy-1.0-SNAPSHOT.jar to class loader
    21/08/10 15:04:07 INFO jdbc.JDBCRDD: closed connection
    21/08/10 15:04:07 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1884 bytes result sent to driver
    21/08/10 15:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 252 ms on localhost (executor driver) (1/1)
    21/08/10 15:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    21/08/10 15:04:07 INFO scheduler.DAGScheduler: ResultStage 0 (show at SparkSQLTest3.java:24) finished in 0.577 s
    21/08/10 15:04:07 INFO scheduler.DAGScheduler: Job 0 finished: show at SparkSQLTest3.java:24, took 0.632211 s
    21/08/10 15:04:07 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml
    21/08/10 15:04:07 INFO hive.HiveUtils: Initializing HiveMetastoreConnection version 2.1 using Spark classes.
    21/08/10 15:04:07 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml
    21/08/10 15:04:07 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/75efdbe5-01fb-4f39-b04a-82982a3b5298
    21/08/10 15:04:07 INFO session.SessionState: Created local directory: /tmp/root/75efdbe5-01fb-4f39-b04a-82982a3b5298
    21/08/10 15:04:07 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/75efdbe5-01fb-4f39-b04a-82982a3b5298/_tmp_space.db
    21/08/10 15:04:07 INFO client.HiveClientImpl: Warehouse location for Hive client (version 2.1.1) is /user/hive/warehouse
    21/08/10 15:04:08 INFO hive.metastore: HMS client filtering is enabled.
    21/08/10 15:04:08 INFO hive.metastore: Trying to connect to metastore with URI thrift://hp1:9083
    21/08/10 15:04:08 INFO hive.metastore: Opened a connection to metastore, current connections: 1
    21/08/10 15:04:08 INFO hive.metastore: Connected to metastore.
    21/08/10 15:04:08 INFO metadata.Hive: Registering function getdegree myUdf.getDegree
    +-----+------+---------+----+----------+-------+-------+------+
    |empno| ename|      job| mgr|  hiredate|    sal|   comm|deptno|
    +-----+------+---------+----+----------+-------+-------+------+
    | 7369| SMITH|    CLERK|7902|1980-12-17| 800.00|   null|    20|
    | 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.00| 300.00|    30|
    | 7521|  WARD| SALESMAN|7698|1981-02-22|1250.00| 500.00|    30|
    | 7566| JONES|  MANAGER|7839|1981-04-02|2975.00|   null|    20|
    | 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.00|1400.00|    30|
    | 7698| BLAKE|  MANAGER|7839|1981-05-01|2850.00|   null|    30|
    | 7782| CLARK|  MANAGER|7839|1981-06-09|2450.00|   null|    10|
    | 7788| SCOTT|  ANALYST|7566|1987-06-13|3000.00|   null|    20|
    | 7839|  KING|PRESIDENT|null|1981-11-17|5000.00|   null|    10|
    | 7844|TURNER| SALESMAN|7698|1981-09-08|1500.00|   0.00|    30|
    | 7876| ADAMS|    CLERK|7788|1987-06-13|1100.00|   null|    20|
    | 7900| JAMES|    CLERK|7698|1981-12-03| 950.00|   null|    30|
    | 7902|  FORD|  ANALYST|7566|1981-12-03|3000.00|   null|    20|
    | 7934|MILLER|    CLERK|7782|1982-01-23|1300.00|   null|    10|
    +-----+------+---------+----+----------+-------+-------+------+
    
    21/08/10 15:04:08 INFO server.AbstractConnector: Stopped Spark@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    21/08/10 15:04:08 INFO ui.SparkUI: Stopped Spark web UI at http://hp2:4040
    21/08/10 15:04:08 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    21/08/10 15:04:08 INFO memory.MemoryStore: MemoryStore cleared
    21/08/10 15:04:08 INFO storage.BlockManager: BlockManager stopped
    21/08/10 15:04:08 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    21/08/10 15:04:08 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    21/08/10 15:04:08 INFO spark.SparkContext: Successfully stopped SparkContext
    21/08/10 15:04:08 INFO util.ShutdownHookManager: Shutdown hook called
    21/08/10 15:04:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-5fdcedf0-eeb8-4089-961f-42a1048761c6
    21/08/10 15:04:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6b5f511b-1bb4-420c-b26f-737dc764fea1
    [root@hp2 javaspark]# 
    

    参考:

    1.http://spark.apache.org/docs/2.4.2/sql-getting-started.html
    2.https://www.jianshu.com/p/ad6dc9467a6b
    3.https://blog.csdn.net/u011412768/article/details/93426353
    4.https://blog.csdn.net/luoganttcc/article/details/88791460

    更多相关内容
  • 本文首先介绍了Spark的基础知识以及RDD和DataFrame这些核心概念,然后演示了如何下载Spark二进制版本并搭建一个本地单机模式下的开发环境,最后通过Python语言来编写第一个Spark程序。

    系列文章目录

    1. 手把手带你玩转Spark机器学习-专栏介绍
    2. 手把手带你玩转Spark机器学习-问题汇总
    3. 手把手带你玩转Spark机器学习-Spark的安装及使用
    4. 手把手带你玩转Spark机器学习-使用Spark进行数据处理和数据转换
    5. 手把手带你玩转Spark机器学习-使用Spark构建分类模型
    6. 手把手带你玩转Spark机器学习-使用Spark构建回归模型
    7. 手把手带你玩转Spark机器学习-使用Spark构建聚类模型


    前言

    本文主要介绍了Apache的基础知识及Spark环境的搭建和运行。


    一、Apache Spark的基础知识

    在这里插入图片描述

    Apache Spark TM

    几年前,Spark被其创造者定义成:A fast and general engine for large-scale data processing(用于大规模数据处理的快速通用引擎)。

    其中"Fast"意味着它比以前的大数据处理方法更快(例如Hadoop的Mapreduce)。更快的秘诀在于Spark在内存(RAM)上运行,这使得处理速度比在磁盘上快的多。

    "General"部分意味着它可以用于多种用途,例如运行分布式SQL、创建数据管道、将数据存储到数据库、运行机器学习算法、处理图形、数据流等等。现在随着Apache Spark项目的发展,Spark几乎可以做数据科学或机器学习工作流程中的所有事情,我们也可以将Spark框架单独应用到深度学习这样的端到端项目中。

    “Large-scale"意味着这是一个可以完美处理大量数据的框架,我们过去称之为"大数据”。

    RDD

    在这里插入图片描述

    弹性分布式数据集(RDD)

    Apache Spark的核心抽象和起源是弹性分布式数据集(RDD)。

    RDD是可以并行操作且具有一定容错性的元素集合。你可以在驱动程序中并行创建现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。
    在这里插入图片描述

    RDD数据处理流程

    关于Spark其中有个非常重要的点是所有的Transformation操作都是不立即生效的,换句话说,Spark不会立即计算它的结果。相反,Spark只是记录下来对某些基础数据(例如文件)的Transformation操作。这些Transformation操作只会在Action需要将结果返回给驱动程序的时候才进行计算操作。

    默认情况下,每个Transformation后的 RDD 可能会在模每次对其运行操作时重新计算。但是,你也可以使用Persist(或Cache)方法将 RDD 持久化在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问它。Spark还支持在磁盘上持久化 RDD 或跨多个节点复制的操作。

    DataFrame

    在这里插入图片描述

    DataFrame

    从 Spark 2.0.0 开始,DataFrame 是一个被组织成带有字段名的数据集【表格数据】。它在概念上等同于关系数据库中的表或 R/Python 中的 DataFrame,但在底层进行了更丰富的优化。

    如下图所示,DataFrames 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有 RDD。
    在这里插入图片描述

    DataFrame数据来源

    简而言之,Dataframes API 是 Spark creators 中的一种方法,可让你在框架中轻松使用数据。它们与 Pandas Dataframes 或 R Dataframes 非常相似,但有几个优点。第一个当然是它们可以被缓存在一个集群的内存里,因此它们可以处理大量数据,第二个是这种数据结构是经过特殊优化的,可以适配分布式环境。

    在Spark发展起初,将 Spark 与 Scala 或 Java 一起使用要快得多。随着python语言越来越普及以及Spark整个生态的发展,使用 DF API,这不再是问题,现在我们可以在 R、Python、Scala 或 Java 中使用Spark获得相同的性能。
    在这里插入图片描述

    不同语言在Spark中操作数据的性能对比

    负责此优化的是 Catalyst。你可以把它想象成一个“巫师”,它会接受你的查询( 类似 SQL 的查询,它们也会被并行化)和操作并针对分布式计算进行优化。
    在这里插入图片描述

    Caltayst数据处理流程

    如上图所示,这个过程并不是那么简单,但作为程序员的你甚至不会注意到它,只是它一直在那里帮助你。 在 Spark 3.0 中,新增了一个“自适应查询执行”(AQE)的东西,它将根据在查询执行过程中收集到的统计信息重新优化和调整查询计划。这将对性能产生巨大影响,例如,假设我们正在运行查询

    SELECT max(i) FROM table GROUP BY column
    

    如果没有AQE,Spark将启动五个任务来进行最终数据的聚合:
    在这里插入图片描述

    无AQE的分区合并流程

    但是使用 AQE,Spark 会将上图中间的三个小分区合并为一个,因此,最终聚合现在只需要执行三个任务而不是五个:

    在这里插入图片描述

    有AQE的分区合并流程

    二、安装及使用Spark

    注意:

    • "$"符号表示在shell中运行(但是不要复制该符号)
    • “>>>”符号表示 Python shell(不要复制该符号)

    Spark能通过内置的单机集群调度器在本地运行。此时,所有的Spark进程运行在同一个java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型设计、开发、调试和测试。同样它也适用于在单机上进行多核并行计算的实际场景。

    Spark的本地模式和集群模式完全兼容,本地编写和测试过的代码仅需要增加少许设置便能在集群上运行。

    1. 下载预编译包

    首先第一步访问Spark项目的下载页面:https://spark.apache.org/downloads.html。一版选择最新的Spark版本包
    在这里插入图片描述

    Spark下载页面

    如上图所示,各个版本的版本包及源代码的github地址可以从Spark项目的下载页面找到。为了访问HDFS(Hadoop分布式文件系统)以及标准或定制的Hadoop输入源,Spark的编译版本要与Hadoop的版本对应。如上图所示,上面下载页面提供了针对Hadoop2.7的预编译版本。除非你想构建针对特定版本hadoop的Spark,否则还是建议你通过下载页面的推荐链接下载预编译的二进制包。在安装Spark之前,还要确保电脑上已经安装好了Java 8+以及anaconda。例如作者选了一台linux服务器,下载了spark-3.2.1预编译包及对应的hadoop3.3的预编译包,Java版本java1.8.0_251,python3.7

    1. 解压并创建软链

    下载完上述版本的包后,解压缩并将其移动到你的 /opt 文件夹下:

    $ tar -xzf spark-3.2.1-bin-hadoop3.3.tgz
    $ mv spark-3.2.1-bin-hadoop3.3 /opt/spark-3.2.1-bin-hadoop3.3
    

    创建软链

    $ ln -s /opt/spark-3.2.1-bin-hadoop3.3 /opt/spark̀
    
    1. 添加环境变量

    最后,告诉你的 bash(或 zsh 等)在哪里可以找到 spark。为此,通过在 ~/.bashrc(或 ~/.zshrc)文件中添加以下行来配置 $PATH 变量:

    export SPARK_HOME=/opt/spark
    export PATH=$SPARK_HOME/bin:$PATH
    
    1. 安装pysaprk

    这边使用清华源,下载快点

    pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
    
    1. 在IDE中使用PySpark

    有时你需要一个完整的 IDE 来创建更复杂的代码,而 PySpark 默认不在 sys.path 上,但这并不意味着它不能用作常规库。你可以通过在运行时将 PySpark 添加到 sys.path 来解决此问题。findspark可以做到这点,可以输入如下命令:

    $ pip install findspark
    

    然后在你的 IDE(我使用的PyCharm)上初始化 PySpark,只需在代码中输入:

    import findspark
    findspark.init()
    

    三、Spark编程模型及Spark python编程入门

    SparkContext类与SparkConf类

    任何Spark程序的编写都是从SparkContext开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如说主节点的URL)。

    初始化后,我们便可以用SparkContext对象所包含的各种方法来创建或者操作分布式数据集和共享变量。Spark shell可以自动完成上述初始化:
    在这里插入图片描述

    若是用python代码来实现的话。可以参考下面的代码:

    import findspark
    findspark.init()
    
    from pyspark import SparkContext, SparkConf
    conf = SparkConf() \
        .setAppName('First Application') \
        .setMaster("local[4]")
    sc = SparkContext(conf=conf)
    

    上述代码会创建一个四线程的SparkContext对象,并将其相应的任务命名为“First Application”。

    编写第一个Spark python应用程序:计算pi

    如下所示,我们编写了一个计算Pi的应用程序:

    import findspark
    findspark.init()
    import random
    from pyspark import SparkContext
    sc = SparkContext(appName="EstimatePi")
    def inside(p):
        x, y = random.random(), random.random()
        return x*x + y*y < 1
    NUM_SAMPLES = 1000000
    count = sc.parallelize(range(0, NUM_SAMPLES)) \
                 .filter(inside).count()
    print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
    sc.stop()
    

    在这里插入图片描述

    控制台输出 : Pi is roughly 3.142100

    总结

    本文首先介绍了Spark的基础知识以及RDD和DataFrame这些核心概念,然后演示了如何下载Spark二进制版本并搭建一个本地单机模式下的开发环境,最后通过Python语言来编写第一个Spark程序。

    展开全文
  • 文章目录一、Spark on Hive 和 Hive on Spark的区别1)Spark on Hive2)Hive on Spark(本章实现)二、Hive on Spark实现1)编译hive1、下载hive2、编译hive3、解压hive4、下载spark5、打包spark jar包并上传到HDFS6...

    一、Spark on Hive 和 Hive on Spark的区别

    1)Spark on Hive

    Spark on Hive 是Hive只作为存储角色,Spark负责sql解析优化,执行。这里可以理解为Spark 通过Spark SQL 使用Hive 语句操作Hive表 ,底层运行的还是 Spark RDD。具体步骤如下:

    • 通过SparkSQL,加载Hive的配置文件,获取到Hive的元数据信息;
    • 获取到Hive的元数据信息之后可以拿到Hive表的数据;
    • 通过SparkSQL来操作Hive表中的数据。

    具体实现在我之前的博文中已经讲过,在这里就不再重复了,实现很简单,可以参考:大数据Hadoop之——Spark SQL+Spark Streaming

    【总结】Spark使用Hive来提供表的metadata信息。

    2)Hive on Spark(本章实现)

    Hive on Spark是Hive既作为存储又负责sql的解析优化,Spark负责执行。这里Hive的执行引擎变成了Spark,不再是MR,这个要实现比Spark on Hive麻烦很多, 必须重新编译你的spark和导入jar包,不过目前大部分使用的确实是spark on hive

    • Hive默认使用MapReduce作为执行引擎,即Hive on MapReduce。实际上,Hive还可以使用Tez和Spark作为其执行引擎,分别为Hive on Tez和Hive on Spark。由于MapReduce中间计算均需要写入磁盘,而Spark是放在内存中,所以总体来讲Spark比MapReduce快很多。因此,Hive on Spark也会比Hive on MapReduce快。由于Hive on MapReduce的缺陷,所以企业里基本上很少使用了。

    【总结】hive on spark大体与spark on hive结构类似,只是SQL引擎不同,但是计算引擎都是spark

    参考文档:

    二、Hive on Spark实现

    编译Spark源码

    要使用Hive on Spark,所用的Spark版本必须不包含Hive的相关jar包,hive on spark 的官网上说“Note that you must have a version of Spark which does not include the Hive jars”。在spark官网下载的编译的Spark都是有集成Hive的,因此需要自己下载源码来编译,并且编译的时候不指定Hive。最终版本:Hadoop3.3.1+Spark2.3.0+Hive3.1.2,其实主要是spark和hive版本对应上就行,hadoop版本好像没那么严格,所以这里hadoop版本我使用当前最新版本,但是还是建议使用hive的pom.xml配置文件里配置的版本。

    1)先下载hive源码包查看spark版本

    $ cd /opt/bigdata/hadoop/software
    $ wget http://archive.apache.org/dist/hive/hive-3.1.2/apache-hive-3.1.2-src.tar.gz
    $ tar -zxvf apache-hive-3.1.2-src.tar.gz
    $ egrep 'spark.version|hadoop.version' apache-hive-3.1.2-src/pom.xml
    

    在这里插入图片描述

    2)下载spark

    下载地址:https://archive.apache.org/dist/spark/spark-2.3.0/
    在这里插入图片描述

    $ cd /opt/bigdata/hadoop/software
    # 下载
    $ wget http://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0.tgz
    

    3)解压编译

    # 解压
    $ tar -zxvf spark-2.3.0.tgz
    $ cd spark-2.3.0
    # 开始编译,注意hadoop版本
    $ ./dev/make-distribution.sh --name without-hive --tgz -Pyarn -Phadoop-2.7 -Dhadoop.version=3.3.1 -Pparquet-provided -Porc-provided -Phadoop-provided
    # 或者(这里不执行下面这句,因为跟上面等价)
    $ ./dev/make-distribution.sh --name "without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.7,parquet-provided,orc-provided"
    命令解释:
    -Phadoop-3.3 \  -Dhadoop.version=3.3.1 \ ***指定hadoop版本为3.3.1
    --name without-hive hive 是编译文件的名字参数
    --tgz ***压缩成tgz格式
    -Pyarn 是支持yarn
    -Phadoop-2.7 是支持的hadoop版本,一开始使用的是3.3后来提示hadoop3.3不存在,只好改成2.7,编译成功
    -Dhadoop.version=3.3.1 运行环境
    

    但是发现编译卡住了,原来编译会自动下载maven和scala,存放在build目录下,如图:
    在这里插入图片描述

    自动下载完maven和scala,就开始编译了,编译耗时还是比较久,慢慢等待编译结束吧。

    编译花了半个小时左右,终于编译完成了。编译的时间太漫长,下面我也会把我编译好的spark包放在网盘上供大家下载使用。
    在这里插入图片描述
    在这里插入图片描述
    在当前目录下就有编译好的spark包

    $ ll
    

    在这里插入图片描述

    4)解压

    $ tar -zxvf spark-2.3.0-bin-without-hive.tgz -C /opt/bigdata/hadoop/server/
    $ cd /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive
    $ ll
    

    在这里插入图片描述

    5)把spark jar包上传到HDFS

    【温馨提示】hive-site.xml文件里配置需要。

    $ cd /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive/
    ### 创建日志存放目录
    $ hadoop fs -mkdir -p hdfs://hadoop-node1:8082/tmp/spark
    ### 在hdfs上创建存放jar包目录
    $ hadoop fs -mkdir -p /spark/spark-2.4.5-jars
    ## 上传jars到HDFS
    $ hadoop fs -put ./jars/* /spark/spark-2.4.5-jars/
    

    如果使用了打包好的jar包,hive操作时会报如下错误:

    Failed to execute spark task, with exception ‘org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session c8c46c14-4d2a-4f7e-9a12-0cd62bf097db)’
    FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session c8c46c14-4d2a-4f7e-9a12-0cd62bf097db

    6)打包spark jar包并上传到HDFS

    【温馨提示】spark-default.xml文件需要配置打包好的jar包,spark-submit会调用。

    $ cd /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive/
    $ jar cv0f spark2.3.0-without-hive-libs.jar -C ./jars/ .
    $ ll
    ### 在hdfs上创建存放jar包目录
    $ hadoop fs -mkdir -p /spark/jars
    ## 上传jars到HDFS
    $ hadoop fs -put spark2.3.0-without-hive-libs.jar /spark/jars/
    

    如果不打包,则会报如下错误:

    Exception in thread “main” java.io.FileNotFoundException: File does not exist: hdfs://hadoop-node1:8082/spark/spark-2.3.0-jars/*.jar
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1756)
    at org.apache.hadoop.hdfs.DistributedFileSystem 29. d o C a l l ( D i s t r i b u t e d F i l e S y s t e m . j a v a : 1749 ) a t o r g . a p a c h e . h a d o o p . f s . F i l e S y s t e m L i n k R e s o l v e r . r e s o l v e ( F i l e S y s t e m L i n k R e s o l v e r . j a v a : 81 ) a t o r g . a p a c h e . h a d o o p . h d f s . D i s t r i b u t e d F i l e S y s t e m . g e t F i l e S t a t u s ( D i s t r i b u t e d F i l e S y s t e m . j a v a : 1764 ) a t o r g . a p a c h e . s p a r k . d e p l o y . y a r n . C l i e n t D i s t r i b u t e d C a c h e M a n a g e r 29.doCall(DistributedFileSystem.java:1749) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1764) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager 29.doCall(DistributedFileSystem.java:1749)atorg.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)atorg.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1764)atorg.apache.spark.deploy.yarn.ClientDistributedCacheManager$anonfun 1. a p p l y ( C l i e n t D i s t r i b u t e d C a c h e M a n a g e r . s c a l a : 71 ) a t o r g . a p a c h e . s p a r k . d e p l o y . y a r n . C l i e n t D i s t r i b u t e d C a c h e M a n a g e r 1.apply(ClientDistributedCacheManager.scala:71) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager 1.apply(ClientDistributedCacheManager.scala:71)atorg.apache.spark.deploy.yarn.ClientDistributedCacheManager$anonfun 1. a p p l y ( C l i e n t D i s t r i b u t e d C a c h e M a n a g e r . s c a l a : 71 ) a t s c a l a . c o l l e c t i o n . M a p L i k e 1.apply(ClientDistributedCacheManager.scala:71) at scala.collection.MapLike 1.apply(ClientDistributedCacheManager.scala:71)atscala.collection.MapLikeclass.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:71)
    at org.apache.spark.deploy.yarn.Client.org a p a c h e apache apachespark d e p l o y deploy deployyarn C l i e n t Client Client$distribute 1 ( C l i e n t . s c a l a : 480 ) a t o r g . a p a c h e . s p a r k . d e p l o y . y a r n . C l i e n t . p r e p a r e L o c a l R e s o u r c e s ( C l i e n t . s c a l a : 517 ) a t o r g . a p a c h e . s p a r k . d e p l o y . y a r n . C l i e n t . c r e a t e C o n t a i n e r L a u n c h C o n t e x t ( C l i e n t . s c a l a : 863 ) a t o r g . a p a c h e . s p a r k . d e p l o y . y a r n . C l i e n t . s u b m i t A p p l i c a t i o n ( C l i e n t . s c a l a : 169 ) a t o r g . a p a c h e . s p a r k . s c h e d u l e r . c l u s t e r . Y a r n C l i e n t S c h e d u l e r B a c k e n d . s t a r t ( Y a r n C l i e n t S c h e d u l e r B a c k e n d . s c a l a : 57 ) a t o r g . a p a c h e . s p a r k . s c h e d u l e r . T a s k S c h e d u l e r I m p l . s t a r t ( T a s k S c h e d u l e r I m p l . s c a l a : 164 ) a t o r g . a p a c h e . s p a r k . S p a r k C o n t e x t . < i n i t > ( S p a r k C o n t e x t . s c a l a : 500 ) a t o r g . a p a c h e . s p a r k . S p a r k C o n t e x t 1(Client.scala:480) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:517) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:863) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:169) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164) at org.apache.spark.SparkContext.<init>(SparkContext.scala:500) at org.apache.spark.SparkContext 1(Client.scala:480)atorg.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:517)atorg.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:863)atorg.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:169)atorg.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)atorg.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)atorg.apache.spark.SparkContext.<init>(SparkContext.scala:500)atorg.apache.spark.SparkContext.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession B u i l d e r Builder Builder$anonfun 7. a p p l y ( S p a r k S e s s i o n . s c a l a : 930 ) a t o r g . a p a c h e . s p a r k . s q l . S p a r k S e s s i o n 7.apply(SparkSession.scala:930) at org.apache.spark.sql.SparkSession 7.apply(SparkSession.scala:930)atorg.apache.spark.sql.SparkSessionBuilderKaTeX parse error: Can't use function '$' in math mode at position 8: anonfun$̲7.apply(SparkSe…runMain(SparkSubmit.scala:879)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain 1 ( S p a r k S u b m i t . s c a l a : 197 ) a t o r g . a p a c h e . s p a r k . d e p l o y . S p a r k S u b m i t 1(SparkSubmit.scala:197) at org.apache.spark.deploy.SparkSubmit 1(SparkSubmit.scala:197)atorg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:227)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

    7)配置

    1、配置spark-defaults.conf

    $ cd /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive/conf
    # copy一个配置文件
    $ cp spark-defaults.conf.template spark-defaults.conf
    

    spark-defaults.conf修改内容如下:

    spark.master                     yarn
    spark.home                       /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive
    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs://hadoop-node1:8082/tmp/spark
    spark.serializer                 org.apache.spark.serializer.KryoSerializer
    spark.executor.memory            1g
    spark.driver.memory              1g
    spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
    spark.yarn.archive               hdfs:///spark/jars/spark2.3.0-without-hive-libs.jar
    spark.yarn.jars                  hdfs:///spark/jars/spark2.3.0-without-hive-libs.jar
    
    
    ### 参数解释,不用复制到配置文件中
    # spark.master指定Spark运行模式,可以是yarn-client、yarn-cluster...
    
    # spark.home指定SPARK_HOME路径
    
    # spark.eventLog.enabled需要设为true
    
    # spark.eventLog.dir指定路径,放在master节点的hdfs中,端口要跟hdfs设置的端口一致(默认为8020),否则会报错
    
    # spark.executor.memory和spark.driver.memory指定executor和dirver的内存,512m或1g,既不能太大也不能太小,因为太小运行不了,太大又会影响其他服务
    

    2、配置spark-env.sh

    $ cd /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive/conf
    $ cp spark-env.sh.template spark-env.sh
    # 在spark-env.sh添加如下内容
    $ vi spark-env.sh
    export SPARK_DIST_CLASSPATH=$(hadoop classpath)
    export HADOOP_CONF_DIR={HADOOP_HOME}/etc/hadoop/
    
    # 加载
    $ source spark-env.sh
    

    在Yarn模式运行时,需要将以下三个包放在HIVE_HOME/lib下 :scala-library、spark-core、spark-network-common。

    $ cd /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive
    # 先删
    $ rm -f ../apache-hive-3.1.2-bin/lib/scala-library-*.jar
    $ rm -f ../apache-hive-3.1.2-bin/lib/spark-core_*.jar
    $ rm -f ../apache-hive-3.1.2-bin/lib/spark-network-common_*.jar
    
    # copy这三个jar到hive lib目录下
    $ cp jars/scala-library-*.jar ../apache-hive-3.1.2-bin/lib/
    $ cp jars/spark-core_*.jar ../apache-hive-3.1.2-bin/lib/
    $ cp jars/spark-network-common_*.jar ../apache-hive-3.1.2-bin/lib/
    

    3、配置hive-site.xml

    $ cd /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/conf/
    
    #配置hive-site.xml,主要mysql数据库
    $ cat << EOF > hive-site.xml
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    
    <!-- 配置hdfs存储目录 -->
    <property>
            <name>hive.metastore.warehouse.dir</name>
            <value>/user/hive_remote/warehouse</value>
    </property>
    
    <!-- 所连接的 MySQL 数据库的地址,hive_remote是数据库,程序会自动创建,自定义就行 -->
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://hadoop-node1:3306/hive_remote2?createDatabaseIfNotExist=true&amp;useSSL=false&amp;serverTimezone=Asia/Shanghai</value>
    </property>
    
    <!-- 本地模式
    <property>
      <name>hive.metastore.local</name>
      <value>false</value>
    </property>
    -->
    
    <!-- MySQL 驱动 -->
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
    </property>
    
    <!-- mysql连接用户 -->
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value>
    </property>
    
    <!-- mysql连接密码 -->
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>123456</value>
    </property>
    
    <!--元数据是否校验-->
    <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
    </property>
    
    <property>
      <name>system:user.name</name>
      <value>root</value>
      <description>user name</description>
    </property>
    
    <!-- host -->
    <property>
      <name>hive.server2.thrift.bind.host</name>
      <value>hadoop-node1</value>
      <description>Bind host on which to run the HiveServer2 Thrift service.</description>
    </property>
    
    <!-- hs2端口 -->
    <property>
      <name>hive.server2.thrift.port</name>
      <value>11000</value>
    </property>
    
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://hadoop-node1:9083</value>
    </property>
    
    <!--Spark依赖位置,上面上传jar包的hdfs路径-->
    <property>
        <name>spark.yarn.jars</name>
        <value>hdfs:///spark/spark-2.3.0-jars/*.jar</value>
    </property>
    
    <!--Hive执行引擎,使用spark-->
    <property>
        <name>hive.execution.engine</name>
        <value>spark</value>
    </property>
    
    <!--Hive和spark连接超时时间-->
    <property>
        <name>hive.spark.client.connect.timeout</name>
        <value>10000ms</value>
    </property>
    
    </configuration>
    EOF
    

    8)设置环境变量

    在/etc/profile添加如下配置:

    export HIVE_HOME=/opt/bigdata/hadoop/server/apache-hive-3.1.2-bin
    export PATH=$HIVE_HOME/bin:$PATH
    export SPARK_HOME=/opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive
    export PATH=$SPARK_HOME/bin:$PATH
    

    加载

    $ source /etc/profile
    

    9)初始化数据库(mysql)

    不清楚的可以先看一下这篇文章 大数据Hadoop之——数据仓库Hive

    # 初始化,--verbose:查询详情,可以不加
    $ schematool -initSchema -dbType mysql --verbose
    

    10)启动或者重启hive的metstore服务

    # 先查进程是否存在,存在则kill掉
    $ ss -atnlp|grep 9083
    # 启动metstore服务
    $ nohup hive --service metastore &
    

    11)测试验证

    先验证编译好的spark是否ok,就用spark提供的示例:SparkPI

    $ spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode client \
    --driver-memory 1G \
    --num-executors 3 \
    --executor-memory 1G \
    --executor-cores 1 \
    /opt/bigdata/hadoop/server/spark-2.3.0-bin-without-hive/examples/jars/spark-examples_*.jar 10
    

    在这里插入图片描述
    从上图发现编译好的spark包是没问题的,接下来就是验证hive提交spark任务

    $ mkdir /opt/bigdata/hadoop/data/spark
    $ cat << EOF > /opt/bigdata/hadoop/data/spark/test1230-data
    1,phone
    2,music
    3,apple
    4,clothes
    EOF
    
    # 启动hive
    $ hive
    # 创建表,通过逗号分隔字段
    create table test1230(id string,shop string) row format delimited fields terminated by ',';
    # 从local加载数据,这里的local是指hs2服务所在机器的本地linux文件系统
    load data local inpath '/opt/bigdata/hadoop/data/spark/test1230-data' into table test1230;
    # 通过insert添加数据,会提交spark任务
    select * from test1230;
    select count(*) from test1230;
    

    在这里插入图片描述

    最后提供我上面编译好的spark2.3.0版本的包,下载地址如下:

    链接:https://pan.baidu.com/s/1OY_Mn8UdRkTiiMktjQ3wlQ
    提取码:8888

    展开全文
  • Java-Spark系列7-Spark streaming介绍

    万次阅读 2021-09-27 15:49:25
    文章目录一.Spark streaming介绍1.1 Spark streaming简介1.2 Spark 与storm区别1.3 一个简单的例子二.Spark Streaming的组件介绍2.1 Streaming Context2.2 Dstream(离散流)2.1 Receiver2.2 数据源2.3 可靠性2.4 ...

    一.Spark streaming介绍

    1.1 Spark streaming简介

    Spark Streaming是Spark API的核心扩展,支持实时数据流的可扩展、高吞吐量和容错流处理。数据可以从Kafka、Kinesis或TCP套接字等多种来源中获取,并且可以使用复杂的算法进行处理,这些算法用高级函数表示,如map、reduce、join和window。最后,处理过的数据可以推送到文件系统、数据库和实时仪表板。事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。
    image.png

    在内部,它的工作方式如下。Spark Streaming接收实时输入的数据流,并对数据进行分批处理,由Spark引擎进行处理,生成最终的批量结果流。
    image.png

    Spark Streaming提供了一种高级抽象,称为离散流或DStream,它表示连续的数据流。Dstream可以通过来自Kafka和Kinesis等源的输入数据流创建,也可以通过在其他Dstream上应用高级操作来创建。在内部,DStream表示为rdd序列。

    1.2 Spark 与storm区别

    Storm

    1. 流式计算框架
    2. 以record为单位处理数据
    3. 也支持micro-batch方式(Trident)

    Spark

    1. 批处理计算框架
    2. 以RDD为单位处理数据
    3. 也支持micro-batch流式处理数据(Spark Streaming)

    两者异同

    1. 吞吐量: Spark Streaming 优于Storm
    2. 延迟: Spark Streaming差于Storm

    1.3 一个简单的例子

    在我们深入了解如何编写自己的Spark Streaming程序之前,让我们快速了解一下简单的Spark Streaming程序是什么样的。

    首先,我们导入StreamingContext,它是所有流功能的主要入口点。我们创建一个具有两个执行线程的本地StreamingContext,批处理间隔为1秒。

    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.*;
    import scala.Tuple2;
    
    // Create a local StreamingContext with two working thread and batch interval of 1 second
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    

    为了初始化一个Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。

    appName参数是应用程序在集群UI上显示的名称。master是Spark、Mesos或YARN集群的URL,或者是一个特殊的“local[*]”字符串,在本地模式下运行。实际上,当在集群上运行时,您不希望在程序中硬编码master,而是使用spark-submit启动应用程序并在那里接收它。但是,对于本地测试和单元测试,可以通过“local[*]”来运行Spark Streaming in-process(检测本地系统中的核数)。

    在定义了上下文之后,必须执行以下操作:

    1. 通过创建输入DStreams来定义输入源。
    2. 通过对DStreams应用转换和输出操作来定义流计算。
    3. 开始接收数据并使用streamingContext.start()处理它。
    4. 使用streamingContext.awaitTermination()等待处理停止(手动或由于任何错误)。
    5. 可以使用streamingContext.stop()手动停止处理。

    注意:

    1. 一旦启动了Context,就不能再设置或向其添加新的流计算。
    2. 一旦停止了Context,就不能重新启动它。
    3. 同一时间,JVM中只能有一个StreamingContext是活动的。
    4. StreamingContext上的stop()也会停止SparkContext。要只停止StreamingContext,请将stop()的可选参数stopSparkContext设置为false。
    5. 一个SparkContext可以被重用来创建多个StreamingContext,只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext)。

    二.Spark Streaming的组件介绍

    Spark Streaming的核心组件有2个:

    1. Streaming Context
    2. Dstream(离散流)

    2.1 Streaming Context

    Streaming Context是Spark Streaming程序的起点,生成Streaming Context之前需要生成SparkContext,SparkContext可以理解为申请Spark集群的计算资源,Streaming Context可以理解为申请Spark Streaming的计算资源

    2.2 Dstream(离散流)

    Dstream是Spark Streaming的数据抽象,同DataFrame,其实底层依旧是RDD。

    Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示一个连续的数据流,要么是从源接收的输入数据流,要么是通过转换输入流生成的处理数据流。在内部,DStream由一系列连续的rdd表示,这是Spark对不可变的分布式数据集的抽象。DStream中的每个RDD都包含一定时间间隔的数据,如下图所示:
    image.png

    在DStream上应用的任何操作都转换为在底层rdd上的操作。
    image.png

    这些底层RDD转换是由Spark引擎计算的。DStream操作隐藏了大部分细节,并为开发人员提供了更高级的API。

    DStream存在如下概念:

    1. Receiver
    2. 数据源: 基本源、高级源
    3. 可靠性
    4. Dstream的操作
    5. 缓存
    6. Checkpoint

    image.png

    2.1 Receiver

    每个输入DStream(文件流除外)都与一个Receiver (Scala doc, Java doc)对象相关联,接收来自源的数据并将其存储在Spark的内存中进行处理。

    2.2 数据源

    Spark Streaming提供了两类内置流源:

    1. 基本源:在StreamingContext API中直接可用的源。例如文件系统和套接字连接。
    2. 高级资源:像Kafka, Kinesis等资源可以通过额外的实用程序类获得。这些需要根据链接部分中讨论的额外依赖项进行链接。

    注意,如果希望在流应用程序中并行接收多个数据流,可以创建多个输入Dstream。这将创建多个接收器,这些接收器将同时接收多个数据流。但是请注意,Spark worker/executor是一个长期运行的任务,因此它占用分配给Spark Streaming应用程序的一个核心。因此,Spark Streaming应用程序需要分配足够的内核(或者线程,如果在本地运行的话)来处理接收到的数据,以及运行接收端,记住这一点很重要。

    记住
    在本地运行Spark Streaming程序时,不要使用“local”或“local[1]”作为主URL。这两种情况都意味着只有一个线程用于本地运行任务。如果你使用一个基于接收器的输入DStream(例如,socket, Kafka等),那么单线程将被用来运行Receiver ,不留下任何线程来处理接收的数据。因此,当本地运行时,总是使用“local[n]”作为主URL,其中要运行n个>数量的Receiver 。

    将逻辑扩展到集群上,分配给Spark Streaming应用的内核数必须大于接收端数。否则系统将接收到数据,但无法进行处理。

    2.3 可靠性

    根据数据源的可靠性,可以有两种数据源。源(如Kafka)允许传输的数据被确认。如果从这些可靠来源接收数据的系统正确地确认了接收的数据,就可以确保不会由于任何类型的故障而丢失数据。这就产生了两种接收者:
    1). 可靠的接收端—当数据被接收到并存储在Spark中并进行复制时,一个可靠的接收端会正确地向一个可靠的源发送确认。
    2), 不可靠的接收者——不可靠的接收者不向源发送确认。这可以用于不支持确认的来源,甚至当一个人不想或需要进入确认的复杂性时,用于可靠的来源。

    对于不可靠的接收者,Spark streaming有自己的可靠机制,来保证数据的可靠性。

    2.4 Dstream的操作

    与rdd类似,转换允许修改来自输入DStream的数据。DStreams支持许多普通Spark RDD上可用的转换。下面是一些常见的.

    Transformations on DStreams
    image.png

    Output Operations on DStreams:
    image.png

    2.5 缓存

    与rdd类似,DStreams也允许开发人员在内存中持久化流数据。也就是说,在DStream上使用persist()方法将自动在内存中持久化该DStream的每个RDD。如果DStream中的数据将被计算多次(例如,对同一数据的多次操作),这是有用的。对于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow,以及基于状态的操作,如updateStateByKey,这是隐式true。因此,由基于窗口的操作生成的DStreams会自动持久化到内存中,而不需要开发人员调用persist()。

    对于通过网络接收数据的输入流(例如,Kafka, socket等),默认的持久性级别被设置为将数据复制到两个节点以实现容错。

    注意,与rdd不同,DStreams的默认持久性级别将数据序列化保存在内存中。

    2.6 Checkpoint

    流应用程序必须全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障、JVM崩溃等)具有弹性。为了使这成为可能,Spark Streaming需要对容错存储系统进行足够的信息检查点,以便从故障中恢复。有两种类型的数据是检查点的:

    1. 元数据检查点——将定义流计算的信息保存到像HDFS这样的容错存储中。这用于从运行流应用程序驱动程序的节点的故障中恢复(稍后将详细讨论)。元数据包括:
      1.1) 配置—用于创建流应用程序的配置。
      1.2) DStream操作-定义流应用程序的DStream操作集。
      1.3) 未完成批-作业已排队但尚未完成的批。

    2. 数据检查点——将生成的rdd保存到可靠的存储中。在一些跨多个批组合数据的有状态转换中,这是必要的。在这种转换中,生成的rdd依赖于以前批次的rdd,这导致依赖链的长度随着时间不断增加。为了避免恢复时间的无限增长(与依赖链成正比),有状态转换的中间rdd会定期被检查到可靠的存储(例如HDFS),以切断依赖链。

    总之,元数据检查点主要用于从驱动程序失败中恢复,而数据或RDD检查点即使是用于基本功能(如果使用有状态转换)也是必要的。

    三.一个简单的测试用例

    3.1 linux服务器安装nc服务

    yum -y install netcat.x86_64    -- centos7 正确
    yum -y install nc.x86_64          -- centos7 错误
    
    nc -lk 9999
    
    [root@hp2 yum.repos.d]# nc -help
    usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
              [-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
              [-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
              [-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
              [-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
              [destination] [port]
            Command Summary:
                    -4              Use IPv4
                    -6              Use IPv6
                    -C certfile     Public key file
                    -c              Use TLS
                    -D              Enable the debug socket option
                    -d              Detach from stdin
                    -e name         Required name in peer certificate
                    -F              Pass socket fd
                    -H hash         Hash string of peer certificate
                    -h              This help text
                    -I length       TCP receive buffer length
                    -i interval     Delay interval for lines sent, ports scanned
                    -K keyfile      Private key file
                    -k              Keep inbound sockets open for multiple connects
                    -l              Listen mode, for inbound connects
                    -M ttl          Outgoing TTL / Hop Limit
                    -m minttl       Minimum incoming TTL / Hop Limit
                    -N              Shutdown the network socket after EOF on stdin
                    -n              Suppress name/port resolutions
                    -O length       TCP send buffer length
                    -o staplefile   Staple file
                    -P proxyuser    Username for proxy authentication
                    -p port         Specify local port for remote connects
                    -R CAfile       CA bundle
                    -r              Randomize remote ports
                    -S              Enable the TCP MD5 signature option
                    -s sourceaddr   Local source address
                    -T keyword      TOS value or TLS options
                    -t              Answer TELNET negotiation
                    -U              Use UNIX domain socket
                    -u              UDP mode
                    -V rtable       Specify alternate routing table
                    -v              Verbose
                    -W recvlimit    Terminate after receiving a number of packets
                    -w timeout      Timeout for connects and final net reads
                    -X proto        Proxy protocol: "4", "5" (SOCKS) or "connect"
                    -x addr[:port]  Specify proxy address and port
                    -Z              Peer certificate file
                    -z              Zero-I/O mode [used for scanning]
            Port numbers can be individual or ranges: lo-hi [inclusive]
    [root@hp2 yum.repos.d]# 
    [root@hp2 yum.repos.d]# 
    [root@hp2 yum.repos.d]# 
    [root@hp2 yum.repos.d]# nc -lk 9999
    

    3.2 Java spark代码

    maven配置:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    

    代码:

    package org.example;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.*;
    import scala.Tuple2;
    
    import java.util.Arrays;
    
    
    public class SparkStreaming1 {
        public static void main(String[] args) throws Exception{
            // Create a local StreamingContext with two working thread and batch interval of 1 second
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    
            // Create a DStream that will connect to hostname:port, like localhost:9999
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    
            // Split each line into words
            JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
    
            // Count each word in each batch
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
    
            // Print the first ten elements of each RDD generated in this DStream to the console
            wordCounts.print();
    
            jssc.start();              // Start the computation
            jssc.awaitTermination();   // Wait for the computation to terminate
        }
    }
    
    

    运行spark程序代码:

    spark-submit \
      --class org.example.SparkStreaming1 \
      --master local[2] \
      /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
    

    测试记录:
    image.png

    滚动太快,只能从日志中找到记录

    image.png

    参考:

    1.http://spark.apache.org/docs/latest/streaming-programming-guide.html

    展开全文
  • Spark快速大数据分析——Spark安装(贰) 软件环境: Hadoop-3.3.2 Spark-3.1.3/Spark-3.2.1(sbt pull) JDK 11 Scala 2.13.8/SCala 2.1(搭配3.1.3使用) 环境搭建: 首先下载SparkSpark下载地址 如果你下载真...
  • 前言 本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!...如果索引大于数组长度或者spark.sql.ansi.enabled 设置成 false,则函数返回 N
  • Java-Spark系列10-Spark性能调优概述

    万次阅读 2021-10-11 17:38:03
    文章目录一.Spark 性能优化概述二.运行环境优化2.1 数据本地性2.2 数据存储格式三.RDD算子优化3.1 尽可能复用同一个RDD3.2 对多次使用的RDD进行持久化四.参数微调五.数据倾斜六. Spark常用的调优参数6.1 在内存中...
  • Spark环境搭建 JunLeon——go big or go home 目录 Spark环境搭建 一、环境准备 1、软件准备 2、Hadoop集群搭建 3、Anaconda环境搭建 二、Spark Local模式搭建 1、Spark下载、上传和解压 2、配置环境...
  • Java-Spark系列8-Spark streaming整合Kafka

    万次阅读 2021-09-30 15:46:22
    Spark streaming整合Kafka概述1.1 Maven配置1.2 创建Direct Stream1.3 定位策略1.4 消费者的策略1.5 创建RDD1.6 获得Offsets1.7 存储 Offsets1.8 检查点1.9 Kafka自身1.10 自身数据存储二.Spark Streaming整合Kafka...
  • Centos7上安装配置Spark

    千次阅读 2022-04-20 16:26:31
    该文章主要是描述单机版Spark的简单安装,版本为 spark-3.1.3-bin-hadoop3.2.tgz 1、Spark 下载、解压、安装 Spark官方网站: Apache Spark™ - Unified Engine for large-scale data analytics Spark下载地址:...
  • Java-Spark系列6-Spark SQL编程实战

    万次阅读 2021-09-26 18:13:10
    文章目录一.Spark DataFrame概述1.1 创建DataFrame1.1.1 通过json文件创建DataFrame1.1.2 通过CSV文件创建DataFrame1.1.3 通过hive table创建DataFrame1.1.4 通过jdbc数据源创建DataFrame二.Spark SQL实战2.1 ...
  • Spark环境搭建(保姆级教程)

    万次阅读 2022-02-24 12:43:15
    Spark 是一个大规模数据处理的统一分析引擎 本文主要介绍Spark的相关配置,以及各种模式的代码提交,包括Local,Standalone,YARN。 文末有相应资源下载网盘链接。
  • Spark简介

    千次阅读 2022-06-01 15:18:18
    Spark 于 2009 年诞生于加州大学伯克利分校 AMPLab,2013 年被捐赠给 Apache 软件基金会,2014 年 2 月成为 Apache 的顶级项目。相对于 MapReduce 的批处理计算,Spark 可以带来上百倍的性能提升,因此它成为继 ...
  • Spark SQL简介

    千次阅读 2022-04-04 20:02:04
    Spark SQL简介 一、从Shark说起 1、在这之前我们要先理解Hive的工作原理: Hive是一个基于Hadoop的数据仓库工具,提供了类似于关系数据库SQL的查询语言——HiveSQL,用户可以通过HiveSQL语句快速实现简单的...
  • If so, this book will be your companion as you create data-intensive app using Spark as a processing engine, Python visualization libraries, and web frameworks such as Flask. To begin with, you will...
  • spark 官网首页

    万次阅读 2020-09-13 19:20:48
    简单的spark概述: 原文: Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general ...
  • Spark

    千次阅读 2020-04-22 10:59:32
    什么是Spark Spark特点 Spark运行模式 Spark编写代码 SparkCore 什么是RDD RDD的主要属性 RDD的算子分为两类: Rdd数据持久化什么作用? cache和Checkpoint的区别 什么是宽窄依赖 什么是DAG DAG边界 ...
  • 【两万字总结】Spark安装部署与入门

    万次阅读 2021-11-22 15:29:33
    Spark 介绍 核心概念 Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。 最大的优化是让计算任务的中间结果可以存储在内存中,不需要每次都写入 HDFS,更适用于需要迭代的 ...
  • Spark SQL之RDD, DataFrame, DataSet详细使用

    万次阅读 多人点赞 2022-05-02 09:27:12
    Spark SQL之RDD, DataFrame, DataSet详细使用
  • Spark Launcher Java API提交Spark算法

    千次阅读 2020-06-07 12:36:06
    在介绍之前,我先附上spark 官方文档地址: http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/package-summary.html 源码github地址: ...
  • Explore the integration of Apache Spark with third party applications such as H20, Databricks and Titan Evaluate how Cassandra and Hbase can be used for storage An advanced guide with a combination of...
  • 需求描述:前后端分离系统,用SpringBoot整合Spark API,调用大量数据(几百GB,上TB)进行处理计算,单机环境难以达到性能要求,此,需整合直接调用spark跑程序,且在集群跑… 在此,一台测试服务器模拟,搭建伪分布...
  • 实验7 Spark初级编程实践

    千次阅读 2021-12-23 10:07:16
    1. Spark读取文件系统的数据 2.编写独立应用程序实现数据去重 对于两个输入文件A和B,编写Spark独立应用程序(推荐使用Scala语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和...
  • 将scala-2.12.13.tgz和spark-3.1.1-bin-hadoop2.7.tgz安装包上传到xshell 在xshell上解压压缩包 输入解压命令: tar -zxvf scala-2.12.13.tgz tar -zxvf spark-3.1.1-bin-hadoop2.7.tgz 配置 1、...
  • Spark基础操作(一)

    万次阅读 2021-08-24 16:54:58
    前言:我们来学习Spark基础吧! 一、搭建学习环境 1、下载spark 我使用的是spark1.6.2,下载地址 我们直接下载,然后解压。我们看看里面的目录 2、python-shell 我们运行bin/pyspark之后就进入了spark的python shell...
  • Spark是什么 Spark (全称 Apache Spark™) 是一个专门处理大数据量分析任务的通用数据分析引擎。 spark官网 Spark核心代码是用scala语言开发的,不过支持使用多种语言进行开发调用比如scala,java,python。 spark...
  • Spark 之WordCount

    千次阅读 2022-04-10 22:20:55
    Spark 之WordCount
  • Apache Spark 3.3.0 从2021年07月03日正式开发,历时近一年,终于在2022年06月16日正式发布,在 Databricks Runtime 11.0 也同步发布。这个版本一共解决了 1600 个 ISSUE,感谢 Apache Spark 社区为 Spark 3.3 版本...
  • spark集群配置

    千次阅读 2022-03-21 14:50:21
    1.集群部署规划 表1-1集群部署规划 主机名 master slave1 slave2 HDFS NameNode SecondaryNameNode DataNode ... Spark Master Worker Worker 2.安装Spark ...
  • Spark Streaming 实时写入Hive

    千次阅读 2021-02-01 17:06:08
    所以使用Spark Streaming替代Flume实现入库Hive功能。 二、流程图 Created with Raphaël 2.2.0kafkaSpark StreamingETLhive 三、代码实现 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 411,383
精华内容 164,553
关键字:

spark