精华内容
下载资源
问答
  • multiprocessing 模块是跨平台版本的多进程模块,像线程一样管理进程,与 threading 很相似,对多核CPU的利用率会比 threading 好的多。 Pool 可以提供指定数量的进程供用户调用,当有新的请求提交到Pool时,...

    简介

    可以使用 Pool来实现多进程并行。

    Pool 模块来自于 multiprocessing 模块。

    • multiprocessing 模块是跨平台版本的多进程模块,像线程一样管理进程,与 threading 很相似,对多核CPU的利用率会比 threading 好的多。
    • Pool 类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

    函数

    apply()

    函数原型:apply(func[, args=()[, kwds={}]])

    该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。

    apply_async()

    函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

    与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。

    map()

    函数原型:map(func, iterable[, chunksize=None])

    Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。

    map_async()

    函数原型:map_async(func, iterable[, chunksize[, callback]])

    与map用法一致,但是它是非阻塞的。其有关事项见apply_async。

    阻塞与非阻塞的讲解见下面备注。

    close()

    关闭进程池(pool),使其不在接受新的任务。

    terminal()

    结束工作进程,不在处理未处理的任务。

    join()

    主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

    示例

    比如我想同时让服务器执行多条 hive 命令,可编程如下:

    from multiprocessing import Pool
    import subprocess
    
    # 定义所有并行语句都回调用的函数
    def run_sh(sh):
        '''
        执行一行shell命令
        '''
        (statusLoad, outputLoad) = subprocess.getstatusoutput(sh)    
        return (statusLoad, outputLoad)
    
    # 将需要执行的多条语句放入到一个list中
    sh_list = []
    sh_list.append('hive -e "select * from A" > A_result')
    sh_list.append('hive -e "select * from B" > B_result')
    sh_list.append('hive -e "select * from C" > C_result')
    
    # 开始并行
    pool = Pool(len(sh_list))
    pool.map(run_sh, sh_list) # 表示将 sh_list 每个元素作为参数递给 run_sh
    pool.close() # 将进程池关闭,不再接受新的进程
    pool.join() # 主进程阻塞,只有池中所有进程都完毕了才会通过
    
    # 开始处理结果文件,此时三个 *_result 文件肯定是存在并且已经写入完毕的
    

    备注

    1、阻塞与非阻塞的区别

    map() 会使进程阻塞,即通过 map() 开启的多进程都结束之后,这个函数才会有返回结果,否则主进程会一直等待,不会往下进行 。

    map_async() 为非阻塞,即通过 map_async() 开启多进程之后,立刻会返回结果,主进程会继续往下执行。

    注意:

    如果后面调用了 join() 函数,则不管之前用的是 map 还是 map_async,主进程都会等待,直到进程池中所有进程执行完毕,才会继续往下执行。

    2、starmap 函数

    Pool 类中,python 3.X 还引入了 starmap 函数,与 map 的区别在于, starmap 支持将多个参数放入到队列中,不同参数按照顺序以元组形式存放,举例如下:

    from multiprocessing import Pool
    def func(a, b):
        print(a + b)
    
    if __name__=="__main__":
        args = [(1,2),(3,4),(5,6)]
        pool = Pool(3)
        pool.starmap(func, args)    
    

    输出

    3
    7
    11
    

    内存共享问题

    多进程并行有一个特点:多个进程之间并不能共享内存。

    比如一个人写出了以下代码,期望可以对同一个数进行累加:

    from multiprocessing import Pool
    def func(dic, c):
        dic['count'] += c
    
    if __name__=="__main__":
        d = dict() 
        d['count'] = 0
        args = [(d, 1), (d, 2), (d, 3)]
        pool = Pool(3)
        pool.starmap(func, args)   
        pool.close()
        pool.join()
        print(f'dic={d}')
    

    但是输出为:

    dic={'count': 0}

    不是我们想要的结果。

    这是因为,多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改。

    解决办法

    可以使用 multiprocessing.Manager 来创建对象,这样的对象可以被共享,如:

    from multiprocessing import Pool, Manager
    def func(dic, c):
        dic['count'] += c
    
    if __name__=="__main__":
        d = Manager().dict()  #生成一个字典,可以在多个进程中传递和共享。
        d['count'] = 0
        args = [(d, 1), (d, 2), (d, 3)]
        pool = Pool(3)
        pool.starmap(func, args)   
        pool.close()
        pool.join()
        print(f'dic={d}')
    

    输出是我们所期望的:

    dic={'count': 6}
    

    Manager() 内部有加锁机制,不允许两个进程同时修改一份数据,因为进程的数据是独立的,因此数据是安全的。

    另外,如果只要求并行,不要求必须是多进程,可以使用多线程来实现共享数据。 参照python技巧——使用threadpool实现多线程并行

    展开全文
  • 引入 在进入多进程的学习之前, 一定需要先了解一个应用程序是如何开启一个进程的, 以及操作...multiprocess 模块是 Python 多进程管理模块 2.multiprocessing 模块简介 python中的多线程无法利用多核优势,如果想

    引入

    在进入多进程的学习之前, 一定需要先了解一个应用程序是如何开启一个进程的, 以及操作系统对进程是如何进行分配资源的, 进程、线程、进程池、进程三态、同步、异步、并发、并行、串行的概念也要非常的明确, 下面将介绍 Python 并发编程之多进程

    一.multiprocessing 模块介紹

    1.什么是 multiprocessing 模块

    • multiprocess 模块是 Python 中的多进程管理模块

    2.multiprocessing 模块简介

    • python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程, Python提供了multiprocessing

    3.multiprocessing 模块的作用

    • multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似
    • multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件

    ps : 值得注意的是 : 与线程不同,进程没有任何共享状态,多个进程的内存空间相互物理隔离, 进程修改的数据,改动仅限于该进程内

    二.Process类介绍

    multiprocessing 模块提供了 Process 类,该类可用来在 Windows 平台上创建新进程

    使用 Process 类创建实例化对象,其本质是调用该类的构造方法创建新进程

    Process([group [, target [, name [, args [, kwargs]]]]])  # 实际上是调用了下面的构造方法
    def __init__(self,group=None,target=None,name=None,args=(),kwargs={})
    

    值得注意的是 :

    ​ 参数的指定需要使用关键字的方式

    args 指定的值是为 target 指定的函数的位置参数, 并且是一个元组形式, 一个值必须带逗号

    • 参数含义 :

    参数名 说明
    group 该参数未进行实现,不需要传参
    target 为新建进程指定执行任务,也就是指定一个函数
    name 为新建进程设置名称
    args 为 target 参数指定的参数传递非关键字参数
    kwargs 为 target 参数指定的参数传递关键字参数
    • 常用方法

    方法 作用
    run( ) 第 2 种创建进程的方式需要用到,继承类中需要对方法进行重写,该方法中包含的是新进程要执行的代码
    start( ) 和启动子线程一样,新创建的进程也需要手动启动,该方法的功能就是启动新创建的线程
    join([timeout]) 主线程等待子进程终止(强调:是主线程处于等的状态,而p是处于运行的状态),timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
    is_alive( ) 判断当前进程是否还活着
    terminate( ) 中断该进程
    • 常用属性

    属性 作用
    name 可以为该进程重命名,也可以获得该进程的名称。
    daemon 和守护线程类似,通过设置该属性为 True,可将新建进程设置为“守护进程”
    pid 返回进程的 ID 号。大多数操作系统都会为每个进程配备唯一的 ID 号
    exitcode 进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    authkey 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网

    三.Process类创建子进程的两种方式

    0.Process 类使用的注意点

    WindowsProcess( ) 必须放在 if __name__ == '__main__': 之下

    • 这是 Windows上多进程的实现问题, 在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错, 所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候如果不是当前执行文件就不会执行 Process, 也就不会无限递归了 (Linux上没有这个问题)

    ps : fork 是 OS提供的方法 os.fork(), 该方法可以在当前程序中再创建出一个进程, 但是在 Windows 平台上无效, 只在 Linux, UNIX, Mac OSX上有效

    1.开启子进程方式一

    • 直接创建 Process 类的实例对象,由此就可以创建一个新的进程
    from multiprocessing import Process
    import time,os
    
    def test(n):
        print(f"父进程{os.getppid()},紫禁城{os.getpid()}")
        time.sleep(n)
        print(f"父进程{os.getppid()},紫禁城{os.getpid()}")
    
    if __name__ == '__main__':
        p = Process(target=test,args=(2,))
        p.start()  # 做发起系统调用的活
        print(f"当前执行文件{os.getpid()}")
    
    '''
    当前执行文件16860
    父进程16860,紫禁城6404
    父进程16860,紫禁城6404
    '''
    

    2.开启子进程方式二

    • 通过继承 Process 类的子类,创建实例对象,也可以创建新的进程
    • 继承 Process 类的子类需重写父类的 run( ) 方法
    from multiprocessing import Process
    import time,os
    
    class MyProcess(Process):
        def __init__(self,n):
            super().__init__()
            self.n = n
    
        def run(self) -> None:
            print(f"父进程{os.getppid()},紫禁城{self.pid}")
            time.sleep(self.n)
            print(f"父进程{os.getppid()},紫禁城{os.getpid()}")
    
    if __name__ == '__main__':
        p = MyProcess(2)
        p.start()
        print(f"当前执行文件{os.getpid()}")
        
    '''
    当前执行文件8136
    父进程8136,紫禁城1280
    父进程8136,紫禁城1280
    '''
    

    四.验证进程的内存空间是相互隔离的

    from multiprocessing import Process
    import time
    
    x = 222
    
    def test():
        global x
        x = 111
    
    if __name__ == '__main__':
        p = Process(target=test)
        p.start()     # 发送系统调用
        time.sleep(1) # 等待子进程运行完
        print(x)      # 222 (还是原来的)
    

    子进程 test 函数中声明全局变量 x, 并修改 x 的值, 等待子进程运行完毕, 最后打印 x , 发现 x 的值并没有改变

    五.Process 对象的 join 方法

    • 让父进程等待子进程的终止, 父进程在等, 子进程在运行
    from multiprocessing import Process
    
    x = 222
    
    def test():
        global x
        x = 111
    
    if __name__ == '__main__':
        p = Process(target=test)
        p.start()   # 发送系统调用
        p.join()    # 等待子进程运行完(之前我们使用sleep并不能精确的知道子进程结束运行的时间)
        print(x)    # 222 (还是原来的)
    
    • 参数 timeout 是可选的超时间, 等多久就不等了
    from multiprocessing import Process
    
    x = 222
    
    def test():
        global x
        x = 111
    
    if __name__ == '__main__':
        p = Process(target=test)
        p.start()        # 发送系统调用
        p.join(0.001)    # 等待 0.001 秒就不等了
    
    • 注意点 : start() 只是发起系统调用, 并不是运行子进程, 当 start() 执行完后紧接着就执行后面的代码
    • start() 发起调用之后, 是通知操作系统创建一个子进程, 操作系统需要申请一个内存空间, 将父进程的数据复制一份到子进程的内存空间中作为初始化用 (Linux是将父进程的数据原原本本的复制一份, 而Windows 稍有些不同), 然后子进程才运行起来
    import time,os
    
    def test(n):
        time.sleep(n)
        print(f"父进程{os.getppid()} 子进程{os.getpid()}")
    
    if __name__ == '__main__':
        p1 = Process(target=test,args=(3,))
        p2 = Process(target=test,args=(2,))
        p3 = Process(target=test,args=(1,))
    
        p1.start()  # 用时 3 秒
        p2.start()  # 用时 2 秒
        p3.start()  # 用时 1 秒
    
        start_time = time.time()
        p1.join()
        p2.join()
        p3.join()   # 三个进程都在并发的运行, 主进程一共运行3秒多
        stop_time = time.time()
        print(f'主进程{os.getpid()} 用时{stop_time-start_time}')
        
    '''
    父进程10888 子进程6792
    父进程10888 子进程13368
    父进程10888 子进程14800
    主进程10888 用时3.131737470626831
    '''
    

    六. Process 对象其他常用方法介绍

    1.terminate( ) : 关闭进程

    2.is_alive( ) : 查看进程是否存活

    from multiprocessing import Process
    import time
    
    def test():
        time.sleep(1)
    
    if __name__ == '__main__':
        p = Process(target=test,args=(2,))
        p.start()
        p.terminate()  # 只是发起系统调用, 通知操作系统关闭子进程
        print(p.is_alive())  # True
    

    由上面可知 terminate() 只是发起系统调用, 并不是立即关闭子进程, 操作系统关闭子进程回收资源也要一小会, 我们可以使用sleep简单延时

    from multiprocessing import Process
    import time
    
    def test():
        time.sleep(1)
    
    if __name__ == '__main__':
        p = Process(target=test,args=(2,))
        p.start()
        p.terminate()   # 只是发起系统调用, 通知操作系统关闭子进程
        time.sleep(0.1) # 稍微延时一点
        print(p.is_alive())  # False
    

    3.name : 为新建进程设置名字

    4.pid : 进程号

    from multiprocessing import Process
    import time,os
    
    class MyProcess(Process):
        def __init__(self,n,name):
            super().__init__()
            self.n = n
            self.name = name
    
        def run(self) -> None:
            time.sleep(self.n)
            print(f"子进程pid:{self.pid}")    # 子进程pid:14156
            print(f"子进程模块名:{__name__}")  # 子进程模块名:__mp_main__
            print(f"子进程名:{self.name}")    # 子进程名:aaaa
    
    if __name__ == '__main__':
        p = MyProcess(1,"aaaa")
        p.start()
        p.join()
        print(f"打印子进程pid:{p.pid}")       # 打印子进程pid:14156
        print(f"打印主进程pid:{os.getpid()}") # 打印主进程pid:16340
        print(f"子进程名:{p.name}")           # 子进程名:aaaa
        print(f"主进程模块名:{__name__}")      # 主进程模块名:__main__
    

    __name__ : Python中每个模块都有自己的名字, __name__是一个系统变量, 是模块的标识符, 值是模块的名称, 并且在自身模块中:__name__的值等于__mian__

    展开全文
  • multiprocessing 包同时提供本地和远程并发,使用进程代替线程,有效避免了Python中GIL锁????( Global Interpreter Lock )带来的影响。通过它能充分利用机器上的多核,加快处理速度。 注:最新内容移步官网...

    前言

    在这里插入图片描述
    在Python中,multiprocessing 是一个用于产生进程的包,它具有与用于产生线程的包threading相似的API。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免了Python中GIL锁🔒( Global Interpreter Lock )带来的影响。通过它能充分利用机器上的多核,加快处理速度。

    注:最新内容移步官网查看,本文仅对个人使用过程中的一些体会作记录和注解。

    multiprocessing核心模块

    Process 类

    在 multiprocessing 中,一般通过创建一个Process对象然后调用其start()方法来生成进程。在官网中给出的一个简单的多进程程序示例:

    from multiprocessing import Process
    
    def f(name):
        print('hello', name)
    
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    

    Process类的定义如下:

    class multiprocessing.Process(group=None, target=None, name=None,
    args=(), kwargs={}, *, daemon=None)

    需要注意:
    Process 类拥有和 threading.Thread 等价的大部分方法。应始终使用关键字参数调用构造函数。 group 应该始终是 None ;它仅用于兼容 threading.Thread 。 target 是由 run() 方法调用的可调用对象。它默认为 None ,意味着什么都没有被调用。 name 是进程名称。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。如果提供,则键参数 daemon 将进程 daemon 标志设置为 True 或 False 。如果是 None (默认值),则该标志将从创建的进程继承。默认情况下,不会将任何参数传递给 target 。如果子类重写构造函数,必须确保它在对进程执行任何其他操作之前调用基类构造函数( Process.init() )。

    run()

    表示进程活动的方法。可以在子类中重载此方法。例如:

    class RecordSourceConsumeProcess(Process):
        def __init__(self, record_pid, CPU_use, GPU_use):
            """
            : 用于统计指定pid的进程对系统资源的消耗情况,包括CPU,GPU
            : record_pid 指定进程的pid
            : CPU_use 用于记录CPU消耗的list
            : GPU_use 用于记录GPU消耗的list
            """
            super().__init__()
            self.record_pid = record_pid
            self.CPU_use = CPU_use
            self.GPU_use = GPU_use
        
        def run(self):
            print('对PID: '+str(self.record_pid)+" 的资源消耗统计进程开始运行")
            while True:#除非主进程强行关闭该子进程,否则一直运行不结束
                    self.CPU_use.append(self.cpu_record())
                    self.GPU_use.append(self.gpu_record())
    

    标准的 run() 方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 args 和 kwargs 参数中获取顺序和关键字参数。

    start()

    启动进程活动。这个方法每个进程对象最多只能调用一次。它会将对象的 run() 方法安排在一个单独的进程中调用。

    • 注:经过实际使用发现,如果run()方法未被重载(即标准Process实例),start()后只会启动一个子进程。如果run()方法被重载,start()后会启动两个子进程,且两个子进程具有同一个父进程。当以spawn方式启动多进程时,会发现启动了三个子进程,一个为信号量跟踪进程(multiprocessing.semaphore_tracker.main),另外两个是实际功能的子进程(multiprocessing.spawn.spawn_main)。

    只有调用start()后才会真正的启动进程,才能获取到pid和name等信息。

    • 注:需要注意只有start()后才能获取子进程的pid。

    join([timeout])

    如果可选参数 timeout 是 None (默认值),则该方法将阻塞,直到调用 join() 方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回 None 。检查进程的 exitcode 以确定它是否终止。一个进程可以被 join 多次。进程无法join自身,因为这会导致死锁。尝试在启动进程之前join进程是错误的。

    在主进程中对创建的子进程使用join()方法,在子进程结束前将会阻塞主进程,也就是只有子进程运行结束主进程才能继续运行和结束。

    import multiprocessing as mp
    
    def mp_test():
        g = mp.Process(name='test1', target=main_test1)
        h = mp.Process(name='test2', target=main_test2)
        g.start()
        h.start()
        print('g_pid: ',g.pid)#只有调用start()后才会真正的启动进程,才能获取到pid和name等信息
        print('g_name: ',g.name)
        print('h_pid: ',h.pid)
        print('h_name: ',h.name)
        g.join()#如果不调用join方法,mp_test有可能在g和h前结束
        h.join()#但是调用后mp_test必须等g和h结束后才能继续运行和结束
        for i in range(1000*1000*100):
            pass
    

    name

    进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。

    初始名称由构造器设定。 如果没有为构造器提供显式名称,则会构造一个形式为 'Process-N1:N2:...:Nk' 的名称,其中每个 Nk 是其父亲的第 N 个孩子。

    is_alive()

    返回进程是否还活着。

    粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。

    if(p.is_alive()):#判断进程p是否还活着
        p.terminate()#p.kill()只有py3.7及以上支持,使用terminate
    

    daemon

    进程的守护标志,一个布尔值。这必须在 start() 被调用之前设置。

    s = Process(name='test', target=test1, args=(task_flag))
    s.daemon = True#开启进程守护,在 start() 被调用之前设置
    s.start()
    

    默认为None,初始值继承自创建进程。

    注意:

    1. 当某个进程退出时,它会尝试终止其开启的所有守护进程子进程。

    2. 不允许守护进程创建子进程!!!否则,当创建守护进程的主进程(认为是父进程)结束后,守护进程(认为是子进程)也会被关闭,而如果守护进程也创建了子进程,那这些守护进程的子进程(认为是孙子进程)将变成孤儿进程。当尝试这么做时,会报错:AssertionError: daemonic processes are not allowed to have children

    在Python 3.3 版中: 加入 daemon 参数。

    pid

    返回进程ID。在生成该进程(start)之前,这将是 None 。

    exitcode

    子进程的退出代码。如果进程尚未终止,这将是 None 。负值 -N 表示子进程被信号 N 终止。

    authkey

    进程的身份验证密钥(字节字符串)。

    当 multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串。

    当创建 Process 对象时,它将继承其父进程的身份验证密钥,尽管可以通过将 authkey 设置为另一个字节字符串来更改。

    sentinel

    系统对象的数字句柄,当进程结束时将变为 “ready” 。

    如果要使用 multiprocessing.connection.wait() 一次等待多个事件,可以使用此值。否则调用 join() 更简单。

    在Windows上,这是一个操作系统句柄,可以与 WaitForSingleObject 和 WaitForMultipleObjects 系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自 select 模块的原语。

    Python 3.3 新版功能

    terminate()

    终止进程。 在Unix上,这是使用 SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等。

    请注意,进程的后代进程将不会被终止 —— 它们将简单地变成孤立的。

    警告 如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。

    kill()

    terminate() 相同,但在Unix上使用 SIGKILL 信号。

    Python 3.7 新版功能,3.7前需要使用terminate()。

    close()

    关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError 。一旦 close() 成功返回, Process 对象的大多数其他方法和属性将引发 ValueError 。

    Python 3.7 新版功能

    启动方法

    根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法有:

    spawn

    父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。

    可在Unix和Windows上使用。 Windows上的默认设置。

    fork

    父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。

    只存在于Unix。Unix中的默认值。

    forkserver

    程序启动并选择 forkserver 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。

    可在Unix平台上使用,支持通过Unix管道传递文件描述符。

    启动方法注意事项

    在 Unix 上通过 spawnforkserver 方式启动多进程会同时启动一个 资源追踪 (resource tracker process)进程,负责追踪当前程序的进程产生的、并且不再被使用的命名系统资源(如命名信号量以及 SharedMemory 对象)。当所有进程退出后,资源追踪会负责释放这些仍被追踪的的对象。

    通常情况下是不会有这种对象的,但是假如一个子进程被某个信号杀死,就可能存在这一类资源的“泄露”情况。(泄露的信号量以及共享内存不会被释放,直到下一次系统重启,对于这两类资源来说,这是一个比较大的问题,因为操作系统允许的命名信号量的数量是有限的,而共享内存也会占据主内存的一片空间)

    要选择一个启动方法,应该在主模块的 if __name__ == '__main__' 子句中调用 set_start_method() 。在程序中 set_start_method() 不应该被多次调用。

    在进程之间交换对象

    multiprocessing 支持进程之间的两种通信通道:

    队列

    Queue 类是一个近似 queue.Queue 的克隆。队列是线程和进程安全的。

    管道

    Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。

    返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

    进程间共享状态

    在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。但是,如果真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

    共享内存

    可以使用 Value 或 Array 将数据存储在共享内存映射中。

    服务进程

    由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。

    Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

    main_process_pid = os.getpid()#获取主进程的pid,传入子进程统计资源占用情况
    with Manager() as manager:#用Manager封装,利于主进程调用子进程统计的数据
         CPU_use = manager.list()
         GPU_use = manager.list()
         p = RecordSourceConsumeProcess(main_process_pid, CPU_use, GPU_use)
         p.start()#启动子进程
         print('CPU_use: ',CPU_use)
         print('GPU_use: ',GPU_use)
    

    在这里插入图片描述

    参考资料

    [1] multiprocessing — 基于进程的并行
    [2] threading — 基于线程的并行
    [3] Python multiprocess模块(上)
    [4] Python multiprocess模块(中)
    [5] Python multiprocess模块(下)
    [6] Python程序中的进程操作-开启多进程
    [7] Python 线程

    展开全文
  • Python多进程

    2018-04-17 15:28:55
    因此在Python开发,我们一般使用多进程进行并行开发。multiprocessing是类似于threading模块的包。它支持了本地和远程并发性,可以更充分的利用多核资源。 Process 要运行一个进程需要创建实例化一个Process...

    多进程介绍

    Python多线程无法利用CPU多核的优势。因此在Python开发中,我们一般使用多进程进行并行开发。multiprocessing是类似于threading模块的包。它支持了本地和远程并发性,可以更充分的利用多核资源。

    Process类

    要运行一个进程需要创建实例化一个Process对象并且调用该类的start()方法。

    `

    from multiprocessing import Process
    
    def greet(name):
        print('hello', name)
    
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()`
    

    Process([group [, target [, name [, args [, kwargs]]]]]),group默认为None,不使用,参数target为调用的对象,name为子进程的名字,args为调用对象的参数元组,kwargs为调用对象的字典。

    有关的方法和属性

    Process中的方法与threading.Thread中的十分相似。

    run():进程启动时运行的方法。

    start():启动进程,会调用子进程中的run()方法。

    join([timeout]):主线程等待当前线程终止,timeout为可选的超时时间,join只能join住start开启的进程,而不能join住run开启的进程。

    terminate():强制终止进程,不会进行任何清理操作,如果创建了子进程,该子进程就成了僵尸进程。如果进程还保存了一个锁那么也将不会被释放,进而导致死锁。

    is_alive():判断进程是否在运行,为bool值。

    name:进程的名字,它是string类型,没有语义,只是用于标志进程,多进程中允许使用同一个名字。

    dameon:必须在start()调用前设置,默认为false,如果设为True,代表当前进程为后台运行的守护进程,当前进程的父进程终止时,它也随之终止,并且设定为True后,不能创建自己的新进程。

    pid:进程的id

    每个进程还有特有的id号,可以通过os.getpid()得到当前进程的ID号,也可以直接使用p.pid得到ID。

    from multiprocessing import Process
    import os
    
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())#得到父进程的id号
        print('process id:', os.getpid())#得到当前进程的id号
    
    def greet(name):
        info('function greet')
        print('hello', name)
    
    if __name__ == '__main__':
        info('main Process')
        p = Process(target=greet, args=('bob',))
        p.start()
        p.join()
    

    进程间的通信

    多进程支持两种进程间通信的方式:队列和管道。

    队列

    队列具有先进先出的特点,并且它是线程和进程安全的,通过q.put()方法将数据插入到队列中,然后使用q.get()方法将数据取出。

    from multiprocessing import Process, Queue
    
    def greet(q):
        q.put("hello")
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # 打印hello 
        p.join()
    

    管道

    Pipe()函数返回一对双向的连接对象,它们代表管道的两端。每个连接对象有send()和recv()方法。当两个进程或者线程试图同时写入或读取管道的一端时,管道中的数据会被损坏,当然同一时刻使用不同的管道端口不会有损坏的风险。

    from multiprocessing import Process, Pipe
    
    def greet(p):
        p.send("hello")
        p.close()
    
    if __name__=='__main__':
        left_p,right_p=Pipe()
        p=Process(target=greet,args=(right_p,))
        p.start()
        print(left_p.recv()) #输出hello
        p.join()
    

    进程间的同步

    多进程包含与多线程中等价的同步原语。例如可以使用锁机制确保某一时刻只有一个进程打印到标准输出。

    from multiprocessing import Process, Lock
    
    def greet(lock,num):
        lock.acquire()  #获取锁
        print("hello",num)
        lock.release()  #将锁释放
    
    if __name__=='__main__':
        lock=Lock()
        for num in range(5):
            Process(target=greet,args=(lock,num),).start()
    

    进程间分享状态

    在进行并发编程时,最好尽量避免使用共享状态,尤其是使用多进程时。但是,如果你确实要使用一些共享的数据,多进程也提供一些方法。

    共享内存

    数组可以利用Value和Array存储在共享内存映射中。

    from multiprocessing import Process, Value, Array
    
    def share(val,arr):
        val.value=1
        for i in range(len(arr)):
            arr[i]=arr[i]+1
    
    if __name__=='__main__':
        val=Value('d',0.0)
        arr=Array('i',range(10))
    
        p=Process(target=share,args=(val,arr))
        p.start()
        p.join()
    
        print(val.value)
        print(arr[:])
    

    输出值为:

    `1.0
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    

    `

    其中d和i分别代表创建Value和Array中存储的数字类型为double和signed integer。这些共享对象都是线程和进程安全的。

    服务器进程

    通过Manager()创建一个管理类,它可以控制一个服务器进程,这些服务器进程持有Python对象,并且允许其他进程利用代理来操纵Python对象。

    from multiprocessing import Process, Manager
    def func(dic,li):
        dic['hello']=1
        dic[1]='Bob'
        dic[2.01]=None
        li.reverse()
    
    if __name__=='__main__':
    
        manager=Manager() 
        dic=manager.dict()
        li=manager.list(range(5))
    
        p=Process(target=func,args=(dic,li))
        p.start()
        p.join()
        print(dic)
        print(li)
    

    输出结果为:

    {'hello': 1, 1: 'Bob', 2.01: None}
    [4, 3, 2, 1, 0]
    

    服务器进程管理比使用共享内存对象更加的灵活,因为它可以支持任意类型的对象。manager也可以通过进程进行共享,只不过比使用共享内存慢。

    进程池

    进程池中有许多进程,它有方法让进程池中的进程以不同的方式运行任务。 可以创建一个池池,这些进程将执行池类向它提交的任务。
    多进程编程并不是进程越多越好,还与CPU核数有关,进程数过多反而会降低效率。

    Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

    相关方法

    apply(func[, args[, kwds]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用
    p.apply_async()

    apply_async(func [, args [, kwargs]]):在一个池工作进程中执func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。

    close():关闭进程池。如果所有操作持续挂起,它们将在工作进程终止前完成。

    P.join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

    from multiprocessing import Pool
    
    def sqr(x):
        return x*x
    
    if __name__=='__main__':
        pool=Pool(processes=5) #开启5个工作进程
        result=pool.apply_async(sqr,[10]) #异步计算sqr(10))
        print(result.get(timeout=1)) #输出100
        print(pool.map(sqr,range(5))) #输出[0,1,4.....16]
    

    参考

    https://www.cnblogs.com/smallmars/p/7093603.html
    http://docspy3zh.readthedocs.io/en/latest/library/multiprocessing.html

    展开全文
  • 使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process动态的生成进程,十几个还好,但是如果上百个...
  • Pool使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process动态的生成进程,十几个还好,但是如果...
  • Python多进程操作-进程池(Pool)

    千次阅读 2018-01-19 16:53:59
    使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制 台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直 接使用Process动态的生成进程,十几个还好,但是如果上...
  • Python中线程与进程

    千次阅读 2017-05-08 18:37:22
    一 概念 进程 线程 进程与线程的关系 并行与并发 同步与异步 二 Python全局解释器锁GIL 三 threading模块 threading直接创建 ...十 多进程调用 process使用 十一 进程间通信 进程队列Queue 管道 manager
  • 使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process动态的生成进程,十几个还好,但是如果上百个...
  • 如果想充分地使用多核CPU的资源,需要使用多进程Python中提供multiprocessing实现。 - multiprocessing multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process来代表一个进...
  • 使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大量时间,如果操作的对象数目不大时,还可以直接适用Process动态生成进程,几十个尚可,若上百个甚至更时...
  • Python并行处理方式

    2020-07-25 06:51:11
    背景知识视频教程 ... 它是Python中线程和处理的更好替代方法,因为它使用相同的接口实现了线程和进程,该接口由抽象的Executor定义。 此外,线程不允许您从可调用函数返回一个值(null除外)。 para
  • 使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机, 并行操作可以节约大量时间,如果操作的对象数目不大时,还可以直接适用Process动态 生成进程,几十个尚可,若上百个甚至更...
  • python 进程

    2018-12-27 14:18:21
    使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process动态的生成进程,十几个还好,但是如果上百个...
  • python3 ,multiprocessing 包的使用

    千次阅读 2017-12-29 11:57:24
    multiprocessing 是基于 threading 模块封装的一个包,可以绕开python多线程存在的 全局解释器锁( Global Interpreter Lock, GIL),实现真正意义上的多进程并行。本文提纲 并行计算的简单例子 Process 进程...
  • python -进程

    2019-02-13 00:07:36
    使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大量时间,如果操作的对象数目不大时,还可以直接适用Process动态生成进程,几十个尚可,若上百个甚至更时...
  • Python进程池)

    2019-01-22 22:34:19
    1.1 在使用Python进行系统管理时,特别是同时操作个文件目录或者远程控制台主机,并行操作可以节约大 量时间,如果操作的对象数目不大时,还可以直接适用Process动态生成进程,几十个尚可,若上百个 ...
  • 目录 一、简介 二、详解 1、线程和进程 2、使用线程 ...线程编程技术可以实现代码并行性,优化处理能力,同时功能的更小划分可以使代码的可重用性更好。Python中threading和Queue模块可以用来
  • DDG上以“Python threading tutorial (Python线程教程)”为关键字的热门搜索结果表明:几乎每篇文章给出的例子都是相同的+队列。 事实上,它们就是以下这段使用producer/Consumer来处理线程/多进程的代码示例...
  • Python多线程

    2020-10-06 13:55:42
    线程基础概念 ...本次主要介绍Python标准库线程模块threading。 threading模块 线程初始化 使用threading模块的Thread初始化对象然后调用start方法启动线程。 import threading import time def wo
  • 文章目录python多线程编程(上)前言课程介绍学前须知开发工具课程安排一、任务介绍电脑任务1.1 任务的概念1.2 任务的两种表现形式1.3 并发1.4 并行二、进程的介绍2.1 程序实现任务的方式2.2 进程的...
  • 线程基础概念 ...本次主要介绍Python标准库线程模块threading。 <!--more--> threading模块 线程初始化 使用threading模块的Thread初始化对象然后调用start方法启动线程。 import th...
  • Python 性能优化

    2016-10-16 17:16:48
    1、 多进程并行编程:对于CPU密集型的程序,可以使用multiprocessing的Process,Pool等封装好的,通过多进程的方式实现并行计算。但是因为进程的通信成本比较大,对于进程之间需要大量数据交互的程序效率未必有大...
  • 前几章讨论的是并行,即一次做多件事,对应的就是多线程或多进程。而本章讨论的内容是并发,即一次处理多件事,也就是协程。 本章介绍 asyncio 包,这个包使用事件循环驱动的协程实现并发。主要话题有: 对比一个...
  • python线程

    2018-07-15 08:47:05
    什么是线程(thread)? 线程是操作系统能够进行运算调度的最小单位。它被包含在进程中,是进程中的实际...python进程语法与实例: Python中使用线程有两种方式:函数或者用来包装线程对象。 函数式:调用 _thread...
  • 现代的操作系统几乎都支持多进程并发执行。注意,并发和并行是两个概念,并行指在同一时刻有多条指令在多个处理器上同时执行;并发是指在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行,使得在宏观上...
  • 这是因为python内部有个GIL锁(全局解释器锁),这个锁限制了在同一时刻,同一个进程中,只能有一个线程被运行!!!二、threading模块的基本使用方法。可以使用它来创建线程。有两种方式来创建线程。1、通过继承...
  • Python Cookbook

    2013-07-31 22:33:26
    8.10 在Python 2.4中使用doctest和unittest 331 8.11 在单元测试中检查区间 334 第9章 进程、线程和同步 336 引言 336 9.1 同步对象中的所有方法 339 9.2 终止线程 342 9.3 将Queue.Queue用作优先级队列 344 ...

空空如也

空空如也

1 2 3
收藏数 59
精华内容 23
关键字:

python类中使用多进程并行

python 订阅