精华内容
下载资源
问答
  • 使用内存队列方式读取文件数据
    千次阅读
    2020-01-13 13:55:50

    本篇博客已迁移至:https://www.ivdone.top/article/287.html 

    请帮个忙,去新的地址访问,谢谢!

     

    更多相关内容
  • 实现java内存队列消费事件

    千次阅读 2020-04-26 11:39:31
    当事件量不大时,可以使用java内存队列作为中间件去接收事件。 注意:内存队列只允许所在项目的所在ip来消费这个内存队列,有且只有一个ip来操作这个队列。 实现具体如下: class Pusher implements Runnable { ...

    当事件量不大时,可以使用java内存队列作为中间件去接收事件。
    注意:内存队列只允许所在项目的所在ip来消费这个内存队列,有且只有一个ip来操作这个队列。
    实现具体如下:

    import com.google.common.collect.Queues;
    
    public class Pusher implements Runnable {
    		private Queue<String> msgs = Queues.newConcurrentLinkedQueue();
    		// 怼事件到内存队列
    		void push(String event) {
    			msgs.add(event);
    		}
    
    		@Override
    		public void run() {
    			while (true) {
    			//消费事件
    			String data = msgs.poll();
    		}
    
    展开全文
  • 一、tensorflow读取机制图解为了提高GPU/CPU的对数据的运算效率,引入“内存”的概念,我们把“读入数据到内存队列”和“GPU/CPU计算数据”分别放入两个线程中,其中”读入数据到内存队列”的线程的图示如下:为了...

    详细信息请参考:https://zhuanlan.zhihu.com/p/27238630

    以从文件中读入图像数据为例。


    一、tensorflow读取机制图解

    为了提高GPU/CPU的对数据的运算效率,引入“内存”的概念,我们把“读入数据到内存队列”和“GPU/CPU计算数据”分别放入两个线程中,其中”读入数据到内存队列”的线程的图示如下:


    为了方便管理,还需要在“内存队列”前加入“文件名队列”:


    然后有没有发现,红色框里流程特别像工厂流水线上的女工在勤劳工作,这就是“文件读取管线”的概念了。程序运行后

    首先,把ABC依次放入“文件名队列并在之后标注队列结束;

    然后,“内存队列”获得“文件名队列”中的ABC(坑:获得的顺序也有可能是CBA或BCA,代码部分会给出解释);

    最后,系统检测到“结束”就可以结束程序了。

    以上就是tensorflow中读取数据的基本机制。


    二、代码详解

    所谓“机制”不过是为了简化理解一个过程的概念而已。下面通过代码来看看每句命令对应的状态。注:以读入一张“A.jpg”的3*3*3的图为例(如下图,虽然看起来是灰度图,但实际是3通道的)。


    1.image_name = ['./A.jpg']

    这一句好理解,获取文件名。

    2.filename_queue = tf.train.string_input_producer(image_name,shuffle=False)

    tf.train.string_input_producer()表示创建“文件名队列”,注意这里仅仅只是创建哦!创建后,整个系统还是处于“停滞状态”,文件名并没有被加入到队列中(如下图所示)此时如果我们开始计算,因为内存队列中什么也没有,计算单元就会一直等待,导致整个系统被阻塞。也就说女工们已经就位,第一道工序还没开始,大家就都没活干,得等着。

    填坑:然后注意到参数shuffle = False,意思是要从“内存队列”中顺序获得“文件名队列”得到A、B、C,如果是shuffle=True(默认),那么就会乱序获取到“内存队列”,结果变为CBA或BCA等。当然我们只读入一张图“A.jpg”的话,shuffle为False还是True都无所谓。


    
    
    3 image_reader = tf.WholeFileReader()
    4._,image_file = image_reader.read(filename_queue)

    实际上在tensorflow中,内存队列不需要我们自己建立,我们只需要使用reader对象从文件名队列中读取数据就可以了,这里读取后的数据保存在 image_file 中。


    5.image = tf.image.decode_jpeg(image_file,channels=3)

    对读取的图片解码成jpg的格式。


    6. coord = tf.train.Coordinator() #协同启动的线程
    7. threads = tf.train.start_queue_runners(sess=sess, coord=coord) #启动线程运行队列
    8. print(sess.run(image))
    9. coord.request_stop() #停止所有的线程
    10.coord.join(threads)

    刚才说过,队列只被创建了,要打破僵局,需要使用tf.train.start_queue_runners(),才能启动填充队列,这时系统不再“停滞”,整个系统才能跑起来。该函数一般搭配Coordinator一起使用,这是负责在收到任何关闭信号的时候,让所有的线程都知道。本人一开始也是没有写这一句,导致程序一直停滞。

    以下是完整的代码和显示结果:

    import tensorflow as tf
    
    sess = tf.Session()
    
    image_name = ['./A.jpg']
    filename_queue = tf.train.string_input_producer(image_name)
    image_reader = tf.WholeFileReader()
    _,image_file = image_reader.read(filename_queue)
    image = tf.image.decode_jpeg(image_file,channels=3)
    
    coord = tf.train.Coordinator() #协同启动的线程
    threads = tf.train.start_queue_runners(sess=sess, coord=coord) #启动线程运行队列
    print(sess.run(image))
    coord.request_stop() #停止所有的线程
    coord.join(threads)
    结果显示:

    以上每一块代表图像的一行信息,所以显示共有三个分块,每一块包含所有列的信息。




    展开全文
  • Linux采用共享内存与消息队列,进程间通信,完成视频推流功能
  • 持久队列 将数据持久化到内存映射文件的队列。 工作正在进行中...
  • c# BlockingCollection ConcurrentQueue 内存队列的生产和消费

    开源地址:https://github.com/tangxuehua/enode

    上一篇文章,简单介绍了enode框架内部的整体实现思路,用到了staged event-driven architecture的思想。通过前一篇文章,我们知道了enode内部有两种队列:command queue、event queue;用户发送的command会进入command queue排队,domain model产生的domain event会进入event queue,然后等待被dispatch到所有的event handlers。本文介绍一下enode框架中这两种消息队列到底是如何设计的。

    先贴一下enode框架的内部实现架构图,这样对大家理解后面的分析有帮助。

    我们需要什么样的消息队列

    enode的设计初衷是在单个进程内提供基于DDD+CQRS+EDA的应用开发。如果我们的业务需要和其他系统交互,那也可以,就是通过在event handler中与其他外部系统交互,比如广播消息出去或者调用远程接口,都可以。也许将来,enode也会内置支持远程消息通信的功能。但是不支持远程通信并不表示enode只能开发单机应用了。enode框架需要存储的数据主要有三种:

    1. 消息,包括command消息和event消息,目前出于性能方面的考虑,是存储在mongodb中;之所以要持久化消息是因为消息队列里的消息不能丢失;
    2. 聚合根,聚合根会被序列化,然后存储在内存缓存中,如redis或memcached中;
    3. 事件,就是由聚合根产生的事件,事件存储在eventstore中,如mongodb中;

    好,通过上面的分析,我们知道enode框架运行时的所有数据,就存储在mongodb和redis这两个地方。而这两种存储都是部署在独立的服务器上,与web服务器无关。所以运行enode框架的每台web服务器上是无状态的。所以,我们就能方便的对web服务器进行集群,我们可以随时当用户访问量的增加时增加新的web服务器,以提高系统的响应能力;当然,当你发现随着web服务器的增加,导致单台mongodb服务器或单台redis服务器处理不过来成为瓶颈时,也可以对mongodb和redis做集群,或者对数据做sharding(当然这两种做法不是很好做,需要对mongodb,redis很熟悉才行),这样就可以提高mongodb,redis的吞吐量了。

    好了,上面的分析主要是为了说明enode框架的使用范围,讨论清楚这一点对我们分析需要什么样的消息队列有很大帮助。

    现在我们知道,我们完全不需要分布式的消息队列了,比如不需要MSMQ、RabbitMQ,等重量级成熟的支持远程消息传递的消息队列了。我们需要的消息队列的特征是:

    1. 基于内存的消息队列;
    2. 虽然基于内存,但消息不能丢失,也就是消息要支持持久化;
    3. 消息队列要性能尽量高;
    4. 消息队列里没有消息的时候,队列的消费者不能让CPU空转,CPU空转会直接导致CPU占用100%,导致机器无法工作;
    5. 要支持多个消费者线程同时从队列取消息,但是同一个消息只能被一个消费者处理,也就是一个消息不能同时被两个消费者取走,也就是要支持并发的dequeue;
    6. 需要一种设计,实现消息至少会被处理一次;具体指:消息被消费者取走然后被处理的过程中,如果没有处理成功(消费者自己知道有没有处理成功)或者根本没来得急处理(比如那时正好断电了),那需要一种设计,可以我们有机会重新消费该消息;
    7. 因为我们做不到100%不会重复处理一个消息,所以我们的所有消息消费者要尽量做到支持等幂操作,就是重复的操作不会引起副作用;比如插入前先查询是否存在就是一种支持等幂的措施;这一点,框架会尽量提供支持等幂的逻辑,当然,用户自己在设计command handler或event handler时,也要尽量考虑等幂的问题。注意:一般command handler不用考虑,我们主要要考虑的是event handler。原因,下次文章中再细谈吧。

    内存队列的设计

    内存队列,特点是快。但是我们不光是需要快,还要能支持并发的入队和出对。那么看起来ConcurrentQueue<T>似乎能满足我们的要求了,一方面性能还可以,另一方面内置支持了并发操作。但是有一点没满足,那就是我们希望当队列里没有消息的时候,队列的消费者不能让CPU空转,CPU空转会直接导致CPU占用100%,导致机器无法工作。幸运的是,.net中也有一个支持这种功能的集合,那就是:BlockingCollection<T>,这种集合能提供在队列内无元素的时候block当前线程的功能。我们可以用以下的方式来实例化一个队列:

    private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());

    并发入队的时候,我们只要写下面的代码即可:

    _queue.Add(message);

    并发出队的时候,只要:

    _queue.Take();

    我们不难看出,ConcurrentQueue<T>是提供了队列加并发访问的支持,而BlockingCollection<T>是在此基础上再增加blocking线程的功能。

    是不是非常简单,经过我的测试,BlockingCollection<T>的性能已经非常好,每秒10万次入队出对肯定没问题,所以不必担心成为瓶颈。

    关于Disruptor的调研:

    了解过LMAX架构的朋友应该听说过Disruptor,LMAX架构能支持每秒处理600W订单,而且是单线程。这个速度是不是很惊人?大家有兴趣的可以去了解下。LMAX架构是完全in memory的架构,所有的业务逻辑基于纯内存实现,粗粒度的架构图如下:

    1. Business Logic Processor完全在in memory中跑,简称BLP;
    2. Input Disruptor是一种特殊的基于内存运行的环形队列(基于一种叫Ring Buffer的环形数据结构),负责接收消息,然后让BLP处理消息;
    3. Output Disruptor也是同样的队列,负责将BLP产生的事件发布出去,给外部组件消费,外部组件消费后可能又会产生新的消息塞入到Input Disruptor;

    LMAX架构之所以能这么快,除了完全基于in memory的架构外,还归功于延迟率在纳秒级别的disruptor队列组件。下面是disruptor与java中的Array Blocking Queue的延迟率对比图:

    ns是纳秒,我们可以从数据上看到,Disruptor的延迟时间比Array Blocking Queue快的不是一个数量级。所以,当初LMAX架构出来时,一时非常轰动。我曾经也对这个架构很好奇,但因为有些细节问题没想清楚,就不敢贸然实践。

    通过上面的分析,我们知道,Disruptor也是一种队列,并且也完全可以替代BlockingCollection,但是因为我们的BlockingCollection目前已经满足我们的需要,且暂时不会成为瓶颈,所以,我暂时没有采用Disruptor来实现我们的内存队列。关于LMAX架构,大家还可以看一下这篇我以前写的文章。

    队列消息的持久化

    我们不光需要一个高性能且支持并发的内存队列,还要支持队列消息的持久化功能,这样我们才能保证消息不会丢失,从而才能谈消息至少被处理一次。

    那消息什么时候持久化?

    当我们发送一个消息给队列,一旦发生成功,我们肯定认为消息已经不会丢了。所以,很明显,消息队列内部肯定是要在接收到入队的消息时先持久化该消息,然后才能返回。

    那么如何高效的持久化呢?

    第一个想法:

    基于txt文本文件的顺序写。原理是:当消息入队时,将消息序列化为文本,然后append到一个txt1文件;当消息被处理完之后,再把该消息append到另一个txt2文件;然后,如果当前机器没重启,那内存队列里当前存在的消息就是还未被处理的消息;如果机器重启了,那如何知道哪些消息还没被处理?很简单,就是对比txt1,txt2这两个文本文件,然后只要是txt1中存在,但是txt2中不存在的消息,就认为是没被处理过,那需要在enode框架启动时读取txt1中这些没被处理的消息文本,反序列化为消息对象,然后重新放入内存队列,然后开始处理。这个思路其实挺好,关键的一点,这种做法性能非常高。因为我们知道顺序写文本文件是非常快的,经过我的测试,每秒200W行普通消息的文本不在话下。这意味着我们每秒可以持久化200W个消息,当然实际上我们肯定达不到这个高的速度,因为消息的序列化性能达不到这个速度,所以瓶颈是在序列化上面。但是,通过这种持久化消息的思路,也会有很多细节问题比较难解决,比如txt文件越来越大,怎么办?txt文件不好管理和维护,万一不小心被人删除了呢?还有,如何比较这两个txt文件?按行比较吗?不行,因为消息入队的顺序和处理的顺序不一定相同,比如command就是如此,当用户发送一个command到队列,但是处理的时候发现第一次由于并发冲突,导致command执行没成功,所以会重试command,如果重试成功了,然后持久化该command,但是我们知道,此时持久化的时候,它的顺序也许已经在后面的command的后面了。所以,我们不能按行比较;那么就要按消息的ID比较了?就算能做到,那这个比较过程也是很耗时的,假设txt1有100W个消息;txt2中有80W个消息,那如果按照ID来比较txt1中哪20W个消息还没被处理,有什么算法能高效比较出来吗?所以,我们发现,这个思路还是有很多细节问题需要考虑。

    第二个想法:

    采用NoSQL来存储消息,通过一些思考和比较后,觉得还是MongoDB比较合适。一方面MongoDB实际上所有的存取操作优先使用内存,也就是说不会马上持久化到磁盘。所以性能很快。另一方面,mongodb支持可靠的持久化功能,可以放心的用来持久化消息。性能方面,虽然没有写txt那么快,但也基本能接受了。因为我们毕竟不是整个网站的所有用户请求的command都是放在一个队列,如果我们的网站用户量很大,那肯定会用web服务器集群,且每个集群机器上都会有不止一个command queue,所以,单个command queue里的消息我们可以控制为不会太多,而且,单个command queue里的消息都是放在不同的mongodb collection中存储;当然持久化瓶颈永远是IO,所以真的要快,那只能一个独立的mongodb server上设计一个collection,该collection存放一个command queue里的消息;其他的command queue的消息就也采用这样的做法放在另外的mongodb server上;这样就能做到IO的并行,从而根本上提高持久化速度。但是这样做代价很大的,可能需要好多机器呢,整个系统有多少个queue,那就需要多少台机器,呵呵。总而言之,持久化方面,我们还是有一些办法可以去尝试,还有优化的余地。

    再回过头来简单说一下,采用mongodb来持久化消息的实现思路:入队的时候持久化消息,出队的时候删除该消息;这样当机器重启时,要查看某个队列有多少消息,只要通过一个简单的查询返回mongodb collection中当前存在的消息即可。这种做法设计简单,稳定,性能方面目前应该还可以接受。所以,目前enode就是采用这种方法来持久化所有enode用到的内存队列的消息。

    代码示意,有兴趣的可以看看:

      View Code

    如何保证消息至少被处理一次

    思路应该很容易想到,就是先把消息从内存队列dequeue出来,然后交给消费者处理,然后由消费者告诉我们当前消息是否被处理了,如果没被处理好,那需要尝试重试处理,如果重试几次后还是不行,那也不能把消息丢弃了,但也不能无休止的一直只处理这个消息,所以需要把该消息丢到另一个专门用于处理需要重试的本地纯内存队列。如果消息被处理成功了,那就把该消息从持久化设备中删除即可。看一下代码比较清晰吧:

    复制代码
        private void ProcessMessage(TMessageExecutor messageExecutor)
        {
            var message = _bindingQueue.Dequeue();
            if (message != null)
            {
                ProcessMessageRecursively(messageExecutor, message, 0, 3);
            }
        }
        private void ProcessMessageRecursively(TMessageExecutor messageExecutor, TMessage message, int retriedCount, int maxRetryCount)
        {
            var result = ExecuteMessage(messageExecutor, message); //这里表示在消费(即处理)消息
    
            //如果处理成功了,就通知队列从持久化设备删除该消息,通过调用Complete方法实现
            if (result == MessageExecuteResult.Executed)
            {
                _bindingQueue.Complete(message);
            }
            //如果处理失败了,就重试几次,目前是3次,如果还是失败,那就丢到一个重试队列,进行永久的定时重试
            else if (result == MessageExecuteResult.Failed)
            {
                if (retriedCount < maxRetryCount)
                {
                    _logger.InfoFormat("Retring to handle message:{0} for {1} times.", message.ToString(), retriedCount + 1);
                    ProcessMessageRecursively(messageExecutor, message, retriedCount + 1, maxRetryCount);
                }
                else
                {
                    //这里是丢到一个重试队列,进行永久的定时重试,目前是每隔5秒重试一下,_retryQueue是一个简单的内存队列,也是一个BlockingCollection<T>
                    _retryQueue.Add(message);
                }
            }
        }
    复制代码

    代码应该很清楚了,我就不多做解释了。

    总结:

    本文主要介绍了enode框架中消息队列的设计思路,因为enode中有command queue和event queue,两种queue,所以逻辑是类似的;所以本来还想讨论一下如何抽象和设计这些queue,已去掉重复代码。但时间不早了,下次再详细讲吧。

    展开全文
  • Linux进程通信之共享内存与消息队列

    千次阅读 多人点赞 2022-03-26 17:55:57
    Linux进程通信之共享内存与消息队列 文章目录1.共享内存的原理2.共享内存的接口3.共享内存代码4.共享内存特性5.消息队列原理6.消息队列接口7.消息队列代码 1.共享内存的原理 2.共享内存的接口 3.共享内存代码 4.共享...
  • Java知识总结----队列的使用

    千次阅读 2021-03-13 08:45:10
    首先我们要知道使用队列的目的是什么?一般情况下,如果是一些及时消息的处理,并且处理时间很短的情况下是不需要使用队列的,直接阻塞式的方法调用就可以了。但是,如果在消息处理的时候特别费时间,这个时候如果有...
  • 有一个需求,是在servlet的业务方法中。有一些dao操作,因为表建了索引,所以做插入可能比较耗时,就想实现异步。 把dao操作的那一部分放入消息...因为不能涉及第三方工具,只能用内存实现消息队列。求大神帮忙。。。
  • 共享内存循环形队列池设计

    千次阅读 2016-12-01 10:47:44
    1、 简述 队列是一种先进先出(FIFO)的线性表数据结构,常见的操作如在表的尾部插入,在头部... 本文采用的是共享循环队列池,共享内存队列来解决进程间通信数据量大的场景。 队列长度计算公式:nCount = (rear - fr
  • zeromq是一个基于内存的消息队列,支持windows,linux和各种平台,支持python,java,php,.net等各种语言。
  • 共享内存环形队列的功能介绍: 1、共享内存的操作:创建/获取、映射、反映射、删除 2、无锁环形队列,本文(支持多生产者多消费者模型) 3、若想(支持单生产者单消费者模型),可以将“信号量”、“互斥锁”的相关...
  • Java的Executors框架提供的定长线程池内部默认...即使没有内存溢出,队列的延迟势必会变大,而且如果进程突然遇到退出信号,队列里的消息还没有被处理就被丢弃了,那必然会对系统的消息可靠性造成重大影响。那如何解...
  • 共享内存无锁队列的实现

    千次阅读 2017-11-09 14:52:24
    共享内存无锁队列的实现 躲在树上的数据库 2017-11-06 211标签: 消息队列 , 无锁队列 作者:范健 导语: 共享内存无锁队列是老调重弹了,相关的实现网上都能找到很多。但看了公司内外的很多...
  • 今天跟大家来看看如何在项目中使用队列。首先我们要知道使用队列的目的是什么?一般情况下,如果是一些及时消息的处理,并且处理时间很短的情况下是不需要使用队列的,直接阻塞式的方法调用就可以了。但是,如果在...
  • 使用消息队列与共享内存完成一个简单的终端聊天程序
  • 消息队列和共享内存

    千次阅读 2018-10-24 22:12:06
    共享内存和消息队列都是在进程间传递数据的工具。 消息队列也是队列的一种,在同一类型上先进先出,可以完成多进程间的通讯,每个数据都带有类型,读取数据的进程只会读取自己关注的类型的数据,并且同一种类型的...
  • uCosIII之消息队列内存管理

    千次阅读 2017-12-30 16:14:00
    消息队列是由消息按照一定的队列规则组成的,而消息包含了一个指向数据的指针、该数据的大小、时间戳变量,是任务间数据传递的重要手段。消息队列中存放了等待该消息的任务。多个任务可以在消息队列中等待消息,当一...
  • lmax.distruptor是一种高效的本地内存队列。特性就在此不啰嗦了,需要请自行百度。本文要解决的,是一种以distruptor为基础的快速开发模版。 概念 为了有效的设计开发模板,必须先弄清楚distruptor的几个...
  • 高效内存无锁队列 Disruptor

    千次阅读 2016-11-09 00:03:41
    高效内存无锁队列 Disruptor
  • 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,...
  • 进程间通信就是在不同进程之间传播或交换信息,通讯方式一般分为管道、命名管道(FIFO)、消息队列、信号量(Semaphore)、共享内存(Shared Memory)五种。 管道:只能用于父子进程或兄弟进程等有亲缘关系的通信,...
  • 我启动2个进程,一个进程往队列中插入数据,另一个进程从队列中取数据,这个队列使用共享内存实现的,经过测试往队列里添加数据和从队列中取数据的时间消耗大概是0.5毫秒左右,我希望能把时间控制在10微秒以内,请问...
  • C++的静态内存分配队列模块,可移植性好,比较适合使用在嵌入式系统软件中
  • 队列的形式使用共享内存

    千次阅读 2016-05-14 17:22:07
    下面是共享内存映射图一般描述使用共享内存,都是以普通缓冲区的形式访问,这里除了讲述共享内存的原理和使用方法,还会实现用队列的方式使用共享内存。创建共享内存int shmget(key_t key, size_t si
  • 每个queue一个consumer 就是多一些queue而已,确实麻烦点 或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理 3.2.2 kafka 一个topic,一个partition,...
  • 为了使网络取证系统能够协同多个安全取证系统有效取证,提出了一种共享内存队列的协同取证方法。该方法采用了共享内存通信方式,借助信号机制,设计了基于多个队列进行数据交换算法,解决网络协同取证大数据量通信...
  • 本程序为VS2017_C#多进程通信之消息队列与大数据内存共享,特别是内存共享部分,采用的是图片数据共享,既包含小数据(图片长宽)也包含大数据(图片),需要共享图片或数组等其他数据的可以参考。任何程序相关使用...
  • Kafka 消息队列如何保证顺序性?

    千次阅读 2021-01-20 21:56:40
    如果一个消费者是多个线程消费,则需要把pull来的消息按照key值写入不同的内存队列中,相同key值的消息写入同一个内存队列内存队列内的消息是有序的),然后一个线程消费一个内存队列。 1、rabbitMq 问题分析:...
  • 电子-STM32串口驱动环形队列内存动态分配DMA拼音检索.zip,单片机/嵌入式STM32-F0/F1/F2

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 618,706
精华内容 247,482
关键字:

内存队列

友情链接: 58774653.zip