精华内容
下载资源
问答
  • 2019-08-02 11:16:38

    问题描述

    最近公司有个转发服务,业务逻辑是从kafka消费到大量的数据,然后放入一个队列中。之后用一个线程池,去消费这个队列。

    但是发现这四个线程消费队列的地方又严重的延迟。特此想解决此问题。

    贴代码

    • 往队列里push数据
    void KafkaConsumer::msgConsume(RdKafka::Message* message, void* opaque)
    {
    	KafkaConsumer::Data cData;
    	int errcode = message->err();
    
    	if (errcode == RdKafka::ERR__TIMED_OUT)
    	{
    		return;
    	}
    	else if (errcode == RdKafka::ERR_NO_ERROR)  //消费数据,放入队列
    	{
    		Data *pData=new Data;
    		pData->buffer.writeBlock(static_cast<const char*>(message->payload()),static_cast<int>(message->len())); // payload 装载,载荷;这里就是里面的内容
    		//pData->topic = message->topic()->name();  
    		pData->topic = message->topic_name();   // 注意这里
    		pData->ipartition = message->partition();
    
    		_cMutex.lock();
    		_cDataQue.push(pData); // 放入队列
    		_cMutex.unlock();
    	}
    	else if (RdKafka::ERR__PARTITION_EOF)
    	{
    		if (_exit_eof) _run = false;
    	}
    	else
    	{
    		LOG(INFO) << "kafkaConsumer--error: Consumer failed:" << message->errstr();
    	}
    }
    • 取队列数据,处理篇
    void KafkaConsumer::run(void* param)
    {
    	int tag;
    	memcpy(&tag,&param,sizeof(int));
    	while (1)
    	{
    		if (tag == CDATA)
    		{
    			if(_cDataQue.size() == 0) {
    				usleep(2000);
    				continue;
    			}
    			_cMutex.lock();
    			while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
    			{
    				Data *pData = _cDataQue.pop(); // 队列中取出
    				HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
    				SAFE_DELETE(pData);
    			}
    			_cMutex.unlock();
    		} else {
    			break;
    		}
    	}
    }

    代码错误分析

        _cMutex.lock();
                while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
                {
                    Data *pData = _cDataQue.pop(); // 队列中取出
                    HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
                    SAFE_DELETE(pData);
                }
                _cMutex.unlock();

     线程在数据队列_cDataQue中的数据时,先上锁,然后不断的循环取出队列中的数据并处理。(取出数据 和处理数据在一起)

    处理完每条数据之后delete.

    当锁定时的整个队列中的数据处理完毕之后,解锁。

    定义几个变量:

    N : 锁时队列的长度

    T1: pop 一条数据的时间

    T2:HandleMsg 函数执行的时间

    T3:push 一条数据的时间

    此活动中的动作:

    1. kafka消费到数据,锁队列,写队列,解锁队列。

    2.数据解析线程,锁队列,读数据,解锁队列,处理数据。

    此时的处理方式,几乎没有发挥多线程的优势,每次都是把锁时的队列的全部内容处理完。其他三个线程和生产数据的线程干等

    t = N * (T1+T2) 的时间。 若此时是程序刚启动。kafka瞬间消费到很多数据成万条的数据。 那么t 将是一个很大的时间。且kafka消费到的数据还不能及时的存放如队列中。于是就造成了延迟。

    隐患就是:

    1.根本没发挥多线程的优势和能力

    2.若数据量大,取数据和处理数据放一起,导致锁态占用的时间很长,影响其他线程(往queue里放数据的线程)干活

    3.其他线程竞争不到,干等,浪费CPU时间。一个线程死干活,处理不完,数据堆积。延迟。

    改进方法

    1. 将取数据的地方放在锁的里面,处理数据的地方放在锁的外面。

    2.每次取固定数量的nCount 个数据,放在一个容器里。然后出锁后慢慢处理。

    同时,每次取固定数量的来处理,锁占用的时间是固定的,t = nCount * T1 .也就是说,其他3个处理线程和1个往queue里塞数据的线程。最多只等 3 * t 的时间就能拿到 queue的控制权,并对其进行操作。

    而数据处理的时间 T2 与queue的操作(加锁,读取,塞入)没有关系。

    不过要控制nCount的值,太小。锁的次数很频繁; 太大,t 的时间会变大。

    这样多线程就用其来了。队列应用也灵活了。处理能力大大提升。

    void KafkaConsumer::run(void* param)
    {
    	int tag;
    	memcpy(&tag,&param,sizeof(int));
    	while (1){
    		if(_cDataQue.size() == 0) {
    			usleep(2000);
    			continue;
    		}
    		std::vector<Data*> vDatas;
    		_cMutex.lock();
    		while(_cDataQue.size()>0) {//上锁的时间尽量短,为其他线程争取到和写入线程腾出时间
    			Data *pData = _cDataQue.pop(); // 队列中取出
    			vDatas.push_back(pData);
    			if(vDatas.size() > 10){ //这里能限制这个长度 ,最多弄10条。处理快,节省时间。
    				break;
    			}
    		}
    		_cMutex.unlock();
    		// 将处理移除在锁之外,慢慢处理这些数据,处理完释放
    		for(vector<Data*>::iterator iter = vDatas.begin(); iter != vDatas.end(); ++iter){
    			Data *pData = *iter;
    			HandleMsg(pData);
    			SAFE_DELETE(pData);
    		}	
    	}
    }

     

    用生活实例来解释描述:

    1.角色 : 大厨 (生产者) , 取餐台/口(queue),包子(数据),顾客(消费处理线程)

    2.动作:生产数据(push进queue),取出数据(pop出queue),占住取餐台(Lock),放开取餐台(UNLock),吃包子(HandleMsg)

     

    方案一

    大厨们生产包子,锁住取餐口,放下包子。然后顾客1 占住取餐口,假如这里有10个包子,他就取一个吃了,再去一个吃了,直到10个取完吃完才离开取餐口。此时,大厨没法往里放包子,其他三个顾客都干等着。

    方案二

    大厨们生产包子,占住取餐口,放下包子。顾客1,占住取餐口,取了10个包子,去一边吃去。顾客2 ,马上来也取10个,然后一遍吃去。同理顾客3,4 也一样。当然这里只是理想情况,顾客1去完之后,也可能大厨又占住取餐口,放了1w个包子。

    关键是,每次取餐口被占用的时间,之后顾客们取包子的时间。非常短。而且每个顾客取完之后就去一边吃包子。同时大家可能都在吃包子,实现了多线程处理。


    哈哈。就酱紫。


     

    更多相关内容
  • 主要介绍了C#队列Queue多线程用法,实例分析了队列的相关使用技巧,需要的朋友可以参考下
  • 多线程使用消息队列

    2014-05-14 22:24:50
    多线程使用消息队列
  • Java多线程案例之阻塞队列

    千次阅读 多人点赞 2022-04-05 17:22:31
    本篇文章将介绍Java多线程案例,阻塞队列,阻塞队列在普通队列的基础上多了两种情况,一是阻塞队列为空时,如果进行出队操作,会使当前线程阻塞,直到有新元素插入阻塞队列,该线程才被通知继续执行出队操作;...

    ⭐️前面的话⭐️

    本篇文章将介绍Java多线程案例,阻塞队列,阻塞队列在普通队列的基础上多了两种情况,一是阻塞队列为空时,如果进行出队操作,会使当前线程阻塞,直到有新元素插入阻塞队列,该线程才被通知继续执行出队操作;二是阻塞队列为满时,如果进行入队操作,会使当前线程阻塞,直到有元素出队时,该线程才会被通知继续执行入队操作。在实际开发中,常常使用消息队列,而消息队列就是阻塞队列,只是在阻塞队列的基础上增加了很多功能。

    📒博客主页:未见花闻的博客主页
    🎉欢迎关注🔎点赞👍收藏⭐️留言📝
    📌本文由未见花闻原创,CSDN首发!
    📆首发时间:🌴2022年4月8日🌴
    ✉️坚持和努力一定能换来诗与远方!
    💭参考书籍:📚《java核心技术》,📚《java编程思想》
    💬参考在线编程网站:🌐牛客网🌐力扣
    博主的码云gitee,平常博主写的程序代码都在里面。
    博主的github,平常博主写的程序代码都在里面。
    🍭作者水平很有限,如果发现错误,一定要及时告知作者哦!感谢感谢!



    封面区


    🍒1.阻塞队列概论

    🍇1.1阻塞队列的概念与作用

    阻塞队列本质上还是一种队列,遵循先进先出,后进后出的原则,在此基础上,如果出队时阻塞队列为空,则会使当前线程陷入阻塞,直到入队新元素时通知线程继续执行,如果入队时阻塞队列为满,则会使当前线程陷入阻塞,直到出队旧元素时才通知线程进行执行。

    🍇1.2标准库中阻塞队列类

    java官方也提供了阻塞队列的标准类,主要有下面几个:

    • ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。
    • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue: 一个不存储元素的阻塞队列。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
    • BlockingQueue接口: 单向阻塞队列实现了该接口。
    • BlockingDeque接口: 双向阻塞队列实现了该接口。

    阻塞队列类的核心方法:

    方法解释
    void put(E e) throws InterruptedException带有阻塞特性的入队操作方法
    E take() throws InterruptedException带有阻塞特性的出队操作方法
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException带有阻塞特性的入队操作方法,并且可以设置最长等待时间
    E poll(long timeout, TimeUnit unit) throws InterruptedException带有阻塞特性的出队操作方法,并且可以设置最长等待时间
    public boolean contains(Object o)判断阻塞队列中是否包含某个元素

    其他一些普通队列的方法也支持,但是你都使用阻塞队列了,为什么还要使用普通队列的方法呢。

    🍇1.3生产者消费者模型

    这个模型怎么说呢,嗯…不好说直接看图吧。

    生产者消费者模型

    生产者消费者是一种高内聚,低耦合的模型,这也是它的优势,特别是在服务器场景中,假设有两个服务器A(请求服务器),B(应用服务器),如果A,B直接传递消息,而不通过阻塞队列,那么当A请求突然暴涨的时候,B服务器的请求也会跟着暴涨,由于B服务器是应用服务器,处理的任务是重量级的,所以该情况B服务器大概率会挂。
    请求暴涨
    但是,如果使用生产者消费者模型,那么即使A请求暴涨,也不会影响到B,顶多A挂了,应用服务器不会受到影响,这是因为A请求暴涨后,用户的请求都被打包到阻塞队列中(如果阻塞队列有界,则会引起队列阻塞,不会影响到B),B还是以相同的速度处理这些请求,所以生产者消费者模型可以起到“削峰填谷”的作用。
    削峰填谷
    了解清楚阻塞队列和生产者消费者模型,来简单实现一下,阻塞队列我们就基于数组实现吧,那么就先的实现循环队列。

    🍒2.通过循环队列简单实现阻塞队列

    🍇2.1循环队列的简单实现

    循环队列是基于数组实现的,最重要的就是如何将队列为空状态与满状态区分开来,前面介绍数据结构的时候已经简单实现过了,现在就再简单复习一下,对队列不懂的,先好好学习队列:队列,Queue,Deque接口与LinkedList类

    区分判断空与满状态的方法如下:

    不妨设对头索引为front,队尾索引为rear,顺序表长度为len

    方式1:记录队列元素个数size,当size的值与顺序表的大小相等时,代表队列已满。size值为0表示队列为空。
    方式2:使用一个boolean类型的成员变量flag标记,初始为false,当每次入队时将flag设置为true,出队将flag设置为false,当rear == front && flag == true表示队列已满,当rear == front && flag == false表示队列为空。
    方式3:牺牲一个单位的空间,在每次入队前判断(rear+1)% len 是否与front相等,如果相等表示队列已满,如果rear == front则表示队列为空。

    比如我按照方式1创建循环队列,大小为8,如图,size=0为空队列,size=8为满队列。
    1-2
    1-3
    1-4

    方式1最简单,我们通过方式1实现循环队列,阻塞队列最核心的就是出队和入队操作,我们重点实现这两个方法。

    //循环队列
    class MyCircularQueue {
        //队列数据
        private int[] elem = new int[100];
        //队头指针
        private int head;
        //队尾指针
        private int tail;
        //队列元素个数
        private int size;
    
    
        //出队头元素
        public Integer take() {
            if (size == 0) {
                //队列为空
                return null;
            }
            int ret = elem[head];
            head++;
            //作用等价于 head %= elem.length
            if (head >= elem.length) {
                head = 0;
            }
            size--;
            return ret;
        }
    
        //入队尾元素
        public void put(int val) {
            if (size == elem.length) {
                //队列满
                return;
            }
            elem[tail++] = val;
            //作用等价于 tail %= elem.length
            if (tail >= elem.length) {
                tail = 0;
            }
            size++;
        }
    }
    

    🍇2.2阻塞队列的简单实现

    目前上面实现的循环队列不是线程安全的,由于takeput方法都有写操作,直接无脑加锁。

    //线程安全的循环队列
    class MySafeCircularQueue {
        //队列数据
        private int[] elem = new int[100];
        //队头指针
        private int head;
        //队尾指针
        private int tail;
        //队列元素个数
        private int size;
        //专门的锁对象
        private final Object locker = new Object();
    
        //出队头元素
        public Integer take() {
            synchronized (locker) {
                if (size == 0) {
                    //队列为空
                    return null;
                }
                int ret = elem[head];
                head++;
                //作用等价于 head %= elem.length
                if (head >= elem.length) {
                    head = 0;
                }
                size--;
                return ret;
            }
        }
    
        //入队尾元素
        public void put(int val) {
            synchronized (locker) {
                if (size == elem.length) {
                    //队列满
                    return;
                }
                elem[tail++] = val;
                //作用等价于 tail %= elem.length
                if (tail >= elem.length) {
                    tail = 0;
                }
                size++;
            }
        }
    }
    

    好了,重点来了,如何实现阻塞效果,关键是使用waitnotify机制:

    入队时,队列为满需要使用wait方法使线程阻塞,直到有旧元素出队才使用notify通知线程执行。
    出队时,队列为空需要使用wait方法使线程阻塞,直到有新元素入队才使用notify通知线程执行。

    阻塞有界队列代码:

    //基于循环队列实现阻塞队列
    class MyBlockingQueue {
        //初始化循环队列
        private int[] elem = new int[100];
    
        //队头指针
        private int head;
        //队尾指针
        private int tail;
        //元素个数
        private int size;
        //专门的锁对象
        private final Object locker = new Object();
    
    
        //队头出元素,如果队列为空则阻塞
        public Integer take() throws InterruptedException {
            //循环队列为空,需要阻塞线程,直到循环队列入元素后才通知线程继续执行该操作
            synchronized (locker) {
                if (size == 0) {
                    locker.wait();
                }
                int ret = elem[head++];
                if (head >= elem.length) {
                    head = 0;
                }
                size--;
                //循环队列出元素后,队列就不为满了,可以通知线程继续进行入队操作
                locker.notify();
                return ret;
            }
        }
        //队尾入元素,如果队列满了,就阻塞
        public void put(int val) throws InterruptedException {
            //循环队列如果满了,则需要使线程阻塞,直到循环队列出元素后才通知线程继续执行该操作
            synchronized (locker) {
                if (size == elem.length) {
                    locker.wait();
                }
                elem[tail++] = val;
                if (tail >= elem.length) {
                    tail = 0;
                }
                size++;
                //循环队列入元素后,队列就不为空了,可以通知线程继续进行出队操作
                locker.notify();
            }
        }
    }
    

    我们来简单实现一个生产者消费者模型来验证一下我们所实现的阻塞队列是否有问题。
    生产者生产数字,消费者消费数字,为了使效果更加明显,我们把我们实现的阻塞队列的大小改为3,即:private int[] elem = new int[3];

    我们使用sleep方法来模拟生产者消费者的生产或消费的频率。

    情况1:生产者生产与消费者消费的频率一致

    public class PCMod {
        private static final MyBlockingQueue queue = new MyBlockingQueue();
        public static void main(String[] args) {
            //生产者 每秒生产1个
            Thread producer = new Thread(() -> {
                int num = 0;
                while (true) {
                    try {
                        System.out.println("生产了:" + num);
                        queue.put(num++);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            producer.start();
            //消费者 每秒消费1个
            Thread customer = new Thread(() -> {
                while (true) {
                    try {
                        int product = queue.take();
                        System.out.println("消费了:" + product);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            customer.start();
        }
    }
    

    运行结果:
    相同频率
    因为生产者与消费者频率一致,所以生产者刚生产好,就立即消费者被消费了。

    情况2:生产者生产频率比消费者消费的频率更快

    public class PCMod {
        private static final MyBlockingQueue queue = new MyBlockingQueue();
        public static void main(String[] args) {
            //生产者  每秒生产1个
            Thread producer = new Thread(() -> {
                int num = 0;
                while (true) {
                    try {
                        System.out.println("生产了:" + num);
                        queue.put(num++);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            producer.start();
            //消费者 每2秒消费1个
            Thread customer = new Thread(() -> {
                while (true) {
                    try {
                        int product = queue.take();
                        System.out.println("消费了:" + product);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            customer.start();
        }
    }
    

    运行结果:
    生产者快
    因为生产者生产快,消费者消费慢,所以阻塞队列满了之后生产者需要等待消费者消费后才能生产,此时生产者步调与消费者一致。

    情况3:生产者生产频率比消费者消费的频率更慢

    public class PCMod {
        private static final MyBlockingQueue queue = new MyBlockingQueue();
        public static void main(String[] args) {
            //生产者  每秒生产1个
            Thread producer = new Thread(() -> {
                int num = 0;
                while (true) {
                    try {
                        System.out.println("生产了:" + num);
                        queue.put(num++);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            producer.start();
            //消费者 每秒消费1个
            Thread customer = new Thread(() -> {
                while (true) {
                    try {
                        int product = queue.take();
                        System.out.println("消费了:" + product);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            customer.start();
        }
    }
    

    运行结果:
    生产者慢

    因为生产者生产慢,消费者消费快,所以阻塞队列为空后,消费者需要等待生产者生产,消费者才能消费,此时消费者步调与生产者一致。

    好了,阻塞队列你学会了吗?


    下期预告:多线程案例之定时器

    觉得文章写得不错的老铁们,点赞评论关注走一波!谢谢啦!

    1-99

    展开全文
  • python队列基本操作与多线程队列

    千次阅读 2022-02-03 11:10:49
    文章目录队列基本操作多线程队列 队列基本操作 from queue import Queue q = Queue(5) # 创建一个容量5的队列。如果给一个小于0的数,则队列为无限大小。(这是官方的解释,实际不是无限大小,而是跟内存有关) ...

    队列基本操作

    from queue import Queue
    
    q = Queue(5)  # 创建一个容量为5的队列。如果给一个小于0的数,则队列为无限大小。(这是官方的解释,实际不是无限大小,而是跟内存有关)
    
    # 存储数据
    q.put(123)  # 数值 
    q.put('hello world!')  # 字符串
    q.put(['hello', 'world'])  # 列表
    q.put(('hello', 'world'))  # 元组
    q.put({'hello': 'world'})  # 字典
    
    # 如果再试图存储第六个,则会发生阻塞,因为容量已设定为5
    # q.put({'hello': 'python'})
    

    取出队列中的值

    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    

    在这里插入图片描述
    如图五个值被依次取出。Queue队列遵循的是先进先出。

    • q.put_nowait()
      q.put_nowait()方法可以无阻碍地向队列中添加内容,如果队列已满则会立即报错,不会等待(即不会发生阻塞)。
    • q.get_nowait()
      q.get_nowait()方法可以无阻碍地从队列中取出内容,如果队列是空的则也会直接报错,不会等待。

    具体使用不再示例。

    查看队列当前大小

    • q.qsize()
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    

    在这里插入图片描述
    如图,每取出一个值,队列大小就减一。同样每存入一个值队列大小就会加一。

    • q.full()
      判断队列是否是满的。
    • q.empty()
      判断队列是否是空的。
    print(q.full())
    print(q.get())
    print(q.get())
    print(q.full())
    print(q.empty())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())
    

    在这里插入图片描述

    多线程队列

    from queue import Queue
    import threading
    import time
    
    
    # 存储值,每隔一秒存储一个
    def set_value(q):
        num = 0
        while True:
            q.put(num)
            num += 1
            time.sleep(1)
    
    # 取值,不间断地取
    def get_value(q):
        while True:
            print(q.get())
    
    
    if __name__ == '__main__':
        q = Queue(4)
        t1 = threading.Thread(target=set_value, args=(q, ))
        t2 = threading.Thread(target=get_value, args=(q, ))
    
        t1.start()
        t2.start()
    

    程序开始运行,一边存储,一边取值:
    在这里插入图片描述
    此思想应用在爬虫上,即一边访问并获取数据,一边下载数据。

    展开全文
  • 队列多线程使用

    千次阅读 2017-10-21 12:42:28
    1. 概述:1.1 队列简介队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,...

    1. 概述:

    1.1 队列简介

    队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。
    1.2 队列基本运算:

    队列操作函数初始条件操作结果
    初始化队列Init_Queue(q)队q不存在构造了一个空队
    入队操作In_Queue(q,x)队q存在对已存在的队列q,插入一个元素x到队尾,队发生变化
    出队操作Out_Queue(q,x)队q存在且非空删除队首元素,并返回其值,队发生变化
    读队头Front_Queue(q,x)队q存在且非空读队头元素,并返回其值,队不变
    判队空操作Empty_Queue(q)队q存在若q为空队则返回为1,否则返回为0。

    2. 队列在多线程中的使用

    队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行得比第二个慢,则第二个 线程集在等待结果时就会阻塞。如果第一个线程集运行得快,那么它将等待第二个线程集赶上来。
    Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

    3. 阻塞队列:BlockingQueue

    一)BlockingQueue提供的常用方法

    操作可能报异常返回布尔值可能阻塞设定等待时
    入队add(e)offer(e)put(e)offer(e,timeout,unit)
    出队remove()poll()take()poll(timeout, unit)
    查看element()peek()

    从上表可以很明显看出每个方法的作用,这个不用多说。这里强调度的是:
    1) add(e) remove() element() 方法不会阻塞线程。当不满足约束条件时,会抛出IllegalStateException异常。例如:当队列被元素填满后,再调用add(e),则会抛出异常。
    2) offer(e) poll() peek() 方法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调用offer(e),则不会插入元素,函数返回false。
    3) 要想要实现阻塞功能,需要调用put(e) take() 方法。当不满足约束条件时,会阻塞线程。

    二)BlockingQueue接口的具体实现类:

    ArrayBlockingQueue,其构造函数必须带一个int参数来指明其大小, LinkedBlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定,PriorityBlockingQueue,其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序

    4. 非阻塞队列:ConcurrentLinkedQueue

    ConcurrentLinkedQueue是一个无锁的并发线程安全的队列。对比锁机制的实现,使用无锁机制的难点在于要充分考虑线程间的协调。简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够检测并帮助完成剩下的操作。这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况。
    ConcurrentLinkedQueue有两个volatile的线程共享变量:head,tail。要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证。
    ConcurrentLinkedQueue有两个volatile的线程共享变量:head,tail。要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证
    队列总是处于两种状态之一:正常状态(或称静止状态,图 1 和图 3)或中间状态(图 2)。在插入操作之前和第二个 CAS(D)成功之后,队列处在静止状态;在第一个 CAS(C)成功之后,队列处在中间状态。在静止状态时,尾指针指向的链接节点的 next 字段总为 null,而在中间状态时,这个字段为非 null。任何线程通过比较 tail.next 是否为 null,就可以判断出队列的状态,这是让线程可以帮助其他线程 “完成” 操作的关键

    这里写图片描述

    插入操作在插入新元素(A)之前,先检查队列是否处在中间状态。如果是在中间状态,那么肯定有其他线程已经处在元素插入的中途,在步骤(C)和(D)之间。不必等候其他线程完成,当前线程就可以“帮助” 它完成操作,把尾指针向前移动(B)。如果有必要,它还会继续检查尾指针并向前移动指针,直到队列处于静止状态,这时它就可以开始自己的插入了。
    第一个 CAS(C)可能因为两个线程竞争访问队列当前的最后一个元素而失败;在这种情况下,没有发生修改,失去 CAS 的线程会重新装入尾指针并再次尝试。如果第二个 CAS(D)失败,插入线程不需要重试 —— 因为其他线程已经在步骤(B)中替它完成了这个操作!
    这里写图片描述
    上图显示的是:处在插入中间状态的队列,在新元素插入之后,尾指针更新之前

    这里写图片描述
    上图显示的是:在尾指针更新后,队列重新处在静止状态

    展开全文
  • spring 多线程队列执行

    2012-08-06 00:18:01
    spring 多线程队列执行
  • JUC多线程:AQS抽象队列同步器原理

    千次阅读 2021-10-09 12:58:04
    工作原理就是如果被请求的共享资源空闲,则将当前请求资源的线程设置有效的工作线程,并且将共享资源设置锁定状态,如果被请求的共享资源被占用,那么就将获取不到锁的线程加入到等待队列中。这时,就需要一套...
  • 多线程下的消息队列实现

    千次阅读 2019-01-19 06:14:09
    1、定义一个队列缓存池: private static List queueCache = new LinkedList(); 2、定义队列缓冲池最大消息数,如果达到该值...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程线程等待中 new Thread(){ ...
  • 消息队列多线程的选择

    千次阅读 2020-06-17 11:33:40
    为什么发送邮件要使用消息队列而不是多线程? 1.消息队列多线程应该怎么选择呢? 可靠性要求高时选择消息队列:消息队列多线程两者并不冲突,多线程可以作为队列的生产者和消费者。 使用外部的消息队列时,第一...
  • 多线程并发队列实现

    千次阅读 2019-04-20 13:53:59
    如果get执行时,队列为空,线程必须阻塞等待,直到有队列有数据。如果add时,队列已经满,则add线程要等待,直到队列有空闲空间。 /** * 1.使用 wait notify 实现一个队列,队列有2个方法,add 和 ge...
  • C++多线程队列

    2013-05-18 10:07:57
    构造一个队列,并完成入队列和出队列的函数,要求该队列支持多线程 (即一个线程做入队列操作而另一个线程做出队列操作而且两个线程必须同时运行)
  • python 多线程队列

    千次阅读 2019-03-06 10:17:00
    线程不能独立运行,必须依存在应用程序中,由应用程序提供线程执行控制 线程的结束 一般依靠线程函数的自然结束 也可以在线程函数中调用thread.exit(),抛出SystemExit exception,达到退出...
  • 多线程c++队列使用

    千次阅读 2017-07-22 00:39:36
    我有一个队列的全局变量,然后一个线程不断的获取数据,向这个队列里面压入,然后另一个线程每次读取队列的第一个元素,然后删掉。 #include #include #include #include #include #include #include #include ...
  • 多线程+队列的简单使用

    千次阅读 2017-08-15 21:03:16
    以前在项目中很少用到队列,其实队列对于解决并发很是重要,今天着手看了相关资料并实际简单操作,如下: 1.队列的含义:队列就是一种特殊的线性表,采用FIFO方式,而栈是LIFO方式。 2.常用的队列:LinkedList实现...
  • c++多线程队列

    千次阅读 2019-05-06 22:48:40
    多线程中,使用队列不需要锁也能运行,取得时候先判断一下长度,长度0线Sleep(3)。 struct RevData{ int size; char data[1000]; } recv_data; #include<iostream> #include<queue> #...
  • 多线程和消息队列的区别?

    千次阅读 2019-02-15 17:06:44
    多线程是防止系统的阻塞 消息队列是提高系统处理业务的效率
  • c++多线程安全的消息队列模板

    热门讨论 2009-03-17 14:35:11
    多线程安全的消息队列模板多线程安全的消息队列模板
  • 比如线程1处理完的结果以结构体模式,写进队列1,  线程2读出队列1中的结构体,进行处理。
  • Redis提供了两种方式来作消息队列。...1.消息队列多线程两者并不冲突,多线程可以作为队列的生产者和消费者。 使用外部的消息队列时,第一是可以提高应用的稳定性,当程序fail后,写入外部消息队列的...
  • 多线程-线程池(队列-最大线程数-核心线程数)

    千次阅读 热门讨论 2020-01-10 16:08:11
    java 多线程: 一般通过继承Thread类,实现Runnable接口,实现Callable接口,以及线程池。 这里主要是讲解线程池: 通过线程池主要是创建以下4种类型的线程池。 工具类 : Executors ExecutorService ...
  • C#中的队列,Queue类与多线程使用

    万次阅读 2019-05-17 09:48:02
    我想学习过数据结构应该很清楚,如果没有仔细了解,只要记住队列是一个先进先出的列表即可,列表中可以是线程,可以是预备执行的函数的入口,可以是地址,可以是数据,在C#中,Queue<T> 类可以实现队列,这一...
  • 《秒杀多线程第十六篇 多线程十大经典案例之一 双线程读写队列数据》 http://blog.csdn.net/morewindows/article/details/8646902 配套程序 在《秒杀多线程系列》的前十五篇中介绍多线程的相关概念,多线程同步互斥...
  • 多线程队列模式

    千次阅读 2020-04-02 13:24:22
    阻塞队列在get元素时如果队列为空那么直接阻塞当前线程,在添加元素时notifyAll所有线程 public class RequestQueue { private final LinkedList<Request> queue = new LinkedList<>(); public ...
  • 多线程队列的通俗理解

    千次阅读 2017-12-29 16:00:20
    我们先不要用专业的角度来看多线程。 先来回忆一下,现实生活中,自己曾经一个人做的事,和多个人同时做的事。  假如现在某百货商场做一个抢购活动,限时限量的。那么就意味着你抢得越多就越占便宜,...
  • Java多线程总结之线程安全队列Queue

    万次阅读 2017-09-19 10:21:54
    在Java多线程应用中,队列使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是...
  • RabbitMQ设置多线程处理队列消息

    千次阅读 2019-05-17 19:08:32
    @RabbitListener注解指定消费方法,默认...可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。 1、RabbitmqConfig.java中添加容器工厂配置: @Bean("customContainerFac...
  • 线程池中为什么使用阻塞队列

    千次阅读 2020-05-24 22:11:26
    在线程池中活跃线程数达到corePoolSize时,线程池将会将后续的task提交到BlockingQueue中,为什么这样设计呢? 原因为: 线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲。 ...
  • Python的Queue模块提供了同步的,线程安全的队列类,包括:FIFO队列Queue,LIFO队列LifeQueue,优先级队列PriorityQueue,这些队列都实现了锁原语,能在多线程中直接使用,可以使用队列来实现线程间的同步。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 595,191
精华内容 238,076
关键字:

多线程为什么使用队列