精华内容
下载资源
问答
  • 下载已加入队列不开始下载
    千次阅读
    2016-02-23 11:40:21

    最近的项目是一个与音乐相关的App,其中有一个功能:收藏喜欢的歌曲,在wifi的环境下自动下载。

    考虑到音乐歌曲都是3、4Mb的小文件,断点下载的功能便不需要了。因此只需要实现一个特别轻量、简单的下载管理类,进行管理即可。

    最初的思路便是任务队列,单线程顺序执行,一个文件接着一个文件进行下载。

    之前看过AsyncTask的部分源码,其设计与我的想法类似,于是便借鉴着AsyncTask的源码,实现了一个特别简单、轻量的下载管理类。

    源码如下:

    public class MyDownloadManager {
    
        private static final String TAG = "MyDownloadManager";
        private File downloadDir; // 文件保存路径
        private static MyDownloadManager instance; // 单例
    
        // 单线程任务队列
        public static Executor executor;
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "MyDownloadManager #" + mCount.getAndIncrement());
            }
        };
        private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<>(128);
        public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 1, 1,
                TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
    
    
        private MyDownloadManager() {
            // 初始化下载路径
            downloadDir = new File(AndroidCacheUtils.getCacheDirFile(MiaApplication.getInstance()), "download");
            if (!downloadDir.exists()) {
                downloadDir.mkdirs();
            }
            executor = new SerialExecutor();
        }
    
        /**
         * 顺序执行下载任务
         */
        private static class SerialExecutor implements Executor {
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            scheduleNext();
                        }
                    }
                });
                if (mActive == null) {
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                if ((mActive = mTasks.poll()) != null) {
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
    
        /**
         * 获取单例对象
         *
         * @return
         */
        public static MyDownloadManager getInstance() {
            if (instance == null) {
                instance = new MyDownloadManager();
            }
            return instance;
        }
    
        /**
         * 添加下载任务
         *
         * @param path
         */
        public void addDownloadTask(final String path) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    download(path);
                }
            });
        }
    
        /**
         * 下载文件
         *
         * @param path
         */
        private void download(String path) {
            String fileName = AndroidMD5.MD5(path);
            File savePath = new File(downloadDir, fileName); // 下载文件路径
            File finallyPath = new File(downloadDir, fileName + ".mp3"); // 下载完成后加入.mp3后缀
            if (finallyPath.exists()) { // 文件存在则已下载
                Log.i(TAG, "file is existed");
                return;
            }
            if (AndroidNetWorkUtils.isWifiDataEnable(MiaApplication.getInstance())) { // 如果是Wifi则开始下载
                if (savePath.exists() && savePath.delete()) { // 如果之前存在文件,证明没有下载完成,删掉重新创建
                    savePath = new File(downloadDir, fileName);
                }
                Log.i(TAG, "download start");
                try {
                    byte[] bs = new byte[1024];
                    int len;
                    URL url = new URL(path);
                    InputStream is = url.openStream();
                    OutputStream os = new FileOutputStream(savePath);
                    while ((len = is.read(bs)) != -1) {
                        os.write(bs, 0, len);
                    }
                    os.close();
                    is.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (savePath.renameTo(finallyPath)) { // 下载完成后重命名为.mp3文件
                    Log.i(TAG, "download end");
                    EventBus.getDefault().post(new DownloadDoneEvent(path));
                }
            } else { // 不是wifi则不下载
                Log.i(TAG, "not wifi net, stop download");
            }
    
        }
    
        /**
         * 添加删除任务
         *
         * @param path
         */
        public void addDeleteTask(final String path) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    delete(path);
                }
            });
        }
    
        /**
         * 删除本地文件
         *
         * @param path
         */
        private void delete(String path) {
            String fileName = AndroidMD5.MD5(path);
            File savePath = new File(downloadDir, fileName + ".mp3");
            Log.i(TAG, savePath.getPath());
            if (savePath.exists()) {
                if (savePath.delete()) {
                    Log.i(TAG, "file is deleted");
                }
            }
        }
    
        /**
         * 返回下载路径
         *
         * @return
         */
        public File getDownloadDir() {
            return downloadDir;
        }
    }

    我们看到public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 1, 1,TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);,这句代码便是创建一个线程池。其方法源码及参数说明:

    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default rejected execution handler.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }

    这里我前三个参数传的都是1,既是最多只有1个线程。sPoolWorkQueue参数则是一个容量为128的任务队列,既最多能存放128个任务。

    下面我们看到SerialExecutor的代码,它有一个Runnable队列mTasks ,不断的接受Runnable对象,并通过poll操作,每次取出顶部的Runnable进行执行。结合创建的单一线程池,便实现了我需要的简易、轻量的下载器。

    更多相关内容
  • IDM使用队列下载多个百度云文件

    千次阅读 2020-06-05 11:58:04
    问题 如图,idm正在下载一个百度云文件时,再次申请下载其他文件时会出现以下情况: 解决方法 ...最后打开idm选择开始队列,之前申请的一系列文件就开始下载: 可以看到,多个文件正在同时下载中 ...

    相关问题

    如图,idm正在下载一个百度云文件时,再次申请下载其他文件时会出现以下情况:

    IDM拦截了根据baidupcs.com网站设置只能请求一次的下载,因此idm会尝试在浏览器之前请求它,但截获的请求中没有足够的数据来下载此文件

    在这里插入图片描述


    解决方案

    下载首个百度云文件时,选择稍后下载,点击确定(取消勾选开始执行队列),之后每个文件都如此:

    在这里插入图片描述

    打开idm打开队列设置,将同时进行下载数目设置为1:
    在这里插入图片描述
    最后点击开始队列,即可按添加次序一个个开始下载
    在这里插入图片描述

    展开全文
  • 2.2.3 请求通道的请求队列和响应队列 创建SocketServer也会创建一个请求通道(RequestChannel),在KafkaServer中,会将SocketServer的请求通道传给Kafka请求处理线程(KafkaRequestHandler,下文简称“请求处理线程...

    2.2.3 请求通道的请求队列和响应队列

    创建SocketServer也会创建一个请求通道(RequestChannel),在KafkaServer中,会将SocketServer的请求通道传给Kafka请求处理线程(KafkaRequestHandler,下文简称“请求处理线程”)和KafkaApis。在上一节中客户端的请求已经到达服务端的处理器(processor),那么请求通道就是处理器与请求处理线程和KafkaApis交换数据的地方:如果处理器往请求通道添加请求,请求处理线程和KafkaApis都可以获取到请求通道中的请束;如果请求处理线程和KafkaApis往请求通道添加响应,处理器也可以从请求通道获取晌应。

    处理器会将客户端发送的请求放到全局的请求队列(requestQueue)中,供请求处理线程获取,请求处理线程会将请求转发给KafkaApis处理。最后KafkaApis会将处理完成的响应结果放到响应队列(responseQueue)中,供处理器获取后发送给客户端。相关代码如下:

    requestChannel.addResponseListener(id => processors(id).wakeup())
    
    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
    
      newGauge(
        "RequestQueueSize",
        new Gauge[Int] {
          def value = requestQueue.size
        }
      )
    
      newGauge("ResponseQueueSize", new Gauge[Int]{
        def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
      })
    
      for (i <- 0 until numProcessors) {
        newGauge("ResponseQueueSize",
          new Gauge[Int] {
            def value = responseQueues(i).size()
          },
          Map("processor" -> i.toString)
        )
      }
    
      /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
      def sendRequest(request: RequestChannel.Request) {
        requestQueue.put(request)
      }
    
      /** Send a response back to the socket server to be sent over the network */
      def sendResponse(response: RequestChannel.Response) {
        responseQueues(response.processor).put(response)
        for(onResponse <- responseListeners)
          onResponse(response.processor)
      }
    
      /** Get the next request or block until specified time has elapsed */
      def receiveRequest(timeout: Long): RequestChannel.Request =
        requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
    
      /** Get the next request or block until there is one */
      def receiveRequest(): RequestChannel.Request =
        requestQueue.take()
    
      /** Get a response for the given processor if there is one */
      def receiveResponse(processor: Int): RequestChannel.Response = {
        val response = responseQueues(processor).poll()
        if (response != null)
          response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
        response
      }
    
      def addResponseListener(onResponse: Int => Unit) {
        responseListeners ::= onResponse
      }
    
      def shutdown() {
        requestQueue.clear()
      }
    }
    

    如图2-20所示,因为请求通道保存了请求和响应两种类型的队列,它的各个方法中关于请求和响应的接收和发送是有顺序的:发送请求→接收请求→发送n向应→接收响应。

    (1)sendRequest():处理器接收到客户端请求后,将请求放入请求队列。
    (2)receiveRequest():请求处理线程从队列中获取请求,并交给KafkaApis处理。
    (3)sendResponse():KafkaApis处理完,将响应结果放入响应队列。
    (4)receiveResponse():处理器从响应队列中获取响应结果发送给客户端。

    在这里插入图片描述

    上面只是一个请求和响应在请求通道上的调用顺序,下面以服务端同时处理多个客户端请求为例,并结合其他相关的组件,来说明处理器将请求放入请求通道,一直到从请求通道获取响应的过程(图中的编号和图2-20编号的含义相同)。如图2-21所示,由于一个SocketServer有多个处理器,每个处理器都负责一部分客户端的请求。如果请求l发送给处理器l,那么请求l对应的响应也只能发送给处理器l,所以每个处理器都有一个响应队列。虽然请求队列是所有处理器全局共享的,不过会有多个请求处理线程同时处理请求队列中的客户端请求。假设处理器3有两个客户端请求,这两个请求进入全局的请求队列后可能被不同的请求处理线程处理,最后KafkaApis会将这两个请求的响应都放入处理器3对应的响应队列中。

    在这里插入图片描述
    从图2-21处理器使用请求通道的方式也可以看到,处理器的processCollpletedReceives()会往请求通道的请求队列添加请求,ProcessNewResponses()则从请求通道的响应队列获取响应。与之相对应的获取请求和添加响应的操作,则属于请求处理线程(KafkaRequestHandler)和KafkaApis的功能。

    展开全文
  • JUC多线程:AQS抽象队列同步器原理

    千次阅读 2021-10-09 12:58:04
    工作原理就是如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就将获取到锁的线程加入到等待队列中。这时,就需要一套...

    一、AQS 的工作原理:

    1.1、什么是 AQS:

            AQS,Abstract Queued Synchronizer,抽象队列同步器,是 J.U.C 中实现锁及同步组件的基础。工作原理就是如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就将获取不到锁的线程加入到等待队列中。这时,就需要一套线程阻塞等待以及被唤醒时的锁分配机制,而 AQS 是通过 CLH 队列实现锁分配的机制。

    1.2、CLH 同步队列的模型:

            CLH 队列是由内部类 Node 构成的同步队列,是一个双向队列(不存在队列实例,仅存在节点之间的关联关系),将请求共享资源的线程封装成 Node 节点来实现锁的分配;同时利用内部类 ConditionObject 构建等待队列,当调用 ConditionObject 的 await() 方法后,线程将会加入等待队列中,当调用 ConditionObject 的 signal() 方法后,线程将从等待队列转移动同步队列中进行锁竞争。AQS 中只能存在一个同步队列,但可拥有多个等待队列。AQS 的 CLH 同步队列的模型如下图:

            AQS 有三个主要变量,分别是 head、tail、state,其中 head 指向同步队列的头部,注意 head 为空结点,不存储信息。而 tail 则是同步队列的队尾,同步队列采用的是双向链表的结构是为了方便对队列进行查找操作。当 Node 节点被设置为 head 后,其 thread 信息和前驱结点将被清空,因为该线程已获取到同步状态,正在执行了,也就没有必要存储相关信息了,head 只保存后继结点的指针即可,便于 head 结点释放同步状态后唤醒后继结点。

            队列的入队和出队操作都是无锁操作,基于 CAS+自旋锁 实现,AQS 维护了一个 volatile 修饰的 int 类型的 state 同步状态,volatile 保证线程之间的可见性,并通过 CAS 对该同步状态进行原子操作、实现对其值的修改。当 state=0 时,表示没有任何线程占有共享资源的锁,当 state=1 时,则说明当前有线程正在使用共享变量,其他线程必须加入同步队列进行等待;

    二、内部类 Node 数据结构分析:

    static final class Node {
        //共享模式
        static final Node SHARED = new Node();
        //独占模式
        static final Node EXCLUSIVE = null;
    
        //标识线程已处于结束状态
        static final int CANCELLED =  1;
        //等待被唤醒状态
        static final int SIGNAL    = -1;
        //条件状态
        static final int CONDITION = -2;
        //在共享模式中使用表示获得的同步状态会被传播
        static final int PROPAGATE = -3;
    
        //等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种取值
        volatile int waitStatus;
    
        //同步队列中前驱结点
        volatile Node prev;
        //同步队列中后继结点
        volatile Node next;
        //请求锁的线程
        volatile Thread thread;
        //等待队列中的后继结点,这个与Condition有关,稍后会分析
        Node nextWaiter;
    
        //判断是否为共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        
        //.....
    }

            AQS分为两种模式:独占模式 EXCLUSIVE 和 共享模式 SHARED,像 ReentrantLock、CyclicBarrier 是基于独占模式模式实现的,Semaphore,CountDownLatch 等是基于共享模式。

            变量 waitStatus 表示当前封装成 Node 节点的线程的等待状态,共有4种取值 CANCELLED、SIGNAL、CONDITION、PROPAGATE:

    • CANCELLED:值为1,表示在同步队列中的线程等待超时或者被中断,处于已结束状态,需要从同步队列中移除该 Node 节点

    • SIGNAL:值为-1,表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL,当该节点释放了同步锁之后,就会唤醒该节点的后继节点

    • CONDITION:值为-2,与 Condition 相关,表示该结点在 condition 等待队列中阻塞,当其他线程调用了Condition 的 signal() 方法后,CONDITION 状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

    • PROPAGATE:值为-3时,在共享模式下使用,表示该线程以及后继线程进行无条件传播。前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

    三、AQS 的设计模式:

    3.1、AQS 的模板方法模式:

            AQS 的基于模板方法模式设计的,在 AQS 抽象类中已经实现了线程在等待队列的维护方式(如获取资源失败入队/唤醒出队等),而对于具体共享资源 state 的获取与释放(也就是锁的获取和释放)则交由具体的同步器来实现,具体的同步器需要实现以下几种方法:

    • isHeldExclusively():该线程是否正在独占资源,只有用到 condition 才需要去实现它

    • tryAcquire(int):独占模式,尝试获取资源,成功则返回 true,失败则返回 false

    • tryRelease(int):独占方式,尝试释放资源,成功则返回 true,失败则返回 false

    • tryAcquireShared(int):共享方式,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源

    • tryReleaseShared(int):共享方式,尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

    3.2、JUC 中提供的同步器:

    • 闭锁 CountDownLatch:用于让主线程等待一组事件全部发生后继续执行。

    • 栅栏 CyclicBarrier:用于等待其它线程,且会阻塞自己当前线程,所有线程必须全部到达栅栏位置后,才能继续执行;且在所有线程到达栅栏处之后,可以触发执行另外一个预先设置的线程。

    • 信号量 Semaphore:用于控制访问资源的线程个数,常常用于实现资源池,如数据库连接池,线程池。在 Semaphore 中,acquire 方法用于获取资源,有的话,继续执行,没有资源的话将阻塞直到有其它线程调用 release 方法释放资源;

    • 交换器 Exchanger:用于线程之间进行数据交换;当两个线程都到达共同的同步点(都执行到exchanger.exchange 的时刻)时,发生数据交换,否则会等待直到其它线程到达;

    CountDownLatch 和 CyclicBarrier 的区别?

    两者都可以用来表示代码运行到某个点上,二者的区别在于:

    ① CyclicBarrier 的某个线程运行到某个位置之后就停止运行,直到所有的线程都到达了这个点,所有线程才重新运行;CountDownLatch 的某线程运行到某个位置之后,只是给计数值-1而已,该线程继续运行;

    ② CyclicBarrier 可重用,CountDownLatch 不可重用,计数值 为 0 时该 CountDownLatch 就不可再用了。

    推荐阅读:https://juejin.cn/post/6989419875366076447

    3.3、ReentranLock 中独占模式下非公平锁的获取流程:

            获取独占锁的过程是定义在 tryAcquire() 中的,当前线程尝试获取同步状态,如果获取失败,就将线程封装成 Node 节点插入到 CLH 同步队列中。插入同步队列后,线程并没有放弃获取同步状态,而是根据前置节点状态状态判断是否继续获取,如果前置节点是 head 结点,继续尝试获取,否则就将线程挂起。如果成功获取同步状态则将自己设置为 head 结点。当持有同步状态的线程释放资源后,也会唤醒队列中的后继线程。

    四、ConditionObject 阻塞队列:

     4.1、什么是 Condition 接口:

            AQS 的阻塞队列是基于内部类 ConditionObject 实现的,而 ConditionObject 实现了 Condition 接口。那 Condition 接口是什么呢?Condition 主要用于线程的等待和唤醒,在JDK5之前,线程的等待唤醒是用 Object 类的 wait/notify/notifyAll 方法实现的,这些方法必须配合 synchronized 关键字使用,使用起来不是很方便,为了解决这个问题,在 JDK5 之后,J.U.C 提供了Condition。

    • Condition.await 对应于 Object.wait;

    • Condition.signal 对应于 Object.notify;

    • Condition.signalAll 对应于 Object.notifyAll;

            与 synchronized 的等待唤醒机制相比,Condition 能够精细的控制多线程的休眠与唤醒,具备更多的灵活性, 通过多个 Condition 实例对象建立不同的等待队列,从而实现同一个锁拥有多个等待队列。而 synchronized 关键字只能有一组等待唤醒队列,使用 notify() 唤醒线程时只能随机唤醒队列中的一个线程。

    4.2、ConditionObject 阻塞队列实现原理:

            Condition 的具体实现之一是 AQS 的内部类 ConditionObject,每个 Condition 都对应着一个等待队列,也就是说如果一个锁上创建了多个 Condition 对象,那么也就存在多个等待队列。当调用 ConditionObject 的 await() 方法后,线程将会加入等待队列中,当调用 ConditionObject 的 signal() 方法后,线程将从等待队列转移动同步队列中进行锁竞争。AQS 的 ConditionObject 中的等待队列模型如下:

     4.3、AQS 的 线程唤醒机制原理:

     AQS 的线程唤醒是通过 singal() 方法实现的,我们先看下 singal() 方法线程唤醒的流程图:

    流程图说明:

    signal() 方法主要调用了 doSignal(),而 doSignal() 方法中做了两件事:

    • (1)从条件等待队列移除被唤醒的节点,然后重新维护条件等待队列的 firstWaiter 和 lastWaiter 的指向。
    • (2)将从等待队列移除的结点加入同步队列(在 transferForSignal() 方法中完成的),如果进入到同步队列失败并且条件等待队列还有不为空的节点,则继续循环唤醒后续其他结点的线程

    注意:无论是同步队列还是等待队列,使用的 Node 数据结构都是同一个,不过是使用的内部变量不同罢了

    所以 signal() 的流程可以概述为:

    • signal() 被调用后,先判断当前线程是否持有独占锁

    • 如果有,那么唤醒当前 Condition 等待队列的第一个结点的线程,并从等待队列中移除该结点,添加到同步队列中

    • 如果加入同步队列失败,那么继续循环唤醒等待队列中的其他结点的线程

    • 如果成功加入同步队列,那么如果其前驱结点已结束或者设置前驱节点状态为 Node.SIGNAL 状态失败,则通过 LockSupport.unpark() 唤醒被通知节点代表的线程

    到此 signal() 任务完成,被唤醒后的线程,将调用 AQS 的 acquireQueued() 方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理。

    文章总结自:https://blog.csdn.net/javazejian/article/details/75043422

    相关阅读:https://juejin.cn/post/6844903997438951437

    展开全文
  • 详细了解IDM的队列功能

    千次阅读 2020-10-29 16:18:46
    两种队列的区别在于:IDM会定期检查同步队列中的文件是否在服务器上发生更改,如果更改,程序将自动下载新的文件并进行替换。下载完成后,所有的文件仍然会在队列里。 图1:【分类】窗口的队列界面 主界面下
  • RabbitMQ的延时重试队列

    千次阅读 2021-11-04 17:18:06
    通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。 2.原理 图是俺在网上找的,请原作者谅解。 发送...
  • 我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练,知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合。场景:现...
  • 之前了解到一个朋友通过监听key实现来实现延时队列的功能。 后面了解到包括Java单机版的DelayQueue以及RabbitMQ延时队列/延迟重试等相对更靠谱一些。 常见的有: 定期轮询(数据库等) DelayQueue Timer ...
  • element()和peek() element()和peek()的语义完全相同,都是获取但删除队首元素,也就是队列中权值最小的那个元素,二者唯一的区别是当方法失败时前者抛出异常,后者返回null。根据小顶堆的性质,堆顶那个元素就是...
  • 并发队列的介绍及使用

    千次阅读 2020-07-06 17:52:56
    在JDK1.5新加入了一个包concurrent,位于java.util.concurrent。在我们写业务代码的时候,可能最为常见就是ConcurrentHashMap。当然今天我们的主角不是他,而是queue。在并发队列上JDK提供了两套实现。 阻塞队列(IO...
  • 某部队进行新兵队列训练,将新兵从一开始按顺序依次编号,并排成一行横队,训练的规则如下:从头开始一至二报数,凡报到二的出列,剩下的向小序号方向靠拢,再从头开始进行一至三报数,凡报到三的出列,剩下的向小序...
  • 核心线程满了后,再有任务需要执行,线程池为什么继续创建新的线程呢,而是中间引入阻塞队列,能能在核心线程满了后,继续创建线程,直到线程数达到最大线程数了,再把任务引入阻塞队列,这样的步骤也是可以的...
  • 一、队列简介 队列是为了任务与任务、任务与中断之间的通信而准备的,可以在任务与任务、任务与中断之间传递消息,队列中可以存储有限的、大小固定的数据项目。 任务与任务、任务与中断之间要交流的数据保存在队列中...
  • 开源队列产品对比 云队列产品对比 调研总结 1.针对自建队列产品: 2.针对云队列产品: 3.综合考虑: 开源队列产品对比 队列名称 ActiveMQ RabbitMQ RocketMQ Kafka 定位 非日志的可靠消息...
  • Android 系统内置下载器服务 DownloadManager 的使用

    万次阅读 多人点赞 2016-09-25 21:39:26
    在 Android 程序开发中如果需要下载文件,除了自己程序内部实现下载外,还可以直接使用 Android 系统自带的下载器进行下载,使用系统下载器通常有两种方式: 1. 浏览器下载下载链接使用浏览器打开,把下载...
  • RabbitMQ之惰性队列与镜像队列

    千次阅读 多人点赞 2020-04-04 00:58:18
    文章目录1、惰性队列1.1、使用场景1.2、定义1.3、队列模式1.4、工作流程1.5、总结2、镜像队列2.1、消息流转过程2.2、负载均衡2.3、消息的可靠性2.4、GM协议2.5、镜像队列宕机2.6、镜像队列启动与停止顺序在这里插入...
  • 2019独角兽企业重金招聘Python...在开发项目的过程中需要向用户发送邮件,但是发送邮件需要耗费大量的时间,这样就会对用户造成很差的体验,所以使用到了消息队列开始使用的是一个外国团队开发的消息队列。 ...
  • 什么是消息队列

    万次阅读 多人点赞 2021-08-17 20:13:42
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,...
  • python队列Queue

    千次阅读 2020-12-11 09:22:59
    一、QueueQueue是python...队列可以并发的派多个线程,对排列的线程处理,并且每个需要处理线程只需要将请求的数据放入队列容器的内存中,线程需要等待,当排列完毕处理完数据后,线程再准时来取数据即可。请求数...
  • 多级队列调度和多级反馈队列调度算法的实现

    千次阅读 多人点赞 2021-04-08 20:28:29
    1、设有N个队列(Q1,Q2…QN),其中各个队列对于处理机的优先级是一样的,也就是说位于各个队列中的作业(进程)的优先级也是一样的。一般来说,优先级Priority(Q1) > Priority(Q2) > … > Priority(QN)...
  • 消息队列

    千次阅读 多人点赞 2019-09-19 21:42:59
    消息队列 “消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高...
  • redis实现延迟队列

    万次阅读 2021-09-27 14:27:00
    redis实现延迟队列一、Redis实现延迟队列二、redis失效监听事件三、此种实现面临的问题四、开发准备五、基础实现六、使用redisson实现延迟队列七、redisson实现延迟队列的原理八、延迟队列配置 一、Redis实现延迟...
  • TP6消息队列使用

    千次阅读 2020-08-24 19:00:34
    tp6中提供了很多种消息队列的链接方式,这里使用Redis数据库链接方式 免费小程序+公众好源码获取,gitee搜索CRMEB即可获取 小程序+公众号美观后台可扫描二维码体验 1.配置消息队列链接方式 首先配置/config/queue....
  • MQ消息队列

    万次阅读 2022-02-22 10:27:16
    什么是消息队列 我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息...因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列能立即返回
  • springboot添加task任务执行队列

    千次阅读 2019-05-17 16:05:13
    主要分为三块: 任务task,TaskExecutor任务执行器,TaskQueue任务队列。 下面详细讲解一下: 任务task的接口ITask: public interface ITask { //执行方法 void run(); } 任务task的接口实现类: public ...
  • 常用的消息队列有,rabbitMq、kafka、RocketMq、ActiveMq等。这些消息队列需要独立安装部署,作为一个中间件来提供服务,虽然有着高性能、高可靠的优点,但是额外部署这些中间件也会增加运维成本,和服务器成本。 本...
  • Java/Android中的优先级任务队列的实践

    万次阅读 多人点赞 2017-05-12 22:18:32
    本篇文章适用于Java和Android开发者,会从实现一个最简单的队列过渡到实现一个带有优先级的队列,使用生活中最常见的的例子结合讲解,保准你可以掌握基本的队列原理。
  • 图解Java数据结构之队列

    千次阅读 2019-08-06 16:00:07
    本篇文章,将对队列进行一个深入的解析。 使用场景 队列在日常生活中十分常见,例如:银行排队办理业务、食堂排队打饭等等,这些都是队列的应用。那么队列有什么特点呢? 我们知道排队的原则就是先来后到,排在前面...
  • 关于消息队列的使用

    万次阅读 多人点赞 2019-03-05 13:58:31
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,...
  • 延迟队列DelayQueue原理

    千次阅读 多人点赞 2021-07-10 15:50:28
    DelayQueue 是一个通过PriorityBlockingQueue实现延迟获取元素的无界队列无界阻塞队列,其中添加进该队列的元素必须实现Delayed接口(指定延迟时间),而且只有在延迟期满后才能从中提取元素。 什么是...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 419,666
精华内容 167,866
关键字:

下载已加入队列不开始下载