精华内容
下载资源
问答
  • Python进程队列间传递对象

    千次阅读 2019-07-02 20:55:56
    python 需要在队列中传递对象, 会出现进程不能正常退出的情况。 其原因是因为 在父进程 向子进程传入的Queue对象不对, Queue对象正常是子进程之间的信息传递, 而当我在父进程 创建一个Queue, 把它当参数传入到...

    前言

    在python 需要在队列中传递对象, 会出现进程不能正常退出的情况。

    其原因是因为 在父进程 向子进程传入的Queue对象不对, Queue对象正常是子进程之间的信息传递, 而当我在父进程 创建一个Queue, 把它当参数传入到子进程时, 这时就会出现子进程无法退出的情况

    解决方案

    将主进程要传入自进程的Queue的初始化方法改为

    from multiprocessing import Process, Manager
    my_queue = Manager().Queue()
    

    也就是Queue 是Manage类初始化的 队列

    参考资料

    https://www.yangyanxing.com/article/1103.html

    展开全文
  • Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。1. Queue的使用可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下...

    Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。

    1. Queue的使用

    可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue的工作原理:

    import multiprocessing

    q = multiprocessing.Queue(3) # 初始化的Queue对象,最多能put三条消息

    q.put("消息1")

    q.put("消息2")

    print(q.full())

    q.put("消息3")

    print(q.full())

    # 因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常

    try:

    q.put("消息4", True, 2)

    except:

    print("消息已满,现有的消息为%d" % (q.qsize()))

    try:

    q.put_nowait("消息4")

    except:

    print("消息已满,现有的消息为%d" % (q.qsize()))

    # 推荐的方式,先判断消息列队是否已满,再写入

    if not q.full():

    q.put_nowait("消息4")

    # 读取消息时,先判断消息列队是否为空,再读取

    if not q.empty():

    for i in range(q.qsize()):

    print(q.get_nowait())

    运行结果:

    False

    True

    消息已满,现有的消息为3

    消息已满,现有的消息为3

    消息1

    消息2

    消息3

    说明:

    初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

    Queue.qsize():返回当前队列包含的消息数量;

    Queue.empty():如果队列为空,返回True,反之False ;

    Queue.full():如果队列满了,返回True,反之False;

    Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True

    1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;

    2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;

    Queue.get_nowait():相当Queue.get(False);

    Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

    1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;

    2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;

    Queue.put_nowait(item):相当Queue.put(item, False);

    2. Queue实例

    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

    import multiprocessing

    import time

    import random

    def write(q):

    """忘队列中写入数据"""

    for value in "ABCD":

    print("Put %s to queue" % (value))

    q.put(value)

    time.sleep(random.random())

    def read(q):

    """读取队列中的数据"""

    while True:

    if not q.empty():

    value = q.get(True)

    print("Get %s from queue" % (value))

    time.sleep(random.random())

    else:

    break

    if __name__ == "__main__":

    q = multiprocessing.Queue()

    pw = multiprocessing.Process(target=write, args=(q,))

    pr = multiprocessing.Process(target=read, args=(q,))

    pw.start()

    pw.join()

    pr.start()

    pr.join()

    print('')

    print('所有数据都写入并且读完')

    运行结果:

    Put A to queue

    Put B to queue

    Put C to queue

    Put D to queue

    Get A from queue

    Get B from queue

    Get C from queue

    Get D from queue

    所有数据都写入并且读完

    展开全文
  • multiprocessing模块支持的进程间通信主要有两种:管道和队列。一般来说,发送较少的大对象比发送大量的小对象要好。Queue队列底层使用管道和锁,同时运行支持线程讲队列中的数据传输到底层管道中,来实习进程间通信...

    multiprocessing模块支持的进程间通信主要有两种:管道和队列。一般来说,发送较少的大对象比发送大量的小对象要好。

    Queue队列

    底层使用管道和锁,同时运行支持线程讲队列中的数据传输到底层管道中,来实习进程间通信。

    语法:

    Queue([maxsize])

    创建共享队列。使用multiprocessing模块的Queue实现多进程之间的数据传递。Queue本身是一个消息队列,

    maxsize是队列运行的最大项数,如果不指定,则不限制大小。

    常用方法

    q.close():关闭队列,不再向队列中添加数据,那些已经进入队列的数据会继续处理。q被回收时将自动调用此方法。

    q.empty():如果调用此方法时,队列为null返回True,单由于此时其他进程或者线程正在添加或删除项,

    所以结果不可靠,而且有些平台运行该方法会直接报错,我的mac系统运行该方法,直接报错。

    q.full():如果调用此方法时,队列已满,返回True,同q.empty()方法,结果不可靠。

    q.get([block,timeout]):返回q中的一个项,block如果设置为True,如果q队列为空,该方法会阻塞(就是不往下运行了,处于等待状态),

    直到队列中有项可用为止,如果同时页设置了timeout,那么在改时间间隔内,都没有等到有用的项,就会引发Queue.Empty异常。

    如果block设置为false,timeout没有意义,如果队列为空,将引发Queue.Empt异常。

    q.get_nowait():等同于q.get(False)

    q.put(item,block,timeout):将item放入队列,如果此时队列已满:

    如果block=True,timeout没有设置,就会阻塞,直到有可用空间为止。

    如果block=True,timeout也设置,就会阻塞到timeout,超过这个时间会报Queue.Full异常。

    如果block=False,timeout设置无效,直接报Queue.Full异常。

    q.put_nowait(item):等同于q.put(item,False)

    q.qsize():返回当前队列项的数量,结果不可靠,而且mac会直接报错:NotImplementedError。

    实例1:验证:put方法会阻塞

    实例:

    #验证:put方法会阻塞

    from multiprocessing import Queue

    queue=Queue(3)#初始化一个Queue队列,可以接受3个消息

    queue.put("我是第1条信息")

    queue.put("我是第2条信息")

    queue.put("我是第3条信息")

    print("插入第4条信息之前")

    queue.put("我是第4条信息")

    print("插入第4条信息之后")

    效果:程序会一直阻塞,最后一句输永远也不会输出。

    实例2:closse方法、get方法、put方法简单使用:多进程访问同一个Queue

    代码:

    #closse方法、get方法、put方法简单使用:多进程访问同一个Queue

    from multiprocessing import Queue,Process

    import time,os

    #参数q就是Queue实例

    def mark(q,interval):

    time.sleep(interval)

    # 打印信息

    print("进程%d取出数据:"%os.getpid()+queue.get(True))

    if __name__=="__main__":

    queue = Queue(3) # 初始化一个Queue队列,可以接受3个消息

    queue.put("我是第1条信息")

    queue.put("我是第2条信息")

    queue.put("我是第3条信息")

    p1=Process(target=mark,args=(queue,1))

    p2=Process(target=mark,args=(queue,2))

    p3=Process(target=mark,args=(queue,3))

    p1.start()

    p2.start()

    p3.start()

    # 关闭队列,不再插入信息

    queue.close()

    # 下面插入会导致异常

    # queue.put("我是第4条信息")

    # 打印第1条信息

    print("程序语句执行完成")

    效果

    JoinableQueue队列

    创建可连接的共享进程队列,可以看做还是一个Queue,只不过这个Queue除了Queue特有功能外,允许项的消费者通知项的生产者,项已经处理成功。该通知进程时使用共享的信号和条件变量来实现的。

    JoinableQueue实例除了与Queue对象相同的方法外,还具有下列方法:

    q.task_done():消费者使用此方法发送信号,表示q.get()返回的项已经被处理。

    注意⚠️:如果调用此方法的次数大于队列中删除的项的数量,将引发ValueError异常。

    q.join():生产者使用此方法进行阻塞,直到队列中所有的项都被处理完成,即阻塞将持续到队列中的每一项均调用q.task_done()方法为止。

    代码实例:

    #利用JoinableQueue实现生产者与消费者,并且加入了哨兵,来监听生产者的要求

    from multiprocessing import JoinableQueue,Process

    import time

    #参数q为JoinableQueue队列实例

    def mark(q):

    #循环接受信息,一直运行,这也下面为什么要将它设为后台进程的原因,必须保证当主线程退出时,它可以退出

    while True:

    value = q.get()

    print(value) # 实际开发过程中,此处一般用来进行有用的处理

    # 消费者发送信号:任务完成(此处实例的任务就是打印一下下)

    q.task_done()

    #我来方便看出效果,特意停留1s

    time.sleep(1)

    #使用哨兵,监听生产者的消息,此处通过判断value是否为None来判断传递的消息

    if value==None:

    #执行哨兵计划后,后面的语句都不会输出

    break

    if __name__=="__main__":

    #实例化JoinableQueue

    q=JoinableQueue()

    #定义消费者进程

    p=Process(target=mark,args=(q,))

    #将消费者线程设置为后台进程,随创建它的进程(此处是主进程)的终止而终止

    #也就是当它的创建进程(此处是主现场)意外退出时,它也会跟随一起退出。

    #并且后台进程无法创建新的进程

    p.daemon=True

    #启动消费者进程

    p.start()

    #模拟生产者,生产多个项

    for xx in range(5):

    print(xx)

    #当xx==3时去执行哨兵计划

    if xx==3:

    print("我用哨兵计划了")

    q.put(None)

    print("哨兵计完美执行")

    q.put("第%d条消息"%xx)

    #等待所有项都处理完成再退出,由于使用了哨兵计划,队列没有完全执行,所以会一直卡在这个位置

    q.join()

    print("程序真正退出了")

    效果:

    管道

    除了使用队列来进行进程间通信,还可以使用管道来进行消息传递。

    语法:

    (connection1,connection2)=Pipe([duplex])

    在进程间创建一条管道,并返回元祖(connection1,connection2),其中connection1、connection2表示两端的Connection对象。

    默认情况下,duplex=True,此时管道是双向的,如果设置duplex=false,connection1只能用于接收,connection2只能用于发送。

    注意:必须在多进程创建之前创建管道。

    常用方法:

    connection.close() :关闭连接,当connection被垃圾回收时,默认会调用该方法。

    connection.fileno() :返回连接使用的整数文件描述符

    connection.poll([timeout]):如果连接上的数据可用,返回True,timeout为等待的最长时间,如果不指定,该方法将立刻返回结果。

    如果指定为None,该方法将会无限等待直到数据到达。

    connection.send(obj):通过连接发送对象,obj是与序列号兼容的任意对象。

    connection.send_bytes(buffer[,offset,size]):通过连接发送字节数据缓冲区,buffer是支持缓冲区的任何对象。

    offset是缓冲区的字节偏移量,而size是要发送的字节数。

    connection.recv():接收connection.send()方法返回的对象。如果连接的另一端已经关闭,再也不会存在任何数据,

    该方法将引起EOFError异常。

    connection.recv_bytes([maxlength]):接收connection.send_bytes()方法发送的一条完整字节信息,maxlength为可以接受的

    最大字节数。如果消息超过这个最大数,将引发IOError异常,并且在连接上无法进一步读取。如果连接的另一端已经关闭,

    再也不会有任何数据,该方法将引发EOFError异常。

    connection.recv_bytes_into(buffer[,offset]):接收一条完整的字节信息,兵把它保存在buffer对象中,

    该对象支持可写入的缓冲区接口(就是bytearray对象或类似对象)。

    offset指定缓冲区放置消息的字节偏移量。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

    实例1:理解管道的生产者与消费者

    示意图:

    代码:

    #理解管道的生产者与消费者

    from multiprocessing import Pipe, Process

    import time

    def mark(pipe):

    #接受参数

    output_p, input_p = pipe

    print("mark方法内部调用input_p.close()")

    #消费者(子进程)此实例只接收,所以把输入关闭

    input_p.close()

    while True:

    try:

    item = output_p.recv()

    except EOFError:

    print("报错了")

    break

    print(item)

    time.sleep(1)

    print("mark执行完成")

    if __name__ == "__main__":

    #必须在多进程创建之前,创建管道,该管道是双向的

    (output_p, input_p) = Pipe()#创建管道

    #创建一个进程,并把管道两端都作为参数传递过去

    p = Process(target=mark, args=((output_p, input_p),))

    #启动进程

    p.start()

    #生产者(主进程)此实例只输入,所以关闭输出(接收端)

    output_p.close()

    for item in list(range(5)):

    input_p.send(item)

    print("主方法内部调用input_p.close()()")

    #关闭生产者(主进程)的输入端

    input_p.close()

    效果图:

    实例2:利用管道实现多进程协作:子线程计算结果,返回给主线程

    代码:

    #利用管道实现多进程协作:子线程计算结果,返回给主线程

    from multiprocessing import Pipe, Process

    def mark(pipe):

    #接受参数

    server_p, client_p = pipe

    #消费者(子进程)此实例只接收,所以把输入关闭

    client_p.close()

    while True:

    try:

    x,y = server_p.recv()

    except EOFError:

    print("报错了")

    break

    result=x+y

    server_p.send(result)

    print("mark执行完成")

    if __name__ == "__main__":

    #必须在多进程创建之前,创建管道,该管道是双向的

    (server_p, client_p) = Pipe()#创建管道

    #创建一个进程,并把管道两端都作为参数传递过去

    p = Process(target=mark, args=((server_p, client_p),))

    #启动进程

    p.start()

    #生产者(主进程)此实例只输入,所以关闭输出(接收端)

    server_p.close()

    #发送数据

    client_p.send((4,5))

    #打印接受到的数据

    print(client_p.recv())

    client_p.send(("Mark", "大帅哥"))

    # 打印接受到的数据

    print(client_p.recv())

    #关闭生产者(主进程)的输入端

    client_p.close()

    结果:

    9

    Mark大帅哥

    报错了

    mark执行完成

    展开全文
  • Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。1. Queue的使用可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下...

    Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。

    1. Queue的使用

    可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue的工作原理:

    import multiprocessing

    q = multiprocessing.Queue(3) # 初始化的Queue对象,最多能put三条消息

    q.put("消息1")

    q.put("消息2")

    print(q.full())

    q.put("消息3")

    print(q.full())

    # 因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常

    try:

    q.put("消息4", True, 2)

    except:

    print("消息已满,现有的消息为%d" % (q.qsize()))

    try:

    q.put_nowait("消息4")

    except:

    print("消息已满,现有的消息为%d" % (q.qsize()))

    # 推荐的方式,先判断消息列队是否已满,再写入

    if not q.full():

    q.put_nowait("消息4")

    # 读取消息时,先判断消息列队是否为空,再读取

    if not q.empty():

    for i in range(q.qsize()):

    print(q.get_nowait())

    运行结果:

    False

    True

    消息已满,现有的消息为3

    消息已满,现有的消息为3

    消息1

    消息2

    消息3

    说明:

    初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

    Queue.qsize():返回当前队列包含的消息数量;

    Queue.empty():如果队列为空,返回True,反之False ;

    Queue.full():如果队列满了,返回True,反之False;

    Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True

    1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;

    2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;

    Queue.get_nowait():相当Queue.get(False);

    Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

    1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;

    2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;

    Queue.put_nowait(item):相当Queue.put(item, False);

    2. Queue实例

    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

    import multiprocessing

    import time

    import random

    def write(q):

    """忘队列中写入数据"""

    for value in "ABCD":

    print("Put %s to queue" % (value))

    q.put(value)

    time.sleep(random.random())

    def read(q):

    """读取队列中的数据"""

    while True:

    if not q.empty():

    value = q.get(True)

    print("Get %s from queue" % (value))

    time.sleep(random.random())

    else:

    break

    if __name__ == "__main__":

    q = multiprocessing.Queue()

    pw = multiprocessing.Process(target=write, args=(q,))

    pr = multiprocessing.Process(target=read, args=(q,))

    pw.start()

    pw.join()

    pr.start()

    pr.join()

    print('')

    print('所有数据都写入并且读完')

    运行结果:

    Put A to queue

    Put B to queue

    Put C to queue

    Put D to queue

    Get A from queue

    Get B from queue

    Get C from queue

    Get D from queue

    所有数据都写入并且读完

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    展开全文
  • Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。1.Queue的使用可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue...
  • 默认是不共享数据的•通过Queue(队列Q)可以实现进程间的数据传递•Q本身是一个消息队列•如何添加消息(入队操作):#如何添加消息(入队操作)from multiprocessing import Queueq=Queue(3)#初始化一个Queue对象,...
  • 该multiprocessing库提供了包装套接字的侦听器和客户端,允许您传递任意python对象。您的服务器可以侦听接收python对象:from multiprocessing.connection import Listeneraddress = ('localhost', 6000) # family ...
  • 1.Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信.可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序:from multiprocessing import Queueq=Queue(3) # ...
  • 本文实例讲述了Python进程间通信Queue消息队列用法。分享给大家供大家参考,具体如下: 进程间通信-Queue Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。 1. Queue的使用 可以使用...
  • Python 进程间通信

    2018-11-22 15:14:04
    进程间通信 --------->> 队列,命名管道,无名管道 内存映射   ********************************************************* 可以使⽤multiprocessing模块的Queue实现多进程之间的数据传递,Queue ...
  • Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。1. Queue的使用可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下...
  • 无法传递 Lock对象 from multiprocessing import Pool,Lock def text(i,lock): print(i) lock.acquire() DOSOMETHING lock.release() if __name__ == '__main__': lock=Lock() pool=Pool(processes=8) for....
  • Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。 1. Queue的使用 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下...
  • 多进程之间,默认是不共享数据的通过队列将queue对象当成参数传入进程创建,可以实现进程间的数据传递 初始化Queue() 对象时,q = Queue() 若括号中没有指定最大消息数量,或数量为负 表示没有上限 Queue.qsize() ...
  • 当你需要在不同进程或设备安全无损的传递Python对象时,可以尝试使用Knight-bus库
  • 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道即Queue和Pipe这两种方式,这两种方式都是使用消息传递的。Queue用于多个进程间实现通信,Pipe是两个进程的通信,管道...
  • from multiprocessing import Process, Pipe def f(conn): conn.send('Hello World!') #向管道中发送数据 conn.close() ... p = Process(target=f, args=(conn_A,)) #将管道的一方给子进程 p.start()
  • 1 共享内存基本特点:(1)共享内存是一种最为高效的进程间通信方式,进程可以直接读写内存,而不需要任何数据的拷贝。(2)为了在多个进程间交换信息,内核专门留出了一块内存...优缺点:优点:快速在进程间传递数据缺...
  • 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue的工作原理。 #coding=utf-8 from multiprocessing import Queue q=Queue(3) #初始化...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 系统可以运行多进程,有时进程间需要通信传递信息,进程间通信的方式有:管道、消息队列、共享内存、信号、信号量、套接字 管道: 原理:在内存中开辟管道空间,生成管道对象,多个进程对管道对象进行读写,实现进程...
  • 1.Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信. 可以使用multiprocessing模块的Queue实现多进程之间的数据传递, Queue本身是一个消息列队程序: from multiprocessing import Queue q=...
  • python-进程

    2021-02-10 16:13:42
    文章目录python-进程一、进程进程池1、非阻塞式2、阻塞式线程1、 创建线程2、数据共享3、进程间通信:生产者与消费者 python-进程 一、进程 from multiprocessing import Process process = Process(target = 函数,...
  • 为了和其他系统保持兼容,Linux 也...如果进程要访问 System V IPC 对象,则需要在系统调用中传递唯一的引用标识符。 对 System V IPC 对象的访问,必须经过类似文件访问的许可检验。对这些对象访问权限的设置由对象

空空如也

空空如也

1 2 3 4
收藏数 76
精华内容 30
关键字:

python进程间传递对象

python 订阅