精华内容
下载资源
问答
  • 文章目录Python 生产消费模型生产消费模型介绍生产消费模型实现函数yield 方式多进程方式实现一、实现二、多线程方式 生产消费模型介绍 为什么要使用生产消费模型 生产者指的是生产数据的任务,...

    Python 生产者消费者模型

    生产者消费者模型介绍

    为什么要使用生产者消费者模型

    生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者和消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    生产者消费者模型实现

    函数yield 方式

    def producer(name, count, cons):
        cons.send(None)
        for i in range(count):
            print(f"{name}, 生产了第{i}个包子")
            cons.send(f"{name}, 第{i}个包子")
    
    
    def consumer(name):
        while True:
            time.sleep(1)
            n = yield
            print(f"\033[1;32m{name} 吃了{n}\033[0m")
    
    
    for i in range(1, 3):
        c = consumer(f"消费者{i}")
        producer(f"生产者{i}", 5, c)
    

    多进程方式

    实现一、

    使用多进程+多进程模块中的Queue队列实现

    import time
    from multiprocessing import Process, Queue
    
    
    def producer(name, q, count):
        for i in range(count):
            time.sleep(0.5)
            d = f"生产者{name} 第{i}包子"
            print(d)
            q.put(d)
    
    
    def consumer(name, q):
        while True:
            time.sleep(1)
            if q.empty():   # 判断队列是否为空, 为空结束索取
                break
            print(f"\033[1;32m消费者{name} 吃了{q.get()}\033[0m")
    
    
    if __name__ == '__main__':
        queue = Queue()
        for i in range(1, 3):
            p = Process(target=producer, args=(i, queue, 10))
            p.start()
    
        for i in range(1, 6):
            c = Process(target=consumer, args=(i, queue))
            c.start()
    
        print("主进程")
    

    实现二、

    使用多进程+多进程模块中的JoinableQueue队列实现

    import time
    from multiprocessing import Process, JoinableQueue
    
    
    def producer(name, q, count):
        for i in range(count):
            time.sleep(0.5)
            d = f"生产者{name} 第{i}包子"
            print(d)
            q.put(d)
        q.join()
    
    
    def consumer(name, q):
        while True:
            time.sleep(1)
            print(f"\033[1;32m消费者{name} 吃了{q.get()}\033[0m")
            q.task_done()
    
    
    if __name__ == '__main__':
        queue = JoinableQueue()
        p_list = []
        for i in range(1, 3):
            p = Process(target=producer, args=(i, queue, 10))
            p_list.append(p)
            p.start()
    
        for i in range(1, 6):
            c = Process(target=consumer, args=(i, queue))
            c.daemon = True
            c.start()
    
        for i in p_list:
            i.join()
    
        print("主进程")
    

    多线程方式

    queue 模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程间交换的线程编程

    def producer(name, count, q):
        for i in range(count):
            i = f"{name} 第{i}包子"
            print(i)
            q.put(i)
    
    
    def consumer(name, q):
        while True:
            time.sleep(0.5)
            if q.empty():
                break
            print(f"{name} 消费{q.get()}")
    
    
    if __name__ == '__main__':
        q = queue.Queue()
        print(q.empty())
        for i in range(1, 4):
            p = Thread(target=producer, args=(f"生产者{i}", 10, q))
            p.start()
    
        for i in range(1, 6):
            c = Thread(target=consumer, args=(f"消费者{i}", q))
            c.start()
    
        print("主进程")
    
    展开全文
  • import queue import threading import time def produce(q: queue.Queue): thread_name = threading.current_thread().getName() for i in range(10): print("生产者[%s]--- %d" % (thread_nam...
    import queue
    import threading
    import time
    
    
    def produce(q: queue.Queue):
        thread_name = threading.current_thread().getName()
        for i in range(10):
            print("生产者[%s]--- %d" % (thread_name, i))
            q.put(i, block=True)
            time.sleep(1)
    
    
    def consume(q: queue.Queue):
        thread_name = threading.current_thread().getName()
        while True:
            print("消费者[%s]--- %d" % (thread_name, q.get(block=True)))
            time.sleep(2)
    
    
    if __name__ == '__main__':
        q = queue.Queue(3)
    
        p = threading.Thread(target=produce, args=(q,), name="worker-p")
        c = threading.Thread(target=consume, args=(q,), name="worker-c")
    
        p.start()
        c.start()
        p.join()
        c.join()

    转载于:https://www.cnblogs.com/jzsg/p/11151295.html

    展开全文
  • 第一种: from Queue import Queue import random,threading,time ...#生产者类 class Producer(threading.Thread): def __init__(self, name,queue): threading.Thread.__init__(self, name=name) self.d...

    第一种:

    from Queue import Queue
    import random,threading,time
    
    #生产者类
    class Producer(threading.Thread):
        def __init__(self, name,queue):
            threading.Thread.__init__(self, name=name)
            self.data=queue
    
        def run(self):
            for i in range(5):
                print("%s is producing %d to the queue!" % (self.getName(), i))
                self.data.put(i)
                time.sleep(random.randrange(10)/5)
            print("%s finished!" % self.getName())
    
    #消费者类
    class Consumer(threading.Thread):
        def __init__(self,name,queue):
            threading.Thread.__init__(self,name=name)
            self.data=queue
        def run(self):
            for i in range(5):
                val = self.data.get()
                print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val))
                time.sleep(random.randrange(10))
            print("%s finished!" % self.getName())
    
    def main():
        queue = Queue()
        producer = Producer('Producer',queue)
        consumer = Consumer('Consumer',queue)
    
        producer.start()
        consumer.start()
    
        producer.join()
        consumer.join()
        print 'All threads finished!'
    
    if __name__ == '__main__':
        main()
    

    第二种

    #!/usr/bin/python
    # coding: utf-8
    
    from Queue import Queue
    import random,threading,time
    import thread
    
    class TestMultThreadMode( threading.Thread ):
        def __init__( self, queue ):
            threading.Thread.__init__( self )
            self.data = queue
            
        def producer( self ):
            for i in range( 5 ):
                print 'this is %s%d' % ( "producer", i )
                self.data.put( i )
                time.sleep( random.randrange(10) / 5 )
            print '%s end!' % 'producer'
            
        def consumer( self ):
            for i in range ( 5 ):
                print 'this is %s%d' % ( "consumer", i )
                self.data.get( i )
                time.sleep( random.randrange(10) / 5 )
            print '%s end!' % 'consumer'
    
    def main():
        queue = Queue()
        testThread = TestMultThreadMode( queue )
        thread.start_new_thread( testThread.producer, () )
        thread.start_new_thread( testThread.consumer, () )
        while True:
            pass
    
    if __name__=="__main__":
        main()
    
    展开全文
  • python生产消费模型

    万次阅读 2017-10-24 14:18:39
    # 初始化消费Queue中数据的线程 self.queue = Queue.Queue() self.threads_consumer = [] self.threads_producer = [] def run(self): # 启动Consumer线程 for i in xrange(10): consumer = Consumer(self...
    #-*- coding:utf-8 -*-
    
    import Queue
    import threading
    import time
    import json
    import sys
    import signal
    import random
    reload( sys )
    sys.setdefaultencoding('utf-8')
    
    class Enum(set):
        def __getattr__(self, name):
            if name in self:
                return name
            else:
                raise AttributeError
    State = Enum(['NORMAL', 'UPDATE', 'STOP'])
    
    engine_do = True
    def handler(signum, frame):
        print 'receive signal: %s' % signum
        global engine_do
        engine_do = False
    
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
    
    class Consumer(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
            self.do = True
    
        def stop(self):
            self.do = False
            print 'change consumer.do to False'
        
        def run(self):
            print 'Create new consumer thread, id: %s' % self.ident
            while self.do:
                messages = []
                result = []
                msg = random.randint(0,100)
                self.queue.put(msg)
            print 'Consumer thread will exit.'
            
    class Producer(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
            self.msgs = Queue.Queue()
            self.state = State.NORMAL
            self.do = True
    
        def stop(self):
            self.do = False
            self.state = State.STOP
    
        def run(self):
            while self.do:
                if self.state == State.NORMAL:
                    if not self.queue.empty():
                        data = self.queue.get()
                        print 'Producer get data: %s' % data
                    else:
                        print 'data queue is empty, sleep 5 seconds.'
                        time.sleep(5)
                elif self.state == State.STOP:
                    while not self.queue.empty():
                        data = self.queue.get()
                        print 'Producer get data: %s' % data
            print 'Producer thread will exit.'
    
    class Engine():
        def __init__(self):
            # 在获取所有的topic并初始化连接
            # 初始化消费Queue中数据的线程
            self.queue = Queue.Queue()
            self.threads_consumer = []
            self.threads_producer = []
    
    
        def run(self):
            # 启动Consumer线程
            for i in xrange(10):
                consumer = Consumer(self.queue)
                consumer.start()
                self.threads_consumer.append(consumer)
            producer = Producer(self.queue)
            self.threads_producer.append(producer)
            producer.start()
            while True:
                time.sleep(5)
                print engine_do
                if not engine_do:
                    print 'engine will exit...'
                    print 'first stop consumer threads'
                    for consumer in self.threads_consumer:
                        consumer.stop()
                    for consumer in self.threads_consumer:
                        consumer.join()
                    print 'all consumer threads are done.'
                    print 'second stop producer threads...'
                    for producer in self.threads_producer:
                        producer.stop()
                    for producer in self.threads_producer:
                        producer.join()
                    print 'all producer threads are done.'
                    break
            print 'All threads are not alive, main thread will exit.'
            return 
    
    if __name__=='__main__':
        engine = Engine()
        engine.run()

    展开全文
  • 主要介绍了python生产消费模型实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 生产消费模型 的建立需要借助第三方进行传递信息。那么使用什么充当这个第三方进行传递信息能够使得生产消费模型能够效率更高,实现更为简单呢? 这里使用队列作为这个第三方进行传递信息,连同生产者与消费...
  • python生产消费模型 生产消费模型作用于: 1.爬虫的时候 2.分布式操作:celery 其本质:就是让生产数据和消费数据的效率达到平衡并且最大化的效率 为什么要使用生产消费模型? 在并发编程中,如果生产...
  • 我们知道python最早的协程是基于yield生成器来实现, python3.5之后出现了asyncioasync/await关键字,有了更方便的协程方式. ...python协程实现生产和消费模型 generator.send(value): Resumes .
  • python 多线程生产消费模型:一个生产者多个消费者The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged...
  • 文章目录什么是生产消费模型?为什么要使用生产消费模型?实现生产消费模型三要素:什么时候使用生产消费模型生产消费者模式的优点举个栗子         ...
  • python生产消费者简单模型

    万次阅读 2017-08-29 09:36:57
    /usr/bin/python import Queue import time import threading q=Queue.Queue() class producer(threading.Thread): def __init__(self,i): threading.Thread.__init__(self,name="producer Thread-%
  • 多线程实现生产消费模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产消费模型:信号量(Semaphore)、条件(Condition)、...
  • import time,random import queue,threading q = queue.Queue() # q.put(2) # q.join() # q.put(3) def Producer(name): count = 0 while count <... print("making........") ... print('Producer %s has produced...
  • 主要介绍了Python semaphore evevt生产消费模型原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要介绍了python多进程下的生产和消费模型,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • def consumer(q, name): # 消费者 while 1: info = q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据阻塞等待 if info: # 收到None作为结束标识 print(' %s 得到 %s' % (name, info)) else: # 当...
  • 文章目录神马是生产和消费者?为什么要使用生产消费模型?用该模型有个蛇好处? 生产消费模型当中有两大类重要的角色,一个是生产者(负责造数据的任务),另一个是消费者(接收造出来的数据进行进一步的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,181
精华内容 7,272
关键字:

python生产和消费模型

python 订阅