精华内容
下载资源
问答
  • 生产者消费者模式

    千次阅读 2018-09-06 23:21:22
    生产者消费者模式

    生产者消费者模式

    首先实现一个EventQueue,用来缓存待处理事件,代码如下:

    package com.yozzs.PAndC;
    
    import java.util.LinkedList;
    
    import static java.lang.Thread.currentThread;
    
    public class EventQueue {
        //最大可缓存事件数
        private final int max;
        //待处理事件
        static class Event{}
        //队列
        private static LinkedList<Event> eventQueue = new LinkedList<>();
        //默认最大值
        private static final int DEFUALT_MAX_EVENT = 10;
    
        public EventQueue(int max) {
            this.max=max;
        }
    
        public EventQueue() {
            this(DEFUALT_MAX_EVENT);
        }
    
        //加入事件到队列
        public void produce(Event event){
            synchronized (eventQueue){
                while(eventQueue.size()>=max){
                    console("队列已满!");
                    try {
                        eventQueue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                eventQueue.addLast(event);
                console("事件已添加!");
                eventQueue.notifyAll();
            }
        }
    
        //处理事件(从队列中移除事件)
        public Event consume(){
            synchronized (eventQueue){
                while(eventQueue.size()<=0){
                    console("队列为空!");
                    try {
                        eventQueue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Event event = eventQueue.removeFirst();
                console("事件已处理!");
                eventQueue.notifyAll();
                return event;
            }
        }
    
        private void console(String message) {
            System.out.printf("%s:%s\n",currentThread().getName(),message);
        }
    
    }
    

    测试代码:

    package com.yozzs.PAndC;
    
    import java.util.concurrent.TimeUnit;
    
    public class EventClient {
        public static void main(String[] args) {
            final EventQueue eventQueue = new EventQueue();
    
            for (int i = 1; i <= 5; i++) {
                new Thread(() -> {
                    while (true) {
                        eventQueue.produce(new EventQueue.Event());
                    }
                }, "Producer" + i).start();
                new Thread(() -> {
                    while (true) {
                        eventQueue.consume();
                        try {
                            TimeUnit.SECONDS.sleep(10);//模拟处理事件耗时10秒
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, "Consumer" + i).start();
            }
        }
    }
    

    这里启动5个生产者线程和5个消费者线程,执行结果如下:
    这里写图片描述


    展开全文
  • 生产者 消费者模式

    2021-02-09 17:29:18
    开发中的生产者和消费者模式 和生活中的生产者消费者 类似 都是一种生产消费关系 生产者产出 供给给消费者使用 在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责...

    生产者 消费者模式

    开发中的生产者和消费者模式 和生活中的生产者消费者 类似

    都是一种生产消费关系 生产者产出 供给给消费者使用

    在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

    单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。

    img

    缓冲区作用

    解 耦

    假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

    支持并发

    生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。

    在使用中我们通常用 一个队列来作为缓冲区 生产者将数据放在队列中 消费者从队列中取出数据

    上一个例子

    这是爬取某个网站图片 的爬虫代码 使用的是生产者消费者模式

    import threading
    import requests
    from lxml import etree
    from queue import Queue
    import re
    import os
    from urllib import request
    
    headers = {
    	"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36",
    }
    
    # 生产者  获取图片链接 放入队列中 
    class Producer(threading.Thread):
    
    
    	def __init__(self,page_queue,image_quque,*args,**kwargs):
    		super(Producer, self).__init__()
    		self.page_queue = page_queue
    		self.image_quque = image_quque
    
    	def run(self):
    		while True:
    			# 判断队列是否为空 为空返回真   .empty
    			if self.page_queue.empty():
    				break
    
    			else:
    				# 取出队列中的page链接
    				url = self.page_queue.get()
    				# 调用  parse获取图片链接
    				self.parse_page(url)
    
    
    	def parse_page(self,url):
    		response = requests.get(url,headers=headers)
    		text = response.text
    		html = etree.HTML(text)
    		imgs = html.xpath('//div[@class="page-content text-center"]//img')
    		for img in imgs:
    			img_url = img.get('data-original')
    			alt = img.get('alt')
    			# print(type(alt))
    			alt = re.sub(r'[\??\.,。!!\/]','',alt)
    
    
    			suffix = os.path.splitext(img_url)[1]
    			filename = alt+suffix
    			# 判断图片队列是否满   没有满就继续放入
    			if not self.image_quque.full():
    				self.image_quque.put((filename,img_url))
    
    # 消费者   从队列中拿出链接  获取图片保存 
    class Consumer(threading.Thread):
    	def __init__(self,page_queue,img_queue,*args,**kwargs):
    		super(Consumer, self).__init__()
    		self.page_queue = page_queue
    		self.img_queue = img_queue
    
    	def run(self):
    		while True:
    			if self.page_queue.empty() and self.img_queue().empty():
    				break
    
    			filenama,img_url = self.img_queue.get()
    			request.urlretrieve(img_url,'sources/'+filenama)
    			print(filenama,'下载完成')
    
    def main():
    	page_queue = Queue(100)
    	image_queue = Queue(1000)
    	for x in range(1, 101):
    		url = 'http://www.doutula.com/photo/list/?page=%d' % x
    		# 将需要爬取的页面都放入queue队列
    		page_queue.put(url)
    
    	# 创建生产者进程
    	for i in range(5):
    		p1 = Producer(page_queue, image_queue)
    		p1.start()
    
    	# 创建消费者进程
    	for i in range(5):
    		c1 = Consumer(page_queue,image_queue)
    		c1.start()
    
    
    if __name__ == '__main__':
    	main()
    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,834
精华内容 4,733
关键字:

生产者消费者模式