• 非阻塞队列2.1 代码举例2.2 ConcurrentLinkedQueue 1.阻塞队列 1.1 代码举例 1个生产者，队列元素大小为2，三个消费者消费 public class TestQueue { private int queueSize = 2; private BlockingQue...
文章目录1.阻塞队列1.1 代码举例1.2 LinkedBlockingQueue2.非阻塞队列2.1 代码举例2.2 ConcurrentLinkedQueue
1.阻塞队列
1.1 代码举例
1个生产者，队列元素大小为2，三个消费者消费
public class TestQueue {
private int queueSize = 2;
private BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(queueSize);

@Test
public void test1() throws InterruptedException {

TestQueue testQueue = new TestQueue();
Producer producer = testQueue.new Producer();
Consumer consumer = testQueue.new Consumer();
Consumer consumer2 = testQueue.new Consumer();
Consumer consumer3 = testQueue.new Consumer();

producer.start();
consumer.start();
consumer2.start();
consumer3.start();

}

@Override
public void run() {
consume();
}

private void consume() {
while(true){
try {
System.out.println("【"+name+"】准备向队列取一个元素");
queue.take();
System.out.println("【"+name+"】从队列取走一个元素，队列剩余"+queue.size()+"个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

@Override
public void run() {
produce();
}

private void produce() {
while(true){
try {
queue.put(1);
System.out.println("【"+name+"】向队列取中插入一个元素，队列剩余空间："+(queueSize-queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}


由上可知：
1.当队列没有元素的时候，所有线程获取元素的时候，都会进行阻塞，知道队列里有元素了才继续往下执行
2.非阻塞队列
2.1 代码举例
1个生产者，一个队列，三个消费者消费
public class TestQueue2 {

@Test
public void test1() throws InterruptedException {

TestQueue2 testQueue = new TestQueue2();
Producer producer = testQueue.new Producer();
Consumer consumer = testQueue.new Consumer();
Consumer consumer2 = testQueue.new Consumer();
Consumer consumer3 = testQueue.new Consumer();

producer.start();
consumer.start();
consumer2.start();
consumer3.start();

}

@Override
public void run() {
consume();
}

private void consume() {
Random random = new Random();
while(true){
try {
System.out.println("【"+name+"】准备向队列取一个元素");
Integer poll = queue.poll();
System.out.println("【"+name+"】从队列取走一个元素"+poll);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

@Override
public void run() {
produce();
}

private void produce() {
while(true){
try {
queue.offer(1);
System.out.println("【"+name+"】向队列取中插入一个元素)");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}


由上可知：
1.非阻塞队列，若线程从中获取元素，若没有则直接返回空，不会阻塞当前线程
3.若要用于生产者和消费者，则需要增加并发控制
是一个适用于高并发场景下的队列，通过无锁的方式，实现了高并发状态下的高性能，通常ConcurrentLikedQueue性能好于BlockingQueue。
它是一个基于连接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的，尾是最近加入的，该队列不允许null元素。
poll()和peek()都是取头元素节点，区别在于前者会删除元素，后者不会
参考：
阻塞队列与非阻塞队列区别应用场景
Java并发编程：阻塞队列
java阻塞队列与非阻塞队列


展开全文
• 测试java阻塞和非阻塞队列几个添加，删除，检查方法。代码与解释如下：import org.junit.Test; import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.*; public ...
测试java，阻塞和非阻塞队列几个添加，删除，检查方法。代码与解释如下：import org.junit.Test;

import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.*;

public class QueueTest
{
@Test
//测试队列，方便起见，测试使用有界队列,无界队列，
//而是直接出现OutOfMemoryError错误
public void queueTest()
{
//              抛出异常 	返回特殊值
//      移除 	remove() 	poll()
//      检查 	element() 	peek()

//      建立了一个长度为5的空阻塞有界队列，数组实现

int i = 0;
while (true)
{
try
{
System.out.println(queue);
i++;
sleepOneSecond();
}
catch (IllegalStateException e)
{
e.printStackTrace();
break;
}
}
System.out.println();

sleepOneSecond();
System.out.println("用remove()方法消费此队列，需要使用try..catch..\n" + queue);
while (true)
{
try
{
queue.remove();
System.out.println(queue);
sleepOneSecond();
}
catch (Exception e)
{
e.printStackTrace();
break;
}
}
System.out.println();

sleepOneSecond();
System.out.println("用offer()方法填充此队列，需要使用只需要判断返回值\n" + queue);
int k = 0;
while (queue.offer(k))
{
k++;
System.out.println(queue);
sleepOneSecond();
}
System.out.println();

sleepOneSecond();
System.out.println("poll()方法消费此队列，只需要判断取出值是否为null即可\n" + queue);
while (queue.poll() != null)
{
System.out.println(queue);
sleepOneSecond();
}
System.out.println();

sleepOneSecond();
System.out.println("尝试拿到一个空队列的队列头，peek方法拿到null：" + queue.peek());
System.out.println();

sleepOneSecond();
try
{
System.out.println("尝试拿到一个空队列的队列头，element方法抛出异常NoSuchElementException：");
sleepOneSecond();
System.out.println(queue.element());
}
catch (NoSuchElementException e)
{
e.printStackTrace();
}
}

private static void sleepOneSecond()
{
try
{
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}

//                  抛出异常 	特殊值 	    阻塞 	超时
//        插入 	    add(e) 	    offer(e) 	put(e) 	offer(e, time, unit)
//        移除 	    remove() 	poll() 	    take() 	poll(time, unit)
//        检查 	    element() 	peek() 	    不可用 	不可用

@Test
public void testBlockingQueue() throws InterruptedException
{

{
try
{
for (int i = 0; i < 11; i++)
{
queue.put(i);
sleepOneSecond();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}, "provider").start();

TimeUnit.SECONDS.sleep(15);

{
System.out.println("消费者线程启动");
try
{
for (int i = 0; i < 15; i++)
{
queue.take();
sleepOneSecond();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}, "consumer").start();

//junit测试在主线程结束之后，其他线程都会直接结束
//故此休眠25s,推荐使用CountDownLatch,此处简单处理
TimeUnit.SECONDS.sleep(25);
}
}

附上集合/容器类的uml类图，包括同步类。
展开全文
不论是阻塞队列BlockingQueue还是非阻塞队列ConcurrentLinkedQueue，都是线程安全的。
注：什么叫线程安全？这个首先要明确。线程安全就是说多线程访问同一代码，不会产生不确定的结果。

并行和并发区别

1、并行是指两者同时执行一件事，比如赛跑，两个人都在不停的往前跑；
2、并发是指资源有限的情况下，两者交替轮流使用资源，比如一段路(单核CPU资源)同时只能过一个人，A走一段后，让给B，B用完继续给A ，交替使用，目的是提高效率
如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法，另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁（入队和出队用同一把锁）或两个锁（入队和出队用不同的锁）等方式来实现，而非阻塞的实现方式则可以使用循环CAS的方式来实现。
适用阻塞队列的好处：多线程操作共同的队列时不需要额外的同步，另外就是队列会自动平衡负载。
阻塞队列与普通队列的区别在于，当队列是空的时，从队列中获取元素的操作将会被阻塞，或者当队列是满时，往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞，直到其他的线程往空的队列插入新的元素。同样，试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞，直到其他的线程使队列重新变得空闲起来，如从队列中移除一个或者多个元素，或者完全清空队列，下图展示了如何通过阻塞队列来合作：

关键在于生产者和消费者可能不会（几乎肯定不会）保持相同的速度，比如，当生产者的速度快于消费者的速度时，队列会越来越大，而且生产对象有40G，很容易就让队列大小超过内存容量。
所以，阻塞队列只允许生产者的速度在一定速度上超过消费者的速度，但不会超过很多。
常用的阻塞队列有：
基于数组的阻塞队列ArrayBlockingQueue
基于数组的阻塞队列实现，在ArrayBlockingQueue内部，维护了一个定长数组，以便缓存队列中的数据对象，这是一个常用的阻塞队列，除了一个定长数组外，ArrayBlockingQueue内部还保存着两个整形变量，分别标识着队列的头部和尾部在数组中的位置。

可以指定容量，也可以不指定，不指定的话，默认最大是Integer.MAX_VALUE，其中主要用到put和take方法，put方法在队列满的时候会阻塞直到有队列成员被消费，take方法在队列空的时候会阻塞，直到有队列成员被放进来。
有优先级的阻塞队列PriorityBlockingQueue
基于优先级的阻塞队列（优先级的判断通过构造函数传入的Compator对象来决定），但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者，而只会在没有可消费的数据时，阻塞数据的消费者。因此使用的时候要特别注意，生产者生产数据的速度绝对不能快于消费者消费数据的速度，否则时间一长，会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时，内部控制线程同步的锁采用的是公平锁。


展开全文
• 阻塞和非阻塞队列的并发安全原理。 之前我们探究了常见的阻塞队列的特点，以 ArrayBlockingQueue 为例， 首先分析 BlockingQueue 即阻塞队列的线程安全原理，然后再看看它的兄弟——非阻塞队列的并发安全原理。...
阻塞和非阻塞队列的并发安全原理。

之前我们探究了常见的阻塞队列的特点，以 ArrayBlockingQueue 为例，

首先分析 BlockingQueue 即阻塞队列的线程安全原理，然后再看看它的兄弟——非阻塞队列的并发安全原理。通过本课时的学习，我们就可以了解到关于并发队列的底层原理了。

ArrayBlockingQueue 源码分析
我们首先看一下 ArrayBlockingQueue 的源码，ArrayBlockingQueue 有以下几个重要的属性：

复制代码
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;
第一个就是最核心的、用于存储元素的 Object 类型的数组；然后它还会有两个位置变量，分别是 takeIndex 和 putIndex，这两个变量就是用来标明下一次读取和写入位置的；另外还有一个 count 用来计数，它所记录的就是队列中的元素个数。

另外，我们再来看下面这三个变量：

复制代码
// 以下3个是控制并发用的工具
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
这三个变量也非常关键，第一个就是一个 ReentrantLock，而下面两个 Condition 分别是由 ReentrantLock 产生出来的，这三个变量就是我们实现线程安全最核心的工具。

ArrayBlockingQueue 实现并发同步的原理就是利用 ReentrantLock 和它的两个 Condition，读操作和写操作都需要先获取到 ReentrantLock 独占锁才能进行下一步操作。进行读操作时如果队列为空，线程就会进入到读线程专属的 notEmpty 的 Condition 的队列中去排队，等待写线程写入新的元素；同理，如果队列已满，这个时候写操作的线程会进入到写线程专属的 notFull 队列中去排队，等待读线程将队列元素移除并腾出空间。

下面，我们来分析一下最重要的 put 方法：

复制代码
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
在 put 方法中，首先用 checkNotNull 方法去检查插入的元素是不是 null。如果不是 null，我们会用 ReentrantLock 上锁，并且上锁方法是 lock.lockInterruptibly()。这个方法我们在第 23 课时的时候讲过，在获取锁的同时是可以响应中断的，这也正是我们的阻塞队列在调用 put 方法时，在尝试获取锁但还没拿到锁的期间可以响应中断的底层原因。

紧接着 ，是一个非常经典的 try  finally 代码块，finally 中会去解锁，try 中会有一个 while 循环，它会检查当前队列是不是已经满了，也就是 count 是否等于数组的长度。如果等于就代表已经满了，于是我们便会进行等待，直到有空余的时候，我们才会执行下一步操作，调用 enqueue 方法让元素进入队列，最后用 unlock 方法解锁。

你看到这段代码不知道是否眼熟，在第 5 课时我们讲过，用 Condition 实现生产者/消费者模式的时候，写过一个 put 方法，代码如下：

复制代码
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
可以看出，这两个方法几乎是一模一样的，所以当时在第 5 课时的时候我们就说过，我们自己用 Condition 实现生产者/消费者模式，实际上其本质就是自己实现了简易版的 BlockingQueue。你可以对比一下这两个 put 方法的实现，这样对 Condition 的理解就会更加深刻。

复制代码
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode);  // Failure is OK.
return true;
}
}
else if (p == q)
// We have fallen off list.  If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable.  Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
在这里我们不去一行一行分析具体的内容，而是把目光放到整体的代码结构上，在检查完空判断之后，可以看到它整个是一个大的 for 循环，而且是一个非常明显的死循环。在这个循环中有一个非常亮眼的 p.casNext 方法，这个方法正是利用了 CAS 来操作的，而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。我们就来看一下 p.casNext 方法的具体实现，其方法代码如下：

复制代码
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
可以看出这里运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作，而 compareAndSwapObject 是一个 native 方法，最终会利用 CPU 的 CAS 指令保证其不可中断。

可以看出，非阻塞队列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重试，来实现线程安全，适合用在不需要阻塞功能，且并发不是特别剧烈的场景。

=======如何选择适合自己的阻塞队列==============

他山之石，可以攻玉。对于如何选择最合适的阻塞队列这个问题，实际上线程池已经率先给我们做了表率。线程池有很多种，不同种类的线程池会根据自己的特点，来选择适合自己的阻塞队列。

所以我们就首先来复习一下这些非常经典的线程池是如何挑选阻塞队列的，借鉴它们的经验之后，我们再去总结一套规则，来归纳出自己在选取阻塞队列时可以对哪些点进行考虑。

线程池对于阻塞队列的选择

下面我们来看线程池的选择要诀。上面表格左侧是线程池，右侧为它们对应的阻塞队列，你可以看到 5 种线程池只对应了 3 种阻塞队列，下面我们对它们进行逐一的介绍。

SynchronousQueue 会直接把任务交给线程，而不需要另外保存它们，效率更高，所以 CachedThreadPool 使用的 Queue 是 SynchronousQueue。

我们来举个例子：例如我们往这个队列中，放一个延迟 10 分钟执行的任务，然后再放一个延迟 10 秒钟执行的任务。通常而言，如果不是延迟队列，那么按照先进先出的排列规则，也就是延迟 10 分钟执行的那个任务是第一个放置的，会放在最前面。但是由于我们此时使用的是阻塞队列，阻塞队列在排放各个任务的位置的时候，会根据延迟时间的长短来排放。所以，我们第二个放置的延迟 10 秒钟执行的那个任务，反而会排在延迟 10 分钟的任务的前面，因为它的执行时间更早。

ArrayBlockingQueue

除了线程池选择的 3 种阻塞队列外，还有一种常用的阻塞队列叫作 ArrayBlockingQueue，它也经常被用于我们手动创建的线程池中。

这种阻塞队列内部是用数组实现的，在新建对象的时候要求传入容量值，且后期不能扩容，所以 ArrayBlockingQueue的最大特点就是容量是有限且固定的。这样一来，使用 ArrayBlockingQueue 且设置了合理大小的最大线程数的线程池，在任务队列放满了以后，如果线程数也已经达到了最大值，那么线程池根据规则就会拒绝新提交的任务，而不会无限增加任务或者线程数导致内存不足，可以非常有效地防止资源耗尽的情况发生。

通常我们可以从以下 5 个角度考虑，来选择合适的阻塞队列：

功能

第 1 个需要考虑的就是功能层面，比如是否需要阻塞队列帮我们排序，如优先级排序、延迟执行等。如果有这个需要，我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。

容量

第 2 个需要考虑的是容量，或者说是否有存储的要求，还是只需要“直接传递”。在考虑这一点的时候，我们知道前面介绍的那几种阻塞队列，有的是容量固定的，如 ArrayBlockingQueue；有的默认是容量无限的，如 LinkedBlockingQueue；而有的里面没有任何容量，如 SynchronousQueue；而对于 DelayQueue 而言，它的容量固定就是 Integer.MAX_VALUE。

所以不同阻塞队列的容量是千差万别的，我们需要根据任务数量来推算出合适的容量，从而去选取合适的 BlockingQueue。

能否扩容

第 3 个需要考虑的是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小，因为业务可能有高峰期、低谷期。

如果一开始就固定一个容量，可能无法应对所有的情况，也是不合适的，有可能需要动态扩容。如果我们需要动态扩容的话，那么就不能选择 ArrayBlockingQueue ，因为它的容量在创建时就确定了，无法扩容。相反，PriorityBlockingQueue 即使在指定了初始容量之后，后续如果有需要，也可以自动扩容。

所以我们可以根据是否需要扩容来选取合适的队列。

内存结构

第 4 个需要考虑的点就是内存结构。在上一课时我们分析过 ArrayBlockingQueue 的源码，看到了它的内部结构是“数组”的形式。

性能

第 5 点就是从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁，它的操作粒度更细，在并发程度高的时候，相对于只有一把锁的 ArrayBlockingQueue 性能会更好。

另外，SynchronousQueue 性能往往优于其他实现，因为它只需要“直接传递”，而不需要存储的过程。如果我们的场景需要直接传递的话，可以优先考虑 SynchronousQueue。

引用：https://kaiwu.lagou.com/course/courseInfo.htm?courseId=16#/detail/pc?id=276


展开全文
• 并发编程栏目代码 GitHub package 地址：点击打开链接 ... 在Java多线程应用中，队列的使用率很高，多数生产消费模型的首选数据结构就是...Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列，其中阻塞队...
• 目录1、并发队列1.1阻塞队列与非阻塞队列的区别1.2代码举例1.2.1非阻塞...并发队列分为阻塞队列和非阻塞队列。 线程池就是阻塞队列实现的 1.1阻塞队列与非阻塞队列的区别 入队时： 非阻塞队列：当队列满了放入数据时
• Java非对称加密源码实例 1个目标文件 摘要:Java源码,算法相关,对称加密 Java非对称加密源程序代码实例，本例中使用RSA加密技术，定义加密算法可用 DES,DESede,Blowfish等。 设定字符串为“张三，你好，我是李四”...
• 线程安全：多线程访问同一代码，不会产生不确定的结果。 （与单线程运行结果一样的就是线程...Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列， 其中阻塞队列的典型例子是BlockingQueue， 非阻塞队列的...
• ## Java常见面试题

千次阅读 2019-04-11 22:43:55
文章目录如何用数组实现队列？...NIO相关，Channels、Buffers、Selectors流与缓冲阻塞与非阻塞IO选择器（Selectors）反射的用途Java注解的继承非静态内部类能定义静态方法吗？Lock Synchronized 有什...

java 订阅