python 分片处理大数据

2017-05-02 17:34:00 weixin_34087503 阅读数 311

本节书摘来自异步社区《Python和HDF 5大数据应用》一书中的第1章,第1.1节,作者[美]Andrew Collette(科莱特),胡世杰 译,更多章节内容可以访问云栖社区“异步社区”公众号查看。

第1章 简介

我刚毕业那会遇到过一个严重的问题——一部国家公认的等离子体研究设备花了整整一周时间收集的上千万个数据的值不太对劲。

比正常情况小了约40个数量级。

我跟我的咨询师挤在他的办公室,在一台崭新的G5 Mac Pro上运行我们的可视化软件,试图搞明白哪里出了问题。从机器中获得的数据是正确的,实验所使用的数字转换器提交的原始数据看上去没有问题。我在Thinkpad笔记本上用IDL语言编写了一个巨大的脚本将原始数据转换成可视化软件能够识别的文件。这些文件的格式十分简单:一个简短的定长头部后面加上一堆二进制浮点数据。我还另外又花了一个小时写了一个程序来验证这些文件,它们也没问题。但当我将所有这些在IDL中看上去如此优雅的数据导入可视化软件以后,它们看上去就像是一锅粥,毫无特色、杂乱无章,所有的值大约都只有10−41左右。

最后我们发现了问题所在:数字转换器和我的Thinkpad使用了“little-endian”格式,而G5 Mac使用了“big-endian”格式。一台机器输出的原始数据值无法被另一台机器正确地读入,反过来也一样。当时我所有想法中最有礼貌的一句是:这也太笨了。哪怕最后发现此类问题是如此司空见惯以至于IDL专门提供了一个SWAP_ENDIAN函数来处理也并没有令我的情绪变得更好。

在此之前我从不关心数据是如何存储的。这个事件以及其他一些类似事件改变了我的想法。作为一名科学家,我最终意识到,我们不仅需要选择数据的组织和存储,同时也需要选择数据的通信方式。设计优雅的标准格式不仅让每个人的生活变得简单(消除了上面愚蠢而又浪费时间的“endian”问题),而且也使得全世界都能共享这些数据。

1.1 Python和HDF5

在Python的世界里,人们在数值类型大数据的存储机制上进行选择时,迅速对层次性数据格式第5版(Hierarchical Data Format version 5,HDF5)达成了共识。当数据量越来越大的时候,数据的组织就变得越来越重要。命名数据集(第3章)、层次性分组(第5章)和用户自定义元数据“特征”(第6章)等HDF5特性对于数据分析的过程极为必要。

HDF5这种结构化的自我描述格式跟Python相辅相成。目前HDF5已经有两大开发成熟、功能丰富的Python接口模块h5py和PyTables,在两者之上还有许多为特定用途开发的小型封装模块。

1.1.1 数据和元数据的组织

这是一个利用HDF5的结构化能力帮助应用程序的简单例子。不要太担心文件结构和HDF5使用API等方面的细节,后续章节自会一一解释。就把这个当成是一次HDF5尝鲜。如果你想要运行这个例子,你需要Python 2并安装NumPy(第2章)。

假设我们有一个NumPy数组,它代表了某次实验获取的一些数据:

  

screenshot

假设这些数据来自某个气象站十秒一次的温度采样。为了让这些数据有意义,我们还需要记录采样的时间间隔“delta-T”。目前我们把它放到一个Python变量中。

  

screenshot

同时我们还需要记录第一个数据获取的时间起点,以及这些数据来自15号气象站:

  

screenshot

我们可以用一个简单的NumPy内建函数np.savez来将这些数据存入磁盘,该函数将每个数据以NumPy数组的格式保存并打包进一个指定名字的zip文件:

  

screenshot

我们可以用np.load从文件中获取这些数据:

  

screenshot

目前为止一切顺利。但如果每个气象站的数据不止一组怎么办?比如说还要记录一组风速数据?

 

screenshot

再假设我们有不止一个气象站。也许我们可以引入某种命名规范,比如“wind_15”作为15号气象站的风速数据,“dt_wind_15”作为采样时间间隔。又或许我们可以使用多个文件……

作为对比,让我们看看如果用HDF5来存储会是怎样:


screenshot

这个例子演示了HDF5的两个杀手级特性:层次性分组和特征。组就像文件系统里的目录,使你可以将相关的数据集保存在一起。本例将来自同一个气象站的温度和风速分别保存在名为“/15”和“/20”的组里。特征允许你在数据上直接附加描述性的元数据。一旦你将这个文件给其他同事,他们可以轻易发现这些信息并明白这些数据的意义:

screenshot

1.1.2 大数据复制

人们正在越来越多地将Python用于大数据集的快速可视化项目,在大规模计算中将Python作为一种高层粘合性语言来协助那些编译型语言如C和FORTRAN。现在一个数据集动不动就是上千GB甚至TB的数据需要处理,HDF5自身最大可以支持EB的规模。

大多数的机器不可能将如此大规模的数据集直接导入内存。HDF5最大的优点之一在于支持子集分片和部分I/O。让我们看一下之前创建的拥有1024个元素的“temperature”数据集:

  

screenshot

这里的dataset对象是一个HDF5数据集的代理对象。它支持数组切片操作,NumPy用户可能会觉得很熟悉:

  

screenshot

记住,真正的数据保存在磁盘上,切片操作会去寻找合适的数据并读入内存。这种形式的切片利用了HDF5底层的子集分片功能,所以非常迅速。

HDF5另一个伟大之处在于你可以控制存储的分配。当你创建了一个全新的数据集,除了一些元数据以外它不会占用任何空间,默认情况下仅当你真的需要写入数据时才会占用磁盘上的空间。

比如下面这个2TB的数据集,你可以在几乎任何计算机上创建出来:

 

 .screenshot

虽然还没有为其真正分配任何存储,但整个数据集的空间对我们都是可用的。我们可以在数据集的任何地方写入,而磁盘上只会占用必要的字节:

 

 screenshot

在存储非常昂贵的情况下,你甚至可以对数据集进行透明压缩(第4章):

 

 screenshot

2018-06-25 09:22:22 a649344475 阅读数 903

一、创建服务端,taskManager.py,代码如下:

import random
import time
import queue
from multiprocessing.managers import BaseManager

# 第一步:建立task_queue 和 result_queue,用来存放任务结果
task_queue = queue.Queue()
result_queue = queue.Queue()

def get_task():
    return task_queue
def get_result():
    return result_queue

class Queuemanager(BaseManager):
    pass

def main(ip,port,kl):
    # 第二步:把创建的两个队列注册在网络上,利用 register 方法,
    # callable参数关联了 Queue 对象,将Queue对象在网络中暴露
    Queuemanager.register('get_task_queue',callable=get_task)
    Queuemanager.register('get_result_queue',callable=get_result)

    # 第三步:初始化对象:绑定IP、端口、设置验证口令 。
    manager = Queuemanager(address=(ip,port),authkey=kl)

    # 第四步:启动管理,监听信息通道
    manager.start()

    # 第五步:通过管理实例的方法获得通过网络访问的Queue对象
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # 第六步:添加任务
    for url in ["ImageUrl_"+str(i) for i in range(10)]:
        print('put task %s ...'%url)
        task.put(url)
    # 获得返回结果
    print('try get result...')
    for i in range(10):
        print('result is %s'%result.get(timeout=10))  # 最大等待10秒
    # 关闭管理
    manager.shutdown()

if __name__ == '__main__':

    ip = '127.0.0.1' # 要绑定的本机IP

    port = 8001 # 端口号

    passwd = b'distributed'  # 口令

    main(ip,port,passwd)


二、创建客户端,taskWorker.py,客户端代码可以放入多台电脑运行,以达到分布式需求,代码如下:

import time
from multiprocessing.managers import BaseManager

# 创建类似的 QueueManager
class QueueManager(BaseManager):
    pass

def main(server_addr,port,kl):
    # 第一步:使用 QueueManager 注册用于获取Queue的方法名称
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 第二步:连接到服务器
    print('Connect to server %s...'%server_addr)
    # 端口和验证口令注意保持与服务进程完全一致
    m = QueueManager(address=(server_addr,port),authkey=kl)
    # 从网络连接
    m.connect()

    # 第三步:获取Queue的对象
    task = m.get_task_queue()
    result = m.get_result_queue()

    # 第四步:从task队列获取任务,并把结果写入result队列
    while (not task.empty()):
        image_url = task.get(True,timeout=5)
        print('run task download %s...'%image_url)
        time.sleep(1)
        result.put('%s--->Computer 1 success'%image_url)
    # 处理结束
    print('worker exit.')

if __name__ == '__main__':

    ip = '127.0.0.1' # 要连接的服务端的IP

    port = 8001 # 服务端设定的端口号

    passwd = b'distributed'  # 口令

    main(ip,port,passwd)
2019-11-27 16:39:45 mch2869253130 阅读数 213

主要用到multiprocessing库,思想是将大数据读入内存,然后切片存储,然后多进程分别处理分片。

from multiprocessing import Pool
import math
import os

# 读取数据
path = os.path.join(os.getcwd(), 'test.txt')
with open(path, 'r') as f:
    data = f.readlines()
    
processor=4
l_data = len(data)
size = math.ceil(l_data / processor)

# 切分数据并存储
for i in range(processor):
    start = size*i
    end = (i+1)*size if (i+1)*size < l_data else l_data
    
    filename = 'en_wiki_' + str(i) +'.txt'
    path = os.path.join(os.getcwd(), filename)
    with open(path, 'w') as f:
        for i in range(start, end):
            f.write(data[i])

# 删除读入内存的大数据,高效利用内存
del data,l_data  


# 处理数据
def proess(path1, path2, pid):
    # do something

def run(i):
    filename1 = 'en_wiki_piece_' + str(i) + '.txt'
    path1 = os.path.join(os.getcwd(), filename1)

    filename2 = 'processed_wiki_piece_' + str(i) + '.txt'
    path2 = os.path.join(os.getcwd(), filename2)

    process(path1, path2, i)


# 开启多进程处理数据
p=Pool(processor)
for i in range(processor):
    p.apply_async(run, args=(i,))
    print(str(i) + ' processor started !')
    
p.close()
p.join()
print("Process over!")

2016-10-01 14:47:38 LG1259156776 阅读数 8202

Python 适合大数据量的处理吗?

python 能处理数据库中百万行级的数据吗?

处理大规模数据时有那些常用的python库,他们有什么优缺点?适用范围如何?

2017-05-02 09:16:00 weixin_34391445 阅读数 68

本节书摘来自异步社区《Python和HDF 5大数据应用》一书中的第1章,第1.2节,作者[美]Andrew Collette(科莱特),胡世杰 译,更多章节内容可以访问云栖社区“异步社区”公众号查看。

1.2 HDF5到底是什么

HDF5是一种存储相同类型数值的大数组的机制,适用于可被层次性组织且数据集需要被元数据标记的数据模型。

它跟SQL风格的关系型数据库区别相当大,HDF5在组织结构方面有一些特殊的技巧(第8章中有一个例子)。如果你需要在多个表上保持关系,或者想要在数据上进行JOIN,那么一个关系型数据库可能更适合你。又或者你需要在一台没有安装HDF5的机器上读取一个小型的1维数据集,那么CSV这样的文本格式是更合理的选择。

但如果你需要处理多维数组,对性能有非常高的要求,需要在数据集上支持子集分片和部分I/O,需要用特征来给数据集做标记,对关系型特性没有要求,那么HDF5就是完美的选择。

那么说到底,“HDF5”究竟是指什么?我确信它包含下面3点:

1.一种文件规格及相关的数据模型;

2.一个可被C、C++、Java,Python以及其他语言使用的API标准库;

3.一个软件生态系统,由使用HDF5的客户程序以及MATLAB、IDL和Python等“分析平台”组成。

1.2.1 HDF5文件规格

你已经在上面的例子见到HDF5数据模型的三大要素:

数据集:一种数组型对象,在磁盘上保存数值类型的数据;

:层次性容器,可以包含数据集和子组;

特征:自定义元数据信息,可被附加在数据集(以及组!)上。

用户可以使用这些基本抽象构建适合自己问题域的应用格式。比如,我们之前的气象站代码为每个气象站分了一个组,为每个测量参数分配一个数据集,并附加了一些特征以描述数据集的额外信息。这种统一使用“格式内格式”来决定如何用组、数据集和特征来保存信息的方式在实验室或者其他机构中是非常普遍的。

既然HDF5处理一切如“endian”的跨平台问题,数据的分享就只需要对组、数据集和特征进行简单操作并获得结果。由于文件是自我描述的,你甚至不需要了解应用格式就可以从文件中获取数据。你只需打开文件并浏览其内容:


screenshot

任何曾经在读取“简单”二进制格式文件上花费数小时琢磨字节偏移量的人都应该对此充满感激。

最后,HDF5文件的底层字节布局是开放的规格。比起专利软件的二进制格式,这里面没有任何隐秘。虽然基本上人们都会使用HDF组织提供的库来访问这些文件,但是你自己写一个软件去读也没有任何问题。

1.2.2 HDF5标准库

HDF5文件规格及开源库由一个非营利性组织HDF组织(http://www.hdfgroup.org )维护,其总部位于伊利诺伊州尚佩恩县,原本是伊利诺伊大学香槟分校的一部分。HDF组织的主要产品是HDF5标准库。

该库主要用C语言写成,对C++和Java有一些额外的绑定。人们说起“HDF5”时通常就是指这个库。两大脍炙人口的Python接口模块PyTables和h5py使用的就是这个由HDF组织提供的C库。

这个标准库最主要的一点在于开发者对它的积极维护以及在向下兼容方面花费的巨大精力。标准库的向下兼容不仅仅是API的兼容,亦包括文件格式的兼容。对于HDF5这样的归档文件格式来说,兼容性是一个非常必需的特性。而API兼容则使h5py和PyTables这样的模块有能力处理世界上各种不同版本的HDF5。

对于科学数据的存储,包括长期的存储,你应该对HDF5有信心。由于标准库和文件格式都是开源的,哪怕一颗流星摧毁了伊利诺伊州,你的文件依然能够被读取。

1.2.3 HDF5生态系统

最后,让HDF5特别有用的一个原因是你可以在几乎任何平台上读写文件。IDL语言已经支持HDF5好多年了;MATLAB现在甚至以HDF5作为其“.mat”保存文件的默认格式;Python、C++、Java、.NET和LabView以及其他语言对其都有支持。NASA地球观测系统等机构用户使用的“EOS5”格式是建立在HDF5容器之上的应用格式,刚才我们见到的其实是其简化以后的例子。甚至作为HDF5竞争对手之一的NetCDF,其最新的NetCDF4格式也是实现在HDF5的组、数据集和特征之上。

希望我上面介绍的这些能够让你了解HDF5在科学用途上所向披靡的原因。接下来,我们将看到HDF5工作的基本原理并开始在Python上使用它。