精华内容
下载资源
问答
  • 、为什么要用线程池? 诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方 式可能是通过网络协议...
    1、
    
    为什么要用线程池?
    

    诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方 式可能是通过网络协议(例如 HTTP、FTP 或 POP)、通过 JMS 队列或者可能通过轮询数据库。不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。

    构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。实际上,对于原型开发这种方法工作得很 好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。每个请求对应一个线程(thread-per-request)方 法的不足之一是:为每个请求创建一个新线程的开销很大;为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用 户请求的时间和资源更多。

    除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或“切换过度”。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目。

    线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时 线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也 就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

    2、线程池的替代方案

    线程池远不是服务器应用程序内使用多线程的唯一方法。如同上面所提到的,有时,为每个新任务生成一个新线程是十分明智的。然而,如果任务创建过于频繁而任务的平均处理时间过短,那么为每个任务生成一个新线程将会导致性能问题。

    另一个常见的线程模型是为某一类型的任务分配一个后台线程与任务队列。AWT 和 Swing 就使用这个模型,在这个模型中有一个 GUI 事件线程,导致用户界面发生变化的所有工作都必须在该线程中执行。然而,由于只有一个 AWT 线程,因此要在 AWT 线程中执行任务可能要花费相当长时间才能完成,这是不可取的。因此,Swing 应用程序经常需要额外的工作线程,用于运行时间很长的、同 UI 有关的任务。

    每个任务对应一个线程方法和单个后台线程(single-background-thread)方法在某些情形下都工作得非常理想。每个任务一个线程方法 在只有少量运行时间很长的任务时工作得十分好。而只要调度可预见性不是很重要,则单个后台线程方法就工作得十分好,如低优先级后台任务就是这种情况。然 而,大多数服务器应用程序都是面向处理大量的短期任务或子任务,因此往往希望具有一种能够以低开销有效地处理这些任务的机制以及一些资源管理和定时可预见 性的措施。线程池提供了这些优点。

    3、工作队列

    就线程池的实际实现方式而言,术语“线程池”有些使人误解,因为线程池“明显的”实现在大多数情形下并不一定产生我们希望的结果。术语“线程池”先于 Java 平台出现,因此它可能是较少面向对象方法的产物。然而,该术语仍继续广泛应用着。

    虽然我们可以轻易地实现一个线程池类,其中客户机类等待一个可用线程、将任务传递给该线程以便执行、然后在任务完成时将线程归还给池,但这种方法却存在几 个潜在的负面影响。例如在池为空时,会发生什么呢?试图向池线程传递任务的调用者都会发现池为空,在调用者等待一个可用的池线程时,它的线程将阻塞。我们 之所以要使用后台线程的原因之一常常是为了防止正在提交的线程被阻塞。完全堵住调用者,如在线程池的“明显的”实现的情况,可以杜绝我们试图解决的问题的 发生。

    我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait()notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。清单 1 显示了简单的合用工作队列的示例。尽管 Thread API 没有对使用Runnable 接口强加特殊要求,但使用 Runnable 对象队列的这种模式是调度程序和工作队列的公共约定。

    清单 1. 具有线程池的工作队列

    <span class="kwd">public</span><span class="pln"> </span><span class="kwd">class</span><span class="pln"> </span><span class="typ">WorkQueue</span><span class="pun">{</span>
    <span class="pln">     </span><span class="kwd">private</span><span class="pln"> </span><span class="kwd">final</span><span class="pln"> </span><span class="kwd">int</span><span class="pln"> nThreads</span><span class="pun">;</span>
    <span class="pln">     </span><span class="kwd">private</span><span class="pln"> </span><span class="kwd">final</span><span class="pln"> </span><span class="typ">PoolWorker</span><span class="pun">[]</span><span class="pln"> threads</span><span class="pun">;</span>
    <span class="pln">     </span><span class="kwd">private</span><span class="pln"> </span><span class="kwd">final</span><span class="pln"> </span><span class="typ">LinkedList</span><span class="pln"> queue</span><span class="pun">;</span>
    <span class="pln">     </span><span class="kwd">public</span><span class="pln"> </span><span class="typ">WorkQueue</span><span class="pun">(</span><span class="kwd">int</span><span class="pln"> nThreads</span><span class="pun">){</span>
    <span class="pln">         </span><span class="kwd">this</span><span class="pun">.</span><span class="pln">nThreads </span><span class="pun">=</span><span class="pln"> nThreads</span><span class="pun">;</span><span class="pln">         </span>
    <span class="pln">         queue </span><span class="pun">=</span><span class="pln"> </span><span class="kwd">new</span><span class="pln"> </span><span class="typ">LinkedList</span><span class="pun">();</span><span class="pln">         </span>
    <span class="pln">         threads </span><span class="pun">=</span><span class="pln"> </span><span class="kwd">new</span><span class="pln"> </span><span class="typ">PoolWorker</span><span class="pun">[</span><span class="pln">nThreads</span><span class="pun">];</span><span class="pln">         </span>
    <span class="pln">         </span><span class="kwd">for</span><span class="pln"> </span><span class="pun">(</span><span class="kwd">int</span><span class="pln"> i</span><span class="pun">=</span><span class="lit">0</span><span class="pun">;</span><span class="pln"> i</span><span class="pun"><</span><span class="pln">nThreads</span><span class="pun">;</span><span class="pln"> i</span><span class="pun">++)</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">              threads</span><span class="pun">[</span><span class="pln">i</span><span class="pun">]</span><span class="pln"> </span><span class="pun">=</span><span class="pln"> </span><span class="kwd">new</span><span class="pln"> </span><span class="typ">PoolWorker</span><span class="pun">();</span>
    <span class="pln">              threads</span><span class="pun">[</span><span class="pln">i</span><span class="pun">].</span><span class="pln">start</span><span class="pun">();</span><span class="pln">         </span>
    <span class="pln">         </span><span class="pun">}</span><span class="pln">     </span>
    <span class="pln">     </span><span class="pun">}</span>
    <span class="pln">     </span><span class="kwd">public</span><span class="pln"> </span><span class="kwd">void</span><span class="pln"> execute</span><span class="pun">(</span><span class="typ">Runnable</span><span class="pln"> r</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span><span class="pln">         </span>
    <span class="pln">            </span><span class="kwd">synchronized</span><span class="pun">(</span><span class="pln">queue</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">                queue</span><span class="pun">.</span><span class="pln">addLast</span><span class="pun">(</span><span class="pln">r</span><span class="pun">);</span>
    <span class="pln">                queue</span><span class="pun">.</span><span class="pln">notify</span><span class="pun">();</span>
    <span class="pln">            </span><span class="pun">}</span><span class="pln">     </span>
    <span class="pln">     </span><span class="pun">}</span>
    <span class="pln">    </span><span class="kwd">private</span><span class="pln"> </span><span class="kwd">class</span><span class="pln"> </span><span class="typ">PoolWorker</span><span class="pln"> </span><span class="kwd">extends</span><span class="pln"> </span><span class="typ">Thread</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">         </span><span class="kwd">public</span><span class="pln"> </span><span class="kwd">void</span><span class="pln"> run</span><span class="pun">()</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">             </span><span class="typ">Runnable</span><span class="pln"> r</span><span class="pun">;</span>
    <span class="pln">             </span><span class="kwd">while</span><span class="pln"> </span><span class="pun">(</span><span class="kwd">true</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">                 </span><span class="kwd">synchronized</span><span class="pun">(</span><span class="pln">queue</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">                     </span><span class="kwd">while</span><span class="pln"> </span><span class="pun">(</span><span class="pln">queue</span><span class="pun">.</span><span class="pln">isEmpty</span><span class="pun">())</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">                         </span><span class="kwd">try</span>
    <span class="pln">                         </span><span class="pun">{</span>
    <span class="pln">                             queue</span><span class="pun">.</span><span class="pln">wait</span><span class="pun">();</span>
    <span class="pln">                         </span><span class="pun">}</span>
    <span class="pln">                         </span><span class="kwd">catch</span><span class="pln"> </span><span class="pun">(</span><span class="typ">InterruptedException</span><span class="pln"> ignored</span><span class="pun">)</span>
    <span class="pln">                         </span><span class="pun">{</span>
    <span class="pln">                         </span><span class="pun">}</span>
    <span class="pln">                     </span><span class="pun">}</span>
    <span class="pln">                     r </span><span class="pun">=</span><span class="pln"> </span><span class="pun">(</span><span class="typ">Runnable</span><span class="pun">)</span><span class="pln"> queue</span><span class="pun">.</span><span class="pln">removeFirst</span><span class="pun">();</span>
    <span class="pln">                 </span><span class="pun">}</span>
    <span class="pln">                 </span><span class="com">// If we don't catch RuntimeException, </span>
    <span class="pln">                 </span><span class="com">// the pool could leak threads</span>
    <span class="pln">                 </span><span class="kwd">try</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">                     r</span><span class="pun">.</span><span class="pln">run</span><span class="pun">();</span>
    <span class="pln">                 </span><span class="pun">}</span>
    <span class="pln">                 </span><span class="kwd">catch</span><span class="pln"> </span><span class="pun">(</span><span class="typ">RuntimeException</span><span class="pln"> e</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span>
    <span class="pln">                     </span><span class="com">// You might want to log something here</span>
    <span class="pln">                 </span><span class="pun">}</span>
    <span class="pln">             </span><span class="pun">}</span>
    <span class="pln">         </span><span class="pun">}</span>
    <span class="pln">     </span><span class="pun">}</span>
    <span class="pln"> </span><span class="pun">}</span>

    您可能已经注意到了清单 1 中的实现使用的是 notify() 而不是 notifyAll() 。大多数专家建议使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有难以捉摸的风险,只有在某些特定条件下使用该方法才是合适的。另一方面,如果使用得当, notify() 具有比 notifyAll() 更可取的性能特征;特别是, notify() 引起的环境切换要少得多,这一点在服务器应用程序中是很重要的。

    清单 1 中的示例工作队列满足了安全使用 notify() 的需求。因此,请继续,在您的程序中使用它,但在其它情形下使用 notify() 时请格外小心。

    4、使用线程池的风险

    虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。

    5、死锁

    任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程死锁 了。死锁的最简单情形是:线程 A 持有对象 X 的独占锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁。除非有某种方法来打破对锁的等待(Java 锁定不支持这种方法),否则死锁的线程将永远等下去。

    虽然任何多线程程序中都有死锁的风险,但线程池却引入了另一种死锁可能,在那种情况下,所有池线程都在执行已阻塞的等待队列中另一任务的执行结果的任务, 但这一任务却因为没有未被占用的线程而不能运行。当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任 务执行,查询对象又同步等待着响应时,会发生这种情况。
    6、 资源不足

    线程池的一个优点在于:相对于其它替代调度机制(有些我们已经讨论过)而言,它们通常执行得很好。但只有恰当地调整了线程池大小时才是这样的。线程消耗包括内存和其它系统资源在内的大量资源。除了Thread 对象所需的内存之外,每个线程都需要两个可能很大的执行调用堆栈。除此以外,JVM 可能会为每个 Java 线程创建一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但如果有很多线程,环境切换也可能严重地影响程序的性能。

    如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏 问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。除了线程自身所使用的资源以外,服务请求时所做的工作可能需要其它资源,例 如 JDBC 连接、套接字或文件。这些也都是有限资源,有太多的并发请求也可能引起失效,例如不能分配 JDBC 连接。

    7、并发错误
    线程池和其它排队机制依靠使用wait()notify() 方法,这两个方法都难于使用。如果编码不正确,那么可能丢失通知,导致线程保持空闲状态,尽管队列中有工作要处理。使用这些方法时,必须格外小心;即便是专家也可能在它们上面出错。而最好使用现有的、已经知道能工作的实现,例如在下面的 无需编写自己的线程池中讨论的 util.concurrent 包。
    8、
    线程泄漏

    各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个RuntimeException 或一个 Error 时。如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。

    有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证变得可用,用户可能也已经回家了,诸如此类的任务会永久停止,而这些停止的任务也 会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,应该要么只给予它们自己的线程,要 么只让它们等待有限的时间。

    请求过载

    仅仅是请求就压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会 消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于您自己;在某些情况下,您可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也可 以用一个指出服务器暂时很忙的响应来拒绝请求。



    有效使用线程池的准则

    只要您遵循几条简单的准则,线程池可以成为构建服务器应用程序的极其有效的方法:

    • 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为所有的线程都很忙。
    • 在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得某些 进展。
    • 理解任务。要有效地调整线程池大小,您需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?您的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队 列可能会有意义,这样可以相应地调整每个池。


    调整池的大小

    调整线程池的大小基本上就是避免两类错误:线程太少或线程太多。幸运的是,对于大多数应用程序来说,太多和太少之间的余地相当宽。

    请回忆:在应用程序中使用线程有两个主要优点,尽管在等待诸如 I/O 的慢操作,但允许继续进行处理,并且可以利用多处理器。在运行于具有 N 个处理器机器上的计算限制的应用程序中,在线程数目接近 N 时添加额外的线程可能会改善总处理能力,而在线程数目超过 N 时添加额外的线程将不起作用。事实上,太多的线程甚至会降低性能,因为它会导致额外的环境切换开销。

    线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有 N 或 N+1 个线程时一般会获得最大的 CPU 利用率。

    对于那些可能需要等待 I/O 完成的任务(例如,从套接字读取 HTTP 请求的任务),需要让池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。通过使用概要分析,您可以估计某个典型请求的等待时间(WT)与服 务时间(ST)之间的比例。如果我们将这一比例称之为 WT/ST,那么对于一个具有 N 个处理器的系统,需要设置大约 N*(1+WT/ST) 个线程来保持处理器得到充分利用。

    处理器利用率不是调整线程池大小过程中的唯一考虑事项。随着线程池的增长,您可能会碰到调度程序、可用内存方面的限制,或者其它系统资源方面的限制,例如套接字、打开的文件句柄或数据库连接等的数目。


    无须编写您自己的池

    Doug Lea 编写了一个优秀的并发实用程序开放源码库 util.concurrent ,它包括互斥、信号量、诸如在并发访问下执行得很好的队列和散列表之类集合类以及几个工作队列实现。该包中的PooledExecutor 类是一种有效的、广泛使用的以工作队列为基础的线程池的正确实现。您无须尝试编写您自己的线程池,这样做容易出错,相反您可以考虑使用util.concurrent 中的一些实用程序。参阅 参考资料 以获取链接和更多信息。

    util.concurrent 库也激发了 JSR 166,JSR 166 是一个 Java 社区过程(Java Community Process (JCP))工作组,他们正在打算开发一组包含在java.util.concurrent 包下的 Java 类库中的并发实用程序,这个包应该用于 Java 开发工具箱 1.5 发行版。

    展开全文
  • java线程池应用场景 数据分析和网络爬虫 一:java多线程应用 我们在实际的开发中经常会用到多线程,比如我们对磁盘文件的读写,为了提高对cpu的利用率, 我们会在很多地方用到多线程,: 1、比如我们可能会启动...

                              java线程池应用场景 数据分析和网络爬虫

    一:java多线程应用

    我们在实际的开发中经常会用到多线程,比如我们对磁盘文件的读写,为了提高对cpu的利用率,

    我们会在很多地方用到多线程,:

    1、比如我们可能会启动多个线程来读写磁盘。我们通过开启多个线程来不断轮询对应的设备寄存器是否准备好数据,没

    有准备好这个线程就去处理别的东西,然后再有线程来询问。由于要进行的磁盘文件可以被任何I/O操作线程所访问,所以

    这时候来轮询的数据和要拿数据的线程不一定是原来的线程。当设备驱动程序准备好数据之后,这时候可能是随机的一个

    线程来处理数据。

    2、或者我们开发完网站之后响应用户的请求,会开多个线程来响应用户的请求。其实tomcat也是这么做的。当有用户访

    问的时候。tomcat就会为每个用户开启很多线程来响应用户的请求,如果用户发送请求的大小是1k,那么至少需要1M的

    线程和处理内存的开销来服务每个用户。

    二:java线程池的引入:

    如果说线程池是为了提高cpu的利用率,那么线程池就是多线程的升级版,能够在多线程的基础上更好的利用宝贵的cpu

    资源。尤其是要处理的数据量很大的时候。因为不断创建线程,线程消亡也需要一定的性能开销。为了更高效的处理数

    据,我们采用线程池来进行管理线程,程序启动之后,开很多线程来获取数据,处理数据。但是处理完数据之后,线程

    并不会消亡。而是被分配新的任务,获取或者处理数据。直到所有数据处理完毕。然后线程池释放所有线程,断开与线

    程池连接。很厉害的大佬可能可以手写一套类似于线程池的线程管理工具。但是术业有专攻。这种东西人家帮我们开发

    好了,功能和细节也比较完善,我们可以拿来直接用。jdk中给我们提供了线程池。我们也可以用一些第三方的jar包来配

    置线程连接池。比如引入阿里的jar包来连接。其实是对jdk自带的封装。我们来看看jdk线程池的源码,了解一些基本的

    配置参数。

    corePoolSize:核心线程池。线程池的基本大小,没有任务在运行的时候线程池的数量。但是allowCoreThreadTimeOut

    设置为true时,并且超过了keepAliveTime的时间限制,这时候线程池是没有线程的。需要注意的是,线程创建之后并不

    会立即执行,只有有任务到达的时候线程才会运行。

    maximumPoolSize:允许线程池创建线程的最大数量。当任务队列塞满了corePoolSize允许的线程,会生成新的线程来

    接受任务。线程池创建线程的数量受这个参数的限制。但是在线程池启动的时候,可以通过setmaximumPoolSize的大小

    来调整开启的最大的线程数。不过不建议修改这个参数。因为线程数过多的话线程频繁切换会带来一定的开销。

    keepAliveTime :这个参数指定了当任务队列中不再有任务的时候会结束线程。

    unit:keepAliveTime的单位,

    BlockingQueue:任务缓存队列,当线程执行完一个任务之后,回在这个缓存队列中分配任务给空闲的线程。

    ThreadFactory:用来创建线程。

    三:利用Executors 工厂创建线程池:

    为了防止使用者错误搭配ThreadPoolExecutor构造函数的各个参数以及更加方便简洁的创建ThreadPoolExecutor对象,

    JavaSE中又定义了Executors类,Eexcutors类底层调用的是ThreadPoolExecutor的构造方法,Eexcutors类提供了创建

    常用配置线程池的方法。

    1、 newSingleThreadExecutor:这种情况是在单线程情况下创建线程池,相当于只有一个线程同步执行所有的任务。

    对应的任务策略是无界队列的方式。

    2、newFixedThreadPool:固定大小的缓冲池,对应的任务排队策略是无界队列的方式。

    3、newCachedThreadPool:可缓存的以根据任务量来动态调整缓冲池的数量,可以无限制扩大线程池中线程的数量。

    对应的任务策略是直接提交的方式。

    4、newScheduledThreadPool:线程池线程数量固定的线程池,以延时或者定时的方式执行任务。对应的任务策略是

    优先级队列。

    注意事项:java开发手册中一般不采用Executors来创建线程池。而是采用ThreadPoolExecutor方式创建线程池。

    这样可以更加明确线程池的运行规则,规避资源耗尽的风险。

    1:newSingleThreadExecutor和newFixedThreadPool采用的任务策略是无界队列的方式,可接收ude任务数量是Integer.MAX_VALUE,可能会造成内存溢出的问题。

    2:newCachedThreadPool和newScheduledThreadPool可创建线程的数量是Integer.MAX_VALUE可能会创建大量线程,导致内存溢出

    四:线程池的任务排队策略:

    1、有界队列:

    实现方式:ArrayBlockingQueue。这种情况存在任务缓冲队列,当有任务添加到线程池的时候,并且线程池中线程的数

    量小于corePoolSize允许的数量,会试着新建线程来处理任务(即使已经创建的线程处于空闲状态也会创建新线程),达

    到corePoolSize数量的时候,会把任务放在缓冲队列中。当线程池中的数量达到corePoolSize的时候,就会把任务放在

    缓存队列中。当线程的数量达到corePoolSize的最大线程数并且缓冲队列满的时候,然后线程池会在maximumPoolSize

    的允许范围内新建线程处理添加的任务。如果都满了之后就会产生任务拒绝。

    2、无界队列:

    实现方式:LinkedBlockingQueue。这种情况下,当有任务到达的时候,就会在线程池中者创建线程来执行任务,当达

    到corePoolSize所允许的最大数量的时候,就会把多余的任务放在缓存队列中。这时候实际上maximumPoolSize参数

    就无效了。这种情况适合于各个任务都互不影响的情况。

    3、直接提交:

    实现方式:SynchronousQueue。在这种任务提交方式下,能保证任务能够顺序执行,这种方式没有任务缓冲区,当有

    空闲线程的时候,会有线程在队列头部等待任务到来。然后牵手,离开。当没有空闲线程的时候,会在任务达到的时候

    生成一个线程来处理这个任务,所以这种队列不限制线程的数量。

    4、优先级队列:

    实现方式:DelayedWorkQueue。这种队列底层实现的是阻塞队列的方式,只不过有优先级的区别。当线程没达到最大

    数量的时候,会新建一个线程来执行达到的任务,当线程不能再创建的时候,就会把多余的任务放在优先级队列中。延时

    时间越短的线程越会放在队列靠前的位置。

    五:线程池的任务拒绝发生的情况:

    如果接受任务队列底层用阻塞队列的情况下,当corePoolSize,maximumPoolSize和缓冲区都满了的情况下,就会产生

    任务拒绝。在不能创建新线程并且缓冲区满了之后。缓冲池调用execute(Runnable)或submit(Runnable)方法分配任务

    就会发生任务拒绝。

    六:线程池的任务拒绝策略:

    1. CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 
    2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }} 
      这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
    3. AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException 
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();} 
      这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
    4. DiscardPolicy:不能执行的任务将被删除 
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} 
      这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
    5. DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) 
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }} 
      该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

    七:线程池的应用场景:

    1、数据分析:

    一般的大型网站都会对用户的数据进行分析,以一般的大型网站举个例子,比如淘宝。我们在使用淘宝的时候,会对用

    户的行为进行记录,比如你买了什么商品,你把什么商品放在了购物车,甚至你最近浏览过什么商品。都会记录下来。

    但是淘宝的用户量是巨大了。淘宝账号的数量保守的估计是一亿。淘宝怎么分析如此庞大的数据呢?

    首先要考虑的是怎么存储这些庞大的数据量,因为人家比较有钱,可以采用数据库集群来进行存储。然后要挑一个合适

    的时间。因为要做数据分析肯定会大量调用数据库的信息。一般选择请求数量比较少的情况。可以选择晚上十二点以后。

    肯定不能在用户访问量比较大的时候进行,因为可能会导致用户的请求丢失。而且一般不是消费高峰期(双11)的时候是

    不会开启消息队列的。因为要频繁的分析数据库的数据,频繁进行磁盘I/O。线程复用就必不可少。我们可以自己选择实

    现线程的复用,但是要考虑的东西比较多,包括线程创建消亡,线程调度等等。所以一般会调用人家开发好的接口。比

    如jdk自带。或者阿里的线程池接口。

    2、网络爬虫:

    比如一个艺人的公司或者某个高校出了负面新闻,需要我们去网上的网站上联系人家删除对应的负面影响。但是人工去筛

    选这些信息显然不合理,大大小小的比较流行的大网站有300多个。所以需要用到爬虫来爬出那些有负面影响的信息,联

    系网站删除。还有应用比较火的地方就是人工智能方面。从网上爬完数据之后放在数据库,比如一些指定的名词以及解释

    然后给机器人来学习。这时候更需要线程池来保证效率。

    参考博客:

    线程池拒绝策略:https://blog.csdn.net/wang_rrui/article/details/78541786

    阻塞队列之三:SynchronousQueue同步队列 阻塞算法的3种实现https://www.cnblogs.com/duanxz/p/3252267.html

    Executors创建的4种线程池的使用https://www.cnblogs.com/ljp-sun/p/6580147.html

    展开全文
  • java线程池场景使用

    千次阅读 2019-03-21 16:43:56
    当服务器接口之间通讯中,如果A服务器请求B服务器,只返回请求通讯成功且不需要实时返回业务成功或失败时,处理成功或者失败用异步通知方式,那么可以采用线程池或者消息队列业务场景:金融业务中的强风控,还款等等...

    当服务器接口之间通讯中,如果A服务器请求B服务器,只返回请求通讯成功且不需要实时返回业务成功或失败时,处理成功或者失败用异步通知方式,那么可以采用线程池或者消息队列
    业务场景:金融业务中的强风控,还款等等
    重点:如果需要同步通知业务成功或失败,让A服务器提供通知接口,在服务器B线程中可以同步通知服务器A中的接口   累计金融业务弱风控 

    代码如下:

    /**
     * @author stone
     * @date 2019-03-21 16:20:21
     */
    @Service
    public class ExceutorTestServiceImpl implements ExceutorTestService{
    
        @Resource
        SlUserMapper slUserMapper;
    
        private ExecutorService executorService = Executors.newFixedThreadPool(5);
    
        Logger logger = LoggerFactory.getLogger(ExceutorTestServiceImpl.class);
    
        @Override
        public void exceutorTest() {
            logger.info("部分业务有线程来处理,无需返回结果,可以拋出异常全局捕获即可");
            SubmitStrongRiskTask task = new SubmitStrongRiskTask(1l, BigDecimal.valueOf(10l));
            executorService.execute(task);
            logger.info("继续做其他业务");
        }
    
        class SubmitStrongRiskTask implements Runnable {
            private Long userId;
            private BigDecimal amount;
    
            SubmitStrongRiskTask(Long userId, BigDecimal amount) {
                this.userId = userId;
                this.amount = amount;
            }
    
            @Override
            public void run() {
                SlUser slUser = slUserMapper.getUserById(16l);
                logger.info("用户手机号码是:"+ slUser.getMobile());
            }
        }
    
    }
    
    打印日志如下:
    192.168.3.78 部分业务有线程来处理,无需返回结果,可以拋出异常全局捕获即可
    192.168.3.78 继续做其他业务
    192.168.3.78 用户手机号码是:15576602451
    
    
    展开全文
  • java线程池使用

    千次阅读 2015-01-20 14:56:10
    java线程池使用,ThreadPoolExecutor

    转载地址:http://automaticthoughts.iteye.com/blog/1612388


    一 简介

    线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用。为我们在开发中处理线程的问题提供了非常大的帮助。

    二:线程池

    线程池的作用:

    线程池作用就是限制系统中执行线程的数量。
         根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

    为什么要用线程池:

    1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

    2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

    Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

    比较重要的几个类:

    ExecutorService

    真正的线程池接口。

    ScheduledExecutorService

    能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

    ThreadPoolExecutor

    ExecutorService的默认实现。

    ScheduledThreadPoolExecutor

    继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

    要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

    1. newSingleThreadExecutor

    创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    2. newFixedThreadPool

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    3. newCachedThreadPool

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,

    那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    4. newScheduledThreadPool

    创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    实例

    1:newSingleThreadExecutor

    MyThread.java

    public class MyThread extends Thread {

         @Override

         public void run() {

             System.out.println(Thread.currentThread().getName() + "正在执行。。。");

         }

    }

    TestSingleThreadExecutor.java

    public class TestSingleThreadExecutor {

         public static void main(String[] args) {

             // 创建一个可重用固定线程数的线程池

             ExecutorService pool = Executors. newSingleThreadExecutor();

             // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

             Thread t1 = new MyThread();

             Thread t2 = new MyThread();

             Thread t3 = new MyThread();

             Thread t4 = new MyThread();

             Thread t5 = new MyThread();

             // 将线程放入池中进行执行

             pool.execute(t1);

             pool.execute(t2);

             pool.execute(t3);

             pool.execute(t4);

             pool.execute(t5);

             // 关闭线程池

             pool.shutdown();

         }

    }

    输出结果

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    2 newFixedThreadPool

    TestFixedThreadPool.Java

    public class TestFixedThreadPool {

         public static void main(String[] args) {

             // 创建一个可重用固定线程数的线程池

             ExecutorService pool = Executors.newFixedThreadPool(2);

             // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

             Thread t1 = new MyThread();

             Thread t2 = new MyThread();

             Thread t3 = new MyThread();

             Thread t4 = new MyThread();

             Thread t5 = new MyThread();

             // 将线程放入池中进行执行

             pool.execute(t1);

             pool.execute(t2);

             pool.execute(t3);

             pool.execute(t4);

             pool.execute(t5);

             // 关闭线程池

             pool.shutdown();

         }

    }

    输出结果

    pool-1-thread-1正在执行。。。

    pool-1-thread-2正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-2正在执行。。。

    pool-1-thread-1正在执行。。。

    3 newCachedThreadPool

    TestCachedThreadPool.java

    public class TestCachedThreadPool {

         public static void main(String[] args) {

             // 创建一个可重用固定线程数的线程池

             ExecutorService pool = Executors.newCachedThreadPool();

             // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

             Thread t1 = new MyThread();

             Thread t2 = new MyThread();

             Thread t3 = new MyThread();

             Thread t4 = new MyThread();

             Thread t5 = new MyThread();

             // 将线程放入池中进行执行

             pool.execute(t1);

             pool.execute(t2);

             pool.execute(t3);

             pool.execute(t4);

             pool.execute(t5);

             // 关闭线程池

             pool.shutdown();

         }

    }

    输出结果:

    pool-1-thread-2正在执行。。。

    pool-1-thread-4正在执行。。。

    pool-1-thread-3正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-5正在执行。。。

    newScheduledThreadPool

    TestScheduledThreadPoolExecutor.java

    public class TestScheduledThreadPoolExecutor {

         public static void main(String[] args) {

             ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);

             exec.scheduleAtFixedRate(new Runnable() {// 每隔一段时间就触发异常

                           @Override

                           public void run() {

                                //throw new RuntimeException();

                                System.out.println("================");

                           }

                       }, 1000, 5000, TimeUnit.MILLISECONDS);

             exec.scheduleAtFixedRate(new Runnable() {// 每隔一段时间打印系统时间,证明两者是互不影响的

                           @Override

                           public void run() {

                                System.out.println(System.nanoTime());

                           }

                       }, 1000, 2000, TimeUnit.MILLISECONDS);

         }

    }

    输出结果

    ================

    8384644549516

    8386643829034

    8388643830710

    ================

    8390643851383

    8392643879319

    8400643939383

    三:ThreadPoolExecutor详解

    ThreadPoolExecutor的完整构造方法的签名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .

    corePoolSize - 池中所保存的线程数,包括空闲线程。

    maximumPoolSize - 池中允许的最大线程数。

    keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

    unit - keepAliveTime 参数的时间单位。

    workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。

    threadFactory - 执行程序创建新线程时使用的工厂。

    handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

    ThreadPoolExecutor是Executors类的底层实现。

    在JDK帮助文档中,有如此一段话:

    “强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)

    它们均为大多数使用场景预定义了设置。”

    下面介绍一下几个类的源码:

    ExecutorService   newFixedThreadPool (int nThreads):固定大小线程池。

    可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,queue有一个特点,他是无界的。

    1.   public static ExecutorService newFixedThreadPool(int nThreads) {   

    2.           return new ThreadPoolExecutor(nThreads, nThreads,   

    3.                                         0L, TimeUnit.MILLISECONDS,   

    4.                                         new LinkedBlockingQueue<Runnable>());   

    5.       }

    ExecutorService   newSingleThreadExecutor():单线程

    1.   public static ExecutorService newSingleThreadExecutor() {   

    2.           return new FinalizableDelegatedExecutorService   

    3.               (new ThreadPoolExecutor(11,   

    4.                                       0L, TimeUnit.MILLISECONDS,   

    5.                                       new LinkedBlockingQueue<Runnable>()));   

    6.       }

    ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收

    这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。

    1.   public static ExecutorService newCachedThreadPool() {   

    2.           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   

    3.                                         60L, TimeUnit.SECONDS,   

    4.                                         new SynchronousQueue<Runnable>());   

    1.     }

    先从BlockingQueue<Runnable> workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。

    所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

    如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行

    如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列而不添加新的线程

    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

    queue上的三种类型。

     

    排队有三种通用策略:

    直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  

    BlockingQueue的选择。

    例子一:使用直接提交策略,也即SynchronousQueue。

    首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。

    我们使用一下参数构造ThreadPoolExecutor:

    1.   new ThreadPoolExecutor(   

    2.               2330, TimeUnit.SECONDS,    

    3.               new  SynchronousQueue<Runnable>(),    

    4.               new RecorderThreadFactory("CookieRecorderPool"),    

    1.             new ThreadPoolExecutor.CallerRunsPolicy());  

    new ThreadPoolExecutor(

       2, 3, 30, TimeUnit.SECONDS,

       new SynchronousQueue<Runnable>(),

       new RecorderThreadFactory("CookieRecorderPool"),

       new ThreadPoolExecutor.CallerRunsPolicy());

     当核心线程已经有2个正在运行.

    1. 此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列而不添加新的线程。”,所以A被添加到queue中。
    2. 又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
    3. 此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
    4. 暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。

    所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁

    什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中。

    例子二:使用无界队列策略,即LinkedBlockingQueue

    这个就拿newFixedThreadPool来说,根据前文提到的规则:

    如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。那么当任务继续增加,会发生什么呢?

    如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?

    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,不一会儿就爆了。

    例子三:有界队列,使用ArrayBlockingQueue。

    这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。

    举例来说,请看如下构造方法:

    1.   new ThreadPoolExecutor(   

    2.               2430, TimeUnit.SECONDS,    

    3.               new ArrayBlockingQueue<Runnable>(2),    

    4.               new RecorderThreadFactory("CookieRecorderPool"),    

    5.               new ThreadPoolExecutor.CallerRunsPolicy());  

    new ThreadPoolExecutor(

         2, 4, 30, TimeUnit.SECONDS,

         new ArrayBlockingQueue<Runnable>(2),

         new RecorderThreadFactory("CookieRecorderPool"),

         new ThreadPoolExecutor.CallerRunsPolicy());

    假设,所有的任务都永远无法执行完。

    对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queue中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。

    keepAliveTime

    jdk中的解释是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

    有点拗口,其实这个不难理解,在使用了“池”的应用中,大多都有类似的参数需要配置。比如数据库连接池,DBCP中的maxIdle,minIdle参数。

    什么意思?接着上面的解释,后来向老板派来的工人始终是“借来的”,俗话说“有借就有还”,但这里的问题就是什么时候还了,如果借来的工人刚完成一个任务就还回去,后来发现任务还有,那岂不是又要去借?这一来一往,老板肯定头也大死了。

     

    合理的策略:既然借了,那就多借一会儿。直到“某一段”时间后,发现再也用不到这些工人时,便可以还回去了。这里的某一段时间便是keepAliveTime的含义,TimeUnit为keepAliveTime值的度量。

     

    RejectedExecutionHandler

    另一种情况便是,即使向老板借了工人,但是任务还是继续过来,还是忙不过来,这时整个队伍只好拒绝接受了。

    RejectedExecutionHandler接口提供了对于拒绝任务的处理的自定方法的机会。在ThreadPoolExecutor中已经默认包含了4中策略,因为源码非常简单,这里直接贴出来。

    CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

    1.   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {   

    2.               if (!e.isShutdown()) {   

    3.                   r.run();   

    4.               }   

    5.           }  

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                if (!e.isShutdown()) {

                    r.run();

                }

            }

    这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。

    AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException

    1.   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {   

    2.               throw new RejectedExecutionException();   

    3.           }  

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                throw new RejectedExecutionException();

            }

     这种策略直接抛出异常,丢弃任务。

    DiscardPolicy:不能执行的任务将被删除

    1.   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {   

    2.           }  

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            }

     这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

    DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

    1.   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {   

    2.               if (!e.isShutdown()) {   

    3.                   e.getQueue().poll();   

    4.                   e.execute(r);   

    5.               }   

    1.         }  

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                if (!e.isShutdown()) {

                    e.getQueue().poll();

                    e.execute(r);

                }

            }

    该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

    设想:如果其他线程都还在运行,那么新来任务踢掉旧任务,缓存在queue中,再来一个任务又会踢掉queue中最老任务。

    总结:

    keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。

    反之,如果核心数较小,有界BlockingQueue数值又较小,同时keepAliveTime又设的很小,如果任务频繁,那么系统就会频繁的申请回收线程。

    public static ExecutorService newFixedThreadPool(int nThreads) {

            return new ThreadPoolExecutor(nThreads, nThreads,

                                          0L, TimeUnit.MILLISECONDS,

                                          new LinkedBlockingQueue<Runnable>());

        }

    展开全文
  • Java生产环境线程池使用场景

    千次阅读 2018-07-29 00:04:52
    talk is cheap, show me the code 直接...import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * @author by bixi.lx * @created on 2018 07 28 22:17 */ public class Th...
  • java 线程池场景

    2019-10-02 03:05:32
    java 一共含有四种线程池: newCachedThreadPool, newFixedThreadPool, newSingleThreadExecutor, newScheduledThreadPool。 newCachedThreadPool:顾名思义是一种可缓存的线程池线程池除了维护初始大小的线程外...
  • Java线程池场景化总结

    2020-08-26 19:35:10
    Java线程池场景化总结 本文将通过不同的场景,对多线程及线程池使用建议进行介绍,以下场景示例将以8核心CPU为例 1)任务数多但资源占用不大 场景解读:电商平台消息推送或短信通知,发邮件、对于该场景来说需要...
  • java 线程池使用详解

    2021-05-27 10:48:08
    java 中遇到异步操作的时候经常会使用多线程来操作,当前也可以用队列,看业务场景,队列主要还是用于肖锋和解耦业务的,如果只是简单的一个异步逻辑处理,没有必有使用消息队列,起个线程就完事了。使用线程也要...
  • Java线程池的四种用法与使用场景

    千次阅读 2019-10-24 22:54:08
    文章目录Java线程池的四种用法与使用场景一、如下方式存在的问题二、使用线程池有什么优点三、线程池的四种使用方式1、newCachedThreadPool2、newFixedThreadPool3、newScheduledThreadPool4、... ...
  • java线程池使用详解

    千次阅读 2018-05-28 19:37:26
    http://automaticthoughts.iteye.com/blog/1612388一 简介线程的使用java中占有极其...Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池使用。为我们在开发中处理线程的问题提供...
  • Java线程池使用说明

    2016-12-04 15:10:05
    线程的使用java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程...
  • java线程池使用及原理

    2021-01-22 14:55:09
    java线程池原理及使用一、为什么使用线程池二、线程池 一、为什么使用线程池 1.线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动给这些任务,如果线程数量超过了最大数量...
  • 这是【从0到1学习Java线程池】系列文章的第 壹 篇,该系列文章总共三篇,介绍了 Java 线程池的使用以及原理,并且最后会实现一个基本的线程池。本篇文章主要介绍了 Java 线程池以及它的使用
  • java线程池的几种使用方式和场景

    千次阅读 2018-03-08 18:50:18
    一、线程池使用场景单个任务处理时间短将需处理的任务数量大二、使用Java线程池好处1.使用new Thread()创建线程的弊端:每次通过new Thread()创建对象性能不佳。线程缺乏统一管理,可能无限制新建线程,相互之间竞争...
  • Java线程池使用

    千次阅读 2013-11-13 13:09:06
    1. 为什么要用线程池?  在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器 在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在实际...
  • java线程池使用最全详解

    千次阅读 2021-05-18 15:14:46
    线程池使用 前言 在执行一个异步任务或并发任务时,往往是通过直接new Thread()方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池线程池的优势很明显,如下: 降低系统资源消耗,通过重用已...
  • 线程的使用java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程...
  • java线程池

    2019-05-19 10:31:44
    java线程池使用 java线程池的源码分析 一. java线程池的原理 线程池的作用 降低资源消耗. 线程池通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 提高响应速度. 当任务到达时,任务不需要等待线程创建...
  • Java线程池详解

    万次阅读 多人点赞 2018-04-08 19:07:23
    java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题。同时创建过多的线程也会占用过多的内存...
  • java线程池使用

    千次阅读 2016-04-04 12:45:17
    java线程池 使用说明:参照java api doc https://docs.oracle.com/javase/8/docs/api/ java.util.concurrent.ThreadPoolExecutor Core and maximum pool sizes On-demand construction Creating new ...
  • Java线程池使用

    千次阅读 2020-06-12 11:48:29
    关于线程池 为什么需要线程池? 线程是处理器调度的基本单位。我们会为每一个请求都独立创建一个...java预定义的线程池 CachedThreadPool:可缓存的线程池,该线程池中没有核心线程,非核心线程的数量为Integer.max_va

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 92,286
精华内容 36,914
关键字:

java线程池的使用场景

java 订阅