spark sql包

2017-04-28 12:25:53 qq_33813365 阅读数 6712

1. scala 环境安装及安装 (官网下载)
scala 官网
2.检验 scala 是否安装成功
这里写图片描述
3.安装 scala 集成开发环境 IDEA (官网自行下载安装)
4.在 IDEA 上 安装 scala 插件
5.在 IDEA 上创建 scala 工程
4.5步详细过程 参见博文
IDEA 上 安装 scala 插件及创建 scala 工程
6.在scala工程 导入必要 spark sql 必要的 jar 包

①.按照如上步骤 已创建 名为 sql_test 的scala工程,如下图:

这里写图片描述

②.按照 以下顺序 导入 spark_sql 必要的jar 包(spark_core_2.11-2.10,spark_sql_2.11-2.10)

file –> project structure –>
这里写图片描述
这里写图片描述
这里写图片描述

务必保证 下图 两 spark sql 必须jar包 已导入到工程当中
这里写图片描述

2016-08-21 16:49:12 zbc1090549839 阅读数 18356

内存计算平台spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都做出了较大的改变,同时更加注重基于DataFrame数据组织的MLlib,更加注重机器学习整个过程的管道化。

当然,作为使用者,特别是需要运用到线上的系统,大部分厂家还是会继续选择已经稳定的spark1.6版本,并且在spark2.0逐渐成熟之后才会开始考虑系统组件的升级。作为开发者,还是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鉴一些spark2.0做出某些改进的思路。

接下来的几篇blog中,将会逐步的对spark2.0的sparkSql、spark-structured-streaming、spark-ml等组件做入门级的学习。

由于公司的机器使用的java1.6x版本,暂时利用其体验spark2.0的分布式的环境,因此在windows机器上运行其local模式。

下面开始step by step开始我们的体验之旅:

首先创建一个maven项目,在cmd命令行下运行:

mvn archetype:generate -DgroupId=cs.dt.sparkTest  -DartifactId=sparkTest -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
这样将在当前目录下创建一个maven项目,名称为sparkTest。在创建项目的pom文件中添加spark2.0的依赖:

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>2.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>2.0.0</version>
    </dependency>
这样子,等待maven将依赖包下载到我们的本地仓库后,就可以运行我们的spark测试代码了。

首先,为了调用spark API 来完成我们的计算,需要先创建一个sparkContext:

        String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用户的当前工作目录
	SparkConf conf = new SparkConf().setAppName("spark sql test")
                .set("spark.sql.warehouse.dir", warehouseLocation)
                .setMaster("local[3]");
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
上述代码主要有三点:

  • 使用spark sql时需要指定数据库的文件地址,这里使用了一个本地的目录
  • spark配置,指定spark app的名称和数据库地址,master url为local 3核
  • 使用SparkSession,取代了原本的SQLContext与HiveContext。对于DataFrame API的用户来说,Spark常见的混乱源头来自于使用哪个“context”。现在你可以使用SparkSession了,它作为单个入口可以兼容两者。注意原本的SQLContext与HiveContext仍然保留,以支持向下兼容。这是spark2.0的一个较大的改变,对用户更加友好。
下面开始体验spark sql:
        //===========================================1 spark SQL===================
        //数据导入方式
        Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json");
        //查看表
        df.show();
        //查看表结构
        df.printSchema();
        //查看某一列 类似于MySQL: select name from people
        df.select("name").show();
        //查看多列并作计算 类似于MySQL: select name ,age+1 from people
        df.select(col("name"), col("age").plus(1)).show();
        //设置过滤条件 类似于MySQL:select * from people where age>21
        df.filter(col("age").gt(21)).show();
        //做聚合操作 类似于MySQL:select age,count(*) from people group by age
        df.groupBy("age").count().show();
        //上述多个条件进行组合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age
        df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show();

        //直接使用spark SQL进行查询
        //先注册为临时表
        df.createOrReplaceTempView("people");
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
        sqlDF.show();

主要关注以下几点:
  • 数据来源:spark可以直接导入json格式的文件数据,people.json是我从spark安装包下拷贝的测试数据。
  • spark sql:sparkSql语法和用法和mysql有一定的相似性,可以查看表、表结构、查询、聚合等操作。用户可以使用sparkSql的API接口做聚合查询等操作或者用类SQL语句实现(但是必须将DataSet注册为临时表)
  • DataSet:DataSet是spark2.0i引入的一个新的特性(在spark1.6中属于alpha版本)。DataSet结合了RDD和DataFrame的优点, 并带来的一个新的概念Encoder当序列化数据时,,Encoder产生字节码与off-heap进行交互,,能够达到按需访问数据的效果,而不用反序列化整个对象。

我们可以为自定义的对象创建DataSet,首先创建一个JavaBeans:
/**
     * 一个描述人属性的JavaBeans
     * A JavaBean is a Java object that satisfies certain programming conventions:

        The JavaBean class must implement either Serializable or Externalizable
        The JavaBean class must have a no-arg constructor
        All JavaBean properties must have public setter and getter methods
        All JavaBean instance variables should be private
     */
    public static class Person implements Serializable {
        private String name;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
接下来,就可以为该类的对象创建DataSet了,并像操作表一样操作自定义对象的DataSet了:
    //为自定义的对象创建Dataset
        List<Person> personpList = new ArrayList<Person>();
        Person person1 = new Person();
        person1.setName("Andy");
        person1.setAge(32);
        Person person2 = new Person();
        person2.setName("Justin");
        person2.setAge(19);
        personpList.add(person1);
        personpList.add(person2);
        Encoder<Person> personEncoder = Encoders.bean(Person.class);
        Dataset<Person> javaBeanDS = spark.createDataset(
                personpList,
                personEncoder
        );
        javaBeanDS.show();


同时,可以利用Java反射的特性,来从其他数据集中创建DataSet对象:
   //spark支持使用java 反射机制推断表结构
        //1 首先创建一个存储person对象的RDD
        JavaRDD<Person> peopleRDD = spark.read()
                .textFile("..\\sparkTestData\\people.txt")
                .javaRDD()
                .map(new Function<String, Person>() {
                    public Person call(String line) throws Exception {
                        String[] parts = line.split(",");
                        Person person = new Person();
                        person.setName(parts[0]);
                        person.setAge(Integer.parseInt(parts[1].trim()));
                        return person;
                    }
                });
        //2 表结构推断
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
        peopleDF.createOrReplaceTempView("people");

        //3 定义map 这里对每个元素做序列化操作
        Encoder<String> stringEncoder = Encoders.STRING();
        Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() {
            public String call(Row row) throws Exception {
                return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0));
            }
        }, stringEncoder);
        peopleSerDF.show();
        //==============================================3 从RDD创建Dataset StructType对象的使用
        JavaRDD<String> peopleRDD2 = spark.sparkContext()
                .textFile("..\\sparkTestData\\people.txt", 1)
                .toJavaRDD();

        // 创建一个描述表结构的schema
        String schemaString = "name age";
        List<StructField> fields = new ArrayList<StructField>();
        for (String fieldName : schemaString.split(" ")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);

        // Convert records of the RDD (people) to Rows
        JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() {
            //@Override
            public Row call(String record) throws Exception {
                String[] attributes = record.split(",");
                return RowFactory.create(attributes[0], attributes[1].trim());
            }
        });

        // Apply the schema to the RDD
        Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

        // Creates a temporary view using the DataFrame
        peopleDataFrame.createOrReplaceTempView("people");
        peopleDataFrame.show();
主要关注以下几点:
  • RDD:从普通文本文件中解析数据,并创建结构化数据结构的RDD。
  • 表结构推断的方式创建DataSet:利用Java类反射特性将RDD转换为DataSet。
  • 指定表结构的方式创建DataSet:我们可以使用StructType来明确定义我们的表结构,完成DataSet的创建

如何将自己的数据/文本导入spark并创建spark的数据对象,对新手来说显得尤为关键,对自己的数据表达好了之后,才有机会去尝试spark的其他API ,完成我们的目标。一般数据源在经过我们其他程序的前处理之后,存储成行形式的文本/json格式或者本身存储的hive/mysql数据库中,spark对这些数据源的调用都是比较方便的。

介绍完了spark-sql的数据导入及数据表达后,我们来完成一个比较简单的数据统计任务。一般在工作生活中对某些数据按一定的周期进行统计分析是一个比较常见的任务了。下面,我们就以股票统计的例子为例。我们使用spark的窗口统计功能,来对某一公司的股票在2016年6月份的各个星期的均值做统计。
   //在Spark 2.0中,window API内置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常类似。
        Dataset<Row> stocksDF = spark.read().option("header","true").
                option("inferSchema","true").
                csv("..\\sparkTestData\\stocks.csv");

        //stocksDF.show();

        Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016").
                filter("month(Date)==6");
        stocks201606.show(100,false);

首先读入了csv格式的数据文件,同时将2016年6月份的数据过滤出来,并以不截断的方式输出前面100条记录,运行的结果为:



调用window接口做窗口统计:

    //window一般在group by语句中使用。window方法的第一个参数指定了时间所在的列;
	//第二个参数指定了窗口的持续时间(duration),它的单位可以是seconds、minutes、hours、days或者weeks。
        Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")).
                agg(avg("Close").as("weekly_average"));
        tumblingWindowDS.show(100,false);
        tumblingWindowDS.sort("window.start").
                select("window.start","window.end","weekly_average").
                show(false);

其运行结果为:


由于没有指定窗口的开始时间,因此统计的开始时间为2016-05-26,并且不是从0点开始的。通常情况下,这样统计就显得有点不对了,因此我们需要指定其开始的日期和时间,但是遗憾的是spark并没有接口/参数让我们明确的指定统计窗口的开始时间。好在提供了另外一种方式,指定偏移时间,上述时间(2016-05-26 08:00:00)做一个时间偏移,也可以得到我们想要的开始时间(2016-06-01 00:00:00)。

    //在前面的示例中,我们使用的是tumbling window。为了能够指定开始时间,我们需要使用sliding window(滑动窗口)。
	//到目前为止,没有相关API来创建带有开始时间的tumbling window,但是我们可以通过将窗口时间(window duration)
	//和滑动时间(slide duration)设置成一样来创建带有开始时间的tumbling window。代码如下:
        Dataset<Row>  windowWithStartTime = stocks201606.
                groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")).
                agg(avg("Close").as("weekly_average"));
        //6 days参数就是开始时间的偏移量;前两个参数分别代表窗口时间和滑动时间,我们打印出这个窗口的内容:
        windowWithStartTime.sort("window.start").
                select("window.start","window.end","weekly_average").
                show(false);

运行结果为:


这就得到了我们需要的统计结果了。


关于spark2.0的sparkSql部分,基本就介绍这么多了。

接下来的几篇blog,一起体验spark-structured-streaming、spark-ml等组件的使用。



2019-08-24 21:51:49 zhikanjiani 阅读数 222

交代下前因后果:

学习Spark SQL过程中:在pom.xml中配置这段话,maven仓库并没有下载,只是提示找不到这个依赖;遂去到这个网址 http://mvnrepository.com 是maven仓库的国内镜像地址,下载到spark-sql_2.11-2.4.0.jar这个包

<spark.version>2.4.0</spark.version>
 <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
      </dependency>

在File --> Project Structure中点击+号,添加下载依赖包,点击应用okay即可。
在这里插入图片描述

这样的做法,运行很长一段时间都没问题,不知为何打开IDEA运行任何程序就报下面这个错,提示找不到XXX,反正就一大堆东西。

报错代码段:


Error:scalac: missing or invalid dependency detected while loading class file 'Dataset.class'.
Could not access term plans in package org.apache.spark.sql.catalyst,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'Dataset.class' was compiled against an incompatible version of org.apache.spark.sql.catalyst.

解决:

猜测是这个外部导入的spark sql包引起的,于是在导入jar的路径删除后,重新添加一遍再无报错。

也不知道为什么,咱也不敢说,咱也不敢问,现在问题暂时没了,有大佬看到可以给我解答下。

2017-09-06 09:18:33 qq_33813365 阅读数 71059

方法:进入Spark官网,下载对应版本的Spark,注意相应版本号,以及直接下载预编译的版本,不要下源码包
这里写图片描述

下载之后,解压,到该文件夹下的jars目录,对应Spark版本的相应jar包均在其中,再导入IDE既可以进行开发了
这里写图片描述

2017-05-23 21:21:47 fansy1990 阅读数 6121

软件环境:CDH5.8.0;

问题:在使用Spark SQL 读取Hive进行操作的时候,需要使用不包含,如下:(在Spark SQL中有contains,like,rlike函数)

在Hive中有表id_url ,内容如下:

+------------+-----------------------------------+--+
| id_url.id  |            id_url.url             |
+------------+-----------------------------------+--+
| 1          | http://abc.com/ac/10987_2.html    |
| 2          | http://abc.com/ac/109872.html     |
| 3          | http://abc.com/ac/10987_4.html    |
| 4          | http://abc.com/ac/10987_30.html   |
| 14         | http://abc.com/ac/a10987_30.html  |
| 42         | http://abc.com/ac/c10987_30.html  |
| 43         | http://abc.com/ac/1d0987_30.html  |
+------------+-----------------------------------+--+

如果要查看url包含30的网页,可以使用:

假设已经有data数据:

scala> val data = sqlContext.sql("select * from fansy.id_url")
data: org.apache.spark.sql.DataFrame = [id: int, url: string]
那么可以使用contains或like货rlike,如下:

scala> data.filter(data("url") contains "30").collect.foreach(println(_))
[4,http://abc.com/ac/10987_30.html]                                             
[14,http://abc.com/ac/a10987_30.html]
[42,http://abc.com/ac/c10987_30.html]
[43,http://abc.com/ac/1d0987_30.html]

scala> data.filter(data("url") like "%30%").collect.foreach(println(_))
[4,http://abc.com/ac/10987_30.html]                                             
[14,http://abc.com/ac/a10987_30.html]
[42,http://abc.com/ac/c10987_30.html]
[43,http://abc.com/ac/1d0987_30.html]

scala> data.filter(data("url") rlike ".*30.*").collect.foreach(println(_))
[4,http://abc.com/ac/10987_30.html]                                             
[14,http://abc.com/ac/a10987_30.html]
[42,http://abc.com/ac/c10987_30.html]
[43,http://abc.com/ac/1d0987_30.html]
那如果是不包含呢?

1. 使用rlike的正则去匹配不包含30的字符串;

scala> data.filter(data("url") rlike "^((?!30).)*$").collect.foreach(println(_))
[1,http://abc.com/ac/10987_2.html]
[2,http://abc.com/ac/109872.html]
[3,http://abc.com/ac/10987_4.html]
但是,在大量字符串匹配的时候效率会非常低;

2. 使用not 或!

查看Column的API可以看到其还有一个函数,为not或!,通过这个函数可以把如contains/like/rlike等转换为反,如下:

scala> val t = not (data("url") contains "30")
t: org.apache.spark.sql.Column = NOT Contains(url, 30)

scala> val t1 = not (data("url") contains "30")
t1: org.apache.spark.sql.Column = NOT Contains(url, 30)
同时,使用t或t1进行filter,可以看到结果:

scala> data.filter(t).collect.foreach(println(_))
[1,http://abc.com/ac/10987_2.html]
[2,http://abc.com/ac/109872.html]
[3,http://abc.com/ac/10987_4.html]

scala> data.filter(t1).collect.foreach(println(_))
[1,http://abc.com/ac/10987_2.html]                                              
[2,http://abc.com/ac/109872.html]
[3,http://abc.com/ac/10987_4.html]

分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.NET/fansy1990


Spark SQL 教程

阅读数 1557

SPARK SQL一些坑

阅读数 1995

Spark SQL特点

阅读数 1331

Spark SQL内置函数

阅读数 11712