精华内容
下载资源
问答
  • spark_df创建 一、创建DF或者读入DF 以sql输出的结果创建df,这种形式最常用。 from pyspark.sql import SparkSession from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql....

    pandas_df创建

    十分钟搞定pandas

    RDD创建

    【Spark】3.RDD编程

    spark_df创建

    一、创建DF或者读入DF

    以sql输出的结果创建df,这种形式最常用。

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
     
    df = spark.sql("select * from table_name")
    

    也可以使用toDF()

    from pyspark.sql import Row
    row = Row("spe_id", "InOther")
    x = ['x1','x2']
    y = ['y1','y2']
    new_df = sc.parallelize([row(x[i], y[i]) for i in range(2)]).toDF()
    

    当然,也可以采用下面的方式创建DF,我们这里造了下面的数据集来说明df的一系列操作。

    test = []
    test.append((1, 'age', '30', 50, 40))
    test.append((1, 'city', 'beijing', 50, 40))
    test.append((1, 'gender', 'fale', 50, 40))
    test.append((1, 'height', '172cm', 50, 40))
    test.append((1, 'weight', '70kg', 50, 40))
    df = spark.createDataFrame(test,['user_id', 'attr_name','attr_value', 'income', 'expenses'])
    

    createDataFrame有一个参数,samplingRatio。这个参数的含义是:如果df的某列的类型不确定,则抽样百分之samplingRatio的数据来看是什么类型。因此,我们一般设定其为1。即,只要该列有1个数据不为空,该列的类型就不会为null。

    RDD与spark_df

    RDD-spark_df

    dataframe = spark.createDataFrame(RDD)
    

    spark_df-RDD

    RDD = spark_df.rdd.map(lambda x:x)
    

    pandas_df 与 spark_df转换

    pandas_pd=saprk_df.toPandas()
    
    spark_df = spark.createDataFrame(pandas_df)
    
    展开全文
  • Spark SQL&DF.pdf

    2021-08-26 15:35:16
    Spark SQL&DF.pdf
  • spark rdd 和 DF 转换

    千次阅读 2016-08-07 18:04:41
    RDD -》 DF   有两种方式 一、   一、Inferring the Schema Using Reflection ...将 RDD[t] 转为一个 ...val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt"...

    RDD   -》 DF

     

    有两种方式

    一、

     

    一、Inferring the Schema Using Reflection

     

    将 RDD[t]   转为一个 object ,然后 to df

     

    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

     

     

    rdd 也能直接装 DATASet  要  import 隐式装换 类 import spark.implicits._

     如果  转换的对象为  tuple .   转换后  下标为 _1  _2   .....

     

     

     

    二、Programmatically Specifying the Schema

     

    把 columnt meta  和  rdd   createDataFrame 在一起

     

    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    
    // The schema is encoded in a string
    val schemaString = "name age"
    
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

     

    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    
    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    
    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

     

     

     

     

     

     

    DF  to  RDd

     

    val tt = teenagersDF.rdd

     

     

     

     rdd to  ds  会有  rdd[object] 没有TODS 的异常

     

    保险搞法

    val schema = new StructType()
      .add(StructField("client_date", StringType, true))
      .add(StructField("client_time", StringType, true))
      .add(StructField("server_date", StringType, true))
      .add(StructField("server_time", StringType, true))
    
    

    。。。。。。

     

     val schema = new StructType()

      .add(StructField("client_date", StringType, true))
      .add(StructField("client_time", StringType, true))
      .add(StructField("server_date", StringType, true))
      .add(StructField("server_time", StringType, true))

     。。。。。。

     

    然后 

     

    import spark.implicits._
    var cubesDF = spark.createDataFrame(cubesRDD, schema)

     

    展开全文
  • import pandas as pd from pyspark.sql import ...spark= SparkSession\ .builder \ .appName("Dataframe") \ .getOrCreate() data=pd.DataFrame([[1,2],[3,4]],columns=['a','b']) data_values=data.values.tol.
    import pandas as pd
    from pyspark.sql import SparkSession
    spark= SparkSession\
                    .builder \
                    .appName("Dataframe") \
                    .getOrCreate()
    
    data=pd.DataFrame([[1,2],[3,4]],columns=['a','b'])
    data_values=data.values.tolist()
    data_coulumns=list(data.columns)
    df=spark.createDataFrame(data)
    
    #将pandas.DataFrame转为spark.dataFrame
    spark_df  = spark.createDataFrame(data_values, data_coulumns)
    print('spark.dataFram=',spark_df.show())
    
    #将spark.dataFrame转为pandas.DataFrame  
    pandas_df = spark_df.toPandas()  
    print('pandas.DataFrame=',pandas_df)
    
    #将spark.dataFrame存入hive
    spark_df.createOrReplaceTempView('table_test')
    spark.sql(
    "create table tmp.table_test SELECT * FROM table_test"
    )

     

    展开全文
  • spark df api操作

    2019-06-19 14:17:56
    val df3=df1.join(df2,on字段,...val df=a11.join(a22,Seq("receive_time","channel_code")) 2 两个表的关联字段名不同 (3个等于号) val h5_1=h10_lev3.join(h10_lev2,h10_lev3("parentid_3")===h5_lev2("no...

    val df3=df1.join(df2,on字段,连接类型)

    1 两个表的关联字段名一样

    
    val df=a11.join(a22,Seq("receive_time","channel_code"))

    2 两个表的关联字段名不同 (3个等于号)

     val h5_1=h10_lev3.join(h10_lev2,h10_lev3("parentid_3")===h5_lev2("node2id"),"inner")

    3常用操作汇总

     import org.apache.spark.sql.SaveMode
    val r88=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/test").option("driver","com.mysql.jdbc.Driver").option("dbtable","test.ccc").option("user","xxx").option("password","xxx").load()
    
    r88.select("itemid","dl_count_30").groupBy(["itemid"]).sum("dl_count_30").orderBy(desc("sum(dl_count_30)")).withColumnRenamed("sum(play_count)","playcount18Q1").limit(500000).write.mode(SaveMode.Overwrite).save("/home/gmd/tmp/playcount18Q1")
    
    r88.write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://localhost:3306/test").option("driver","com.mysql.jdbc.Driver").option("dbtable","test.cms20190618").option("user","xx").option("password","xxx").save()
    

    4聚合

    
    val tempos2=tempos.join(r91,Seq("relate_itemid"),"inner").agg(sum("counts") as "ultimate_top_play_counts").first().getAs[Long]("ultimate_top_play_counts")
    
    val tempos2_playcounts2=(tempos2+".0").toDouble
    
    

    5 在写text时,这里要求只有一列,且为字符串类型

    import org.apache.spark.sql.SaveMode
    a.filter("appid=2980").selectExpr("cast(userid as string)").write.mode(SaveMode.Overwrite).text("/home/gmd/userid2980.txt")
    

     

    展开全文
  • Spark RDD DF DS 的区别与联系

    千次阅读 2019-04-08 11:48:54
    Spark RDD DF DS 的区别与联系 三者的联系 1)都是spark中得弹性分布式数据集,轻量级 2)都是惰性机制,延迟计算 3)根据内存情况,自动缓存,加快计算速度 4)都有partition分区概念 5)众多相同得算子:map ...
  • Spark SQL 入门 DF、DS

    2019-05-23 09:59:37
    Spark SQL 入门 SparkSession Spark中所有功能的入口点都是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder(): import org.apache.spark.sql.SparkSession val spark = SparkSession ....
  • Spark RDD、DF、DS互转

    2020-06-09 18:11:21
    val rdd1=df.rdd val rdd2=ds.rdd RDD 转DataFrame import spark.implicits._ val df = rdd.map {line=> (line._1,line._2) }.toDF("col1","col2") 一般用元组把一行的数据写在一起,然后在toDF 中指定字段名 ...
  • val spark = SparkSession.builder().master("local").appName("data-operation").getOrCreate() //1.读取json进行sql处理 // jsonFile(spark) //2.读取text进行sql处理 textFile(spark) //3....
  • import java.sql.{Connection, DriverManager, PreparedStatement} ...import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import scala.collection.mutable...
  • import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Encoder, Row, SaveMode, ...
  • spark DF数据【DataFrame】的Schema如何设置?
  • spark创建DF的两种方式

    2019-11-04 15:23:53
    方式一:反射:(使用这种方式来创建DF是在你知道字段具体有哪些) 1.创建一个SparkContext,然后再创建SQLContext 2.先创建RDD,对数据进行整理,然后关联case class,将非结构化的数据转换成结构化数据 ...
  • df=spark.read.json("E:/**/people.json") df1=df.select(df['age'],df['name']) df2=df.select(df['age'],df['weigh']) df1.unionAll(df2).show()
  • RDD,Spark SQL,DF排序

    2019-02-25 17:12:21
    一、单一字段排序 1、用RDD RDD使用takeOrdered(num,key=None)方法排序资料 升序排列 a = userrdd.takeOrdered(5, key=lambda x: int(x[1])) print(a) 降序 a = userrdd.takeOrdered(5, key=...2、Spark SQL ...
  • 关于spark使用DF写入到数据库mysql

    千次阅读 2017-07-07 14:38:47
    package spark import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, St
  • https://blog.csdn.net/godlovebinlee/article/details/85719360 https://blog.csdn.net/u013090676/article/details/80721764?utm_source=blogxgwz3
  • spark_df=spark.createDataFrame(ss_tuple,schema=schema) spark_df.show() if __name__ == '__main__': #createSpark() convert2Spakr()   转载于:...
  • Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种: 第1种:指定列名添加Schema 第2种:通过StructType指定Schema 第3种:编写样例类,利用反射...
  • Spark中RDD与DF与DS之间的转换关系

    千次阅读 2020-05-18 23:13:38
    这里的DS区别于sparkstream里的DStream!! 转换关系 RDD的出现早于DS,DF。由于scala的扩展机制,必定是要用到隐式转换的! 所以在RDD下要转DF或者DS,就应该导隐式对象包! val conf = new SparkConf().setMaster...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 20,726
精华内容 8,290
关键字:

df是什么spark