-
2018-09-18 18:11:27更多相关内容
-
强大的RabbitMQ优先级队列让你轻松面对现实业务场景
2021-05-18 17:01:25说到队列的话,大家一定不会陌生,但是扯到优先级队列的话,还是有一部分同学是不清楚的,可能是不知道怎么去实现吧,其实呢,这东西已经烂大街了。很简单,用“堆”去实现的,在我们系统中有一个订单催付的场景,...说到队列的话,大家一定不会陌生,但是扯到优先级队列的话,还是有一部分同学是不清楚的,可能是不知道怎么去实现吧,其实呢,这东西已经烂大街了。很简单,用“堆”去实现的,在我们系统中有一个订单催付的场景,我们客户的客户在tmall,taobao下的订单,taobao会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像施华蔻,百雀林这样大商家一年起码能够给我们贡献几百万,所以理应当然,他们的订单必须得到优先处理,而曾今我们的后端系统是使用redis来存放的定时轮询,大家都知道redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用rabbitmq进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级,好了,废话不多说,我们来看看如何去设置。
一:优先级队列
既然是优先级队列,那么必然要在Queue上开一个口子贴上一个优先级的标签,为了看怎么设置,我们用一下rabbitmq的监控UI,看看这个里面是如何
手工的创建优先级队列。
从这个图中可以看到在Arguments栏中有特别多的小属性,其中有一项就是"Maximum priority",这项的意思就是说可以定义优先级的最大值,其实
想想也是,不可能我们定义的优先级是一个非常大的数字,比如int.MaxValue,大多情况下都是10以内的数字就可以了,再或者我们曾今接触过的 MSMQ,
它的优先级只是一些枚举值,什么High,Normal,Low,不知道大家可否记得? 下面来看下代码中该如何实现呢???
1. 在Queue上附加优先级属性
Dictionary dic = new Dictionary();
dic.Add("x-max-priority", 20);
channel.QueueDeclare(queue:"hello",
durable:true,
exclusive:false,
autoDelete:false,
arguments: dic);
上面的代码做了一个简单的队列声明,queuename="hello",持久化,排外。。。然后把"x-max-priority"塞入到字典中作为arguments参数,看起来还
是非常简单吧~~~
2. 在Message上指定优先级属性
var properties =channel.CreateBasicProperties();
properties.Priority= 1;
channel.BasicPublish(exchange:"",
routingKey:"hello",
basicProperties:null,
body: body);
通过上面的代码可以看到,在Message上设置优先级,我是通过在channel通道上设置Priority属性,之后塞到basicProperties中就可以了,好了,有上面这两
个基础之后,下面就可以开始测试了,准备向rabbitmq推送10条记录,其中第5条的优先级最高,所以应该首先就print出来,如下图:
static void Main(string[] args)
{var sb = newStringBuilder();for (int i = 0; i < 11; i++)
{
sb.Append(i);
}var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip"};using (var connection =factory.CreateConnection())
{using (var channel =connection.CreateModel())
{
channel.ExchangeDeclare(exchange:"mydirect", type: ExchangeType.Direct, durable: true);
Dictionary dic = new Dictionary();
dic.Add("x-max-priority", 20);for (int i = 0; i < 10; i++)
{
channel.QueueDeclare(queue:"hello",
durable:true,
exclusive:false,
autoDelete:false,
arguments: dic);string message = string.Format("{0} {1}", i, sb.ToString());var body =Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();
properties.Priority = (i == 5) ? (byte)10 : (byte)i;
channel.BasicPublish(exchange:"",
routingKey:"hello",
basicProperties: properties,
body: body);
Console.WriteLine("[x] Sent {0}", i);
}
}
}
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
图中可以看到10条消息我都送到rabbitmq中去了,接下来打开consume端,来看看所谓的index=5 是否第一个送出来??
static void Main(string[] args)
{for (int m = 0; m < int.MaxValue; m++)
{var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip"};using (var connection =factory.CreateConnection())using (var channel =connection.CreateModel())
{var result = channel.BasicGet("hello", true);if (result != null)
{var str =Encoding.UTF8.GetString(result.Body);
Console.WriteLine("{0} 消息内容 {1}", m, str);
System.Threading.Thread.Sleep(1);
}
}
}
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
一切都是这么的完美,接下来为了进行可视化验证,你可以在WebUI中观察观察,可以发现在Queue上面多了一个 Pri 标记,有意思吧。
好了,这么重要的功能,是不是已经让你足够兴奋啦, 希望大家能够好好的在实际场景中运用吧~~~
RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里
-
【Java 集合框架】PriorityQueue 优先级队列的使用
2021-11-08 16:15:08优先级队列插入元素的细节问题6. PriorityQueue 大根堆的创建方式6.1 思路6.2 代码实现6.3 使用匿名内部类 1. 场景引入 我们知道,Queue是一个先进先出(FIFO)的队列。 在很多应用中,我们通常需要按照优先情况...文章目录
1. 场景引入
我们知道,
Queue
是一个先进先出(FIFO)的队列。在很多应用中,我们通常需要按照优先情况对待处理对象进行处理,比如首先处理优先级最高的对象,然后处理次高的对象。最简单的一个例子就是,在手机上玩游戏时,如果有来电,那么系统应该优先处理进来的电话。
这个时候,我们发现,要实现上述的操作,用
Queue
就不行了,因为Queue
会严格按 FIFO 的原则取出队首元素。故有了我们需要的优先队列:PriorityQueue
。2. PriorityQueue 介绍
Java 中
PriorityQueue
继承了Queue
接口,它的底层是一个堆。3. 知识点
-
PriorityQueue
的底层是一个数组- 我们可以转到它的定义,可以看到它的底层定义是一个数组。为了知道这个数组的初始大小有多大
- 再通过它的参构造方法转到定义,又可以看到
- 再点击 this,转到其定义,我们发现又跳到了一个含两个参数的构造方法
- 又在
PriorityQueue
的定义中DEFAULT_INITIAL_CAPACITY = 11
,即initialCapacity = 11
,所以我们可以知道数组的初始大小为11
- 我们可以转到它的定义,可以看到它的底层定义是一个数组。为了知道这个数组的初始大小有多大
-
PriorityQueue
的底层默认是一个小根堆 -
如何使
PriorityQueue
的底层是一个大根堆?- 引用上图
PriorityQueue
的含参定义,我们知道第一个参数代表数组的大小,而第二个参数就一个比较器,传给他的就是比较的方法,通过给他传入大根堆的比较方式,我们就可以使PriorityQueue
的底层变成大根堆
- 引用上图
4. 常用方法
方法 描述 boolean offer(E e)
入队列 E poll()
出队列 E peek()
得到队首元素 int size()
返回集合中的元素个数 注意: 下面的示例都是一份代码分开拿出来的,上下其实是有逻辑关系的
示例一: 用 Priority Queue 创建一个优先级队列
PriorityQueue<Integer> queue=new PriorityQueue<>();
示例二: 入队列
queue.offer(10); queue.offer(2); queue.offer(5);
示例三: 得到队首元素
System.out.println(queue.peek()); // 结果为:2
示例四: 出队列
System.out.println(queue.poll()); // 结果为:2
示例五: 返回集合中元素个数
System.out.println(queue.size()); // 结果为:2
5. 优先级队列插入元素的细节问题
当我们使用优先级队列的时候,插入元素其实有个前提:
插入的元素不能是 null 或者元素之间必须能够进行比较
而基本的包装类类型都可以进行比较,如:Integer、Double、Float。但是对于我们自定义的类型,其实就可能不能比较,就如下面这个类当我们使用优先级队列对它的对象进行插入时,其实会报错
class Student{ private String name; private int age; public Student(String name, int age, double score) { this.name = name; this.age = age; } } public class TestDemo{ public static void main(String[] args){ PriorityQueue<Student> queue=new PriorityQueue<>(); queue.offer(new Student("Tom",18)); queue.offer(new Student("Hen",34)); } }
这是因为优先级队列的底层默认是一个小根堆,它存入元素时是需要进行比较对象的大小的。
我们可以转到 PriorityQueue 的无参构造方法的定义看看
此时我们的 comparator 默认是 null,我们再转到 offer 方法的定义看看
好像并没有什么异常,但是由于我插入第二个元素时,i 不为0,所以要进行
siftUp
方法,我们转到它的定义由于我们知道 comparator 为 null,那么则要进行
siftUpComparable
方法,继续转到它的定义我们发现,创建的 Student 的对象,被强转为了
Comparable<? super E>
,并且还调用了compareTo
方法。如果大家有看过我写的 解析 Java 的多态、抽象类和接口 和 Java 对象的比较 这两篇文章,那我就有讲到compareTo
这个方法。这个方法是Comparable
的一个抽象方法,定义的是比较对象大小的一个规则。因此为了解决这个问题,我们就可以使用和 Comparable 或 Comparator 接口相关的知识
6. PriorityQueue 大根堆的创建方式
6.1 思路
这里便不对源码做具体分析,我们如果要 PriorityQueue 创建出的是一个大根堆,只需要对具体类型写一个比较器即可
6.2 代码实现
// 定义的某个要比较类型的比较器 class IntegerComparator implements Comparator<Integer>{ @Override public int compare(Integer o1,Integer o2){ // 如果第二个元素-第一个元素就是大根堆的实现方式,反之则为小根堆的创建方式,可以从源码去了解 return o2-o1; } } public class TestDemo{ public static void main(String[] args){ PriorityQueue<Integer> maxHeap=new PriorityQueue<>(IntegerComparator); } }
6.3 使用匿名内部类
上述代码也可以写成
public class TestDemo{ public static void main(String[] args){ PriorityQueue<Integer> maxHeap=new PriorityQueue<>(new Comparator<Integer>(){ @Override public int compare(Integer o1,Integer o2){ return o2-o1; } }) } }
这相当使用了一个匿名的内部类的方式去创建大根堆
-
-
Go 实战 | 一文带你搞懂从单队列到优先级队列的实现
2021-12-30 23:13:29优先级队列一方面可以用来控制流量,同时还可以在资源有限的情况下优先处理高优的流量。本文带你用Go实现从单队列到优先级队列的演进过程大家好,我是「Go学堂」的渔夫子,今天跟大家聊聊在我们项目中的优先级队列的实现及应用场景。
优先级队列概述
队列,是数据结构中实现先进先出策略的一种数据结构。而优先队列则是带有优先级的队列,即先按优先级分类,然后相同优先级的再 进行排队。优先级高的队列中的元素会优先被消费。如下图所示:
图1-优先级队列概况图.png
在Go中,可以定义一个切片,切片的每个元素代表一种优先级队列,切片的索引顺序代表优先级顺序,后面代码实现部分我们会详细讲解。
为什么需要优先级队列
先来看现实生活中的例子。银行的办事窗口,有普通窗口和vip窗口,vip窗口因为排队人数少,等待的时间就短,比普通窗口就会优先处理。同样,在登机口,就有贵宾通道和普通,同样贵宾通道优先登机。
在互联网中,当然就是请求和响应。使用优先级队列的作用是将请求按特定的属性划分出优先级,然后按优先级的高低进行优先处理。在研发服务的时候这里有个隐含的约束条件就是服务器资源(CPU、内存、带宽等)是有限的。如果服务器资源是无限的,那么也就不需要队列进行排队了,来一个请求就立即处理一个请求就好了。所以,为了在最大限度的利用服务器资源的前提下,将更重要的任务(优先级高的请求)优先处理,以更好的服务用户。
对于请求优先级的划分可以根据业务的特点根据价值高的优先原则来进行划分即可。例如可以根据是否是否是会员、是否是VIP会员等属性进行划分优先级。也可以根据是否是付费用户进行划分。在博客的业务中,也可以根据是否是大V的属性进行优先级划分。在互联网广告业务中,可以根据广告位资源价值高低来划分优先级。
优先级队列实现原理
01 四个角色
在完整的优先级队列中有四个角色,分别是优先级队列、工作单元、消费者worker、通知channel。
- 工作单元Job:队列里的元素。我们把每一次业务处理都封装成一个工作单元,该工作单元会进入对应的优先级队列进行排队,然后等待消费者worker来消费执行。
- 优先级队列:按优先级划分的队列,用来暂存对应优先级的工作单元Job,相同优先级的工作单元会在同一个队列里。
- noticeChan通道:当有工作单元进入优先级队列排队后,会在通道里发送一个消息,以通知消费者worker从队列中获取元素(工作单元)进行消费。
- 消费者worker:监听noticeChan,当监听到noticeChan有消息时,说明队列中有工作单元需要被处理,优先从高优先级队列中获取元素进行消费。
02 队列-消费者模式
根据队列个数和消费者个数,我们可以将队列-消费者模式分为单队列-单消费者模式、多队列(优先级队列)- 单消费者模式、多队列(优先级队列)- 多消费者模式。
我们先从最简单的单队列-单消费者模式实现,然后一步步演化成多队列(优先级队列)-多消费者模式。
03 单队列-单消费者模式实现
图2-单消费者模式.png
3.1 队列的实现
我们先来看下队列的实现。这里我们用Golang中的List数据结果来实现,List数据结构是一个双向链表,包含了将元素放到链表尾部、将头部元素弹出的操作,符合队列先进先出的特性。
好,我们看下具体的队列的数据结构:
type JobQueue struct { mu sync.Mutex //队列的操作需要并发安全 jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job noticeChan chan struct{} //入队一个job就往该channel中放入一个消息,以供消费者消费 }
- 入队操作
/** * 队列的Push操作 */ func (queue *JobQueue) PushJob(job Job) { queue.jobList.PushBack(job) //将job加到队尾 queue.noticeChan <- struct{}{} }
到这里有同学就会问了,为什么不直接将job推送到Channel中,然后让消费者依次消费不就行了么?是的,单队列这样是可以的,因为我们最终目标是为了实现优先级的多队列,所以这里即使是单队列,我们也使用List数据结构,以便后续的演变。
还有一点,大家注意到了,这里入队操作时有一个 这样的操作:
queue.noticeChan <- struct{}{}
消费者监听的实际上不是队列本身,而是通道noticeChan。当有一个元素入队时,就往noticeChan通道中输入一条消息,这里是一个空结构体,主要作用就是通知消费者worker,队列里有要处理的元素了,可以从队列中获取了。 这个在后面演化成多队列以及多消费者模式时会很有用。
- 出队操作
根据队列的先进先出原则,是要获取队列的最先进入的元素。Golang中List结构体的Front()函数是获取链表的第一个元素,然后通过Remove函数将该元素从链表中移出,即得到了队列中的第一个元素。这里的Job结构体先不用关心,我们后面实现工作单元Job时,会详细讲解。
/** * 弹出队列的第一个元素 */ func (queue *JobQueue) PopJob() Job { queue.mu.Lock() defer queue.mu.Unlock() /** * 说明在队列中没有元素了 */ if queue.jobList.Len() == 0 { return nil } elements := queue.jobList.Front() //获取队里的第一个元素 return queue.jobList.Remove(elements).(Job) //将元素从队列中移除并返回 }
- 等待通知操作
上面我们提到,消费者监听的是noticeChan通道。当有元素入队时,会往noticeChan中输入一条消息,以便通知消费者进行消费。如果队列中没有要消费的元素,那么消费者就会阻塞在该通道上。
func (queue *JobQueue) WaitJob() <-chan struct{} { return queue.noticeChan }
3.2 工作单元--Job的实现
一个工作单元就是一个要执行的任务。在系统中往往需要执行不同的任务,就是需要有不同类型的工作单元,但这些工作单元都有一组共同的执行流程。我们看下工作单元的类图。
图3-job类图.png
我们看下类图中的几个角色:
- Job接口:定义了所有Job要实现的方法。
- BaseJob类(结构体):定义了具体Job的基类。因为具体Job类中的有共同的属性和方法。所以抽象出一个基类,避免重复实现。但该基类对Execute方法没有实现,因为不同的工作单元有具体的执行逻辑。
- SquareJob和AreaJob类(结构体):是我们要具体实现的业务工作Job。主要是实现Execute的具体执行逻辑。根据业务的需要定义自己的工作Job和对应的Execute方法即可。
接下来,我们以计算一个int类型数字的平方的SquareJob为例来看下具体的实现。
- BaseJob结构体
首先看下该结构体的定义
type BaseJob struct { Err error DoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者 Ctx context.Context cancelFunc context.CancelFunc }
在该结构体中,我们主要关注DoneChan字段就行,该字段是当具体的Job的Execute执行完成后,来通知调用者的。
再来看Done函数,该函数就是在Execute函数完成后,要关闭DoneChan通道,以解除Job的阻塞而继续执行其他逻辑。
/** * 作业执行完毕,关闭DoneChan,所有监听DoneChan的接收者都能收到关闭的信号 */ func (job *BaseJob) Done() { close(job.DoneChan) }
再来看WaitDone函数,该函数是当Job执行后,要等待Job执行完成,在未完成之前,DoneChan里没有消息,通过该函数就能将job阻塞,直到Execute中调用了Done(),以便解除阻塞。
/** * 等待job执行完成 */ func (job *BaseJob) WaitDone() { select { case <-job.DoneChan: return } }
- SquareJob结构体
type SquareJob struct { *BaseJob x int }
从结构体的定义中可知,SquareJob嵌套了BaseJob,所以该结构体拥有BaseJob的所有字段和方法。在该结构体主要实现了Execute的逻辑:对x求平方。
func (s *SquareJob) Execute() error { result := s.x * s.x fmt.Println("the result is ", result) return nil }
3.3 消费者Worker的实现
Worker主要功能是通过监听队列里的noticeChan是否有需要处理的元素,如果有元素的话从队列里获取到要处理的元素job,然后执行job的Execute方法。
我们将该结构体定位为WorkerManager,因为在后面我们讲解多Worker模式时,会需要一个Worker的管理者,因此定义成了WorkerManager。
type WorkerManager struct { queue *JobQueue closeChan chan struct{} }
StartWorker函数,只有一个for循环,不断的从队列中获取Job。获取到Job后,进行消费Job,即ConsumeJob。
func (m *WorkerManager) StartWork() error { fmt.Println("Start to Work") for { select { case <-m.closeChan: return nil case <-m.queue.noticeChan: job := m.queue.PopJob() m.ConsumeJob(job) } } return nil } func (m *WorkerManager) ConsumeJob(job Job) { defer func() { job.Done() }() job.Execute() }
到这里,单队列-单消费者模式中各角色的实现就讲解完了。我们通过main函数将其关联起来。
func main() { //初始化一个队列 queue := &JobQueue{ jobList: list.New(), noticeChan: make(chan struct{}, 10), } //初始化一个消费worker workerManger := NewWorkerManager(queue) // worker开始监听队列 go workerManger.StartWork() // 构造SquareJob job := &SquareJob{ BaseJob: &BaseJob{ DoneChan: make(chan struct{}, 1), }, x: 5, } //压入队列尾部 queue.PushJob(job) //等待job执行完成 job.WaitDone() print("The End") }
04 多队列-单消费者模式
有了单队列-单消费者的基础,我们如何实现多队列-单消费者模式。也就是优先级队列。
图2-多队列-单消费者模式.png
优先级的队列,实质上就是根据工作单元Job的优先级属性,将其放到对应的优先级队列中,以便worker可以根据优先级进行消费。我们要在Job结构体中增加一个Priority属性。因为该属性是所有Job都共有的,因此定义在BaseJob上更合适.
type BaseJob struct { Err error DoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者 Ctx context.Context cancelFunc context.CancelFunc priority int //工作单元的优先级 }
我们再来看看多队列如何实现。实际上就是用一个切片来存储各个队列,切片的每个元素存储一个JobQueue队列元素即可。
var queues = make([]*JobQueue, 10, 100)
那各优先级的队列在切片中是如何存储的呢?切片索引顺序只代表优先级的高于低,不代表具体是哪个优先级。
什么意思呢?假设我们现在对目前的工作单元定义了1、4、7三个优先级。这3个优先级在切片中是按优先级从小到到依次存储在queues切片中的,如下图:
图4-正确的切片存储的优先级.png
那为什么不让切片的索引就代表优先级,让优先级为1的队列存储在索引1处,优先级4的队列存储在索引4处,优先级7的队列存储在索引7处呢?如果这样存储的话,就会变成如下这样:
图4-直接使用索引作为优先级缺点.png
可见如果我们设定的优先级不是连续的,那么就会造成空间的浪费。所以,我们是将队列按优先级高低依次存放到了切片中。
那既然这样,当一个优先级的job来了之后,我该怎么知道该优先级的队列是存储在哪个索引中呢?我们用一个map来映射优先级和切片索引之间的关系。这样当一个工作单元Job入队的时候,以优先级为key,就可以查找到对应优先级的队列存储在切片的哪个位置了。如下图所示:
图5-优先级和索引映射.png
代码定义:
var priorityIdx map[int][int]//该map的key是优先级,value代表的是queues切片的索引
好了,我们重新定义一下队列的结构体:
type PriorityQueue struct { mu sync.Mutex noticeChan chan struct{} queues []*JobQueue priorityIdx map[int]int } //原来的JobQueue会变成如下这样: type JobQueue struct { priority int //代表该队列是哪种优先级的队列 jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job }
这里我们注意到有以下几个变化:
- JobQueue里多了一个Priority属性,代表该队列是哪个优先级别。
- noticeChan属性从JobQueue中移动到了PriorityQueue中。因为现在有多个队列,只要任意一个队列里有元素就需要通知消费者worker进行消费,因此消费者worker监听的是PriorityQueue中是否有元素,而在监听阶段不关心具体哪个优先级队列中有元素。
好了,数据结构定义完了,我们看看将工作单元Job推入队列和从队列中弹出Job又有什么变化。
- 优先级队列的入队操作
优先级队列的入队操作,就需要根据入队Job的优先级属性放到对应的优先级队列中,入队流程图如下:
图6-优先级队列入队流程.png
当一个Job加入队列的时候,有两种场景,一种是该优先级的队列已经存在,则直接Push到队尾即可。一种是该优先级的队列还不存在,则需要先创建该优先级的队列,然后再将该工作单元Push到队尾。如下是两种场景。
队列已经存在的场景
这种场景会比较简单。假设我们要插入优先级为7的工作单元,首先从映射表中查找7是否存在,发现对应关系是2,则直接找到切片中索引2的元素,即优先级为7的队列,将job加入即可。如下图。
图7-已存在队列插入.png
队列不存在的场景
这种场景稍微复杂些,在映射表中找不到要插入优先级的队列的话,则需要在切片中插入一个优先级队列,而为了优先级队列在切片中也保持有序(保持有序就可以知道队列的优先级的高低了),则需要移动相关的元素。我们以插入优先级为6的工作单元为例来讲解。
1、首先,我们的队列有一个初始化的状态,存储了优先级1、4、7的队列。如下图。
图7-优先级查找1.png
2、当插入优先级为6的工作单元时,发现在映射表中没有优先级6的映射关系,说明在切片中还没有优先级为6的队列的元素。所以需要在切片中依次查找到优先级6应该插入的位置在4和7之间,也就是需要存储在切片2的位置。
图7-优先级查找2.png
3、将原来索引2位置的优先级为7的队列往后移动到3,同时更新映射表中的对应关系。
图7-优先级查找3.png
4、将优先级为6的工作单元插入到索引2的队列中,同时更新映射表中的优先级和索引的关系。
图7-优先级查找4.png
我们看下代码实现:
func (priorityQueue *PriorityQueue) Push(job Job) { priorityQueue.mu.Lock() defer priorityQueue.mu.Unlock() //先根据job的优先级找要入队的队列 var idx int var ok bool //从优先级-切片索引的map中查找该优先级的队列是否存在 if idx, ok = priorityQueue.priorityIdx[job.Priority()]; !ok { //如果不存在该优先级的队列,则需要初始化一个队列,并返回该队列在切片中的索引位置 idx = priorityQueue.addPriorityQueue(job.Priority) } //根据获取到的切片索引idx,找到具体的队列 queue := priority.queues[idx] //将job推送到队列的队尾 queue.JobList.PushBack(job) //队列job个数+1 priorityQueue.Size++ //如果队列job个数超过队列的最大容量,则从优先级最低的队列中移除工作单元 if priorityQueue.size > priorityQueue.capacity { priorityQueue.RemoveLeastPriorityJob() }else { //通知新进来一个job priorityQueue.noticeChan <- struct{}{} } }
代码中大部分也都做了注释,不难理解。这里我们来看下addPriorityQueue的具体实现:
func (priorityQueue *PriorityQueue) addPriorityQueue(priority int) int { n := len(priorityQueue.queues) //通过二分查找找到priority应插入的切片索引 pos := sort.Search(n, func(i int) bool { return priority < priorityQueue.priority }) //更新映射表中优先级和切片索引的对应关系 for i := pos; i < n; i++ { priorityQueue.priorityIdx[priorityQueue.queues[i].priority] = i + 1 } tail := make([]*jobQueue, n-pos) copy(tail, priorityQueue.queues[pos:]) //初始化一个新的优先级队列,并将该元素放到切片的pos位置中 priorityQueue.queues = append(priorityQueue.queues[0:pos], newJobQueue(priority)) //将高于priority优先级的元素也拼接到切片后面 priorityQueue.queues = append(priorityQueue.queues, tail...) return pos }
最后,我们再来看一个实际的调用例子:
func main() { //初始化一个队列 queue := &PriorityQueue{ noticeChan: make(chan struct{}, cap), capacity: cap, priorityIdx: make(map[int]int), size: 0, } //初始化一个消费worker workerManger := NewWorkerManager(queue) // worker开始监听队列 go workerManger.StartWork() // 构造SquareJob job := &SquareJob{ BaseJob: &BaseJob{ DoneChan: make(chan struct{}, 1), }, x: 5, priority: 10, } //压入队列尾部 queue.PushJob(job) //等待job执行完成 job.WaitDone() print("The End") }
05 多队列-多消费者模式
图2-多队列-多消费者模式.png
我们在多队列-单消费者的基础上,再来看看多消费者模式。也就是增加worker的数量,提高Job的处理速度。
我们再来看下worker的定义:
type WorkerManager struct { queue *PriorityQueue closeChans []chan struct{} }
这里需要注意,closeChans变成了切片数组。因为我们每启动一个worker,就需要有一个关闭通道。
然后看StartWorker函数的实现:
func (m *WorkerManager) StartWork(n int) error { fmt.Println("Start to Work") for i := 0; i < n; i++ { m.createWorker(); } return nil } func (m *WorkerManager) createWorker() { closeChan := make(chan struct{}) //每个协程,就是一个worker go func(closeChan chan struct{}) { var job Job for { select { case <-m.closeChan: return nil case <-m.queue.noticeChan: job := m.queue.PopJob() m.ConsumeJob(job) } } }(closeChan) m.closeChanMu.Lock() defer m.closeChanMu.Unlock() m.closeChans = append(m.closeChans, closeChan) return nil } func (m *WorkerManager) ConsumeJob(job Job) { defer func() { job.Done() }() job.Execute() }
这里需要注意的是,所有的worker都需要监听队列的noticeChan通道。测试的例子就留给读者自己了。
另外如下图的单队列-多消费者模式是多队列-多消费者模式的一个特例,这里就不再进行实现了。
图2-单队列-多消费者模式.png
总结
队列的作用可以用来控制流量,而优先级队列在兼顾流量控制的同时,还能将流量按优先级高低来进行处理。 本文中一些细节的并发加锁操作做了忽略,大家在实际应用中根据需要进行完善即可。
-
java优先级队列使用
2019-02-21 21:40:11优先级队列是比栈和队列更专用的结构,在多数情况下都非常有用。优先级队列像普通队列一样,有一个队头和队尾,并且也是从队头移除数据。 优先级队列中,数据按关键词有序排列,插入新数据的时候,会自动插入到合适... -
优先级队列(堆)
2022-04-05 22:10:25优先级队列 概念 前面介绍过队列,队列是一种先进先出(FIFO)的数据结构,但有些情况下操作的数据可能带有优先级,一般出队列时,可能需要优先级高的元素先出队列,该中场景下,使用队列显然不合适,比如:在手机上玩... -
Java面试宝典,kafka优先级队列
2021-07-04 23:42:12大量请求阻塞在高并发场景下,大量请求都需要操作数据库,导致连接数不够了,请求处于阻塞状态。 SQL 操作变慢如果数据库中存在一张上亿数据量的表,一条 SQL 没有命中索引会全表扫描,这个查询耗时会非常久。 存储... -
1、关于优先队列及使用场景举例
2018-05-19 15:27:12这里先说明下优先队列与普通队列的区别,然后列举个实际场景问题,再具体给出一个基本实现 优先队列 作为缓存结构,优先队列与栈和队列类似,可以将元素保存其中,可以访问和弹出,优先队列的的特点是存入其中的... -
Go实战 | 一文带你搞懂从单队列到优先级队列的实现
2021-12-24 00:06:12大家好,我是渔夫子,今天跟大家聊聊在我们项目中的优先级队列的实现。 优先级队列概述队列,是数据结构中实现先进先出策略的一种数据结构。而优先队列则是带有优先级的队列,即先按优先级分类,然后... -
c++优先级队列(priority_queue)使用总结
2020-07-02 16:46:161.优先级队列简介 优先级队列(priority_queue)是一种容器适配器,默认基础容器为数组vector,使用时需要包含头文件<queue>。其内部基于堆结构实现,关于堆结构的介绍可以看这里。理解了堆之后也就能理解... -
二叉堆详解实现优先级队列
2020-04-30 20:11:36二叉堆详解实现优先级队列 文章目录二叉堆详解实现优先级队列一、二叉堆概览二、优先级队列概览三、实现 swim 和 sink四、实现 delMax 和 insert五、最后总结 二叉堆(Binary Heap)没什么神秘,性质比二叉搜索树 ... -
优先级队列(堆)及Top K问题
2022-03-01 19:37:57目录 堆简介: 存储方式 实现一个堆(代码): 思想延深: 堆化思想 堆的应用:优先级队列 JDK中优先级队列 堆的应用:Top K问题 做此类题的套路: 堆的应用:堆排序 堆简介: 1. 堆逻辑上是一棵完全二叉树 2. 堆物理... -
九、RabbitMQ 其他知识点:幂等性、优先级队列、惰性队列
2022-03-19 11:03:02Redis 原子性2、优先级队列2.1. 使用场景2.2. 如何使用优先级队列2.3. 代码实战3、惰性队列3.1. 使用场景3.2. 两种模式3.3. 内存开销对比 RabbitMQ 其他知识点 1、幂等性 1.1. 概念 用户对于同一操作发起的一次... -
基于堆实现的优先级队列
2019-05-16 16:53:37/** * @author qcg ... * 应用场景: * 1.topK问题 * 2.不需要FIFO按照权重操作出队的情况 * 3.RabbitMQ中,当消费者不足,不能及时进行消费的情况下,优先级队列会生效 * 4.hadoop中Map结束之后会将IF... -
Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?
2021-04-20 15:29:56方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority 方式二:代码设置 Map<String,Object> args = new HashMap<String,Object>(); args.put("x-max-priority",... -
【Java 数据结构】 优先级队列(堆)
2021-04-01 17:24:59文章目录学习目标:学习内容:一、优先级队列1.1 概念1.2常用接口介绍1.2.1 PriorityQueue特性1.2.2 PriorityQueue常用接口介绍1.3 优先级队列的应用二、优先级队列的模拟实现2.1堆的概念2.2堆的存储方式2.3 堆的... -
java有关于优先级队列和Map/Set的一些知识点
2022-03-06 20:39:17一:优先级队列 优先级对列:按照元素之间的大小动态顺序出队,优先级队列处理的元素个数是动态变化的,有进有出,不像排序处理的集合个数是固定的. JDK中的优先级队列默认是最小堆,所以需要我们去修改compareTo方法. ... -
【数据结构】优先级队列
2021-03-10 17:08:40优先级队列(Priority Queue)优先级队列简介普通队列与优先级队列对比:优先级队列应用场景:优先队列的底层实现二叉堆实现优先级队列代码 优先级队列简介 优先级队列也属于队列,因此也提供以下接口: public ... -
通过阿里任务调度Schedulerx2.0的可抢占任务优先级队列进行应用限流
2022-03-11 19:57:195. 应用场景 该功能上线后,应用场景非常多,很多业务方都有应用级别资源控制和任务优先级的需求。比如数据平台每天要跑报表,可能会有成千上万的任务在晚上跑,如果没有资源控制,所有任务一起跑会把应用打挂。然后... -
Yarn支持队列内多优先级应用调度
2018-08-23 00:22:15在上篇文章中,笔者刚刚阐述过Yarn队列的多优先级调度策略,不同的队列分配不同的优先级等级,这样提交到优先级高的队列上的应用能被优先被处理。但是又一个问题来了,如果我们又想在同一个队列内,对应用的优先级... -
堆排序及优先级队列Java实现
2017-03-19 19:46:55优先队列之前的一篇关于《编程珠玑》的读书笔试介绍过优先队列与堆排序的一些内容...优先队列:实现插入元素和删除最大元素的功能 */ public class MaxPQ { -
分布式消息队列之一---应用场景
2021-03-27 12:18:02队列是消息的载体,用于传输和保存消息,它和数据结构中的队列一样,可以支持先进先出、优先级队列等不同的特性。 二、消息队列的应用场景 消息队列可以用于系统内部组件之间的通信,也可以用于系统跟其他服务... -
优先队列的应用实例
2021-05-09 23:41:05优先队列的应用实例 假设我们对机器服务进行收费.每个用户每次使用机器所付费用都是相同的,但每个 用户所需要服务时间都不同.为获得最大利润,假设只要有用户机器就不会空闲,我们可以把 等待使用该机器的用户组织... -
史上最强优先级队列教程PriorityQueue、DelayedWorkQueue
2021-10-31 16:30:59写这篇文章的目的是,我们寝室大哥,龙哥正在培训java,为了让龙哥更好的理解堆这个数据结构的内容,模拟他以后面试经历的场景。 面试经过 面试官: 你好,欢迎再次参加面试,几天不见,感觉头发又少了呢? 龙哥: ... -
rabbitmq中的优先级队列,死信队列的实现
2020-12-06 15:39:51优先级队列的实现主要有两个方面:队列的优先级 发送消息时的优先这两个问题 代码是在spriingboot整合rabbitmq基础上改造过来的,创建队列时,给队列设置一个优先级 /** * 直连的队列名称 * @return */ @Bean ... -
JAVA版数据结构-----优先级队列(堆)
2021-03-21 10:06:50概念 1.定义: 队列是一种 先进先出(FIFO) 的数据结构,但有些情况下,操作的数据可能带有优先级,...Java集合框架中提供了PriorityQueue和PriorityBlockingQueue两种类型的优先级队列,PriorityQueue是线程不安全的 -
消息队列 RabbitMQ 之 幂等性、优先级队列、惰性队列
2022-04-28 21:51:28RabbitMQ 其他知识点9.1 幂等性9.1.1 概念9.1.2 消息重复消费9.1.3 解决思路9.1.4 消费端的幂等性保障9.1.5 唯一ID + 指纹码机制9.1.6 Redis原子性9.2 优先级队列9.2.1 使用场景9.2.2 如何添加9.3 惰性队列9.3.1 ... -
java并发编程工具类JUC第五篇:PriorityBlockingQueue优先级队列
2021-03-26 07:12:34Java PriorityBlockingQueue队列是BlockingQueue接口的实现类,它根据priority优先级确定队列内元素对象的处理顺序,也就是说在一个PriorityBlockingQueue队列中,被添加到队列中的元素,根据priority进行排序。... -
C# RabbitMQ优先级队列实战项目演练
2019-03-09 15:36:56今天阿笨给大家分享的是通过RabbitMQ的优先级消息队列特性来解决我们业务中需要优先处理的任务。 1.1、本次分享课程适合人群如下: 1、有一定的NET开发基础并对RabbitMQ技术有一定了解和认识。 ... -
【恋上数据结构与算法】优先级队列 PriorityQueue
2021-11-27 00:36:32【恋上数据结构与算法】优先级队列 PriorityQueue优先级队列 PriorityQueue应用场景举例优先级队列的底层实现实现参考 优先级队列 PriorityQueue 应用场景举例 优先级队列的底层实现 实现 public class ...
-
data-priority-queue服务端、可恢复的消息<em>优先级队列</em> <em>应用场景</em> 流水线式的消息/数据处理系统(A -> B -> C)往往需要处理大量的消息,但消息类型间又有优先级的高低之分
-
阿里云消息<em>队列</em>服务(MQS) ——入门指南阿里云消息<em>队列</em>服务(MQS) ——入门指南 带目录 MQS 是一种高效、可靠、安全、便捷、可弹性扩展的分布式消息<em>队列</em>服务。MQS 能够帮助<em>应用</em>开发
-
Go-基于golang和redis实现的简单易用的<em>队列</em>基于golang和redis实现的简单易用的<em>队列</em>
-
TC(Linux下流量控制工具)详细说明及<em>应用</em>实例一、TC的安装 二、TC原来介绍 三、TC规则 四、TC命令 五、具体操作 基本实现步骤 环境模拟实例 建立<em>队列</em> 建立分类 建立过滤器 监视 维护 六、dms小组<em
-
YBTaskScheduler:iOS 任务调度器,为 CPU 和内存减负(用于性能优化)<em>应用场景</em>三:需要将任务按自定义的<em>优先级</em>调度(利用组件的优先<em>队列</em>策略) 安装 CocoaPods 在 Podfile 中添加 pod 'YBTask