精华内容
下载资源
问答
  • pandas的dataframesparkdataframe sparkdataframe转pandas的dataframe pandas的dataframesparkdataframe互转 dataframe互转

    目录

     

    pandas的dataframe转spark的dataframe

    spark的dataframe转pandas的dataframe 


    本篇介绍Pandas的DataFrame和Spark的DataFrame之间的互转操作。

    pandas的dataframe转spark的dataframe

    import pandas as pd
    # 加载数据
    pd_df = pd.read_csv("./you_csv_file.csv")
    # 展示columns
    pd_df.columns
    # 输出 Index(['ColA', 'ColB', 'ColC'], dtype='object')
    # pandas的dataframe转spark的dataframe
    spark_df = spark.createDataFrame(pd_df)
    # done

    spark的dataframe转pandas的dataframe 

    # spark_df为spark的DataFrame
    # pd_df为pandas的DataFrame
    # spark的dataframe转pandas的dataframe 
    pd_df = spark_df.toPandas()

     

    展开全文
  • Spark DataFrame

    2021-02-25 10:32:03
    Spark创建DataFrame的不同方式 1. Create Spark DataFrame from RDD 2. 从List和Seq集合中创建Spark DataFrame 3. 从CSV文件创建Spark DataFrame 4. 从text文件创建 5. 从JSON文件创建 6. 从XML文件创建 9. ...

    目录

     

    Spark创建DataFrame的不同方式

    1. Create Spark DataFrame from RDD

    2. 从List和Seq集合中创建Spark DataFrame

    3. 从CSV文件创建Spark DataFrame

    4. 从text文件创建

    5. 从JSON文件创建

    6. 从XML文件创建

    9. 从HBase创建DataFrame


    Spark创建DataFrame的不同方式

    本文介绍了使用Scala示例在Spark中创建DataFrame(createDataFrame)的不同方法。

    首先,让我们导入Spark需要的隐式函数,如.toDF()函数,并为示例创建数据。

    import spark.implicits._
    val columns = Seq("language", "users_count")
    val data = Seq(("Java", "20000"), ("Python", "10000"), ("Scala", "30000"))
    

    1. Create Spark DataFrame from RDD

    首先,调用SparkContext中的parallelize()函数从集合Seq创建RDD。对于下面的所有示例,都需要这个rdd对象。

    val rdd = spark.SparkContext.parallelize(data)
    

    1. a) 使用toDF()函数

    一旦创建了一个RDD,可以使用toDF()来创建一个DataFrame。默认情况下,假如数据集每一行有两列,创建的DF时候的列名就是"_1"和"_2"。

    val dfFromRDD1 = rdd.toDF()
    dfFromRDD1.printSchema()
    
    root
    |-- _1: string (nullable = true)
    |-- _2: string (nullable = true)
    

    toDF()具有另一个签名,该签名自定义列名称参数,如下所示:

    val dfFromRDD1 = rdd.toDF("language", "users_count")
    dfFromRDD1.printSchema()
    
    root
    |-- language: string (nullable = true)
    |-- users: string (nullable = true)
    

    默认情况下,这些列的数据类型是通过推断列的数据类型来判断的。我们可以通过提供模式来更改此行为,我们可以在其中为每个字段/列指定列名,数据类型和可为空。

    1.b) 使用SparkSession的creatDataFrame()函数

    使用SparkSession中的createDataFrame()是另一种创建方法,它以rdd对象作为参数。使用toDF()来指定列的名称。

    dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
    

    1.c)对行类型使用createDataFrame()

    createDataFrame()有另一个签名,它将列名的RDD[Row]类型和模式作为参数。首先,我们需要将rdd对象从RDD[T]转换为RDD[Row]类型。

    val schema = StructType(columns.map(fieldName => StructField(fieldName, StringType, nullable = true)))
    val rowRDD = rdd.map(attributes => Row(attributes._1, attributs._2))
    val dfFromRDD3 = spark.createDataFrame(rowRdd.schema)
    

    2. 从List和Seq集合中创建Spark DataFrame

    在本节中,我们将看到从集合Seq[T]或List[T]创建Spark DataFrame的几种方法。这些示例与我们上面的RDD部分看到的类型,但是我们使用的是数据对象而不是RDD对象。

    2.a) List或者Seq使用toDF()

    val dfFromData1 = data.toDF()
    

    2.b) 使用SparkSession的createDataFrame()方法

    var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)
    

    2.c) 使用Row type的createDataFrame()方法

    import scala.collection.JavaConversions._
    val rowData = data.map(attributes => Row(attributes._1, attributes._2))
    var dfFromData3 = spark.createDataFrame(rowData, schema)
    

    3. 从CSV文件创建Spark DataFrame

    val df2 = spark.read.csv("/src/resources/file.csv")
    

    4. 从text文件创建

    val df2 = spark.read.text("/src/resources/file.txt")
    

    5. 从JSON文件创建

    val df2 = spark.read.json("/src/resources/file.json")
    

    6. 从XML文件创建

    从xml解析DataFrame,我们应该使用数据源:com.databricks.spark.xml

    <dependency>
         <groupId>com.databricks</groupId>
         <artifactId>spark-xml_2.11</artifactId>
         <version>0.6.0</version>
     </dependency>
    
    val df = spark.read.format("com.databricks.spark.xml")
            .option("rowTag", "person")
            .xml("src/main/resources/persons.xml")
    

    7. 从Hive创建

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
    val hiveDF = hiveContext.sql("select * from emp")
    

    8. 从RDBMS创建

    8.a) Mysql table

    确保在pom.xml文件或类路径中的MySQL jars中都具有Mysql库作为依赖项

    val df_mysql = spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost:port/db")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "tablename")
        .option("user", "user")
        .option("password", "password")
        .load()
    

    8.b) DB2

    确保在pom.xml文件或类路径中的DB2 jar中将DB2库作为依赖项。

    val df_db2 = spark.read.format(“jdbc”)
       .option(“url”, “jdbc:db2://localhost:50000/dbname”)
       .option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
       .option(“dbtable”, “tablename”) 
       .option(“user”, “user”) 
       .option(“password”, “password”) 
       .load()
    

    9. 从HBase创建DataFrame

    要从HBase表创建Spark DataFrame,我们应该使用Spark HBase连接器中定义的数据源。

     val hbaseDF = sparkSession.read
          .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .load()
    展开全文
  • 文章目录前言1、RDD、Spark DataFrameSpark SQL、Spark Streaming2、Spark DataFrame2.1 创建基本的Spark DataFrame2.2 从各类数据源创建Spark DataFrame2.3 Spark DataFrame持久化数据2.4 Dataframe常见的API3、...

    前言

      本文介绍Spark DataFrame、Spark SQL、Spark Streaming入门使用教程,这些内容将为后面几篇进阶的streaming实时计算的项目提供基本计算指引,本文绝大部分内容来自Spark官网文档(基于PySpark):
    Spark DataFrameSpark SQLSpark Streaming

    1、RDD、Spark DataFrame、Spark SQL、Spark Streaming

      RDD:大家最熟悉的数据结构,主要使用transmissions和actions 相关函数式算子完成数据处理和数理统计,例如map、reduceByKey,rdd没有定义 Schema(一般指未定义字段名及其数据类型), 所以一般用列表索引号来指定每一个字段。 例如, 在电影数据的推荐例子中:

    move_rdd.map(lambda line:line.split('|')).map(lambda a_list:(alist[0],a_list[1],a_list[2]))
    

    每行有15个字段的数据,因此只能通过索引号获取前3个字段的数据,这要求开发者必须掌握 Map/Reduce 编程模式,不过, RDD 功能也最强, 能完成所有 Spark 数据处理与分析需求。

      Spark DataFrame:创建DataFrame时,可以定义 Schema,通过定义每一个字段名与数据类型,以便之后直接用字段名进行数据索引,用法跟Pandas的DataFrame差别不大。Spark DataFrame是一种更高层的API,而且基于PySpark,用起来像Pandas的”手感“,很容易上手。

      Spark SQL 底层是封装了DataFrame(DataFrame封装了底层的RDD) ,让使用者直接用sql的方式操作rdd,进一步降低Spark作为分布式计算框架的使用门槛。
      Spark Streaming是本博客重点要解析的数据结构,实际项目将使用它实现流式计算,相关定义参考原文:

    Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.
    Spark Streaming具有扩展性、数据吞吐量高、容错的特点,底层基于core Spark API 实现,用于流数据处理。Spark Streaming注入的实时数据源可来自Kafka、Flume、Kinesis或者TCP流等,park Streaming可借助Map、reduce、join和window等高级函数接口搭建复杂的算法用于数据处理。Spark Streaming实时处理后数据可存储到文件系统、数据库或者实时数据展示仪表。

    2、Spark DataFrame

      Spark DataFrame API比较多,既然用于数据处理和计算,当然会有预处理接口以及各统计函数、各种方法,详细参考官网:pyspark.sql.DataFrame以及pyspark.sql.functions module模块

      目前版本中,创建Spark DataFrame的Context接口可以直接用SparkSession接口,无需像RDD创建上下文时用Spark Context。
    SparkSession:pyspark.sql.SparkSession:Main entry point for DataFrame and SQL functionality.

    2.1 创建基本的Spark DataFrame

      创建 Spark DataFrame有多种方式,先回顾Pandas的DataFrame,Pandas可从各类文件、流以及集合中创建df对象,同样 Spark DataFrame也有类似的逻辑
    首先需加载spark的上下文:SparkSession

    import pandas as pd
    import numpy as np
    from pyspark.sql import SparkSession #  用于Spark DataFrame的上下文
    from pyspark.sql.types import StringType,StructType,StructField, LongType,DateType # 用于定义df字段类型
    from pyspark.sql import Row,Column
    
    #本地spark单机模式
    spark=SparkSession.builder.master("local").appName('spark_dataframe').getOrCreate()
    print(spark)
    
    

    输出spark上下文信息:

    SparkSession - in-memory
    SparkContext
    Spark UI
    Version v2.4.4
    Master
        local[*]
    AppName
        spark_dataframe
    

    from pyspark.sql.types:df目前支持定义的字段类型(参考源码),用于定义schema,类似关系型数据库建表时,定义表的字段类型

    __all__ = [
        "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
        "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
        "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"]
    

    直接用从RDD创建dataframe对象

    spark_rdd = spark.sparkContext.parallelize([
        (11, "iPhoneX",6000, datetime.date(2017,9,10)),
        (12, "iPhone7",4000, datetime.date(2016,9,10)),
        (13, "iPhone4",1000, datetime.date(2006,6,8))]
        )
    
    # 定义schema,就像数据库建表的定义:数据模型,定义列名,类型和是否为能为空
    schema = StructType([StructField("id", IntegerType(), True),
                         StructField("item", StringType(), True),
                         StructField("price", LongType(), True),
                         StructField("pub_date", DateType(), True)])
    # 创建Spark DataFrame
    spark_df= spark.createDataFrame(spark_rdd, schema)
    # 创建一个零时表,用于映射到rdd上
    spark_df.registerTempTable("iPhone")
    # 使用Sql语句,语法完全跟sql一致
    data = spark.sql("select a.item,a.price from iPhone a")
    # 查看dataframe的数据
    print(data.collect())
    # 以表格形式展示数据
    data.show()
    

    输出:

    [Row(item='iPhoneX', price=6000), Row(item='iPhone7', price=4000), Row(item='iPhone4', price=1000)]
    +-------+-----+
    |   item|price|
    +-------+-----+
    |iPhoneX| 6000|
    |iPhone7| 4000|
    |iPhone4| 1000|
    +-------+-----+
    

    通过该例子,可了解df基本用法,只要从spark上下文加载完数据并转为dataframe类型后,之后调用df的api跟pandas的api大同小异,而且可将dataframe转为Spark SQL,直接使用sql语句操作数据。

    上面的例子用了显示定义schema字段类型,pyspark支持自动推理创建df,也即无需原数据定义为rdd,和自动类型,直接传入Python列表的数据,以及定义字段名称即可:

    a_list = [
        (11, "iPhoneX",6000, datetime.date(2017,9,10)),
        (12, "iPhone7",4000, datetime.date(2016,9,10)),
        (13, "iPhone4",1000, datetime.date(2006,6,8))]
    # 自动推理创建df,代码内部通过各类if 判断类型实现。
    spark_df= spark.createDataFrame(a_list, schema=['id','item','price','pub_date'])
    
    2.2 从各类数据源创建Spark DataFrame

    相关接口方法参考官网文档

    从csv文件创建Spark DataFrame

    file = '/opt/spark/data/train.csv'
    df = spark.read.csv(file,header=True,inferSchema=True)
    

    从pandas的DataFrame创建Spark DataFrame

    pandas_df = pd.DataFrame(np.random.random((4, 4)))
    spark_df = spark.createDataFrame(pandas_df, schema=['a', 'b', 'c', 'd'])
    

    从json创建Spark DataFrame,json文件可以通过远程拉取,或者本地json,并设定json的字段schema

    json_df = spark.read.json('/opt/data/all-world-cup-players.json')
    

    从各类数据库加载数据:

    pg数据库,使用option属性传入参数:

    spark_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .load()
    

    pg数据库,用关键字参数传入连接参数:

    spark_df=spark.read.format('jdbc').options(
    	url='jdbc:postgresql://localhost:5432/',
    	dbtable='db_name.table_name',
    	user='test',
    	password='test'
    ).load()
    

    mysql数据库,用关键字参数传入连接参数:

    spark_df=spark.read.format('jdbc').options(
    	url='jdbc:mysql://localhost:3306/db_name',
    	dbtable='table_name',
    	user='test',
    	password='test'
    ).load()
    

    从hive里面读取数据

    # 如果在SparkSession设置为连接hive,可以直接读取hive数据
    spark = SparkSession \
            .builder \
            .enableHiveSupport() \      
            .master("localhost:7077") \
            .appName("spark_hive") \
            .getOrCreate()
    
    spark_df=spark.sql("select * from hive_app_table")
    spark_df.show()
    

    连接数据库需要相关的jar包,例如连接mysql,则需要将mysql-connector放在spark目录的jar目录下。

    2.3 Spark DataFrame持久化数据

    以csv存储

    spark_df.write.csv(path=local_file_path, header=True, sep=",", mode='overwrite')
    

    注意:mode=‘overwrite’ 模式时,表示创建新表,若表名已存在则会被删除,整个表被重写。而 mode=‘append’ 模式就是普通的最加数据。

    写入mysql:

    url = 'jdbc:mysql://localhost:3306/db_name?characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT'
    table = 'table_name'
    properties = {"user":"test","password":"test"}
    spark_df.write.jdbc(url,table,mode='append',properties=properties)
    

    写入hive

    # 打开动态分区
    spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    
    #指定分区写入表
    spark_df.write.mode("append").partitionBy("name").insertInto("your_db.table_name")
    
    # 不使用分区,直接将数据保存到Hive新表
    spark_df.write.mode("append").saveAsTable("your_db.table_name")
    # 查看数据
    spark.sql("select * from your_db.table_name").show()
    
    

    默认的方式将会在hive分区表中保存大量的小文件,在保存之前对 DataFrame重新分区,从而控制保存的文件数量。

    spark_df.repartition(5).write.mode("append").saveAsTable("your_db.table_name")
    

    写入redis:
    这里需要自行实现redis的写入方法,其实也简单,定义入参为dataframe,函数内部连接redis后,从dataframe取出数据再将其插入redis即可。对于写入其他文件或者数据库,需自行实现相应的数据转存逻辑。

    2.4 Dataframe常见的API

    样例数据参考

    查看字段

    spark_df.columns 
    # ['ratings', 'age', 'experience', 'family', 'mobile']
    

    spark_df.count() 统计行数
    查看df的shape

    print((df.count(),len(df.columns))) # (33, 5)
    

    查看dataframe的schema字段定义

    spark_df.printSchema()
    输出:
    root
     |-- ratings: integer (nullable = true)
     |-- age: integer (nullable = true)
     |-- experience: double (nullable = true)
     |-- family: integer (nullable = true)
     |-- mobile: string (nullable = true)
    
    

    随机抽样探索数据集合:

    spark_df.sample(False,0.5,0).show(5)
    用法:
    Signature: spark_df.sample(withReplacement=None, fraction=None, seed=None)
    Docstring:
    Returns a sampled subset of this :class:`DataFrame`.
    
    :param withReplacement: Sample with replacement or not (default False).
    :param fraction: Fraction of rows to generate, range [0.0, 1.0].
    :param seed: Seed for sampling (default a random seed).
    

    查看行记录:

    spark_df.show(3) 
    spark_df.head(3) 
    spark_df.take(3)
    

    以python列表返回记录,list中每个元素是Row类

    [Row(ratings=3, age=32, experience=9.0, family=3, mobile='Vivo', age_2=32),
     Row(ratings=3, age=27, experience=13.0, family=3, mobile='Apple', age_2=27),
     Row(ratings=4, age=22, experience=2.5, family=0, mobile='Samsung', age_2=22)]
    

    查看null的行,可以传入isnull函数,也可以自定义lambda函数

    from pyspark.sql.functions import isnull
    spark_df = spark_df.filter(isnull("name"))
    

    选择列数据:

    spark_df.select('age','mobile').show(2)# select方法
    

    扩展dataframe的列数:withColumn可以在原df上新增列数据

    spark_df.withColumn("age_2",(spark_df["age"]))
    

    注意该方式不会更新到原df,如需替换原df,则更新spark_df即可。

    spark_df=spark_df.withColumn("age_2",(spark_df["age"]))
    

    新增一列数据,并将新增的数据转为double类型,需要用到cast方法

    from pyspark.sql.types import StringType,DoubleType,IntegerType
    spark_df.withColumn('age_double',spark_df['age'].cast(DoubleType()))
    

    根据条件查询df,使用filter方法

    spark_df.filter(spark_df['mobile']=='Apple')
    #筛选记录后,再选出指定的字段记录
    spark_df.filter(df['mobile']=='Vivo').select('age','ratings','mobile')
    

    选择mobile列值为‘Apple’的记录,多条件查询:

    spark_df.filter(spark_df['mobile']=='Vivo').filter(spark_df['experience'] >10)
    或者:
    spark_df.filter((spark_df['mobile']=='Vivo')&(spark_df['experience'] >10))
    

    distinct:

    # 先用select取出要处理的字段,获取不同类型的mobile
    spark_df.select('mobile').distinct()
    
    # 统计不同类型mobile的数量
    spark_df.select('mobile').distinct().count()
    

    groupBy:

    spark_df.groupBy('mobile').count().show(5,False)
    # 输出
    +-------+-----+
    |mobile |count|
    +-------+-----+
    |MI     |8    |
    |Oppo   |7    |
    |Samsung|6    |
    |Vivo   |5    |
    |Apple  |7    |
    +-------+-----+
    

    groupBy之后,按统计字段进行降序排序

    spark_df.groupBy('mobile').count().orderBy('count',ascending=False)
    

    groupBy之后,按mobile分组,求出每个字段在该分组的均值

    spark_df.groupBy('mobile').mean().show(2,False)
    
    +------+-----------------+------------------+------------------+------------------+------------------+
    |mobile|avg(ratings)     |avg(age)          |avg(experience)   |avg(family)       |avg(age_2)        |
    +------+-----------------+------------------+------------------+------------------+------------------+
    |MI    |3.5              |30.125            |10.1875           |1.375             |30.125            |
    |Oppo  |2.857142857142857|28.428571428571427|10.357142857142858|1.4285714285714286|28.428571428571427|
    +------+-----------------+------------------+------------------+------------------+------------------+
    only showing top 2 rows
    

    同理还有spark_df.groupBy('mobile').sum()df.groupBy('mobile').max()df.groupBy('mobile').min()等,或者用agg方法,然后传入相应的聚合函数

    spark_df.groupBy('mobile').agg({'experience':'sum'})等同于spark_df.groupBy('mobile').sum()

    用户定义函数UDF:

    用户定义函数一般用于对列或者对行的数据进行定制化处理,就sql语句中,价格为数字的字段,根据不同判断条件,给字段加上美元符号或者指定字符等

    from pyspark.sql.functions import udf
    def costom_func(brand):
        if brand in ['Samsung','Apple']:
            return 'High Price'
        elif brand =='MI':
            return 'Mid Price'
        else:
            return 'Low Price'
            
    brand_udf=udf(costom_func,StringType())
    spark_df.withColumn('price_range',brand_udf(spark_df['mobile'])).show(5,False) # 使用spark_df['mobile']或者使用spark_df.mobile都可以
    
    +-------+---+----------+------+-------+-----+-----------+
    |ratings|age|experience|family|mobile |age_2|price_range|
    +-------+---+----------+------+-------+-----+-----------+
    |3      |32 |9.0       |3     |Vivo   |32   |Low Price  |
    |3      |27 |13.0      |3     |Apple  |27   |High Price |
    |4      |22 |2.5       |0     |Samsung|22   |High Price |
    |4      |37 |16.5      |4     |Apple  |37   |High Price |
    |5      |27 |9.0       |1     |MI     |27   |Mid Price  |
    +-------+---+----------+------+-------+-----+-----------+
    only showing top 5 rows
    
    
    # 使用lambda定义udf
    age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
    spark_df.withColumn("age_group", age_udf(df.age)).show(3,False)
    

    输出:

    +-------+---+----------+------+-------+-----+---------+
    |ratings|age|experience|family|mobile |age_2|age_group|
    +-------+---+----------+------+-------+-----+---------+
    |3      |32 |9.0       |3     |Vivo   |32   |senior   |
    |3      |27 |13.0      |3     |Apple  |27   |young    |
    |4      |22 |2.5       |0     |Samsung|22   |young    |
    +-------+---+----------+------+-------+-----+---------+
    only showing top 3 rows
    

    删除重复记录:

    # 重复的行,将被删除
    spark_df=spark_df.dropDuplicates()
    

    删除一列数据

    df_new=spark_df.drop('mobile') # 删除多列 spark_df.drop('mobile','age')
    

    3、Spark SQL

      在第二部分内容给出了创建spark sql的方法,本章节给出更为详细的内容:这里重点介绍spark sql创建其上下文,完成相应的context设置后,剩下的就是熟悉的写SQL了。
    第一种方式:将本地文件加载为dataframe
    之后再使用createOrReplaceTempView方法转为SQL模式,流程如下

    用第2节内容的数据做演示:

    spark_df=spark.read.csv('sample_data.csv',inferSchema=True,header=True)
    spark_df.registerTempTable("phone_sales")
    df1 = spark.sql("select age,family,mobile from phone_sales ")
    df1.show(3)
    
    +---+------+-------+
    |age|family| mobile|
    +---+------+-------+
    | 32|     3|   Vivo|
    | 27|     3|  Apple|
    | 22|     0|Samsung|
    +---+------+-------+
    only showing top 3 rows
    

    spark.sql用于传入sql语句,返回dataframe对象,故后续的数据处理将变得非常灵活,使用SQL确实能够降低数据处理门槛,再例如:

    spark_df.groupBy('mobile').count().show(5,False) 等同于

    your_sql=("select mobile,count(mobile) as count from phone_sales group by mobile "
    df1 = spark.sql(your_sql)
    

    如果df1集合较大,适合用迭代器方式输出记录(适合逐条处理的逻辑)

    for each_record in df1.collect(): 
    	data_process(each_record)
    

    第二种方式:直接连接诸如mysql或者hive的context,基于该context直接运行sql

    以mysql为例:
    (1)配置mysql连接需要相关jar包和路径配置:
    mysql-connector-java-5.1.48.jar 放入spark目录/opt/spark-2.4.4-bin-hadoop2.7/jars/目录下, mysql-connector包可在mysql自行下载。
    在spark-env.sh 配置了EXTRA_SPARK_CLASSPATH=/opt/spark-2.4.4-bin-hadoop2.7/jars/(也可不配,spark按默认目录检索)

    (2)连接mysql

    from pyspark.sql import SparkSession 
    spark=SparkSession.builder.master("local").appName('spark_dataframe').getOrCreate()
    

    连接数据库以及读取表

    apps_name_df=spark.read.format('jdbc').
    options(
    	url='jdbc:mysql://192.168.142.5:3306/',
    	dbtable='erp_app.apps_name',
    	user='root',
    	password='bar_bar'
    ).load()
    

    read方法详细使用可参考:spark.read.format

    pirnt(apps_name_df)
    # DataFrame[id: int, app_log_name: string, log_path: string, log_date: timestamp]
    
    apps_name_df.show(5)
    +---+-------------+-------------------+-------------------+
    | id| app_log_name|           log_path|           log_date|
    +---+-------------+-------------------+-------------------+
    |  1|BI-Access-Log|/opt/data/apps_log/|*******************|
    |  3|BI-Access-Log|/opt/data/apps_log/|*******************|
    |  5|BI-Access-Log|/opt/data/apps_log/|*******************|
    |  7|BI-Access-Log|/opt/data/apps_log/|*******************|
    |  9|BI-Access-Log|/opt/data/apps_log/|*******************|
    +---+-------------+-------------------+-------------------+
    only showing top 5 rows
    

    上述连接mysql的erp_app后,直接读取apps_name全部字段的数据,如果想在连接时,指定sql,则需按以下方式进行

    spark_df=spark.read.format('jdbc').options(url='jdbc:mysql://192.168.142.5:3306/erp_app',
                                               dbtable='(select id,app_log_name,log_path from apps_name) as temp',
                                               user='root',
                                               password='bar_bar'
                                              ).load()
    

    dbtable这个值可以为一条sql语句,而且格式必须为:

    dbtable='(select id,app_log_name,log_path from apps_name) as temp'
    

    如果写成以下格式,则提示解析出错。

    dbtable='select id,app_log_name,log_path from apps_name'
    

    4、Spark Streaming

      以上在介绍dataframe和spark sql的相关用法,都用离线数据进行测试,本章节将给出spark的核心组件之一:spark streaming:实时流式计算(微批处理),该功能将应用于本bg其他项目。有关流式计算的相关概念,可以查看相关参考资料,这里不再累赘。此外,本bg也将给出一篇关于spark streaming的深度解析文章。

    4.1 实时计算TCP端口的数据

      以一个简单demo介绍pyspark实现streaming的流程:
    在数据源输入端,使用netcat打开一个7070端口,手动持续向netcat shell输入句子;
    在实时计算端:streaming连接7070端口,并实时计算word count,并将统计结果实时打印。

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext 
    
    # 创建本地的streaming context,并指定4个worker线程
    sc = SparkContext("local[4]", "streaming wordcount test")
    sc.setLogLevel("WARN") # 减少spark自生成的日志打印
    # 每批处理间隔为1秒
    ssc = StreamingContext(sc, 1) 
    
    # 连接netcat的tcp端口,用于读取netcat持续输入的行字符串
    lines = ssc.socketTextStream("192.100.0.10", 7070)
    

    socketTextStream创建的对象称为:Discretized Streams(离散流) ,简称 DStream,是spark的核心概念

    # 统计word的逻辑,这段代码再熟悉不过了。
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda value_1, value_2: value_1 + value_2)
    wordCounts.pprint() # 这里wordCounts是'TransformedDStream' object,不再是普通的离线rdd
    

    启动流计算,并一直等到外部中断程序(相当于线程里面的jion)

    ssc.start()            
    ssc.awaitTermination(timeout=None)    # 默认无timeout,程序会一直阻塞在这里
    

    启动后,如果你使用jupyter开发,可以看到notebook的cell每隔1秒打印一次

    ......
    -------------------------------------------
    Time: *** 16:14:50
    -------------------------------------------
    
    -------------------------------------------
    Time: *** 16:14:51
    -------------------------------------------
    
    ......
    

    在netstat shell输入字符串

    [root@localhost ~]# nc -l 7070
    this is spark streaming
    streaming wordcount is awesome
    

    再查看notebook的cell的的实时打印出了wordcount统计结果

    -------------------------------------------
    Time: *** 16:14:54
    -------------------------------------------
    ('spark', 1)
    ('this', 1)
    ('is', 1)
    ('streaming', 1)
    
    -------------------------------------------
    Time: *** 16:14:54
    -------------------------------------------
    ......
    -------------------------------------------
    Time: *** 16:14:58
    -------------------------------------------
    ('streaming', 1)
    ('is', 1)
    ('wordcount', 1)
    ('awesome', 1)
    

    以上实现了一个完整的实时流计算,虽然该streaming的demo较为简单,但却给了大家非常直观的流计算处理设计思路,只需改造相关逻辑,即可满足符合自己业务的需求,在这里给出一个可行的设计:

    (1)实时数据源替换为Kafka等组件:启动一个进程,用于运行streaming。streaming的实时数据源来自kafka的topic
    (2)定制MapReduce的计算逻辑,用于实时预处理流数据
    (3)将(2)的实时结果保存到redis的list上
    (4)启动另外一个进程,从结果队列里面取出并存到Hbase集群或者hdfs
    或者无需使用队列,Spark Streaming实时预处理后直接写入Hbase。

    4.2 实时计算本地文件

      对于python接口,streamingContext.textFileStream(dataDirectory)方法可以实时监听并读取本地目录下的日志文件,但有几点需要指出,参考官方文档指引:

    • 能实时监听dataDirectory目录下创建的任意类型文件

    • dataDirectory主要分为两种文件系统,第一种为本地文件系统,例如监听/opt/appdata/目录下的所有文件,格式为file:///opt/appdata/,第二种为hdfs文件系统:格式为hdfs://namenode:8040/logs/

    • 支持文件正则匹配,例如要监听本地文件目录下,所有以.log作为后缀类型的文件,file:///opt/appdata/*.log

    • 要求监听目录下的所有文件,里面的数据格式是一致的,例如1.log和2.log,里面都相同固定格式的日志记录。(每次新增的文件如果数据格式不一样,显然streaming处理逻辑无法完成)

    • 目录下的文件数量越多,streaming扫描耗时将越长

    • 若移动文件到这个监控目录,则无法触发streaming读取该新增文件,必须用流的形式写入到这个目录的文件才能被监听到

    • 最后也是也是最重要的:streaming只处理在时间窗口内创建的新的数据文件,这里如何理解新的数据文件

      例如streaming流计算设为5秒,这个5秒就是时间窗口,若在5秒内目录产生了一个1.log,这个1.log会被读取,当5秒时间窗口已经过了,那么即使1.log有数据更新,streaming也不再reload该文件,为什么会这么设计呢?

      流式处理的逻辑:一批一批的实时读取,已经读取过的数据文件,在下一轮时间窗口不再读取。假设在下一轮时间窗口,还读取已处理过的文件(该文件追加了新的数据行),那么该设计逻辑不再是流式处理了。例如/opt/appdata/目录下,有1.log,…100.log,并还会持续新增数据文件,101.log…等,如果streaming在每轮时间窗口还要对已处理过文件:1.log,...100.log再重新读取(读取新增加的数据行),那么spark streaming就无法完成微批的、实时的流式处理逻辑。在下面的实例会加以说明:

    spark streaming 监听文件夹实例:

    时间窗口为5秒,实时监听/opt/words/目录下的文件,只要有新的文件创建(这里新的文件是指:每次创建的新文件,文件名必须唯一,streaming才会触发读取)

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext 
    sc = SparkContext("local[4]", "streaming wordcount test")
    ssc = StreamingContext(sc, 5)# 时间窗口为5秒
    lines = ssc.textFileStream("file:///opt/words/")
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.pprint()
    ssc.start()            
    ssc.awaitTermination(timeout=None)  
    

    模拟实时生成数据文件,每5秒生成一份数据文件,并在生成文件前,删除之前的文件(因为这个旧文件已经被spark streaming读取并流式计算过,下一轮时间窗口不再读取,所以可以删除旧文件)

    import uuid
    import random,os
    import time
    def save_text(dir_path,file_name):
        words=['spark','streaming','foo','bar','hadoop','kafka','yarn','zookeeper']
        line_num=random.randint(1000,2000)
        text=''
        for _ in range(line_num):
            line=' '.join([random.choice(words) for _ in range(4)])
            text=text+line+'\n'
        data_file_path=os.path.join(dir_path,file_name)
        with open(data_file_path,'w') as f:
            f.write(text)
        
    def streaming_gen_data(stream_dir_path):
        while True:
            tmp_file=os.listdir(stream_dir_path)
            if  tmp_file:# 如果监听的目录下有旧文件,则直接删除
                file_path=os.path.join(stream_dir_path,tmp_file[0])
                os.remove(file_path)
            file_name=str(uuid.uuid1())+'.log' # 保证每次新增的文件名唯一
            save_text(stream_dir_path,file_name)
            time.sleep(5)
                
    if __name__=='__main__':
        streaming_gen_data('/opt/words')
    

    测试结果:

    ......
    -------------------------------------------
    Time: *** 15:00:30
    -------------------------------------------
    ('hadoop', 933)
    ('foo', 970)
    ('yarn', 951)
    ('zookeeper', 938)
    ('bar', 1020)
    ('spark', 990)
    ('streaming', 1021)
    ('kafka', 949)
    
    -------------------------------------------
    Time: *** 15:00:35
    -------------------------------------------
    ('hadoop', 651)
    ('zookeeper', 623)
    ('bar', 593)
    ('yarn', 584)
    ('foo', 659)
    ('spark', 623)
    ('streaming', 592)
    ('kafka', 571)
    ......
    

    每隔5秒,spark streaming 完成微批计算:实时统计新创建文件的单词数

    在这里重点说明 :file_name=str(uuid.uuid1())+'.log',这句保证了每次生成的文件使用了唯一文件名称,这样spark streaming才会瞬间监听到变化,及时读取到该文件。

    有人会问:在生成文件前,已经删除了旧文件,那么每次创建文件使用固定文件名,对于spark streaming来说应该是唯一的、未加载过的文件才对吧?

    解释:当使用os.remove一个文件后,如果等待间隔时长不长(例如几秒钟)又再创建同名文件,linux底层文件系统使用了缓存,用ls -al 查看该文件,会发现新创建文件的创建时间没有及时改变,导致spark streaming认为该文件还是原的旧文件,也就不再读取。

    具体说明如下:
    第一次创建文件的时间为16 16:10,接着下轮生成新文件,删除旧文件,等待5秒后,再创建同名新文件时,会发现创建时间没有改变还是16 16:10,而ssc.textFileStream读取时用的是创建时间去判断是否为新文件,所以才导致明明已经创建新文件,但是ssc.textFileStream却不读取的情况,这是pyspark textFileStream的bug,这个接口不应该只用创建时间判断。

    -rw-r--r-- 1 root root 33847  16 16:10 streaming_data.log # 首次创建文件
    [root@nn words]# ls -al streaming_data.log 
    ls: cannot access streaming_data.log: No such file or directory #目录下的文件被删除
    [root@nn words]# ls -al streaming_data.log
    ls: cannot access streaming_data.log: No such file or directory
    [root@nn words]# ls -al streaming_data.log 
    -rw-r--r-- 1 root root 31166  16 16:10 streaming_data.log # 5秒后,第二次创建同名的文件,创建时间未改变(如果等待时间去到十来秒,此时创建同名的文件的创建时间会发生变化)
    

    鉴于textFileStream接口使用场景受限,所以spark streaming实时数据源最适合的场景:接收kafka或者flume推来的流数据,这保证spark streaming能够立刻监听流数据的到来时间是已经发生变化,能触发streaming计算。

    展开全文
  • <div><p>该提问来源于开源项目:databricks/koalas</p></div>
  • dataFrame spark

    以下链接为Spark –SQL中的dataFrame API
    http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
    dataframe 官方说明:
    http://spark.apache.org/docs/latest/sql-programming-guide.html
    参考文档:http://ju.outofmemory.cn/entry/128891
    在Spark中,DataFrame是一个以命名列方式组织的分布式数据集,等同于关系型数据库中的一个表,也相当于R/Python中的data frames(但是进行了更多的优化)。DataFrames可以由结构化数据文件转换而来,也可以从Hive中的表得来,以及可以转换自外部数据库或现有的RDD。

    类似于RDD,DataFrame同样使用了lazy的方式。也就是说,只有动作真正发生时(如显示结果,保存输出),计算才会进行。从而,通过一些技术,比如predicate push-downs和bytecode generation,执行过程可以进行适当的优化。同时,所有的DataFrames也会自动的在集群上并行和分布执行。

    官方给出了详细的API可以给我们调用,下面用scala为例,简单地说明:

    1.在Spark SQL中所有函数的入口点都是一个SQLContext类,或者是它的子类,要创建一个SQLContext,那么首先应该创建一个SparkContext的对象

    val sc: SparkContext //已经存在的 SparkContext.(没有的话,要自己重建)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // 这个包可以将RDD转换成DataFrame
    import sqlContext.implicits._

    除了一个基本的 SQLContext,你也能够创建一个 HiveContext,它支持基本 SQLContext 所支持功能的一个超集。它的额外的功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、 从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启,SQLContext 可用的数据源对 HiveContext 也可用。HiveContext 分开打包是为了避免在 Spark 构建时包含了所有 的 Hive 依赖。如果对你的应用程序来说,这些依赖不存在问题,Spark 1.3 推荐使用 HiveContext。以后的稳定版本将专注于为 SQLContext 提供与 HiveContext 等价的功能。
    用来解析查询语句的特定 SQL 变种语言可以通过 spark.sql.dialect 选项来选择。这个参数可以通过两种方式改变,一种方式是通过 setConf 方法设定,另一种方式是在 SQL 命令中通过 SET key=value 来设定。对于 SQLContext,唯一可用的方言是 “sql”,它是 Spark SQL 提供的一个简单的 SQL 解析器。在 HiveContext 中,虽然也支持"sql",但默认的方言是 “hiveql”,这是因为 HiveQL 解析器更完整。
    

    一.从文件创建DataFrame

    1.DataFrame的一些操作

    2.以编程模式运行SQL查询
    应用可以通过SQLContext的sql操作进行查询操作,该方法返回的结果是一个DataFrame,举例如下:

    二。把RDDS转换成dataFrame
    Spark SQL 支持两种方法将存在的 RDD 转换为 DataFrame 。第一种方法使用反射来推断包含特定对象类型的 RDD 的模式。在你写 spark 程序的同时,当你已经知道了模式,这种基于反射的方法可以使代码更简洁并且程序工作得更好。
    第二种方法是通过一个编程接口来实现,这个接口允许你构造一个模式,然后在存在的 RDD 上使用它。虽然这种方法更冗长,但是它允许你在运行期之前不知道列以及列的类型的情况下构造 DataFrame。
    用反射来推断模式:
    Spark SQL的 Scala 接口支持将包含样本类的 RDD 自动转换为 DataFrame。这个样本类定义了表的模式。样本类的参数名字通过反射来读取,然后作为列的名字。样本类可以嵌套或者包含复杂的类型如序列或者数组。这个 RDD 可以隐式转化为一个 DataFrame,然后注册为一个表,表可以在后续的 sql 语句中使用。

    具体源码地址:http://spark.apache.org/docs/latest/sql-programming-guide.html
    编程指定模式
    当样本类不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 DataFrame 可以通过三步来创建。
    • 从原来的 RDD 创建一个行的 RDD
    • 创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配
    • 在行 RDD 上通过 applySchema 方法应用模式
    具体源码地址:http://spark.apache.org/docs/latest/sql-programming-guide.html

    展开全文
  • 错误原因:突然离线,易造成没能来得及删除自动创建的metastore_db文件夹(/home/hadoopadmin/spark-2.3.1-bin-hadoop2.7/bin),这时再次用spark-shell命令进入,则会产生如下报错。 解决办法:将metastore_db...
  • spark生成DataFrame

    千次阅读 2017-09-07 18:28:17
    1.为什么要有DataFrameSpark中的RDD叫做分布式弹性数据集。RDD是一个粗粒度的分布式计算,用函数声明式的api就能完成分布式的计算,比如wordcount,在mapreduce要写比较冗长的代码,而在Spark中可以用一行代码搞定。...
  • 区别 :http://www.voidcn.com/article/p-wsqbotem-boa.html   获取列名的列表: DataFrame.columns.values.tolist()

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,462
精华内容 2,984
关键字:

dataframespark