精华内容
下载资源
问答
  • SparkSql初级编程实践

    2019-03-26 20:53:00
    (2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。 表 6-3 employee 表新增数据 id name gender age 3 Mary ...

    1.Spark SQL 基本操作
    将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
    { "id":1 , "name":" Ella" , "age":36 }
    { "id":2, "name":"Bob","age":29 }
    { "id":3 , "name":"Jack","age":29 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":5 , "name":"Damon" }
    { "id":5 , "name":"Damon" }
    为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

    (1) 查询所有数据;

     

    (2) 查询所有数据,并去除重复的数据;

     

    (3) 查询所有数据,打印时去除 id 字段;

    (4) 筛选出 age>30 的记录;

    (5) 将数据按 age 分组;

    (6) 将数据按 name 升序排列;

    (7) 取出前 3 行数据

    (8) 查询所有记录的 name 列,并为其取别名为 username

    (9) 查询年龄 age 的平均值;

    (10) 查询年龄 age 的最小值。

     

    2.编程实现将 RDD 转换为 DataFrame

    源文件内容如下(包含 id,name,age):
    1,Ella,36
    2,Bob,29
    3,Jack,29
    请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到
    DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代
    码。

    package cn.spark.study.sy5
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    
    object Testsql {
      def main(args: Array[String]) {
           val conf = new SparkConf()
           conf.setMaster("local")
               .setAppName("Testsql")
           val sc = new SparkContext(conf)
           val sqlContext = new SQLContext(sc)
           //hdfs://192.168.6.134:9000/nlc/1.txt
           //H:\文件\数据集
           val studentRDD = sc.textFile("D:\\myDevelopTools\\Intellij IDEA\\workplace\\spark-study-scala\\src\\main\\java\\cn\\spark\\study\\sy5\\employee.txt", 1)
          .map { line => Row(line.split(",")(0), line.split(",")(1), line.split(",")(2)) }
            // 第二步,编程方式动态构造元数据
          val structType = StructType(Array(
              StructField("id", StringType, true),
              StructField("name", StringType, true),
              StructField("age", StringType, true)))
          // 第三步,进行RDD到DataFrame的转换
          val studentDF = sqlContext.createDataFrame(studentRDD, structType)
          // 继续正常使用
          studentDF.registerTempTable("employee")
    //      val teenagerDF = sqlContext.sql("select usrid,count(usrid) from students group by usrid order by usrid")
          val teenagerDF = sqlContext.sql("select id,name,age from employee")
          val teenagerRDD = teenagerDF.rdd.collect().foreach { row => println("id:"+row(0)+",name:"+row(1)+",age:"+row(2)) }
        }
    }

     

    3. 编程实现利用 DataFrame 读写 MySQL 的数据
    (1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的
    两行数据。
    表 6-2 employee 表原有数据
    id name gender Age
    1  Alice   F         22
    2  John   M        25
    (2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所
    示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
    表 6-3 employee 表新增数据
    id name gender age
    3  Mary   F   26
    4  Tom   M   23

     

    package cn.spark.study.sy5
    import java.util.Properties
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SQLContext}
    /**
      * Created by Lenovo on 2019/3/27.
      */
    object TestMySQL {
      def main(args: Array[String]) {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("Testsql")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    
        val employeeRDD = sqlContext.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
        val schema = StructType(List(StructField("id", IntegerType,
          true),StructField("name", StringType, true),StructField("gender", StringType,
          true),StructField("age", IntegerType, true)))
        val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1),
          p(2),p(3).toInt))
        val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "123123")
        prop.put("driver","com.mysql.jdbc.Driver")
        employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)
          val jdbcDF = sqlContext.read.format("jdbc").option("url",
          "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "123123").load()
          jdbcDF.agg("age" -> "max", "age" -> "sum")
          }
    
    }

     

     

    转载于:https://www.cnblogs.com/news1997/p/10606556.html

    展开全文
  • 今天做实验【Spark SQL 编程初级实践】,虽然网上有答案,但在自己的环境下并不能够顺利进行 在第二题中,要求编程实现将 RDD 转换为 DataFrame。根据所谓标准答案,在进行sbt 打包时,报如下错误 [error] /home/...

    今天做实验【Spark SQL 编程初级实践】,虽然网上有答案,但在自己的环境下并不能够顺利进行

    在第二题中,要求编程实现将 RDD 转换为 DataFrame。根据所谓标准答案,在进行sbt 打包时,报如下错误

    [error] /home/hadoop/mycode/rddtodf/src/main/scala/rddtodf.scala:1: object types is not a member of package org.apache.spark.sql
    [error] import org.apache.spark.sql.types._
    [error]               ^
    [error] /home/hadoop/mycode/rddtodf/src/main/scala/rddtodf.scala:2: object Encoder is not a member of package org.apache.spark.sql
    [error] import org.apache.spark.sql.Encoder

    ……

    截图:

    根据错误描述可知,程序在第一行 import org.apache.spark.sql.types._ 处出错:types不是org.apache.spark.sql的成员

     

    解决方案:

    把程序每一行看作一个spark命令,在spark-shell一条一条地执行好了。。。

     

    第三题【编程实现利用 DataFrame 读写 MySQL 的数据】

    读取mysql?一定要用到 mysql-connector-java-xxx.jar 吧,但是答案中并未提及

    查阅书籍了解到,需要把mysql的jar包放在 /usr/local/spark/jars/mysql-connector-java-xxx/文件夹下,其中/usr/local/spark是spark安装目录xxx是mysql jar包版本。

    且启动Spark Shell时,须指定MySQL jar包,如图:

    (\表示命令还没有结束)

    然后再把程序一条一条执行

    执行结果:

    可以看到,已经将新的两条数据插入到mysql数据库了

    (用时:1小时30分钟)

    转载于:https://www.cnblogs.com/lzq666/p/10603269.html

    展开全文
  • 官网给出两种方法,这里给出一种(使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。): 源码: import org.apache.spark.sql.types._ import org.apache.spark.sql.Encoder import org.apache....

    1.题目:

    源码:

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.DataFrameReader
    object TestMySQL {
        def main(args: Array[String]) {
         val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
       import spark.implicits._ 
            val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
            val  schema  =  StructType(List(StructField("id",  IntegerType,true),StructField("name",  StringType,  true),StructField("gender",  StringType,true),StructField("age", IntegerType, true)))
            val  rowRDD  =  employeeRDD.map(p  =>  Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
            val employeeDF = spark.createDataFrame(rowRDD, schema)
            val prop = new Properties()
            prop.put("user", "root")
            prop.put("password", "hadoop")
            prop.put("driver","com.mysql.jdbc.Driver")
            employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee", prop)
            val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "hadoop").load()
            jdbcDF.agg("age" -> "max", "age" -> "sum").show()    
            print("ok")
        }
    }

    数据库数据:

     

    结果:

     2.编程实现将 RDD  转换为 DataFrame

     

    官网给出两种方法,这里给出一种(使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。):

    源码:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    object RDDtoDF {
    def main(args: Array[String]) {
       val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
       import spark.implicits._  
    val  employeeRDD  =spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
    StringType, nullable = true))
    val schema = StructType(fields)
    val  rowRDD  =  employeeRDD.map(_.split(",")).map(attributes  =>
    Row(attributes(0).trim, attributes(1), attributes(2).trim))
    val employeeDF = spark.createDataFrame(rowRDD, schema)
    employeeDF.createOrReplaceTempView("employee")
    val results = spark.sql("SELECT id,name,age FROM employee")
    results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()
    }
    }

     

     结果:

     

    转载于:https://www.cnblogs.com/mm20/p/10603428.html

    展开全文
  • 实验5 Spark SQL 编程初级实践 一、实验目的 (1)通过实验掌握Spark SQL的基本编程方法。 (2)熟悉RDD到DataFrame的转化方法。 (3)熟悉利用Spark SQL管理来自不同数据源的数据。 二、实验平台 操作系统:Linux ...

    实验5 Spark SQL 编程初级实践

    一、实验目的
    (1)通过实验掌握Spark SQL的基本编程方法。
    (2)熟悉RDD到DataFrame的转化方法。
    (3)熟悉利用Spark SQL管理来自不同数据源的数据。

    二、实验平台
    操作系统:Linux
    Spark版本:2.4.5
    scala版本:2.11.8

    三、实验内容和要求
    1.Spark SQL基本操作
    将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

    文件内容如下:

    { "id":1 , "name":" Ella" , "age":36 }
    { "id":2 , "name":"Bob","age":29 }
    { "id":3 , "name":"Jack","age":29 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":5 , "name":"Damon" }
    { "id":5 , "name":"Damon" }
    

    为employee.json创建DataFrame,并写出Scala语句完成下列操作:

     //导入相应的包
     import spark.implicits._
     //读取文件
     val df=spark.read.json("hdfs://master1:9000/spark/employee.json")
    
    

    (1)查询所有数据;

     df.show()
    


    (2)查询所有数据,并去除重复的数据;

     df.distinct.show()
    

    在这里插入图片描述
    (3)查询所有数据,打印时去除id字段;

    	 df.drop("id").show()
    

    在这里插入图片描述

    (4)筛选出age>30的记录;

     df.filter(df("age")>30).show()
    

    在这里插入图片描述
    (5)将数据按age分组;

    df.groupBy("age").count.show() 
    

    在这里插入图片描述
    (6)将数据按name升序排列;

     df.orderBy("name").show()
     或者
     df.sort(df("name").asc).show()
    

    在这里插入图片描述在这里插入图片描述
    (7)取出前3行数据;

     df.take(3)
     df.limit(3).show
    

    在这里插入图片描述

    (8)查询所有记录的name列,并为其取别名为username;

    df.select(df("name").as("username")).show()
    

    在这里插入图片描述

    (9)查询年龄age的平均值;

     df.agg("age"->"avg").show()
    

    在这里插入图片描述

    (10)查询年龄age的最小值。

     df.agg("age"->"min").show()
    

    在这里插入图片描述

    展开全文
  • RDD编程初级实践

    2021-06-14 20:14:10
    熟悉使用RDD编程解决实际具体问题的方法。 一、通过学习成绩的算出 (1)该系总共有多少学生; (2)该系共开设了多少门课程; (3)Tom同学的总成绩平均分是多少; (4)求每名同学的选修的课程门数; (5)...
  • RDD编程初级实践1、 需求描述2、 环境介绍3、 数据来源描述4、 数据上传及上传结果查看5、 数据处理过程描述1.pyspark交互式编程2.编写独立应用程序实现数据去重3.编写独立应用程序实现求平均值问题6、 经验总结 1、...
  • Spark SQL 编程总结

    2020-09-14 18:56:46
    SparkSQL 编程一、SparkSession 新的起始点二、DataFrame2.1 创建2.2 SQL 风格语法(主要)2.3 DSL 风格语法(次要)2.4 RDD 转换为 DateFrame2.5 DateFrame 转换为 RDD三、DataSet3.1 创建3.2 RDD 转换为 DataSet3.3 ...
  • 实验 5 Spark SQL 编程初级实践 参考厦门大学林子雨 1. Spark SQL 基本操作 将下列 json 数据复制到你的 ubuntu 系统/usr/local/spark 下,并保存命名为 employee.json。 { "id":1 ,"name":" Ella","age":36...
  • Spark大数据分析与实战:Spark SQL编程初级实践 一、Spark SQL基本操作 将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。 {“id”:1,“name”:“Ella”,“age”:36} {“id”:2,“name”:“Bob”,...
  • 通过反射推断Schema在Spark SQL中有两种方式可以在DataFrame和RDD进行转换 ...通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。 1、创建maven工程添加依赖 <properties>
  • Spark开发实例(编程实践

    万次阅读 2019-06-29 17:45:37
    无论采用哪种模式,只要启动完成后,就初始化了一个 SparkContext 对象(SC),同时也创建了一个 SparkSQL 对象用于 SparkSQL 操作。进入 Scala 的交互界面中,就可以进行 RDD 的转换和行动操作。 进入目录 SPARK...
  • Spark基础编程实践

    2021-06-14 15:50:18
    (一)pyspark交互式编程 实验内容: (1)该系总共有多少学生; (2)该系共开设了多少门课程; (3)Tom同学的总成绩平均分是多少; (4)求每名同学的选修的课程门数; (5)该系DataBase课程共有多少人选修...
  • 通过反射推断Schema 在Spark SQL中有两种方式可以在DataFrame和RDD进行... 通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。 1、创建maven工程添加依赖 &lt;propert...
  • 硬刚Hive | 4万字基础调优面试小总结 数据治理方法论和实践小百科全书 标签体系下的用户画像建设小指南 4万字长文 | ClickHouse基础&实践&调优全视角解析 【面试&个人成长】2021年过半,社招和校招的经验之谈 ...
  • ▼ 关注「Flink 中文社区」,获取更多技术干货▼摘要:网易云音乐从 2018 年开始搭建实时计算平台,经过几年的发展已经渗透到云音乐的各个业务当中。本文是大愚老师的一篇实践分享,将从...
  • SQL层,目前SequoiaDB通过连接器(sequoiasql-mysql)直接使用了MySQL的原生解析器,实现MySQL的完整兼容,同时目前也支持PGSQL和SparkSQL。 1. 为什么说巨杉数据库属于MySQL体系内呢?因为他做到了一点,就是100%兼容...
  • Spark程序编写过程中的常见问题

    千次阅读 2015-09-23 15:46:12
    at sparkSQL.SparkSQL_Jdbc01$.initalSparkContext(SparkSQL_Jdbc01.scala:18) at sparkSQL.SparkSQL_Jdbc01$.main(SparkSQL_Jdbc01.scala:35) at sparkSQL.SparkSQL_Jdbc01.main(SparkSQL_Jdbc01.scala) at ...
  • 这次共组织15个组队学习,涵盖了AI领域从理论知识到动手实践的内容 按照下面给出的最完备学习路线分类,难度系数分为低、中、高三档,可以按照需要参加 Table of Contents 学习路线 基础知识 1. Python基础 2....
  • 暑期组队学习计划

    2019-07-29 09:39:57
    Datawhale暑期组队学习计划马上就要开始啦这次共组织15个组队学习涵盖了AI领域从理论知识到动手实践的内容按照下面给出的最完备学习路线分类难度系数分为低、中、高三档...
  • Datawhale 暑期组队学习计划

    千次阅读 2019-07-29 20:51:12
    涵盖了AI领域从理论知识到动手实践的内容 按照下面给出的最完备学习路线分类 难度系数分为低、中、高三档 可以按照需要参加 学习路线 基础知识 1 Python基础 课程简介 **课程设计:**马晶敏,叶梁 **组队学习说明...
  • 参加BDTC 2016有感

    千次阅读 2016-12-11 20:37:50
     第三个百度的哥们讲的是他们内部开发使用的一个通用的计算框架统一接口,用户可以方便的使用一个接口编程,该系统自动进行优化,并转化成MR或Spark等多个不同的计算平台。从优化程度来看是很深层的,借鉴了关系...
  • 本文旨在为普通程序员(Java程序员最佳)提供一个入门级别的大数据技术学习路径,不适用于大数据工程师的进阶学习,也不适用于零编程基础的同学。 前言 一、背景介绍 本人目前是一名大数据工程师,项目...
  • 从现在的角度来看,当时做的这些基础的数据仓库和BI工作,是十分初级和粗糙的。但对于大部分的制造业和服务业来说,基本上解决90%的数据统计问题,一点瓶颈都没有。 美中不足的,有以下3个方面: 非实时 维度暴涨 ...
  • 学习目标:SparkSQL核心原理、SparkSQL实践 学习效果:熟练掌握SparkSQL的各种应用场景并灵活运用 第十七阶段:SparkStreaming流失计算平台 学习内容:SparkStreaming流失计算平台 学习目标:SparkStreaming...
  • 相对于Hadoop,Spark的显著特点是能够在内存中进行计算,因此又称为通用内存并行计算框架,与MapReduce兼容,其主要构件包括SparkCore、SparkSQL、SparkStreaming、MLlib、GraphX、BlinkDB和Tachyon。 Hadoop存在...
  • 网易云音乐从2018年开始搭建实时计算平台,经过几年的发展已经渗透到云音乐的各个业务当中。本文是大愚老师的一篇实践分享,将从一个日常运维问题出发,带领大家了解云音乐实时计算平台的一些工作进...
  • Java初级(虚拟机、并发)、Linux 基本操作、Hadoop(HDFS+MapReduce+Yarn )、 HBase(JavaAPI操作+Phoenix )、Hive(Hql基本操作和原理理解)、 Kafka、Storm/JStorm、Scala、Python、Spark (Core+sparksql+Spark ...

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 135
精华内容 54
关键字:

sparksql编程初级实践