精华内容
下载资源
问答
  • 本篇文章小编给大家分享一下python多线程分块读取文件实例,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。# _*_coding:utf-8_*_import time, threading, ConfigParser""&...

    本篇文章小编给大家分享一下python多线程分块读取文件实例,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。

    # _*_coding:utf-8_*_

    import time, threading, ConfigParser

    """

    Reader类,继承threading.Thread

    @__init__方法初始化

    @run方法实现了读文件的操作

    """

    class Reader(threading.Thread):

    def __init__(self, file_name, start_pos, end_pos):

    super(Reader, self).__init__()

    self.file_name = file_name

    self.start_pos = start_pos

    self.end_pos = end_pos

    def run(self):

    fd = open(self.file_name, "r")

    """

    该if块主要判断分块后的文件块的首位置是不是行首,

    是行首的话,不做处理

    否则,将文件块的首位置定位到下一行的行首

    """

    if self.start_pos != 0:

    fd.seek(self.start_pos-1)

    if fd.read(1) != " ":

    line = fd.readline()

    self.start_pos = fd.tell()

    fd.seek(self.start_pos)

    """

    对该文件块进行处理

    """

    while (self.start_pos <= self.end_pos):

    line = fd.readline()

    """

    do somthing

    """

    self.start_pos = fd.tell()

    """

    对文件进行分块,文件块的数量和线程数量一致

    """

    class Partition(object):

    def __init__(self, file_name, thread_num):

    self.file_name = file_name

    self.block_num = thread_num

    def part(self):

    fd = open(self.file_name, "r")

    fd.seek(0, 2)

    pos_list = []

    file_size = fd.tell()

    block_size = file_size/self.block_num

    start_pos = 0

    for i in range(self.block_num):

    if i == self.block_num-1:

    end_pos = file_size-1

    pos_list.append((start_pos, end_pos))

    break

    end_pos = start_pos+block_size-1

    if end_pos >= file_size:

    end_pos = file_size-1

    if start_pos >= file_size:

    break

    pos_list.append((start_pos, end_pos))

    start_pos = end_pos+1

    fd.close()

    return pos_list

    if __name__ == "__main__":

    """

    读取配置文件

    """

    config = ConfigParser.ConfigParser()

    config.readfp(open("conf.ini"))

    #文件名

    file_name = config.get("info", "fileName")

    #线程数量

    thread_num = int(config.get("info", "threadNum"))

    #起始时间

    start_time = time.clock()

    p = Partition(file_name, thread_num)

    t = []

    pos = p.part()

    #生成线程

    for i in range(thread_num):

    t.append(Reader(file_name, *pos[i]))

    #开启线程

    for i in range(thread_num):

    t[i].start()

    for i in range(thread_num):

    t[i].join()

    #结束时间

    end_time = time.clock()

    print "Cost time is %f" % (end_time - start_time)

    展开全文
  • 主要为大家详细介绍了python多线程分块读取文件,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
  • 本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件# -*- coding: GBK -*-import urlparseimport datetime...

    本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:

    读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件

    # -*- coding: GBK -*-

    import urlparse

    import datetime

    import os

    from multiprocessing import Process,Queue,Array,RLock

    """

    多进程分块读取文件

    """

    WORKERS = 4

    BLOCKSIZE = 100000000

    FILE_SIZE = 0

    def getFilesize(file):

    """

    获取要读取文件的大小

    """

    global FILE_SIZE

    fstream = open(file,"r")

    fstream.seek(0,os.SEEK_END)

    FILE_SIZE = fstream.tell()

    fstream.close()

    def process_found(pid,array,file,rlock):

    global FILE_SIZE

    global JOB

    global PREFIX

    """

    进程处理

    Args:

    pid:进程编号

    array:进程间共享队列,用于标记各进程所读的文件块结束位置

    file:所读文件名称

    各个进程先从array中获取当前最大的值为起始位置startpossition

    结束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)

    if startpossition==FILE_SIZE则进程结束

    if startpossition==0则从0开始读取

    if startpossition!=0为防止行被block截断的情况,先读一行不处理,从下一行开始正式处理

    if 当前位置 <=endpossition 就readline

    否则越过边界,就从新查找array中的最大值

    """

    fstream = open(file,"r")

    while True:

    rlock.acquire()

    print "pid%s"%pid,",".join([str(v) for v in array])

    startpossition = max(array)

    endpossition = array[pid] = (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)

    rlock.release()

    if startpossition == FILE_SIZE:#end of the file

    print "pid%s end"%(pid)

    break

    elif startpossition !=0:

    fstream.seek(startpossition)

    fstream.readline()

    pos = ss = fstream.tell()

    ostream = open("/data/download/tmp_pid"+str(pid)+"_jobs"+str(endpossition),"w")

    while pos

    #处理line

    line = fstream.readline()

    ostream.write(line)

    pos = fstream.tell()

    print "pid:%s,startposition:%s,endposition:%s,pos:%s"%(pid,ss,pos,pos)

    ostream.flush()

    ostream.close()

    ee = fstream.tell()

    fstream.close()

    def main():

    global FILE_SIZE

    print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")

    file = "/data/pds/download/scmcc_log/tmp_format_2011004.log"

    getFilesize(file)

    print FILE_SIZE

    rlock = RLock()

    array = Array("l",WORKERS,lock=rlock)

    threads=[]

    for i in range(WORKERS):

    p=Process(target=process_found, args=[i,array,file,rlock])

    threads.append(p)

    for i in range(WORKERS):

    threads[i].start()

    for i in range(WORKERS):

    threads[i].join()

    print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")

    if __name__ == "__main__":

    main()

    希望本文所述对大家Python程序设计有所帮助。

    展开全文
  • file_name:文件地址 ''' inputfile = open(file_name, 'rb') #可打开含有中文的地址 data = pd.read_csv(inputfile, iterator=True) loop = True chunkSize = 1000 #一千行一块 chunks = [] while loop: ...
  • 主要介绍了Python多进程分块读取超大文件的方法,涉及Python多进程操作与文件分块读取的相关技巧,需要的朋友可以参考下
  • 今天小编就为大家分享一篇Python 多线程不加锁分块读取文件的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • python多进程分块读取文件

    千次阅读 2012-06-27 18:31:39
    # -*- coding: GBK -*- # filename: multiprocessreadfile.py import urlparse import datetime import os from multiprocessing import Process,Queue,Array,...多进程分块读取文件 """ WORKERS = 4 BLOCKSIZE = 1
    # -*- coding: GBK -*-
    # filename: multiprocessreadfile.py
    
    import urlparse
    import datetime
    import os
    from multiprocessing import Process,Queue,Array,RLock
    
    """
    多进程分块读取文件
    """
    
    WORKERS = 4
    BLOCKSIZE = 100000000 #字节,接近100MB
    FILE_SIZE = 0
    
    def getFilesize(file):
        """
            获取要读取文件的大小
        """
        global FILE_SIZE
        fstream = open(file,'r')
        fstream.seek(0,os.SEEK_END)
        FILE_SIZE = fstream.tell()
        fstream.close()
    
    def process_found(pid,array,file,rlock):
        global FILE_SIZE
        global JOB
        global PREFIX
        """
            进程处理
            Args:
                pid:进程编号
                array:进程间共享队列,用于标记各进程所读的文件块结束位置
                file:所读文件名称
            各个进程先从array中获取当前最大的值为起始位置startpossition
            结束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
            if startpossition==FILE_SIZE则进程结束
            if startpossition==0则从0开始读取
            if startpossition!=0为防止行被block截断的情况,先读一行不处理,从下一行开始正式处理
            if 当前位置 <=endpossition 就readline
            否则越过边界,就从新查找array中的最大值
        """
        fstream = open(file,'r')
        
        while True:
            rlock.acquire()
            print 'pid%s'%pid,','.join([str(v) for v in array])
            startpossition = max(array)            
            endpossition = array[pid] = (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
            rlock.release()
            
            if startpossition == FILE_SIZE:#end of the file
                print 'pid%s end'%(pid)
                break
            elif startpossition !=0:
                fstream.seek(startpossition)
                fstream.readline()
            pos = ss = fstream.tell()
            ostream = open('/data/download/tmp_pid'+str(pid)+'_jobs'+str(endpossition),'w')
            while pos<endpossition:
                #处理line
                line = fstream.readline()                        
                ostream.write(line)
                pos = fstream.tell()
    
            print 'pid:%s,startposition:%s,endposition:%s,pos:%s'%(pid,ss,pos,pos)
            ostream.flush()
            ostream.close()
            ee = fstream.tell()        
    
        fstream.close()
    
    def main():
        global FILE_SIZE
        print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S") 
        
    #    file = "/data/pds/download/scmcc_log/tmp_format_2011004.log"
        file = "zdmm1_10.txt"
        getFilesize(file)
        print FILE_SIZE
        
        rlock = RLock()
        array = Array('l',WORKERS,lock=rlock)
        threads=[]
        for i in range(WORKERS):
            p=Process(target=process_found, args=[i,array,file,rlock])
            threads.append(p)
    
        for i in range(WORKERS):
            threads[i].start()
        
        for i in range(WORKERS):
            threads[i].join()
    
        print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S") 
    
    if __name__ == '__main__':
        main()


    点击打开链接

    展开全文
  • import pandas as pd def read_data(file_name): ... file_name:文件地址 ''' inputfile = open(file_name, 'rb') #可打开含有中文的地址 data = pd.read_csv(inputfile, iterator=True) loop = True ...
    import pandas as pd
    def read_data(file_name):
        '''
        file_name:文件地址
        '''
        inputfile = open(file_name, 'rb')   #可打开含有中文的地址
        data = pd.read_csv(inputfile, iterator=True)
        loop = True
        chunkSize = 1000    #一千行一块
        chunks = []
        while loop:
            try:
                chunk = data.get_chunk(chunkSize)
                chunks.append(chunk)
            except StopIteration:
                loop = False
                print("Iteration is stopped.")
        data = pd.concat(chunks, ignore_index=True)
        #print(train.head())
        return data

     

     

    展开全文
  • Python 多线程分块读取文件

    千次阅读 2018-08-14 17:45:31
    什么也不说,直接上代码,绝对看的懂 # _*_coding:utf-8_*_ import time, threading, ConfigParser ...@run方法实现了读文件的操作 ''' class Reader(threading.Thread): def __init__(self,...
  • Python多进程分块读取文件

    千次阅读 2012-08-27 16:27:51
    最近在做日志分析,可恨的log动辄上G,如果...所以就打算多进程分块来读入文件。  网上有位哥哥给了一份参考的源代码,http://www.oschina.net/code/snippet_97079_4465 # -*- coding: GBK -*- import urlpars
  • 因此通过文件分块,可以比较有效的解决多线程读问题,之前看到有人写的分块操作,比较复杂,需要实现建立好线程以及所读取块信息,在这里,我提供了一种比较简便的方法,以供参考。#!/user/bin/env python #_*_cod.....
  • 本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件# -*- coding: GBK -*-import urlparseimport datetime...
  • for i in range(1,3000): print(i,end='line\n',...'rb') print(file.read()) while True: chunk=file.read(2) if not chunk:break print(chunk) line=file.readline()一行行读取 chunk=file.read(2)一次读取2个字节
  • # ===================== 逐块读取文本文件 ============...chunksize = 4 # 分块读取,返回一个可迭代对象TextFileReader iterator = True # 返回一个可迭代对象,使用df.get_chunk(10)查看数据 # ===============...
  • 本文要点刚要:(一)读文本文件格式的数据函数:read_csv,read_table1....数据太大时需要逐块读取文本数据用chunksize进行分块。(二)将数据写成文本文件格式函数:to_csv范例如下:(一)读取文本文件格...
  • pandas分块处理参考链接: https://blog.csdn.net/weixin_43790560/article/details/88587123 https://blog.csdn.net/zcgyq/article/details/83088259 import os import pandas as pd import numpy as np from ...
  • 本文要点刚要:(一)读文本文件格式的数据函数:read_csv,read_table1....数据太大时需要逐块读取文本数据用chunksize进行分块。(二)将数据写成文本文件格式函数:to_csv范例如下:(一)读取文本文件格...
  • data = pd.read_csv(path, sep=',',engine = 'python',iterator=True) loop = True chunkSize = 1000 chunks = [] index=0 while loop: try: print(index) chunk = data.get_chunk(chunkSize) chunks.append...
  • 本文要点刚要:(一)读文本文件格式的数据函数:read_csv,read_table1....数据太大时需要逐块读取文本数据用chunksize进行分块。(二)将数据写成文本文件格式函数:to_csv范例如下:(一)读取文本文件格...
  • 本文要点刚要:(一)读文本文件格式的数据函数:read_csv,read_table1....数据太大时需要逐块读取文本数据用chunksize进行分块。(二)将数据写成文本文件格式函数:to_csv范例如下:(一)读取文本文件格...
  • Python chunk读取超大文件

    万次阅读 2019-05-11 16:40:11
    查询到pandas的read_csv()提供了chunk分块读取能力。 官方示例 这是一张原始的table In [185]: table = pd.read_csv('tmp.sv', sep='|') In [186]: table Out[186]: Unnamed: 0 0 ...
  • python读取文件

    2019-10-02 19:49:58
    """生成器函数:分块读取文件内容,使用 iter 函数 """ # 首先使用 partial(fp.read, block_size) 构造一个新的无需参数的函数 # 循环将不断返回 fp.read(block_size) 调用结果,直到其为 '' 时终止 ...

空空如也

空空如也

1 2 3 4 5
收藏数 91
精华内容 36
关键字:

python分块读取文件

python 订阅