Python big-data processing examples

2019-07-19 21:46:14 ZmlDreams

Data source: Baidu Netdisk, extraction code: lnqc

Second-hand housing data analysis (file: lianjia.csv)
 

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from IPython.display import display
plt.style.use("fivethirtyeight")
sns.set_style({'font.sans-serif':['simhei','Arial']})

# Load the Lianjia second-hand housing listings
lianjia_df = pd.read_csv('lianjia.csv')
display(lianjia_df.head(n=3))
lianjia_df.info()
lianjia_df.describe()

# Add a new feature: average price per square meter
df = lianjia_df.copy()
df['PerPrice'] = lianjia_df['Price']/lianjia_df['Size']

# Reorder the columns
columns = ['Region', 'District', 'Garden', 'Layout', 'Floor', 'Year', 'Size', 'Elevator', 'Direction', 'Renovation', 'PerPrice', 'Price']
df = df[columns]

# Take another look at the dataset
display(df.head(n=2))
# Group by region to compare listing counts and per-square-meter prices
df_house_count = df.groupby('Region')['Price'].count().sort_values(ascending=False).to_frame().reset_index()
df_house_mean = df.groupby('Region')['PerPrice'].mean().sort_values(ascending=False).to_frame().reset_index()

f, [ax1,ax2,ax3] = plt.subplots(3,1,figsize=(30,50))
sns.barplot(x='Region', y='PerPrice', palette="Blues_d", data=df_house_mean, ax=ax1)
ax1.set_title('Average price per square meter by Beijing district', fontsize=15)
ax1.set_xlabel('Region')
ax1.set_ylabel('Price per square meter')

sns.barplot(x='Region', y='Price', palette="Greens_d", data=df_house_count, ax=ax2)
ax2.set_title('Number of second-hand listings by Beijing district', fontsize=15)
ax2.set_xlabel('Region')
ax2.set_ylabel('Count')

sns.boxplot(x='Region', y='Price', data=df, ax=ax3)
ax3.set_title('Total listing price by Beijing district', fontsize=15)
ax3.set_xlabel('Region')
ax3.set_ylabel('Total price')
f1, [ax1,ax2] = plt.subplots(1, 2, figsize=(15, 10))
# Distribution of house sizes
sns.distplot(df['Size'], bins=20, ax=ax1, color='r')
sns.kdeplot(df['Size'], shade=True, ax=ax1)
# Relationship between size and sale price
sns.regplot(x='Size', y='Price', data=df, ax=ax2)
plt.show()

# Inspect implausible sizes at both extremes
df.loc[df['Size']<10]
df.loc[df['Size']>1000]
# Drop the stacked-villa layouts and the extreme sizes
df = df[(df['Layout']!='叠拼别墅')&(df['Size']<1000)]

# Feature analysis
f2, ax1 = plt.subplots(figsize=(20,40))
sns.countplot(y='Layout', data=df, ax=ax1)
ax1.set_title('Housing layouts', fontsize=15)
ax1.set_xlabel('Count')
ax1.set_ylabel('Layout')
plt.show()

df['Renovation'].value_counts()

# Remove the bad value '南北': some fields were empty during scraping, which shifted the Direction feature into this column, so it needs to be cleaned out or replaced
df['Renovation'] = df.loc[(df['Renovation'] != '南北'), 'Renovation']

# Figure layout
f3, [ax1,ax2,ax3] = plt.subplots(1, 3, figsize=(10, 5))
sns.countplot(df['Renovation'], ax=ax1)
sns.barplot(x='Renovation', y='Price', data=df, ax=ax2)
sns.boxplot(x='Renovation', y='Price', data=df, ax=ax3)
plt.show()

misn = len(df.loc[(df['Elevator'].isnull()), 'Elevator'])
print('Number of missing Elevator values: ' + str(misn))

# A few rows are mistyped (renovation values such as 简装 and 精装 shifted into this column), so everything except 有电梯/无电梯 is dropped
df['Elevator'] = df.loc[(df['Elevator'] == '有电梯')|(df['Elevator'] == '无电梯'), 'Elevator']

# Impute missing Elevator values: assume buildings above six floors have an elevator
df.loc[(df['Floor']>6)&(df['Elevator'].isnull()), 'Elevator'] = '有电梯'
df.loc[(df['Floor']<=6)&(df['Elevator'].isnull()), 'Elevator'] = '无电梯'
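The floor-based imputation rule can be checked on a tiny hypothetical frame (the three sample rows and the six-floor threshold are just this article's assumptions, not part of the dataset):

```python
import pandas as pd
import numpy as np

# Hypothetical sample mirroring the Floor/Elevator columns above
sample = pd.DataFrame({'Floor': [3, 12, 6],
                       'Elevator': [np.nan, np.nan, '有电梯']})

# Same rule as above: floors above 6 get '有电梯', the rest '无电梯'
sample.loc[(sample['Floor'] > 6) & (sample['Elevator'].isnull()), 'Elevator'] = '有电梯'
sample.loc[(sample['Floor'] <= 6) & (sample['Elevator'].isnull()), 'Elevator'] = '无电梯'

print(sample['Elevator'].tolist())  # → ['无电梯', '有电梯', '有电梯']
```

Only the rows that were actually missing are touched; pre-existing values survive.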

f5, [ax1,ax2] = plt.subplots(1, 2, figsize=(5, 7))
sns.countplot(df['Elevator'], ax=ax1)
ax1.set_title('Listing counts by elevator availability', fontsize=15)
ax1.set_xlabel('Elevator')
ax1.set_ylabel('Count')
sns.barplot(x='Elevator', y='Price', data=df, ax=ax2)
ax2.set_title('Price by elevator availability', fontsize=15)
ax2.set_xlabel('Elevator')
ax2.set_ylabel('Total price')
plt.show()

grid = sns.FacetGrid(df, row='Elevator', col='Renovation', palette='seismic', height=4)
grid.map(plt.scatter, 'Year', 'Price')
grid.add_legend()


f6, ax1 = plt.subplots(figsize=(20,5))
sns.countplot(x='Floor', data=df, ax=ax1)
ax1.set_title('Floor distribution', fontsize=15)
ax1.set_xlabel('Floor')
ax1.set_ylabel('Count')
plt.show()

 

2019-09-21 23:24:46 dafeidouzi

There are two data-processing examples. The first one I had already written up in my Youdao notes and pasted in directly, so let's get straight to it.

Example 1: fast-food data

Example 2: European Championship data

First, explore the data

data.info()


data.describe()

To check whether the dataset has missing values, and in which fields, you can use the code below (the first two approaches mentioned in the missing-value part of Example 1 work as well):

for i in range(data.shape[1]):
    if data.iloc[:,i].notnull().sum() != data.shape[0]:
        print('Column %d: field %s has missing values' % (i+1, data.columns[i]))
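A vectorized alternative gets the same information in one call; `data` here is a small hypothetical frame standing in for the Euro dataset:

```python
import pandas as pd
import numpy as np

# Hypothetical frame with one incomplete column
data = pd.DataFrame({'Team': ['England', 'Italy', 'Russia'],
                     'Clearances off line': [0.0, np.nan, 1.0]})

# Count missing values per column, then keep only the offending fields
missing = data.isnull().sum()
print(missing[missing > 0])
```

`data.isnull().any()` works too if you only need a yes/no per column.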

The loop prints every column that contains missing values; Clearances off line is among them.

Handle the missing values in Clearances off line

First, inspect the Clearances off line field and tally its value counts:

data['Clearances off line'].value_counts()

The counts show that Clearances off line contains eleven 0s, three 1s, and one 2, so the missing values are filled with the mode:

mode = data['Clearances off line'].mode()[0]
data['Clearances off line'] = data['Clearances off line'].fillna(mode)
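Note that `Series.mode()` returns a Series rather than a scalar (there can be ties), which is why the element at index 0 is taken before `fillna`. A quick check on hypothetical values:

```python
import pandas as pd
import numpy as np

s = pd.Series([0, 0, 0, 1, 2, np.nan])
m = s.mode()            # a Series; here it contains the single value 0.0
filled = s.fillna(m[0])  # take the scalar, then fill
print(filled.tolist())
```

Passing the whole `mode()` Series to `fillna` would align on index labels instead, and only row 0 would ever be filled.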

Descriptive statistics

How many teams took part in the Euro?

data.Team.count()

Save the columns Team, Yellow Cards, and Red Cards as a separate data frame named discipline

discipline=data[['Team','Yellow Cards','Red Cards']]

Sort by Red Cards first and then Yellow Cards, in descending order

discipline.sort_values(by=['Red Cards','Yellow Cards'], ascending=False)
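Descending multi-key sorting needs `ascending=False`; otherwise `sort_values` sorts ascending by default. On a hypothetical discipline frame:

```python
import pandas as pd

# Hypothetical card tallies
discipline = pd.DataFrame({'Team': ['A', 'B', 'C'],
                           'Red Cards': [0, 1, 1],
                           'Yellow Cards': [5, 2, 7]})

# Sort by Red Cards first, then Yellow Cards, both descending
ranked = discipline.sort_values(by=['Red Cards', 'Yellow Cards'], ascending=False)
print(ranked['Team'].tolist())  # → ['C', 'B', 'A']
```

Ties on Red Cards (teams B and C) are broken by Yellow Cards.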

Compute the mean number of yellow cards across teams

data['Yellow Cards'].mean()

Find the teams that scored more than 6 goals

data[data['Goals']>6]

Compare the Shooting Accuracy of England, Italy, and Russia

data.loc[data.Team.isin(['England','Italy','Russia']), ['Team','Shooting Accuracy']]

 

2017-02-07 17:05:12 chengxuyuanyonghu


# Example: querying user balances


import sys

import MySQLdb

import pandas as pd


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.14',

                'dbport' : 3306,

                'dbname' : 'HBAODB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close()

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''
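The helper above builds SQL by `%` string interpolation, which is fragile if the inputs are ever untrusted. The DB-API 2.0 parameter style is safer; sqlite3 is used in this sketch so it runs anywhere, but MySQLdb follows the same pattern (with `%s` placeholders instead of `?`). Table and column names mirror the article's.

```python
import sqlite3

# Parameter binding demo: the driver escapes values, they are never
# spliced into the SQL text
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute("CREATE TABLE CHARCOIN0 (ID INTEGER, COINREFER INTEGER)")
cur.execute("INSERT INTO CHARCOIN0 VALUES (?, ?)", (10, 12345))

cur.execute("SELECT ID, COINREFER FROM CHARCOIN0 WHERE ID = ?", (10,))
row = cur.fetchone()
print(row)  # → (10, 12345)
conn.close()
```

Only identifiers such as the sharded table name (`CHARCOIN%u`) still need string formatting, since placeholders cannot stand in for table names.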

    

def getusercoin(userid):

    i = int(userid) % 10

    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (int(i), int(userid))

    #print reqsql

    ret = sql_select(reqsql) # call the helper defined above

    #print ret

    return ret[0]

    

def getall(userlist):

    userdata = pd.DataFrame(columns=('userid', 'coin'))

    index = 0

    for userid in userlist:

        coins = getusercoin(userid) # call the helper defined above

        #print coins[0],coins[1]/100.0

        if coins[0] is not None:

            userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        else:

            userdata.loc[index] = (str(userid), 0)

        index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata) # assumes a SparkSession named `spark` already exists

    #df.createOrReplaceTempView('userdata')

    df.show(50)

   




# Example: querying user spending


import sys

import MySQLdb

import pandas as pd

import datetime

import time


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.12',

                'dbport' : 3306,

                'dbname' : 'JIESUANDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close()

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

# User RMB spending

def getuserconsume(userid, startday): # helper taking parameters

    strdate = startday.strftime("%y%m%d")

    # gifts + guardianship + song requests + sticker purchases

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql) # call the helper defined above

    #print ret

    if ret and ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

        

# User top-ups

def getusercharge(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql) # call the helper defined above

    print ret

    if ret and ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

    

# User's end-of-day RMB balance

def getusercurcoin(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)

    print ret

    if ret:

        return float(ret[0][1])/100.0

    else:

        return 0

        

def getconsume():

    startdate = datetime.date(2017, 1, 1)

    enddate = datetime.date(2017, 2, 2)

    userid = 3101011990

    

    userdata = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))


    index = 0

    

    # number of days in the range (inclusive)

    td = enddate - startdate

    datelen = td.days + 1

    #print datelen

    delta = datetime.timedelta(days=1)

    allcoins = 0 

    for i in range(0,datelen):

        startday = startdate + delta * i

        consume_coin = getuserconsume(userid, startday) # call the helpers defined above

        charge = getusercharge(userid, startday)

        dayleftcoin = getusercurcoin(userid, startday)

        

        

        userdata.loc[index] = (startday.strftime("%Y-%m-%d"),str(userid), charge, consume_coin, dayleftcoin)

        index += 1

        

    #userdata.loc[index] = ('total',str(userid), allcoins, 0)

    print userdata.tail(100)

    return

    

getconsume()
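The day-count arithmetic in `getconsume` can be checked in isolation: subtracting two `date`s gives a `timedelta`, and the inclusive length of the range adds one.

```python
import datetime

startdate = datetime.date(2017, 1, 1)
enddate = datetime.date(2017, 2, 2)

td = enddate - startdate
datelen = td.days + 1  # inclusive number of days: 33

# Walk the range the same way the loop above does
delta = datetime.timedelta(days=1)
days = [startdate + delta * i for i in range(datelen)]
print(datelen, days[0], days[-1])
```

The last generated day is exactly `enddate`, which is why the `+ 1` is needed.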






# Example: querying user machine IDs


import sys

import MySQLdb

import pandas as pd

import datetime


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.15',

                'dbport' : 3306,

                'dbname' : 'JIQIDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close()

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusermid(userid, months):

    i = int(userid) % 50

    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months,int(i), int(userid))

    print reqsql

    ret = sql_select(reqsql)

    #print ret

    #print ret[0]

    return ret

    

def getall(userlist):

    today = datetime.date.today()

    months = today.strftime("%Y%m")

    userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))

    index = 0

    for userid in userlist:

        coins = getusermid(userid, months)

        for i in range(len(coins)):

            #print coins[i]

            userdata.loc[index] = (str(coins[i][0]), str(coins[i][1]))

            index += 1

        

        #print coins[0],coins[1]/100.0

        #userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        #index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata) # assumes a SparkSession named `spark` already exists

    #df.createOrReplaceTempView('userdata')

    df.show(1000)





# Example: RMB statistics

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write


def fromDayToDay(startdate, datelen, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        endday = startdate + delta * (i + 1)

        func(startday, endday)

    return
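`fromDayToDay` simply walks consecutive `[startday, endday)` windows and hands each pair to a callback. A minimal self-contained run with a collecting lambda:

```python
import datetime

def fromDayToDay(startdate, datelen, func):
    # Same logic as the article's helper: one-day windows, datelen of them
    delta = datetime.timedelta(days=1)
    for i in range(datelen):
        startday = startdate + delta * i
        endday = startdate + delta * (i + 1)
        func(startday, endday)

windows = []
fromDayToDay(datetime.date(2016, 12, 28), 3, lambda s, e: windows.append((s, e)))
print(windows)
```

Each window's `endday` is the next window's `startday`, so the windows tile the range with no gaps.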

def fromDayToEndDay(startdate, datelen, endday, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        #endday = startdate + delta * (i + 1)

        func(startday, endday)

    return


# Fetch RMB billing data

def saveDayPackageData(startday, endday):

    # database connection parameters

    dbconfig = {'host':'192.168.10.12',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'JIESUANDB',

                        'charset':'utf8'}


    # connect to the database

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/billdata"+strday+".parq", pddf)

    return


def savePackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # dump the billing data day by day

    fromDayToDay(startday, datelen, saveDayPackageData)

    

# Fetch WiFi phone registration data

def saveDayWifiPhoneRegData(startday, endday):

    # database connection parameters

    dbconfig = {'host':'192.168.10.15',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'AADB',

                        'charset':'utf8'}


    # connect to the database

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/wifiphonereg"+strday+".parq", pddf)

    return


def saveWifiPhoneReg():

    startday = datetime.date(2016, 12, 1)

    endday = datetime.date(2016, 12, 1)

    td = endday - startday

    datelen = td.days + 1

    # dump the WiFi registration data day by day

    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

    

OPTypeName = {

    0:"会员",

    1:"道具",


}


OpDetailName19 = {

    1:"购物保存收益",

    2:"下注和返注",

    3:"发红包",

    4:"抢红包",


}


OpDetailName22 = {

    1:"活动1收益到总账号",

    2:"活动2收益到总账号",

    3:"活动3收益到总账号",


}


OpDetailName23 = {

    0:"购买会员",

    1:"购买道具",

    2:"扫雷",


}


def getOpTypeName(func):

    name = OPTypeName.get(func)

    if name == None:

        return ""

    else:

        return name.decode('utf8')

    

def getOpDetailName(func, detail):

    if func == 19:

        if detail > 10000 and detail < 30000:

            return "包裹回滚".decode('utf8')

        elif detail > 50000 and detail < 60000:

            return "红包接龙".decode('utf8')

        else:

            name = OpDetailName19.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 22:

            name = OpDetailName22.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 23:

            name = OpDetailName23.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    else:

        return ""
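The lookup in `getOpDetailName` is a dict dispatch with a couple of range-based special cases checked first. A Python 3 sketch of the same rules for `OPTYPE == 19` (no `.decode('utf8')` needed there, since `str` is already Unicode; the other branches follow the same shape):

```python
# Same mapping as above, restricted to OPTYPE 19 for the sketch
OpDetailName19 = {1: "购物保存收益", 2: "下注和返注", 3: "发红包", 4: "抢红包"}

def op_detail_name(func, detail):
    # Range-based special cases take priority over the plain dict lookup
    if func == 19:
        if 10000 < detail < 30000:
            return "包裹回滚"
        if 50000 < detail < 60000:
            return "红包接龙"
        return OpDetailName19.get(detail, "")
    return ""

print(op_detail_name(19, 3), op_detail_name(19, 20000))
```

Using `dict.get(key, "")` collapses the original's `if name == None` fallback into one call.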


def getDayPackageData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + ' RMB data'

    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    df.show(10)

    #df.createOrReplaceTempView('billdata')

    #df.registerTempTable("billdata")

    #sqlret = sqlc.sql("SELECT count(*) from billdata")

    #sqlret.show(1)

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('billdata')

    return

    

def getPackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # load each day's saved billing data

    fromDayToDay(startday, datelen, getDayPackageData) # call the helper defined above

    print 'getPackageData finish'


# Fetch top-up data

def getChargeInfo(startday, endday):

    # database connection parameters

    dbconfig = {'host':'192.168.10.14', 

     'port': 3306, 

     'user':'user', 

     'passwd':'123654', 

     'db':'BAOIMDB', 

     'charset':'utf8'}

    

    # connect to the database

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    regdata = pd.DataFrame()

    for i in range(0, 20): 

        sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` where TIME > %d AND TIME < %d" % (i, tsstart, tsend)

        print sql

        #pddf = pd.DataFrame()

        pddf = pd.read_sql(sql, con=mysql_cn)

        #print pddf.head(5)

        if len(pddf.index) > 0:

            regdata = regdata.append(pddf,ignore_index=True)

            print regdata.tail(5)

    

    if len(regdata.index) > 0:

        print regdata.describe()

        write("/home/haoren/logstatis/register"+strday+".parq", regdata)

    mysql_cn.close()

    return

    

def pudf(x):

    return getOpTypeName(x.OPTYPE)

    

def getMergeData(strday):

    dfbill = ParquetFile("/home/haoren/logstatis/billdata"+strday+".parq").to_pandas()

    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg"+strday+".parq").to_pandas()

    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')

    #write("/home/haoren/logstatis/analyze"+strday+".parq", tempdf)

    #print tempdf.head(10)

    tempdf['OPTYPENAME'] = tempdf.apply(lambda x:getOpTypeName(x.OPTYPE), axis=1)

    #print tempdf.head(10)

    tempdf['DETAILNAME'] = tempdf.apply(lambda x:getOpDetailName(x.OPTYPE,x.OPDETAIL), axis=1)

    df = spark.createDataFrame(tempdf)

    df.show(10)

    return df
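`getMergeData` joins the billing rows to the registration list on `CONSUMERID == USERID`; `pd.merge` with `left_on`/`right_on` performs an inner join by default, so consumers absent from the registration frame drop out. A hypothetical two-frame example:

```python
import pandas as pd

# Hypothetical billing rows and registration list
dfbill = pd.DataFrame({'CONSUMERID': [1, 2, 3], 'DUBIOPNUM': [100, 250, 40]})
dfwifireg = pd.DataFrame({'USERID': [2, 3, 4]})

# Inner join: only consumers present in both frames survive
merged = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')
print(merged[['CONSUMERID', 'DUBIOPNUM']].values.tolist())
```

Both key columns are kept in the result, which is convenient for checking the join but doubles the key; `how='left'` would instead keep unregistered consumers with NaN in `USERID`.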

    

def analyzeDayBillData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + ' RMB data'


    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg"+strday+".parq")

    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)

    df3.show(10)

    df3.write.parquet("/home/haoren/logstatis/analyze"+strday+".parq")

    

    #df2 = df3.withColumn('OPTYPENAME', udf(getOpTypeName)(df3.OPTYPE))

    #df2.show(10)

    #df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    #df.show(10)

    #df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeDayBillData2(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + ' RMB data'

    #df = spark.read.load("/home/haoren/logstatis/analyze"+strday+".parq")

    df = getMergeData(strday)

    return # early exit; the withColumn variant below is left as unreachable reference code

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeBillData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # merge and analyze each day's billing data

    fromDayToDay(startday, datelen, analyzeDayBillData2) # call the helper defined above

    print 'analyzeBillData finish'

    

savePackageData()

getPackageData()

#saveWifiPhoneReg()

#analyzeBillData()


#查询用户余额代码案例


import sys

import MySQLdb

import pandas as pd


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.14',

                'dbport' : 3306,

                'dbname' : 'HBAODB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusercoin(userid):

    i = int(userid) % 10

    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (int(i), int(userid))

    #print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    return ret[0]

    

def getall(userlist):

    userdata = pd.DataFrame(columns=('userid', 'coin'))

    index = 0

    for userid in userlist:

        coins = getusercoin(userid) #调用前面的函数

        #print coins[0],coins[1]/100.0

        if coins[0] is not None:

            userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        else:

            userdata.loc[index] = (str(userid), 0)

        index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(50)

   




#用户消费查询代码案例


import sys

import MySQLdb

import pandas as pd

import datetime

import time


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.12',

                'dbport' : 3306,

                'dbname' : 'JIESUANDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

#用户人民币消费

def getuserconsume(userid, startday): #定义带参函数

    strdate = startday.strftime("%y%m%d")

    # 送礼物 +  守护 +  点歌 +  表情贴

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

        

#用户充值

def getusercharge(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)#调用前面的函数

    print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

    

#用户当天结余人民币

def getusercurcoin(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)

    print ret

    if ret:

        return float(ret[0][1])/100.0

    else:

        return 0

        

def getconsume():

    startdate = datetime.date(2017, 1, 1)

    enddate = datetime.date(2017, 2, 2)

    userid = 3101011990

    

    userdata = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))


    index = 0

    

    # 计算日差

    td = enddate - startdate

    datelen = td.days + 1

    #print datelen

    delta = datetime.timedelta(days=1)

    allcoins = 0 

    for i in range(0,datelen):

        startday = startdate + delta * i

        consume_coin = getuserconsume(userid, startday)#调用前面的函数

        charge = getusercharge(userid, startday)#调用前面的函数

        dayleftcoin = getusercurcoin(userid, startday)#调用前面的函数

        

        

        userdata.loc[index] = (startday.strftime("%Y-%m-%d"),str(userid), charge, consume_coin, dayleftcoin)

        index += 1

        

    #userdata.loc[index] = ('total',str(userid), allcoins, 0)

    print userdata.tail(100)

    return

    

getconsume()






#查询用户机器ID 代码案例


import sys

import MySQLdb

import pandas as pd

import datetime


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.15',

                'dbport' : 3306,

                'dbname' : 'JIQIDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusermid(userid, months):

    i = int(userid) % 50

    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months,int(i), int(userid))

    print reqsql

    ret = sql_select(reqsql)

    #print ret

    #print ret[0]

    return ret

    

def getall(userlist):

    today = datetime.date.today()

    months = today.strftime("%Y%m")

    userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))

    index = 0

    for userid in userlist:

        coins = getusermid(userid, months)

        for i in range(len(coins)):

            #print coins[i]

            userdata.loc[index] = (str(coins[i][0]), str(coins[i][1]))

            index += 1

        

        #print coins[0],coins[1]/100.0

        #userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        #index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(1000)





#人民币统计代码案例

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write


def fromDayToDay(startdate, datelen, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        endday = startdate + delta * (i + 1)

        func(startday, endday)

    return

def fromDayToEndDay(startdate, datelen, endday, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        #endday = startdate + delta * (i + 1)

        func(startday, endday)

    return


# 获取人民币数据

def saveDayPackageData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.12',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'JIESUANDB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/billdata"+strday+".parq", pddf)

    return


def savePackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayPackageData)

    

# 获取WF册数据

def saveDayWifiPhoneRegData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.15',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'AADB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/wifiphonereg"+strday+".parq", pddf)

    return


def saveWifiPhoneReg():

    startday = datetime.date(2016, 12, 1)

    endday = datetime.date(2016, 12, 1)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

    

OPTypeName = {

    0:"会员",

    1:"道具",


}


OpDetailName19 = {

    1:"购物保存收益",

    2:"下注和返注",

    3:"发红包",

    4:"抢红包",


}


OpDetailName22 = {

    1:"活动1收益到总账号",

    2:"活动2收益到总账号",

    3:"活动3收益到总账号",


}


OpDetailName23 = {

    0:"购买会员",

    1:"购买道具",

    2:"扫雷",


}


def getOpTypeName(func):

    name = OPTypeName.get(func)

    if name == None:

        return ""

    else:

        return name.decode('utf8')

    

def getOpDetailName(func, detail):

    if func == 19:

        if detail > 10000 and detail < 30000:

            return "包裹回滚".decode('utf8')

        elif detail > 50000 and detail < 60000:

            return "红包接龙".decode('utf8')

        else:

            name = OpDetailName19.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 22:

            name = OpDetailName22.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 23:

            name = OpDetailName23.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    else:

        return ""


def getDayPackageData(startday, endday):
    strday = startday.strftime("%Y%m%d")
    print strday + ' RMB billing data'
    df = spark.read.load("/home/haoren/logstatis/billdata" + strday + ".parq")
    df.show(10)
    # add readable name columns via Spark UDFs wrapping the lookup functions above
    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    df2.show(10)
    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    df.show(10)
    df.createOrReplaceTempView('billdata')
    return

    

def getPackageData():
    startday = datetime.date(2016, 12, 28)
    endday = datetime.date(2016, 12, 28)
    td = endday - startday
    datelen = td.days + 1
    # run getDayPackageData (defined above) for each day in the range
    fromDayToDay(startday, datelen, getDayPackageData)
    print 'getPackageData finish'


# Fetch recharge (consumption) data from MySQL
def getChargeInfo(startday, endday):
    # database connection parameters
    dbconfig = {'host': '192.168.10.14',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'BAOIMDB',
                'charset': 'utf8'}

    # connect to the database
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'])
    strday = startday.strftime("%Y%m%d")
    tsstart = time.mktime(startday.timetuple())
    tsend = time.mktime(endday.timetuple())
    regdata = pd.DataFrame()
    # the consumption records are sharded across 20 tables
    for i in range(0, 20):
        sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` WHERE TIME > %d AND TIME < %d" % (i, tsstart, tsend)
        print sql
        pddf = pd.read_sql(sql, con=mysql_cn)
        if len(pddf.index) > 0:
            regdata = regdata.append(pddf, ignore_index=True)
            print regdata.tail(5)

    if len(regdata.index) > 0:
        print regdata.describe()
        write("/home/haoren/logstatis/register" + strday + ".parq", regdata)
    mysql_cn.close()
    return

    

def pudf(x):
    return getOpTypeName(x.OPTYPE)

    

def getMergeData(strday):
    # merge the day's bill data with the Wi-Fi phone registration data in pandas
    dfbill = ParquetFile("/home/haoren/logstatis/billdata" + strday + ".parq").to_pandas()
    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg" + strday + ".parq").to_pandas()
    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')
    # add the readable name columns row by row with pandas.apply
    tempdf['OPTYPENAME'] = tempdf.apply(lambda x: getOpTypeName(x.OPTYPE), axis=1)
    tempdf['DETAILNAME'] = tempdf.apply(lambda x: getOpDetailName(x.OPTYPE, x.OPDETAIL), axis=1)
    # hand the merged frame back to Spark
    df = spark.createDataFrame(tempdf)
    df.show(10)
    return df

    

def analyzeDayBillData(startday, endday):
    strday = startday.strftime("%Y%m%d")
    print strday + ' RMB billing data'
    df = spark.read.load("/home/haoren/logstatis/billdata" + strday + ".parq")
    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg" + strday + ".parq")
    # join bill records to registrations entirely inside Spark
    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)
    df3.show(10)
    df3.write.parquet("/home/haoren/logstatis/analyze" + strday + ".parq")
    return

    

def analyzeDayBillData2(startday, endday):
    strday = startday.strftime("%Y%m%d")
    print strday + ' RMB billing data'
    df = getMergeData(strday)
    return
    # The code below is unreachable (disabled by the return above);
    # it would add the name columns with Spark UDFs instead of pandas.apply.
    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    df2.show(10)
    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    df.show(10)
    df.createOrReplaceTempView('analyzebilldata')

    

def analyzeBillData():
    startday = datetime.date(2016, 12, 28)
    endday = datetime.date(2016, 12, 28)
    td = endday - startday
    datelen = td.days + 1
    # run the per-day bill analysis over the date range
    fromDayToDay(startday, datelen, analyzeDayBillData2)
    print 'analyzeBillData finish'

    

savePackageData()
getPackageData()
#saveWifiPhoneReg()
#analyzeBillData()


2019-01-02 19:05:28 weixin_39667003 views: 4867

An example of data processing with Python (the data is an excerpt of employee information from a company's HR department, used in a Kaggle competition):

https://www.kaggle.com/c/kfru-dbm-hr-analytics

The task is to predict from the other attributes whether an employee will leave. The implementation follows.

import pandas as pd
from sklearn.preprocessing import MinMaxScaler,StandardScaler
from sklearn.preprocessing import LabelEncoder,OneHotEncoder
from sklearn.decomposition import PCA

def hr_preprocessing(sl=False, le=False, npr=False, amh=False, tsc=False, wa=False,
                     pl5=False, dp=False, slr=False, lower_id=False, ld_n=1):
    df = pd.read_csv('C:\\Users\\Administrator\\Desktop\\network\\HR.csv')
    # 1. Clean the data: drop the null values found during exploratory analysis
    df = df.dropna(subset=['satisfaction_level', 'last_evaluation'])
    # drop anomalous rows (satisfaction above 1 and the garbage salary value 'nme')
    df = df[(df['satisfaction_level'] <= 1) & (df['salary'] != 'nme')]

    # 2. Separate the label
    label = df['left']
    df = df.drop('left', axis=1)

    # 3. Feature selection (skipped here: there are few features to begin with)

    # 4. Feature processing (scaling, encoding, optional dimensionality reduction)
    scaler_lst = [sl, le, npr, amh, tsc, wa, pl5]
    column_lst = ["satisfaction_level", "last_evaluation", "number_project",
                  "average_monthly_hours", "time_spend_company", "Work_accident",
                  "promotion_last_5years"]
    for i in range(len(scaler_lst)):
        # each flag chooses standardization (True) or min-max scaling (False)
        if not scaler_lst[i]:
            df[column_lst[i]] = MinMaxScaler().fit_transform(df[column_lst[i]].values.reshape(-1, 1)).reshape(1, -1)[0]
        else:
            df[column_lst[i]] = StandardScaler().fit_transform(df[column_lst[i]].values.reshape(-1, 1)).reshape(1, -1)[0]

    # Handle the discrete features
    scaler_lst = [slr, dp]
    column_lst = ['salary', 'department']
    for i in range(len(scaler_lst)):
        if not scaler_lst[i]:
            if column_lst[i] == 'salary':
                df[column_lst[i]] = [map_salary(s) for s in df['salary'].values]
            else:
                df[column_lst[i]] = LabelEncoder().fit_transform(df[column_lst[i]])
            df[column_lst[i]] = MinMaxScaler().fit_transform(df[column_lst[i]].values.reshape(-1, 1)).reshape(1, -1)[0]
        else:
            # one-hot encode instead of ordinal encoding
            df = pd.get_dummies(df, columns=[column_lst[i]])
    if lower_id:
        return PCA(n_components=ld_n).fit_transform(df.values), label
    return df, label

# salary is ordinal: low < medium < high
d = dict([('low', 0), ('medium', 1), ('high', 2)])
def map_salary(s):
    return d.get(s, 0)

# Modeling
def hr_modeling_nn(features, label):
    from sklearn.model_selection import train_test_split
    # sklearn has no single call that splits into train/validation/test sets,
    # so split twice: 60% train, 20% validation, 20% test
    f_v = features.values
    l_v = label.values
    X_tt, X_validation, Y_tt, Y_validation = train_test_split(f_v, l_v, test_size=0.2)
    X_train, X_test, Y_train, Y_test = train_test_split(X_tt, Y_tt, test_size=0.25)

    # Classification
    from sklearn.metrics import accuracy_score, recall_score, f1_score
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.naive_bayes import GaussianNB, BernoulliNB
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.svm import SVC
    from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
    from sklearn.linear_model import LogisticRegression

    models = []
    models.append(('KNN', KNeighborsClassifier(n_neighbors=3)))
    models.append(('GaussianNB', GaussianNB()))
    models.append(('BernoulliNB', BernoulliNB()))
    models.append(('DecisionTreeGini', DecisionTreeClassifier()))
    models.append(('DecisionTreeEntropy', DecisionTreeClassifier(criterion='entropy')))
    models.append(('SVM', SVC(C=1000)))
    models.append(('OriginalRandomForest', RandomForestClassifier()))
    models.append(('RandomForest', RandomForestClassifier(n_estimators=11, max_features=None)))
    models.append(('Adaboost', AdaBoostClassifier(n_estimators=100)))
    models.append(('LogisticRegression', LogisticRegression(C=1000, tol=1e-10, solver='sag', max_iter=10000)))
    models.append(('GBDT', GradientBoostingClassifier(max_depth=6, n_estimators=100)))

    for clf_name, clf in models:
        clf.fit(X_train, Y_train)
        xy_list = [(X_train, Y_train), (X_validation, Y_validation), (X_test, Y_test)]
        for i in range(len(xy_list)):
            X_part = xy_list[i][0]
            Y_part = xy_list[i][1]
            Y_pred = clf.predict(X_part)
            print(i)  # 0 = train, 1 = validation, 2 = test
            print(clf_name, '-ACC:', accuracy_score(Y_part, Y_pred))
            print(clf_name, '-REC:', recall_score(Y_part, Y_pred))
            print(clf_name, '-F1:', f1_score(Y_part, Y_pred))

# Regression
def regr_t(features, label):
    print('X', features)
    print('Y', label)
    from sklearn.linear_model import Ridge
    regr = Ridge(alpha=1)
    regr.fit(features.values, label.values)
    Y_pred = regr.predict(features.values)
    print('Coef:', regr.coef_)
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    print('MSE:', mean_squared_error(label.values, Y_pred))
    print('MAE:', mean_absolute_error(label.values, Y_pred))
    print('R2:', r2_score(label.values, Y_pred))


def main():
    features, label = hr_preprocessing()
    hr_modeling_nn(features, label)
    # regress last_evaluation on two of the other features
    regr_t(features[['number_project', 'average_monthly_hours']], features['last_evaluation'])

if __name__ == '__main__':
    main()




 

2018-03-26 21:30:37 shujuelin views: 18744

MySQL: importing and exporting CSV files with SQLyog

Exporting table data from SQLyog as a CSV file

1. Select the table --> right-click --> Backup/Export --> Export table data as --> choose CSV --> click "Change" below --> Fields --> variable length --> "fields terminated by" --> enter a comma. (This is the key step; otherwise the exported CSV puts everything in one column rather than one column per field.) Untick the two option boxes below.

2. After exporting, open the CSV file with UltraEdit or Notepad, choose "Save As", select UTF-8 as the encoding, and save.

3. Open the CSV file: the Chinese text now displays correctly. Without the re-encoding step it appears as mojibake.
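The same "write UTF-8, read back with an explicit encoding" round trip can be checked from pandas as well. A minimal sketch (the file name is made up for illustration; `utf-8-sig` is UTF-8 with a BOM, which is what Windows editors typically produce when re-saving as UTF-8):

```python
# -*- coding: utf-8 -*-
import pandas as pd

# Write a small CSV in UTF-8 with BOM, as a Windows "Save As UTF-8" would.
df = pd.DataFrame({u"城市": [u"北京", u"上海"], "price": [5.8, 6.2]})
df.to_csv("export_demo.csv", index=False, encoding="utf-8-sig")

# Pass the encoding explicitly when reading; letting the reader guess the
# encoding is what produces garbled Chinese text.
df2 = pd.read_csv("export_demo.csv", encoding="utf-8-sig")
print(df2[u"城市"].tolist())
```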

Importing a CSV file into a MySQL table with SQLyog

1. Save the data as a CSV file, choosing a comma (or \t) as the separator;

2. Select the table --> Import --> Import CSV data using load local --> import from file, choose the CSV file just saved, and the import completes.

   As with export: choose CSV --> click "Change" below --> Fields --> variable length --> "fields terminated by" --> enter a comma (again the key step; otherwise the imported CSV content lands in one column rather than one column per field), and untick the two option boxes below.

 http://www.cnblogs.com/DswCnblog/p/5970873.html



Processing 100 million rows of data with Python Pandas

In data analysis, nothing is hotter than Python and R. An earlier article, 《别老扯什么Hadoop了,你的数据根本不够大》 ("Stop going on about Hadoop, your data isn't big enough"), argued that Hadoop is only a reasonable technical choice above roughly 5 TB of data. This time I received nearly 100 million log records, and tens of millions of rows is already the query/analysis bottleneck for a relational database. Having previously used Hadoop to classify large volumes of text, I decided to process this data with Python:

  • Hardware
      • CPU: 3.5 GHz Intel Core i7
      • RAM: 32 GB HDDR3 1600 MHz
      • Disk: 3 TB Fusion Drive
  • Analysis tools
      • Python: 2.7.6
      • Pandas: 0.15.0
      • IPython notebook: 2.0.0

The source data is shown in the table below:

Table          Size                            Description
ServiceLogs    98,706,832 rows × 14 columns    8.77 GB; transaction logs, each session may contain several transactions
ServiceCodes   286 rows × 8 columns            20 KB; dictionary table of transaction categories

Data loading

Start IPython notebook and load the pylab environment:

Pandas provides IO tools that can read a large file in chunks. A quick performance test: fully loading all 98 million rows takes only about 263 seconds, which is quite respectable.

              1 million rows   10 million rows   100 million rows
ServiceLogs   1 s              17 s              263 s

Reading with different chunk sizes and then calling pandas.concat to join the DataFrames shows a clear speedup with a chunkSize of around 10 million rows.

The statistics below list Read Time (pure reading) and Total Time (reading plus the concat). Given the total volume, merging on the order of 5 to 50 DataFrame objects performs best.

Chunk Size     Read Time (s)   Total Time (s)   Performance
100,000        224.418173      261.358521
200,000        232.076794      256.674154
1,000,000      213.128481      234.934142       √√
2,000,000      208.410618      230.006299       √√√
5,000,000      209.460829      230.939319       √√√
10,000,000     207.082081      228.135672       √√√√
20,000,000     209.628596      230.775713       √√√
50,000,000     222.910643      242.405967
100,000,000    263.574246      263.574246
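The chunked-read-then-concat pattern benchmarked above can be sketched as follows. A toy 100-row file stands in for the 98-million-row log and the file name is made up; for the real data the sweet spot was a chunk size of around 10 million rows.

```python
import numpy as np
import pandas as pd

# Build a small demo CSV standing in for the large log file.
pd.DataFrame({"id": np.arange(100),
              "amount": np.random.rand(100)}).to_csv("logs_demo.csv", index=False)

# Read in fixed-size chunks, then concat the pieces into one DataFrame.
chunks = []
for chunk in pd.read_csv("logs_demo.csv", chunksize=30):
    chunks.append(chunk)
df = pd.concat(chunks, ignore_index=True)
print(len(chunks), len(df))  # 4 chunks of 30/30/30/10 rows, 100 rows total
```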


Running the same Pandas loading code in the Python shell provided by Spark is about 25 seconds faster, so Spark apparently optimizes memory use even for Python.

Data cleaning

Pandas provides the DataFrame.describe method for a data summary, alongside a data preview (by default the first and last 60 rows in total) and row/column statistics. Since source data usually contains some null values and even entirely empty columns, which cost analysis time and efficiency, this invalid data needs handling after previewing the summary.

First call DataFrame.isnull() to see which values in the table are null; its counterpart is DataFrame.notnull(). Pandas evaluates every cell in the table for null and fills the result with True/False, as shown below:


Pandas' null checking is fast: only 28.7 seconds for the 98 million rows. With this first look at the data, the empty columns can be removed. I tried two approaches, computing non-null counts column by column versus DataFrame.dropna(); they took 367.0 s and 345.3 s respectively, but after dropna() all the rows were gone. The Pandas manual explains that, with no arguments, dropna() removes every row containing any null value. To remove only the columns that are entirely null, the axis and how parameters are needed:

This removed 6 of the 14 columns and took only 85.9 seconds.
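The difference described above, dropping rows with any null versus dropping only all-null columns, comes down to dropna's axis and how arguments; a small sketch on toy data:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "a": [1.0, 2.0, np.nan],
    "b": [np.nan, np.nan, np.nan],   # an entirely empty column
    "c": ["x", "y", "z"],
})

# With no arguments, dropna() removes every ROW containing any null --
# here that is all of them, which is how "all the rows were gone".
print(len(df.dropna()))              # 0

# To drop only the columns that are entirely null, pass axis and how:
cleaned = df.dropna(axis=1, how="all")
print(list(cleaned.columns))         # column 'b' is gone, all rows kept
```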

Next comes handling the nulls in the remaining rows. Testing showed that using an empty string in DataFrame.replace() saves some space compared with the default NaN; but in the CSV file an empty column only costs an extra ",", so removing the 98 million rows × 6 columns saved just 200 MB. Further cleaning lies in removing useless data and in merging.

Besides invalid values and what the requirements dictate, some of a table's own redundant columns also need cleaning at this stage, for example a serial number that is just two other fields concatenated, or type descriptions. After dropping these, the new data file is 4.73 GB, a full 4.04 GB smaller!

Data processing

DataFrame.dtypes shows the data type of each column. Pandas natively reads int and float64 and treats everything else as object; what usually needs converting is date/time. DataFrame.astype() converts the format of a whole DataFrame or of a single column, and supports both Python and NumPy data types.
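A minimal illustration of the conversions described (toy data; note that for date strings pd.to_datetime is the usual tool rather than astype):

```python
import pandas as pd

# Both columns come in as generic 'object' strings.
df = pd.DataFrame({"code": ["1", "2", "3"],
                   "when": ["2015-02-16", "2015-02-17", "2015-02-18"]})

# astype converts a single column (or a whole frame) to a given dtype.
df["code"] = df["code"].astype("int64")

# Date strings are usually parsed with to_datetime.
df["when"] = pd.to_datetime(df["when"])
print(df.dtypes)
```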

For aggregation I tested DataFrame.groupby, DataFrame.pivot_table and pandas.merge: a groupby over 98 million rows × 3 columns took 99 seconds, the table join 26 seconds, and building a pivot table was fastest at only 5 seconds.
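The three aggregation tools just mentioned, groupby, merge and pivot_table, look like this on toy data standing in for the ServiceLogs/ServiceCodes tables (column names are made up for illustration):

```python
import pandas as pd

logs = pd.DataFrame({
    "svc": ["trade", "query", "trade", "query", "trade"],
    "day": ["d1", "d1", "d1", "d2", "d2"],
    "amt": [10, 1, 20, 2, 30],
})
# a small dictionary table, like ServiceCodes
codes = pd.DataFrame({"svc": ["trade", "query"], "desc": [u"交易", u"查询"]})

# groupby: total amount per service type
totals = logs.groupby("svc")["amt"].sum()

# merge: join the logs to the dictionary table
joined = pd.merge(logs, codes, on="svc")

# pivot_table: amount per service per day
pivot = logs.pivot_table(values="amt", index="svc", columns="day", aggfunc="sum")
print(totals["trade"])  # 60
```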

Pie chart of the trade/query ratio, generated from the pivot table:


Adding the log timestamp to the pivot table and plotting the daily trade/query ratio:


Beyond that, the query/statistics performance of Pandas DataFrames is also excellent: a sub-table of all trade-type records can be generated by a query in under 7 seconds:

The sub-table's size is [10250666 rows x 5 columns]. That covers the basic data-processing scenarios. The results show that, below the ">5 TB" scale, Python already lets an analyst fluent in statistical languages work with ease.
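The kind of query that produced that sub-table is ordinary boolean indexing; a toy sketch (column names invented for illustration):

```python
import pandas as pd

df = pd.DataFrame({"type": ["trade", "query", "trade", "query"],
                   "amt": [5, 1, 7, 2]})

# Select all rows of one type with a single boolean-index expression;
# on the real data this is the query that yields the 10,250,666-row sub-table.
trades = df[df["type"] == "trade"]
print(trades.shape)  # (2, 2)
```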

 

