精华内容
下载资源
问答
  • pyspark案例

    千次阅读 2018-09-25 11:33:08
    pyspark本地环境配置教程配置成功后,可以通过spark dataframe笔记练习pyspark的用法,不过最好是通过spark官网练习语法使用。下面写个小案例,供自己以后查阅: #!/usr/bin/python # -*- coding: utf-8 -*- "&...

    pyspark本地环境配置教程配置成功后,可以通过spark dataframe笔记练习pyspark的用法,不过最好是通过spark官网练习语法使用。下面写个小案例,供自己以后查阅:

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    """
    @author:
    @contact:
    @time:
    """
    from __future__ import print_function
    from pyspark.sql import SparkSession
    import os, time
    
    if __name__ == "__main__":
       # 设置spark_home环境变量,路径不能有中文、空格
       os.environ['SPARK_HOME'] = "E:/data_page/spark-2.0.2-bin-hadoop2.7"
       # 运行在本地(local),2个线程,一行写不完换行时用“\”
       spark = SparkSession.builder\
          .appName("test")\
          .master("local[2]")\
          .getOrCreate()
       # 如果想看函数源码,可以通过ctrl+点击函数的形式跳转到函数详情界面
       datas = ["hi I love you", "hello", "ni hao"]
       sc = spark.sparkContext
       rdd = sc.parallelize(datas)
       # 查看数据类型 type()
       print(type(datas))
       print(type(rdd))
       #获取总数,第一条数据
       print(rdd.count())
       print(rdd.first())
       # 每个spark运行会有一个监控界面(WEB UI4040),为了监控,让线程休眠一段时间,然后打开localhost:4040页面
       time.sleep(100)
       spark.stop()
    

    打印的结果如下:

    <type 'list'>
    <class 'pyspark.rdd.RDD'>
    3
    hi I love you
    

    localhost:4040界面如下:
    在这里插入图片描述

    展开全文
  • 今天写pyspark脚本的时候遇到一个问题,需要类似于关系型数据库group by再聚合的操作,尝试通过rdd来写,发现不好实现。 于是想到了使用DataFrame,通过类sql的group by直接进行实现。 二.解决方案 将rdd直接转为...

    一.问题描述

    今天写pyspark脚本的时候遇到一个问题,需要类似于关系型数据库group by再聚合的操作,尝试通过rdd来写,发现不好实现。
    于是想到了使用DataFrame,通过类sql的group by直接进行实现。

    二.解决方案

    将rdd直接转为DataFrame。

    首先进行配置:
    SparkSession是Spark SQL的入口

    from pyspark import SparkContext, SparkConf
    from pyspark.sql.session import SparkSession
    spark_conf = SparkConf().setMaster("local[*]").setAppName("FindCommonFriends")
    sc = SparkContext(conf = spark_conf)
    spark = SparkSession(sc)
    

    代码:

    -- 通过rdd生产DataFrame
    df = spark.createDataFrame(rdd)
    -- 通过rdd生产DataFrame,并给列进行命名
    df = spark.createDataFrame(rdd,['a', 'b'])
    

    下面是一段生成group by的pyspark代码:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from pyspark import SparkContext, SparkConf
    from pyspark.sql.session import SparkSession
    from pyspark.sql.functions import collect_list
    from pyspark.sql.functions import udf, col
    
    # 设置Spark程序运行的地方,此处设置运行在本地模式,启动2个线程分析数据
    spark_conf = SparkConf().setMaster("local[*]").setAppName("FindCommonFriends")
    sc = SparkContext(conf = spark_conf)
    spark = SparkSession(sc)
    
    # 从本地文件生成rdd
    filename = 'file:///home/pyspark/friends.txt'
    rdd = sc.textFile(filename)
    
    # 根据分隔符进行分割并排序
    rdd1=rdd.map(lambda x:x.split("\t")).sortByKey(lambda x:x[0])
    
    # 根据rdd生成DataFrame
    df1 = spark.createDataFrame(rdd1,['a', 'b'])
    df2 = df1.groupBy("a").agg(collect_list('b').alias('b_new1')).orderBy("b_new1")
    df3=df2.groupBy("b_new1").agg(collect_list('a').alias('a_new1')).orderBy("b_new1")
    
    #df2.show()
    #df3.show()
    

    参考:

    1.https://www.cnblogs.com/Lee-yl/p/9759657.html

    展开全文
  • 今天写pyspark遇到一个问题,要实现同mysql的GROUP_CONCAT函数的功能 数据1: col1 col2 1 a 1 b 1 c 2 d 2 f 想要的结果1: col1 new_col2 1 a,b,c 2 d,f 如果存在多列是否也可行 数据2: col1 col2 col3 1 a 100 1 ...

    一.问题描述

    今天写pyspark遇到一个问题,要实现同mysql的GROUP_CONCAT函数的功能

    数据1:

    col1   col2
    1        a
    1        b
    1        c
    2        d
    2        f
    

    想要的结果1:

    col1   new_col2
    1        a,b,c
    2        d,f
    

    如果存在多列是否也可行
    数据2:

    col1   col2   col3
    1        a        100
    1        b        200
    1        c        300
    2        d        400
    2        f        500
    

    想要的结果2:

    col1   new_col2    new_col3
    1        a,b,c   100,200,300
    2        d,f       400,500
    

    二.解决方案

    pyspark的collect_list可以实现如下需求

    代码:

    #!/usr/bin/env python
    from pyspark import SparkContext, SparkConf
    from pyspark.sql.session import SparkSession
    from pyspark.sql.functions import collect_list
    from pyspark.sql.functions import collect_set
    
    spark_conf = SparkConf().setMaster("local").setAppName("test1")
    sc = SparkContext(conf = spark_conf)
    spark = SparkSession(sc)
    
    df1 = spark.createDataFrame([('1','a'),('1','b'),('1','c'),('2','d'),('2','f')], ['col1', 'col2'])
    df1.groupBy("col1").agg(collect_list('col2').alias('new_col2')).show()
    
    df2 = spark.createDataFrame([('1','a','100'),('1','b','200'),('1','c','300'),('2','d','400'),('2','f','500')], ['col1', 'col2', 'col3'])
    df2.groupBy("col1").agg(*[collect_set(col) for col in ['col2','col3']]).show()
    
    sc.stop()
    
    

    测试记录:

    --snip--
    21/04/15 17:26:41 INFO hive.metastore: Trying to connect to metastore with URI thrift://hp1:9083
    21/04/15 17:26:41 INFO hive.metastore: Opened a connection to metastore, current connections: 1
    21/04/15 17:26:41 INFO hive.metastore: Connected to metastore.
    +----+---------+
    |col1| new_col2|
    +----+---------+
    |   1|[a, b, c]|
    |   2|   [d, f]|
    +----+---------+
    --snip--
    21/04/15 17:30:43 INFO scheduler.TaskSetManager: Finished task 64.0 in stage 19.0 (TID 401) in 8 ms on localhost (executor driver) (75/75)
    21/04/15 17:30:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 19.0, whose tasks have all completed, from pool 
    21/04/15 17:30:43 INFO scheduler.DAGScheduler: ResultStage 19 (showString at NativeMethodAccessorImpl.java:0) finished in 0.272 s
    21/04/15 17:30:43 INFO scheduler.DAGScheduler: Job 9 finished: showString at NativeMethodAccessorImpl.java:0, took 0.278859 s
    +----+-----------------+-----------------+
    |col1|collect_set(col2)|collect_set(col3)|
    +----+-----------------+-----------------+
    |   1|        [c, b, a]|  [100, 300, 200]|
    |   2|           [f, d]|       [500, 400]|
    +----+-----------------+-----------------+
    --snip--
    

    参考:
    1.https://www.cnblogs.com/TTyb/p/10196544.html

    展开全文
  • 文章目录一.环境介绍二.运行spark-sql的几种方式2.1 spark-shell的方式2.2 beeline的方式2.3 spark-sql的方式参考: 一.环境介绍 我本地的环境的CDH 6.3.1的环境,自己已经将spark软件安装成功了。...

    一.环境介绍

    我本地的环境的CDH 6.3.1的环境,自己已经将spark软件安装成功了。
    然后有一个节点因为不小心升级了spark的版本,导致与集群失去了联系,然后在该节点下重新安装了spark。

    二.运行spark-sql的几种方式

    2.1 spark-shell的方式

    可以通过spark-shell的方式来登陆spark,然后用spark.sql模块来执行sql。

    命令:

    spark-shell --master local[2] \
    spark.sql("use test").show(false)
    spark.sql("select * from emp").show(false)
    

    测试记录:
    image.png

    可以看到spark 的history server上有spark-shell的记录。
    image.png

    2.2 beeline的方式

    通过beeline的方式,然后jdbc连接hive。

    命令:

    beeline
    !connect jdbc:hive2://10.31.1.123:10000
    username: spark
    password : spark
    

    测试记录:
    image.png

    测试过后,spark 的history server及yarn上都没有该脚本的运行记录。

    2.3 spark-sql的方式

    spark-sql常用的查询工具,速度比较hivesql要快。但是cdh6并没有spark-sql。

    我们独立安装的一个节点的spark
    代码:

    cd /etc/spark/conf
    cp /etc/hive/conf/hive-site.xml ./
    
    sudo -u hdfs hadoop fs -chmod -R 777 /tmp/hive
    cd /tmp
    rm -rf hive
    
    -- 此时spark登陆的居然是一个全新的库
    spark-sql
    

    测试记录:
    image.png

    此时spark登陆的居然是一个全新的库
    这个问题后面再看,实在不能登陆spark-sql的话,就用spark shell进行代替吧。

    参考:

    1.https://www.cnblogs.com/yaowentao/p/13048664.html
    2.https://blog.csdn.net/weixin_33683108/article/details/114489527
    3.https://blog.csdn.net/qq_36835255/article/details/90770620

    展开全文
  • pyspark 入门小案例

    2020-10-06 09:51:02
    pyspark 入门小案例 导入相应的依赖包 import sys from pyspark import SparkConf, SparkContext 设置对应的导入 if name == ‘main’: if len(sys.argv)!=3: print("Usage:wordcount ", sys.stderr) sys.exit(-1)...
  • 大数据工程实践教程(pyspark测试)WordCount回顾 前言 与Hello World同样经典,Spark的回顾将从WordCount开始 关于WordCount有关的实验已经不知道做过(抄过)n次了 本次回顾就剖析一下他的流程与各个函数的作用 并且...
  • 原端口为8081) Python代码 weather3.py from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import StringType //根据value值来判定grade def get_grade(value):...
  • pyspark系列2-linux安装pyspark

    千次阅读 2021-04-26 16:23:42
    安装Apache Spark三.pyspark案例参考: 一.安装Java和Scale 1.1 安装java 因为我这个环境是CDH 6.3.1版本,已经安装了JDK,此次略过。 [root@hp1 ~]# javac -version javac 1.8.0_181 1.2 安装Scala 1.2.1 安装 代码...
  • $ cat hello.txt hello spark hello flink hello hadoop python代码 import sys from pyspark import SparkConf, SparkContext if __name__ == '__main__': if len(sys.argv) != 2: print("usage:wordcount ", file=...
  • pyspark count计数小案例

    千次阅读 2019-10-11 23:31:44
    2、计数小案例 #!/usr/bin/env python # coding: utf-8 from pyspark import SparkContext,SparkConf conf = SparkConf().setAppName("haha").setMaster("local[2]") sc = SparkContext(conf = conf) l1 = [1...
  • PySpark机器学习案例--分类与聚类

    千次阅读 2019-05-20 06:57:05
    案例一:基于逻辑回归算法的机器学习(分类) 要求:text含有“spark”的 lable 标记为1,否则标记为0 训练数据集: # 训练数据 id text label 0 "a b c d e spark" 1.0 1 "b d" 0.0 2 "spark f g h" 1.0 ...
  • 飞行数据案例案例主要是为了熟悉dataframe的基础操作以及SQL语句的再熟悉,此案例的所有数据都可以从这里获得。 话不多说,直接上代码 # 导包 from pyspark import SparkContext from pyspark.sql.session import...
  • 基于Python语言的Spark数据处理分析案例集锦(PySpark) 实验环境 1) Linux: Ubuntu 20.04 2) Python: 3.7.x 3) Spark: 2.4.5(安装教程: 4) Jupyter Notebook: (安装教程和使用方法: 案例 1) yelp: ...
  • PySpark之算子综合实战案例《三》

    万次阅读 2020-05-15 18:04:33
    相关推荐: hadoop,pySpark环境安装与运行实战《一》 Spark RDD操作,常用算子《二》 PySpark之算子综合实战案例《三》 Spark运行模式以及部署《四》 Spark Core解析《五》 PySpark之Spark Core调优《六》 PySpark之...
  • ip地理位置统计案例代码实现 案例分析: 一、 ip地理位置统计案例思路 加载城市ip段信息,获取ip起始数字和结束数字,经度,纬度 加载日志数据,获取ip信息,然后转换为数字,和ip段比较 比较的时候采用二分法查找,...
  • 背景 在我们数据开发过程中业务中有很多计算时间差值的场景,公司业务数据时间格式基本是:yyyyMMdd...pyspark import time def dd_datediff(date_now, date_ago, date_type="day"): date_now = time...
  • PySparkpyspark.ml 相关模型实践

    千次阅读 2019-07-26 10:06:22
    文章目录1 pyspark.ml MLP模型实践9 spark.ml模型评估 MulticlassClassificationEvaluator ...官方案例来源:https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.Multilay...
  • 笔者最近在尝试使用PySpark,发现pyspark.dataframe跟pandas很像,但是数据操作的功能并不强大。由于,pyspark环境非自建,别家工程师也不让改,导致本来想pyspark环境... Dataframes (using PySpark) 》中的案例,...
  • 其实我很不理解Hadoop与pyspark的关系,网上也看了很多相关教程,但是感觉对自己没多少用处,先不管了,先学pyspark吧,之后吧自然语言处理学好之后再说吧,还要学pytorch与tensorflow,好多东西,慢慢学呗 ...
  • 经典案例:对user-movie-rating数据建模,用户获得可能喜爱的电影推荐,电影获得潜在观看用户以做营销推广。 movie数据下载地址: http://files.grouplens.org/datasets/ movielens/ml-100k.zip 解压后可以看到...
  • pyspark使用说明

    2019-05-28 14:12:30
    PySpark PySpark 是 Spark为Python开发者提供的 API,位于 $SPARK_HOME/bin 目录,使用也非常简单,进入pyspark shell就可以使用了。 子模块 pyspark.sql 模块 pyspark.streaming 模块 pyspark.ml 包 pyspark....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,324
精华内容 529
关键字:

pyspark案例