精华内容
下载资源
问答
  • 比如线程1处理完的结果以结构体模式,写进队列1,  线程2读出队列1中的结构体,进行处理。
  • 多线程c++队列使用

    千次阅读 2017-07-22 00:39:36
    我有一个队列的全局变量,然后一个线程不断的获取数据,向这个队列里面压入,然后另一个线程每次读取队列的第一个元素,然后删掉。 #include #include #include #include #include #include #include #include ...
    我有一个队列的全局变量,然后一个线程不断的获取数据,向这个队列里面压入,然后另一个线程每次读取队列的第一个元素,然后删掉。
    #include<iostream>
    #include<queue>
    #include<windows.h>
    #include<stdlib.h>
    #include<process.h>
    #include<opencv2/core/core.hpp>
    #include<opencv2/imgproc/imgproc.hpp>
    #include<opencv2/highgui/highgui.hpp>
    
    using namespace std;
    using namespace cv;
    
    CRITICAL_SECTION g_cs;
    
    VideoCapture cap;
    
    std::queue<Mat> myQ;
    
    unsigned int __stdcall producer(PVOID para)
    {
    	int i = 0;
    	Mat frame;
    	while (1)
    	{
    		cap >> frame;
    
    		EnterCriticalSection(&g_cs);
    		myQ.push(frame.clone());
    		LeaveCriticalSection(&g_cs);
    
    		//i++;
    		Sleep(2);
    	}
    }
    
    unsigned int __stdcall consumer(PVOID para)
    {
    	int i = 0;
    
    	Mat frame;
    	while (1)
    	{
    		while (myQ.empty())
    			Sleep(2);
    
    		EnterCriticalSection(&g_cs);
    		//cout << myQ.front()<<"  ";
    		frame = myQ.front();
    		myQ.pop();
    
    		LeaveCriticalSection(&g_cs);
    		Sleep(2);
    
    		imshow("frame", frame);
    		waitKey(1);
    
    		i++;
    		if (i >= 1000)
    		{
    			i = 0;
    
    			EnterCriticalSection(&g_cs);
    
    			//一次性清空队列里的所有数据
    			while (myQ.size())
    				myQ.pop();
    
    			LeaveCriticalSection(&g_cs);
    			Sleep(2);
    		}
    	}
    }
    
    
    int main()
    {
    
    	cap.open(0);
    
    	//初始化关键段
    	InitializeCriticalSection(&g_cs);
    
    	HANDLE tid1, tid2;
    
    	tid1 = (HANDLE)_beginthreadex(NULL, 0, producer, NULL, 0, NULL);
    	tid2 = (HANDLE)_beginthreadex(NULL, 0, consumer, NULL, 0, NULL);
    
    
    	/*等待两个线程结束,这是必须要有的,否则主程序线程和其它子线程是同时执行的,
    	主线程开完子线程之后,直接return 0,退出了,子线程也就退出了*/
    	//linux中也有类似的,pthread_join
    	WaitForSingleObject(tid1, INFINITE);
    	WaitForSingleObject(tid2, INFINITE);
    	
    
    	CloseHandle(tid1);
    	CloseHandle(tid2);
    	
    
    	DeleteCriticalSection(&g_cs);
    
    	system("pause");
    	return 0;
    }
    
    
    展开全文
  • 多线程下的消息队列实现

    千次阅读 2019-01-19 06:14:09
    1、定义一个队列缓存池: private static List queueCache = new LinkedList(); 2、定义队列缓冲池最大消息数,如果达到该值...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程线程等待中 new Thread(){ ...

    1、定义一个队列缓存池:

    private static List queueCache = new LinkedList();

    2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

    private Integer offerMaxQueue = 2000;

    3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

    new Thread(){
            public void run(){
              while(true){
                String ip = null;
                try {
                  synchronized (queueCache) {
                    Integer size = queueCache.size();
                    if(size==0){
    //队列缓存池没有消息,等待。。。。									queueCache.wait();
                    }
                    Queue queue = queueCache.remove(0);
     
                    if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
                      queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
                      continue;
                    }else{
                ;//这里是处理该消息的操作。
                    }
                    size = queueCache.size();
                    if(size<offerMaxQueue&&size>=0){									queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
                    }
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                }finally{
                  try {//检出该消息队列的锁
                    unIpLock(queueStr);
                  } catch (Execption e) {//捕获异常,不能让线程挂掉
                    e.printStackTrace();
                  }	
                                                }
                }
          }.start();
    

    4、检入队列

    synchronized (queueCache) {
    while(true){
    Integer size = queueCache.size();
    if(size>=offerMaxQueue){
                try {
                  queueCache.wait();
    continue;//继续执行等待中的检入任务。
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
     }//IF
     
    if(size<=offerMaxQueue&&size>0){
      queueCache.notifyAll();
    }
    break;//检入完毕
    }//while
    }
    

    5、锁方法实现

    /**
       * 锁
       * @param ip
       * @return
       * @throws 
       */
      public Boolean isLock(String queueStr) {
        return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
      }
      //解锁
      public void unIpLock(String queueStr) {
        if(ip!=null){
          this.redisManager.del(queueStr+"_lock");
    //			lock.unlock();
        }
      }
    
    展开全文
  • 多线程消费一个队列问题

    千次阅读 2019-08-02 11:16:38
    问题描述 ...但是发现这四个线程消费队列的地方又严重的延迟。特此想解决此问题。 贴代码 往队列里push数据 void KafkaConsumer::msgConsume(RdKafka::Message* message, void* opaque) { KafkaC...

    问题描述

    最近公司有个转发服务,业务逻辑是从kafka消费到大量的数据,然后放入一个队列中。之后用一个线程池,去消费这个队列。

    但是发现这四个线程消费队列的地方又严重的延迟。特此想解决此问题。

    贴代码

    • 往队列里push数据
    void KafkaConsumer::msgConsume(RdKafka::Message* message, void* opaque)
    {
    	KafkaConsumer::Data cData;
    	int errcode = message->err();
    
    	if (errcode == RdKafka::ERR__TIMED_OUT)
    	{
    		return;
    	}
    	else if (errcode == RdKafka::ERR_NO_ERROR)  //消费数据,放入队列
    	{
    		Data *pData=new Data;
    		pData->buffer.writeBlock(static_cast<const char*>(message->payload()),static_cast<int>(message->len())); // payload 装载,载荷;这里就是里面的内容
    		//pData->topic = message->topic()->name();  
    		pData->topic = message->topic_name();   // 注意这里
    		pData->ipartition = message->partition();
    
    		_cMutex.lock();
    		_cDataQue.push(pData); // 放入队列
    		_cMutex.unlock();
    	}
    	else if (RdKafka::ERR__PARTITION_EOF)
    	{
    		if (_exit_eof) _run = false;
    	}
    	else
    	{
    		LOG(INFO) << "kafkaConsumer--error: Consumer failed:" << message->errstr();
    	}
    }
    • 取队列数据,处理篇
    void KafkaConsumer::run(void* param)
    {
    	int tag;
    	memcpy(&tag,&param,sizeof(int));
    	while (1)
    	{
    		if (tag == CDATA)
    		{
    			if(_cDataQue.size() == 0) {
    				usleep(2000);
    				continue;
    			}
    			_cMutex.lock();
    			while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
    			{
    				Data *pData = _cDataQue.pop(); // 队列中取出
    				HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
    				SAFE_DELETE(pData);
    			}
    			_cMutex.unlock();
    		} else {
    			break;
    		}
    	}
    }

    代码错误分析

        _cMutex.lock();
                while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
                {
                    Data *pData = _cDataQue.pop(); // 队列中取出
                    HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
                    SAFE_DELETE(pData);
                }
                _cMutex.unlock();

     线程在数据队列_cDataQue中的数据时,先上锁,然后不断的循环取出队列中的数据并处理。(取出数据 和处理数据在一起)

    处理完每条数据之后delete.

    当锁定时的整个队列中的数据处理完毕之后,解锁。

    定义几个变量:

    N : 锁时队列的长度

    T1: pop 一条数据的时间

    T2:HandleMsg 函数执行的时间

    T3:push 一条数据的时间

    此活动中的动作:

    1. kafka消费到数据,锁队列,写队列,解锁队列。

    2.数据解析线程,锁队列,读数据,解锁队列,处理数据。

    此时的处理方式,几乎没有发挥多线程的优势,每次都是把锁时的队列的全部内容处理完。其他三个线程和生产数据的线程干等

    t = N * (T1+T2) 的时间。 若此时是程序刚启动。kafka瞬间消费到很多数据成万条的数据。 那么t 将是一个很大的时间。且kafka消费到的数据还不能及时的存放如队列中。于是就造成了延迟。

    隐患就是:

    1.根本没发挥多线程的优势和能力

    2.若数据量大,取数据和处理数据放一起,导致锁态占用的时间很长,影响其他线程(往queue里放数据的线程)干活

    3.其他线程竞争不到,干等,浪费CPU时间。一个线程死干活,处理不完,数据堆积。延迟。

    改进方法

    1. 将取数据的地方放在锁的里面,处理数据的地方放在锁的外面。

    2.每次取固定数量的nCount 个数据,放在一个容器里。然后出锁后慢慢处理。

    同时,每次取固定数量的来处理,锁占用的时间是固定的,t = nCount * T1 .也就是说,其他3个处理线程和1个往queue里塞数据的线程。最多只等 3 * t 的时间就能拿到 queue的控制权,并对其进行操作。

    而数据处理的时间 T2 与queue的操作(加锁,读取,塞入)没有关系。

    不过要控制nCount的值,太小。锁的次数很频繁; 太大,t 的时间会变大。

    这样多线程就用其来了。队列应用也灵活了。处理能力大大提升。

    void KafkaConsumer::run(void* param)
    {
    	int tag;
    	memcpy(&tag,&param,sizeof(int));
    	while (1){
    		if(_cDataQue.size() == 0) {
    			usleep(2000);
    			continue;
    		}
    		std::vector<Data*> vDatas;
    		_cMutex.lock();
    		while(_cDataQue.size()>0) {//上锁的时间尽量短,为其他线程争取到和写入线程腾出时间
    			Data *pData = _cDataQue.pop(); // 队列中取出
    			vDatas.push_back(pData);
    			if(vDatas.size() > 10){ //这里能限制这个长度 ,最多弄10条。处理快,节省时间。
    				break;
    			}
    		}
    		_cMutex.unlock();
    		// 将处理移除在锁之外,慢慢处理这些数据,处理完释放
    		for(vector<Data*>::iterator iter = vDatas.begin(); iter != vDatas.end(); ++iter){
    			Data *pData = *iter;
    			HandleMsg(pData);
    			SAFE_DELETE(pData);
    		}	
    	}
    }

     

    用生活实例来解释描述:

    1.角色 : 大厨 (生产者) , 取餐台/口(queue),包子(数据),顾客(消费处理线程)

    2.动作:生产数据(push进queue),取出数据(pop出queue),占住取餐台(Lock),放开取餐台(UNLock),吃包子(HandleMsg)

     

    方案一

    大厨们生产包子,锁住取餐口,放下包子。然后顾客1 占住取餐口,假如这里有10个包子,他就取一个吃了,再去一个吃了,直到10个取完吃完才离开取餐口。此时,大厨没法往里放包子,其他三个顾客都干等着。

    方案二

    大厨们生产包子,占住取餐口,放下包子。顾客1,占住取餐口,取了10个包子,去一边吃去。顾客2 ,马上来也取10个,然后一遍吃去。同理顾客3,4 也一样。当然这里只是理想情况,顾客1去完之后,也可能大厨又占住取餐口,放了1w个包子。

    关键是,每次取餐口被占用的时间,之后顾客们取包子的时间。非常短。而且每个顾客取完之后就去一边吃包子。同时大家可能都在吃包子,实现了多线程处理。


    哈哈。就酱紫。


     

    展开全文
  • python 多线程队列

    千次阅读 2019-01-17 11:27:41
    各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。...


    各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。

    Queue队列

    Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。
    Queue.Queue(maxsize)  FIFO(先进先出队列)
    Queue.LifoQueue(maxsize)  LIFO(先进后出队列)
    Queue.PriorityQueue(maxsize)  为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。
    如果设置的maxsize小于1,则表示队列的长度无限长

    FIFO是常用的队列,常用的方法有:

    Queue.qsize()   返回队列大小
    Queue.empty()  判断队列是否为空
    Queue.full()   判断队列是否满了

    Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。
    在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。

    Queue.put(…[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。
    同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
    Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成。

    Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

    队列-单线程

    import threading
    import queue
    import time
    
    
    class worker(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
            self.thread_stop = False
    
        def run(self):
            while not self.thread_stop:
                print("thread%d %s: waiting for tast" % (self.ident, self.name))
                try:
                    task = q.get(block=True, timeout=2)  # 接收消息
                except queue.Empty:
                    print("Nothing to do! I will go home!")
                    self.thread_stop = True
                    break
                print("tasking: %s ,task No:%d" % (task[0], task[1]))
                print("I am working")
                time.sleep(3)
                print("work finished!")
                q.task_done()                           # 完成一个任务
                res = q.qsize()                         # 判断消息队列大小(队列中还有几个任务)
                if res > 0:
                    print("fuck! Still %d tasks to do" % (res))
    
        def stop(self):
            self.thread_stop = True
    
    
    if __name__ == "__main__":
        q = queue.Queue(3)                                    # 创建队列(大小为3)
        worker = worker(q)                                    # 将队列加入类中
        worker.start()                                        # 启动类
        q.put(["produce cup!", 1], block=True, timeout=None)  # 向队列中添加元素,产生任务消息
        q.put(["produce desk!", 2], block=True, timeout=None)
        q.put(["produce apple!", 3], block=True, timeout=None)
        q.put(["produce banana!", 4], block=True, timeout=None)
        q.put(["produce bag!", 5], block=True, timeout=None)
        print("***************leader:wait for finish!")
        q.join()                                             # 等待所有任务完成
        print("***************leader:all task finished!")
    
    
    输出:
    thread9212 Thread-1: waiting for tast
    tasking: produce cup! ,task No:1
    I am working
    work finished!
    fuck! Still 3 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce desk! ,task No:2
    I am working
    ***************leader:wait for finish!
    work finished!
    fuck! Still 3 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce apple! ,task No:3
    I am working
    work finished!
    fuck! Still 2 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce banana! ,task No:4
    I am working
    work finished!
    fuck! Still 1 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce bag! ,task No:5
    I am working
    work finished!
    thread9212 Thread-1: waiting for tast
    ***************leader:all task finished!
    Nothing to do!i will go home!
    

    队列-多线程

    import threading
    import time
    from queue import Queue
    
    img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3',
                 'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3',
                 'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3',
                 'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3',
                 'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3',
                 'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3',
                 'lipei_00123.mp3','lipei_00129.mp3',]
    
    q = Queue(10)
    
    class Music_Cols(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            global img_lists
            global q
            while True:
                try:
                    music = img_lists.pop(0)
                    q.put(music)
                except IndexError:
                    break
    
    class Music_Play(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            global q
            while True:
                if q.not_empty:
                    music = q.get()
                    print('{}正在播放{}'.format(threading.current_thread(), music))
                    time.sleep(5)
                    q.task_done()
                    print('{}播放结束'.format(music))
                else:
                    break
    
    
    if __name__ == '__main__':
        mc_thread = Music_Cols('music_cols')
        mc_thread.setDaemon(True)       # 设置为守护进程,主线程退出时,子进程也kill掉
        mc_thread.start()               # 启动进程
        for i in range(5):              # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = Music_Play('music_play')
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                        # 线程阻塞(等待所有子线程处理完成,再退出)
    
    输出:
    <Music_Play(music_play, started daemon 1068)>正在播放lipei_00006.mp3
    <Music_Play(music_play, started daemon 1072)>正在播放lipei_00007.mp3
    <Music_Play(music_play, started daemon 4920)>正在播放lipei_00012.mp3
    <Music_Play(music_play, started daemon 3880)>正在播放lipei_00014.mp3
    <Music_Play(music_play, started daemon 5400)>正在播放lipei_00021.mp3
    lipei_00014.mp3播放结束
    ... ...
    <Music_Play(music_play, started daemon 1068)>正在播放lipei_00117.mp3
    lipei_00066.mp3播放结束
    <Music_Play(music_play, started daemon 1072)>正在播放lipei_00123.mp3
    lipei_00104.mp3播放结束
    <Music_Play(music_play, started daemon 4920)>正在播放lipei_00129.mp3
    lipei_00123.mp3播放结束
    lipei_00117.mp3播放结束
    lipei_00087.mp3播放结束
    lipei_00106.mp3播放结束
    lipei_00129.mp3播放结束
    

    或者(效果与上述一样)

    import threading
    import time
    from queue import Queue
    
    
    img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3',
                 'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3',
                 'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3',
                 'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3',
                 'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3',
                 'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3',
                 'lipei_00123.mp3','lipei_00129.mp3',]
    
    q = Queue(10)
    
    
    class Music_Cols(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                try:
                    music = img_lists.pop(0)
                    q.put(music)
                except IndexError:
                    break
    
    class Music_Play(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                if q.not_empty:
                    music = q.get()
                    print('{}正在播放{}'.format(threading.current_thread(), music))
                    time.sleep(5)
                    q.task_done()
                    print('{}播放结束'.format(music))
                else:
                    break
    
    
    if __name__ == '__main__':
        mc_thread = Music_Cols('music_cols')
        mc_thread.setDaemon(True)       # 设置为守护进程,主线程退出时,子进程也kill掉
        mc_thread.start()               # 启动进程
        for i in range(5):              # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = Music_Play('music_play')
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                        # 线程阻塞(等待所有子线程处理完成,再退出)
    

    队列-多线程—图像增强实例

    
    """
    开启多线程:图像增强
    """
    import os
    import random
    import queue
    import numpy as np
    import cv2
    import time
    import threading
    
    def Affine_transformation(img_array):
        rows, cols = img_array.shape[:2]
        pointsA = np.float32([[30, 80], [180, 60], [80, 230]])  # 左偏
        pointsB = np.float32([[60, 50], [220, 70], [20, 180]])  # 右偏
        pointsC = np.float32([[70, 60], [180, 50], [50, 200]])  # 前偏
        pointsD = np.float32([[40, 50], [210, 60], [70, 180]])  # 后偏
    
        points1 = np.float32([[50, 50], [200, 50], [50, 200]])
        points2 = random.choice((pointsA, pointsB, pointsC, pointsD))
    
        matrix = cv2.getAffineTransform(points1, points2)
        Affine_transfor_img = cv2.warpAffine(img_array, matrix, (cols, rows))
        return Affine_transfor_img
    
    def random_rotate_img(img):
        rows, cols= img.shape[:2]
        angle = random.choice([25, 90, -25, -90, 180])
        Matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
        res = cv2.warpAffine(img, Matrix, (cols, rows), borderMode=cv2.BORDER_CONSTANT)
        return res
    
    def random_hsv_transform(img, hue_vari, sat_vari, val_vari):
        """
        :param img:
        :param hue_vari: 色调变化比例范围(0,360)
        :param sat_vari: 饱和度变化比例范围(0,1)
        :param val_vari: 明度变化比例范围(0,1)
        :return:
        """
        hue_delta = np.random.randint(-hue_vari, hue_vari)
        sat_mult = 1 + np.random.uniform(-sat_vari, sat_vari)
        val_mult = 1 + np.random.uniform(-val_vari, val_vari)
    
        img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.float)
        img_hsv[:, :, 0] = (img_hsv[:, :, 0] + hue_delta) % 180
        img_hsv[:, :, 1] *= sat_mult
        img_hsv[:, :, 2] *= val_mult
        img_hsv[img_hsv > 255] = 255
        return cv2.cvtColor(np.round(img_hsv).astype(np.uint8), cv2.COLOR_HSV2BGR)
    
    def random_gamma_transform(img, gamma_vari):
        """
        :param img:
        :param gamma_vari:
        :return:
        """
        log_gamma_vari = np.log(gamma_vari)
        alpha = np.random.uniform(-log_gamma_vari, log_gamma_vari)
        gamma = np.exp(alpha)
        gamma_table = [np.power(x / 255.0, gamma) * 255.0 for x in range(256)]
        gamma_table = np.round(np.array(gamma_table)).astype(np.uint8)
        return cv2.LUT(img, gamma_table)
    
    def random_flip_img(img):
        """
        0 = X axis, 1 = Y axis,  -1 = both
        :param img:
        :return:
        """
        flip_val = [0,1,-1]
        random_flip_val = random.choice(flip_val)
        res = cv2.flip(img, random_flip_val)
        return res
    
    def clamp(pv):     #防止像素溢出
        if pv > 255:
            return 255
        if pv < 0:
            return 0
        else:
            return pv
    
    def gaussian_noise(image):   # 加高斯噪声
        """
        :param image:
        :return:
        """
        h, w, c = image.shape
        for row in range(h):
            for col in range(w):
                s = np.random.normal(0, 20, 3)
                b = image[row, col, 0] # blue
                g = image[row, col, 1] # green
                r = image[row, col, 2] # red
                image[row, col, 0] = clamp(b + s[0])
                image[row, col, 1] = clamp(g + s[1])
                image[row, col, 2] = clamp(r + s[2])
        return image
    
    def get_img(input_dir):
        img_path_list = []
        for (root_path,dirname,filenames) in os.walk(input_dir):
            for filename in filenames:
                Suffix_name = ['.png', '.jpg', '.tif', '.jpeg']
                if filename.endswith(tuple(Suffix_name)):
                    img_path = root_path+"/"+filename
                    img_path_list.append(img_path)
        return  img_path_list
    
    
    class IMG_QUEUE(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                try:
                    img_path = img_path_list.pop(0)
                    q.put(img_path)
                except IndexError:
                    break
    
    class IMG_AUG(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
            self.q = q
    
        def run(self):
            while True:
                if q.not_empty:
                    img_path = q.get()
                    try:
                        print("doing...")
                        img_array = cv2.imread(img_path)
                        Affine_transfor_img = Affine_transformation(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_Affine_transfor.png', Affine_transfor_img)
    
                        res_rotate = random_rotate_img(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_rotate_img.png',res_rotate)
    
                        GAMMA_IMG = random_gamma_transform(img_array, 0.3)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_GAMMA_IMG.png',GAMMA_IMG)
    
                        res_flip = random_flip_img(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_flip_img.png',res_flip)
    
                        G_Noiseimg = gaussian_noise(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_G_Noise_img.png',G_Noiseimg)
    
                        HSV_IMG = random_hsv_transform(img_array, 2, 0.3, 0.6)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_HSV_IMG.png',HSV_IMG)
                    except:
                        print("图像格式错误!")
                        pass
                    q.task_done()
                else:
                    break
    
    
    if __name__ == '__main__':
        input_dir = './cccc'
        output_dir = './eeee'
        start_time = time.time()            # 开始计时
        img_path_list = get_img(input_dir)  # 获取图像数据
    
        q = queue.Queue(10)                 # 设置队列元素个数
        my_thread = IMG_QUEUE('IMG_QUEUE')  # 实例化
        my_thread.setDaemon(True)           # 设置为守护进程,主线程退出时,子进程也kill掉
        my_thread.start()                   # 启动进程
    
        for i in range(5):                  # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = IMG_AUG('IMG_AUG')
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                            # 线程阻塞(等待所有子线程处理完成,再退出)
        end_time = time.time()
        print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")
    

    多线程-创建图像缩略图(等比缩放图像)

    import os
    from PIL import Image
    import threading
    import time
    import queue
    
    
    def get_img(input_dir):
        img_path_list = []
        for (root_path,dirname,filenames) in os.walk(input_dir):
            for filename in filenames:
                Suffix_name = ['.png', '.jpg', '.tif', '.jpeg']
                if filename.endswith(tuple(Suffix_name)):
                    img_path = root_path+"/"+filename
                    img_path_list.append(img_path)
        return  img_path_list
    
    class IMG_QUEUE(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                try:
                    img_path = img_path_list.pop(0)
                    q.put(img_path)
                except IndexError:
                    break
    
    class IMG_RESIZE(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                if q.not_empty:
                    img_path = q.get()
                    try:
                        im = Image.open(img_path)
                        im.thumbnail((size, size))
                        print(im.format, im.size, im.mode)
                        im.save(img_path, 'JPEG')
                    except:
                        print("图像格式错误!")
                        pass
                    q.task_done()
                else:
                    break
    
    
    
    if __name__=='__main__':
        input_dir = 'D:\\20190112_20190114_all' #需要创建缩略图,图片的目录
        start_time = time.time()            # 开始计时
        img_path_list = get_img(input_dir)  # 获取图像数据
    
        size = 800
        q = queue.Queue(100)                # 设置队列元素个数
        my_thread = IMG_QUEUE('IMG_QUEUE')  # 实例化
        my_thread.setDaemon(True)           # 设置为守护进程,主线程退出时,子进程也kill掉
        my_thread.start()                   # 启动进程
    
        for i in range(5):                  # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = IMG_RESIZE(str(i))
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                            # 线程阻塞(等待所有子线程处理完成,再退出)
        end_time = time.time()              # 计时结束
        print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")
    
    展开全文
  • 先来说说队列的基本概念: #队列,消息队列,redis缓存 ...# 但是队列可以设置后进先出的 # 还可以设置优先级的先进先出 后进先出 import queue #导入队列包 # q = queue.LifoQueue() #实现后进先出...
  • Java多线程——4 阻塞队列

    千次阅读 2013-08-13 23:38:41
    阻塞队列是Java5线程新特征中的内容,Java定义了...同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止。是一种常用的并发数据结构,常用于生产者-消费者模式。 下面我们看一下阻塞队列类
  • 队列多线程使用

    千次阅读 2017-10-21 12:42:28
    1. 概述:1.1 队列简介队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,...
  • C++多线程多线程通信,队列

    千次阅读 2019-06-25 19:19:18
    文章目录1,使用2,实现3,知识点1, std::mutex2,std::condition_variable 1,使用 TQueueConcurrent<std::vector<std::string>> fifo_queue; ... ... fifo_queue.emplace_back(string_ret);...
  • c语言实现多线程下的链表队列

    千次阅读 2015-12-23 23:22:27
    项目中需要一个链表,线程A进行入队操作,线程B进行查询出队操作,同时不希望线程B在队列为空时阻塞,降低cpu负载,因此考虑用pthread_cond_wait进行实现: 主要实现功能: ...3)队列为空时线程B进入休眠
  • SynchronousQueue:一种阻塞...除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队线程元素;如果没有已排
  • 多线程并发队列实现

    千次阅读 2019-04-20 13:53:59
    如果get执行时,队列为空,线程必须阻塞等待,直到有队列有数据。如果add时,队列已经满,则add线程要等待,直到队列有空闲空间。 /** * 1.使用 wait notify 实现一个队列,队列有2个方法,add 和 ge...
  • 多线程(八)线程队列

    千次阅读 2017-03-31 09:13:50
    本文参考以下文章整理而成,希望大家多多指教!共同学习!原文地址如下:Java 并发工具包...Java线程(篇外篇):阻塞队列BlockingQueue引言  在上一篇多线程(七)线程池详解中提到了线程队列,相信大家现在已清楚明白线
  • Java多线程总结之线程安全队列Queue

    万次阅读 2017-09-19 10:21:54
    在Java多线程应用中,队列使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是...
  • 思路: 一个文件:创建一个线程和主函数,或者创建两个线程主函数调用(我用这种...队列 一个进程读(消息类型2) b.c 一个进程写(消息类型2) ---->>队列 一个进程读(消息类型1) a.c #inclu...
  • 多线程+队列的简单使用

    千次阅读 2017-08-15 21:03:16
    以前在项目中很少用到队列,其实队列对于解决并发很是重要,今天着手看了相关资料并实际简单操作,如下: 1.队列的含义:队列就是一种特殊的线性表,采用FIFO方式,而栈是LIFO方式。 2.常用的队列:LinkedList实现...
  • java多线程队列实例

    万次阅读 2016-06-02 13:54:39
    java多线程队列实例
  • c++多线程队列

    千次阅读 2019-05-06 22:48:40
    多线程中,使用队列不需要锁也能运行,取得时候先判断一下长度,长度0线Sleep(3)。 struct RevData{ int size; char data[1000]; } recv_data; #include<iostream> #include<queue> #...
  • Java多线程中的阻塞队列和并发集合

    千次阅读 2015-06-14 17:41:06
    Java为多线程专门提供了特有的线程安全的集合类,通过下面的学习,您需要掌握这些集合的特点是什么,底层实现如何、在何时使用等问题。 3.1 BlockingQueue接口 java阻塞队列应用于生产者消费者模式、消息传递...
  • Queue在python3中重命名queue,在python2到python3转换中可以自动转换队列可应用在多个生产者多个消费者的模型中,并且在多线程中可用于线程之间数据信息的安全交换通信,防止冲突。 在队列中已经实现多线程的锁...
  • 最近转到银行工作,在做最核心的财务账务部分,对我来说是一个比较新的东西,工作也已经四年有余,接触一些新的东西,也是不错,每天也累得像狗...,...使用ExecutorService 管理多线程使用Executors创建newFixedThrea
  • 多线程安全无锁消息队列

    千次阅读 2013-10-28 15:57:10
    1.该算法的入队时候需要重新分配空间,分配空间这个开销是很大的,实际上可以不用每次入队都分配工具,可以是实现个多线程安全的freeList,用来存放可用的空间结点,每次入队了,从freeList得到一个空间,加入队列。...
  • 消息队列多线程的选择

    千次阅读 2020-06-17 11:33:40
    为什么发送邮件要使用消息队列而不是多线程? 1.消息队列多线程应该怎么选择呢? 可靠性要求高时选择消息队列:消息队列多线程两者并不冲突,多线程可以作为队列的生产者和消费者。 使用外部的消息队列时,第一...
  • 《秒杀多线程第十六篇 多线程十大经典案例之一 双线程读写队列数据》 http://blog.csdn.net/morewindows/article/details/8646902 配套程序 在《秒杀多线程系列》的前十五篇中介绍多线程的相关概念,多线程同步互斥...
  • Python 多线程队列运用实例

    万次阅读 2020-02-13 11:29:01
    多线程队列运用实例 #!/usr/bin/env python #-*- coding: utf-8 -*- # import threading import queue as Queue class RunThread(threading.Thread): def __init__(self,name,q): threading.Thread.__init__(s...
  • IOS多线程队列使用

    万次阅读 2013-12-11 17:56:18
    最近搞一款塔防游戏,提到塔防,自然就想到了A星寻路。的确,它是一种高效的寻路...实在没辙了,我想到了队列线程。之前都没接触过这个东东,还好在网上找到很详细的线程介绍。当然,我只是用到了其中的一点点。分享给
  • Python多线程/多进程操作队列

    千次阅读 2017-08-12 11:36:07
    为了实现数据的快速处理,我先后尝试了把数据一口气读入内存、多线程和多进程的方式。当然,肯定是多进程双队列的方式最好,因为可以充分利用多核和cpu。 一般来说导入队列可以这样操作: import Queue myqueue = ...
  • 自己写了一个多线程的工作队列,能够实现对队列中对象的自动处理。多线程添加元素到队列中,队列根据绑定 的事件进行自动处理,可以设置WorkSequential属性来实现对队列处理的单线程(严格顺序处理)或者多线程...
  • JAVA多线程队列

    千次阅读 2018-07-07 10:14:20
    JAVA 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。 在

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 519,545
精华内容 207,818
关键字:

多线程为什么使用队列