-
2018-07-07 10:14:20原文地址为: JAVA多线程与队列
JAVA 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。
在Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
首先利用BlockingQueue封装了一个队列类。队列里存放Map对象,这个依项目需求而定,供参考。
其中根据项目的需求重写了一些方法。import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 单例的缓存map
*/
public class CachePool<Key, Value> extends AbstractMap<Key, Value>{
// 私有化缓存对象实例
private static CachePool cachePool = new CachePool();
private int maxCount = 1000;
private BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();
/**
* private Constructor.
* @return
*/
private CachePool() {
}
/**
* 开放一个公有方法,判断是否已经存在实例,有返回,没有新建一个在返回
* @return
*/
public static CachePool getInstance(){
return cachePool;
}
/**
* The Entry for this Map.
* @author AnCan
*
*/
private class Entry implements Map.Entry<Key, Value>{
private Key key;
private Value value;
public Entry(Key key, Value value){
this.key = key;
this.value = value;
}
@Override
public String toString() {
return key + "=" + value;
}
public Key getKey() {
return key;
}
public Value getValue() {
return value;
}
public Value setValue(Value value) {
return this.value = value;
}
}
/**
* Constructor.
* @param size the size of the pooled map;
*/
public CachePool(int size) {
maxCount = size;
}
@Override
public Value put(Key key, Value value) {
while(queue.size() >= maxCount){
queue.remove();
}
queue.add(new Entry(key, value));
return value;
}
@Override
public Value get(Object key){
for(Iterator<Entry> iter = queue.iterator();iter.hasNext();){
Entry type = iter.next();
if(type.key.equals(key)){
queue.remove(type);
queue.add(type);
return type.value;
}
}
return null;
}
@Override
public Set<Map.Entry<Key, Value>> entrySet() {
Set<Map.Entry<Key, Value>> set = new HashSet<Map.Entry<Key, Value>>();
set.addAll(queue);
return set;
}
@Override
public void clear() {
queue.clear();
}
@Override
public Set<Key> keySet() {
Set<Key> set = new HashSet<Key>();
for(Entry e : queue){
set.add(e.getKey());
}
return set;
}
@Override
public Value remove(Object obj) {
for(Entry e : queue){
if(e.getKey().equals(obj)){
queue.remove(e);
return e.getValue();
}
}
return null;
}
@Override
public int size() {
return queue.size();
}
}先看下消费者类,使用多线程来处理队列中的内容:
生产者,根据业务需求将接收到的数据放到队列里:import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* 操作业务类,通过参数中的方法参数进行具体的操作
*/
public class TicketTradeOper extends HttpServlet
{
/**
* 缓存对象 map
*/
public static CachePool<String, Object> mapPool = CachePool.getInstance();
private static final int NTHREADS=5;
// 使用线程池来避免 为每个请求创建一个线程。
private static final Executor threadPool=Executors.newFixedThreadPool(NTHREADS);
//业务操作
IETicketTradeOper ticketTradeOper;
@Override
public void init() throws ServletException
{
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
startThread();
}
}, new Date(), 5000);//间隔5秒执行一次定时器任务
super.init();
}
public void startThread(){
threadPool.execute(new Runnable(){
public void run() {
executeCodeOper();
}
});
}
public void executeCodeOper()
{
String key = "";
Map param = null;
synchronized (mapPool)
{
System.out.println(Thread.currentThread().getName() + "进来了。。。。");
System.out.println("现在队列中共有----"+mapPool.size()+"---条数据");
Iterator it = mapPool.keySet().iterator();
//缓存不为空时,取出一个值
while (it.hasNext())
{
key = (String) it.next();
param = (Map) mapPool.get(key);
}
if (null != param)
{
//为防止重复,将其移除
mapPool.remove(key);
}
}
if (null != param)
{
boolean result =ticketTradeOperator(param);
System.out.println("此条数据处理========"+result);
if(!result){
//若处理失败,重新放回队列
mapPool.put(key, param);
};
}
}
public boolean ticketTradeOperator(Map<String, String> params)
{
//具体的处理工作
return resultCode;
}
public IETicketTradeOper getTicketTradeOper()
{
return ticketTradeOper;
}
public void setTicketTradeOper(IETicketTradeOper ticketTradeOper)
{
this.ticketTradeOper = ticketTradeOper;
}
}TicketTradeOper.mapPool.put(newParams.get("order_id"), newParams);
以上便是整个队列生产消费的过程,有问题的欢迎交流。
关于队列类Queue的介绍。下篇博客进行。。
转载请注明本文地址: JAVA多线程与队列
更多相关内容 -
Python多线程和队列操作实例
2020-09-21 22:52:59主要介绍了Python多线程和队列操作实例,本文直接给给实例代码,需要的朋友可以参考下 -
java多线程消息队列的实现代码
2020-08-29 22:04:25本篇文章主要介绍了java多线程消息队列的实现代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧 -
易语言-易语言简单的多线程消息队列
2021-06-25 17:08:21易语言简单的多线程消息队列 -
简单的多线程消息队列-易语言
2021-06-12 19:11:08简单的多线程消息队列 -
C#队列Queue多线程用法实例
2020-09-03 17:40:08主要介绍了C#队列Queue多线程用法,实例分析了队列的相关使用技巧,需要的朋友可以参考下 -
多线程与循环队列
2016-12-30 21:09:41多线程中使用循环队列 -
易语言简单的多线程消息队列
2022-05-07 17:22:43易语言简单的多线程消息队列。@Patek。 -
C#多线程处理多个队列数据的方法
2020-09-03 14:01:46主要介绍了C#多线程处理多个队列数据的方法,涉及C#线程与队列的相关操作技巧,需要的朋友可以参考下 -
Python多线程通信queue队列用法实例分析
2020-12-20 12:59:35本文实例讲述了Python多线程通信queue队列用法。分享给大家供大家参考,具体如下: queue: 什么是队列:是一种特殊的结构,类似于列表。不过就像排队一样,队列中的元素一旦取出,那么就会从队列中删除。 线程之间... -
C++基于消息队列的多线程实现示例代码
2020-12-31 01:47:06实现消息队列的关键因素是考量不同线程访问消息队列的同步问题。本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类。定义如下: template <class> class lock_guard; lock_... -
java多线程模拟队列实现排队叫号
2022-04-15 15:56:43java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号 java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号 -
易语言多线程任务队列模型
2022-05-09 09:40:26易语言多线程任务队列模型。@小石子。Tags:多线程。 -
c++11 多线程编程——如何实现线程安全队列
2020-12-16 23:28:12线程安全队列的接口文件如下: #include template class threadsafe_queue { public: threadsafe_queue(); threadsafe_queue(const threadsafe_queue&); threadsafe_queue& operator=(const threadsafe_queue&... -
Python实现简单多线程任务队列
2020-09-21 17:51:35本文给大家介绍的是使用很简单的代码实现的多线程任务队列,给大家一个思路,希望对大家学习python能够有所帮助 -
python 多线程与队列
2019-01-17 11:27:41各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。...
各位好,之前写了 多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。Queue队列
Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。
Queue.Queue(maxsize) FIFO(先进先出队列)
Queue.LifoQueue(maxsize) LIFO(先进后出队列)
Queue.PriorityQueue(maxsize) 为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。
如果设置的maxsize小于1,则表示队列的长度无限长-
FIFO是常用的队列,常用的方法有:
-
Queue.qsize() 返回队列大小
Queue.empty() 判断队列是否为空
Queue.full() 判断队列是否满了 -
Queue.get([block[,timeout]]) 从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。
在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。 -
Queue.put(…[,block[,timeout]]) 向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。
同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
Queue.task_done() 从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成。 -
Queue.join() 监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。
队列-单线程
import threading import queue import time class worker(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.thread_stop = False def run(self): while not self.thread_stop: print("thread%d %s: waiting for tast" % (self.ident, self.name)) try: task = q.get(block=True, timeout=2) # 接收消息 except queue.Empty: print("Nothing to do! I will go home!") self.thread_stop = True break print("tasking: %s ,task No:%d" % (task[0], task[1])) print("I am working") time.sleep(3) print("work finished!") q.task_done() # 完成一个任务 res = q.qsize() # 判断消息队列大小(队列中还有几个任务) if res > 0: print("fuck! Still %d tasks to do" % (res)) def stop(self): self.thread_stop = True if __name__ == "__main__": q = queue.Queue(3) # 创建队列(大小为3) worker = worker(q) # 将队列加入类中 worker.start() # 启动类 q.put(["produce cup!", 1], block=True, timeout=None) # 向队列中添加元素,产生任务消息 q.put(["produce desk!", 2], block=True, timeout=None) q.put(["produce apple!", 3], block=True, timeout=None) q.put(["produce banana!", 4], block=True, timeout=None) q.put(["produce bag!", 5], block=True, timeout=None) print("***************leader:wait for finish!") q.join() # 等待所有任务完成 print("***************leader:all task finished!") 输出: thread9212 Thread-1: waiting for tast tasking: produce cup! ,task No:1 I am working work finished! fuck! Still 3 tasks to do thread9212 Thread-1: waiting for tast tasking: produce desk! ,task No:2 I am working ***************leader:wait for finish! work finished! fuck! Still 3 tasks to do thread9212 Thread-1: waiting for tast tasking: produce apple! ,task No:3 I am working work finished! fuck! Still 2 tasks to do thread9212 Thread-1: waiting for tast tasking: produce banana! ,task No:4 I am working work finished! fuck! Still 1 tasks to do thread9212 Thread-1: waiting for tast tasking: produce bag! ,task No:5 I am working work finished! thread9212 Thread-1: waiting for tast ***************leader:all task finished! Nothing to do!i will go home!
队列-多线程
import threading import time from queue import Queue img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3', 'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3', 'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3', 'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3', 'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3', 'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3', 'lipei_00123.mp3','lipei_00129.mp3',] q = Queue(10) class Music_Cols(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): global img_lists global q while True: try: music = img_lists.pop(0) q.put(music) except IndexError: break class Music_Play(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): global q while True: if q.not_empty: music = q.get() print('{}正在播放{}'.format(threading.current_thread(), music)) time.sleep(5) q.task_done() print('{}播放结束'.format(music)) else: break if __name__ == '__main__': mc_thread = Music_Cols('music_cols') mc_thread.setDaemon(True) # 设置为守护进程,主线程退出时,子进程也kill掉 mc_thread.start() # 启动进程 for i in range(5): # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载) mp_thread = Music_Play('music_play') mp_thread.setDaemon(True) mp_thread.start() q.join() # 线程阻塞(等待所有子线程处理完成,再退出) 输出: <Music_Play(music_play, started daemon 1068)>正在播放lipei_00006.mp3 <Music_Play(music_play, started daemon 1072)>正在播放lipei_00007.mp3 <Music_Play(music_play, started daemon 4920)>正在播放lipei_00012.mp3 <Music_Play(music_play, started daemon 3880)>正在播放lipei_00014.mp3 <Music_Play(music_play, started daemon 5400)>正在播放lipei_00021.mp3 lipei_00014.mp3播放结束 ... ... <Music_Play(music_play, started daemon 1068)>正在播放lipei_00117.mp3 lipei_00066.mp3播放结束 <Music_Play(music_play, started daemon 1072)>正在播放lipei_00123.mp3 lipei_00104.mp3播放结束 <Music_Play(music_play, started daemon 4920)>正在播放lipei_00129.mp3 lipei_00123.mp3播放结束 lipei_00117.mp3播放结束 lipei_00087.mp3播放结束 lipei_00106.mp3播放结束 lipei_00129.mp3播放结束
或者(效果与上述一样)
import threading import time from queue import Queue img_lists = ['lipei_00006.mp3','lipei_00007.mp3','lipei_00012.mp3','lipei_00014.mp3', 'lipei_00021.mp3','lipei_00027.mp3','lipei_00028.mp3','lipei_00035.mp3', 'lipei_00039.mp3','lipei_00044.mp3','lipei_00047.mp3','lipei_00049.mp3', 'lipei_00057.mp3','lipei_00058.mp3','lipei_00059.mp3','lipei_00061.mp3', 'lipei_00066.mp3','lipei_00068.mp3','lipei_00070.mp3','lipei_00081.mp3', 'lipei_00087.mp3','lipei_00104.mp3','lipei_00106.mp3','lipei_00117.mp3', 'lipei_00123.mp3','lipei_00129.mp3',] q = Queue(10) class Music_Cols(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): while True: try: music = img_lists.pop(0) q.put(music) except IndexError: break class Music_Play(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): while True: if q.not_empty: music = q.get() print('{}正在播放{}'.format(threading.current_thread(), music)) time.sleep(5) q.task_done() print('{}播放结束'.format(music)) else: break if __name__ == '__main__': mc_thread = Music_Cols('music_cols') mc_thread.setDaemon(True) # 设置为守护进程,主线程退出时,子进程也kill掉 mc_thread.start() # 启动进程 for i in range(5): # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载) mp_thread = Music_Play('music_play') mp_thread.setDaemon(True) mp_thread.start() q.join() # 线程阻塞(等待所有子线程处理完成,再退出)
队列-多线程—图像增强实例
""" 开启多线程:图像增强 """ import os import random import queue import numpy as np import cv2 import time import threading def Affine_transformation(img_array): rows, cols = img_array.shape[:2] pointsA = np.float32([[30, 80], [180, 60], [80, 230]]) # 左偏 pointsB = np.float32([[60, 50], [220, 70], [20, 180]]) # 右偏 pointsC = np.float32([[70, 60], [180, 50], [50, 200]]) # 前偏 pointsD = np.float32([[40, 50], [210, 60], [70, 180]]) # 后偏 points1 = np.float32([[50, 50], [200, 50], [50, 200]]) points2 = random.choice((pointsA, pointsB, pointsC, pointsD)) matrix = cv2.getAffineTransform(points1, points2) Affine_transfor_img = cv2.warpAffine(img_array, matrix, (cols, rows)) return Affine_transfor_img def random_rotate_img(img): rows, cols= img.shape[:2] angle = random.choice([25, 90, -25, -90, 180]) Matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1) res = cv2.warpAffine(img, Matrix, (cols, rows), borderMode=cv2.BORDER_CONSTANT) return res def random_hsv_transform(img, hue_vari, sat_vari, val_vari): """ :param img: :param hue_vari: 色调变化比例范围(0,360) :param sat_vari: 饱和度变化比例范围(0,1) :param val_vari: 明度变化比例范围(0,1) :return: """ hue_delta = np.random.randint(-hue_vari, hue_vari) sat_mult = 1 + np.random.uniform(-sat_vari, sat_vari) val_mult = 1 + np.random.uniform(-val_vari, val_vari) img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.float) img_hsv[:, :, 0] = (img_hsv[:, :, 0] + hue_delta) % 180 img_hsv[:, :, 1] *= sat_mult img_hsv[:, :, 2] *= val_mult img_hsv[img_hsv > 255] = 255 return cv2.cvtColor(np.round(img_hsv).astype(np.uint8), cv2.COLOR_HSV2BGR) def random_gamma_transform(img, gamma_vari): """ :param img: :param gamma_vari: :return: """ log_gamma_vari = np.log(gamma_vari) alpha = np.random.uniform(-log_gamma_vari, log_gamma_vari) gamma = np.exp(alpha) gamma_table = [np.power(x / 255.0, gamma) * 255.0 for x in range(256)] gamma_table = np.round(np.array(gamma_table)).astype(np.uint8) return cv2.LUT(img, gamma_table) def random_flip_img(img): """ 0 = X axis, 1 = Y axis, -1 = both :param img: :return: """ flip_val = [0,1,-1] random_flip_val = random.choice(flip_val) res = cv2.flip(img, random_flip_val) return res def clamp(pv): #防止像素溢出 if pv > 255: return 255 if pv < 0: return 0 else: return pv def gaussian_noise(image): # 加高斯噪声 """ :param image: :return: """ h, w, c = image.shape for row in range(h): for col in range(w): s = np.random.normal(0, 20, 3) b = image[row, col, 0] # blue g = image[row, col, 1] # green r = image[row, col, 2] # red image[row, col, 0] = clamp(b + s[0]) image[row, col, 1] = clamp(g + s[1]) image[row, col, 2] = clamp(r + s[2]) return image def get_img(input_dir): img_path_list = [] for (root_path,dirname,filenames) in os.walk(input_dir): for filename in filenames: Suffix_name = ['.png', '.jpg', '.tif', '.jpeg'] if filename.endswith(tuple(Suffix_name)): img_path = root_path+"/"+filename img_path_list.append(img_path) return img_path_list class IMG_QUEUE(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): while True: try: img_path = img_path_list.pop(0) q.put(img_path) except IndexError: break class IMG_AUG(threading.Thread): def __init__(self, name): super().__init__(name=name) self.q = q def run(self): while True: if q.not_empty: img_path = q.get() try: print("doing...") img_array = cv2.imread(img_path) Affine_transfor_img = Affine_transformation(img_array) cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_Affine_transfor.png', Affine_transfor_img) res_rotate = random_rotate_img(img_array) cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_rotate_img.png',res_rotate) GAMMA_IMG = random_gamma_transform(img_array, 0.3) cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_GAMMA_IMG.png',GAMMA_IMG) res_flip = random_flip_img(img_array) cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_flip_img.png',res_flip) G_Noiseimg = gaussian_noise(img_array) cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_G_Noise_img.png',G_Noiseimg) HSV_IMG = random_hsv_transform(img_array, 2, 0.3, 0.6) cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + '_HSV_IMG.png',HSV_IMG) except: print("图像格式错误!") pass q.task_done() else: break if __name__ == '__main__': input_dir = './cccc' output_dir = './eeee' start_time = time.time() # 开始计时 img_path_list = get_img(input_dir) # 获取图像数据 q = queue.Queue(10) # 设置队列元素个数 my_thread = IMG_QUEUE('IMG_QUEUE') # 实例化 my_thread.setDaemon(True) # 设置为守护进程,主线程退出时,子进程也kill掉 my_thread.start() # 启动进程 for i in range(5): # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载) mp_thread = IMG_AUG('IMG_AUG') mp_thread.setDaemon(True) mp_thread.start() q.join() # 线程阻塞(等待所有子线程处理完成,再退出) end_time = time.time() print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")
多线程-创建图像缩略图(等比缩放图像)
import os from PIL import Image import threading import time import queue def get_img(input_dir): img_path_list = [] for (root_path,dirname,filenames) in os.walk(input_dir): for filename in filenames: Suffix_name = ['.png', '.jpg', '.tif', '.jpeg'] if filename.endswith(tuple(Suffix_name)): img_path = root_path+"/"+filename img_path_list.append(img_path) return img_path_list class IMG_QUEUE(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): while True: try: img_path = img_path_list.pop(0) q.put(img_path) except IndexError: break class IMG_RESIZE(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): while True: if q.not_empty: img_path = q.get() try: im = Image.open(img_path) im.thumbnail((size, size)) print(im.format, im.size, im.mode) im.save(img_path, 'JPEG') except: print("图像格式错误!") pass q.task_done() else: break if __name__=='__main__': input_dir = 'D:\\20190112_20190114_all' #需要创建缩略图,图片的目录 start_time = time.time() # 开始计时 img_path_list = get_img(input_dir) # 获取图像数据 size = 800 q = queue.Queue(100) # 设置队列元素个数 my_thread = IMG_QUEUE('IMG_QUEUE') # 实例化 my_thread.setDaemon(True) # 设置为守护进程,主线程退出时,子进程也kill掉 my_thread.start() # 启动进程 for i in range(5): # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载) mp_thread = IMG_RESIZE(str(i)) mp_thread.setDaemon(True) mp_thread.start() q.join() # 线程阻塞(等待所有子线程处理完成,再退出) end_time = time.time() # 计时结束 print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")
-
-
python队列基本操作与多线程队列
2022-02-03 11:10:49文章目录队列基本操作多线程队列 队列基本操作 from queue import Queue q = Queue(5) # 创建一个容量为5的队列。如果给一个小于0的数,则队列为无限大小。(这是官方的解释,实际不是无限大小,而是跟内存有关) ...队列基本操作
from queue import Queue q = Queue(5) # 创建一个容量为5的队列。如果给一个小于0的数,则队列为无限大小。(这是官方的解释,实际不是无限大小,而是跟内存有关) # 存储数据 q.put(123) # 数值 q.put('hello world!') # 字符串 q.put(['hello', 'world']) # 列表 q.put(('hello', 'world')) # 元组 q.put({'hello': 'world'}) # 字典 # 如果再试图存储第六个,则会发生阻塞,因为容量已设定为5 # q.put({'hello': 'python'})
取出队列中的值
print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get())
如图五个值被依次取出。Queue队列遵循的是先进先出。- q.put_nowait()
q.put_nowait()方法可以无阻碍地向队列中添加内容,如果队列已满则会立即报错,不会等待(即不会发生阻塞)。 - q.get_nowait()
q.get_nowait()方法可以无阻碍地从队列中取出内容,如果队列是空的则也会直接报错,不会等待。
具体使用不再示例。
查看队列当前大小
- q.qsize()
print(q.qsize()) print(q.get()) print(q.qsize()) print(q.get()) print(q.qsize()) print(q.get()) print(q.qsize()) print(q.get()) print(q.qsize()) print(q.get()) print(q.qsize())
如图,每取出一个值,队列大小就减一。同样每存入一个值队列大小就会加一。- q.full()
判断队列是否是满的。 - q.empty()
判断队列是否是空的。
print(q.full()) print(q.get()) print(q.get()) print(q.full()) print(q.empty()) print(q.get()) print(q.get()) print(q.get()) print(q.empty())
多线程队列
from queue import Queue import threading import time # 存储值,每隔一秒存储一个 def set_value(q): num = 0 while True: q.put(num) num += 1 time.sleep(1) # 取值,不间断地取 def get_value(q): while True: print(q.get()) if __name__ == '__main__': q = Queue(4) t1 = threading.Thread(target=set_value, args=(q, )) t2 = threading.Thread(target=get_value, args=(q, )) t1.start() t2.start()
程序开始运行,一边存储,一边取值:
此思想应用在爬虫上,即一边访问并获取数据,一边下载数据。 - q.put_nowait()
-
java多线程加队列上传文件_后台处理
2016-05-12 09:55:37java多线程加队列上传文件_后台处理 -
多线程任务队列模型-易语言
2021-06-12 19:11:00多线程任务队列模型 -
java多线程消息队列的实现
2021-02-12 22:04:38private static List queueCache = new LinkedList();2、定义队列缓冲池最大消息数,如果达到该值,...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中new Thread(){public void run(){while(...private static List queueCache = new LinkedList();
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。
private Integer offerMaxQueue = 2000;
3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中
new Thread(){
public void run(){
while(true){
String ip = null;
try {
synchronized (queueCache) {
Integer size = queueCache.size();
if(size==0){
//队列缓存池没有消息,等待。。。。queueCache.wait();
}
Queue queue = queueCache.remove(0);
if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
continue;
}else{
;//这里是处理该消息的操作。
}
size = queueCache.size();
if(size=0){queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
try {//检出该消息队列的锁
unIpLock(queueStr);
} catch (Execption e) {//捕获异常,不能让线程挂掉
e.printStackTrace();
}
}
}
}.start();
4、检入队列
synchronized (queueCache) {
while(true){
Integer size = queueCache.size();
if(size>=offerMaxQueue){
try {
queueCache.wait();
continue;//继续执行等待中的检入任务。
} catch (InterruptedException e) {
e.printStackTrace();
}
}//IF
if(size<=offerMaxQueue&&size>0){
queueCache.notifyAll();
}
break;//检入完毕
}//while
}
5、锁方法实现
/**
* 锁
* @param ip
* @return
* @throws
*/
public Boolean isLock(String queueStr) {
return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
}
//解锁
public void unIpLock(String queueStr) {
if(ip!=null){
this.redisManager.del(queueStr+"_lock");
//lock.unlock();
}
}
分享到:
2014-10-12 23:38
浏览 4127
评论
-
IOS开发-多线程队列测试代码
2021-01-04 11:47:37// 单线程和多线程并发队列测试(同步和异步) // // 基础原理: // 1)队列: // 1.1 dispatch_queue_t 是一个队列,一个FIFO的消费队列 // 1.2 消费者:队列配置的运行线程 // 1.3 被消费对象: 添加到队列中的运行任务... -
多线程并发队列实现
2019-04-20 13:53:59如果get执行时,队列为空,线程必须阻塞等待,直到有队列有数据。如果add时,队列已经满,则add线程要等待,直到队列有空闲空间。 /** * 1.使用 wait notify 实现一个队列,队列有2个方法,add 和 ge... -
SemQueue_多线程_缓存队列_源码
2021-10-01 00:09:01适用于windows平台和linux平台的缓存队列实现,支持多线程。 -
利用Tensorflow的队列多线程读取数据方式
2020-09-18 00:47:25今天小编就为大家分享一篇利用Tensorflow的队列多线程读取数据方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 -
Java多线程-队列
2021-09-11 14:11:24Java多线程-队列 介绍 在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。 使用阻塞算法的队列可以用一个锁(入队和出队用同一把...