精华内容
下载资源
问答
  • Multiprocess Support

    2020-12-08 20:16:12
    <div><p>Hi, does Terminado somehow mess with the multiprocess support provided by Tornado? I'm getting <code>RuntimeError: Cannot run in multiple processes: IOLoop instance has already been ...
  • multiprocess模块

    2019-11-12 19:15:37
    文章目录multiprocess模块Process类的介绍Process对象的join方法守护进程互斥锁模拟抢票练习互斥锁与joinQueue方法介绍 multiprocess模块 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os....

    multiprocess模块

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。
    Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,>提供了Process、Queue、Pipe、Lock等组件。
    需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

    Process类的介绍

    创建进程的类
    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    参数介绍:
    1 group参数未使用,值始终为None
    2 target表示调用对象,即子进程要执行的任务
    3 args表示调用对象的位置参数元组,args=(1,2,‘egon’,)
    4 kwargs表示调用对象的字典,kwargs={‘name’:‘egon’,‘age’:18}
    5 name为子进程的名称
    强调:
    1.需要使用关键字的方式来指定参数
    2.args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    方法介绍:
    p.start():启动进程,并调用该子进程中的p.run()
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果p仍然运行,返回True
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。
    属性介绍
    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    p.name:进程的名称
    p.pid:进程的pid
    prccess模块使用:

    import time
    import random
    from multiprocessing import Process
    
    def piao(name):
        print('%s is running' %name)
        time.sleep(random.randrange(1,3))
        print('%s is end' %name)
    
    if __name__ == '__main__':
        #实例化得到四个对象
        p1=Process(target=piao,args=('进程1',)) #必须加,号
        p2=Process(target=piao,args=('进程2',))
        p3=Process(target=piao,args=('进程3',))
        p4=Process(target=piao,args=('进程4',))
    
        #调用对象下的方法,开启四个进程
        p1.start()
        p2.start()
        p3.start()
        p4.start()
        print('主')
    

    Process对象的join方法

    在主进程运行过程中如果想并发地执行其他的任务,我们可以开启子进程,此时主进程的任务与子进程的任务分两种情况
    情况一:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。
    情况二:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用

    from multiprocessing import Process
    import time
    import random
    
    def task(name):
        print('%s is piaoing' %name)
        time.sleep(random.randint(1,3))
        print('%s is piao end' %name)
    
    if __name__ == '__main__':
        p1=Process(target=task,args=('egon',))
        p2=Process(target=task,args=('alex',))
        p3=Process(target=task,args=('yuanhao',))
        p4=Process(target=task,args=('wupeiqi',))
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    
        # 有的同学会有疑问: 既然join是等待进程结束, 那么我像下面这样写, 进程不就又变成串行的了吗?
        # 当然不是了, 必须明确:p.join()是让谁等?
        # 很明显p.join()是让主线程等待p的结束,卡住的是主进程而绝非子进程p,
        p1.join()
        p2.join()
        p3.join()
        p4.join()
    
        print('主')
    

    详细解析如下:
    进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
    而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
    join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
    所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间

    守护进程

    主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比崇祯皇帝身边的老太监,崇祯皇帝已死老太监就跟着殉葬了。
    关于守护进程需要强调两点:
    其一:守护进程会在主进程代码执行结束后就终止
    其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
    如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止

    from multiprocessing import Process
    import time
    import random
    
    def task(name):
        print('%s is piaoing' %name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %name)
    if __name__ == '__main__':
        p=Process(target=task,args=('egon',))
        p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
        p.start()
        print('主') #只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
    

    互斥锁

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如下:

    #并发运行,效率高,但竞争同一打印终端,带来了打印错乱
    from multiprocessing import Process
    import os,time
    def work():
        print('%s is running' %os.getpid())
        time.sleep(2)
        print('%s is done' %os.getpid())
    
    if __name__ == '__main__':
        for i in range(3):
            p=Process(target=work)
            p.start()
    

    如何控制,就是加锁处理。而互斥锁的意思就是互相排斥,如果把多个进程比喻为多个人,互斥锁的工作原理就是多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁,其他人才有可能有一个抢到…所以互斥锁的原理,就是把并发改成穿行,降低了效率,但保证了数据安全不错乱。

    #由并发变成了串行,牺牲了运行效率,但避免了竞争
    from multiprocessing import Process,Lock
    import os,time
    def work(lock):
        lock.acquire() #加锁
        print('%s is running' %os.getpid())
        time.sleep(2)
        print('%s is done' %os.getpid())
        lock.release() #释放锁
    if __name__ == '__main__':
        lock=Lock()
        for i in range(3):
            p=Process(target=work,args=(lock,))
            p.start()
    

    模拟抢票练习

    多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务

    #文件db.txt的内容为:{"count":1}
    #注意一定要用双引号,不然json无法识别
    from multiprocessing import Process
    import time,json
    
    def search(name):
        dic=json.load(open('db.txt'))
        time.sleep(1)
        print('\033[43m%s 查到剩余票数%s\033[0m' %(name,dic['count']))
    
    def get(name):
        dic=json.load(open('db.txt'))
        time.sleep(1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(1) #模拟写数据的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('\033[46m%s 购票成功\033[0m' %name)
    
    def task(name):
        search(name)
        get(name)
    
    if __name__ == '__main__':
        for i in range(10): #模拟并发10个客户端抢票
            name='<路人%s>' %i
            p=Process(target=task,args=(name,))
            p.start()
            ```
    并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,卖成功给了10个人
    解决方法:
    加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
    
    ```python
    #把文件db.txt的内容重置为:{"count":1}
    from multiprocessing import Process,Lock
    import time,json
    
    def search(name):
        dic=json.load(open('db.txt'))
        time.sleep(1)
        print('\033[43m%s 查到剩余票数%s\033[0m' %(name,dic['count']))
    
    def get(name):
        dic=json.load(open('db.txt'))
        time.sleep(1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(1) #模拟写数据的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('\033[46m%s 购票成功\033[0m' %name)
    
    def task(name,lock):
        search(name)
        with lock: #相当于lock.acquire(),执行完自代码块自动执行lock.release()
            get(name)
    
    if __name__ == '__main__':
        lock=Lock()
        for i in range(10): #模拟并发10个客户端抢票
            name='<路人%s>' %i
            p=Process(target=task,args=(name,lock))
            p.start()
    

    互斥锁与join

    使用join可以将并发变成串行,互斥锁的原理也是将并发变成穿行,那我们直接使用join就可以了啊,为何还要互斥锁,说到这里我赶紧试了一下

    #把文件db.txt的内容重置为:{"count":1}
    from multiprocessing import Process,Lock
    import time,json
    
    def search(name):
        dic=json.load(open('db.txt'))
        print('\033[43m%s 查到剩余票数%s\033[0m' %(name,dic['count']))
    
    def get(name):
        dic=json.load(open('db.txt'))
        time.sleep(1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(1) #模拟写数据的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('\033[46m%s 购票成功\033[0m' %name)
    
    def task(name,):
        search(name)
        get(name)
    
    if __name__ == '__main__':
        for i in range(10):
            name='<路人%s>' %i
            p=Process(target=task,args=(name,))
            p.start()
            p.join()
    

    发现使用join将并发改成串行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否,此时join与互斥锁的区别就显而易见了,join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行
    总结
    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1、效率低(共享数据基于文件,而文件是硬盘上的数据)
    2、需要自己加锁处理
    因此我们最好找寻一种解决方案能够兼顾:
    1、效率高(多个进程共享一块内存的数据)
    2、帮我们处理好锁问题。
    这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    Queue方法介绍

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
    创建队列的类(底层就是以管道和锁定的方式实现):
    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
    参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。
    但需要明确:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
    主要方法介绍:
    q.put方法用以插入数据到队列中。
    q.get方法可以从队列读取并且删除一个元素。
    参考资料
    路飞学城内部资料

    展开全文
  • Python 多进程multiprocess阅读 (184) |发布于 2020-05-19 14:47:26Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。在multiprocess模块中有 process 和...

    Python 多进程multiprocess

    阅读 (184) |

    发布于 2020-05-19 14:47:26

    Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。

    在multiprocess模块中有 process 和 Process 两个类,大家使用过程中一点要注意区别二者。

    multiprocess.Process 类用法:

    通过以下示例,可以发现多进程语法上和多线程几乎是一样的。#!/usr/bin/env python

    #coding=utf-8

    import os

    import multiprocessing

    def foo(i):

    # 当前进程名

    print("Current pname:", multiprocessing.current_process().name)

    #当前进程的父进程id

    print("parents pid:", os.getppid())

    #当前进程id

    print("Current pid:", os.getpid())

    if __name__ == '__main__':

    for i in range(3):

    p = multiprocessing.Process(target=foo, args=(i,))

    p.start()

    进程间共享数据

    在linux中, 每个进程都是由父进程的。最大的用户进程为1号进程。子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。

    进程的创建需要很大的开销,每个进程都有独立的内存空间。进程之间通常是不能直接共享数据。

    进程如果需要数据共享,就要通过中间件来实现。

    下面的代码试图用全局列表来实现 进程的数据共享:#!/usr/bin/env python

    #coding=utf-8

    import multiprocessing

    import os

    lis = []

    def foo(i):

    lis.append(i)

    print("Pid: %d addr: %s, lis: %s" % (os.getpid(), id(lis), lis))

    if __name__ == '__main__':

    for i in range(3):

    p = multiprocessing.Process(target=foo, args=(i,))

    p.start()

    print("The End. Pid:%s, addr: %s lis:%s"%(os.getpid(), id(lis), lis))

    得到如下结果, 每个进程的lis都是独立的,全局列表并没有改变。Pid: 8974 addr: 20094344, lis: [0]

    Pid: 8975 addr: 20094344, lis: [1]

    The End. Pid:8973, addr: 20094344 lis:[]

    Pid: 8976 addr: 20094344, lis: [2]

    multiprocess模块提供了三个类来实现进程之间的数据共享Queues

    Array

    Manager

    Queues类

    Queues类的导入方式是 from multiprocessing.queues import Queue。它在multiprocessing包的queues模块中。#!/usr/bin/env python

    #coding=utf-8

    import os

    import multiprocessing

    from multiprocessing.queues import Queue

    from multiprocessing import Process

    def func(i, q):

    print("Pid: %s get: %s then put:%s" % (os.getpid(),q.get(), i))

    q.put(i)

    if __name__ == "__main__":

    lis = Queue(1, ctx=multiprocessing)

    lis.put(10)

    for i in range(5):

    p = Process(target=func, args=(i, lis))

    p.start()

    multiprocessing自己还有一个Queue类,一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。

    Array类

    Array数组类,语法:

    arr = Array('i', 5)

    'i' 表示内部元素为int类型 ,5表示数组的长度。更多类型 :'c': ctypes.c_char, 'u': ctypes.c_wchar,

    'b': ctypes.c_byte, 'B': ctypes.c_ubyte,

    'h': ctypes.c_short, 'H': ctypes.c_ushort,

    'i': ctypes.c_int, 'I': ctypes.c_uint,

    'l': ctypes.c_long, 'L': ctypes.c_ulong,

    'f': ctypes.c_float, 'd': ctypes.c_double

    示例:#!/usr/bin/env python

    import os

    from multiprocessing import Process

    from multiprocessing import Array

    def func(i,temp):

    temp[0] += 100

    print("Pid:",os.getpid(),"array:", temp[0])

    if __name__ == '__main__':

    temp = Array('i', [1, 2, 3, 4])

    for i in range(5):

    p = Process(target=func, args=(i, temp))

    p.start()

    Manager类

    Manager实例化返回一个服务进程,其他进程可以通过代理的方式操作Manager对象。

    Manager 支持list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。#!/usr/bin/env python

    import os

    import time

    import threading

    from multiprocessing import Process

    from multiprocessing import Manager

    def func(i, dic):

    dic["num" + str(i)] = 100+i

    print("Pid:", os.getpid(), dic.items())

    time.sleep(3)

    if __name__ == '__main__':

    dic = Manager().dict()

    process_lis = []

    for i in range(5):

    p = Process(target=func, args=(i, dic))

    p.start()

    process_lis.append(p)

    for p in process_lis:

    p.join()

    注意: 这里在启动子进程时用了join(), 这是必须的。这个join和线程的作用是一样的。是要父进程等所有子进程结束后再结束。

    如果没有join()父进程执行完释放了内存。dic也就不存在。其他用到dic的子进程就会报错。

    进程锁

    与线程锁一样,进程中也有同名的锁,并且用法和作用都是一样的。

    在multiprocessing 有锁类: RLock可重入锁, Lock互斥锁,Event事件锁, Condition条件锁,Semaphore信号锁。

    这里不多做解释了, 用一个简单的例子来看一下进程锁的使用。#!/usr/bin/env python

    import os

    import time

    from multiprocessing import Array

    from multiprocessing import Process

    from multiprocessing import RLock, Lock, Event, Condition, Semaphore

    def func(i,lis,lc):

    lc.acquire()

    if i%2 ==0:

    lis[0] = lis[0] - 1

    else:

    lis[0] = lis[0] + 1

    lc.release()

    if __name__ == "__main__":

    array = Array('i', 1)

    array[0] = 10

    lock = RLock()

    process_lis = []

    for i in range(1000):

    p = Process(target=func, args=(i, array, lock))

    p.start()

    process_lis.append(p)

    for i in process_lis:

    p.join()

    print(array[0])

    以上例子中,枷锁后结果为10, 不加锁 结果不定。

    Pool类(进程池类)

    创建进程开销较大,频繁的创建销毁进程会消耗大量的内存空间。同线程一样,进程也可以使用进程池来控制开销。

    python 给我们内置了一个进程池,导入:from multiprocessing import Pool

    Pool类里面的常用方法:apply() 同步执行 (串行)

    apply_async() 异步执行 (并行)

    terminate() 立刻关闭进程池(强制)

    join() 主进程等待进程池内所有进程执行完毕。 (必须在close或terminate之后使用)

    close() 等待所有进程池结束后,关闭进程池。

    以下示例是异步执行的方式,并行线程数5#!/usr/bin/env python

    from multiprocessing import Pool

    import time

    def func(args):

    time.sleep(1)

    print("running", args)

    if __name__ == '__main__':

    p = Pool(5)

    for i in range(30):

    p.apply_async(func=func, args=(i,))

    p.close()

    # p.terminate()

    p.join()

    print("end")

    展开全文
  • Add Multiprocess execution

    2020-12-02 04:01:02
    <div><p>Add a LinkModel that performs the simulation in multiprocess, one process per SNR. The implementation works as a drop-in replacement, just need to import <em>multiprocess_links</em> instead of...
  • <div><p>I was trying to add typing to a function that can take either an integer or a <code>multiprocess.Pool(). in the case of the integer it would star a Pool itself. <pre><code> from typing import ...
  • from prometheus_client import multiprocess def child_exit(server, worker): multiprocess.mark_process_dead(worker.pid) workers = 5 </code></pre> <p>Start command: <pre><code> prometheus_...
  • we need to pass <code>--multiprocess</code> flag as a command line arg. That may not be enough, we may need custom protocol messages to tell VS/VSC adapter that another process has started.</p><p>该...
  • Multithread or multiprocess

    2020-12-01 19:14:21
    I wonder, it is possible to add multithread and even multiprocess in the code to make use the my multicore computer please? Thank you very much!</p><p>该提问来源于开源项目:dib-lab/sourmash</p>...
  • multiprocess time question

    2021-01-11 20:41:48
    <div><p>I have test this project with demo, and i find that the multiprocess DetectionLoader.start time > DetectionLoader update time, my datalen = 90, det batchsize= 10, DetectionLoader....
  • Task/multiprocess

    2021-01-08 19:44:28
    <p>Allowing the database to work across process requires us to disable a few optimizations when the user has enabled multiprocess support. The optimizations are still available when in single process ...
  • 进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event) 锁 —— multiprocess.Lock 通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的...

    进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

    锁 —— multiprocess.Lock

          通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。

      当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。 

    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    #虽然可以用文件共享数据实现进程间通信,但问题是:
    #1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    #2.需要自己加锁处理
    
    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    #队列和管道都是将数据存放于内存中
    #队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    #我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    信号量 —— multiprocess.Semaphore(了解)

    #互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
    #假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    #实现:
    #信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和#V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    #信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

     

    # 信号量介绍Semaphore
    # 多进程中的组件
    # ktv
    # 4个
    # 一套资源  同一时间 只能被n个人访问
    # 某一段代码 同一时间 只能被n个进程执行
    import time#引入时间模块
    import random#引入随机数
    from multiprocessing import Process#引入进程模块
    from multiprocessing import Semaphore#引入信号模块
    
    # sem = Semaphore(4)#实例化4个信号
    # sem.acquire()
    # print('拿到第一把钥匙')
    # sem.acquire()
    # print('拿到第二把钥匙')
    # sem.acquire()
    # print('拿到第三把钥匙')
    # sem.acquire()
    # print('拿到第四把钥匙')
    # sem.acquire()
    # print('拿到第五把钥匙')
    def ktv(i,sem):
        sem.acquire()    #获取钥匙
        print('%s走进ktv'%i)  #进入ktv
        time.sleep(random.randint(1,5))#随机选择1到5之间的数
        print('%s走出ktv'%i)#打印走出ktv
        sem.release()   #还钥匙
    
    
    if __name__ == '__main__' :#如果为真
        sem = Semaphore(4)#实例化一个红绿灯
        for i in range(20):#循环20个数
            p = Process(target=ktv,args=(i,sem))#开启一个进程对象
            p.start()#开启这个进程

     

    事件 —— multiprocess.Event(了解)

    #python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    
    #事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 #event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    
    #clear:将“Flag”设置为False
    #set:将“Flag”设置为True

     

     

    #红绿灯示例
    # 通过一个信号 来控制 多个进程 同时 执行或者阻塞
    # 事件
    # from multiprocessing import Event
    # 一个信号可以使所有的进程都进入阻塞状态
    # 也可以控制所有的进程解除阻塞
    # 一个事件被创建之后,默认是阻塞状态
    # e = Event()  # 创建了一个事件
    # print(e.is_set())   # 查看一个事件的状态,默认被设置成阻塞
    # e.set()      # 将这个事件的状态改为True
    # print(e.is_set())
    # e.wait()     # 是依据e.is_set()的值来决定是否阻塞的
    # print(123456)
    # e.clear()    # 将这个事件的状态改为False
    # print(e.is_set())
    # e.wait()     # 等待 事件的信号被变成True
    # print('*'*10)
    
    
    # set 和 clear
        #  分别用来修改一个事件的状态 True或者False
    # is_set 用来查看一个事件的状态
    # wait 是依据事件的状态来决定自己是否在wait处阻塞
        #  False阻塞 True不阻塞
    
    
    # 红绿灯事件
    import time#引入时间模块
    import random#引入随机模块
    from multiprocessing import Event,Process#引入进程模块和时间模块
    def cars(e,i):#定义一个函数
        if not e.is_set():#如果信号灯为真的时候
            print('car%i在等待'%i)#打印内容
            e.wait()    # 阻塞 直到得到一个 事件状态变成 True 的信号
        print('\033[0;32;40mcar%i通过\033[0m' % i)#打印通过
    
    def light(e):#定义一个灯
        while True:#循环为真
            if e.is_set():#如果事件状态为真
                e.clear()#则清除信号灯
                print('\033[31m红灯亮了\033[0m')#打印红灯亮了
            else:#否则
                e.set()#设置状态为真
                print('\033[32m绿灯亮了\033[0m')#打印绿灯亮了
            time.sleep(2)#睡2秒
    
    if __name__ == '__main__':#如果为真
        e = Event()#实例化一个事件
        traffic = Process(target=light,args=(e,))#定义一个灯的进程
        traffic.start()#开始进程
        for i in range(20):#循环20次
            car = Process(target=cars, args=(e,i))#创建20个汽车进程
            car.start()#启动汽车进程
            time.sleep(random.random())#随机睡,随机出现0~1之间的小数

     

    进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

    进程间通信

    IPC(Inter-Process Communication) #进程间通信

    队列 

    概念介绍

    创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

    #Queue([maxsize]) 
    #创建共享的进程队列。
    #参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
    #底层队列使用管道和锁定实现。

     

    #方法介绍
    
    Queue([maxsize]) 
    创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
    Queue的实例q具有以下方法:
    
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    #q.get_nowait( ) 
    同q.get(False)方法。
    
    #q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    #q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    
    #q.empty() 
    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    #q.full() 
    如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

     

    #其他方法
    #q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在#get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    #q.cancel_join_thread() 
    不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    
    #q.join_thread() 
    连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

     

    代码实例

    '''
    multiprocessing模块支持进程间通信的两种主要形式:管道和队列
    都是基于消息传递实现的,但是队列接口
    '''
    
    from multiprocessing import Queue#引入一个队列模块
    q=Queue(3)#实例化一个队列
    
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)#放入队列中
    q.put(3)#放入队列中
    q.put(3)#放入队列中
    # q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
               # 如果队列中的数据一直不被取走,程序就会永远停在这里。
    try:#异常处理
        q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
        print('队列已经满了')
    
    # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
    print(q.full()) #满了
    
    print(q.get())#取出一个
    print(q.get())#取出一个
    print(q.get())#取出一个
    # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
    try:#异常处理
        q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    
    print(q.empty()) #空了

    上面这个例子还没有加入进程通信,只是先来看看队列为我们提供的方法,以及这些方法的使用和现象。

    #子进程发送数据给父进程
    import time#引入一个时间模块
    from multiprocessing import Process, Queue#引入一个进程和队列模块
    
    def f(q):#定义一个函数
        q.put([time.asctime(), 'from Eva', 'hello'])  #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
    
    if __name__ == '__main__':#定义一个函数
        q = Queue() #创建一个Queue对象
        p = Process(target=f, args=(q,)) #创建一个进程
        p.start()#开始进程
        print(q.get())#拿出一个
        p.join()#感知子进程结束

    上面是一个queue的简单应用,使用队列q对象调用get函数来取得队列中最先进入的数据。 接下来看一个稍微复杂一些的例子:

    #批量生产数据放入队列再批量获取结果 
    import os#引入操作系统模块
    import time#引入时间模块
    import multiprocessing#引入多元进程模块
    
    # 向queue中输入数据的函数
    def inputQ(queue):#定义一个函数
        info = str(os.getpid()) + '(put):' + str(time.asctime())
        queue.put(info)向队列中放入一个信息
    
    # 向queue中输出数据的函数
    def outputQ(queue):#取队列中的数据
        info = queue.get()#取信息
        print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))#打印这个内容
    
    # Main
    if __name__ == '__main__':#如果用户名是当前用户名
        multiprocessing.freeze_support()#
        record1 = []   # store input processes
        record2 = []   # store output processes
        queue = multiprocessing.Queue(3)#实例化一个队列
    
        # 输入进程
        for i in range(10):#循环10个数
            process = multiprocessing.Process(target=inputQ,args=(queue,))#创建一个进程
            process.start()#开始这个进程
            record1.append(process)#添加到列表中
    
        # 输出进程
        for i in range(10):#循环10个数
            process = multiprocessing.Process(target=outputQ,args=(queue,))#创建一个进程
            process.start()#开始进程
            record2.append(process)#添加到列表里
    
        for p in record1:#循环这个列表
            p.join()#感知子进程结束
    
        for p in record2:#循环这个进程
            p.join()#感知子进程结束

    生产者消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    基于队列实现生产者消费者模型
    #基于队列实现生产者消费者模型
    # 队列
    # 生产者消费者模型
    
    # 生产者 进程
    # 消费者 进程
    import time#引入时间模块
    import random#引入随机数
    from multiprocessing import Process,Queue#引入进程模块和队列模块
    def consumer(q,name):#定义一个消费者函数
        while True:#循环为真
            food = q.get()#拿出食物
            if food is None:#如果食物为空
                print('%s获取到了一个空'%name)#打印胡去到一个空
                break#打断
            print('\033[31m%s消费了%s\033[0m' % (name,food))#打印谁消费了什么食物
            time.sleep(random.randint(1,3))#随机睡1~3秒
    
    def producer(name,food,q):#定义一个生产者函数
        for i in range(4):#循环4次
            time.sleep(random.randint(1,3))#随机睡1~3秒
            f = '%s生产了%s%s'%(name,food,i)#谁生产了什么食物
            print(f)#打印内容
            q.put(f)#把食物放到队列里
    
    if __name__  == '__main__':#如果名称是当前名称
        q = Queue(20)#实例化一个队列20
        p1 = Process(target=producer,args=('Egon','包子',q))#创建一个进程
        p2 = Process(target=producer, args=('wusir','泔水', q))#创建一个进程
        c1 = Process(target=consumer, args=(q,'alex'))#创建一个进程
        c2 = Process(target=consumer, args=(q,'jinboss'))#创建一个进程
        p1.start()#启动一个进程
        p2.start()#启动一个进程
        c1.start()#启动一个进程
        c2.start()#启动一个进程
        p1.join()#感知p1进程结
        p2.join()#感知p2进程结束
        q.put(None)#往队列中添加一个None
        q.put(None)#往队列中添加一个None

     

    此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

    解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

     

    注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

    JoinableQueue([maxsize]) 
    创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

    #方法介绍
    #
    JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法: #q.task_done() #使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。 #q.join() #生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 #下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
    #JoinableQueue队列实现消费之生产者模型
    import time#引入一个时间模块
    import random#引入一个随机数模块
    from multiprocessing import Process,JoinableQueue#引入进程模块和队列模块
    def consumer(q,name):#定义一个消费者函数
        while True:#循环为真
            food = q.get()#从队列中拿出食物
            print('\033[31m%s消费了%s\033[0m' % (name,food))#打印内容
            time.sleep(random.randint(1,3))#随机睡1~3秒
            q.task_done()     # count - 1#
    
    def producer(name,food,q):#生产者
        for i in range(4):#循环4次
            time.sleep(random.randint(1,3))#随机睡1~3秒
            f = '%s生产了%s%s'%(name,food,i)#谁生产了食物
            print(f)#打印这个内容
            q.put(f)#放入到队列里
        q.join() # 阻塞  直到一个队列中的所有数据 全部被处理完毕
    
    if __name__  == '__main__':#如果文件名为当前名称
        q = JoinableQueue(20)#实例化一个队列对象
        p1 = Process(target=producer,args=('Egon','包子',q))#创建一个生产者进程
        p2 = Process(target=producer, args=('wusir','泔水', q))#创建一个生产着进程
        c1 = Process(target=consumer, args=(q,'alex'))#创建一个消费者
        c2 = Process(target=consumer, args=(q,'jinboss'))#创建一个消费者进程
        p1.start()#开启一个生产者进程
        p2.start()#开启一个生产者进程
        c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
        c2.daemon = True   #设置守护进程
        c1.start()  #开启一个消费者进程
        c2.start()  #开启一个消费者进程
        p1.join()    #感知一个生产者进程结束
        p2.join()      # 感知一个进程的结束
    
    #  在消费者这一端:
        # 每次获取一个数据
        # 处理一个数据
        # 发送一个记号 : 标志一个数据被处理成功
    
    # 在生产者这一端:
        # 每一次生产一个数据,
        # 且每一次生产的数据都放在队列中
        # 在队列中刻上一个记号
        # 当生产者全部生产完毕之后,
        # join信号 : 已经停止生产数据了
                    # 且要等待之前被刻上的记号都被消费完
                    # 当数据都被处理完时,join阻塞结束
    
    # consumer 中把所有的任务消耗完
    # producer 端 的 join感知到,停止阻塞
    # 所有的producer进程结束
    # 主进程中的p.join结束
    # 主进程中代码结束
    # 守护进程(消费者的进程)结束

     

    管道(了解)

    #创建管道的类:
    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     #其他方法:
    conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回连接使用的整数文件描述符
    conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
     
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
     
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    from multiprocessing import Process, Pipe#引入进程模块和管道模块
    
    def f(conn):#定义一个函数
        conn.send("Hello The_Third_Wave")#发送一条信息
        conn.close()#关闭这个进程
    
    if __name__ == '__main__':#如果名字等于当前名称
        parent_conn, child_conn = Pipe()#接收两个参数
        p = Process(target=f, args=(child_conn,))#创建一个进程
        p.start()#启动进程
        print(parent_conn.recv())#接收一个信息
        p.join()#等待进程结束

     

    应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。 

    #引发EOFError
    from multiprocessing import Process, Pipe引入进程模块和管道模块
    
    def f(parent_conn,child_conn):#定义一个函数传入两个参数
        #parent_conn.close() #不写close将不会引发EOFError
        while True:#循环为真
            try:#异常处理
                print(child_conn.recv())#打印接收的值
            except EOFError:#万能异常
                child_conn.close()#关闭连接
    
    if __name__ == '__main__':#如果用户名是当前用户名
        parent_conn, child_conn = Pipe()#接受两个参数
        p = Process(target=f, args=(parent_conn,child_conn,))#实例化一个进程
        p.start()#而开始进程
        child_conn.close()#关闭客户端连接
        parent_conn.send('hello')#发送信息
        parent_conn.close()#冠词这个信息
        p.join()#等待进程结束
    #pipe实现生产者和消费者
    from multiprocessing import Process,Pipe#引入两个模块
    
    def consumer(p,name):#定义一个消费者
        produce, consume=p#接收两个参数
        produce.close()#关闭生产者
        while True:#循环为真
            try:#异常处理
                baozi=consume.recv()#接收信息
                print('%s 收到包子:%s' %(name,baozi))#打印内容
            except EOFError:#异常处理
                break#打断
    
    def producer(seq,p):#定义一个生产者
        produce, consume=p#接受两个参数
        consume.close()#关闭生产者
        for i in seq:#循环打印
            produce.send(i)#发送
    
    if __name__ == '__main__':#如果用户名等于当前用户名
        produce,consume=Pipe()#接受两个信息
    
        c1=Process(target=consumer,args=((produce,consume),'c1'))#创建一个进程
        c1.start()#开始这个进程
    
        seq=(i for i in range(10))#循环是个数
        producer(seq,(produce,consume))#运行生产者函数
    
        produce.close()#关闭生产者
        consume.close()#关闭消费者
    
        c1.join()#等待进程结束
        print('主进程')

     

    #多个消费之之间的竞争问题带来的数据不安全问题
    from multiprocessing import Process,Pipe,Lock#引入进程模块,管道,锁
    
    def consumer(p,name,lock):#定义一个消费者
        produce, consume=p#传入两个参数
        produce.close()#关闭生产者
        while True:#循环为真
            lock.acquire()#拿钥匙
            baozi=consume.recv()#接收信息
            lock.release()#还钥匙
            if baozi:#如果有信息
                print('%s 收到包子:%s' %(name,baozi))#打印接收到包子
            else:#否则
                consume.close()#关闭消费者
                break#打断
    
    def producer(p,n):#定义一个生产者
        produce, consume=p#接收两个参数
        consume.close()#关闭消费者
        for i in range(n):#循环
            produce.send(i)#发送i
        produce.send(None)#生产者发送一个none
        produce.send(None)#生产者发送一个none
        produce.close()#关闭生产者
    
    if __name__ == '__main__':#如果名字等于当前用户名
        produce,consume=Pipe()#接收两个参数
        lock = Lock()#实例化一个锁
        c1=Process(target=consumer,args=((produce,consume),'c1',lock))#创建一个消费者进程
        c2=Process(target=consumer,args=((produce,consume),'c2',lock))#创建一个消费者进程
        p1=Process(target=producer,args=((produce,consume),10))#创建一个生产者进程
        c1.start()#开启进程
        c2.start()#开启进程
        p1.start()#开启进程
    
        produce.close()#关闭生产者
        consume.close()#关闭消费者
    
        c1.join()#等待进程接收
        c2.join()#等待进程接收
        p1.join()#等待进程接收
        print('主进程')
    #多个消费之之间的竞争问题带来的数据不安全问题
    from multiprocessing import Process,Pipe,Lock#引入进程模块,管道,锁
    
    def consumer(p,name,lock):#定义一个消费者
        produce, consume=p#传入两个参数
        produce.close()#关闭生产者
        while True:#循环为真
            lock.acquire()#拿钥匙
            baozi=consume.recv()#接收信息
            lock.release()#还钥匙
            if baozi:#如果有信息
                print('%s 收到包子:%s' %(name,baozi))#打印接收到包子
            else:#否则
                consume.close()#关闭消费者
                break#打断
    
    def producer(p,n):#定义一个生产者
        produce, consume=p#接收两个参数
        consume.close()#关闭消费者
        for i in range(n):#循环
            produce.send(i)#发送i
        produce.send(None)#生产者发送一个none
        produce.send(None)#生产者发送一个none
        produce.close()#关闭生产者
    
    if __name__ == '__main__':#如果名字等于当前用户名
        produce,consume=Pipe()#接收两个参数
        lock = Lock()#实例化一个锁
        c1=Process(target=consumer,args=((produce,consume),'c1',lock))#创建一个消费者进程
        c2=Process(target=consumer,args=((produce,consume),'c2',lock))#创建一个消费者进程
        p1=Process(target=producer,args=((produce,consume),10))#创建一个生产者进程
        c1.start()#开启进程
        c2.start()#开启进程
        p1.start()#开启进程
    
        produce.close()#关闭生产者
        consume.close()#关闭消费者
    
        c1.join()#等待进程接收
        c2.join()#等待进程接收
        p1.join()#等待进程接收
        print('主进程')

     

    进程之间的数据共享

    展望未来,基于消息传递的并发编程是大势所趋

    即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。

    这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

    但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。

    以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。

    #Manger模块介绍
    #进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
    #虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    
    #A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
    
    #A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
    from multiprocessing import Manager,Process,Lock#引入进程模块,锁模块
    def work(d,lock):#定义一个工作方法
        with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
            d['count']-=1
    
    if __name__ == '__main__':#如果用户名等于当前用户名
        lock=Lock()#实例化一个锁
        with Manager() as m:
            dic=m.dict({'count':100})#传入一个字典
            p_l=[]#创建一个空列表
            for i in range(100):#循环100个数
                p=Process(target=work,args=(dic,lock))##创建一个进程
                p_l.append(p)#添加到列表里
                p.start()#开始进程
            for p in p_l:#循环列表
                p.join()#等待进程结束
            print(dic)#打印这个字典

     

    进程池和multiprocess.Pool模块

    进程池

    为什么要有进程池?进程池的概念。

    在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

    在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

    multiprocess.Pool模块

    概念介绍

    #Pool([numprocess  [,initializer [, initargs]]]):创建进程池
    #1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    #2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    #3 initargs:是要传给initializer的参数组

     

    #1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    #2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    #3 
    #4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    #5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
    #6    
    #7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    #8 
    #9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

     

    #1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    #2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    #3 obj.ready():如果调用完成,返回True
    #4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    #5 obj.wait([timeout]):等待结果变为可用。
    #6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

     

    代码实例

    进程池和多进程效率对比
    同步和异步
    #进程池的同步调用
    import os,time#引入系统模块和时间模块
    from multiprocessing import Pool#引入进程池模块
    
    def work(n):#定义一个函数
        print('%s run' %os.getpid())#打印id
        time.sleep(3)#睡3秒
        return n**2#返回一个n平方
    
    if __name__ == '__main__':#如果文件名等于当前文件名
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]#创建一个列表
        for i in range(10):#循环十个数
            res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
                                        # 但不管该任务是否存在阻塞,同步调用都会在原地等着
        print(res_l)#打印列表

     

    import os#引入系统模块
    import time#引入时间模块
    import random#引入随机数模块
    from multiprocessing import Pool#引入进程池模块
    
    def work(n):#定义一个函数
        print('%s run' %os.getpid())#打印内容
        time.sleep(random.random())#随机睡一会
        return n**2#返回n*n
    
    if __name__ == '__main__':#如果文件名等于当前用户名
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]#得到一个空列表
        for i in range(10):#循环十个数
            res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
                                              # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
                                              # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
                                              # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。  
            res_l.append(res)
    
        # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
        # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        p.close()
        p.join()
        for res in res_l:
            print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

     

    练习
     server:进程池版socket并发聊天
     client

    发现:并发开启多个客户端,服务端同一时间只有4个不同的pid,只能结束一个客户端,另外一个客户端才会进来.

    进程池的其他实现方式:https://docs.python.org/dev/library/concurrent.futures.html

    参考资料
    http://www.cnblogs.com/linhaifeng/articles/6817679.html
    https://www.jianshu.com/p/1200fd49b583
    https://www.jianshu.com/p/aed6067eeac9

     

    转载于:https://www.cnblogs.com/chongdongxiaoyu/p/8658379.html

    展开全文
  • '''multiprocess组件Lock'''import multiprocessingimport timedef add1(lock,value,number):with lock: # 加锁print("start add1 number={0}".format(number))for i in range(1,5):number += valuetime.sleep(0.3)...

    '''

    multiprocess组件Lock

    '''

    import multiprocessing

    import time

    def add1(lock,value,number):

    with lock: # 加锁

    print("start add1 number={0}".format(number))

    for i in range(1,5):

    number += value

    time.sleep(0.3)

    print("number add1 = {0}".format(number))

    def add3(lock,value,number):

    lock.acquire() #加锁

    print("start add3 number={0}".format(number))

    try:

    for i in range(1,5):

    number += value

    time.sleep(0.3)

    print("number add3 = {0}".format(number))

    except Exception as e :

    raise e

    finally:

    lock.release() #解锁

    if __name__ == '__main__':

    print("start main")

    number = 0

    lock = multiprocessing.Lock()

    p1 = multiprocessing.Process(target=add1 ,args=(lock,1,number))

    p3 = multiprocessing.Process(target=add3 ,args=(lock,3,number))

    p1.start()

    p3.start()

    print("end main")

    展开全文
  • multiprocess segfault on OSX

    2020-12-02 22:18:21
    <div><p>(this happens every time) <p>Process: Python [39974] Path: /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python Identifier: Python ...
  • MultiProcess 多进程测试框架
  • <div><p>I used Anaconda python 2.7 (64 bit) to compile the pathos multiprocess fork. The code is from here: https://github.com/uqfoundation/multiprocess</p> <p>However getting this error: <p>python....
  • Multiprocess Windows" speed up of Firefox but RAS is not recognized as being compatible, so Multiprocess isn't enabled. So for now it's an OR situation. I would like to have both :D <p>Is...
  • multiprocess无法启动

    2021-05-07 20:38:33
    @multiprocess无法启动 只需要加入 if name == “main”: 即可 from socket import * from multiprocessing import Process def handle(connd,addr): print("客户端:",addr) while True: data = connd.recv...
  • Multiprocess mode is slow

    2020-11-30 18:53:51
    s multiprocess mode. <p>The "this thing doesn't work in multiprocess mode" features aside, it works acceptably in the beginning. However, presenting metrics gets slower and slower as the ...
  • This PR makes pickling for mulitprocessing work in more situations by using the <a href="https://pypi.org/project/multiprocess/"><code>multiprocess</code></a> library, if it's available.<code>...
  • 仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类...
  • Multiprocess rate limiting?

    2020-12-01 12:10:07
    <div><p>How difficult would it be to implement multiprocess rate limiting in PRAW? e.g., instead of keeping track of request rate on a per-process basis, keep track of it at a central location and ...
  • <p>When I run <code>rebuild_index</code> or <code>update_index</code> with the multiprocess parameter <code>-k 6, it should index all the documents. <h2>Actual behaviour <p>It fails to index all the ...
  • Python multiprocess 使用指南Multiprocess 的一些常用功能7个坑(不理解过程会犯错) Multiprocess 的一些常用功能 先举个例子: def transform(x,data): xxxxxx return output import multiprocess index_...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,972
精华内容 788
关键字:

multiprocess