精华内容
下载资源
问答
  • 主要介绍了Java延迟队列原理与用法,结合实例形式详细分析了延迟队列的概念、原理、功能及具体使用方法,需要的朋友可以参考下
  • 主要介绍了Java 队列实现原理及简单实现代码的相关资料,需要的朋友可以参考下
  • 无锁队列的实现原理

    千次阅读 2019-07-17 22:07:44
    队列在计算机中非常重要的一种数据结构,尤其在操作系统中。队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。结合自己在工作中遇到的队列...

    队列在计算机中非常重要的一种数据结构,尤其在操作系统中。队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现。根据操作队列的场景分为:单生产者——单消费者、多生产者——单消费者、单生产者——多消费者、多生产者——多消费者四大模型。其实后面三种的队列,可以归纳为一种多对多。根据队列中数据分为:队列中的数据是定长的、队列中的数据是变长的。

    无锁队列的实现

    关于CAS等原子操作

    在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

    这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。

    int compare_and_swap (int* reg, int oldval, int newval)
    {
      int old_reg_val = *reg;
      if (old_reg_val == oldval)
         *reg = newval;
      return old_reg_val;
    }
    

    这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):

    bool compare_and_swap (int *accum, int *dest, int newval)
    {
      if ( *accum == *dest ) {
          *dest = newval;
          return true;
      }
      return false;
    }
    

    与CAS相似的还有下面的原子操作:

    Fetch And Add,一般用来对变量做 +1 的原子操作
    Test-and-set,写值到某个内存位置并传回其旧值。汇编指令BST
    Test and Test-and-set,用来低低Test-and-Set的资源争夺情况
    注:在实际的C/C++程序中,CAS的各种实现版本如下:

    1)GCC的CAS

    GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins)

    bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
    

    2)Windows的CAS

    在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions)

    InterlockedCompareExchange ( __inout LONG volatile *Target,
                    __in LONG Exchange,
                    __in LONG Comperand);
    

    3) C++11中的CAS

    C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library)

    template< class T >
    bool atomic_compare_exchange_weak( std::atomic* obj,
                      T* expected, T desired );
    template< class T >
    bool atomic_compare_exchange_weak( volatile std::atomic* obj,
                      T* expected, T desired );
    

    无锁队列的链表实现

    下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。

    我们先来看一下进队列用CAS实现的方式:

    EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q->value = x;
        q->next = NULL;
     
        do {
            p = tail; //取链表尾指针的快照
        } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
     
        CAS(tail, p, q); //置尾结点
    }
    

    我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

    你会看到,为什么我们的“置尾结点”的操作(第12行)不判断是否成功,因为:

    如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
    此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
    直到T1线程更新完tail指针,于是其它的线程中的某个线程就可以得到新的tail指针,继续往下走了。
    这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()

    EnQueue(x) //进队列改良版
    {
        q = new record();
        q->value = x;
        q->next = NULL;
     
        p = tail;
        oldp = p
        do {
            while (p->next != NULL)
                p = p->next;
        } while( CAS(p.next, NULL, q) != TRUE); //如果没有把结点链在尾上,再试
     
        CAS(tail, oldp, q); //置尾结点
    }
    

    我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而通实际情况看下来,99.9%的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个版本,如果retry的次数超了一个值的话(比如说3次),那么,就自己fetch指针。

    好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

    DeQueue() //出队列
    {
    do{
    p = head;
    if (p->next == NULL){
    return ERR_EMPTY_QUEUE;
    }
    while( CAS(head, p, p->next) != TRUE );
    return p->next->value;
    }
    我们可以看到,DeQueue的代码操作的是 head->next,而不是head本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了。

    注:上图的tail正处于更新之前的装态。

    CAS的ABA问题

    所谓ABA(见维基百科的ABA词条),问题基本是这个样子:

    进程P1在共享变量中读到值为A
    P1被抢占了,进程P2执行
    P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
    P1回来看到共享变量里的值没有被改变,于是继续执行。
    虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)

    比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为)

    这个例子你可能没有看懂,维基百科上给了一个活生生的例子——

    你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

    这就是ABA的问题。

    解决ABA的问题
    维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,我们要检查64位的内容

    1)一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器。

    2)只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。

    这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

    当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数refcnt!

    SafeRead(q)
    {
        loop:
            p = q->next;
            if (p == NULL){
                return p;
            }
     
            Fetch&Add(p->refcnt, 1);
     
            if (p == q->next){
                return p;
            }else{
                Release(p);
            }
        goto loop;
    }
    

    其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。

    用数组实现无锁队列

    本实现来自论文《Implementing Lock-Free Queues》

    使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

    1)数组队列应该是一个ring buffer形式的数组(环形数组)

    2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)

    3)数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。

    4)EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。

    5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。

    算法的一个关键是——如何定位HEAD或TAIL?

    1)我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。

    2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。

    3)累加后求个模什么的就可以知道TAIL和HEAD的位置了。

    小结

    以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

    1)无锁队列主要是通过CAS、FAA这些原子操作,和Retry-Loop实现。

    2)对于Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。

    展开全文
  • 无锁队列原理和实现

    千次阅读 2016-07-07 20:43:11
    最近在研究无锁编程相关的东西,特别是无锁数据结构,看了不少人的文章和博客,深受启发,于是决定动手基于数组实现一个C++的无锁队列。基本数据结构无锁队列的底层数据结构特别简单:template class NolockQueue {...

    最近在研究无锁编程相关的东西,特别是无锁数据结构,看了不少人的文章和博客,深受启发,于是决定动手基于数组实现一个C++的无锁队列。

    基本数据结构

    无锁队列的底层数据结构特别简单:

    template <typename T>
    class NolockQueue
    {
    public:
      NolockQueue(int size);
      ~NolockQueue();
    public:
      int push(T *ptr);
      int pop(T *&ptr);
      inline int get_total() const;
      inline int get_free() const;
      int capacity() const {return size;}
    private:
      struct Item
      {
        T *data;
      };
    private:
      int size;
      Item *array;
      uint64_t consumer;
      uint64_t producer;
    };

    用一个array数组放置指向数据的指针们,consumer表示pop操作弹出(获取)元素的位置,每次弹出consumer都会加1,producer表示push操作插入元素的位置,每次弹出producer都会加1,初始化时二者均为0:

    template <typename T>
    NolockQueue<T>::NolockQueue(int size)
    {
      array = reinterpret_cast<Item *>(new char[sizeof(Item) * size]);
      memset(array, 0, sizeof(Item) * size); 
      this->size = size;
      consumer = 0;
      producer = 0;
    }

    关于队列大小的计算:

    template <typename T>
    inline int NolockQueue<T>::get_total() const
    {
      return (producer - consumer);
    }

    由于producer和consumer都只加不减,所以很容易超过array的大小,通过对size取模的方式来定位到对应的位置上,这也是环形缓冲区(Ring buffer)的基本思想。那么producer是否一定大于consumer呢?producer - consumer是否一定非负呢?
    producer在超过无符号常整型的最大值时会变为0,而这时consumer很可能还没跨过边界,所以就会导致producer小于consumer,但是不用担心,producer - consumer仍然可以得到正确的值,这里是利用了无符号数字的溢出特性:
    0 - 0xffff = 1
    写段程序验证一下:

    #include <stdint.h>
    #include <stdio.h>
    int main()
    {
      uint64_t z = 0xffffffffffffffff;
      uint64_t zplus1 = z + 1;
      printf("zplus1 - z: %llu\n", zplus1 - z);
      return 0;
    }

    输出是1
    BTW, 要等到一个64位整数越界要多久呢?假设1ns写入一个数据,那么producer离越界也还需要584年。

    push与pop操作

    push操作每次都是先判断当前队列是否为满(producer < consumer + size),如果队列不满的话,就将producer原子性地加1,表示producer旧值(加1前)对应的位置被预留了,当前线程可以将数据写入到这个位置

    pop操作每次都是先判断当前队列是否为空(consumer < producer),如果队列不为空的话,就将consumer原子性地加1,表示consumer旧值对应的位置被预留了,当前线程可以安全地从这个位置读取元素

    由此,我们需要一个原子操作:
    FAA_Bounded(uint64_t *addr, uint64_t delta, uint64_t up_bound)
    表示:
    当addr的值小于up_bound时,将addr的值加上delta,存储在addr上,同时返回旧值
    否则对addr上的值不做修改,直接返回addr上的值

    uint64_t FAA_BOUNDED(uint64_t* addr, uint64_t delta, uint64_t up_bound)
    {
      uint64_t old_value; 
      while((old_value = ATOMIC_LOAD(addr)) < up_bound &&
            old_value != ATOMIC_CAS(addr, old_value, old_value + delta)  
           ) { 
        PAUSE(); //asm("pause\n")
      }
      return old_value;
    }

    有了这样一个原子操作,下面来理一下两个操作的逻辑:
    push操作的逻辑:
    1. 计算producer的上边界,push_limit = consumer + size
    2. 执行push_idx = FAA_BOUNDED(&producer, 1, push_limit)
    3. 判断push_idx 是否合法,即是否小于push_limit,如果不合法,那么重复执行push操作(直到有consumer从队列中取走元素),如果合法,执行4
    4. 为了防止push位置的值不为NULL(这将导致非法插入),需要使用CAS操作将push_idx位置的指针从NULL变为非NULL

    template <typename T>
    int NolockQueue<T>::push(T *ptr) {
      uint64_t push_limit = ATOMIC_LOAD(&consumer) + size;
      uint64_t push_idx = FAA_BOUNDED(&producer, 1, push_limit);
      if(push_idx >= push_limit) {
        push(ptr);
      } else {
        void **pdata = reinterpret_cast<void **>(&array[push_idx % size].data);
        while (NULL != ATOMIC_CAS(pdata, NULL, ptr)) {
          PAUSE();
        } 
      }
      return 1;
    }

    pop操作的逻辑:
    1. 计算consumer的上边界,pop_limit = producer
    2. 执行pop_idx = FAA_BOUNDED(&consumer, 1, push_limit)
    3. 判断pop_idx 是否合法,即是否小于pop_limit,如果不合法,那么重复执行pop操作(直到有producer在队列中放入了元素),如果合法,执行4
    4. 为了防止pop位置的值为NULL(这将导致非法读取),需要使用原子操作将push_idx位置的指针从非NULL变为NULL

    template <typename T>
    int NolockQueue<T>::pop(T *&ptr) {
      uint64_t pop_limit = ATOMIC_LOAD(&producer);
      uint64_t pop_idx = FAA_BOUNDED(&consumer, 1, pop_limit);
      if(pop_idx >= pop_limit) {
        pop(ptr);
      } else {
        void **pdata = reinterpret_cast<void **>(&array[pop_idx % size].data);
        while (NULL == (ptr = static_cast<T *> (ATOMIC_SET(pdata, NULL)))) {
          PAUSE();
        }
      }
      return 1;
    }

    对于本文中用到的ATOMIC原语,统一说明下:

    #define __COMPILER_BARRIER() asm volatile("" ::: "memory")
    #define ATOMIC_LOAD(x) ({__COMPILER_BARRIER(); *(x);})
    #define ATOMIC_SET(val, newv) ({__sync_lock_test_and_set((val), (newv));})
    #define ATOMIC_CAS(val, cmpv, newv) ({__sync_val_compare_and_swap((val), (cmpv), (newv)); }) 
    #define PAUSE() asm("pause\n")

    其他说明:
    1. 当队列为空时(队列满),pop(push)操作会循环请求直到成功,也可以不重复执行push和pop,而返回一个错误码以供异步调用。
    2. 计算producer上边界时,push_limit = consumer + size,可能导致溢出,此时push_idx将会一直大于push_limit从而导致程序死循环,鉴于需要500多年才会发生这种情况且本文仅供参考,故不对此进行debug。

    展开全文
  • 主要介绍了Laravel框架队列原理与用法,结合实例形式分析了Laravel框架队列的原理、使用方法及相关操作注意事项,需要的朋友可以参考下
  • 最近稍微研究了一下CAS无锁队列的实现,首先需要先了解一下CAS无锁队列的概念。CAS的意思是Compare And Swap,从字面意思上面也可以知道实际就是对数据进行交换的一种原子操作。 无锁队列的内部实现实际也是原子...

    简介

        在进入今天的主题之前,我们先来了解一下一般使用的比较常用的锁。互斥锁和自旋锁。

        互斥锁:如果取不到锁就会进入休眠,本身取锁的操作并不耗时,主要就是等待拿到锁的时间,并且这样的话会进行线程切换,比较耗资源;自旋锁就不一样了,在没有获取到锁的情况下不会休眠,而是一直忙等待下去,一直占据CPU,不进行线程的切换,这样的好处就是执行本身耗时比较短的操作时,加锁的代价比较小,但是如果本身加锁执行的操作很长或者会有可能阻塞的话,其他锁一直忙等也是很耗CPU的。

        首先需要先了解一下CAS无锁队列的概念。CAS的意思是Compare And Swap,从字面意思上面也可以知道实际就是对数据进行交换的一种原子操作。对应到CPU指令的话就是cmpxchg。
        无锁队列的内部实现实际也是原子操作,可以避免多线程调用出现的不可预知的情况。主要的核心就是函数__sync_bool_compare_and_swap,返回值为bool型,原子交换操作成功返回true,失败返回false。

        在了解无锁队列之前,我们先了解一下什么是原子操作,原子操作通俗的将就是一段指令的执行期间不会被其他的进程或者线程打断,可以保证执行的一致性。举个简单的反例,i++的过程中实际上就不是一个原子操作。

        i++内部存在三个步骤:

            ① 先将i的值从内存中读到寄存器;

            ② 将寄存器中的i的值加1;

            ③ 将寄存器的值读回内存中。

        对于原子操作本身gcc也封装了一套接口,c++也有一套自己的原子操作的接口可以使用。从原理上来讲,原子操作是在使用Lock#指令前缀的时候,操作执行期间对count内存的并发访问被禁止,保证操作的原子性。

                                                     

    代码实现

        介绍完原子操作,接下来就简单的看一下CAS的实现。CAS主要有三个操作数,当前值A、内存值V和要更改的新值B。当当前值A跟内存值V相等,那么就将内存值V改成B;当当前值A和内存值V不想等的话,要么就重试,要么放弃更新B。实际就是多线程操作的时候,不加锁,多线程操作了共享的资源之后,在实际修改的时候判断是否修改成功。

        而且,CAS会有个ABA的问题。简单的说就是线程A将当前值修改为10,此时线程B将值改为11,然后又有一个线程C把值又改为10,这样的话对于线程A来说取到的内存值和当前值是没变的,所以可以更新,但实际上是经过变化的,所以不符合实际逻辑的。想要解决这个问题的话就需要加一个版本,Java中有AtomicStampedReference类可以添加版本在比对内存值的时候加以区分。

        以下是一个简单的无锁队列的代码:

    // 定义一个链表实现队列
    template <typename ElemType>
    struct qnode // 链表节点
    {
      struct qnode *_next;
      ElemType _data;
    };
    
    template <typename ElemType>
    class Queue
    {
    private:
      struct qnode<ElemType> *volatile _head = NULL;  // 随着pop后指向的位置是不一样的, head不是固定的
      struct qnode<ElemType> *volatile _tail = NULL;
    
    public:
      Queue()
      {
        _head = _tail = new qnode<ElemType>;
        _head->_next = NULL;
        _tail->_next = NULL;
        printf("Queue _head:%p\n", _head);
      }
    
      void push_list(const ElemType& e) {
        struct qnode<ElemType>* p = new qnode<ElemType>;
        if (!p) {
          return ;
        }
        p->next = NULL;
        p->data = e;
        
        struct qnode<ElemType>* t = _tail;
        struct qnode<ElemType>* old_t = _tail;
        
        do {
          while (t->next != NULL) {  // 当非NULL的时候说明不是尾部,因此需要指向下一个节点
            t = t->next;
          }
        // 如果t->next为则null换为p
        } while (!__sync_bool_compare_and_swap(&t->next, NULL, p));
        // 如果尾部和原来的尾部相等,则换为p。
        __sync_bool_compare_and_swap(&_tail, old_t, p);
      }
      
      bool pop_list(ElemType& e) {
        struct qnode<ElemType>* p = NULL;
        do {
          p = _head;
          if (p->next == NULL) {
            return false;
          }
        // 如果头部等于p,那么就指向p的下一个
        } while (!__sync_bool_compare_and_swap(&_head, p, p->next));
    
        e = p->_next->data;
        delete p;
        p = NULL;
        return true;
      }
    };

        以上就是无锁队列的实现,首先定义一个链表,链表的头结点没有存放数据,在push数据的时候,先判断当前指针是否是在队列的尾部,如果是的话就使用CAS判断是不是走到了尾部,如果是的话就将NULL节点更新为P,同时后面需要将尾节点更新为P;同样在pop的时候同样也是在循环中判断头部指向下一个,然后之前的节点可以删除。

    结论:

        无锁队列的原理内部还是原子操作的比较,真返回true,假返回false。无所队列在ZeroMQ等中间件中都有所应用。我们比较熟悉的mutex应用最广泛,他的原理是线程没有获取到锁的话会就如休眠等待,让出CPU,上下文切换,效率较低。在释放锁的时候会先检查owner是否是自己,如果是自己的话就会在休眠的队列中取出一个task,唤醒task去执行。一般的应用场景是共享区域执行的任务较长的时候。上面的队列也可以使用有锁的队列实现,具体就是利用mutex以及c++自带的STL,list来实现的。spinLock也是常见的一种锁,他的特点是线程在没有获取锁之前会进入忙等待,不会让出CPU执行,不会进程线程切换。一般应用在执行任务不会阻塞、耗时短、执行任务简单的场景,这种锁比较耗资源,需要谨慎考虑使用的场景。

    展开全文
  • 主要介绍了剖析Java中阻塞队列的实现原理及应用场景,这里也对阻塞和非阻塞队列的不同之处进行了对比,需要的朋友可以参考下
  • 主要介绍了Java循环队列原理与用法,结合实例形式详细分析了Java循环队列基本概念、原理、用法及操作注意事项,需要的朋友可以参考下
  • 主要介绍了JS异步宏队列与微队列原理区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 本文实例讲述了python双端队列原理、实现与使用方法。分享给大家供大家参考,具体如下: 双端队列 双端队列(deque,全名double-ended queue),是一种具有队列和栈的性质的数据结构。 双端队列中的元素可以从两端弹...
  • 主要介绍了PHP高级编程之消息队列原理与实现方法,结合实例形式详细分析了PHP消息队列相关概念、原理、使用场景及相关操作注意事项,需要的朋友可以参考下
  • RabbitMQ死信队列原理并实现延迟队列

    千次阅读 2020-05-01 19:23:06
    死信队列 死信交换机(Dead-Letter-Exchange),当消息在一个队列中变成死信之后,它能被发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列 消息变成死信一般是由于下面三种情况: 消息被...

    码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071

    死信队列

    死信交换机(Dead-Letter-Exchange),当消息在一个队列中变成死信之后,它能被发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列

    消息变成死信一般是由于下面三种情况:

    • 消息被拒绝(Basic.Reject或Basic.Nack),并在调用时设置requeue参数为false
    • 消息过期
    • 队列达到最大长度

    通过在channel.queueDeclare方法中设置x-dead-letter-exchange参数来为这个队列添加DLX

    延迟队列

    定义2个队列,第一个队列通过设置消息或队列的有效期,使消息到期后变为死信进入到第二个队列中,而我们只消费第二个队列,则可实现延迟队列,下面是用php-amqplib实现的一分钟的延迟队列

    <?php
    require_once __DIR__.'/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'why', 'why');
    $channel = $connection->channel();
    
    //测试死信的上游交换机
    $channel->exchange_declare('dlxBeforeExchange', 'direct',
        false, true, false, false, false, []);
    //死信的交换机
    $channel->exchange_declare('dlxExchange', 'direct',
        false, true, false, false, false, []);
    
    //测试死信队列的上游队列
    $dlx_table = new \PhpAmqpLib\Wire\AMQPTable();
    $dlx_table->set('x-dead-letter-exchange', 'dlxExchange' );
    $dlx_table->set('x-dead-letter-routing-key', 'dlxKey' );
    $channel->queue_declare('dlxBeforeQueue',
        false, true, false, false, false, $dlx_table);
    $channel->queue_bind('dlxBeforeQueue', 'dlxBeforeExchange', 'dlxBeforeKey');
    //死信队列
    $channel->queue_declare('dlxQueue',
        false, true, false, false, false);
    $channel->queue_bind('dlxQueue', 'dlxExchange', 'dlxKey');
    
    
    $head = array_merge(array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT), ['expiration' => '60000']);
    $msg = new AMQPMessage('hello why', $head);
    $res = $channel->basic_publish($msg, 'dlxBeforeExchange', 'dlxBeforeKey');

     

    展开全文
  • 主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 本文实例讲述了PHP队列原理及基于队列的写文件案例。分享给大家供大家参考,具体如下: 队列是一种线性表,按照先进先出的原则进行的: 入队: 出队: PHP实现队列:第一个元素作为队头,最后一个元素作为队尾 &...
  • RabbitMQ延时队列原理讲解

    千次阅读 2020-05-10 20:27:43
    RabbitMQ延时消息队列 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成...
  • 主要介绍了JS异步宏队列微队列原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要介绍了Python queue队列原理与应用,结合具体案例形式分析了Python queue队列的原理、功能、实现方法与使用技巧,需要的朋友可以参考下
  • NULL 博文链接:https://huangtut.iteye.com/blog/286724
  • 本文实例讲述了python队列原理及实现方法。分享给大家供大家参考,具体如下: 队列(queue)是只允许在一端进行插入操作,而在另一端进行删除操作的线性表。 队列是一种先进先出的(First In First Out)的线性表,...
  • 主要介绍了Redis 实现队列原理的实例详解的相关资料,希望通过本文能帮助到大家,需要的朋友可以参考下
  • 主要介绍了JavaScript单线程和任务队列原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 消息队列原理

    万次阅读 2017-06-05 15:36:42
    只要告诉邮局我们要订的杂志名、投递的地址,付了钱就OK。出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中。这样我们就可以看到每一期精彩的杂志了。 仔细思考一下订...
  • 文章目录一、原理详解二、python实现(一)、普通队列1、python实现2、代码测试(二)、双端队列1、Python代码实现2、测试代码 一、原理详解 队列(queue)是只允许在一端进行插入操作,而在另一端进行删除操作的...
  • 无锁队列原理和实现

    千次阅读 2018-03-24 13:20:09
    无锁队列的实现-coolshell CAS 另一篇参考 设计不使用互斥锁的并发数据结构 锁粒度
  • RabbitMQ镜像队列原理分析

    千次阅读 2019-04-02 16:17:17
    对于RabbitMQ的节点来说,有单节点模式和集群模式两种,其中集群模式又分为普通集群模式和镜像队列集群模式,在《RabbitMQ集群架构搭建与高...本文主要介绍镜像队列原理及实现。 1. 创建镜像队列模式 注意,到...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 332,277
精华内容 132,910
关键字:

无所队列原理