精华内容
下载资源
问答
  • 内存队列-源码

    2021-04-08 20:22:41
    内存队列,线程队列和基本日志记录框架的基本实现
  • 背景当系统中的业务存在大量的相同任务(比如发送大量邮件),并且每个任务花费的时间也比较长,前段需要较快 的响应,针对这种需求,我们可以采用消息队列进行异步通知,同时也可以采用线程池+内存队列实现异步通知,...

    96a0cabf38870443040f2ece7991bc58.png

    背景

    当系统中的业务存在大量的相同任务(比如发送大量邮件),并且每个任务花费的时间也比较长,前段需要较快 的响应,针对这种需求,我们可以采用消息队列进行异步通知,同时也可以采用线程池+内存队列实现异步通知,处理业务问题。

    代码实现

    以下采用发送邮件作为demo

    邮箱实体类

    @Data

    public class Email implements Serializable {

    private static final long serialVersionUID = 1L;

    /**

    * 自增主键

    */

    private Long id;

    /**

    * 接收人邮箱(多个逗号分开)

    */

    private String receiveEmail;

    /**

    * 主题

    */

    private String subject;

    /**

    * 发送内容

    */

    private String content;

    /**

    * 模板

    */

    private String template;

    /**

    * 发送时间

    */

    private Timestamp sendTime;

    }

    邮件队列

    public class MailQueue {

    //队列大小

    static final int QUEUE_MAX_SIZE = 1000;

    static BlockingQueue blockingQueue = new LinkedBlockingQueue(QUEUE_MAX_SIZE);

    /**

    * 私有的默认构造子,保证外界无法直接实例化

    */

    private MailQueue(){};

    /**

    * 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例

    * 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载

    */

    private static class SingletonHolder{

    /**

    * 静态初始化器,由JVM来保证线程安全

    */

    private static MailQueue queue = new MailQueue();

    }

    //单例队列

    public static MailQueue getMailQueue(){

    return SingletonHolder.queue;

    }

    //生产入队

    public void produce(Email mail) throws InterruptedException {

    blockingQueue.put(mail);

    }

    //消费出队

    public Email consume() throws InterruptedException {

    return blockingQueue.take();

    }

    // 获取队列大小

    public int size() {

    return blockingQueue.size();

    }

    }

    邮件消费队列 实际上一次1000的并发请求最终的有效转化数为2 =CORE_POOL_SIZE+WORK_QUEUE_SIZE,

    当CORE_POOL_SIZE=2,MAX_POOL_SIZE=5,WORK_QUEUE_SIZE=50时,1000的并发请求有效转化数为55

    个,所以自己可以根据自己的业务访问量设置队列缓冲池的大小和最大的线程数量。

    @Component

    public class ConsumeMailQueue {

    private static final Logger logger = LoggerFactory.getLogger(ConsumeMailQueue.class);

    //普通的业务类

    @Autowired

    IMailService mailService;

    // 线程池维护线程的最少数量

    private final static int CORE_POOL_SIZE = 1;

    // 线程池维护线程的最大数量

    private final static int MAX_POOL_SIZE = 1;

    // 线程池维护线程所允许的空闲时间

    private final static int KEEP_ALIVE_TIME = 0;

    // 线程池所使用的缓冲队列大小

    private final static int WORK_QUEUE_SIZE = 1;

    // 消息缓冲队列

    Queue msgQueue = new LinkedList();

    //由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序

    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {

    @Override

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

    logger.info("线程池太忙了处理不了过多任务.........多余的线程将会放入msgQueue");

    //可以新开调度器进行处理这些调度任务,或者把没处理的任务保存到数据库中,然后定时任务继续处理

    msgQueue.add(((PollMail)r).getEmail());

    }

    };

    // 任务线程池

    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(

    CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,

    TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), handler);

    // 调度线程池。此线程池支持定时以及周期性执行任务的需求。

    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @PostConstruct

    public void init() {

    //开启邮件消费队列检查

    new Thread(new Runnable() {

    @Override

    public void run() {

    try {

    while (true){

    Email mail = MailQueue.getMailQueue().consume();

    logger.info("剩余邮件总数:{}",MailQueue.getMailQueue().size());

    threadPool.execute(new PollMail(mailService,mail));

    }

    } catch (InterruptedException e) {

    logger.info("邮件队列消费失败,失败原因为---->",e.getMessage());

    }

    }

    }).start();

    }

    //lombok

    @Data

    class PollMail implements Runnable {

    IMailService mailService;

    Email email;

    public PollMail(IMailService mailService,Email email) {

    this.mailService = mailService;

    this.email = email;

    }

    @Override

    public void run() {

    logger.info("正在处理的邮件为----->{}",this.email.getEmail());

    mailService.dealSend(this.email);

    }

    }

    @PreDestroy

    public void stopThread() throws InterruptedException {

    /**

    * pool.awaitTermination(1, TimeUnit.SECONDS)

    * 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),

    * 当从 while 循环退出时就表明线程池已经完全终止了。

    */

    scheduler.shutdown();

    threadPool.shutdown();

    while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {

    logger.info("线程还在执行。。。");

    }

    }

    }

    控制层代码如下

    @Api(tags ="邮件管理")

    @RestController

    @RequestMapping("/mail")

    public class mailController {

    @Autowired

    private IMailService mailService;

    @PostMapping("send")

    public Result send(Email mail) throws InterruptedException {

    mailService.send(mail);

    return Result.ok();

    }

    }

    接口层代码如下

    /**

    * @description:

    * @author: 简单的心

    * @version:

    * @modified By:1170370113@qq.com

    */

    public interface IMailService {

    /**

    * 邮件发送业务

    * @param mail

    */

    void send(Email mail) throws InterruptedException;

    /**

    * 处理需要发送的邮件任务

    * @param email

    */

    void dealSend(Email email);

    }

    接口实现层代码如下

    @Service

    public class MailServiceImpl implements IMailService {

    private static final Logger logger = LoggerFactory.getLogger(MailServiceImpl.class);

    @Override

    public void send(Email mail) {

    try {

    MailQueue.getMailQueue().produce(mail);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    //用于表示最终达到的有效请求

    static AtomicInteger flag=new AtomicInteger(0);

    @Override

    public void dealSend(Email email) {

    try {

    Thread.sleep(500);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    logger.info("邮件信息已经发送,具体内容如------->{}",email.toString());

    logger.info("总共达到的有效请求 {}",flag.addAndGet(1));

    }

    }

    展开全文
  • 实现java内存队列消费事件

    千次阅读 2020-04-26 11:39:31
    当事件量不大时,可以使用java内存队列作为中间件去接收事件。 注意:内存队列只允许所在项目的所在ip来消费这个内存队列,有且只有一个ip来操作这个队列。 实现具体如下: class Pusher implements Runnable { ...

    当事件量不大时,可以使用java内存队列作为中间件去接收事件。
    注意:内存队列只允许所在项目的所在ip来消费这个内存队列,有且只有一个ip来操作这个队列。
    实现具体如下:

    import com.google.common.collect.Queues;
    
    public class Pusher implements Runnable {
    		private Queue<String> msgs = Queues.newConcurrentLinkedQueue();
    		// 怼事件到内存队列
    		void push(String event) {
    			msgs.add(event);
    		}
    
    		@Override
    		public void run() {
    			while (true) {
    			//消费事件
    			String data = msgs.poll();
    		}
    
    展开全文
  • 一、tensorflow读取机制图解为了提高GPU/CPU的对数据的运算效率,引入“内存”的概念,我们把“读入数据到内存队列”和“GPU/CPU计算数据”分别放入两个线程中,其中”读入数据到内存队列”的线程的图示如下:为了...

    详细信息请参考:https://zhuanlan.zhihu.com/p/27238630

    以从文件中读入图像数据为例。


    一、tensorflow读取机制图解

    为了提高GPU/CPU的对数据的运算效率,引入“内存”的概念,我们把“读入数据到内存队列”和“GPU/CPU计算数据”分别放入两个线程中,其中”读入数据到内存队列”的线程的图示如下:


    为了方便管理,还需要在“内存队列”前加入“文件名队列”:


    然后有没有发现,红色框里流程特别像工厂流水线上的女工在勤劳工作,这就是“文件读取管线”的概念了。程序运行后

    首先,把ABC依次放入“文件名队列并在之后标注队列结束;

    然后,“内存队列”获得“文件名队列”中的ABC(坑:获得的顺序也有可能是CBA或BCA,代码部分会给出解释);

    最后,系统检测到“结束”就可以结束程序了。

    以上就是tensorflow中读取数据的基本机制。


    二、代码详解

    所谓“机制”不过是为了简化理解一个过程的概念而已。下面通过代码来看看每句命令对应的状态。注:以读入一张“A.jpg”的3*3*3的图为例(如下图,虽然看起来是灰度图,但实际是3通道的)。


    1.image_name = ['./A.jpg']

    这一句好理解,获取文件名。

    2.filename_queue = tf.train.string_input_producer(image_name,shuffle=False)

    tf.train.string_input_producer()表示创建“文件名队列”,注意这里仅仅只是创建哦!创建后,整个系统还是处于“停滞状态”,文件名并没有被加入到队列中(如下图所示)此时如果我们开始计算,因为内存队列中什么也没有,计算单元就会一直等待,导致整个系统被阻塞。也就说女工们已经就位,第一道工序还没开始,大家就都没活干,得等着。

    填坑:然后注意到参数shuffle = False,意思是要从“内存队列”中顺序获得“文件名队列”得到A、B、C,如果是shuffle=True(默认),那么就会乱序获取到“内存队列”,结果变为CBA或BCA等。当然我们只读入一张图“A.jpg”的话,shuffle为False还是True都无所谓。


    3 image_reader = tf.WholeFileReader()
    4._,image_file = image_reader.read(filename_queue)

    实际上在tensorflow中,内存队列不需要我们自己建立,我们只需要使用reader对象从文件名队列中读取数据就可以了,这里读取后的数据保存在 image_file 中。


    5.image = tf.image.decode_jpeg(image_file,channels=3)

    对读取的图片解码成jpg的格式。


    6. coord = tf.train.Coordinator() #协同启动的线程
    7. threads = tf.train.start_queue_runners(sess=sess, coord=coord) #启动线程运行队列
    8. print(sess.run(image))
    9. coord.request_stop() #停止所有的线程
    10.coord.join(threads)

    刚才说过,队列只被创建了,要打破僵局,需要使用tf.train.start_queue_runners(),才能启动填充队列,这时系统不再“停滞”,整个系统才能跑起来。该函数一般搭配Coordinator一起使用,这是负责在收到任何关闭信号的时候,让所有的线程都知道。本人一开始也是没有写这一句,导致程序一直停滞。

    以下是完整的代码和显示结果:

    import tensorflow as tf
    
    sess = tf.Session()
    
    image_name = ['./A.jpg']
    filename_queue = tf.train.string_input_producer(image_name)
    image_reader = tf.WholeFileReader()
    _,image_file = image_reader.read(filename_queue)
    image = tf.image.decode_jpeg(image_file,channels=3)
    
    coord = tf.train.Coordinator() #协同启动的线程
    threads = tf.train.start_queue_runners(sess=sess, coord=coord) #启动线程运行队列
    print(sess.run(image))
    coord.request_stop() #停止所有的线程
    coord.join(threads)
    结果显示:

    以上每一块代表图像的一行信息,所以显示共有三个分块,每一块包含所有列的信息。




    展开全文
  • 分布式内存队列系统

    2016-05-10 10:02:43
    分布式内存队列系统: Memcacheq Fqueue RabbitMQ Beanstalkd kafka activemq Beanstalkd

    分布式内存队列系统:

    Memcacheq

    Fqueue

    RabbitMQ

    Beanstalkd

    kafka

    activemq

    Beanstalkd

    展开全文
  • Disruptor内存队列分析

    千次阅读 2018-06-22 01:38:12
    背景Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在...
  • 共享内存队列的实现

    千次阅读 2017-01-18 10:33:48
    共享内存队列类:(结构体Header,结构体Record,结构体Attr) 共享内存队列 class ShmQueue { public: /** * 队列头 */ struct Header { volatile unsigned int head; volatile unsigned int tail;
  • 基于LinkedBlockingQueue实现的内存队列demo 配置线程池 项目启动即开始等待消费队列中的消息(模拟生产消息) 描述 如果在消息处理的时候特别费时间,这个时候如果有新的消息来了,就只能处于阻塞状态,造成...
  • 使用内存队列方式读取文件数据

    千次阅读 2020-01-13 13:55:50
    Tensorflow创建文件名队列 tf.train.string_input_producer 1 传入一个文件名list, 系统将自动将其转换为一个文件名队列 ...内存队列不需要自己建立,只需要使用reader对象从文件名队列中读取书即可 其他参...
  • 高性能内存队列-disruptor disruptor为啥这么快 无锁设计 内部采用CAS方式获取下一个任务序列号,没有锁竞争,不需要线程上下文切换 伪共享问题解决 当多线程修改互相独立的变量时,如果这些变量共享同一个...
  • public class Pusher implements Runnable { private Queue<String>... // 怼事件到内存队列 void push(String event) { msgs.add(event); } @Override public void run() .
  • c# BlockingCollection ConcurrentQueue 内存队列的生产和消费
  • 一个高性能分布式内存队列系统

    千次阅读 2014-09-16 12:10:29
    Beanstalkd 一个高性能分布式内存队列系统
  • // 对hash值取模,将hash值路由到指定的内存队列中,比如内存队列大小8 // 用内存队列的数量对hash值取模之后,结果一定是在0~7之间 // 所以任何一个商品id都会被固定路由到同样的一个内存队列中去的 int index = ...
  • 需要一个分布式内存队列,能支持这些特性:任务不重不漏的分发给消费者(最基础的)、分布式多点部署、任务持久化、批量处理、错误重试..... 转载:...
  • 一个之前在微博上调查过大家正在使用的分布式内存队列系统,反馈有Memcacheq,Fqueue, RabbitMQ , Beanstalkd以及linkedin的kafka。 RabbitMQ使用比较广泛,Beanstalkd是后起之秀。Beanstalkd之于RabbitMQ,就...
  • XSI IPC之共享内存 和 消息队列(有固定的套路)共享内存/消息队列/信号量集 遵循相同的规范,因此编程上有很多共性的东西共同点:1.XSI IPC 都是系统内核管理的,叫内核结构。2.XSI IPC 都有外部的key,类型是key_t,...
  • 环形共享内存队列

    2020-11-28 20:01:45
    https://github.com/wangzhicheng2013/shared_memory_ring_queue
  • 大家正在使用的分布式内存队列系统,有Memcacheq,Fqueue, RabbitMQ, Beanstalkd以及linkedin的kafka。RabbitMQ使用比较广泛,Beanstalkd是后起之秀。Beanstalkd之于 RabbitMQ,就好比Nginx之于Apache,Varnish之于...
  • ** * 使用共享内存的PHP循环内存队列实现 * 支持多进程, 支持各种数据类型的存储 * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区 * * @author wangbinandi@gmail.com * @created 2009-12-23 */class ...
  • Beanstalkd一个高性能分布式内存队列系统 Beanstalkd之于RabbitMQ,就好比Nginx之于Apache,Varnish之于Squid。后面在项目中使用Beanstalkd的过程中,更发现其简单、轻量级、高性能、易使用等特点,...
  • 我启动2个进程,一个进程往队列中插入数据,另一个进程从队列中取数据,这个队列使用共享内存实现的,经过测试往队列里添加数据和从队列中取数据的时间消耗大概是0.5毫秒左右,我希望能把时间控制在10微秒以内,请问...
  • Beanstalk(内存队列

    2019-02-26 17:17:02
    Beanstalk是一个高性能、轻量级的、分布式的、内存型的消息队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。其实Beanstalkd是典型的类Memcached设计,协议和使用...
  • 一个高性能、轻量级的分布式内存队列系统--beanstalk Beanstalk是一个高性能、轻量级的、分布式的、内存型的消息队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。...
  • 原标题:Java基础之数组队列及Java堆外内存学习笔记[图]Java基础之数组队列及Java堆外内存学习笔记[图]1.数组1.1 数组基本概念:数组是一个容器,可以存储同一数据类型的N个数据;数组是一个数据结构,是数据结构中...
  • 首先我们要知道使用队列的目的是什么?一般情况下,如果是一些及时消息的处理,并且处理时间很短的情况下是不需要使用队列的,直接阻塞式的方法调用就可以了。但是,如果在消息处理的时候特别费时间,这个时候如果有...
  • } CommonService 是我的一个业务处理的类,大家可以自己换成自己要用的 调用结果就不展示了,有兴趣可以自己跑起来看看 缓存队列借助备忘录模式思想,线程池的工作队列满了后,再来的请求会放入缓存队列,等线程池的...
  • 一种高效无锁内存队列的实现

    千次阅读 2017-02-22 17:49:33
    Disruptor是LMAX公司开源的一个高效的内存无锁队列。这两天看了一下相关的设计文档和博客,下面尝试进行一下总结。 第一部分。引子 谈到并发程序设计,有几个概念是避免不了的。 1.锁:锁是用来做并发最简单的方式,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 22,262
精华内容 8,904
关键字:

内存队列