精华内容
下载资源
问答
  • pyspark入门

    2020-07-06 18:33:36
    pyspark入门欢迎使用Markdown编辑器spark streaming无状态转换有状态转换sockets数据流 欢迎使用Markdown编辑器 你好! 这是学习pyspark的记录。 spark streaming Spark Streaming利用Spark Core的快速调度能力执行...

    欢迎使用Markdown编辑器

    你好! 这是学习pyspark的记录。

    spark streaming

    Spark Streaming利用Spark Core的快速调度能力执行流数据的分析。它以最小批次获取数据,并对批次上的数据执行RDD转化。
    这样的设计,可以让用于批处理分析的Spark应用程序代码也可以用于流数据分析,因此便于实时大数据处理架构的实现。但是这种便利性带来的问题是处理最小批次数据的延时。
    其他流数据处理引擎,例如Storm和Flink的streaming组件,都是以事件而不是最小批次为单位处理流数据的。Spark Streaming支持从Kafka、Flume、Twitter、ZeroMQ、Kinesis和TCP/IP sockets接收数据。

    无状态转换

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/3 10:33
    # @Author  : ljk
    # @Email   : ljk13572@163.com
    
    # spark streaming 本地文件流wordcount -> 无状态
    
    # https://blog.csdn.net/weixin_43931941/article/details/105386131
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    # 创建streaming环境
    
    sc = SparkContext('local','ljk_streaming')
    
    # 设置滑动窗口
    ssc = StreamingContext(sc,10)
    
    # 读取文件流
    fileDStream = ssc.textFileStream('data/')
    
    # 遍历输入内容,执行rdd转化,行动算子
    fileDStream.foreachRDD(lambda rdd:print(rdd.collect()))
    # 打印控制台
    fileDStream.pprint()
    
    # 对流进行转换
    # 对流进行转换
    result = fileDStream.flatMap(lambda line:line.split(',')).map(lambda word:(word, 1)).reduceByKey(lambda a,b:a+b)
    result.pprint()
    # 启动streaming
    ssc.start()
    ssc.awaitTermination()
    

    有状态转换

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/3 11:27
    # @Author  : ljk
    # @Email   : ljk13572@163.com
    
    # spark streaming 本地文件流wordcount -> 有状态
    # https://blog.csdn.net/a8131357leo/article/details/101006510
    
    
    
    def updateFunction(newValues, runningCount):
        # 对于不存在的Key,他的value就是None
    
        if runningCount is None:
            runningCount = 0
        return sum(newValues, runningCount)
    
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    sc = SparkContext('local','ljk_streaming')
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")
    
    lines = ssc.textFileStream('data/')
    
    # 生成一个初始的dic,用来保存计数,其实不用加初始值
    initial_dic = sc.parallelize(range(1, 5)).map(lambda x: (x, 0))
    
    words = lines.flatMap(lambda line: line.split("\n")).map(lambda word: (word, 1))
    
    
    # reduceBykey得到一个RDD内的计数,然后根据计数再去更新数据
    wordCounts = words.reduceByKey(lambda x, y: x + y).updateStateByKey(updateFunction)
    
    wordCounts.pprint()
    # 启动streaming
    ssc.start()
    ssc.awaitTermination()
    

    sockets数据流

    生产数据

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/3 14:53
    # @Author  : ljk
    # @Email   : ljk13572@163.com
    
    
    import random
    import socket
    from time import sleep
    
    host = 'localhost'
    port = 9999
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    s.listen(1)
    print('\nListening for a client at', host, port)
    conn, addr = s.accept()
    print('\nConnected by', addr)
    
    count = 0
    lists = ['a','b']
    try:
        while True:
    
            # wordcount发送的文件,随机5~15的数字
    
            line = [lists[random.randint(0, 1)] for x in range(2)]
    
            line = ",".join(line) + "\n"
            print(line)
            # socket只能发送byte编码的数据,所以设置编码 或者b'aaaaa'这样也行
            conn.send(line.encode('utf-8'))
            sleep(2)
            count += 1
    
            if count == 20:
                # 发送一个文字流包含字母的流
                # conn.send("a,b,c,d,e".encode('utf-8'))
                conn.close()
    
    
    except socket.error:
        print('Error Occured.\n\nClient disconnected.\n')
    
    # s.shutdown(socket.SHUT_RDWR)
    # s.close()
    
    

    pyspark streaming处理数据

    # -*- coding: utf-8 -*-
    # @Time    : 2020/7/3 14:52
    # @Author  : ljk
    # @Email   : ljk13572@163.com
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    sc = SparkContext('local','ljk_streaming')
    ssc = StreamingContext(sc, 5)
    
    lines = ssc.socketTextStream("localhost", 9999)
    
    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    
    # Count each word in each batch
    
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    
    # Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.pprint()
    
    ssc.start()  # Start the computation
    
    try:
        ssc.awaitTerminationOrTimeout(30)
        ssc.stop(stopSparkContext=False, stopGraceFully=True)
    except:
        print('input wrong')
        ssc.stop(stopSparkContext=False, stopGraceFully=True)
    
    
    展开全文
  • Pyspark入门

    2020-05-01 14:07:15
    RDD创建 来源可以是本地也可以从外部读取(HDFS或其他兼容Hadoop的inputformat的) 本地: data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) 外部: PySpark can create distributed datasets from any ...

    RDD(Resilent Distributed Dataset)

    Resilent对丢失节点,丢失数据集的修复

    Distributed分布式,运行在不同节点上

    Dataset数据集

    RDD是spark最基本的抽象,是不可变的(每个RDD生成后就不再改变,所有的操作都是生成一个新的RDD,因为是并行化计算,如果在原有基础上进行修改,那么不得不浪费时间在同步通信上)

    可以进行分割的,

    分割后的数据集可以并行运算的。

    支持的操作有map,filter,persist。

    PairRDDFunctions

    DoubleRDDFunctions

    SequenceFIleRDDFunctions

    RDD特性(面试必备):

    • A list of partitions 数据可分成split1,split2
    • Function for each splits 对被分割的splits每一个都进行运算,programmer不需要知道是如何划分的
    • A list of other dependencies: rdd1 -》 rdd2 -》 rdd3 -》rdd4 RDD会保存一份依赖关系,当数据丢失的时候可以追踪溯源到上一个节点进行重新计算,也是弹性的基础保障
    • 可选 Partitioner for Key-value RDD 
    • 可选 a list of prefferd locations:移动数据不如移动计算,为什么会有多个location,因为数据也是多份拷贝的

    一个partition(split)是一个task

    SparkContext

    主入口点,连接到Spark集群:local,standalone,yarn,mesos等。 创建RDD,广播参数到集群。

    也可以通过SparkConf来设置,优先级高于系统配置。

    如何配置?

     

    RDD创建

    来源可以是本地也可以从外部读取(HDFS或其他兼容Hadoop的inputformat的)

    本地:

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)

    外部:

    PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

    distFile = sc.textFile("data.txt")
    This method takes an URI for the file 
    (either a local path on the machine, 
    or a hdfs://, s3a://, etc URI) 
    and reads it as a collection of lines.
    如果用的集群方式,所有节点都必须含有这个文件,
    而且必须是相同路径

    部署到环境:https://spark.apache.org/docs/latest/submitting-applications.html

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      --deploy-mode <deploy-mode> \
      --conf <key>=<value> \
      ... # other options
      <application-jar> \
      [application-arguments]

    RDD Operations:

    Transformations 从已存在的dataset中创建一个RDD,所有的transformations都是lazy的,不会进行计算,只会记录转化的关系,等遇到了action才会进行真正的计算。

    Actions 触发计算,返回values到driver或者把计算的数据存储起来。

     

    RDD Transformations

    Map(func) 将dataset中的每一个元素形成func形成一个新的rdd。

    eg  常见例子 (book)  -> (book, 1)

    Filter(func) 返回func为true的值,形成新的rdd

    FlatMap(func) 与map功能相同,只不过map返回的与原本的元素数量是相同的即一一对应,但flatmap一个元素可以返回多个值一对多对应。

    Groupbykey 将相同的key分配到一起,元素必须是(key, value)的样式

    ReduceByKey 就是在GroupByKey的基础上多了一步操作,将key后面整合的元素进行reduce操作。

    需要对结果排序:

    SortByKey 是按照key进行排序的,如果根据其他位置进行排序,则需要换位置,记得排序完成以后恢复成原来的样式。

    union 将两个rdd合并起来

    distinct 去重复

    join 类似于sql中的连表操作,join是相当于inner join,leftOuterjoin和rightOuterjoin是左右外连表。

     

    RDD Actions

    Collect获取所有结果,take从结果中取出一部分, min,max,count得到结果的size,foreach相当于map的action版本,reduce和reducebygroup是差不多

    综合例子 词频统计,排序取出top N,过滤掉空值

    平均年龄计算,这里注意类型转化即可

     

    Pyspark运行模式

    生产中用的更多的是yarn和standalone模式,而且绝大部分运行在yarn模式上,将spark当成客户端,提交作业到yarn上执行

     

     

     

     

    展开全文
  • PySpark 入门

    2018-12-22 16:18:00
    1.wordCount 1 from __future__ import print_function 2 3 import sys 4 from operator import add ... 6 from pyspark import SparkContext 7 8 9 if __name__ == "__main__": 10 if len(...

    1.wordCount 

     1 from __future__ import print_function
     2 
     3 import sys
     4 from operator import add
     5 
     6 from pyspark import SparkContext
     7 
     8 
     9 if __name__ == "__main__":
    10 if len(sys.argv) != 2:
    11 print("Usage: wordcount <file>", file=sys.stderr)
    12 exit(-1)
    13 sc = SparkContext(appName="PythonWordCount")
    14 lines = sc.textFile(sys.argv[1], 1)
    15 counts = lines.flatMap(lambda x: x.split(' ')) \
    16 .map(lambda x: (x, 1)) \
    17 .reduceByKey(add)
    18 output = counts.collect()
    19 for (word, count) in output:
    20 print("%s: %i" % (word, count))
    21 
    22 sc.stop()

     

    2. Sql.py

    Sql介绍了DataFrame的使用方法

    from __future__ import print_function
    
    import os
    import sys
    
    
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
    
    
    if __name__ == "__main__":
        sc = SparkContext(appName="PythonSQL")
        sqlContext = SQLContext(sc)
    
        # RDD is created from a list of rows
        some_rdd = sc.parallelize([Row(name="John", age=19),
                                  Row(name="Smith", age=23),
                                  Row(name="Sarah", age=18)])
        # Infer schema from the first row, create a DataFrame and print the schema
        some_df = sqlContext.createDataFrame(some_rdd)
        some_df.printSchema()
    
        # Another RDD is created from a list of tuples
        another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
        # Schema with two fields - person_name and person_age
        schema = StructType([StructField("person_name", StringType(), False),
                            StructField("person_age", IntegerType(), False)])
        # Create a DataFrame by applying the schema to the RDD and print the schema
        another_df = sqlContext.createDataFrame(another_rdd, schema)
        another_df.printSchema()
        # root
        #  |-- age: integer (nullable = true)
        #  |-- name: string (nullable = true)
    
        # A JSON dataset is pointed to by path.
    

     

    3. Sort

    sort实现了排序功能,主要通过sortByKey, 也可以使用SortWith, 注意如果数据量特别大,不要使用collect, 而是应该将rdd repatition为1个分区然后保存在hdfs上使用

    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage: sort <file>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonSort")
        lines = sc.textFile(sys.argv[1], 1)
        sortedCount = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (int(x), 1)) \
            .sortByKey(lambda x: x)
        # This is just a demo on how to bring all the sorted data back to a single node.
        # In reality, we wouldn't want to collect all the data to the driver node.
        output = sortedCount.collect()
        for (num, unitcount) in output:
            print(num)
    
        sc.stop()
    

     

    4. LR回归

    from __future__ import print_function
    
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    
    
    D = 10  # Number of dimensions
    
    # Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
    # make further computations faster.
    # The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
    # into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
    def readPointBatch(iterator):
        strs = list(iterator)
        matrix = np.zeros((len(strs), D + 1))
        for i, s in enumerate(strs):
            matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
        return [matrix]
    
    if __name__ == "__main__":
    
        if len(sys.argv) != 3:
            print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
            exit(-1)
    
        print("""WARN: This is a naive implementation of Logistic Regression and is
          given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
          to see how MLlib's implementation is used.""", file=sys.stderr)
        sc = SparkContext(appName="PythonLR")
        points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
        iterations = int(sys.argv[2])
    
        # Initialize w to a random value
        w = 2 * np.random.ranf(size=D) - 1
        print("Initial w: " + str(w))
    
        # Compute logistic regression gradient for a matrix of data points
        def gradient(matrix, w):
            Y = matrix[:, 0]    # point labels (first column of input file)
            X = matrix[:, 1:]   # point coordinates
            # For each point (x, y), compute gradient function, then sum these up
            return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
    
        def add(x, y):
            x += y
            return x
    
        for i in range(iterations):
            print("On iteration %i" % (i + 1))
            w -= points.map(lambda m: gradient(m, w)).reduce(add)
    
        print("Final w: " + str(w))
    
        sc.stop()
    

     

     

    pyspark递交到yarn上运行

     

    /home/hadoop/soft/spark/bin/spark-submit \

     

     

    --master yarn \

     

     

    --deploy-mode cluster \ 

     

     

    --num-executors 1 \ 

    --executor-memory 1G \ 

     

     

    wordCount.py

    转载于:https://www.cnblogs.com/energy1010/p/10161475.html

    展开全文
  • pyspark 入门小案例

    2020-10-06 09:51:02
    pyspark 入门小案例 导入相应的依赖包 import sys from pyspark import SparkConf, SparkContext 设置对应的导入 if name == ‘main’: if len(sys.argv)!=3: print("Usage:wordcount ", sys.stderr) sys.exit(-1)...

    pyspark 入门小案例

    导入相应的依赖包

    import sys
    
    from pyspark import SparkConf, SparkContext
    

    设置对应的导入

    if name == ‘main’:
    if len(sys.argv)!=3:
    print("Usage:wordcount ", sys.stderr)
    sys.exit(-1)

    配置配置参数

    conf=SparkConf()
    sc=SparkContext(conf=conf);
    

    定义一个打印方法

    def printresult():
        counts=sc.textFile(sys.argv[1]).flatMap(lambda x:x.split(" "))\
            .map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
        print(counts.collect())
    
        output=counts.collect()
    
        for (i,j) in output:
            print("%s:%s" %(i,j))
    

    定义导出方法

    def save_file():
        sc.textFile(sys.argv[1]).flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile(sys.argv[2])
    

    存储最终统计的文件

    save_file()
    

    关闭程序,释放空间

    sc.stop()
    

    放到服务器上运行

    ./spark-submit --master local[4] --name pyspark1006 /opt/pyspark_scripty/py_wc.py file:///opt/hello.txt file:///opt/pyspark_scripty/wc

    展开全文
  • pyspark入门(一)

    2020-02-05 22:37:14
    pyspark入门基础 pyspark简介 首先我们都应该清楚apache是Scala编写的程序,而最近几年吟哦日机器学习的兴起,同时降低科技人才使用编程语言的代价,python这种动态语言成为2019年最受欢迎的编程语言之一(java依然...
  • python pyspark入门

    2017-12-11 16:45:00
    python pyspark入门篇 一.环境介绍: 1.安装jdk 7以上 2.python 2.7.11 3.IDE pycharm 4.package:spark-1.6.0-bin-hadoop2.6.tar.gz 二.Setup 1.解压spark-1.6.0-bin-hadoop2.6.tar.gz 到目录D:\spark-...
  • pyspark入门教程

    千次阅读 多人点赞 2020-07-20 20:18:21
    一、windows下配置pyspark环境 1.1 jdk下载安装 1.2 Scala下载安装 1.3 spark下载安装 1.4 Hadoop下载安装 1.5 pyspark下载安装 1.6 anaconda下载安装 1.7 测试环境是否搭建成功 二、pyspark原理简介 三、...
  • PySpark入门:键值对RDD操作 RDD基本转换运算 创建RDD最简单的方式是使用SparkContext的parallelize方法 intRDD=sc.parallelize([3,1,2,5,5]) intRDD.collect() 由于spark的惰性,转化操作并不会马上执行,而...
  • pyspark入门学习demopyspark创建Dataframe增、删、改等相关语法 最近数据机太大,用pandas处理耗时太久,于是用学习pyspark处理数据。 pyspark创建Dataframe from pyspark import SparkConf from pyspark.sql import...
  • pyspark入门整理

    2020-07-02 15:03:18
    最近工作需要对千万以上数据做特征处理,为了提升运(zao)算(ri)效(xia)率(ban),开始使用pyspark做分布式运算。 也是从基本开始学习,先把用到的一些资料贴在这里,日后有空结合业务进一步整理。 (一)...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,912
精华内容 764
关键字:

pyspark入门