  • python threadpool
    2021-01-14 06:38:16

    The threadpool module is an old thread-pool implementation for Python; PyPI already recommends multiprocessing in its place, but its convenience has kept a base of loyal users.
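For comparison, a minimal sketch of the same pattern with the modern standard-library replacement, `concurrent.futures.ThreadPoolExecutor` (the worker function and pool size here are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def work(n):
    # Stand-in for some_callable: any function applied to each argument
    return n * n

# map() submits every item to the pool and yields results in input order
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(work, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```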

    Multithreading with the threadpool module takes only a few lines:

    from threadpool import *

    pool = ThreadPool(poolsize)
    requests = makeRequests(some_callable, list_of_args, callback)
    [pool.putRequest(req) for req in requests]
    pool.wait()

    It runs multiple threads by passing in a list of argument groups, and the requests are queued in order, matching the order of the argument groups.

    An example of passing argument groups:

    # ----------------------------------------------------------------------
    import threadpool

    def hello(m, n, o):
        """Print the three arguments it receives."""
        print("m = %s, n = %s, o = %s" % (m, n, o))

    if __name__ == '__main__':
        # Method 1: positional arguments, as (args_list, None) tuples
        lst_vars_1 = ['1', '2', '3']
        lst_vars_2 = ['4', '5', '6']
        func_var = [(lst_vars_1, None), (lst_vars_2, None)]

        # Method 2: keyword arguments, as (None, kwargs_dict) tuples
        # (this overwrites func_var from method 1; use one or the other)
        dict_vars_1 = {'m': '1', 'n': '2', 'o': '3'}
        dict_vars_2 = {'m': '4', 'n': '5', 'o': '6'}
        func_var = [(None, dict_vars_1), (None, dict_vars_2)]

        pool = threadpool.ThreadPool(2)
        requests = threadpool.makeRequests(hello, func_var)
        [pool.putRequest(req) for req in requests]
        pool.wait()

  • Multithreading with multiprocessing.dummy.Pool

    Requirement

    To speed up data processing during an experiment, use multithreading.

    Implementation

    from multiprocessing.dummy import Pool as ThreadPool
    from collections import defaultdict

    uriv = defaultdict(list)

    def aas():
        global b
        b = [2]

    def process(item):
        i = item[0]
        a = item[1]
        c = a + b[0]
        uriv[int(c)].append(i)

    aas()
    items = ['张三', '李四', '王五', '赵六']
    a = [i for i in range(4)]
    z = zip(items, a)
    pool = ThreadPool()
    pool.map(process, z)
    pool.close()
    pool.join()

    print(uriv)
    

    Result:

    defaultdict(<class 'list'>, {2: ['张三'], 3: ['李四'], 4: ['王五'], 5: ['赵六']})
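The example above mutates a shared defaultdict from worker threads, which works here largely because of the GIL. A minimal sketch of the same idea with an explicit `threading.Lock` guarding the shared state (the names and data are illustrative):

```python
from multiprocessing.dummy import Pool as ThreadPool
from collections import defaultdict
import threading

results = defaultdict(list)
lock = threading.Lock()

def process(item):
    name, offset = item
    # Serialize updates to the shared dict instead of relying on the GIL
    with lock:
        results[offset + 2].append(name)

with ThreadPool(4) as pool:
    pool.map(process, [('a', 0), ('b', 1)])

print(dict(results))  # {2: ['a'], 3: ['b']}
```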
    
  • Using the python threadpool thread pool

    2019-07-29 15:48:02

    python threadpool
    1. Without a thread pool

    import time
    
    
    def sayhello(name):
        print('hello', name)
        time.sleep(2)
    
    
    name_list = ['wangfei', 'aa', 'bb', 'cc']
    start_time = time.time()
    for i in range(len(name_list)):
        sayhello(name_list[i])
    
    print('%d second' % (time.time()-start_time))
    

    Output
    hello wangfei
    hello aa
    hello bb
    hello cc
    8 second

    2. With a thread pool

    import time
    import threadpool
    
    
    def sayhello(name):
        print('hello', name)
        time.sleep(2)
    
    
    name_list = ['wangfei', 'aa', 'bb', 'cc']
    start_time = time.time()
    # Create a thread pool of at most 10 threads
    pool = threadpool.ThreadPool(10)
    # Build the requests: the function to run, its arguments, and an
    # optional callback (defaults to None)
    requests = threadpool.makeRequests(sayhello, name_list)
    # Put every request into the thread pool
    [pool.putRequest(req) for req in requests]
    # Wait until all threads have finished
    pool.wait()
    print('%d second' % (time.time()-start_time))
    

    Output
    hello wangfei
    hello aa
    hello bb
    hello cc
    2 second

    3. Passing multiple arguments through the thread pool

    import threadpool
    
    
    def hello(m, n, o):
        print("m = %s, n = %s, o = %s" % (m, n, o))
    
    
    if __name__ == '__main__':
        # # Method 1: positional arguments
        # lst_vars_1 = ['1', '2', '3']
        # lst_vars_2 = ['4', '5', '6']
        # func_var = [(lst_vars_1, None), (lst_vars_2, None)]
    
        # Method 2: keyword arguments
        dict_vars_1 = {'m': '1', 'n': '2', 'o': '3'}
        dict_vars_2 = {'m': '4', 'n': '5', 'o': '6'}
        func_var = [(None, dict_vars_1), (None, dict_vars_2)]
        pool = threadpool.ThreadPool(2)
        requests = threadpool.makeRequests(hello, func_var)
        # [pool.putRequest(req) for req in requests]
        print(requests)
        for req in requests:
            print(req)
            pool.putRequest(req)
        pool.wait()
    

    Output
    [<threadpool.WorkRequest object at 0x0000023146648550>, <threadpool.WorkRequest object at 0x0000023146648588>]
    <WorkRequest id=2410657645904 args=[] kwargs={'m': '1', 'n': '2', 'o': '3'} exception=False>
    <WorkRequest id=2410657645960 args=[] kwargs={'m': '4', 'n': '5', 'o': '6'} exception=False>
    m = 1, n = 2, o = 3
    m = 4, n = 5, o = 6

  • threadpool and multiprocessing

    I. Threadpool, installed via pip: [sudo] pip install threadpool

    An old threading approach; multiprocessing is recommended instead.

    >>> pool = ThreadPool(poolsize)
    >>> requests = makeRequests(some_callable, list_of_args, callback)
    >>> [pool.putRequest(req) for req in requests]
    >>> pool.wait()
    II. multiprocessing, in the standard library; concurrency is built by creating Process objects
    1. The Pool class
    from multiprocessing import Pool
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        with Pool(5) as p:
            print(p.map(f, [1, 2, 3]))
    2. The Process class (start() and join() methods)
    
    from multiprocessing import Process
    
    def f(name):
        print('hello', name)
    
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

    3. The three process start methods

    spawn: the parent starts a fresh child process, which inherits only the resources needed to run the process object's run() method

    fork: the child is created with os.fork() and inherits everything from the parent; unsafe when the parent has threads

    forkserver: a single-threaded server process forks children on request; safe, and children do not inherit unneeded resources

    set_start_method() selects the start method and may be called at most once per program.

    get_context() lets one program use several start methods; a context object has the same API as the multiprocessing module:

    import multiprocessing as mp
    
    def foo(q):
        q.put('hello')
    
    # Option A: set the start method globally (at most once per program)
    if __name__ == '__main__':
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()
    
    # Option B: use a context, so several start methods can coexist
    if __name__ == '__main__':
        ctx = mp.get_context('spawn')
        q = ctx.Queue()
        p = ctx.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()
    
    

    III. Inter-process communication with multiprocessing

    Two mechanisms are supported: Queues and Pipes.

    Queue mirrors queue.Queue and is both process- and thread-safe
    from multiprocessing import Process, Queue
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    Pipe() returns a pair of connected objects; the connection is duplex (two-way), and each end has send() and recv() methods. Data may be corrupted if two processes read from or write to the same end at the same time:
    
    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()


    Synchronization primitives mirror those in threading; for example, a Lock can keep printed output from interleaving
    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    Sharing state between processes (best avoided when possible)

    Shared memory: share data through Value or Array

    from multiprocessing import Process, Value, Array
    
    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))
    
        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()
    
        print(num.value)
        print(arr[:])

    Server process: a Manager object holds Python objects and lets other processes manipulate them through proxies. A manager supports the following types:

    list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)
    Worker process pools
    from multiprocessing import Pool, TimeoutError
    import time
    import os
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        # start 4 worker processes
        with Pool(processes=4) as pool:
            # print "[0, 1, 4,..., 81]"
            print(pool.map(f, range(10)))
    

    Besides map(), there are imap_unordered(), which yields results in arbitrary order, and apply_async(), which evaluates a single call asynchronously

    from multiprocessing import Pool, TimeoutError
    import time
    import os
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        # start 4 worker processes
        with Pool(processes=4) as pool:
            # print "[0, 1, 4,..., 81]"
            print(pool.map(f, range(10)))
            # these must run while the pool is still open
            pool.imap_unordered(f, range(10))
            res = pool.apply_async(time.sleep, (10,))
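A small sketch of what the TimeoutError import is for: fetching an async result with a bounded wait (the worker function here is illustrative):

```python
from multiprocessing import Pool, TimeoutError

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        res = pool.apply_async(square, (7,))
        try:
            # get() blocks until the result arrives, or raises
            # TimeoutError after the given number of seconds
            print(res.get(timeout=5))   # 49
        except TimeoutError:
            print('worker took too long')
```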

    IV. Multiprocessing vs. multithreading

    Because of the GIL, Python threads cannot exploit multiple CPU cores: use multithreading for IO-bound work and multiprocessing for CPU-bound work.
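A sketch of how to see this yourself: time the same CPU-bound task under a thread pool and a process pool. Timings are machine-dependent, so no expected numbers are shown; the job size and pool size are illustrative.

```python
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

def burn(n):
    # CPU-bound: pure-Python arithmetic, holds the GIL while running
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    jobs = [200_000] * 8
    for label, make_pool in (('threads', ThreadPool), ('processes', Pool)):
        t0 = time.time()
        with make_pool(4) as pool:
            pool.map(burn, jobs)
        # threads should be no faster than serial here; processes should scale
        print(label, round(time.time() - t0, 2), 's')
```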

    Define the function to be run concurrently

    import time
    import random

    def Test(a, b):
        time.sleep(random.randint(5, 20))
        print(str(a) + '_' + str(b) + '\t')

    Thread pool


    • import random
      import threadpool
      def MultiThreadTest():
          pool = threadpool.ThreadPool(20)
          li = []
          for i in range(1000):
              li.append((None, {'a': i, 'b': i + 10}))
          requests = threadpool.makeRequests(Test, li)
          [pool.putRequest(req) for req in requests]
          pool.wait()
    • Process pool
    • import multiprocessing
      def MultiProcessTest():
          pool = multiprocessing.Pool(processes = 4)
          for i in range(1000):
              pool.apply_async(Test, (i, i + 10, ))
          pool.close()
          pool.join()
    • Shared data

      Threads can share data through Python's queue.Queue; processes must use multiprocessing.Queue.

    • import multiprocessing
      def Test(a, b, mpDict):
          print(str(a) + "test", b)
          mpDict[str(a) + "test"] = b
      def MultiProcessTest():
          pool = multiprocessing.Pool(processes=4)
          mpDict = multiprocessing.Manager().dict()
          for i in range(5):
              pool.apply_async(Test, (i, i + 10, mpDict, ))
          pool.close()
          pool.join()
          traditionDict = dict(mpDict)
          print(traditionDict)
    • Producer-consumer model

    • Sharing a Queue with a Pool: multiprocessing.Queue() only works with processes created via Process, not with Pool workers; inside a Pool, use multiprocessing.Manager().Queue()


    import time
    import random
    import numpy as np
    import multiprocessing as mp
    from multiprocessing import Process

    # Producer
    def write(q):
        a = np.random.randint(0, 100, (100, 2, 2))
        for value in range(a.shape[0]):
            print('Put %s to queue...\n' % a[value])
            q.put(a[value])
            print(q.qsize())

    # Consumer
    def read(q):
        while True:
            # get's parameters are block=True, timeout=None
            # block says whether an empty queue waits or raises
            # timeout bounds the wait; None means wait forever
            value = q.get(True)
            print('Get %s from queue.\n' % value)
            print(q.qsize())
            time.sleep(random.random())

    def test_pool():
        manager = mp.Manager()
        q = manager.Queue(2)
        pw = Process(target=write, args=(q,))
        pw.start()
        worker_num = 4
        pool = mp.Pool(processes=worker_num)
        for i in range(worker_num):
            print('start data worker ' + str(i))
            pool.apply_async(read, (q, ))
        pool.close()
        pw.join()
        # note: the readers loop forever, so pool.join() blocks
        # indefinitely; a real pipeline needs a stop condition
        pool.join()