2019-11-27 16:39:45 mch2869253130 · 27 views

This mainly uses the multiprocessing library. The idea is to read the large dataset into memory, split it into slices and write each slice to its own file, then have multiple processes handle the slices independently.

from multiprocessing import Pool
import math
import os

# Read the data
path = os.path.join(os.getcwd(), 'test.txt')
with open(path, 'r') as f:
    data = f.readlines()
    
processor=4
l_data = len(data)
size = math.ceil(l_data / processor)

# Split the data and write each slice to its own file
for i in range(processor):
    start = size * i
    end = (i + 1) * size if (i + 1) * size < l_data else l_data

    filename = 'en_wiki_piece_' + str(i) + '.txt'
    path = os.path.join(os.getcwd(), filename)
    with open(path, 'w') as f:
        for j in range(start, end):
            f.write(data[j])

# Release the full dataset from memory now that the slices are on disk
del data, l_data


# Process one slice
def process(path1, path2, pid):
    # do something
    pass

def run(i):
    filename1 = 'en_wiki_piece_' + str(i) + '.txt'
    path1 = os.path.join(os.getcwd(), filename1)

    filename2 = 'processed_wiki_piece_' + str(i) + '.txt'
    path2 = os.path.join(os.getcwd(), filename2)

    process(path1, path2, i)


# Launch the worker processes
p = Pool(processor)
for i in range(processor):
    p.apply_async(run, args=(i,))
    print(str(i) + ' processor started !')
    
p.close()
p.join()
print("Process over!")

2017-11-23 18:24:30 WU_DENG9495 · 866 views

Using Python to split WHOIS information into slices and store it in a database


  • The WHOIS record returned by a WHOIS server can be very long; for some domains the WHOIS information runs to 2000+ bytes. This program splits the returned WHOIS text into 9 slices and stores them in the database.

  • Alternatively, selected fields could be extracted by keyword matching and stored; the information obtained that way is less complete, so each approach has its pros and cons (a sketch of this alternative follows the source code below).

  • Below is the Python source code for the slicing approach.


'''
--------------------------

ver     :       2.0
date    :       2017/11/22
auth    :       wud

--------------------------
'''

import DNS
import MySQLdb
import whois
import sys
from time import sleep
from time import ctime
import time


def getwhois(url):
    try:
        print "finding whois information..."
        data = whois.whois(url)
        print "whois query successfully!"
        #print data
        return data
    except:
        print "whois query fail!"
        pass

def getip(url):
    try:
        DNS.DiscoverNameServers()
        reqobj = DNS.Request(url)
        # Query the A record for the given domain
        answerobj = reqobj.req(name=url, qtype=DNS.Type.A)
        if not len(answerobj.answers):
            return
        for item in answerobj.answers:
            ip = str(item['data'])
        print "IP is: ", ip
        return ip
    except:
        print "TIME OUT"
        # Append failed lookups rather than overwriting from the start of the file
        f2 = open("fail.txt", 'a')
        print >> f2, url
        f2.close()


def whoisoperation(ip,url,data):
    info = str(data)
    size = len(info)
    info1 = info[:size/9]
    info2 = info[size/9:size*2/9]
    info3 = info[size*2/9:size*3/9]
    info4 = info[size*3/9:size*4/9]
    info5 = info[size*4/9:size*5/9]
    info6 = info[size*5/9:size*6/9]
    info7 = info[size*6/9:size*7/9]
    info8 = info[size*7/9:size*8/9]
    info9 = info[size*8/9:]
    #print "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
    #print info1,"\n",info2,"\n",info3,"\n",info4,"\n",info5,"\n",info6,"\n",info7,"\n",info8,"\n",info9
    #print "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
    #info2 = info[len/9:len / 9]
    try:
        print "database connecting..."
        db = MySQLdb.connect("***.***.***.***", "*****", "********", "***", charset="utf8")
        print "connected!"
        cursor = db.cursor()
        cursor.execute('INSERT INTO whois_info2 (url,ip,info_1,info_2,info_3,info_4,info_5,info_6,info_7,info_8,info_9,insert_time)values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',[url,ip,info1,info2,info3,info4,info5,info6,info7,info8,info9,ctime()])
        db.commit()
        print "insert successfully!"
    except:
        print "error"


def main():
    f = open("whois_test.txt",'r')
    f1 = open("whois.txt",'r+')
    flag = 92
    while(flag>0):
        a = time.time()
        url = f.readline()[:-2]
        print 93-flag
        print url
        ip = getip(url)
        data = getwhois(url)
        whoisoperation(ip,url,data)
        print >>f1, url
        print >>f1, ip
        print >>f1, data
        print "using time :", time.time()-a
        sleep(10)
        flag-=1

if __name__ == '__main__':
    main()
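
For reference, the keyword-matching alternative mentioned above could look roughly like the sketch below. It is only a sketch: the function name is made up, it assumes the object returned by whois.whois() behaves like a dict, and the listed keys are common python-whois fields that are not guaranteed to exist for every TLD; storing the extracted fields is left out.

# Sketch: extract a few named WHOIS fields instead of slicing the raw text.
def extract_whois_fields(data):
    fields = ['domain_name', 'registrar', 'creation_date',
              'expiration_date', 'name_servers', 'emails']
    return dict((k, str(data.get(k))) for k in fields)
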
2018-05-10 16:50:33 ZHANGJING199402 · 16910 views

On May 3rd my boss handed me a task: do some simple processing of three CSV files, each about 2.3 GB. The requirements were given as a table of per-column transformations (slicing, substitution and so on) plus deduplication on three columns.


It looked easy, so I eagerly got to work; at the time I was still writing the Python code in Notepad++. Everything in the requirements table (slicing, substitution and so on) was straightforward in Python; only the final deduplication was awkward. After much thought I went with the dumbest possible method, a nested traversal, so the time complexity shot up to O(n^2). The code ran all night without producing a result, so I abandoned that approach and looked up common data-cleaning tools.

Excel came up, so I tried it: first sort on the three columns that need deduplication, then add a new column that is TRUE when a row matches the row above it on those three columns and FALSE otherwise. After saving, I read the file back with Python, skipped every TRUE row and processed only the FALSE ones. I thought I had cleverly solved the problem and sent the result to my boss. Then I remembered that Excel had warned me on opening the file that "some data may be lost", and found out that Excel can only display 1,048,576 rows, so I wanted to know how much had been dropped. Reading the original file with Python showed nearly 20 million rows, of which I had processed only about 1.04 million. That weekend I read up on the problem: Python's pandas library is well suited to this kind of data processing. After a plain "import pandas" failed, I downloaded and installed Anaconda and set up the environment (see https://www.zhihu.com/question/58033789).
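
For comparison, a single pass that remembers the keys it has already seen deduplicates in linear time rather than O(n^2). This is only a sketch: the input and output file names are placeholders, and the key column indices (2, 3, 6) are an assumption based on the Action, User_mac and Time columns that the final drop_duplicates call uses.

import csv

seen = set()
with open('raw_input.csv', newline='') as src, open('deduped.csv', 'w', newline='') as dst:
    reader = csv.reader(src)
    writer = csv.writer(dst)
    for row in reader:
        key = (row[2], row[3], row[6])  # assumed key columns: action, user MAC, timestamp
        if key not in seen:             # O(1) membership test instead of a second nested scan
            seen.add(key)
            writer.writerow(row)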

The normal approach is obviously to deduplicate first and then process (deduplication alone removes more than half of the data). I split deduplication into two steps: first drop rows where every column is identical, then deduplicate on the specified columns, then transform each row as required and finally write the result out. Reading the pandas documentation, I saw that a DataFrame has column names while my raw data had none, so at first I read the original file with Python's csv reader, checked whether each row was "dirty", and appended the clean ones to a list (rows=[]); when writing the target file I wrote the column names first and then the list. Every time I ran this, though, the machine froze. Cleaning up the computer did not help, and I eventually realized the data volume was simply too large: rows was taking so much memory that the machine locked up.

So the fix was: open the target file first and write the column names, then open the original file and read it line by line; for each line, check whether it is "dirty", and if not, transform it according to the requirements table and write it straight to the target file. Memory usage dropped and the machine stopped freezing. Finally I opened the pre-processed file with pandas and used the DataFrame deduplication method; the 20 million rows were handled in under five minutes. The source code follows:

import csv
rows=[]
with open(r'C:\Users\Hanju\Desktop\uploadPortal(5).csv',"w", newline='') as _csvfile: 
    writer = csv.writer(_csvfile)
    # Write the header row first
    writer.writerow(["Dev_mac","Action","User_mac","User_mac_head","Bssid","WiFi","Time","Date"])
    i=0
    with open(r'D:\UploadPortalData\uploadPortal (5).csv',encoding='UTF-8') as csvfile:
        readCSV=csv.reader(csvfile,delimiter=',')
        for row in readCSV:
            if(len(row)!=8):
                continue
            row1=[]
            i+=1
            row1.append(row[0].replace(':','')[-5:])
                
            if row[2]=='auth':
                row1.append('1')
            elif row[2]=='deauth':
                row1.append('2')
            elif row[2]=='portal':
                row1.append('3')
            elif row[2]=='portalauth':
                row1.append('4')

            row1.append(str(row[3].replace(':','')))
            row1.append(str(row[3].replace(':','')[0:6]))

            if row[0]==row[4]:
                row1.append('2')
            else:
                row1.append('5')

            if 'City-WiFi-5G' in row[5]:
                row1.append('2')
            elif 'City-WiFi' in row[5]:
                row1.append('1')
            else:
                row1.append('0')
        
            # Convert the Unix timestamp to a serial day number (1970-01-01 = day 719529,
            # as in MATLAB's datenum), shifted by +8 hours to local time
            row1.append(float(row[6])/86400.0 - 2.0/3.0 + 719530.0)
            
            row1.append(row[7])
                
            writer.writerow(row1)

print('Done')
print(i)

import pandas as pd
df=pd.read_csv(r'C:\Users\Hanju\Desktop\uploadPortal(5).csv')
#print(df.head())
#print(df.tail())
print(df.shape)
New_df=df.drop_duplicates(['Action','User_mac','Time'])
print(New_df.shape)
#print(New_df.head())
#print(New_df.tail())
New_df.to_csv(r'C:\Users\Hanju\Desktop\uploadPortal(5)_Final.csv')
print('Done')

I added a few extra print statements to check the effect of the deduplication.

One open issue remains: deduplicating first and only then processing the rows should save even more time, but I have not found a good way to do it yet (a rough idea is sketched below); I will update this post when I do. I also need to spend more time with pandas, it really is impressively powerful.
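
A rough idea for the dedup-first approach, assuming the deduplicated rows fit in memory: read the raw file in chunks with pandas, drop duplicates inside each chunk, then drop duplicates once more across chunk boundaries. The file names and the header list below are placeholders.

import pandas as pd

cols = ['Dev_mac', 'Uuid', 'Action', 'User_mac', 'Bssid', 'WiFi', 'Time', 'Date']
chunks = pd.read_csv('uploadPortal_raw.csv', header=None, names=cols, chunksize=1000000)

# Per-chunk dedup shrinks the data early; the final pass removes duplicates
# that straddle chunk boundaries.
deduped = pd.concat(c.drop_duplicates(['Action', 'User_mac', 'Time']) for c in chunks)
deduped = deduped.drop_duplicates(['Action', 'User_mac', 'Time'])
deduped.to_csv('uploadPortal_deduped.csv', index=False)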

Here is a sample of the raw data:

F0:AC:D7:73:11:EC,d93004d3-2164-44a0-b4fc-f5adfcf56207,portal,3C:A3:48:45:EA:5E,F0:AC:D7:73:11:EC,City-WiFi,1524813532,20180427

F0:AC:D7:73:11:EC,d93004d3-2164-44a0-b4fc-f5adfcf56207,portal,3C:A3:48:45:EA:5E,F0:AC:D7:73:11:EC,City-WiFi,1524813532,20180427
F0:AC:D7:73:11:EC,d93004d3-2164-44a0-b4fc-f5adfcf56207,portal,3C:A3:48:45:EA:5E,F0:AC:D7:73:11:EC,City-WiFi,1524813532,20180427
F0:AC:D7:73:11:EC,d93004d3-2164-44a0-b4fc-f5adfcf56207,portal,3C:A3:48:45:EA:5E,F0:AC:D7:73:11:EC,City-WiFi,1524813532,20180427
F0:AC:D7:73:11:EC,d93004d3-2164-44a0-b4fc-f5adfcf56207,portal,3C:A3:48:45:EA:5E,F0:AC:D7:73:11:EC,City-WiFi,1524813532,20180427

And here is a sample of the processed data:

,Dev_mac,Action,User_mac,User_mac_head,Bssid,WiFi,Time,Date
0,311EC,3,3CA34845EA5E,3CA348,2,1,737177.6381018519,20180427
17,311EC,1,F42981BEF089,F42981,2,1,737177.6349074075,20180427
18,311EC,2,F42981BEF089,F42981,2,1,737177.6349074075,20180427
19,311EC,1,F42981BEF089,F42981,2,1,737177.6349189816,20180427
20,311EC,1,3CA34845EA5E,3CA348,2,1,737177.6349421295,20180427

I will let this less-than-perfect result stand as my first CSDN blog post. At least I finally wrote one, even if it reads like a diary and may not be of much reference value to anyone else. In future posts I will try to write more about the pitfalls I hit and how I solved them, partly as a record for myself and partly in the hope that it helps someone, since so many of my own problems were solved thanks to other bloggers' articles. That is it for now!
2017-12-12 21:59:06 Bryan__ · 4921 views

Data slicing: tasks where the data can be processed in independent slices are a good fit for multiprocessing. The core idea is to slice the data, have each worker process one slice and return its (possibly unordered) result, then merge the results. Typical uses: multiprocess crawlers and map-reduce-style jobs. The drawback is that each child process copies the parent's entire state, which wastes a lot of memory.

import math
from multiprocessing import Pool

data = list(range(10000))  # placeholder: any dataset that supports slicing

def run(data, index, size):  # data: the full dataset, index: slice index, size: number of processes
    size = math.ceil(len(data) / size)
    start = size * index
    end = (index + 1) * size if (index + 1) * size < len(data) else len(data)
    temp_data = data[start:end]
    # do something with temp_data
    return temp_data  # return the processed slice so the results can be collected later

processor = 40
res = []
p = Pool(processor)
for i in range(processor):
    res.append(p.apply_async(run, args=(data, i, processor,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
    print(i.get())  # get() collects the result returned by each worker process


Processing by file: when memory is tight, the in-memory slicing above no longer works, because every child process copies the parent's full state and memory is wasted. In that case, first split the big file into slices on disk, then del the in-memory data to release it, and have each worker read its own slice inside the processing function; each child then reads only the data it needs instead of holding everything in memory.

from multiprocessing import Pool
import pandas as pd
import math
data = pd.DataFrame({'user_id': [1, 2, 3, 4], 'item_id': [6, 7, 8, 9]})
users = pd.DataFrame(data['user_id'].unique(), columns=['user_id'])
processor = 4
l_data = len(users)
size = math.ceil(l_data / processor)
res = []
def run(i):
    data=pd.read_csv('../data/user_'+str(i)+'.csv')
    #todo
    return data

for i in range(processor):
    start = size * i
    end = (i + 1) * size if (i + 1) * size < l_data else l_data
    user = users[start:end]
    t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
    t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)
    print(len(t_data))

del data, l_data, users

# Create the pool only after the large DataFrame has been released,
# so the forked workers do not inherit a copy of it.
p = Pool(processor)
for i in range(processor):
    res.append(p.apply_async(run, args=(i,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
data = pd.concat([i.get() for i in res])


Sharing data between processes: when the shared data itself needs to be modified, use multiprocessing's data-sharing facilities:

from multiprocessing import Process, Manager

# Function executed by each child process; d is a special dict
# shared between processes via the Manager.
def func(i, d):
    d[i] = i + 100
    print(d.values())

# Create the shared dict in the parent process.
m = Manager()
d = m.dict()
procs = []
for i in range(5):
    # Let each child modify the shared dict.
    p = Process(target=func, args=(i, d))
    p.start()
    procs.append(p)
for p in procs:
    p.join()
Sample output (the order can vary between runs):
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
2018-02-06 22:43:27 Magic_Engine · 4977 views

Today I received a business request: export a large amount of data, on the order of tens of millions of rows, to Excel.

The original plan was to export TSV files straight from PL/SQL and process them with a script, but the data volume was too large: one brief connection drop during the query and all the progress was lost. So I partitioned the data by query conditions into slices, the largest a few million rows, the smallest a few hundred thousand.

Note that for Office 2007 and later, an xlsx worksheet holds at most 1,048,576 rows.

With that as the starting point: when I first converted the TSV records to Excel with openpyxl, I tried to pack as many records as possible into each file. That turned out to be naive; the 8 GB of RAM filled up immediately, with usage at 99-100%. So I fell back on slicing, generating one xlsx file per roughly 500,000 records.

The script is shown in the code block below. During the actual run memory usage stayed at a steady 30-40%, which is acceptable. The console log follows the code.

import time
import openpyxl

file_nums = ["0112","0115","0123","0125","0126","0129","0130"]

start = time.time()
lines = []
try:
    for file_num in file_nums:
        file_name_ext = 1
        file_path = "F:\\export{}.tsv".format(file_num)
        out_path = "B:\\export{}{}.xlsx".format(file_num,"_"+str(file_name_ext))

        lines.clear()
        with open(file_path, 'r', encoding="utf-8") as file:
            print("write all file begin")
            line_num = 0
            for line in file:
                line_tmp = line.split('\t')
                lines.append(line_tmp)
                line_num += 1
                if line_num % 500000 == 0:
                    part_start = time.time()
                    print("write file {} at :{}".format(out_path,part_start))
                    workbook = openpyxl.Workbook(write_only=True)
                    sheet = workbook.create_sheet()

                    for l in lines:
                        sheet.append(l)

                    workbook.save(out_path)
                    workbook.close()
                    workbook = None
                    file_name_ext += 1
                    out_path = "B:\\export{}{}.xlsx".format(file_num, "_" + str(file_name_ext))

                    part_end = time.time()
                    print("file {} write done at :{}".format(out_path, part_end))
                    print("part used : {}".format(str(part_end - part_start)))
                    lines.clear()

            if lines and len(lines) > 0:
                part_start = time.time()
                print("write file {} at :{}".format(out_path, part_start))
                workbook = openpyxl.Workbook(write_only=True)
                sheet = workbook.create_sheet()

                for l in lines:
                    sheet.append(l)

                workbook.save(out_path)
                workbook.close()
                workbook = None
                part_end = time.time()
                print("file {} write done at :{}".format(out_path, part_end))
                print("part used : {}".format(str(part_end - part_start)))
                lines.clear()

except Exception as e:
    print(e)

end = time.time()
total = end - start
print("write all file finish, used {} times".format(total))

Part of the console log:

write all file begin
write file B:\export0112_1.xlsx at :1517926709.4048114
file B:\export0112_2.xlsx write done at :1517926758.3921452
part used : 48.98733377456665
write file B:\export0112_2.xlsx at :1517926759.657679
file B:\export0112_3.xlsx write done at :1517926808.537671
part used : 48.87999200820923
write file B:\export0112_3.xlsx at :1517926809.696322
file B:\export0112_4.xlsx write done at :1517926859.3176138
part used : 49.62129187583923
write file B:\export0112_4.xlsx at :1517926860.551937
file B:\export0112_5.xlsx write done at :1517926910.1919634
part used : 49.640026330947876
write file B:\export0112_5.xlsx at :1517926911.337625
file B:\export0112_6.xlsx write done at :1517926960.894076
part used : 49.556451082229614
write file B:\export0112_6.xlsx at :1517926962.0258036
file B:\export0112_7.xlsx write done at :1517927011.4833808
part used : 49.45757722854614
write file B:\export0112_7.xlsx at :1517927012.7268114
file B:\export0112_8.xlsx write done at :1517927061.5736873
part used : 48.8468759059906
write file B:\export0112_8.xlsx at :1517927062.6536007
file B:\export0112_9.xlsx write done at :1517927111.822423
part used : 49.168822288513184
write file B:\export0112_9.xlsx at :1517927113.0346901
file B:\export0112_10.xlsx write done at :1517927162.1026373
part used : 49.06794714927673
write file B:\export0112_10.xlsx at :1517927163.3083832
file B:\export0112_11.xlsx write done at :1517927212.4063733
part used : 49.09799003601074
write file B:\export0112_11.xlsx at :1517927212.8430336
file B:\export0112_11.xlsx write done at :1517927230.3664558
part used : 17.523422241210938
write all file begin
write file B:\export0115_1.xlsx at :1517927230.3875127
file B:\export0115_1.xlsx write done at :1517927230.4441934
part used : 0.05668067932128906
write all file begin
write file B:\export0123_1.xlsx at :1517927231.383569
file B:\export0123_1.xlsx write done at :1517927265.6331534
part used : 34.249584436416626
write all file begin
write file B:\export0125_1.xlsx at :1517927267.00361
file B:\export0125_2.xlsx write done at :1517927316.4248767
part used : 49.42126679420471
write file B:\export0125_2.xlsx at :1517927317.801019
file B:\export0125_2.xlsx write done at :1517927370.0042286
part used : 52.20320963859558
write all file begin
write file B:\export0126_1.xlsx at :1517927371.6056333
file B:\export0126_2.xlsx write done at :1517927427.7860975
part used : 56.18046426773071
write file B:\export0126_2.xlsx at :1517927429.2113287
file B:\export0126_3.xlsx write done at :1517927481.465444
part used : 52.25411534309387
write file B:\export0126_3.xlsx at :1517927482.6818128
file B:\export0126_3.xlsx write done at :1517927526.3801858
part used : 43.69837307929993
write all file begin
write file B:\export0129_1.xlsx at :1517927527.860628
file B:\export0129_2.xlsx write done at :1517927580.998396
part used : 53.137768030166626
write file B:\export0129_2.xlsx at :1517927582.5338671
file B:\export0129_3.xlsx write done at :1517927640.1109939
part used : 57.5771267414093
write file B:\export0129_3.xlsx at :1517927640.3472462
file B:\export0129_3.xlsx write done at :1517927647.2646072
part used : 6.91736102104187
write all file begin
write file B:\export0130_1.xlsx at :1517927647.2751348
file B:\export0130_1.xlsx write done at :1517927647.291679
part used : 0.016544103622436523
write all file finish, used 938.9572479724884 times

Process finished with exit code 0