精华内容
下载资源
问答
  • 2018-07-07 10:14:20
    原文地址为: JAVA多线程与队列

             JAVA 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。    

             在Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

             首先利用BlockingQueue封装了一个队列类。队列里存放Map对象,这个依项目需求而定,供参考。

             

    import java.util.AbstractMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    /**
    * 单例的缓存map
    */

    public class CachePool<Key, Value> extends AbstractMap<Key, Value>{

    // 私有化缓存对象实例
    private static CachePool cachePool = new CachePool();
    private int maxCount = 1000;
    private BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();
    /**
    * private Constructor.
    * @return
    */
    private CachePool() {
    }
    /**
    * 开放一个公有方法,判断是否已经存在实例,有返回,没有新建一个在返回
    * @return
    */
    public static CachePool getInstance(){
    return cachePool;
    }

    /**
    * The Entry for this Map.
    * @author AnCan
    *
    */
    private class Entry implements Map.Entry<Key, Value>{
    private Key key;
    private Value value;

    public Entry(Key key, Value value){
    this.key = key;
    this.value = value;
    }

    @Override
    public String toString() {
    return key + "=" + value;
    }

    public Key getKey() {
    return key;
    }

    public Value getValue() {
    return value;
    }

    public Value setValue(Value value) {
    return this.value = value;
    }
    }



    /**
    * Constructor.
    * @param size the size of the pooled map;
    */
    public CachePool(int size) {
    maxCount = size;
    }

    @Override
    public Value put(Key key, Value value) {
    while(queue.size() >= maxCount){
    queue.remove();
    }
    queue.add(new Entry(key, value));
    return value;
    }

    @Override
    public Value get(Object key){
    for(Iterator<Entry> iter = queue.iterator();iter.hasNext();){
    Entry type = iter.next();
    if(type.key.equals(key)){
    queue.remove(type);
    queue.add(type);
    return type.value;
    }
    }
    return null;
    }

    @Override
    public Set<Map.Entry<Key, Value>> entrySet() {
    Set<Map.Entry<Key, Value>> set = new HashSet<Map.Entry<Key, Value>>();
    set.addAll(queue);
    return set;
    }

    @Override
    public void clear() {
    queue.clear();
    }

    @Override
    public Set<Key> keySet() {
    Set<Key> set = new HashSet<Key>();
    for(Entry e : queue){
    set.add(e.getKey());
    }
    return set;
    }

    @Override
    public Value remove(Object obj) {
    for(Entry e : queue){
    if(e.getKey().equals(obj)){
    queue.remove(e);
    return e.getValue();
    }
    }
    return null;
    }

    @Override
    public int size() {
    return queue.size();
    }
    }

                其中根据项目的需求重写了一些方法。

                先看下消费者类,使用多线程来处理队列中的内容:

          

    import java.util.Date;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServlet;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;


    /**
    * 操作业务类,通过参数中的方法参数进行具体的操作
    */
    public class TicketTradeOper extends HttpServlet
    {
    /**
    * 缓存对象 map
    */
    public static CachePool<String, Object> mapPool = CachePool.getInstance();

    private static final int NTHREADS=5;
    // 使用线程池来避免 为每个请求创建一个线程。
    private static final Executor threadPool=Executors.newFixedThreadPool(NTHREADS);

    //业务操作
    IETicketTradeOper ticketTradeOper;

    @Override
    public void init() throws ServletException
    {
    Timer timer = new Timer();
    timer.schedule(new TimerTask(){
    @Override
    public void run() {
    startThread();
    }
    }, new Date(), 5000);//间隔5秒执行一次定时器任务
    super.init();
    }


    public void startThread(){
    threadPool.execute(new Runnable(){
    public void run() {
    executeCodeOper();
    }
    });
    }

    public void executeCodeOper()
    {
    String key = "";
    Map param = null;
    synchronized (mapPool)
    {
    System.out.println(Thread.currentThread().getName() + "进来了。。。。");
    System.out.println("现在队列中共有----"+mapPool.size()+"---条数据");

    Iterator it = mapPool.keySet().iterator();
    //缓存不为空时,取出一个值
    while (it.hasNext())
    {
    key = (String) it.next();
    param = (Map) mapPool.get(key);
    }
    if (null != param)
    {
    //为防止重复,将其移除
    mapPool.remove(key);
    }
    }

    if (null != param)
    {
    boolean result =ticketTradeOperator(param);
    System.out.println("此条数据处理========"+result);
    if(!result){
    //若处理失败,重新放回队列
    mapPool.put(key, param);
    };
    }
    }


    public boolean ticketTradeOperator(Map<String, String> params)
    {
    //具体的处理工作
    return resultCode;
    }

    public IETicketTradeOper getTicketTradeOper()
    {
    return ticketTradeOper;
    }
    public void setTicketTradeOper(IETicketTradeOper ticketTradeOper)
    {
    this.ticketTradeOper = ticketTradeOper;
    }

    }
                生产者,根据业务需求将接收到的数据放到队列里:

         TicketTradeOper.mapPool.put(newParams.get("order_id"), newParams);


    以上便是整个队列生产消费的过程,有问题的欢迎交流。

    关于队列类Queue的介绍。下篇博客进行。。



             

      

    转载请注明本文地址: JAVA多线程与队列
    更多相关内容
  • 主要介绍了Python多线程队列操作实例,本文直接给给实例代码,需要的朋友可以参考下
  • 本篇文章主要介绍了java多线程消息队列的实现代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • 易语言简单的多线程消息队列
  • 简单的多线程消息队列
  • 主要介绍了C#队列Queue多线程用法,实例分析了队列的相关使用技巧,需要的朋友可以参考下
  • 多线程与循环队列

    2016-12-30 21:09:41
    多线程中使用循环队列
  • 易语言简单的多线程消息队列。@Patek。
  • 主要介绍了C#多线程处理多个队列数据的方法,涉及C#线程与队列的相关操作技巧,需要的朋友可以参考下
  • 本文实例讲述了Python多线程通信queue队列用法。分享给大家供大家参考,具体如下: queue: 什么是队列:是一种特殊的结构,类似于列表。不过就像排队一样,队列中的元素一旦取出,那么就会从队列中删除。 线程之间...
  • 实现消息队列的关键因素是考量不同线程访问消息队列的同步问题。本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类。定义如下: template <class> class lock_guard; lock_...
  • java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号 java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号
  • 易语言多线程任务队列模型。@小石子。Tags:多线程
  • 线程安全队列的接口文件如下: #include template class threadsafe_queue { public: threadsafe_queue(); threadsafe_queue(const threadsafe_queue&); threadsafe_queue& operator=(const threadsafe_queue&...
  • 本文给大家介绍的是使用很简单的代码实现的多线程任务队列,给大家一个思路,希望对大家学习python能够有所帮助
  • python 多线程与队列

    千次阅读 2019-01-17 11:27:41
    各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。...


    各位好,之前写了 多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。

    Queue队列

    Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。
    Queue.Queue(maxsize)  FIFO(先进先出队列)
    Queue.LifoQueue(maxsize)  LIFO(先进后出队列)
    Queue.PriorityQueue(maxsize)  为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。
    如果设置的maxsize小于1,则表示队列的长度无限长

    FIFO是常用的队列,常用的方法有:

    Queue.qsize()   返回队列大小
    Queue.empty()  判断队列是否为空
    Queue.full()   判断队列是否满了

    Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。
    在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。

    Queue.put(…[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。
    同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
    Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成。

    Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

    队列-单线程

    import threading
    import queue
    import time
    
    
    class worker(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
            self.thread_stop = False
    
        def run(self):
            while not self.thread_stop:
                print("thread%d %s: waiting for tast" % (self.ident, self.name))
                try:
                    task = q.get(block=True, timeout=2)  # 接收消息
                except queue.Empty:
                    print("Nothing to do! I will go home!")
                    self.thread_stop = True
                    break
                print("tasking: %s ,task No:%d" % (task[0], task[1]))
                print("I am working")
                time.sleep(3)
                print("work finished!")
                q.task_done()                           # 完成一个任务
                res = q.qsize()                         # 判断消息队列大小(队列中还有几个任务)
                if res > 0:
                    print("fuck! Still %d tasks to do" % (res))
    
        def stop(self):
            self.thread_stop = True
    
    
    if __name__ == "__main__":
        q = queue.Queue(3)                                    # 创建队列(大小为3)
        worker = worker(q)                                    # 将队列加入类中
        worker.start()                                        # 启动类
        q.put(["produce cup!", 1], block=True, timeout=None)  # 向队列中添加元素,产生任务消息
        q.put(["produce desk!", 2], block=True, timeout=None)
        q.put(["produce apple!", 3], block=True, timeout=None)
        q.put(["produce banana!", 4], block=True, timeout=None)
        q.put(["produce bag!", 5], block=True, timeout=None)
        print("***************leader:wait for finish!")
        q.join()                                             # 等待所有任务完成
        print("***************leader:all task finished!")
    
    
    输出:
    thread9212 Thread-1: waiting for tast
    tasking: produce cup! ,task No:1
    I am working
    work finished!
    fuck! Still 3 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce desk! ,task No:2
    I am working
    ***************leader:wait for finish!
    work finished!
    fuck! Still 3 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce apple! ,task No:3
    I am working
    work finished!
    fuck! Still 2 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce banana! ,task No:4
    I am working
    work finished!
    fuck! Still 1 tasks to do
    thread9212 Thread-1: waiting for tast
    tasking: produce bag! ,task No:5
    I am working
    work finished!
    thread9212 Thread-1: waiting for tast
    ***************leader:all task finished!
    Nothing to do!i will go home!
    

    队列-多线程

    import threading
    import time
    from queue import Queue
    
    img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3',
                 'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3',
                 'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3',
                 'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3',
                 'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3',
                 'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3',
                 'lipei_00123.mp3','lipei_00129.mp3',]
    
    q = Queue(10)
    
    class Music_Cols(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            global img_lists
            global q
            while True:
                try:
                    music = img_lists.pop(0)
                    q.put(music)
                except IndexError:
                    break
    
    class Music_Play(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            global q
            while True:
                if q.not_empty:
                    music = q.get()
                    print('{}正在播放{}'.format(threading.current_thread(), music))
                    time.sleep(5)
                    q.task_done()
                    print('{}播放结束'.format(music))
                else:
                    break
    
    
    if __name__ == '__main__':
        mc_thread = Music_Cols('music_cols')
        mc_thread.setDaemon(True)       # 设置为守护进程,主线程退出时,子进程也kill掉
        mc_thread.start()               # 启动进程
        for i in range(5):              # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = Music_Play('music_play')
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                        # 线程阻塞(等待所有子线程处理完成,再退出)
    
    输出:
    <Music_Play(music_play, started daemon 1068)>正在播放lipei_00006.mp3
    <Music_Play(music_play, started daemon 1072)>正在播放lipei_00007.mp3
    <Music_Play(music_play, started daemon 4920)>正在播放lipei_00012.mp3
    <Music_Play(music_play, started daemon 3880)>正在播放lipei_00014.mp3
    <Music_Play(music_play, started daemon 5400)>正在播放lipei_00021.mp3
    lipei_00014.mp3播放结束
    ... ...
    <Music_Play(music_play, started daemon 1068)>正在播放lipei_00117.mp3
    lipei_00066.mp3播放结束
    <Music_Play(music_play, started daemon 1072)>正在播放lipei_00123.mp3
    lipei_00104.mp3播放结束
    <Music_Play(music_play, started daemon 4920)>正在播放lipei_00129.mp3
    lipei_00123.mp3播放结束
    lipei_00117.mp3播放结束
    lipei_00087.mp3播放结束
    lipei_00106.mp3播放结束
    lipei_00129.mp3播放结束
    

    或者(效果与上述一样)

    import threading
    import time
    from queue import Queue
    
    
    img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3',
                 'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3',
                 'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3',
                 'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3',
                 'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3',
                 'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3',
                 'lipei_00123.mp3','lipei_00129.mp3',]
    
    q = Queue(10)
    
    
    class Music_Cols(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                try:
                    music = img_lists.pop(0)
                    q.put(music)
                except IndexError:
                    break
    
    class Music_Play(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                if q.not_empty:
                    music = q.get()
                    print('{}正在播放{}'.format(threading.current_thread(), music))
                    time.sleep(5)
                    q.task_done()
                    print('{}播放结束'.format(music))
                else:
                    break
    
    
    if __name__ == '__main__':
        mc_thread = Music_Cols('music_cols')
        mc_thread.setDaemon(True)       # 设置为守护进程,主线程退出时,子进程也kill掉
        mc_thread.start()               # 启动进程
        for i in range(5):              # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = Music_Play('music_play')
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                        # 线程阻塞(等待所有子线程处理完成,再退出)
    

    队列-多线程—图像增强实例

    
    """
    开启多线程:图像增强
    """
    import os
    import random
    import queue
    import numpy as np
    import cv2
    import time
    import threading
    
    def Affine_transformation(img_array):
        rows, cols = img_array.shape[:2]
        pointsA = np.float32([[30, 80], [180, 60], [80, 230]])  # 左偏
        pointsB = np.float32([[60, 50], [220, 70], [20, 180]])  # 右偏
        pointsC = np.float32([[70, 60], [180, 50], [50, 200]])  # 前偏
        pointsD = np.float32([[40, 50], [210, 60], [70, 180]])  # 后偏
    
        points1 = np.float32([[50, 50], [200, 50], [50, 200]])
        points2 = random.choice((pointsA, pointsB, pointsC, pointsD))
    
        matrix = cv2.getAffineTransform(points1, points2)
        Affine_transfor_img = cv2.warpAffine(img_array, matrix, (cols, rows))
        return Affine_transfor_img
    
    def random_rotate_img(img):
        rows, cols= img.shape[:2]
        angle = random.choice([25, 90, -25, -90, 180])
        Matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
        res = cv2.warpAffine(img, Matrix, (cols, rows), borderMode=cv2.BORDER_CONSTANT)
        return res
    
    def random_hsv_transform(img, hue_vari, sat_vari, val_vari):
        """
        :param img:
        :param hue_vari: 色调变化比例范围(0,360)
        :param sat_vari: 饱和度变化比例范围(0,1)
        :param val_vari: 明度变化比例范围(0,1)
        :return:
        """
        hue_delta = np.random.randint(-hue_vari, hue_vari)
        sat_mult = 1 + np.random.uniform(-sat_vari, sat_vari)
        val_mult = 1 + np.random.uniform(-val_vari, val_vari)
    
        img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.float)
        img_hsv[:, :, 0] = (img_hsv[:, :, 0] + hue_delta) % 180
        img_hsv[:, :, 1] *= sat_mult
        img_hsv[:, :, 2] *= val_mult
        img_hsv[img_hsv > 255] = 255
        return cv2.cvtColor(np.round(img_hsv).astype(np.uint8), cv2.COLOR_HSV2BGR)
    
    def random_gamma_transform(img, gamma_vari):
        """
        :param img:
        :param gamma_vari:
        :return:
        """
        log_gamma_vari = np.log(gamma_vari)
        alpha = np.random.uniform(-log_gamma_vari, log_gamma_vari)
        gamma = np.exp(alpha)
        gamma_table = [np.power(x / 255.0, gamma) * 255.0 for x in range(256)]
        gamma_table = np.round(np.array(gamma_table)).astype(np.uint8)
        return cv2.LUT(img, gamma_table)
    
    def random_flip_img(img):
        """
        0 = X axis, 1 = Y axis,  -1 = both
        :param img:
        :return:
        """
        flip_val = [0,1,-1]
        random_flip_val = random.choice(flip_val)
        res = cv2.flip(img, random_flip_val)
        return res
    
    def clamp(pv):     #防止像素溢出
        if pv > 255:
            return 255
        if pv < 0:
            return 0
        else:
            return pv
    
    def gaussian_noise(image):   # 加高斯噪声
        """
        :param image:
        :return:
        """
        h, w, c = image.shape
        for row in range(h):
            for col in range(w):
                s = np.random.normal(0, 20, 3)
                b = image[row, col, 0] # blue
                g = image[row, col, 1] # green
                r = image[row, col, 2] # red
                image[row, col, 0] = clamp(b + s[0])
                image[row, col, 1] = clamp(g + s[1])
                image[row, col, 2] = clamp(r + s[2])
        return image
    
    def get_img(input_dir):
        img_path_list = []
        for (root_path,dirname,filenames) in os.walk(input_dir):
            for filename in filenames:
                Suffix_name = ['.png', '.jpg', '.tif', '.jpeg']
                if filename.endswith(tuple(Suffix_name)):
                    img_path = root_path+"/"+filename
                    img_path_list.append(img_path)
        return  img_path_list
    
    
    class IMG_QUEUE(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                try:
                    img_path = img_path_list.pop(0)
                    q.put(img_path)
                except IndexError:
                    break
    
    class IMG_AUG(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
            self.q = q
    
        def run(self):
            while True:
                if q.not_empty:
                    img_path = q.get()
                    try:
                        print("doing...")
                        img_array = cv2.imread(img_path)
                        Affine_transfor_img = Affine_transformation(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_Affine_transfor.png', Affine_transfor_img)
    
                        res_rotate = random_rotate_img(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_rotate_img.png',res_rotate)
    
                        GAMMA_IMG = random_gamma_transform(img_array, 0.3)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_GAMMA_IMG.png',GAMMA_IMG)
    
                        res_flip = random_flip_img(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_flip_img.png',res_flip)
    
                        G_Noiseimg = gaussian_noise(img_array)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_G_Noise_img.png',G_Noiseimg)
    
                        HSV_IMG = random_hsv_transform(img_array, 2, 0.3, 0.6)
                        cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_HSV_IMG.png',HSV_IMG)
                    except:
                        print("图像格式错误!")
                        pass
                    q.task_done()
                else:
                    break
    
    
    if __name__ == '__main__':
        input_dir = './cccc'
        output_dir = './eeee'
        start_time = time.time()            # 开始计时
        img_path_list = get_img(input_dir)  # 获取图像数据
    
        q = queue.Queue(10)                 # 设置队列元素个数
        my_thread = IMG_QUEUE('IMG_QUEUE')  # 实例化
        my_thread.setDaemon(True)           # 设置为守护进程,主线程退出时,子进程也kill掉
        my_thread.start()                   # 启动进程
    
        for i in range(5):                  # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = IMG_AUG('IMG_AUG')
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                            # 线程阻塞(等待所有子线程处理完成,再退出)
        end_time = time.time()
        print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")
    

    多线程-创建图像缩略图(等比缩放图像)

    import os
    from PIL import Image
    import threading
    import time
    import queue
    
    
    def get_img(input_dir):
        img_path_list = []
        for (root_path,dirname,filenames) in os.walk(input_dir):
            for filename in filenames:
                Suffix_name = ['.png', '.jpg', '.tif', '.jpeg']
                if filename.endswith(tuple(Suffix_name)):
                    img_path = root_path+"/"+filename
                    img_path_list.append(img_path)
        return  img_path_list
    
    class IMG_QUEUE(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                try:
                    img_path = img_path_list.pop(0)
                    q.put(img_path)
                except IndexError:
                    break
    
    class IMG_RESIZE(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            while True:
                if q.not_empty:
                    img_path = q.get()
                    try:
                        im = Image.open(img_path)
                        im.thumbnail((size, size))
                        print(im.format, im.size, im.mode)
                        im.save(img_path, 'JPEG')
                    except:
                        print("图像格式错误!")
                        pass
                    q.task_done()
                else:
                    break
    
    
    
    if __name__=='__main__':
        input_dir = 'D:\\20190112_20190114_all' #需要创建缩略图,图片的目录
        start_time = time.time()            # 开始计时
        img_path_list = get_img(input_dir)  # 获取图像数据
    
        size = 800
        q = queue.Queue(100)                # 设置队列元素个数
        my_thread = IMG_QUEUE('IMG_QUEUE')  # 实例化
        my_thread.setDaemon(True)           # 设置为守护进程,主线程退出时,子进程也kill掉
        my_thread.start()                   # 启动进程
    
        for i in range(5):                  # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
            mp_thread = IMG_RESIZE(str(i))
            mp_thread.setDaemon(True)
            mp_thread.start()
        q.join()                            # 线程阻塞(等待所有子线程处理完成,再退出)
        end_time = time.time()              # 计时结束
        print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")
    
    展开全文
  • python队列基本操作与多线程队列

    千次阅读 2022-02-03 11:10:49
    文章目录队列基本操作多线程队列 队列基本操作 from queue import Queue q = Queue(5) # 创建一个容量为5的队列。如果给一个小于0的数,则队列为无限大小。(这是官方的解释,实际不是无限大小,而是跟内存有关) ...

    队列基本操作

    from queue import Queue
    
    q = Queue(5)  # 创建一个容量为5的队列。如果给一个小于0的数,则队列为无限大小。(这是官方的解释,实际不是无限大小,而是跟内存有关)
    
    # 存储数据
    q.put(123)  # 数值 
    q.put('hello world!')  # 字符串
    q.put(['hello', 'world'])  # 列表
    q.put(('hello', 'world'))  # 元组
    q.put({'hello': 'world'})  # 字典
    
    # 如果再试图存储第六个,则会发生阻塞,因为容量已设定为5
    # q.put({'hello': 'python'})
    

    取出队列中的值

    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    

    在这里插入图片描述
    如图五个值被依次取出。Queue队列遵循的是先进先出。

    • q.put_nowait()
      q.put_nowait()方法可以无阻碍地向队列中添加内容,如果队列已满则会立即报错,不会等待(即不会发生阻塞)。
    • q.get_nowait()
      q.get_nowait()方法可以无阻碍地从队列中取出内容,如果队列是空的则也会直接报错,不会等待。

    具体使用不再示例。

    查看队列当前大小

    • q.qsize()
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    

    在这里插入图片描述
    如图,每取出一个值,队列大小就减一。同样每存入一个值队列大小就会加一。

    • q.full()
      判断队列是否是满的。
    • q.empty()
      判断队列是否是空的。
    print(q.full())
    print(q.get())
    print(q.get())
    print(q.full())
    print(q.empty())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())
    

    在这里插入图片描述

    多线程队列

    from queue import Queue
    import threading
    import time
    
    
    # 存储值,每隔一秒存储一个
    def set_value(q):
        num = 0
        while True:
            q.put(num)
            num += 1
            time.sleep(1)
    
    # 取值,不间断地取
    def get_value(q):
        while True:
            print(q.get())
    
    
    if __name__ == '__main__':
        q = Queue(4)
        t1 = threading.Thread(target=set_value, args=(q, ))
        t2 = threading.Thread(target=get_value, args=(q, ))
    
        t1.start()
        t2.start()
    

    程序开始运行,一边存储,一边取值:
    在这里插入图片描述
    此思想应用在爬虫上,即一边访问并获取数据,一边下载数据。

    展开全文
  • java多线程队列上传文件_后台处理
  • 多线程任务队列模型
  • private static List queueCache = new LinkedList();2、定义队列缓冲池最大消息数,如果达到该值,...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程线程等待中new Thread(){public void run(){while(...

    private static List queueCache = new LinkedList();

    2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

    private Integer offerMaxQueue = 2000;

    3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

    new Thread(){

    public void run(){

    while(true){

    String ip = null;

    try {

    synchronized (queueCache) {

    Integer size = queueCache.size();

    if(size==0){

    //队列缓存池没有消息,等待。。。。queueCache.wait();

    }

    Queue queue = queueCache.remove(0);

    if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理

    queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,

    continue;

    }else{

    ;//这里是处理该消息的操作。

    }

    size = queueCache.size();

    if(size=0){queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。

    }

    }

    } catch (Exception e) {

    e.printStackTrace();

    }finally{

    try {//检出该消息队列的锁

    unIpLock(queueStr);

    } catch (Execption e) {//捕获异常,不能让线程挂掉

    e.printStackTrace();

    }

    }

    }

    }.start();

    4、检入队列

    synchronized (queueCache) {

    while(true){

    Integer size = queueCache.size();

    if(size>=offerMaxQueue){

    try {

    queueCache.wait();

    continue;//继续执行等待中的检入任务。

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }//IF

    if(size<=offerMaxQueue&&size>0){

    queueCache.notifyAll();

    }

    break;//检入完毕

    }//while

    }

    5、锁方法实现

    /**

    * 锁

    * @param ip

    * @return

    * @throws

    */

    public Boolean isLock(String queueStr) {

    return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;

    }

    //解锁

    public void unIpLock(String queueStr) {

    if(ip!=null){

    this.redisManager.del(queueStr+"_lock");

    //lock.unlock();

    }

    }

    分享到:

    18e900b8666ce6f233d25ec02f95ee59.png

    72dd548719f0ace4d5f9bca64e1d7715.png

    2014-10-12 23:38

    浏览 4127

    评论

    展开全文
  • // 单线程和多线程并发队列测试(同步和异步) // // 基础原理: // 1)队列: // 1.1 dispatch_queue_t 是一个队列,一个FIFO的消费队列 // 1.2 消费者:队列配置的运行线程 // 1.3 被消费对象: 添加到队列中的运行任务...
  • 多线程并发队列实现

    千次阅读 2019-04-20 13:53:59
    如果get执行时,队列为空,线程必须阻塞等待,直到有队列有数据。如果add时,队列已经满,则add线程要等待,直到队列有空闲空间。 /** * 1.使用 wait notify 实现一个队列队列有2个方法,add 和 ge...
  • 适用于windows平台和linux平台的缓存队列实现,支持多线程
  • 今天小编就为大家分享一篇利用Tensorflow的队列多线程读取数据方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • Java多线程-队列

    2021-09-11 14:11:24
    Java多线程-队列 介绍 在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。 使用阻塞算法的队列可以用一个锁(入队和出队用同一把...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 595,191
精华内容 238,076
关键字:

多线程与队列的区别