精华内容
下载资源
问答
  • 2020-12-15 18:02:53

    一、概述

    队列(Queue)是一种先进先出(FIFO)的线性数据结构,插入操作在队尾(rear)进行,删除操作在队首(front)进行。

    二、ADT

    队列ADT(抽象数据类型)一般提供以下接口:1、Queue() 创建队列

    2、enqueue(item) 向队尾插入项

    3、dequeue() 返回队首的项,并从队列中删除该项

    4、empty() 判断队列是否为空

    5、size() 返回队列中项的个数

    三、Python实现

    使用Python的内建类型list列表,可以很方便地实现队列ADT:#!/usr/bin/env python

    # -*- coding: utf-8 -*-

    class Queue:

    def __init__(self):

    self.items = []

    def enqueue(self, item):

    self.items.append(item)

    def dequeue(self):

    return self.items.pop(0)

    def empty(self):

    return self.size() == 0

    def size(self):

    return len(self.items)

    四、应用

    著名的 约瑟夫斯问题(Josephus Problem)是应用队列(确切地说,是循环队列)的典型案例。在 约瑟夫斯问题 中,参与者围成一个圆圈,从某个人(队首)开始报数,报数到n+1的人退出圆圈,然后从退出人的下一位重新开始报数;重复以上动作,直到只剩下一个人为止。

    值得注意的是,Queue类只实现了简单队列,上述问题实际上需要用循环队列来解决。在报数过程中,通过“将(从队首)出队的人再入队(到队尾)”来模拟循环队列的行为。具体代码如下:#!/usr/bin/env python

    # -*- coding: utf-8 -*-

    def josephus(namelist, num):

    simqueue = Queue()

    for name in namelist:

    simqueue.enqueue(name)

    while simqueue.size() > 1:

    for i in xrange(num):

    simqueue.enqueue(simqueue.dequeue())

    simqueue.dequeue()

    return simqueue.dequeue()

    if __name__ == '__main__':

    print(josephus(["Bill", "David", "Kent", "Jane", "Susan", "Brad"], 3))

    运行结果,代码如下:$ python josephus.py

    Susan

    更多相关内容
  • 线程池超过核心线程数corePoolsize的线程会放入线程池存储队列中,存储队列是一种阻塞式队列。 二 阻塞队列 阻塞队列是支持阻塞插入和阻塞移除方法的队列。阻塞队列常用于生产者和消费者的场景,生产者是向...

    一 概述

    线程池中超过核心线程数corePoolsize的线程会放入线程池中存储队列中,存储队列是一种阻塞式队列。

    二 阻塞队列

    阻塞队列是支持阻塞插入和阻塞移除方法的队列。阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    支持阻塞的插入方法(put(E e)):当队列满时,队列会阻塞插入元素的线程,直至队列不满。

    支持阻塞的移除方法(take()):当队列空时,队列会阻塞移除元素的线程,直至队列不为空。

     /* 
     * @since 1.5
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public interface BlockingQueue<E> extends Queue<E> {
        //插入方法
        boolean add(E e);//非阻塞方法
        boolean offer(E e);
        void put(E e) throws InterruptedException;//阻塞方法
        boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;//阻塞指定时间,超时退出
        //移除方法
        E take() throws InterruptedException;//阻塞方法
        E poll(long timeout, TimeUnit unit) throws InterruptedException;//阻塞指定时间,超时退出
        boolean remove(Object o);//非阻塞方法
        //判断元素是否存在
        public boolean contains(Object o);
        //查询队列剩余空间
        int remainingCapacity();
        //移出元素并将元素放入指定集合
        int drainTo(Collection<? super E> c);
        int drainTo(Collection<? super E> c, int maxElements);
    }

    如果队列是无界阻塞队列,队列不可能会出现满队列的情况,所以使用put(E e)或者offer(E e,long timeout,TimeUnit unit)方法是不会出现被阻塞的情况,而且后者会一直返回true。

    三 阻塞队列的阻塞实现原理

    阻塞队列是通过ReentranLock锁+Condition条件状态来实现队列的阻塞。其中,Condition通过等待await()/通知机制signal()来实现线程之间的通信。类似于Object的wait()与notify(),通过Synchronized的同步锁机制使用wait(等待)和notify(通知)来实现线程的通信。

    ReentranLock

    /**
     * @since 1.5
     * @author Doug Lea
     */
    public class ReentrantLock implements Lock, java.io.Serializable {
    
    }

    Condition

    /**
     * @since 1.5
     * @author Doug Lea
     */
    public interface Condition {
    
        void await() throws InterruptedException;
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        void signal();
        void signalAll();
    }

    阻塞过程:在阻塞队列A中调用put(E e)方法向队列插入数据时,如果队列已经满了,则A线程就会挂起,put(E e)方法被阻塞,会等到队列到非满的状态时恢复。如果B线程调用take()方法移出队列中的数据后,会通知执行put(E e)方法而挂起的A线程,因为这个时候B线程已经消费了队列中的一个元素,所以此时的队列为非满状态,则会激活线程A,同时线程A会调用之前挂起的put(E e)方法进行数据插入操作。相反,线程B调用take()方法移除数据时,如果队列为空,线程B也会挂起,take()方法被阻塞,当A调用put(E e)方法向队列插入数据后使得队列为非空时,此时会通知激活线程B,同时线程B调用会调用之前挂起的take()方法进行数据移出操作。

    四Java8中阻塞队列的实例

    ArrayBlockingQueue

     /* 
     * A bounded {@linkplain BlockingQueue blocking queue} backed by an
     * array.  This queue orders elements FIFO (first-in-first-out).
     * @since 1.5
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        /** The queued items */
        final Object[] items;
    
        //ReentrantLock 锁
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        //非空的Condition状态
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        //非满的Condition状态
        /** Condition for waiting puts */
        private final Condition notFull;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        //默认非公平访问策略
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            /*在tack()方法中,如果队列为空时会调用notEmpty.await()挂起当先调用take()的线程,
              当其他线程调用put(E e)方法插入元素成功后会通过notEmpty.signal()唤醒之前挂起的线程
              调用take()方法移出元素*/
            notEmpty.signal();
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            /*在put(E e)方法中,如果队列为满时会调用notFull.await()挂起当先调用put(E e)的线程,
              当其他线程调用take()方法成功移出队列中的元素后会通过notFull.signal()唤醒之前挂起的线                
              程调用put(E e)方法插入元素*/
            notFull.signal();
            return x;
        }
    
        /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            //非空校验
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            //加锁
            lock.lockInterruptibly();
            try {
                //循环判断队列是否满
                while (count == items.length)
                    //队列为满,挂起线程
                    notFull.await();
                //队列不满,向队列中添加数据元素
                enqueue(e);
            } finally {
                //释放锁
                lock.unlock();
            }
        }
    
            public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            //加锁
            lock.lockInterruptibly();
            try {
                //循环判断 队列是否为空
                while (count == 0)
                    //队列为空,挂起线程        
                    notEmpty.await();
                //队列非空,从队列中移出数据元素
                return dequeue();
            } finally {
                //释放锁
                lock.unlock();
            }
        }
    }

    ArrayBlockingQueue是一个基于数组实现的有界阻塞队列。此队列按照先进先出FIFO(first-in-first-out)的原则对元素进行排序,其中items是一个Object的数组保存队列元素,takeIndex为出队元素的下标,putIndex为入队列的下标,count为队列中的元素总数。

    ArrayBlockingQueue获取数据和添加数据都是使用同一个锁对象,这样添加和获取就不会产生并发操作,但是,它使用的是Condition的等待/通知机制,使得ArrayBlockingQueue的数据写入和获取操作足够轻巧,以至于引入独立的锁机制,从而给代码的带来额外的复杂性。

    此外,ArrayBlockingQueue默认情况下是不保证访问者公平的访问队列,所谓公平访问队列是指被阻塞的所有生产者线程和消费者线程在队列可用的时候,可以按照入队的先后顺序反问队列。也就是说,先阻塞的生产者线程可以先向队列插入元素,而先阻塞的消费者线程可以先从队列中获取元素。Java源码中使用了一个公平锁来保证访问线程可以公平的访问阻塞队列。

    //将fair参数设置为true
    ArrayBlockingQueue abq = new ArrayBlockingQueue(666, true);
    //设置公平锁    
    public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }

    DelayQueue

    /**
     * @since 1.5
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    
        public DelayQueue() {}
    
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }
        public boolean addAll(Collection<? extends E> c) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            boolean modified = false;
            for (E e : c)
                if (add(e))
                    modified = true;
            return modified;
        }
    }

    DelayQueue是一个支持延时获取元素的无界阻塞队列,按延迟之间进行排序,需要实现Delayed接口来实现延迟功能。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在对队列进行操作的时候可以指定延迟多久,当延迟时间满了才能够对队列进行操作。

    基于DelayQueue的上述特点可以进行下列设计:

    1. 缓存系统设计:使用延时阻塞队列DelayQueue保存缓存的有效期,使用一个线程循环查询DelayQueue,一旦能够从DelayQueue中获取元素,表示缓存有效期到了。
    2. 定时任务调度:使用延时阻塞队列DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,如TimerQueue就是使用阻塞队列来实现的。

    DelayedWorkQueue

    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService {
    
     static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {
        
            private static final int INITIAL_CAPACITY = 16;
            private RunnableScheduledFuture<?>[] queue =
                new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
            private final ReentrantLock lock = new ReentrantLock();
            private int size = 0;
    
            public int remainingCapacity() {
                return Integer.MAX_VALUE;
            }
    }

    DelayedWorkQueue是类的一个内部类ScheduledThreadPoolExecutor,它也实现了BlockingQueue<Runnable>接口,它是基于堆结构的阻塞延时队列,堆结构为数组实现的二叉树,初始容量为16。

    LinkedBlockingQueue

     /**
     * This queue orders elements FIFO (first-in-first-out).
     * @since 1.5
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        
        /**
         * Linked list node class
         */
        static class Node<E> {
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
        /**
         * Creates a {@code LinkedBlockingQueue} with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
        /**
         * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity} is not greater
         *         than zero
         */
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
            /**
         * Creates a {@code LinkedBlockingQueue} with a capacity of
         * {@link Integer#MAX_VALUE}, initially containing the elements of the
         * given collection,
         * added in traversal order of the collection's iterator.
         *
         * @param c the collection of elements to initially contain
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    }

    LinkedBlockingQueue是一个基于链表实现的无界阻塞队列,此队列的默认最大容量为Integer.MAX_VALUE(2^31-1)。此队列的出队方式为先进先出,即FIFO(first-in-first-out)。此外,它在添加和删除队列中的元素的时候会伴随着对象的对象的创建和销毁,所以在高并发和数据量大的时候会造成较大的GC压力,而且她获取元素和添加的元素使用的是不同的锁对象。

    LinkedBlockingDeque

     * @since 1.6
     * @author  Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public class LinkedBlockingDeque<E>
        extends AbstractQueue<E>
        implements BlockingDeque<E>, java.io.Serializable {
    
       /** Doubly-linked list node class */
        static final class Node<E> {
            /**
             * The item, or null if this node has been removed.
             */
            E item;
    
            /**
             * One of:
             * - the real predecessor Node
             * - this Node, meaning the predecessor is tail
             * - null, meaning there is no predecessor
             */
            Node<E> prev;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head
             * - null, meaning there is no successor
             */
            Node<E> next;
    
            Node(E x) {
                item = x;
            }
        }
    
        /**
         * Creates a {@code LinkedBlockingDeque} with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public LinkedBlockingDeque() {
            this(Integer.MAX_VALUE);
        }
    
        /**
         * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
         *
         * @param capacity the capacity of this deque
         * @throws IllegalArgumentException if {@code capacity} is less than 1
         */
        public LinkedBlockingDeque(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
        }
    
        /**
         * Creates a {@code LinkedBlockingDeque} with a capacity of
         * {@link Integer#MAX_VALUE}, initially containing the elements of
         * the given collection, added in traversal order of the
         * collection's iterator.
         *
         * @param c the collection of elements to initially contain
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public LinkedBlockingDeque(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock lock = this.lock;
            lock.lock(); // Never contended, but necessary for visibility
            try {
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (!linkLast(new Node<E>(e)))
                        throw new IllegalStateException("Deque full");
                }
            } finally {
                lock.unlock();
            }
        }
    }

    LinkedBlockingDeque是一个由双向链表构成的双向阻塞队列。双向阻塞队列意味着它可以从队列的两端插入和移出元素。由于它提供了两个队列的操作入口,所以在多线程进行插入或者移除操作时会减少一半的竞争。同时为各种方法提供了操作队首元素的方法(xxxFirst())和操作队尾元素的方法(xxxLast())。此外,双向阻塞队列可以运用在"工作窃取"模式中。

    LinkedTransferQueue

    /**
     * An unbounded {@link TransferQueue} based on linked nodes.
     * This queue orders elements FIFO (first-in-first-out) with respect
     * to any given producer.
     * @since 1.7
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public class LinkedTransferQueue<E> extends AbstractQueue<E>
        implements TransferQueue<E>, java.io.Serializable {
    
         static final class Node {
         final boolean isData;   // false if this is a request node
         volatile Object item;   // initially non-null if isData; CASed to match
         volatile Node next;
         volatile Thread waiter; // null until waiting
    
         // CAS methods for fields
         final boolean casNext(Node cmp, Node val) {
             return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
    
         final boolean casItem(Object cmp, Object val) {
             // assert cmp == null || cmp.getClass() != Node.class;
                return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
         }
    
        /**
         * Creates an initially empty {@code LinkedTransferQueue}.
         */
        public LinkedTransferQueue() {
        }
    
        /**
         * Creates a {@code LinkedTransferQueue}
         * initially containing the elements of the given collection,
         * added in traversal order of the collection's iterator.
         *
         * @param c the collection of elements to initially contain
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public LinkedTransferQueue(Collection<? extends E> c) {
            this();
            addAll(c);
        }
    
        /**
         * Transfers the element to a waiting consumer immediately, if possible.
         *
         * <p>More precisely, transfers the specified element immediately
         * if there exists a consumer already waiting to receive it (in
         * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
         * otherwise returning {@code false} without enqueuing the element.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean tryTransfer(E e) {
            return xfer(e, true, NOW, 0) == null;
        }
    
        /**
         * Transfers the element to a consumer, waiting if necessary to do so.
         *
         * <p>More precisely, transfers the specified element immediately
         * if there exists a consumer already waiting to receive it (in
         * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
         * else inserts the specified element at the tail of this queue
         * and waits until the element is received by a consumer.
         *
         * @throws NullPointerException if the specified element is null
         */
        public void transfer(E e) throws InterruptedException {
            if (xfer(e, true, SYNC, 0) != null) {
                Thread.interrupted(); // failure possible only due to interrupt
                throw new InterruptedException();
            }
        }
    
        /**
         * Transfers the element to a consumer if it is possible to do so
         * before the timeout elapses.
         *
         * <p>More precisely, transfers the specified element immediately
         * if there exists a consumer already waiting to receive it (in
         * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
         * else inserts the specified element at the tail of this queue
         * and waits until the element is received by a consumer,
         * returning {@code false} if the specified wait time elapses
         * before the element can be transferred.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean tryTransfer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
                return true;
            if (!Thread.interrupted())
                return false;
            throw new InterruptedException();
        }
    
        /**
         * Always returns {@code Integer.MAX_VALUE} because a
         * {@code LinkedTransferQueue} is not capacity constrained.
         *
         * @return {@code Integer.MAX_VALUE} (as specified by
         *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
         *         BlockingQueue.remainingCapacity})
         */
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }
    }

    LinkedTransferQueue时一个由链表结构组成的无界阻塞TransferQueue队列。无界可以从remainingCapacity()方法一直返回的是整数的最大值可以看出。此外,它还多了tryTransfer()方法和transfer()方法。

    1. transfer(E e)方法:如果当前消费者正在等待接收操作元素的时候,即消费者使用阻塞的take()方法和带时间超时限制的阻塞方法poll(long timeout, TimeUnit unit)时,tansfer方法可以把生产者传入的元素立即transfer(E e)给消费者。如果没有消费者在等待接收元素时,transfer(E e)方法会将元素存放在队列的尾部(tail)节点,并等到该元素被消费者消费后才返回。
    2. tryTransfer(E e)方法:tryTransfer()方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素就会返回false。同fransfer(E e)方法的区别是tryTransfer(E e)方法无论消费者是否接收,方法都会立即返回,而transfer(E e)方法必须等到消费者消费后才返回。
    3. tryTransfer(E e,long timeout,TimeUnit nuit) 方法,该方法试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间则返回false,如果在这段时间内元素被消费者消费了则返回true。

    PriorityBlockingQueue

    /**
     * @since 1.5
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    @SuppressWarnings("unchecked")
    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
        /**
         * Default array capacity.
         */
        private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
        /**
         * Creates a {@code PriorityBlockingQueue} with the default
         * initial capacity (11) that orders its elements according to
         * their {@linkplain Comparable natural ordering}.
         */
        public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }
    
        /**
         * Creates a {@code PriorityBlockingQueue} with the specified
         * initial capacity that orders its elements according to their
         * {@linkplain Comparable natural ordering}.
         *
         * @param initialCapacity the initial capacity for this priority queue
         * @throws IllegalArgumentException if {@code initialCapacity} is less
         *         than 1
         */
        public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    
        /**
         * Creates a {@code PriorityBlockingQueue} with the specified initial
         * capacity that orders its elements according to the specified
         * comparator.
         *
         * @param initialCapacity the initial capacity for this priority queue
         * @param  comparator the comparator that will be used to order this
         *         priority queue.  If {@code null}, the {@linkplain Comparable
         *         natural ordering} of the elements will be used.
         * @throws IllegalArgumentException if {@code initialCapacity} is less
         *         than 1
         */
        public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }
    
        /**
         * Creates a {@code PriorityBlockingQueue} containing the elements
         * in the specified collection.  If the specified collection is a
         * {@link SortedSet} or a {@link PriorityQueue}, this
         * priority queue will be ordered according to the same ordering.
         * Otherwise, this priority queue will be ordered according to the
         * {@linkplain Comparable natural ordering} of its elements.
         *
         * @param  c the collection whose elements are to be placed
         *         into this priority queue
         * @throws ClassCastException if elements of the specified collection
         *         cannot be compared to one another according to the priority
         *         queue's ordering
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public PriorityBlockingQueue(Collection<? extends E> c) {
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            boolean heapify = true; // true if not known to be in heap order
            boolean screen = true;  // true if must screen for nulls
            if (c instanceof SortedSet<?>) {
                SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
                this.comparator = (Comparator<? super E>) ss.comparator();
                heapify = false;
            }
            else if (c instanceof PriorityBlockingQueue<?>) {
                PriorityBlockingQueue<? extends E> pq =
                    (PriorityBlockingQueue<? extends E>) c;
                this.comparator = (Comparator<? super E>) pq.comparator();
                screen = false;
                if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                    heapify = false;
            }
            Object[] a = c.toArray();
            int n = a.length;
            // If c.toArray incorrectly doesn't return Object[], copy it.
            if (a.getClass() != Object[].class)
                a = Arrays.copyOf(a, n, Object[].class);
            if (screen && (n == 1 || this.comparator != null)) {
                for (int i = 0; i < n; ++i)
                    if (a[i] == null)
                        throw new NullPointerException();
            }
            this.queue = a;
            this.size = n;
            if (heapify)
                heapify();
        }
    
        //扩容操作
        private void tryGrow(Object[] array, int oldCap) {
            lock.unlock(); // must release and then re-acquire main lock
            Object[] newArray = null;
            if (allocationSpinLock == 0 &&
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                         0, 1)) {
                try {
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) : // grow faster if small
                                           (oldCap >> 1));
                    if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                            throw new OutOfMemoryError();
                        newCap = MAX_ARRAY_SIZE;
                    }
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];
                } finally {
                    allocationSpinLock = 0;
                }
            }
            if (newArray == null) // back off if another thread is allocating
                Thread.yield();
            lock.lock();
            if (newArray != null && queue == array) {
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }
    }

    PriorityBlockingQueue是一个支持优先级的无界阻塞队列,按自然排序,但不是先进先出的顺序。默认情况下元素初始容量为11,当数据量超过当前容量的时候就会执行扩容操作实现容量扩充,所以是无界阻塞队列,同时默认采取的自然顺序为升序排序。通过Comparable接口的compareTo()方法来指定元素的自然排序规则,通过构造函数参数Comparator来队元素进行排序。但是无法保证同优先级元素的顺序。

    注意,由于需要Priority的属性,所以插入的数据必须是可以比较的,所以不可插入null值,同时可以将其视为PriorityQueue的线程安全版本,当然由于它无界的属性,所以执行put的操作的时候是不存在阻塞情况的,而take操作在队列为空的状态时就会被阻塞。

    SynchronousQueue

    /**
     * @since 1.5
     * @author Doug Lea and Bill Scherer and Michael Scott
     * @param <E> the type of elements held in this collection
     */
    public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
        /**
         * Shared internal API for dual stacks and queues.
         */
        abstract static class Transferer<E> {
            /**
             * Performs a put or take.
             *
             * @param e if non-null, the item to be handed to a consumer;
             *          if null, requests that transfer return an item
             *          offered by producer.
             * @param timed if this operation should timeout
             * @param nanos the timeout, in nanoseconds
             * @return if non-null, the item provided or received; if null,
             *         the operation failed due to timeout or interrupt --
             *         the caller can distinguish which of these occurred
             *         by checking Thread.interrupted.
             */
            abstract E transfer(E e, boolean timed, long nanos);
        }
    
        /**
         * Creates a {@code SynchronousQueue} with nonfair access policy.
         */
        public SynchronousQueue() {
            this(false);
        }
    
        /**
         * Creates a {@code SynchronousQueue} with the specified fairness policy.
         *
         * @param fair if true, waiting threads contend in FIFO order for
         *        access; otherwise the order is unspecified.
         */
        public SynchronousQueue(boolean fair) {
            transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
        }
        
        /**
         * Always returns {@code true}.
         * A {@code SynchronousQueue} has no internal capacity.
         *
         * @return {@code true}
         */
        public boolean isEmpty() {
            return true;
        }
    
        /**
         * Always returns zero.
         * A {@code SynchronousQueue} has no internal capacity.
         *
         * @return zero
         */
        public int size() {
            return 0;
        }
    
        /**
         * Always returns zero.
         * A {@code SynchronousQueue} has no internal capacity.
         *
         * @return zero
         */
        public int remainingCapacity() {
            return 0;
        }
    }
    

    SynchronouseQueue是一个不存储元素的队列,即容量为0,它完成的是直接传递的队列,从isEmpty()方法一直返回true可知。对应该队列的每一个put操作必须等待下一操作,即一个插入操作要等待一个移除操作,否则就不能进行下一次的添加元素操作。他支持公平访问队列方式,默认情况下线程采用非公平性策略访问队列。

      SynchronousQueue<E> queue = new SynchronousQueue<E>(true); //设置公平性访问策略
      public SynchronousQueue(boolean fair) {
            transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
        }

    SynchronousQueue队列负责把生产者线程处理的数据直接传递给消费者线程,本身并不会存储任何线程,所以非常适合传递性场景,且SynchronousQueue的吞吐量会高于LinkedBlockingQueue和ArrayBlockingQueue。

    展开全文
  • 消息队列(Message Queue),是分布式系统重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。 消息队列主要解决了...

    一、消息队列(MQ)概述

    消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:

    当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。

    消息队列主要解决了应用耦合、异步处理、流量削锋等问题。

    当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。

    二、消息队列使用场景

    消息队列在实际应用中包括如下四个场景:

    • 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
    • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
    • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
    • 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

    下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用:

    2.1 异步处理

    具体场景:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。对这两个操作的处理方式有两种:串行及并行。

    (1)串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信;

    在这种方式下,需要最终发送验证短信后再返回给客户端。

    (2)并行处理:新注册信息写入后,由发短信和发邮件并行处理;

    在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。

    假设以上三个子系统处理的时间均为50ms,且不考虑网络延迟,则总的处理时间:

    串行:50+50+50=150ms
    并行:50+50 = 100ms

    若使用消息队列:

    并在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本可以忽略不计,因此总的处理时间相比串行提高了2倍,相比并行提高了一倍;

    2.2 应用耦合

    具体场景:用户使用QQ相册上传一张图片,人脸识别系统会对该图片进行人脸识别,一般的做法是,服务器接收到图片后,图片上传系统立即调用人脸识别系统,调用完成后再返回成功,如下图所示:

    该方法有如下缺点:

    • 人脸识别系统被调失败,导致图片上传失败;
    • 延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果;
    • 图片上传系统与人脸识别系统之间互相调用,需要做耦合;

    若使用消息队列:

    客户端上传图片后,图片上传系统将图片信息如uin、批次写入消息队列,直接返回成功;而人脸识别系统则定时从消息队列中取数据,完成对新增图片的识别。
    此时图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时间,对队列中的图片信息进行处理。

    2.3 限流削峰

    具体场景:购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。

    该方法有如下优点:

    1. 请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力;
    2. 队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;

    2.4 消息驱动的系统

    具体场景:用户新上传了一批照片, 人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用户的人脸索引(加快查询)。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。

    该方法有如下优点:

    • 避免了直接调用下一个系统导致当前系统失败;
    • 每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处理;

    三、消息队列的两种模式

    消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)。

    3.1 点对点模式

    点对点模式下包括三个角色:

    • 消息队列
    • 发送者 (生产者)
    • 接收者(消费者)

    消息发送者生产消息发送到queue中,然后消息接收者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

    点对点模式特点:

    • 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
    • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
    • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

    3.2 发布/订阅模式

    发布/订阅模式下包括三个角色:

    • 角色主题(Topic)
    • 发布者(Publisher)
    • 订阅者(Subscriber)

    发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

    发布/订阅模式特点:

    • 每个消息可以有多个订阅者;
    • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
    • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

    四、常用消息队列介绍

    本部分主要介绍四种常用的消息队列(RabbitMQ/ActiveMQ/RocketMQ/Kafka)的主要特性、优点、缺点。

    4.1 RabbitMQ

    RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

    主要特性:

    1. 可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;
    2. 灵活的路由: 消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用;
    3. 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用;
    4. 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
    5. 多种协议的支持:支持多种消息队列协议;
    6. 服务器端用Erlang语言编写,支持只要是你能想到的所有编程语言;
    7. 管理界面: RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;
    8. 跟踪机制:如果消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么;
    9. 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件;

    使用RabbitMQ需要:

    • ErLang语言包
    • RabbitMQ安装包

    RabbitMQ可以运行在Erlang语言所支持的平台之上:

    Solaris
    BSD
    Linux
    MacOSX
    TRU64
    Windows NT/2000/XP/Vista/Windows 7/Windows 8
    Windows Server 2003/2008/2012
    Windows 95, 98
    VxWorks

    优点:

    1. 由于erlang语言的特性,mq 性能较好,高并发;
    2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
    3. 有消息确认机制和持久化机制,可靠性高;
    4. 高度可定制的路由;
    5. 管理界面较丰富,在互联网公司也有较大规模的应用;
    6. 社区活跃度高;

    缺点:

    1. 尽管结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
    2. 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,但是使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;
    3. 需要学习比较复杂的接口和协议,学习和维护成本较高;

    4.2 ActiveMQ

    ActiveMQ是由Apache出品,ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

    主要特性:

    1. 服从 JMS 规范:JMS 规范提供了良好的标准和保证,包括:同步或异步的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
    2. 连接性:ActiveMQ 提供了广泛的连接选项,支持的协议有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性。
    3. 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP ;
    4. 持久化插件和安全插件:ActiveMQ 提供了多种持久化选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行自定义鉴权和授权;
    5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
    6. 代理集群:多个 ActiveMQ 代理可以组成一个集群来提供服务;
    7. 异常简单的管理:ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通过处理 JMX 的告警消息,通过使用命令行脚本,甚至可以通过监控各种类型的日志。

    使用ActiveMQ需要:

    • Java JDK
    • ActiveMQ安装包

    ActiveMQ可以运行在Java语言所支持的平台之上。

    优点:

    1. 跨平台(JAVA编写与平台无关有,ActiveMQ几乎可以运行在任何的JVM上)
    2. 可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都是开发人员最熟悉的存储介质。将消息存到数据库,看得见摸得着。而且公司有专门的DBA去对数据库进行调优,主从分离;
    3. 支持JMS :支持JMS的统一接口;
    4. 支持自动重连;
    5. 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权。
    6. 监控完善:拥有完善的监控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;
    7. 界面友善:提供的Web Console可以满足大部分情况,还有很多第三方的组件可以使用,如hawtio;
      缺点:

    8. 社区活跃度不及RabbitMQ高;

    9. 根据其他用户反馈,会出莫名其妙的问题,会丢失消息;
    10. 目前重心放到activemq6.0产品-apollo,对5.x的维护较少;
    11. 不适合用于上千个队列的应用场景;

    4.3 RocketMQ

    RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

    主要特性:

    1. 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点;
    2. Producer、Consumer、队列都可以分布式;
    3. Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
    4. 能够保证严格的消息顺序;
    5. 提供丰富的消息拉取模式;
    6. 高效的订阅者水平扩展能力;
    7. 实时的消息订阅机制;
    8. 亿级消息堆积能力;
    9. 较少的依赖;

    使用RocketMQ需要:

    • Java JDK
    • 安装git、Maven
    • RocketMQ安装包

    RocketMQ可以运行在Java语言所支持的平台之上。

    优点:

    1. 单机支持 1 万以上持久化队列
    2. RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,
      访问时,直接从内存读取。
    3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);
    4. 性能非常好,可以大量堆积消息在broker中;
    5. 支持多种消费,包括集群消费、广播消费等。
    6. 各个环节分布式扩展设计,主从HA;
    7. 开发度较活跃,版本更新很快。

    缺点:

    支持的客户端语言不多,目前是java及c++,其中c++不成熟;
    RocketMQ社区关注度及成熟度也不及前两者;
    没有web管理界面,提供了一个CLI(命令行界面)管理工具带来查询、管理和诊断各种问题;
    没有在 mq 核心中去实现JMS等接口;

    4.4 Kafka

    Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),,之后成为Apache项目的一部分。Kafka系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

    主要特性:

    1. 快速持久化,可以在O(1)的系统开销下进行消息持久化;
    2. 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;
    3. .完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
    4. 支持同步和异步复制两种HA;
    5. 支持数据批量发送和拉取;
    6. zero-copy:减少IO操作步骤;
    7. 数据迁移、扩容对用户透明;
    8. 无需停机即可扩展机器;
    9. 其他特性:严格的消息顺序、丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制;

    使用Kafka需要:

    • Java JDK
    • Kafka安装包

    优点:

    1. 客户端语言丰富,支持java、.net、php、ruby、python、go等多种语言;
    2. 性能卓越,单机写入TPS约在百万条/秒,消息大小10个字节;
    3. 提供完全分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积;
    4. 支持批量操作;
    5. 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
    6. 有优秀的第三方Kafka Web管理界面Kafka-Manager;
    7. 在日志领域比较成熟,被多家公司和多个开源项目使用;

    缺点:

    1. Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
    2. 使用短轮询方式,实时性取决于轮询间隔时间;
    3. 消费失败不支持重试;
    4. 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
    5. 社区更新较慢;

    4.5 RabbitMQ/ActiveMQ/RocketMQ/Kafka对比

    这里列举了上述四种消息队列的差异对比:

    结论:

    Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka。

    五、参考资料:

    5.1 消息队列:

    1. 大型网站架构之分布式消息队列 http://blog.csdn.net/shaobingj126/article/details/50585035
    2. 消息队列的使用场景 https://www.zhihu.com/question/34243607/answer/127666030
    3. 浅谈异步消息队列模型 http://www.cnblogs.com/sunkeydev/p/5248855.html
    4. 消息队列的两种模式 http://blog.csdn.net/heyutao007/article/details/50131089

    5.2 RabbitMQ

    1. RabbitMQ主页 https://www.rabbitmq.com/
    2. RabbitMQ学习教程 https://www.rabbitmq.com/getstarted.html
    3. 专栏:RabbitMQ从入门到精通 http://blog.csdn.net/column/details/rabbitmq.html
    4. RabbitMQ能为你做些什么 http://rabbitmq.mr-ping.com/description.html
    5. RabbitMQ指南(1)-特性及功能 https://blog.zenfery.cc/archives/79.html

    5.3 ActiveMQ

    1. ActiveMQ主页 http://activemq.apache.org/
    2. Apache ActiveMQ介绍 http://jfires.iteye.com/blog/1187688
    3. ActiveMQ的简介与安装 http://blog.csdn.net/sl1992/article/details/72824562
    4. ActiveMQ 和消息简介 http://www.cnblogs.com/craftsman-gao/p/7002605.html

    5.4 RocketMQ

    1. 主页 https://github.com/alibaba/RocketMQ
    2. RocketMQ 原理简介 http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf
    3. RocketMQ与kafka对比(18项差异) http://jm.taobao.org/2016/03/24/rmq-vs-kafka/

    5.5 Kafka

    1.Kafka主页: http://kafka.apache.org/

    1. Kafka特性 http://www.cnblogs.com/lsx1993/p/4847719.html
    2. Kafka客户端支持语言 https://cwiki.apache.org/confluence/display/KAFKA/Clients

    5.6 RabbitMQ/ActiveMQ/RocketMQ/Kafka对比

    1. RocketMQ,队列选型 http://www.zmannotes.com/index.php/2016/01/17/rocketmq/
    2. RabbitMQ和Kafka http://www.dongcoder.com/detail-416804.html
    3. 即时通信RabbitMQ二-性能测试 http://www.jianshu.com/p/d31ae9e3bfb6
    4. RabbitMq、ActiveMq、ZeroMq、kafka之间的比较,资料汇总 http://blog.csdn.net/linsongbin1/article/details/47781187
    5. 消息队列软件产品大比拼 http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html

    总结:

    消息队列利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用数据库redis充当消息队列的案例。而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。

    展开全文
  • 异步执行的函数(回调函数)放入队列中执行。队列分为宏队列与队列。 宏队列:用来保存执行的宏任务(回调),比如:dom事件回调,ajax回调,定时器回调 微队列:用来保存执行的微任务(回调),比如:promise回调,...

    JavaScript 异步之宏队列与微队列

    事件循环机制

    事件循环的顺序,决定了JavaScript代码的执行顺序

    宏任务:setTimeout, setInterval, setImmediate, I/O, UI rendering。
    微任务: process.nextTick, Promise, MutationObserver(html5新特性)

    浏览器

    在这里插入图片描述
    1.JS线程负责处理JS代码,当遇到一些异步操作的时候,则将这些异步事件移交给Web APIs 处理,自己则继续往下执行。
    2.Web APIs线程将接收到的事件按照一定规则添加到任务队列中,宏事件(DOM事件、Ajax事件、setTimeout事件等)添加到宏任务队列中,微事件(Promise、nextTick)添加到微事件队列中。
    3.JS线程处理完当前的所有任务以后(执行栈为空),它会先去微任务队列获取事件,并将微任务队列中的所有事件一件件执行完毕,直到微任务队列为空后再去宏任务队列中取出一个事件执行(每次取完一个宏任务队列中的事件执行完毕后,都先检查微任务队列)。
    4.然后不断循环第3步。

    补充:

    1. 浏览器标准环境中(比如说谷歌webkit内核),是一个宏任务紧接着所有微任务执行。
    2. 在node环境中,则又不一样了,是一个类型宏任务队列执行完,再去执行微任务。

    宏队列与微队列

    异步执行的函数(回调函数)放入队列中执行。队列分为宏队列与微队列。

    宏队列:用来保存执行的宏任务(回调),比如:dom事件回调,ajax回调,定时器回调
    微队列:用来保存执行的微任务(回调),比如:promise回调,mutation回调

    在这里插入图片描述

    1.JS为单线程引擎,必须先执行所有的初始化同步任务代码。
    2.每次取出第一个宏任务执行前,都要将所有的微任务执行完毕。

    同步->微队列->宏队列

     setTimeout(()=>{ //会立即放入宏队列
            console.log("timeout callback1()");
            Promise.resolve(3).then(
                value =>{ //会立即放入微队列
                    console.log("Promise onResolved3()",value);
                }
            )
        },0)
        setTimeout(()=>{ //会立即放入宏队列
            console.log("timeout callback2()");
        },0)
        Promise.resolve(1).then(
            value =>{ //会立即放入微队列
                console.log("Promise onResolved1()",value);
            }
        )
        Promise.resolve(2).then(
            value =>{ //会立即放入微队列
                console.log("Promise onResolved2()",value);
            }
        )
     /*
      输出:Promise onResplved(),1
           Promise onResplved(),2
           timeout callback1()
           Promise onResolved3()
           timeout callback2()
    )
    */
    

    注:取到栈里执行,若有嵌套的情况如下列代码,执行第一个setTimeout回调时,把里面的微任务放入微队列,执行第二宏任务前,先将所有的微任务执行完毕。

    promise、async-await、setTimeout的执行顺序

    async 函数返回一个 Promise 对象,当函数执行的时候,一旦遇到 await 就会先返回,等到触发的异步操作完成,再接着执行函数体内后面的语句。

    1.await是一个让出线程的标志,await后面的函数会先执行一遍,然后跳出整个async函数来执行后面js栈的代码。

    2.async函数总是会返回一个Promise的实例,调用一个async函数时,可以理解为里边的代码都是处于new Promise中,所以是同步执行的而最后return的操作,则相当于在Promise中调用resolve。
    3.async函数代码执行是同步的,结果返回(相当于then?)是异步的。
    4.回来执行await后(相当于.then)执行了

    async function async1(){
    	console.log('async1 start')
    	await async2()
    	console.log('async1 end')
    }
    async function async2(){
    	console.log('async2')
    }
    console.log('script start')
    setTimeout(function(){
    	console.log('setTimeout') 
    },0)  
    async1();
    new Promise(function(resolve){
    	console.log('promise1')
    	resolve();
    }).then(function(){
    	console.log('promise2')
    })
    console.log('script end')
    /*
    script start
    根据点2 console.log('async1 start')
    根据点2、点3 console.log('async2') 返回值是promise,await相当于then获取成功的值,要异步!会放入为微队列中等待 await会让出线程,跳出async1继续往下执行
    promise1
    then是异步的异步的! console.log('promise2')放入微队列等待
    script end 
    async1 end 
    promise2  
    setTimeout
    */
    

    await会等同步代码执行完,以及微任务队列清空之后,再执行awati之后的代码。 之后才轮到宏任务队列。

    展开全文
  • 流量控制的一个基本概念是队列(Qdisc),每个网卡都一个队列(Qdisc)相联系, 每当内核需要将报文分组从网卡发送出去, 都会首先将该报文分组添加到该网卡所配置的队列中, 由该队列决定报文分组的发送顺序。...
  • 文章目录第一章——褚论第二章——线性表第三章——栈与队列判断题单选题程序填空题 第一章——褚论 第二章——线性表 第三章——栈与队列 判断题 所谓“循环队列”是指用单向循环链表或者循环数组表示的队列。 ...
  • 文章目录队列介绍数组模拟队列数组模拟队列设计思路...因为队列的输出、输入是分别从前后端来处理,因此需要两个变量 front及 rear分别记录队列前后端的下标,front 会随着数据输出而改变,而 rear则是随着数据输入而改
  • 关于本文 作者:@jrainlau 原文:https://segmentfault.com/a/1190000037567355在最近的业务,接到了一个需要处理约十万条数据的需求。这些...
  • 分布式消息队列的演进

    千次阅读 2021-10-01 00:35:17
    作者:vincentchma,腾讯 IEG 后台开发工程师一、消息队列的演进分布式消息队列中间件是是大型分布式系统常见的中间件。消息队列主要解决应用耦合、异步消息、流量削锋等问题,具有高...
  • 与队列选择题

    2021-10-30 20:36:03
    在解决计算机主机打印机之间速度不匹配问题时通常设置一个打印缓冲区,该缓冲区应该是一个【正确答案: B】结构。 A栈 B队列 C数组 D线性表 2. 【单选题】 设栈S和队列Q的初始...
  • 《算法数据结构考研试题精选》 ——陈守孔 胡潇琨 李玲 冯广惠 编著 前言 内容提要: 1.从数据结构角度讲,栈和队列属于线性结构,其操作是线性表操作的子集,是操作受荣的线性表。但从数据类型的角度看,它们是和...
  • 分布式消息队列

    千次阅读 多人点赞 2021-09-27 00:17:28
    作者:vincentchma,腾讯 IEG 后台开发工程师一、消息队列的演进分布式消息队列中间件是是大型分布式系统常见的中间件。消息队列主要解决应用耦合、异步消息、流量削锋等问题,具有高...
  • 栈和队列主要用于在计算过程保存临时数据,这些数据是计算发现或者产生的,在后面的计算可能需要使用它们。 栈和队列也是最简单的缓存结构,它们只支持数据项的存储和访问,不支持数据项之间的任何关系,因此...
  • Java双向队列Deque栈与队列

    万次阅读 多人点赞 2018-12-22 16:08:06
    Java实际上提供了java.util.Stack来实现栈结构,但官方目前已不推荐使用,而是使用java.util.Deque双端队列来实现队列与栈的各种需求.如下图所示java.util.Deque的实现子类有java.util.LinkedList和java.util....
  • 史上最强数据结构----栈和队列相关笔试面试题

    千次阅读 多人点赞 2022-04-11 11:02:14
    史上最强数据结构----栈和队列相关笔试面试题
  • 消息队列中间件是分布式系统重要的组件,主要解决应用耦合,异步消息,流量削锋等问题 实现高性能,高可用,可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ ...
  • kafka 消息队列

    2021-01-14 13:13:25
    kafka是使用Java和Scala编写的一个快速可扩展的高吞吐量的分布式消息队列系统。kafka将数据持久化存储到磁盘上,自带分区和副本机制,因而具有较好的持久化保证。但是kafka的消息消费没有确认机制,可能因为consumer...
  • 03-栈与队列

    千次阅读 2019-05-12 19:04:13
    3.1 栈 ​ 3.1.1 抽象数据类型栈的定义 ​ 3.1.2 栈的表示和实现 3.2 栈的应用举例 ​ 3.3.1 数制转换 ​ 3.3.2 括号匹配的检验 ​ 3.3.4 行编辑程序 ​ 3.3.5 迷宫求解 ...​ 3.4.2 链队列队列的链式表...
  • 队列 队列是只能在一端插入,另一端删除元素的线性表。 特性:先进先出 队列术语 队列的基本运算 (1)初始化 :设置队列为空。 (2)判断队列为空: 若为空,则返回TRUE,否则返回FALSE. (3)判断队列为满: 若为满,则...
  • 当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。这个过程的exchange和queue就是所谓的”Dead Letter Exchange 和 Queue” 1.5.4 交换机的属性 除交换机类型外,...
  • 消息队列常见的 5 个应用...以下介绍消息队列在实际应用常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。 1、异步处理 场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:.
  • 模拟系统,其中事件的键即为发生的时间,而系统需要按照时间顺序处理所有事件; 任务调度,其中键值对应的优先级决定了应该首先执行哪些任务; 数值计算,键值代表计算错误,而我们需要按照键值指定的顺序来修正它们...
  • 2 DeQueue:使队列中的队首元素删除并返回此元素 3 MaxElement:返回队列中的最大元素 请设计一种数据结构和算法,让MaxElement操作的时间复杂度尽可能地低。     队列是遵守“先入先出”原则的一种复杂数据结构。...
  • 一、 队列的定义 你们在用电脑时有没有经历,机器有时会处于疑似死机的状态,鼠标点什么似乎都没用,双击任何快捷方式都不动弹。就当你失去耐心,打算rest时。突然他像酒醒了一样,把你刚才点击的所有操作全部按顺序...
  • 在分布式系统,两个组件要基于消息队列进行通信,一个组件就会把要处理的数据以消息的形式传递给消息队列,然后这个组件就可以继续执行其他操作; 远端的另一个组件从消息队列中把消息读取出来,在本地进行处理。...
  • 顺序栈类似,在队列的顺序存储结构,用一组地址连续的存储单元依次存放从队头到队尾的元素,如一维数组 Queue[MAXSIZE]。 由于队列中队头和队尾的位置都是动态变化的,因此需要附设两个指针 front 和 rear。 ...
  • 以淘宝卖商品为例,在单体项目应用,针对一个接口的处理,如:顾客下订单,往往会包含以下几种步骤: 1、订单系统创建 2、库存系统减库存 3、支付扣除顾客钱 4、生成物流单 … 假设因为逻辑问题,导致其中某一个...
  • 多级反馈队列调度算法是目前操作系统调度算法被公认的一种较好的调度算法。它可以满足各种类型进程的需要,既能使高优先级的作业得到响应又能使短作业(进程)迅速完成。 基本概念 多级反馈队列调度算法是一种根据...
  • 书面作业: 第三章 栈与队列

    千次阅读 2020-11-18 21:04:19
    判断 1-1 n个元素通过一个栈产生n个元素的出栈序列,其中进栈和出栈操作的次数总是相等的。 1-2 通过对堆栈S操作:Push(S,1), Push(S,2), Pop(S), ...环形队列中有多少个元素可以根据队首指针和队尾指针的值来计算 1-6
  • 介绍消息队列

    2019-04-26 15:49:24
    流程A在处理时,没有在当前线程同步的处理完,而是直接发送一条消息A1到消息队列中,然后消息队列过了一段时间,这段时间可能是几毫秒、几秒甚至几分钟都有可能,这个消息开始被处理,消息被处理的过程,其实就相当...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 50,191
精华内容 20,076
关键字:

下列处理中与队列有关