精华内容
下载资源
问答
  • 基于PySpark的销量预测

    2020-06-21 15:07:02
    “ 本文阐述基于Pyspark的sql数据读取、特征处理、寻找最优参数、使用最优参数预测未来销量的全过程,重在预测流程和Pyspark相关知识点的讲解,展示可供企业级开发落地的demo。”

    “ 本文阐述基于PySpark的sql数据读取、特征处理、寻找最优参数、使用最优参数预测未来销量的全过程,重在预测流程和Pyspark相关知识点的讲解,展示可供企业级开发落地的demo。”

    1 数据读取与预处理

    1.1 数据读取

    df = spark.sql("""
        select store_code,goods_code,ds,qty as label
      from xxx.store_sku_sale
      where ds>='2020-05-22' and store_code in ('Z001','Z002')
        """)
    

    1.2 特征生成

    1).dayofweek等函数是从from pyspark.sql.functions import *中functions的类而来;
    2).数据此时是spark.dataframe格式,用类sql的形式进行操作;
    3).withColumn函数为新增一列;
    4).为说明问题简化了特征预处理,只是使用是否月末和星期的OneHotEncoder作为特征。

    df = df.withColumn('dayofweek', dayofweek('ds'))
    df = df.withColumn("dayofweek", df["dayofweek"].cast(StringType()))
    # 是否月末编码
    df = df.withColumn('day', dayofmonth('ds'))
    df = df.withColumn('day', df["day"].cast(StringType()))
    df = df.withColumn('month_end', when(df['day'] <= 25, 0).otherwise(1))
    # 星期编码--将星期转化为了0-1变量,从周一至周天
    dayofweek_ind = StringIndexer(inputCol='dayofweek', outputCol='dayofweek_index')
    dayofweek_ind_model = dayofweek_ind.fit(df)
    dayofweek_ind_ = dayofweek_ind_model.transform(df)
    onehotencoder = OneHotEncoder(inputCol='dayofweek_index',                                           outputCol='dayofweek_Vec')
    df = onehotencoder.transform(dayofweek_ind_)
    

    1.3 数据集的划分

    此时产生的dayofweek_Vec是一个向量,在时序领域,统计特征非常重要,比如mean,std,这些特征是可以由sql来完成,但one_hot这类特征使用sql可能乏力,于是可以借助pyspark.ml中的特征处理模块,如果还是无法很好的处理特征,便需要借助numpy等使用spark自定义函数udf进行操作。还需留意一点,pyspark.ml中默认的输入特征是转换后名为features的稠密向量(DenseVector),也就是多行row合并在一起,另外,作为有监督学习,和features对应的标签默认名为label。

    2 模型构建和调优

    2.1 设置参数空间

    下面我们以最简单的回归模型作为演示。

    线性回归模型最重要的三个参数为:regParam–正则系数;fitIntercept–是否带截距elasticNetParam–弹性网,[0,1]之间,0表示L2;1表示L1;此时我们为每个参数构建一个参数空间。

    lr_params = ({'regParam': 0.00}, {'fitIntercept': True}, {'elasticNetParam': 0.5})
    lr = LinearRegression(maxIter=100, regParam=lr_params[0]['regParam'], \
                          fitIntercept=lr_params[1]['fitIntercept'], \
                          elasticNetParam=lr_params[2]['elasticNetParam'])
    
    lrParamGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.005, 0.01, 0.1, 0.5]) \
        .addGrid(lr.fitIntercept, [False, True]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5, 1.0]) \
        .build()
    

    2.2 交叉验证

    使用五折交叉验证,在备选空间上寻找最优参数,此时的lr_best_params为向量,可以使用type(lr_best_params)查看该对象的数据类型。

    cross_valid = CrossValidator(estimator=lr, estimatorParamMaps=lrParamGrid, evaluator=RegressionEvaluator(),
                              numFolds=5)
    
    cvModel = cross_valid.fit(train_data)
    
    best_parameters = [(
        [{key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())], metric) \
        for params, metric in zip(
            cvModel.getEstimatorParamMaps(),
            cvModel.avgMetrics)]
    
    lr_best_params = sorted(best_parameters, key=lambda el: el[1], reverse=True)[0]
    

    以上交叉验证自然是比较费时的,也可以使用sample函数随机抽取一定比例的数据放入模型中。

    2.3 dataframe转换

    下面借用pd.DataFrame把以上关键参数转换为结构化数据,方便后面直接转换为spark.dataframe。为了检查使用最优参数前面的评价指标,如mape是否有下降,是可以把类似的这些参数一并写入数据库。

    pd_best_params = pd.DataFrame({
        'regParam':[lr_best_params[0][0]['regParam']],
        'fitIntercept':[lr_best_params[0][1]['fitIntercept']],
        'elasticNetParam':[lr_best_params[0][2]['elasticNetParam']]
    })
    pd_best_params['update_date'] = today
    pd_best_params['update_time'] = update_time
    pd_best_params['model_type'] = 'linear'
    

    2.4 dataframe最优参数保存至数据库

    pd.DataFrame–>spark.dataframe,以追加的形式写入hive,得到的最优参数供后续模型预测使用。

    spark.createDataFrame(pd_best_params).write.mode("append").format('hive').saveAsTable(
        'temp.regression_model_best_param')
    

    3 读取预测数据集和最佳参数

    3.1 生成并读取预测数据集

    通过union合并真实销售数据,并使用join on 1=1产生门店/商品/时间维度的笛卡尔集。

    df = spark.sql(f"""
        select store_code,goods_code,ds,qty
       from xxx.test_store_sale
       where ds>='{prev28_day}' and ds<'{today}'
        union
        select s.store_code,s.goods_code,d.ds,0 as qty
        from
        (select stat_date as ds from xxl.dim_date where stat_date<'{after7_day}' and   stat_date>='{today}') d
        join
        (select
        distinct
        store_code,goods_code
        from xxx.test_store_sale
        ) s on 1=1""")
    

    3.2 读取最佳参数

    仅以regparam参数为例,把从sql中读取出来的数据转化为标量,然后转换为可供模型函数调用的实际参数值,(因表中数据很小所以使用了collect)。

    best_param_set=spark.sql(f"select regparam,fitIntercept, elasticNetParam from scmtemp.regression_model_best_param order by update_date desc,update_time desc limit 1 ").collect()
    reg_vec=best_param_set.select('regparam')
    reg_b= [row.regparam for row in reg_vec][0]
    reg_b=float(reg_b)
    
    

    4 模型预测并写入sql

    在上文的交叉验证阶段我们对数据集的划分为形式为random,在预测阶段,需按照指定时间划分。

    train_data=df.where(df['ds'] <today)
    test_data=df.where(df['ds'] >=today)
    train_mod01 = assembler.transform(train_data)
    train_mod02 = train_mod01.selectExpr("features","qty as label")
    
    test_mod01 = assembler.transform(test_data)
    test_mod02 = test_mod01.select("store_code","goods_code","ds","features")
    
    # build train the model
    lr = LinearRegression(maxIter=100,regParam=reg_b, fitIntercept=inter_b,elasticNetParam=elastic_b, solver="normal")
    model = lr.fit(train_mod02)
    predictions = model.transform(test_mod02)
    predictions.select("store_code","goods_code","ds","prediction").show(5)
    test_store_predict=predictions.select("store_code","goods_code","ds","prediction").createOrReplaceTempView('test_store_predict')
    spark.sql(f"""create table xxx.regression_test_store_predict as select * from test_store_predict""")
    
    

    在交叉验证阶段使用的是evaluate函数放入测试集进行模型评估,在正式的预测场景使用的是transform来预测,如果预放入模型的特征已经转换为名为features的向量,在transform预测阶段放入的数据是可以带入时间和store_code,sku_code等列,预测输出默认列名为prediction。

    结语

    以上流程其实是两个阶段,分别为模型交叉验证寻找最优参数与使用最优参数预测训练,其中,寻找最优参数阶段,虽然我们已经用到了spark这个大数据处理利器,但是参数空间和本身放入的数据往往都不小,所以在实际使用过程中,出于计算性能考虑和实际需要,最优参数更新的频率可能低于模型预测周期,比如,因每天产生销售数据,预测未来的模型是每天执行,但是最优参数的更新周期可能是一周执行一次,除了节省资源考虑,还有交叉验证得到的最优参数往往会较为稳定。

    这就是本文分享的PySpark销量预测全流程,为书写编辑便利,合并和简化了某些步骤,比如特征处理,只是点到其中关键点,如兴趣可以沿着带过的知识点扩增其他,最后,欢迎交流指正。

    (本文涉及到的全部代码请点击

    参考:
    1.http://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf>
    2.http://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html

    展开全文
  • 基于pyspark创建DataFrame的几种方法

    千次阅读 2020-03-17 12:59:49
    基于pyspark创建DataFrame的几种方法pyspark创建DataFrameRDD和DataFrame使用二元组创建DataFrame使用键值对创建DataFrame使用rdd创建DataFrame基于rdd和ROW创建DataFrame基于rdd和StructType创建DataFrame基于...

    pyspark创建DataFrame

    为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。

    RDD和DataFrame

    在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。
    这里简单看一下RDD和DataFrame的类型。

    print(type(rdd))  # <class 'pyspark.rdd.RDD'>
    print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>
    

    翻阅了一下源码的定义,可以看到他们之间并没有继承关系。

    class RDD(object):
    
        """
        A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
        Represents an immutable, partitioned collection of elements that can be
        operated on in parallel.
        """
    
    class DataFrame(object):
        """A distributed collection of data grouped into named columns.
    
        A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
        and can be created using various functions in :class:`SparkSession`::
    	...
        """
    

    RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
    DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。
    但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于MySQL的表或pandas中的DataFrame。

    在这里插入图片描述
    实际工作中,我们用的更多的还是DataFrame。

    使用二元组创建DataFrame

    尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
    于是我们尝试第二种,同时传入二元组和列名称。

    a = [('Alice', 1)]
    output = spark.createDataFrame(a).collect()
    print(output)
    # [Row(_1='Alice', _2=1)]
    
    output = spark.createDataFrame(a, ['name', 'age']).collect()
    print(output)
    # [Row(name='Alice', age=1)]
    

    这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。

    spark.createDataFrame(a).show()
    # +-----+---+
    # |   _1| _2|
    # +-----+---+
    # |Alice|  1|
    # +-----+---+
    
    spark.createDataFrame(a, ['name', 'age']).show()
    # +-----+---+
    # | name|age|
    # +-----+---+
    # |Alice|  1|
    # +-----+---+
    

    使用键值对创建DataFrame

    d = [{'name': 'Alice', 'age': 1}]
    output = spark.createDataFrame(d).collect()
    print(output)
    
    # [Row(age=1, name='Alice')]
    

    使用rdd创建DataFrame

    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    output = spark.createDataFrame(rdd).collect()
    print(output)
    output = spark.createDataFrame(rdd, ["name", "age"]).collect()
    print(output)
    
    # [Row(_1='Alice', _2=1)]
    # [Row(name='Alice', age=1)]
    

    基于rdd和ROW创建DataFrame

    from pyspark.sql import Row
    
    
    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    Person = Row("name", "age")
    person = rdd.map(lambda r: Person(*r))
    output = spark.createDataFrame(person).collect()
    print(output)
    
    # [Row(name='Alice', age=1)]
    

    基于rdd和StructType创建DataFrame

    from pyspark.sql.types import *
    
    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    schema = StructType(
        [
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)
        ]
    )
    output = spark.createDataFrame(rdd, schema).collect()
    print(output)
    
    # [Row(name='Alice', age=1)]
    

    基于pandas DataFrame创建pyspark DataFrame

    df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。

    df = spark.createDataFrame(rdd, ['name', 'age'])
    print(df)  # DataFrame[name: string, age: bigint]
    
    print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>
    
    # 传入pandas DataFrame
    output = spark.createDataFrame(df.toPandas()).collect()
    print(output)
    
    # [Row(name='Alice', age=1)]
    

    创建有序的DataFrame

    output = spark.range(1, 7, 2).collect()
    print(output)
    # [Row(id=1), Row(id=3), Row(id=5)]
    
    output = spark.range(3).collect()
    print(output)
    # [Row(id=0), Row(id=1), Row(id=2)]
    

    通过临时表得到DataFrame

    spark.registerDataFrameAsTable(df, "table1")
    df2 = spark.table("table1")
    b = df.collect() == df2.collect()
    print(b)
    # True
    

    配置DataFrame和临时表

    创建DataFrame时指定列类型

    在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。

    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    
    # 指定类型于预期数据对应时,正常创建
    output = spark.createDataFrame(rdd, "a: string, b: int").collect()
    print(output)  # [Row(a='Alice', b=1)]
    rdd = rdd.map(lambda row: row[1])
    print(rdd)  # PythonRDD[7] at RDD at PythonRDD.scala:53
    
    # 只有int类型对应上,过滤掉其他列。
    output = spark.createDataFrame(rdd, "int").collect()
    print(output)   # [Row(value=1)]
    
    # 没有列能对应上,会抛出错误。
    output = spark.createDataFrame(rdd, "boolean").collect()
    # TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>
    

    注册DataFrame为临时表

    spark.registerDataFrameAsTable(df, "table1")
    spark.dropTempTable("table1")
    

    获取和修改配置

    print(spark.getConf("spark.sql.shuffle.partitions"))  # 200
    print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10
    print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # None
    print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50
    

    注册自定义函数

    spark.registerFunction("stringLengthString", lambda x: len(x))
    output = spark.sql("SELECT stringLengthString('test')").collect()
    print(output)
    # [Row(stringLengthString(test)='4')]
    
    spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
    output = spark.sql("SELECT stringLengthString('test')").collect()
    print(output)
    # [Row(stringLengthString(test)=4)]
    
    spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
    output = spark.sql("SELECT stringLengthInt('test')").collect()
    print(output)
    # [Row(stringLengthInt(test)=4)]
    

    查看临时表列表

    可以查看所有临时表名称和对象。

    spark.registerDataFrameAsTable(df, "table1")
    print(spark.tableNames())  # ['table1']
    print(spark.tables())  # DataFrame[database: string, tableName: string, isTemporary: boolean]
    print("table1" in spark.tableNames())  # True
    print("table1" in spark.tableNames("default"))  # True
    
    spark.registerDataFrameAsTable(df, "table1")
    df2 = spark.tables()
    df2.filter("tableName = 'table1'").first()
    print(df2)  # DataFrame[database: string, tableName: string, isTemporary: boolean]
    

    从其他数据源创建DataFrame

    MySQL

    前提是需要下载jar包。
    Mysql-connector-java.jar

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    import pyspark.sql.functions as F
    
    
    sc = SparkContext("local", appName="mysqltest")
    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
            "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
            "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
    df.show(n=5)
    sc.stop()
    

    参考

    RDD和DataFrame的区别
    spark官方文档 翻译 之pyspark.sql.SQLContext

    展开全文
  • 基于PySpark整合Spark Streaming与Kafka

    千次阅读 2020-03-06 23:43:11
      本文内容主要给出基于PySpark程序,整合Spark Streaming和Kafka,实现实时消费和处理topic消息,为Python开发大数据实时计算项目提供基本参考。(后续将陆续给出基于Scala开发大数据实时计算项目的文章) 1 程序...

      本文内容主要给出基于PySpark程序,整合Spark Streaming和Kafka,实现实时消费和处理topic消息,为PySpark开发大数据实时计算项目提供基本参考。(未来将陆续更新基于Scala开发大数据实时计算项目的文章)

    1 程序环境准备:

      这里不再使用Spark的集群环境,因涉及的计算资源测试环境受限,目前两台虚拟机:1个vcore+2G内存,其中一台虚拟机启动Spark Streaming服务进程,另外一台虚拟机启动kafka进程。
    虚拟机A:启动单实例kafka服务
    虚拟机B:运行PySpark程序
      在VM A,程序环境要求安装jdk1.8以上以及与kafka匹配版本的scala版本
    版本兼容说明:

    kafka:kafka_2.11-2.4.0
    java:java version "1.8.0_11"
    scala: Scala 2.12.0
    

      这里需要注意:如果使用kafka_2.12版本以上,需要使用jdk1.8.0_212以上;kafka_2.12与jdk1.8.0_11有不兼容地方,kafka启动报错提示java.lang.VerifyError: Uninitialized object exists on backward branch 209

    1.1 基本配置

    (1)配置单机zk这里无需依赖ZooKeeper集群,只需使用kafka自带的zk服务即可
    vim /opt/kafka_2.11-2.4.0/config/zookeeper.properties

    dataDir=/opt/zookeeper # zk的snapshot数据存储路径
    clientPort=2181 # 按默认端口
    

    (2)配置kafka的,路径/opt/kafka_2.11-2.4.0/config/ server.properties

    log.dirs=/opt/kafka-logs # 存放kafka数据目录
    zookeeper.connect=127.0.0.1:2181 # 按默认连接本机zk即可
    
    1.2 启动zk和kafka
    [root@nn kafka_2.11-2.4.0]# pwd
    /opt/kafka_2.12-2.4.0
    
    [root@nn kafka_2.11-2.4.0]#  nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties 2>&1 &
    

    kafka server后台启动:

    [root@nn kafka_2.11-2.4.0]# nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
    
    1.3 测试单实例Kafka

      对于kafka单节点而言,这里只能使用1个分区且1个replication-factor,topic名称为sparkapp

    [root@nn kafka_2.11-2.4.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sparkapp
    Created topic sparkapp.
    

    打开一个新的shell,用于启动producer

    [root@nn kafka_2.11-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sparkapp
    

    再打开一个新的shell,用于启动consumer

    [root@nn kafka_2.11-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic sparkapp
    

      在producer shell输入字符串,consumer端可以看到相应输出,说明单机的kafka可以正常运行,下面将使用Spark Streaming实时读取kafka的输入流

    2 整合streaming和kafka

    2.1 配置依赖包

      具体说明参考官方文档spark streaming连接kafka需要依赖两个jar包(注意版本号):
    spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar: 下载链接
    spark-streaming-kafka-0-8_2.11-2.4.4.jar: 下载链接
      将这两个jar包放在spark 的jars目录下,需要注意的是:这两个jar包缺一不可,如果是在Spark集群上做测试,那么每个Spark节点都需要放置这两个jars包:

    [root@nn jars]# pwd
    /opt/spark-2.4.4-bin-hadoop2.7/jars
    
    [root@nn jars]# ls spark-streaming-kafka-0-8
    spark-streaming-kafka-0-8_2.11-2.4.4.jar
    spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar
    

      (关于spark-streaming-kafka的jar包依赖说明:就像python连接kafka,需要使用pip 安装kafka这个库)

    2.2 Spark Streaming实时消费Kafka消息

      使用spark自带的直连kafka,实现实时计算wordcount,可以看到写普通的PySpark逻辑相对简单:

    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    if __name__ == "__main__":
    
        sc = SparkContext(appName="streamingkafka")
        sc.setLogLevel("WARN") # 减少shell打印日志
        ssc = StreamingContext(sc, 5) # 5秒的计算窗口
        brokers='127.0.0.1:9092'
        topic = 'sparkapp'
        # 使用streaming使用直连模式消费kafka 
        kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
        lines_rdd = kafka_streaming_rdd.map(lambda x: x[1])
        counts = lines_rdd.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a+b)
        # 将workcount结果打印到当前shell    
        counts.pprint()
        ssc.start()
        ssc.awaitTermination()
    

    spark streaming流默认接收的是utf-8编码的字符串

    KafkaUtils接口createDirectStream说明:

    Parameters:	
        ssc – StreamingContext object.
        topics – list of topic_name to consume.
        kafkaParams – Additional params for Kafka.
        fromOffsets – Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.
        keyDecoder – A function used to decode key (default is utf8_decoder).
        valueDecoder – A function used to decode value (default is utf8_decoder).
        messageHandler – A function used to convert KafkaMessageAndMetadata. You can assess meta using messageHandler (default is None).
    
    Returns:	
    A DStream object
    

    spark streaming 从 kafka 接收数据,有两种方式
    (1)使用Direct API,这是更底层的kafka API
    (2)使用receivers方式,这是更为高层次的API

      在本博客后面讨论streaming的原理同时也给出Direct模式的相关详细的解析。当前测试使用为Direct模式,在虚拟机B的Spark目录下,启动application,启动命令需要带上指定的jars包。

    bin/spark-submit --jars spark-streaming-kafka-0-8_2.11-2.4.4.jar direct_stream.py 
    

      在虚拟机A的producer shell端,输入字符串句子

    [root@nn kafka_2.11-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9 --topic sparkapp
    >welcome to pyspark kafka
    >从这里开始  将开发一个 由sparkstreaming 完成的 实时计算的 大数据项目
    

      在spark-submit窗口,可以看到spark streaming消费并处理kafka生成的实时流字符串结果:

    -------------------------------------------
    Time: *** 09:34:28
    -------------------------------------------
    ('welcome', 1)
    ('to', 1)
    ('pyspark', 1)
    ('kafka', 1)
    
    -------------------------------------------
    Time: *** 09:34:30
    -------------------------------------------
    
    -------------------------------------------
    Time: *** 09:34:34
    -------------------------------------------
    ('从这里开始', 1)
    ('', 1)
    ('将开发一个', 1)
    ('由sparkstreaming', 1)
    ('完成的', 1)
    ('实时计算的', 1)
    ('大数据项目', 1)
    
    -------------------------------------------
    Time: *** 09:34:36
    -------------------------------------------
    

      以上完成基于PySpark整合Spark Streaming与Kafka的测试。

    2.3 关于以上测试过程有关offset简单说明

      该测试并没有给出consumer自己管理消息的offset,在上面测试中,例如,producer连续生产5条消息,那么消息体可以看出以下简单构成:

    offset msg
    0 123@qq.com
    1 124@qq.com
    2 125@qq.com
    3 126@qq.com
    5 127@qq.com

      上面的测试中,streaming 以Direct模式连接kafka,每消费一条消息,streaming默认自动commit offset到kafka,以期实现当下一批streaming去kafka取消息时,是按顺延下一条来取,保证没有重复处理消息,也不会漏了消息,这是什么意思呢?
      例如当前streaming 消费offset=1的消息后,自动将消费位置offset=1告诉kafka:你记住我已经把第1个位置消息处理了,如果我下次找你kafka消费,请你找出offset=2的消息给我,但如果你将offset=0的消息给我,说明你让我重复消费消息,如果将offset=4消息给我,说明你让我漏了处理offset=3的消息。
      根据以上说明,例如producer已经生产了offset=9共10消息,即使将当前spark streaming进程再消费offset=1的消息后,被退出,之后重启,spark streaming从kafka消费的消息将是offset=2的消息,而不是offset=10的消息。虽然默认配置有一定合理性,但也有这种情况,导致无法实现“仅消费一次而且保证业务正常”,参考以下场景:
      spark streaming当前进程消费了offset=1的消息后,在业务处理过程中程序出错导致没有将办理业务详情发送到用户124@qq.com,因为spark streaming默认自动提交offset的位置给到kafka,因此spark streaming在一批处理中将消费offset=2的消息。若你想倒回去重新处理offset=1的消息,以保证邮件正确送到给用户,那么只能自己用外部数据库存放成功完成业务的offset,也即是自行管理offset,而不是被动的自动提交到kafka保存消费的offset。
      kafka的offset消费位置的管理详解将在之后的文章给出,只有将offset的消费位置交由客户端自行管理,才能灵活实现各种需求:重新消费、只消费一次等

    3 Spark Streaming与Kafka整合的两种方式

      在上面的整合测试里,用的streaming直连kafka进行消费消息。目前Spark Streaming 与 Kafka 的结合主要有两种方式:Receiver Dstream和Direct Dstream,目前企业实际项目主要采用 Direct Dstream 的模式,为何我这边可以断言企业主要使用Direct Dstream模式呢?因为在企业中,他们主力用Java和Scala,考虑企业需求方面,肯定使用spark-streaming-kafka-0-10版本的整合包,而这一版本不再支持Receiver模式。除非某些企业用了Pyspark作为spark应用开发,否则基本没人用Receiver模式。Spark官网也给出整合Kafka的指引链接
    在这里插入图片描述  因为基于PySpark开发实时流计算程序,这里只能选择spark-streaming-kafka-0-8开发包,从官方提示可知,spark-streaming-kafka-0-10是stable版本而且支持ssl安全传输,支持offset commit(支持手动提交,这个非常重要,自行控制消息位置从哪条开始处理,保证准确消费)和dynamic topic subscription,这就是为何要用Scala语言开发面向高级需求的Spark程序或者streaming程序,亲儿子!
      对于两种连接连接方式,有必要给出讨论和对比,以便加深streaming消费kafka topic更深理论知识。

    3.1 基于Receiver消费消息方式

    原理图(已启用WAL机制)
    在这里插入图片描述 (原理图需要注意的地方:如果Receiver模式下,未开启WAL用于备份接收的消息,那么图中Save data to WAL是不存在的。)
      早期版本的Spark Streaming与Kafka的整合方式为Receiver从Kafka消费消息,在提交Spark Streaming任务后,Spark会划出指定的Receiver来持续不断、异步读取kafka数据,这个Receiver其实是Executor(jvm进程)的一个常驻线程,跟task类似,为何它是常驻的?因为它需要不断监听Kafka的Producer生产的消息,从这点也可以看出,Receiver收到的消息是存放在Executor的内存中,换句话说,占用了Executor的内存。至于Receiver线程内部使用哪种数据结构存放接收的消息?对于先进先消费,后进后消费场景,显然使用queue最适合(通过队列实现多线程的生产-消费编程逻辑)。当Driver这边提交job后,Executors从Receiver拿到消息去交给task处理。在执行完之后,Receiver向Kafka的Zookeeper提交offset,告诉Kafka记主它当前已消费的位置。
      早期的设计中,Spark Streaming为了零丢失地消费kafka消息,增加对接收到的消息进行预写日志处理(Write Ahead Log, WAL)这个WAL是放在hdfs的checkpoint 目录下,开启该功能后,Receiver除了将接收到消息存放到Executor内存中,还将其同步写入到hdfs上的WAL日志文件。因此,当运行着的Spark Streaming任务突然挂了,后期启动时,Streaming也可以自动从hfds的checkpoint目录下的WAL日志找回丢失的消息。

    Receiver连接方式的缺点

      从上面receiver工作原理可以总结其缺点出将出现在内存方面、wal日志影响吞吐量等方面存在设计上的缺点:
    (1)占用cpu+内存:每个receiver需要单独占用一个vcore以及相应内存,如果Receiver并发数量多,占用Executor更多cpu和内存资源,这些资源本应用来跑tasks做计算用的,显然这会浪费计算资源。

    (2)WAL拖累整体处理效率:为了不丢数据需要开启WAL,也即Receiver将接收到的数据写一份备份到文件系统上(hdfs的checkpoint目录),既然有落到磁盘自然会有IO,这降低了kafka+streaming这个组合实时处理消息的效率,换句话说:增加job的执行时间。此外,开启WAL,还有造成重复消费的可能。

    (3)接收数量大于处理速率: 若Receiver并发数量设置不合理,接受消息速率大于streaming处理消息的速率,就会出现数据积压在队列中,最终可能会导致程序异常退出。这里也是面试常见的问题:例如提高Receiver的并发数量,就可以提高streaming处理能力吗?首先,Receiver异步接收kafka消息,不参与计算,真正执行计算的是streaming,如果streaming并发性没有调高,整个计算能力也没有提高。一定要记着:kafka跟streaming是需要两边同时调优,才能达到计算能力的整体提升,不能只调优一边,这是一个二合一的实时计算组合!!

    (补充知识点:Receiver的并发数据量是怎么确定?
      在KafkaUtils.createStream()中,可以指定topic的partition数量,该数量就是Receiver消费此topic的并发数(其实就是Executor 启动消费此topic的线程数量)但需要指出的是:Kafka中topic的partition与Spark中RDD的partition是两个不同的概念,两者没有关联关系。)

    3.2 基于Direct消费消息方式

    原理图:
    在这里插入图片描述  当Receiver的工作原理及其缺点理解后,Direct模式将更容易理解。Driect模式下,Streaming定时主动查询Kafka,以获得指定topic的所有partition的最新offset,结合上一批次已保存的offset位置,Streaming就可以确定出每个批次拉取消息offset的范围,例如第1批次的消息(offset范围0-100)正在处理过程中,streaming指定特定的线程定时去Kafka查询第2批次最新的offset,发现最新值为300,那么如果streaming没有限制每批次的最大消费速率,在第2批次取消息时,会一次性取回offset=101到300的消息记录,这个就是所谓的offset ranges。若streaming设置的限制每批次的最大消费速率为每批次100条,那么即使查询到Kafka topic最新offset位置为300,streaming在第2批次消费的offset 访问也只能是101~200共计100条消费记录。
      当处理数据的job启动时,就会使用kafka的简单Consumer API来获取kafka中指定offset范围的数据。此外,Streaming已消费的offset不再交由Zookeeper来管理,而是手动采用外部存储数据库如mysql、redis等存放和管理已消费的offset。
    以下为Scala代码演示从rdd拿到offset ranges属性的逻辑(rdd当然本身包含消息数据)

    directKafkaStream.map {
               ...
     }.foreachRDD { batchRdd =>
        // 获取当前rdd数据对应的offset
        val offsetRanges = batchRdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // 运行计算任务
        doCompute(batchRdd)
        // 使用外部数据库自行保存和管理offsetRanges
        saveToRedis(offsetRanges)
     }
    

      而Receiver方式下没有关于offset的处理逻辑,这是因为streaming在该模式下内部通过kafka consumer high level API 提交到zk保存。

    receiverkafkaStream.map {
               ...
     }.foreachRDD { streamRdd =>
        // 运行计算任务
        doCompute(rdd)
     }
    
    Direct连接方式的优点

    (1)提高计算资源利率:不像Receiver那样还占用Executor的一部分内存和计算资源,Direct方式下的Executor的代码实现踢掉Receiver这块设计,因此可以实现计算和内存资源全部用在计算任务,因为streaming定时主动去kafka拉取batch 消息,拉过来直接计算,而不是像Receiver不断接收消息不断地存放在内存中。

    (2)无需开启WAL:Receiver方式需要开启WAL机制以保证不丢失消息,这种方式加大了集群的计算延迟和效率,而Direct的方式,无需开启WAL机制,因为Kafka集群有partition做了高可用,只要streaming消费方自己存放和管理好已经消费过的offset,那么即使程序异常退出等,也可利用已存储的offset去Kafka消费丢失的消息。

    (3)可保证exactly once的消费语义:基于Receiver的方式,使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制,可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和Zookeeper之间可能是不同步的。基于Direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据时消费一次且仅消费一次。

    (4)计算程序更稳定:Receiver模式是通过异步持续不断的读取数据,当集群出现网络、计算负载跟不上等因素,导致streaming计算任务侧出现延迟和堆积,而Receiver却还在持续接收kafka消息,此种情况容易导致Executor内存溢出或者其他异常抛出,从而引起计算程序退出,换句话说,Receiver模式的streaming实时计算可靠性和稳定性欠缺。对于Direct模式,Driver在触发batch计算任务时,才会去kafka拉消息回来并计算,而且给streaming加入最大消费速率控制后,整个实时计算集群鲁棒性更强。

    (5)Dstream 的rdd分区数与kafka分区一致
      Direct模式下,Spark Streaming创建的rdd分区数跟Kafka的partition数量一致,也就是说Kafka partitions和streaming rdd partitions之间有一对一的映射关系,这样的好处是明显和直观的:只要增加kafka topic partition数量,就可以直接增大spark streaming的计算的并发数。
      当然,Direct模式不足的地方就是需要自行实现可靠的offset管理逻辑,但对于开发方向来说,这点很容易实现,我个人若对offset管理,将优先选用redis,而且是集群!
      以上有关Spark Streaming 整合Kafka的方式和原理分析必须要理解,否则在后面的实时计算平台的代码开发上,有些逻辑你不一定能处理好。

    展开全文
  • 基于pyspark GraphFrames实现图查询和计算GraphFrames基本操作创建图展示顶点和边的数据统计顶点的入度和出度对顶点和边的数据进行分析搜索指定结构路径对搜索结果过滤多路径条件搜索匿名顶点和边设置路径不存在的...

    GraphFrames基本操作

    GraphFrames,该类库是构建在Spark DataFrames之上,它既能利用DataFrame良好的扩展性和强大的性能,同时也为Scala、Java和Python提供了统一的图处理API。GraphX基于RDD API,不支持Python API; 但GraphFrame基于DataFrame,并且支持Python API。

    创建图

    创建图的方式很简单,分别向GraphFrame中传入一个顶点数据集和一个边数据集即可。

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from graphframes import GraphFrame
    
    
    sc = SparkContext("local", appName="mysqltest")
    sqlContext = SQLContext(sc)
    
    vertices = sqlContext.createDataFrame([
      ("a", "Alice", 34),
      ("b", "Bob", 36),
      ("c", "Charlie", 30),
      ("d", "David", 29),
      ("e", "Esther", 32),
      ("f", "Fanny", 36),
      ("g", "Gabby", 60)], ["id", "name", "age"])
    
    edges = sqlContext.createDataFrame([
      ("a", "b", "friend"),
      ("b", "c", "follow"),
      ("c", "b", "follow"),
      ("f", "c", "follow"),
      ("e", "f", "follow"),
      ("e", "d", "friend"),
      ("d", "a", "friend"),
      ("a", "e", "friend")
    ], ["src", "dst", "relationship"])
    
    # 生成图
    g = GraphFrame(vertices, edges)
    print(g)
    # GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])
    print(type(g))
    # <class 'graphframes.graphframe.GraphFrame'>
    

    展示顶点和边的数据

    # 展示顶点和边的数据
    g.vertices.show()
    # +---+-------+---+
    # | id|   name|age|
    # +---+-------+---+
    # |  a|  Alice| 34|
    # |  b|    Bob| 36|
    # |  c|Charlie| 30|
    # |  d|  David| 29|
    # |  e| Esther| 32|
    # |  f|  Fanny| 36|
    # |  g|  Gabby| 60|
    # +---+-------+---+
    
    g.edges.show()
    # +---+---+------------+
    # |src|dst|relationship|
    # +---+---+------------+
    # |  a|  b|      friend|
    # |  b|  c|      follow|
    # |  c|  b|      follow|
    # |  f|  c|      follow|
    # |  e|  f|      follow|
    # |  e|  d|      friend|
    # |  d|  a|      friend|
    # |  a|  e|      friend|
    # +---+---+------------+
    

    统计顶点的入度和出度

    入度和出度就是顶点被多少条边所引用,还有边是指向顶点还是由顶点指向其他点。

    g.inDegrees.sort("id").show()
    # +---+--------+
    # | id|inDegree|
    # +---+--------+
    # |  a|       1|
    # |  b|       2|
    # |  c|       2|
    # |  d|       1|
    # |  e|       1|
    # |  f|       1|
    # +---+--------+
    
    g.outDegrees.sort("id").show()
    # +---+---------+
    # | id|outDegree|
    # +---+---------+
    # |  a|        2|
    # |  b|        1|
    # |  c|        1|
    # |  d|        1|
    # |  e|        2|
    # |  f|        1|
    # +---+---------+
    
    g.degrees.sort("id").show()
    # +---+------+
    # | id|degree|
    # +---+------+
    # |  a|     3|
    # |  b|     3|
    # |  c|     3|
    # |  d|     2|
    # |  e|     3|
    # |  f|     2|
    # +---+------+
    

    对顶点和边的数据进行分析

    # 对边和顶点的数据进行分析
    agelist = g.vertices.groupBy().min("age").show()
    # +--------+
    # |min(age)|
    # +--------+
    # |      29|
    # +--------+
    
    youngest = g.vertices.filter("age < 30").show()
    # +---+-----+---+
    # | id| name|age|
    # +---+-----+---+
    # |  d|David| 29|
    # +---+-----+---+
    
    numFollows = g.edges.filter("relationship = 'follow'").count()
    print(numFollows)
    # 4
    

    搜索指定结构路径

    # 搜索指定结构
    motif = g.find("(start)-[pass]->(end)")
    motif.show()
    # +----------------+--------------+----------------+
    # |           start|          pass|             end|
    # +----------------+--------------+----------------+
    # | [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|
    # |  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
    # | [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
    # |  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
    # |    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
    # |[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
    # |  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
    # |  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
    # +----------------+--------------+----------------+
    

    对搜索结果过滤

    # 在搜索的结果上进行过滤
    motif.filter("start.age > 33 and end.age > 33").show()
    # +--------------+--------------+------------+
    # |         start|          pass|         end|
    # +--------------+--------------+------------+
    # |[a, Alice, 34]|[a, b, friend]|[b, Bob, 36]|
    # +--------------+--------------+------------+
    

    多路径条件搜索

    注意,搜索条件中,a和b可以指代相同的顶点,如果需要限制为不同的顶点,需要在返回结果中使用过滤器。

    # 多个路径条件
    motif = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
    motif.show()
    # +----------------+--------------+----------------+--------------+
    # |               a|             e|               b|            e2|
    # +----------------+--------------+----------------+--------------+
    # |[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|
    # |    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|
    # +----------------+--------------+----------------+--------------+
    

    匿名顶点和边

    # 不需要返回路径中的元素时,可以使用匿名顶点和边
    motif = g.find("(start)-[]->()")
    motif.show()
    # +----------------+
    # |           start|
    # +----------------+
    # |  [f, Fanny, 36]|
    # | [e, Esther, 32]|
    # | [e, Esther, 32]|
    # |  [d, David, 29]|
    # |[c, Charlie, 30]|
    # |    [b, Bob, 36]|
    # |  [a, Alice, 34]|
    # |  [a, Alice, 34]|
    # +----------------+
    

    设置路径不存在的条件

    # 设置路径不存在的条件
    motif = g.find("(a)-[]->(b); !(b)-[]->(a)")
    motif.show()
    # +---------------+----------------+
    # |              a|               b|
    # +---------------+----------------+
    # | [a, Alice, 34]| [e, Esther, 32]|
    # |[e, Esther, 32]|  [d, David, 29]|
    # |[e, Esther, 32]|  [f, Fanny, 36]|
    # | [a, Alice, 34]|    [b, Bob, 36]|
    # | [f, Fanny, 36]|[c, Charlie, 30]|
    # | [d, David, 29]|  [a, Alice, 34]|
    # +---------------+----------------+
    

    有状态和无状态查询

    上面对路径的条件查询都是无状态的,没有指定任务限制条件,只能在查询完后再进行过滤。
    例如,我们要查询四个顶点的路径。

    无状态查询

    使用无状态查询,例子如下:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from graphframes.examples import Graphs
    
    sc = SparkContext("local", appName="mysqltest")
    sqlContext = SQLContext(sc)
    g = Graphs(sqlContext).friends()  # Get example graph
    g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)").show()
    

    返回一个较大的数据集:

    +----------------+--------------+----------------+--------------+----------------+--------------+----------------+
    |               a|            ab|               b|            bc|               c|            cd|               d|
    +----------------+--------------+----------------+--------------+----------------+--------------+----------------+
    |  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
    |    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
    |  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
    | [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
    |[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
    |  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
    | [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
    +----------------+--------------+----------------+--------------+----------------+--------------+----------------+
    

    有状态查询

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.functions import col, lit, when
    
    from graphframes.examples import Graphs
    from functools import reduce
    
    
    sc = SparkContext("local", appName="mysqltest")
    sqlContext = SQLContext(sc)
    g = Graphs(sqlContext).friends()  # Get example graph
    chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
    
    
    # 定义下一个顶点更新状态的条件:如果关系为friend则cnt+1
    sumFriends = lambda cnt, relationship: when(relationship == "friend", cnt+1).otherwise(cnt)
    
    # 将更新方法应用到整个链的,链上每有一个关系是friend就加一,链上共三个关系。
    condition =reduce(lambda cnt, e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
    
    # 传入限制条件
    chainWith2Friends2 = chain4.where(condition >= 2)
    chainWith2Friends2.show()
    

    可以看到,我们在过程中限制的了链中必须有2个以上的关系是friend,结果符合预期。

    +---------------+--------------+--------------+--------------+--------------+--------------+----------------+
    |              a|            ab|             b|            bc|             c|            cd|               d|
    +---------------+--------------+--------------+--------------+--------------+--------------+----------------+
    | [d, David, 29]|[d, a, friend]|[a, Alice, 34]|[a, b, friend]|  [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
    |[e, Esther, 32]|[e, d, friend]|[d, David, 29]|[d, a, friend]|[a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
    +---------------+--------------+--------------+--------------+--------------+--------------+----------------+
    

    子图

    例一

    from graphframes.examples import Graphs
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    
    
    sc = SparkContext("local", appName="mysqltest")
    sqlContext = SQLContext(sc)
    g = Graphs(sqlContext).friends()  # Get example graph
    
    # 创建子图
    g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'")
    g1.vertices.show()
    # +---+------+---+
    # | id|  name|age|
    # +---+------+---+
    # |  a| Alice| 34|
    # |  b|   Bob| 36|
    # |  e|Esther| 32|
    # |  f| Fanny| 36|
    # +---+------+---+
    
    
    g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()
    g1.vertices.show()
    # +---+-----+---+
    # | id| name|age|
    # +---+-----+---+
    # |  b|  Bob| 36|
    # |  a|Alice| 34|
    # +---+-----+---+
    

    这个例子中,我们使用filterVertices和filterEdges过滤顶点和边用来创建子图。
    在最后我们对子图调用dropIsolatedVertices()方法,删除孤立的没有连接的点。

    例二

    from graphframes.examples import Graphs
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from graphframes import GraphFrame
    
    
    sc = SparkContext("local", appName="mysqltest")
    sqlContext = SQLContext(sc)
    g = Graphs(sqlContext).friends()  # Get example graph
    
    # 路径搜索
    paths = g.find("(a)-[e]->(b)")\
      .filter("e.relationship = 'follow'")\
      .filter("a.age < b.age")
    paths.show()
    +----------------+--------------+--------------+
    # |               a|             e|             b|
    # +----------------+--------------+--------------+
    # | [e, Esther, 32]|[e, f, follow]|[f, Fanny, 36]|
    # |[c, Charlie, 30]|[c, b, follow]|  [b, Bob, 36]|
    # +----------------+--------------+--------------+
    
    # 选择关系数据集中的列
    e2 = paths.select("e.src", "e.dst", "e.relationship")
    # +---+---+------------+
    # |src|dst|relationship|
    # +---+---+------------+
    # |  e|  f|      follow|
    # |  c|  b|      follow|
    # +---+---+------------+
    
    # 使用顶点和边的集合构造子图
    g2 = GraphFrame(g.vertices, e2)
    

    在上述例子的过程为:

    1. 首先用(a)-[e]->(b)过滤出所有连接的顶点,得到顶点数据集。
    2. 使用条件过滤指定关系follow和节点年龄的大小关系,得到一个边的数据集。这里可以看到,这个数据集中的元素,其实也是一个数据集。
    3. 将顶点和边的数据集传入GraphFrame可以得到一个子图。

    参考

    https://graphframes.github.io/graphframes/docs/_site/user-guide.html
    https://www.jianshu.com/p/d612fd2f4310
    https://blog.csdn.net/ssyshenn/article/details/96702905

    展开全文
  • 基于pyspark图计算的算法实例

    千次阅读 2020-03-18 15:04:58
    基于pyspark的图计算实例引入广度优先搜索连通分量强连通分量标签传播PageRank最短路径算法三角形计数 引入 图算法指利用特制的线条算图求得答案的一种简便算法。无向图、有向图和网络能运用很多常用的图算法,这些...
  • 有一些关于spark df和pandas df的介绍,可以详细看基于Pyspark的Pandas_udf使用方法 这里介绍我在使用过程中遇到的问题 Pandas UDFs pandas udf是用户定义的函数,是由spark用arrow传输数据,pandas去处理数据。我们...
  • 基于PySpark大数据分析/Python/Spark

    千次阅读 2018-03-12 22:26:57
    很少见的基于pyspark的spark教程,比较方便带着大家入手spark,实现大数据分析!讲明一点,付费的,不过是最便宜的,不信可以自己去问问! 课程目录: 课程一:PySpark课程及环境准备 0101-为什么要学习PySpark...
  • 基于PySpark的网络服务异常检测系统 阶段总结(二) 在上篇博文中介绍了网络服务异常检测的大概,本篇将详细介绍SVDD和Isolation Forest这两种算法 1. SVDD算法 SVDD的英文全称是Support Vector Data ...
  • Pyspark是近段时间笔者接触到的比较高效的大数据处理工具,他的亮点是整理出了数据分析过程中两个最高频应用的工具:pandas的DataFrame包和sklearn包,能够方便的完成数据处理及模型构建两块内容,上一篇笔者整理了...
  • 文章目录一、基本操作1.1 创建spark连接1.1.1 SparkSession1.1.2 Sparkconf1.2 数据加载1.2.1 载入json1.2.2 载入文本1.2.3 载入csv1.3 一般操作1.3.1 json等有表头的数据1.3.2 rdd操作1.3....from pyspark.sql imp...
  • 基于pyspark的als推荐电影

    千次阅读 2019-10-25 11:30:29
    ALS算法是基于模型的推荐算法 基本思想 对稀疏矩阵进行模型分解,评估出缺失项的值,以此来得到一个基本的训练模型。然后依照此模型可以针对新的用户和物品数据进行评估。ALS是采用交替的最小二乘法来算出缺失项的,...
  • 使用数据驱动方法检测大量文本中的常用短语。 发现的短语的大小可以是任意的。 可以用于英语以外的语言
  • 基于PySpark的电影推荐引擎

    千次阅读 2019-10-09 19:44:20
    开启PySpark 代码调试 我们使用sc.textFile读取ml-100k 我们使用sc.textFile读取HDFS上的ml-100k数据集中的u.data,并且查看数据项数 rawUserData = sc . textFile ( "hdfs://172.18.74.236:9000/ml-...
  • 案例 1.Pyspark交互式编程 2.编写独立应用程序实现数据去重 3.编写独立应用程序实现求平均值 二.前置准备 1.实验环境 2.传输数据文本文件 三.具体步骤 1.Pyspark交互式编程 2.编写独立应用程序实现数据去重 3.编写...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,103
精华内容 2,041
关键字:

基于pyspark