精华内容
下载资源
问答
  • python实现并行爬虫.pdf

    2021-08-10 16:56:26
    python实现并行爬虫.pdf
  • python实现并行爬虫

    2015-09-07 20:10:18
    指定爬虫depth、线程数, python实现并行爬虫
  • 本文主要是以房价网房价信息爬虫为例,对Python实现整站40万条房价数据并行抓取(可更换抓取城市)的方法进行分析介绍。需要的朋友一起来看下吧
  • 一行 Python 实现并行化 -- 日常多线程操作的新思路 春节坐在回家的火车上百无聊赖,偶然看到 Parallelism in one line 这篇在 Hacker News 和 reddit 上都评论过百的文章,顺手译出,enjoy:-)

    原址如下:

    https://segmentfault.com/a/1190000000414339


    一行 Python 实现并行化 -- 日常多线程操作的新思路


    春节坐在回家的火车上百无聊赖,偶然看到 Parallelism in one line 这篇在 Hacker News 和 reddit 上都评论过百的文章,顺手译出,enjoy:-)

    http://www.zhangzhibo.net/2014/02/01/parallelism-in-one-line/

    Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL1,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。

    传统的例子

    简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:

    #Example.py
    '''
    Standard Producer/Consumer Threading Pattern
    '''
    
    import time 
    import threading 
    import Queue 
    
    class Consumer(threading.Thread): 
        def __init__(self, queue): 
            threading.Thread.__init__(self)
            self._queue = queue 
    
        def run(self):
            while True: 
                # queue.get() blocks the current thread until 
                # an item is retrieved. 
                msg = self._queue.get() 
                # Checks if the current message is 
                # the "Poison Pill"
                if isinstance(msg, str) and msg == 'quit':
                    # if so, exists the loop
                    break
                # "Processes" (or in our case, prints) the queue item   
                print "I'm a thread, and I received %s!!" % msg
            # Always be friendly! 
            print 'Bye byes!'
    
    
    def Producer():
        # Queue is used to share items between
        # the threads.
        queue = Queue.Queue()
    
        # Create an instance of the worker
        worker = Consumer(queue)
        # start calls the internal run() method to 
        # kick off the thread
        worker.start() 
    
        # variable to keep track of when we started
        start_time = time.time() 
        # While under 5 seconds.. 
        while time.time() - start_time < 5: 
            # "Produce" a piece of work and stick it in 
            # the queue for the Consumer to process
            queue.put('something at %s' % time.time())
            # Sleep a bit just to avoid an absurd number of messages
            time.sleep(1)
    
        # This the "poison pill" method of killing a thread. 
        queue.put('quit')
        # wait for the thread to close down
        worker.join()
    
    
    if __name__ == '__main__':
        Producer()
    

    哈,看起来有些像 Java 不是吗?

    我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。

    问题在于…

    首先,你需要一个样板类;
    其次,你需要一个队列来传递对象;
    而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。

    worker 越多,问题越多

    按照这一思路,你现在需要一个 worker 线程的线程池。下面是一篇 IBM 经典教程中的例子——在进行网页检索时通过多线程进行加速。

    #Example2.py
    '''
    A more realistic thread pool example 
    '''
    
    import time 
    import threading 
    import Queue 
    import urllib2 
    
    class Consumer(threading.Thread): 
        def __init__(self, queue): 
            threading.Thread.__init__(self)
            self._queue = queue 
    
        def run(self):
            while True: 
                content = self._queue.get() 
                if isinstance(content, str) and content == 'quit':
                    break
                response = urllib2.urlopen(content)
            print 'Bye byes!'
    
    
    def Producer():
        urls = [
            'http://www.python.org', 'http://www.yahoo.com'
            'http://www.scala.org', 'http://www.google.com'
            # etc.. 
        ]
        queue = Queue.Queue()
        worker_threads = build_worker_pool(queue, 4)
        start_time = time.time()
    
        # Add the urls to process
        for url in urls: 
            queue.put(url)  
        # Add the poison pillv
        for worker in worker_threads:
            queue.put('quit')
        for worker in worker_threads:
            worker.join()
    
        print 'Done! Time taken: {}'.format(time.time() - start_time)
    
    def build_worker_pool(queue, size):
        workers = []
        for _ in range(size):
            worker = Consumer(queue)
            worker.start() 
            workers.append(worker)
        return workers
    
    if __name__ == '__main__':
        Producer()
    

    这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……

    至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。

    何不试试 map

    map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。

        urls = ['http://www.yahoo.com', 'http://www.reddit.com']
        results = map(urllib2.urlopen, urls)
    

    上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:

    results = []
    for url in urls: 
        results.append(urllib2.urlopen(url))
    

    map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。

    为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。

    在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.dummy.

    这里多扯两句: multiprocessing.dummy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:"嘛,有这么个东西,你知道就成."相信我,这个库被严重低估了!

    dummy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 dummy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。
    所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。2

    动手尝试

    使用下面的两行代码来引用包含并行化 map 函数的库:

    from multiprocessing import Pool
    from multiprocessing.dummy import Pool as ThreadPool
    

    实例化 Pool 对象:

    pool = ThreadPool()
    

    这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问。

    Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。

    一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。

    pool = ThreadPool(4) # Sets the pool size to 4
    

    线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

    创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

    import urllib2 
    from multiprocessing.dummy import Pool as ThreadPool 
    
    urls = [
        'http://www.python.org', 
        'http://www.python.org/about/',
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
        'http://www.python.org/doc/',
        'http://www.python.org/download/',
        'http://www.python.org/getit/',
        'http://www.python.org/community/',
        'https://wiki.python.org/moin/',
        'http://planet.python.org/',
        'https://wiki.python.org/moin/LocalUserGroups',
        'http://www.python.org/psf/',
        'http://docs.python.org/devguide/',
        'http://www.python.org/community/awards/'
        # etc.. 
        ]
    
    # Make the Pool of workers
    pool = ThreadPool(4) 
    # Open the urls in their own threads
    # and return the results
    results = pool.map(urllib2.urlopen, urls)
    #close the pool and wait for the work to finish 
    pool.close() 
    pool.join() 
    

    实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

    # results = [] 
    # for url in urls:
    #   result = urllib2.urlopen(url)
    #   results.append(result)
    
    # # ------- VERSUS ------- # 
    
    
    # # ------- 4 Pool ------- # 
    # pool = ThreadPool(4) 
    # results = pool.map(urllib2.urlopen, urls)
    
    # # ------- 8 Pool ------- # 
    
    # pool = ThreadPool(8) 
    # results = pool.map(urllib2.urlopen, urls)
    
    # # ------- 13 Pool ------- # 
    
    # pool = ThreadPool(13) 
    # results = pool.map(urllib2.urlopen, urls)
    

    结果:

    #        Single thread:  14.4 Seconds 
    #               4 Pool:   3.1 Seconds
    #               8 Pool:   1.4 Seconds
    #              13 Pool:   1.3 Seconds
    

    很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

    另一个真实的例子

    生成上千张图片的缩略图
    这是一个 CPU 密集型的任务,并且十分适合进行并行化。

    基础单进程版本

    import os 
    import PIL 
    
    from multiprocessing import Pool 
    from PIL import Image
    
    SIZE = (75,75)
    SAVE_DIRECTORY = 'thumbs'
    
    def get_image_paths(folder):
        return (os.path.join(folder, f) 
                for f in os.listdir(folder) 
                if 'jpeg' in f)
    
    def create_thumbnail(filename): 
        im = Image.open(filename)
        im.thumbnail(SIZE, Image.ANTIALIAS)
        base, fname = os.path.split(filename) 
        save_path = os.path.join(base, SAVE_DIRECTORY, fname)
        im.save(save_path)
    
    if __name__ == '__main__':
        folder = os.path.abspath(
            '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
        os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
    
        images = get_image_paths(folder)
    
        for image in images:
            create_thumbnail(Image)
    

    上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

    这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

    如果我们使用 map 函数来代替 for 循环:

    import os 
    import PIL 
    
    from multiprocessing import Pool 
    from PIL import Image
    
    SIZE = (75,75)
    SAVE_DIRECTORY = 'thumbs'
    
    def get_image_paths(folder):
        return (os.path.join(folder, f) 
                for f in os.listdir(folder) 
                if 'jpeg' in f)
    
    def create_thumbnail(filename): 
        im = Image.open(filename)
        im.thumbnail(SIZE, Image.ANTIALIAS)
        base, fname = os.path.split(filename) 
        save_path = os.path.join(base, SAVE_DIRECTORY, fname)
        im.save(save_path)
    
    if __name__ == '__main__':
        folder = os.path.abspath(
            '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
        os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
    
        images = get_image_paths(folder)
    
        pool = Pool()
        pool.map(creat_thumbnail, images)
        pool.close()
        pool.join()
    

    5.6 秒!

    虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

    到这里,我们就实现了(基本)通过一行 Python 实现并行化。

    Update:
    译文已获作者 Chris 授权 https://medium.com/building-things-on-the-internet/40e9b2b36148#66bf-f06f781cb52b


    展开全文
  • 一行 Python 实现并行化 -- 日常多线程操作的新思路 - 左手键盘,右手书 - SegmentFault
    展开全文
  • Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL1,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨...

    Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL1,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。

    传统的例子

    简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:

    #Example.py
    '''
    Standard Producer/Consumer Threading Pattern
    '''
    
    import time 
    import threading 
    import Queue 
    
    class Consumer(threading.Thread): 
        def __init__(self, queue): 
            threading.Thread.__init__(self)
            self._queue = queue 
    
        def run(self):
            while True: 
                # queue.get() blocks the current thread until 
                # an item is retrieved. 
                msg = self._queue.get() 
                # Checks if the current message is 
                # the "Poison Pill"
                if isinstance(msg, str) and msg == 'quit':
                    # if so, exists the loop
                    break
                # "Processes" (or in our case, prints) the queue item   
                print "I'm a thread, and I received %s!!" % msg
            # Always be friendly! 
            print 'Bye byes!'
    
    
    def Producer():
        # Queue is used to share items between
        # the threads.
        queue = Queue.Queue()
    
        # Create an instance of the worker
        worker = Consumer(queue)
        # start calls the internal run() method to 
        # kick off the thread
        worker.start() 
    
        # variable to keep track of when we started
        start_time = time.time() 
        # While under 5 seconds.. 
        while time.time() - start_time < 5: 
            # "Produce" a piece of work and stick it in 
            # the queue for the Consumer to process
            queue.put('something at %s' % time.time())
            # Sleep a bit just to avoid an absurd number of messages
            time.sleep(1)
    
        # This the "poison pill" method of killing a thread. 
        queue.put('quit')
        # wait for the thread to close down
        worker.join()
    
    
    if __name__ == '__main__':
        Producer()
    

    哈,看起来有些像 Java 不是吗?

    我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。

    问题在于…

    首先,你需要一个样板类;
    其次,你需要一个队列来传递对象;
    而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。

    worker 越多,问题越多

    按照这一思路,你现在需要一个 worker 线程的线程池。下面是一篇 IBM 经典教程中的例子——在进行网页检索时通过多线程进行加速。

    #Example2.py
    '''
    A more realistic thread pool example 
    '''
    
    import time 
    import threading 
    import Queue 
    import urllib2 
    
    class Consumer(threading.Thread): 
        def __init__(self, queue): 
            threading.Thread.__init__(self)
            self._queue = queue 
    
        def run(self):
            while True: 
                content = self._queue.get() 
                if isinstance(content, str) and content == 'quit':
                    break
                response = urllib2.urlopen(content)
            print 'Bye byes!'
    
    
    def Producer():
        urls = [
            'http://www.python.org', 'http://www.yahoo.com'
            'http://www.scala.org', 'http://www.google.com'
            # etc.. 
        ]
        queue = Queue.Queue()
        worker_threads = build_worker_pool(queue, 4)
        start_time = time.time()
    
        # Add the urls to process
        for url in urls: 
            queue.put(url)  
        # Add the poison pillv
        for worker in worker_threads:
            queue.put('quit')
        for worker in worker_threads:
            worker.join()
    
        print 'Done! Time taken: {}'.format(time.time() - start_time)
    
    def build_worker_pool(queue, size):
        workers = []
        for _ in range(size):
            worker = Consumer(queue)
            worker.start() 
            workers.append(worker)
        return workers
    
    if __name__ == '__main__':
        Producer()
    

    这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……

    至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。

    何不试试 map

    map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。

        urls = ['http://www.yahoo.com', 'http://www.reddit.com']
        results = map(urllib2.urlopen, urls)
    

    上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:

    results = []
    for url in urls: 
        results.append(urllib2.urlopen(url))
    

    map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。

    为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。

    Map

    在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.dummy.

    这里多扯两句: multiprocessing.dummy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:"嘛,有这么个东西,你知道就成."相信我,这个库被严重低估了!

     

    dummy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 dummy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。
    所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。2

    动手尝试

    使用下面的两行代码来引用包含并行化 map 函数的库:

    from multiprocessing import Pool
    from multiprocessing.dummy import Pool as ThreadPool
    

    实例化 Pool 对象:

    pool = ThreadPool()
    

    这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问。

    Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。

     

    一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。

    pool = ThreadPool(4) # Sets the pool size to 4
    

    线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

    创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

    import urllib2 
    from multiprocessing.dummy import Pool as ThreadPool 
    
    urls = [
        'http://www.python.org', 
        'http://www.python.org/about/',
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
        'http://www.python.org/doc/',
        'http://www.python.org/download/',
        'http://www.python.org/getit/',
        'http://www.python.org/community/',
        'https://wiki.python.org/moin/',
        'http://planet.python.org/',
        'https://wiki.python.org/moin/LocalUserGroups',
        'http://www.python.org/psf/',
        'http://docs.python.org/devguide/',
        'http://www.python.org/community/awards/'
        # etc.. 
        ]
    
    # Make the Pool of workers
    pool = ThreadPool(4) 
    # Open the urls in their own threads
    # and return the results
    results = pool.map(urllib2.urlopen, urls)
    #close the pool and wait for the work to finish 
    pool.close() 
    pool.join() 
    

    实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

    # results = [] 
    # for url in urls:
    #   result = urllib2.urlopen(url)
    #   results.append(result)
    
    # # ------- VERSUS ------- # 
    
    
    # # ------- 4 Pool ------- # 
    # pool = ThreadPool(4) 
    # results = pool.map(urllib2.urlopen, urls)
    
    # # ------- 8 Pool ------- # 
    
    # pool = ThreadPool(8) 
    # results = pool.map(urllib2.urlopen, urls)
    
    # # ------- 13 Pool ------- # 
    
    # pool = ThreadPool(13) 
    # results = pool.map(urllib2.urlopen, urls)
    

    结果:

    #        Single thread:  14.4 Seconds 
    #               4 Pool:   3.1 Seconds
    #               8 Pool:   1.4 Seconds
    #              13 Pool:   1.3 Seconds
    

    很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

    另一个真实的例子

    生成上千张图片的缩略图
    这是一个 CPU 密集型的任务,并且十分适合进行并行化。

    基础单进程版本

    import os 
    import PIL 
    
    from multiprocessing import Pool 
    from PIL import Image
    
    SIZE = (75,75)
    SAVE_DIRECTORY = 'thumbs'
    
    def get_image_paths(folder):
        return (os.path.join(folder, f) 
                for f in os.listdir(folder) 
                if 'jpeg' in f)
    
    def create_thumbnail(filename): 
        im = Image.open(filename)
        im.thumbnail(SIZE, Image.ANTIALIAS)
        base, fname = os.path.split(filename) 
        save_path = os.path.join(base, SAVE_DIRECTORY, fname)
        im.save(save_path)
    
    if __name__ == '__main__':
        folder = os.path.abspath(
            '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
        os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
    
        images = get_image_paths(folder)
    
        for image in images:
            create_thumbnail(Image)
    

    上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

    这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

    如果我们使用 map 函数来代替 for 循环:

    import os 
    import PIL 
    
    from multiprocessing import Pool 
    from PIL import Image
    
    SIZE = (75,75)
    SAVE_DIRECTORY = 'thumbs'
    
    def get_image_paths(folder):
        return (os.path.join(folder, f) 
                for f in os.listdir(folder) 
                if 'jpeg' in f)
    
    def create_thumbnail(filename): 
        im = Image.open(filename)
        im.thumbnail(SIZE, Image.ANTIALIAS)
        base, fname = os.path.split(filename) 
        save_path = os.path.join(base, SAVE_DIRECTORY, fname)
        im.save(save_path)
    
    if __name__ == '__main__':
        folder = os.path.abspath(
            '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
        os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
    
        images = get_image_paths(folder)
    
        pool = Pool()
        pool.map(creat_thumbnail, images)
        pool.close()
        pool.join()
    

    5.6 秒!

    虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

    到这里,我们就实现了(基本)通过一行 Python 实现并行化。

    展开全文
  • 本篇文章主要介绍了Python实现数据库并行读取和写入实例,非常具有实用价值,需要的朋友可以参考下
  • Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏”重”。而且往往隔靴搔痒,没有深入探讨...
  • 一行 Python 代码实现并行.pdf
  • 之前由于并行计算的作业被视为类同或抄袭网络,正好最近学习Python,于是刚好在这里作一个补充,新方法采用Python实现并行,一来学习用,二来为了完成作业。 旧作业地址如下: OpenMP计算圆周率 Python并行计算 ...

    前情提要

    之前由于并行计算的作业被视为类同或抄袭网络,正好最近学习Python,于是刚好在这里作一个补充,新方法采用Python实现并行,一来学习用,二来为了完成作业。

    旧作业地址如下:
    OpenMP计算圆周率

    Python并行计算

    Python在并行计算方面使用的是GIL(Global Interperter Lock,全局解释器锁),被认为的多线程其实是伪的,比较鸡肋,但在实验环境中,配合GPU,其实还不是很鸡肋,在此我们就讲解一下;

    Python的原解释器CPython是有GIL的,在执行代码过程中,会产生互斥锁来限制线程对共享资源的访问,而GIL的作用就是,一个进行同一时间只能允许一个线程运算,摆明的单线程啊!

    由于CPython中的GIL的存在我们可以暂时不奢望能在CPython中使用多线程利用多核资源进行并行计算了,因此我们在Python中可以利用多进程的方式充分利用多核资源。

    并行计算的目的是将所有的核心都运行起来以提高代码的执行速度,在python中由于存在全局解释器锁(GIL)如果使用默认的python多线程进行并行计算可能会发现代码的执行速度并不会加快,甚至会比使用但核心要慢!!!

    用到的库

    下面就来简单介绍几个Python并行计算中用到的库,最后再使用Python实现一个小例子,应该可以完成作业了吧?

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。当然我们处理一个简单的例子用哪个都可以。

    这里我们使用Python的multiprocessing模块,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

    • 简单介绍一下multiprocessing模块的用法:

    multiprocessing模块提供了一个Process类来代表一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,start()方法启动,join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

    代码举例(代码经过实验,在Python3上运行):

    #-*-coding=utf-8-*-
    
    from multiprocessing import Process
    import os
    import time
    
    
    def run_process(name):
    	time.sleep(2)
    	print('Run child process %s is (%s). ' %(name, os.getpid()))
    
    	pass
    
    def hello_world():
    	time.sleep(5)
    	print('Run child process is (%s). ' %(os.getpid()))
    
    	pass
    
    if __name__ == "__main__":
    
    	print("Parent process is %s." % (os.getpid()))
    
    	p1 = Process(target=run_process, args=('test', ))
    	p2 = Process(target=hello_world)
    
    	print(" process will start...... ")
    
    	p1.start()
    	p2.start()
    	p1.join()
    
    	print(" process end!")
    

    输出如下:

    output

    一些并行模块通过修改pyhton的GIL机制突破了这个限制,使得Python在多核电脑中也能够有效的进行并行计算。PP(Parallel Python)模块就是其中一种。

    • PP(Parallel Python)模块举例:

    安装pp模块的时候出现了一个问题,在这里做出解释:

    pip install pp==1.6.5
    Collecting pp==1.6.5
      Using cached https://files.pythonhosted.org/packages/14/e9/f69030681985226849becd36b04e2c0cb99babff23c8342bc4e30ded06b2/pp-1.6.5.tar.gz
        Complete output from command python setup.py egg_info:
        Traceback (most recent call last):
          File "<string>", line 1, in <module>
          File "/private/var/folders/gk/4tnnvlmj0zzg74xzmnkq8g0c0000gn/T/pip-install-1pnu5md4/pp/setup.py", line 12, in <module>
            from pp import version as VERSION
          File "/private/var/folders/gk/4tnnvlmj0zzg74xzmnkq8g0c0000gn/T/pip-install-1pnu5md4/pp/pp.py", line 121
            print sout,
                     ^
        SyntaxError: Missing parentheses in call to 'print'. Did you mean print(print sout, end=" ")?
        
    Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/gk/4tnnvlmj0zzg74xzmnkq8g0c0000gn/T/pip-install-xa12jeey/pp/
    

    具体原因是因为pip工具默认调用了python3的语法运行了setup.py脚本,但是该下载脚本默认用的貌似是python2的语法,解决方法如下:

    pip install setuptools --upgrade --user
    

    接着运行:

    pip install pp
    

    出现:

    Collecting pp
      Using cached https://files.pythonhosted.org/packages/14/e9/f69030681985226849becd36b04e2c0cb99babff23c8342bc4e30ded06b2/pp-1.6.5.tar.gz
        Complete output from command python setup.py egg_info:
        Traceback (most recent call last):
          File "<string>", line 1, in <module>
          File "/private/var/folders/gk/4tnnvlmj0zzg74xzmnkq8g0c0000gn/T/pip-install-i2brisl3/pp/setup.py", line 12, in <module>
            from pp import version as VERSION
          File "/private/var/folders/gk/4tnnvlmj0zzg74xzmnkq8g0c0000gn/T/pip-install-i2brisl3/pp/pp.py", line 121
            print sout,
                     ^
        SyntaxError: Missing parentheses in call to 'print'. Did you mean print(print sout, end=" ")?
        
        ----------------------------------------
    Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/gk/4tnnvlmj0zzg74xzmnkq8g0c0000gn/T/pip-install-i2brisl3/pp/
    

    上述原因是使用pip(python3版本)工具时调用了python2的脚本,导致无法正常调用对应版本的setup.py脚本;

    使用pip2安装出现:

     Could not fetch URL https://pypi.python.org/simple/pp/: There was a problem confirming the ssl certificate: [SSL: TLSV1_ALERT_PROTOCOL_VERSION] tlsv1 alert protocol version (_ssl.c:590) - skipping
      Could not find a version that satisfies the requirement pp (from versions: )
    No matching distribution found for pp
    

    所以可能是python3上的pp模块并不是这个名字,我们使用其它方法把这个模块下载下来:

    wget https://files.pythonhosted.org/packages/14/e9/f69030681985226849becd36b04e2c0cb99babff23c8342bc4e30ded06b2/pp-1.6.5.tar.gz
    tar xvfz pp-1.6.5
    cd pp-1.6.5
    python setup.py install
    

    这样我们使用python2.7对pp模块进行了安装,而且pp模块的最新版本1.6.5是只支持python2的;接下来我们在python2.7下对pp模块进行引用,来完成并行计算的例子;

    最后发现个问题,python版本对应有特定的pp版本,并不是不的支持了,详细见网站PP模块版本,但是环境已经配好那就用python2吧暂时;

    • 例子举例,该例子实现了用并行计算的方法解决“从0到给定范围内所有质数的和”:
    #-*-coding=utf-8-*-
    
    import time 
    import math
    
    def isprime(n):
    	if not isinstance(n, int):
    		raise TypeError("argument passed to is_prime is not of 'int' type")
    	if n < 2:
    		return False
    	if n == 2:
    		return True
    	max = int(math.ceil(math.sqrt(n)))
    	i = 2
    	while i <= max:
    		if n % i == 0:
    			return False
    		i += 1
    	return True
    
    def sum_primes(n):
    	return sum([x for x in range(2, n) if isprime(x)])
    
    
    #串行代码
    print("{beg} serial process {beg}".format(beg='-'*16))
    startTime = time.time()
    
    inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700)
    results = [(input,sum_primes(input)) for input in inputs]
    
    for input, result in results:
        print("Sum of primes below %s is %s" % (input, result))
    
    print("use: %.3fs"%( time.time()-startTime))
    
    
    import pp
    
    #并行代码
    print("{beg} parallel process {beg}".format(beg='-'*16))
    startTime = time.time()
    
    job_server = pp.Server()
    
    inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700)
    jobs = [(input, job_server.submit(sum_primes, (input, ), (isprime, ),
            ("math", )) ) for input in inputs]
    
    for input, job in jobs:
        print("Sum of primes below %s is %s" % (input, job()))
    
    print("use: %.3fs"%( time.time()-startTime ) )
    

    运行得出结果:

    output

    实验总结

    这次作业补充让我学习了两个python并行计算的模块,一个是使用多线程方法的mutilprocessing,另一个是真正实现并行计算的pp模块,在今后的学习中,可以利用这些实现的并行计算去加速运行大规模的计算量,另外在深度学习方面我们有强大的numpy以及利用gpu的库,总的来说在并行计算的方面我们还有很多东西可以实践,今天就到这了,希望这个作业可以过。

    展开全文
  • PythonPython 多核并行计算

    万次阅读 多人点赞 2017-07-11 20:24:12
    Python 多核并行计算 Nov 2, 2016•python 以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称核好了),觉得去折腾并行没啥意义(除非在做IO密集型任务)。...
  • 文长度为5200字,建议阅读8分钟本文教你通过一行Python实现并行化。Python在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和GIL,我觉得错误的教学指导才是主要问题。常见的经典Python多线程、...
  • 主要介绍了教你用一行Python代码实现并行任务(附代码),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • python怎么并行

    千次阅读 热门讨论 2018-10-10 18:12:07
    python怎么并行 为了保证多线程数据安全,python语言的设计中,有个全局解释锁GIL(global interpretor lock),每个线程在开始运行时必须获得锁,遇到I/O或sleep挂起时释放锁,从而保证同一时刻只有一个线程在运行,...
  • 以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程...Python 里面有 multiprocessing和 threading 这两个用来实现并行的库。用线程应该是很自然的想法,毕竟(直觉上)开
  • Python并行执行for循环

    万次阅读 多人点赞 2018-12-04 20:52:55
    在介绍如何最简单地利用 python 实现并行前,我们先来看一个简单的代码。 words = ['apple', 'bananan', 'cake', 'dumpling'] for word in words: print word 上面的例子中,我们用一个 for 循环打印出 words ...
  • python并行

    2018-07-03 10:00:12
    python并行实现,通用性强,有使用价值,有实用价值
  • Python 多核并行计算 Nov 2, 2016 • python 以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称核好了),觉得去折腾并行没啥意义(除非在做IO密集型任务...
  • Python实现多核心并行计算

    千次阅读 2019-07-03 15:06:24
    Python实现多核心并行计算 原文链接 平常写的程序,无论是单线程还是多线程,大多只有一个进程,而且只能在一个核心里工作。所以很多应用程序即使正在满载运行,在任务管理器中CPU使用量还是只有50%(双核CPU)或...
  • Python 多核并行计算

    千次阅读 2017-04-10 10:12:50
    Python 多核并行计算 转载自:https://abcdabcd987.com/python-multiprocessing/ 以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称核好了),觉得去...
  • python进行并行计算

    2020-03-15 16:58:12
    python进行并行计算 有的时候在我们在用python进行跑单个程序的时候耗费时间比较多,占用的资源又比较少,而且可能需要多次测试调参,这个时候如果可以一次跑多个程序的话就会很方便了。本期分享一下python如何利用...
  • Python并行--基于joblib

    千次阅读 2020-07-23 19:21:21
    Python并行–基于joblib Python并行远不如Matlab好用。比如Matlab里面并行就直接把for改成parfor就行(当然还要注意迭代时下标的格式),而Python查 一查并行,各种乱七八糟的方法一大堆,而且最不爽的一点就是...
  • A S imple PAR基础上,多包Python的等位基因Map实现。 用法 例如,要使用两个并行的进程对数组[1, 2, 3, 4, 5]中的所有元素求和1,并在终端上打印输出,则应编写: from sparmap import parmap for result in ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 82,200
精华内容 32,880
关键字:

python实现并行

python 订阅