2017-03-29 17:50:43 victory0508


Reading MySQL data with pyspark:

Sample code 1:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # sc is the SparkContext, e.g. the one provided by the pyspark shell
dataframe_mysql = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://127.0.0.1:3306/spark_db",
    driver="com.mysql.jdbc.Driver",
    dbtable="spark_table",
    user="root",
    password="root").load()
dataframe_mysql.show()


Sample code 2:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("spark://train01:7077","LDASample")  
sqlContext=SQLContext(sc)
jdbcDf = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://10.10.10.10:3306/adl",
    driver="com.mysql.jdbc.Driver",
    dbtable="(SELECT code,title,description FROM project) tmp",
    user="mouren",
    password="mouren").load()
print(jdbcDf.select('description').show(2))
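
With Spark 2.x the same read can also be expressed through SparkSession instead of SQLContext; the sketch below simply reuses the connection settings from sample code 1:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mysql-read").getOrCreate()
df = spark.read.format("jdbc").options(
    url="jdbc:mysql://127.0.0.1:3306/spark_db",
    driver="com.mysql.jdbc.Driver",
    dbtable="spark_table",
    user="root",
    password="root").load()
df.show()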


Prerequisite: the configuration file /etc/spark/conf/spark-env.sh contains:

+export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar

This configuration sometimes triggers a warning:

WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to ':/opt/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar' as a work-around.

Solution:

Remove the configuration above and edit spark-defaults.conf instead, adding:

+spark.executor.extraClassPath /opt/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar
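
Equivalently, the connector jar can be supplied at submit time rather than through the config files; a sketch using the same jar path (your_script.py is a placeholder for the actual application):

spark-submit --driver-class-path /opt/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar --jars /opt/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar your_script.py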

2020-01-07 17:48:01 levy_cui

Test code for reading Hive data with pyspark:

vi test.py

# -*- coding: utf-8 -*-

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = (SparkConf().setMaster("yarn").setAppName("My app").set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)

my_dataframe = sqlContext.sql("Select * from database.table limit 10")
my_dataframe.show()

sc.stop()

Error:
python pyspark_hive.py
Traceback (most recent call last):
  File "pyspark_hive.py", line 2, in <module>
    from pyspark.sql import HiveContext,SparkSession
ModuleNotFoundError: No module named 'pyspark'


Set the environment variables:
vi ~/.profile
 

export SPARK_HOME=/usr/lib/spark-current
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH
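
As an alternative sketch (assuming the findspark package is installed via pip install findspark and the same SPARK_HOME), the paths can be set up at runtime from inside the script:

import findspark
findspark.init("/usr/lib/spark-current")  # same SPARK_HOME as the export above

# after init(), pyspark imports work without editing ~/.profile
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("hive-test").enableHiveSupport().getOrCreate()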

Reference:
https://www.cnblogs.com/tong775131501/p/7582258.html

2019-10-03 18:18:48 littlely_ll

Reading data

hadoopFile

Parameters:

  • path – path to Hadoop file
  • inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (None by default)
  • valueConverter – (None by default)
  • conf – Hadoop configuration, passed in as a dict (None by default)
  • batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
# hadoopFile: returns key-value pairs; the key is the byte offset of each line and the value is the line content
# log.txt:
# http://www.baidu.com
# http://www.google.com
# http://www.google.com
# ...	...		...

rdd = sc.hadoopFile("hdfs://centos03:9000/datas/log.txt",
                    inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
                    keyClass="org.apache.hadoop.io.LongWritable",
                    valueClass="org.apache.hadoop.io.Text")
print(rdd.collect())  #1
rdd1 = rdd.map(lambda x: x[1].split(":"))
print(rdd1.collect())  #2

#1 [(0, ‘http://www.baidu.com’), (22, ‘http://www.google.com’), (45, ‘http://www.google.com’), (68, ‘http://cn.bing.com’), (88, ‘http://cn.bing.com’), (108, ‘http://www.baidu.com’), (130, ‘http://www.sohu.com’), (151, ‘http://www.sina.com’), (172, ‘http://www.sin2a.com’), (194, ‘http://www.sin2desa.com’), (219, ‘http://www.sindsafa.com’)]

#2 [[‘http’, ‘//www.baidu.com’], [‘http’, ‘//www.google.com’], [‘http’, ‘//www.google.com’], [‘http’, ‘//cn.bing.com’], [‘http’, ‘//cn.bing.com’], [‘http’, ‘//www.baidu.com’], [‘http’, ‘//www.sohu.com’], [‘http’, ‘//www.sina.com’], [‘http’, ‘//www.sin2a.com’], [‘http’, ‘//www.sin2desa.com’], [‘http’, ‘//www.sindsafa.com’]]

newAPIHadoopFile

Parameters:

  • path – path to Hadoop file
  • inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (None by default)
  • valueConverter – (None by default)
  • conf – Hadoop configuration, passed in as a dict (None by default)
  • batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
# newAPIHadoopFile: returns key-value pairs; the key is the byte offset of each line and the value is the line content
rdd = sc.newAPIHadoopFile("hdfs://centos03:9000/datas/log.txt",
                          # note: inputFormatClass differs from the old API
                          inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                          keyClass="org.apache.hadoop.io.LongWritable",
                          valueClass="org.apache.hadoop.io.Text")
print(rdd.collect())  #1
rdd1 = rdd.map(lambda x: x[1].split(":"))
print(rdd1.collect())  #2

#1 [(0, ‘http://www.baidu.com’), (22, ‘http://www.google.com’), (45, ‘http://www.google.com’), (68, ‘http://cn.bing.com’), (88, ‘http://cn.bing.com’), (108, ‘http://www.baidu.com’), (130, ‘http://www.sohu.com’), (151, ‘http://www.sina.com’), (172, ‘http://www.sin2a.com’), (194, ‘http://www.sin2desa.com’), (219, ‘http://www.sindsafa.com’)]

#2 [[‘http’, ‘//www.baidu.com’], [‘http’, ‘//www.google.com’], [‘http’, ‘//www.google.com’], [‘http’, ‘//cn.bing.com’], [‘http’, ‘//cn.bing.com’], [‘http’, ‘//www.baidu.com’], [‘http’, ‘//www.sohu.com’], [‘http’, ‘//www.sina.com’], [‘http’, ‘//www.sin2a.com’], [‘http’, ‘//www.sin2desa.com’], [‘http’, ‘//www.sindsafa.com’]]

hadoopRDD

Parameters:

  • inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (None by default)
  • valueConverter – (None by default)
  • conf – Hadoop configuration, passed in as a dict (None by default)
  • batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
confs = {"mapred.input.dir": "hdfs://centos03:9000/datas/log.txt"}
rdd = sc.hadoopRDD(inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
                   keyClass="org.apache.hadoop.io.LongWritable",
                   valueClass="org.apache.hadoop.io.Text",
                   conf=confs)
print(rdd.collect())  #1

#1 [(0, ‘http://www.baidu.com’), (22, ‘http://www.google.com’), (45, ‘http://www.google.com’), (68, ‘http://cn.bing.com’), (88, ‘http://cn.bing.com’), (108, ‘http://www.baidu.com’), (130, ‘http://www.sohu.com’), (151, ‘http://www.sina.com’), (172, ‘http://www.sin2a.com’), (194, ‘http://www.sin2desa.com’), (219, ‘http://www.sindsafa.com’)]

newAPIHadoopRDD

Parameters:

  • inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (None by default)
  • valueConverter – (None by default)
  • conf – Hadoop configuration, passed in as a dict (None by default)
  • batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
confs = {"mapreduce.input.fileinputformat.inputdir":"hdfs://centos03:9000/datas/log.txt"}
rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
keyClass="org.apache.hadoop.io.LongWritable",
valueClass="org.apache.hadoop.io.Text", 
conf=confs)
print(rdd.collect())  #1

#1 [(0, ‘http://www.baidu.com’), (22, ‘http://www.google.com’), (45, ‘http://www.google.com’), (68, ‘http://cn.bing.com’), (88, ‘http://cn.bing.com’), (108, ‘http://www.baidu.com’), (130, ‘http://www.sohu.com’), (151, ‘http://www.sina.com’), (172, ‘http://www.sin2a.com’), (194, ‘http://www.sin2desa.com’), (219, ‘http://www.sindsafa.com’)]

pickleFile

Parameter:

  • name – path to the data to load
  • minPartitions=None

Reads an RDD that was saved with saveAsPickleFile.

# pickleFile reads data saved with saveAsPickleFile; the data comes back in the same form it was saved in
rdd = sc.newAPIHadoopFile("hdfs://centos03:9000/datas/log.txt",
                          inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                          keyClass="org.apache.hadoop.io.LongWritable",
                          valueClass="org.apache.hadoop.io.Text")
print(rdd.collect())  #1
rdd1 = rdd.map(lambda x: x[1].split(":")).map(lambda x: (x[0], x[1]))
print(rdd1.collect())  #2

rdd1.saveAsPickleFile("hdfs://centos03:9000/datas/logp.txt")
print(sc.pickleFile("hdfs://centos03:9000/datas/logp.txt").collect())  #3

#1 [(0, ‘http://www.baidu.com’), (22, ‘http://www.google.com’), (45, ‘http://www.google.com’), (68, ‘http://cn.bing.com’), (88, ‘http://cn.bing.com’), (108, ‘http://www.baidu.com’), (130, ‘http://www.sohu.com’), (151, ‘http://www.sina.com’), (172, ‘http://www.sin2a.com’), (194, ‘http://www.sin2desa.com’), (219, ‘http://www.sindsafa.com’)]

#2 [(‘http’, ‘//www.baidu.com’), (‘http’, ‘//www.google.com’), (‘http’, ‘//www.google.com’), (‘http’, ‘//cn.bing.com’), (‘http’, ‘//cn.bing.com’), (‘http’, ‘//www.baidu.com’), (‘http’, ‘//www.sohu.com’), (‘http’, ‘//www.sina.com’), (‘http’, ‘//www.sin2a.com’), (‘http’, ‘//www.sin2desa.com’), (‘http’, ‘//www.sindsafa.com’)]

#3 [(‘http’, ‘//www.baidu.com’), (‘http’, ‘//www.google.com’), (‘http’, ‘//www.google.com’), (‘http’, ‘//cn.bing.com’), (‘http’, ‘//cn.bing.com’), (‘http’, ‘//www.baidu.com’), (‘http’, ‘//www.sohu.com’), (‘http’, ‘//www.sina.com’), (‘http’, ‘//www.sin2a.com’), (‘http’, ‘//www.sin2desa.com’), (‘http’, ‘//www.sindsafa.com’)]

sequenceFile

Parameters:

  • path – path to sequence file
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter
  • valueConverter
  • minSplits – minimum splits in dataset (default min(2, sc.defaultParallelism))
  • batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
# read a Hadoop SequenceFile; keyClass and valueClass may be left unspecified
rdd = sc.sequenceFile(path="hdfs://centos03:9000/datas/seqFile",
                      keyClass="org.apache.hadoop.io.LongWritable",
                      valueClass="org.apache.hadoop.io.Text")
print(rdd.collect())  #1

#1 [(‘Pandas’, 3), (‘Key’, 6), (‘Sanil’, 2)]

textFile

Parameter:

  • name – file path
  • minPartitions=None
  • use_unicode=True
# textFile: if use_unicode=False, strings are returned as str, which is faster and smaller than unicode
rdd = sc.textFile(name="hdfs://centos03:9000/datas/log.txt")
print(rdd.collect())  #1

#1 [‘http://www.baidu.com’, ‘http://www.google.com’, ‘http://www.google.com’, ‘http://cn.bing.com’, ‘http://cn.bing.com’, ‘http://www.baidu.com’, ‘http://www.sohu.com’, ‘http://www.sina.com’, ‘http://www.sin2a.com’, ‘http://www.sin2desa.com’, ‘http://www.sindsafa.com’]

wholeTextFiles

Reads files from HDFS, the local file system, or any other Hadoop-supported file system. Each file is read as a single record and returned as a key-value pair, where the key is the file path and the value is the file content.

Parameters:

  • path
  • minPartitions=None
  • use_unicode=True
# wholeTextFiles is well suited to directories containing many small files
rdd = sc.wholeTextFiles(path="hdfs://centos03:9000/table")
print(rdd.collect())  #1
rdd1 = rdd.map(lambda x: x[1].split("\t"))
print(rdd1.collect())  #2

#1 [(‘hdfs://centos03:9000/table/order.txt’, ‘1001\t01\t1\r\n1002\t02\t2\r\n1003\t03\t3\r\n1004\t01\t4\r\n1005\t02\t5\r\n1006\t03\t6’), (‘hdfs://centos03:9000/table/pd.txt’, ‘01\t小米\r\n02\t华为\r\n03\t格力\r\n’)]

#2 [[‘1001’, ‘01’, ‘1\r\n1002’, ‘02’, ‘2\r\n1003’, ‘03’, ‘3\r\n1004’, ‘01’, ‘4\r\n1005’, ‘02’, ‘5\r\n1006’, ‘03’, ‘6’], [‘01’, ‘小米\r\n02’, ‘华为\r\n03’, ‘格力\r\n’]]
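
Splitting on the tab character alone leaves the \r\n line breaks inside the fields (see #2 above); a small sketch that splits each file into lines first produces cleaner records:

rdd2 = rdd.flatMap(lambda x: x[1].splitlines()).map(lambda line: line.split("\t"))
print(rdd2.collect())
# each element is now a single row of fields, e.g. ['1001', '01', '1'] or ['01', '小米']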

Saving data

saveAsHadoopFile

Output a Python RDD of key-value pairs (of form RDD[(K, V)])

Parameters:

  • path – path to Hadoop file
  • outputFormatClass – fully qualified classname of Hadoop OutputFormat (e.g. “org.apache.hadoop.mapred.SequenceFileOutputFormat”)
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.IntWritable”, None by default)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.Text”, None by default)
  • keyConverter – (None by default)
  • valueConverter – (None by default)
  • conf – (None by default)
  • compressionCodecClass – (None by default)
# saveAsHadoopFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
print(rdd.collect())
rdd.saveAsHadoopFile(
path="hdfs://centos03:9000/datas/rdd_seq",
outputFormatClass="org.apache.hadoop.mapred.SequenceFileOutputFormat"
)
print(sc.sequenceFile("hdfs://centos03:9000/datas/rdd_seq").collect())  #1

#1 [(‘good’, 1), (‘spark’, 4), (‘beats’, 3)]

Or:

# saveAsHadoopFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
print(rdd.collect())
rdd.saveAsHadoopFile(
path="hdfs://centos03:9000/datas/rdd_seq",
outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat")

rdd1 = sc.hadoopFile(
"hdfs://centos03:9000/datas/rdd_seq",
inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
keyClass="org.apache.hadoop.io.IntWritable",
valueClass="org.apache.hadoop.io.Text")
print(rdd1.collect())  #1

#1 [(0, ‘good\t1’), (0, ‘spark\t4’), (0, ‘beats\t3’)]

Comparing the two snippets above, saving the data in serialized (SequenceFile) form looks preferable.

However, when the data is sc.parallelize([{'good': 1}, {'spark': 4}, {'beats': 3}]), saving fails with org.apache.spark.SparkException: RDD element of type java.util.HashMap cannot be used; even after applying json.dumps to the elements it still fails (org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used). One explanation found online: "To use String and Map objects you will need to use the more extensive native support available in Scala and Java."

In fact, the official API documentation also states that the output must be a Python RDD of key-value pairs; a workaround along those lines is sketched below.
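
A sketch (not from the original post): flatten each dict into (key, value) pairs before saving, so that the RDD really has the form RDD[(K, V)]:

rdd = sc.parallelize([{'good': 1}, {'spark': 4}, {'beats': 3}])
pair_rdd = rdd.flatMap(lambda d: d.items())   # -> [('good', 1), ('spark', 4), ('beats', 3)]
pair_rdd.saveAsHadoopFile(
    path="hdfs://centos03:9000/datas/rdd_pairs",   # hypothetical output path
    outputFormatClass="org.apache.hadoop.mapred.SequenceFileOutputFormat")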

saveAsNewAPIHadoopFile

Output a Python RDD of key-value pairs (of form RDD[(K, V)])

Parameters:

  • path – path to Hadoop file
  • outputFormatClass – fully qualified classname of Hadoop OutputFormat (e.g. “org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat”)
  • keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.IntWritable”, None by default)
  • valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.Text”, None by default)
  • keyConverter – (None by default)
  • valueConverter – (None by default)
  • conf – Hadoop job configuration, passed in as a dict (None by default)
# saveAsNewAPIHadoopFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
print(rdd.collect())
rdd.saveAsNewAPIHadoopFile(path="hdfs://centos03:9000/datas/rdd_seq",
outputFormatClass="org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
print(sc.sequenceFile("hdfs://centos03:9000/datas/rdd_seq").collect())  #1

#1 [(‘good’, 1), (‘spark’, 4), (‘beats’, 3)]

# saveAsNewAPIHadoopFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
print(rdd.collect())
rdd.saveAsNewAPIHadoopFile(
path="hdfs://centos03:9000/datas/rdd_seq",
outputFormatClass="org.apache.hadoop.mapreduce.lib.output.TextOutputFormat"
)

rdd2 = sc.hadoopFile("hdfs://centos03:9000/datas/rdd_seq", inputFormatClass="org.apache.hadoop.mapred.TextInputFormat", keyClass="org.apache.hadoop.io.IntWritable", valueClass="org.apache.hadoop.io.Text")
print(rdd2.collect())  #1

#1 [(0, ‘good\t1’), (0, ‘spark\t4’), (0, ‘beats\t3’)]

What if we change the shape of the data being stored?

rdd = sc.parallelize([(1, {'good': 1}), (2, {'spark': 4}), (3, {'beats': 3})])
print(rdd.collect())
rdd.saveAsNewAPIHadoopFile(
path="hdfs://centos03:9000/datas/rdd_seq",
outputFormatClass="org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
print(sc.sequenceFile("hdfs://centos03:9000/datas/rdd_seq").collect())  #1

#1 [(1, {‘good’: 1}), (2, {‘spark’: 4}), (3, {‘beats’: 3})]

rdd = sc.parallelize([(1, {'good': 1}), (2, {'spark': 4}), (3, {'beats': 3})])
print(rdd.collect())
rdd.saveAsNewAPIHadoopFile(
path="hdfs://centos03:9000/datas/rdd_seq",
outputFormatClass="org.apache.hadoop.mapreduce.lib.output.TextOutputFormat")
rdd2 = sc.hadoopFile(
"hdfs://centos03:9000/datas/rdd_seq",
inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
keyClass="org.apache.hadoop.io.IntWritable",
valueClass="org.apache.hadoop.io.Text")
print(rdd2.collect())  #1

#1 [(0, ‘1\torg.apache.hadoop.io.MapWritable@3e9840’), (0, ‘2\torg.apache.hadoop.io.MapWritable@83dcb79’), (0, ‘3\torg.apache.hadoop.io.MapWritable@7493c20’)]

As the code above shows, serialized output is still the better choice for saving data, because it preserves the original structure.

saveAsHadoopDataset

Output a Python RDD of key-value pairs (of form RDD[(K, V)])

Parameters:

  • conf – Hadoop job configuration, passed in as a dict
  • keyConverter – (None by default)
  • valueConverter – (None by default)
# saveAsHadoopDataset
confs = {"outputFormatClass": "org.apache.hadoop.mapred.TextOutputFormat",
         "keyClass": "org.apache.hadoop.io.LongWritable",
         "valueClass": "org.apache.hadoop.io.Text",
         "mapred.output.dir": "hdfs://centos03:9000/datas/rdd"}
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
rdd.saveAsHadoopDataset(conf=confs)  # job parameters are configured through the conf dict

rdd2 = sc.hadoopFile("hdfs://centos03:9000/datas/rdd", inputFormatClass="org.apache.hadoop.mapred.TextInputFormat", keyClass="org.apache.hadoop.io.LongWritable", valueClass="org.apache.hadoop.io.Text")
print(rdd2.collect())  #1

#1 [(0, ‘good\t1’), (0, ‘spark\t4’), (0, ‘beats\t3’)]

# saveAsHadoopDataset
confs = {"outputFormatClass": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
         "keyClass": "org.apache.hadoop.io.LongWritable",
         "valueClass": "org.apache.hadoop.io.Text",
         "mapred.output.dir": "hdfs://centos03:9000/datas/rdd"}
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
rdd.saveAsHadoopDataset(conf=confs)

rdd2 = sc.textFile("hdfs://centos03:9000/datas/rdd")  # the serialized file can be read back with textFile
print(rdd2.collect())  #1 

#1 [‘good\t1’, ‘spark\t4’, ‘beats\t3’]

saveAsNewAPIHadoopDataset

Output a Python RDD of key-value pairs (of form RDD[(K, V)])

Parameters:

  • conf – Hadoop job configuration, passed in as a dict
  • keyConverter – (None by default)
  • valueConverter – (None by default)
# saveAsNewAPIHadoopDataset
confs = {"outputFormatClass":"org.apache.hadoop.mapreduce.lib.output.TextOutputFormat",
         "keyClass": "org.apache.hadoop.io.LongWritable",
         "valueClass": "org.apache.hadoop.io.Text",
         "mapreduce.output.fileoutputformat.outputdir": "hdfs://centos03:9000/datas/rdd"
        }
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
rdd.saveAsNewAPIHadoopDataset(conf=confs)

rdd1 = sc.newAPIHadoopFile(path="hdfs://centos03:9000/datas/rdd", inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat", keyClass="org.apache.hadoop.io.LongWritable", valueClass="org.apache.hadoop.io.Text")
print(rdd1.collect())  #1

rdd2 = sc.textFile("hdfs://centos03:9000/datas/rdd")
print(rdd2.collect())  #2

#1 [(0, ‘good\t1’), (0, ‘spark\t4’), (0, ‘beats\t3’)]

#2 [‘good\t1’, ‘spark\t4’, ‘beats\t3’]

saveAsPickleFile

Save this RDD as a SequenceFile of serialized objects. The serializer used is pyspark.serializers.PickleSerializer, default batch size is 10.

  • path
  • batchSize=10
# saveAsPickleFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
rdd.saveAsPickleFile("hdfs://centos03:9000/datas/rdd")

rdd1 = sc.pickleFile("hdfs://centos03:9000/datas/rdd")
print(rdd1.collect())  #1

#1 [(‘good’, 1), (‘spark’, 4), (‘beats’, 3)]

saveAsSequenceFile

Output a Python RDD of key-value pairs (of form RDD[(K, V)])

Two conversions happen internally: 1. the pickled Python RDD is converted to a Java RDD of objects; 2. the keys and values of that Java RDD are converted to Writables and written out.

Parameters:

  • path – path to sequence file
  • compressionCodecClass – (None by default)
# saveAsSequenceFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
rdd.saveAsSequenceFile("hdfs://centos03:9000/datas/rdd")

rdd1 = sc.sequenceFile("hdfs://centos03:9000/datas/rdd")
print(rdd1.collect())  #1

rdd2 = sc.textFile("hdfs://centos03:9000/datas/rdd")
print(rdd2.collect())  #2

#1 [(‘good’, 1), (‘spark’, 4), (‘beats’, 3)]
#2 ['SEQ\x06\x19org.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable\x00\x00\x00\x00\x00\x00�ekpR2\x08� U��Yn$’, 'SEQ\x06\x19org.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable\x00\x00\x00\x00\x00\x00�4��E�}βZ;�v\x1f\t\x00\x00\x00\t\x00\x00\x00\x05\x04good\x00\x00\x00\x01', 'SEQ\x06\x19org.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable\x00\x00\x00\x00\x00\x00\x14��˹\x02oM�g��f�\x02v\x00\x00\x00', '\x00\x00\x00\x06\x05spark\x00\x00\x00\x04', 'SEQ\x06\x19org.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable\x00\x00\x00\x00\x00\x00F\x0b��\x04lD\x116+\x16n��d�\x00\x00\x00', '\x00\x00\x00\x06\x05beats\x00\x00\x00\x03']

saveAsTextFile

# saveAsTextFile
rdd = sc.parallelize([('good', 1), ("spark", 4), ("beats", 3)])
rdd.saveAsTextFile("hdfs://centos03:9000/datas/rdd")
rdd2 = sc.textFile("hdfs://centos03:9000/datas/rdd")
print(rdd2.collect())  #1

#1 ["(‘good’, 1)", “(‘spark’, 4)”, “(‘beats’, 3)”]

2018-12-11 16:34:16 intersting

The previous article, pyspark连接oracle, walked through the basic way to connect to Oracle. That approach reads into a single RDD partition (numPartitions defaults to 1), which can easily cause OOM when the table is very large.

pyspark offers two ways to read a database in partitions.

Method 1: specify a range over a database column

The earlier approach was:

empDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//hostname:portnumber/SID") \
    .option("dbtable", "hr.emp") \
    .option("user", "db_user_name") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()

Now we add the partitionColumn, lowerBound, upperBound, and numPartitions options:

empDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//hostname:portnumber/SID") \
    .option("dbtable", "hr.emp") \
    .option("user", "db_user_name") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("partitionColumn", partitionColumn) \
    .option("lowerBound", lowerBound) \
    .option("upperBound", upperBound) \
    .option("numPartitions", numPartitions) \
    .load()

These options apply only to reads and must all be specified together. partitionColumn is the column to partition on, and it must be a numeric column in the database; lowerBound and upperBound are the lower and upper bounds of the partition range; numPartitions is the number of partitions.

This method spreads the table's rows across several RDD partitions, with the number determined by numPartitions; ideally each partition handles roughly the same amount of data. Do not set this value too high, or the database may be overwhelmed. As noted, the drawback is that only a numeric column can serve as the partition key. A concrete illustration with assumed values follows.
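
A minimal sketch with assumed values (a hypothetical numeric EMP_ID column spanning 1 to 1,000,000, split into 10 partitions); Spark turns the bounds into per-partition WHERE ranges over EMP_ID:

empDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//hostname:portnumber/SID") \
    .option("dbtable", "hr.emp") \
    .option("user", "db_user_name") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("partitionColumn", "EMP_ID") \
    .option("lowerBound", 1) \
    .option("upperBound", 1000000) \
    .option("numPartitions", 10) \
    .load()

print(empDF.rdd.getNumPartitions())  # 10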

Method 2: partition on an arbitrary column

Databases often need to be split by date, where the first method does not apply. Fortunately, Spark provides a function for this:

jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
'''
Constructs a DataFrame representing the database table named table, accessible via the JDBC URL url and connection properties.
The column parameter can be used to partition the table, which is then retrieved in parallel based on the parameters passed to this function.
The predicates parameter is a list of expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame.
Note: don't create too many partitions in parallel on a large cluster; otherwise Spark might crash the external database system.

Parameters: url – a JDBC URL
     table – the table name
    column – the column used for partitioning
    lowerBound – the lower bound of the partition column
    upperBound – the upper bound of the partition column
    numPartitions – the number of partitions
    predicates – a list of expressions to include in WHERE clauses; each one defines one partition of the DataFrame
    properties – JDBC database connection arguments, a dict of arbitrary string key/value pairs. For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }
Returns: a DataFrame
'''

With this function we need to set the predicates and properties arguments:

predicates = []
datelist = {"2014-11-01": "2015-01-01",
            "2014-09-01": "2014-11-01",
            "2014-07-01": "2014-09-01",
            "2014-05-01": "2014-07-01",
            "2014-03-01": "2014-05-01",
            "2014-01-01": "2014-03-01"}
for startdate, enddate in datelist.items():
    predicates.append("STARTDATE >= to_date('" + startdate + "', 'yyyy-MM-dd'" \
        + "and STARTDATE < to_date('" + enddate + "', 'yyyy-MM-dd')")
properties = {"user": db_user_name,
              "password" : password,
              "driver": driver}
df = spark.read.jdbc(url=url, table=dbtable, predicates=predicates, properties=properties)

The final number of RDD partitions equals len(predicates).
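
A quick sanity check (sketch) confirms that the partition count matches the predicate list:

print(len(predicates))            # 6 date ranges
print(df.rdd.getNumPartitions())  # also 6: one partition per predicate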

One thing to note: the driver goes into properties. Most examples online connect to MySQL, which, unlike Oracle, does not require specifying an extra driver.

Also, STARTDATE in the data is a DATE column, so to_date() is needed for the type conversion.

References:

Spark读取数据库(Mysql)的四种方式讲解

spark jdbc(mysql) 读取并发度优化

https://github.com/UrbanInstitute/sparkr-tutorials/blob/master/08_databases-with-jdbc.md

【SparkSQL】partitionColumn, lowerBound, upperBound, numPartitions的理解

Spark JDBC To MySQL

2019-06-24 01:09:07 u011412768

1. Reading Hive table data

Reading Hive data with pyspark is very simple, because there is a dedicated interface for it; unlike HBase, no extensive configuration is needed. The Hive interface provided by pyspark lets a program query the data it needs directly with SQL, as in the code below:

from pyspark.sql import HiveContext,SparkSession

_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()

hive_context = HiveContext(spark_session)

# build the query SQL; it is ordinary Hive SQL, so WHERE clauses and other conditions can be added
hive_database = "database1"
hive_table = "test"
hive_read = "select * from  {}.{}".format(hive_database, hive_table)

# the result of querying Hive with SQL comes back directly as a DataFrame
read_df = hive_context.sql(hive_read)

 

2. Writing data to a Hive table

There are two ways to write a Hive table with pyspark:

(1) Create the table with a SQL statement

from pyspark.sql import SparkSession, HiveContext

_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"

spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()

data = [
    (1,"3","145"),
    (1,"4","146"),
    (1,"5","25"),
    (1,"6","26"),
    (2,"32","32"),
    (2,"8","134"),
    (2,"8","134"),
    (2,"9","137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])

# method one: 'default' is the name of the default database, and 'write_test' is the name of the table to create in it
df.registerTempTable('test_hive')  # registerTempTable is deprecated in Spark 2.x; createOrReplaceTempView is the newer name
spark.sql("create table default.write_test as select * from test_hive")

(2) The saveAsTable approach

# method two

# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
#  mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')

 

tips:

When spark reads or writes Hive using the approaches above, the corresponding configuration must be added when submitting the job, otherwise it will fail:

spark-submit --conf spark.sql.catalogImplementation=hive test.py
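
Equivalently (a sketch), the same setting can be baked into the session builder with enableHiveSupport():

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("test")
         .enableHiveSupport()   # equivalent to setting spark.sql.catalogImplementation=hive
         .getOrCreate())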

 
