精华内容
下载资源
问答
  • 实现 Java 多线程并发控制框架

    千次阅读 2019-01-08 04:46:25
    2006 年 8 月 14 日 Java 提供了语言级别的线程支持,所以在 Java 中使用多线程相对于 C,C++ 来说更简单便捷,但...在本文中,我们将讨论如何实现一个 Java 多线程的运行框架以及我们是如何来控制线程的并发同步...

    2006 年 8 月 14 日

    Java 提供了语言级别的线程支持,所以在 Java 中使用多线程相对于 C,C++ 来说更简单便捷,但本文并不是介绍如何在 Java 中使用多线程来来解决诸如 Web services, Number crunching 或者 I/O processing 之类的问题。在本文中,我们将讨论如何实现一个 Java 多线程的运行框架以及我们是如何来控制线程的并发同步以及顺序执行的。

    所面临的问题


    图 1. 线程场景
    图 1. 线程场景

    这幅图中节点代表一个 single Thread,边代表执行的步骤。

    整幅图代表的意思是,ROOT 线程执行完毕后执行 T1 线程,T1 执行完毕后并发的执行 T2 和 T3。而从 T2 和 T3 指向 T4 的两条边表示的是 T4 必须等 T2 和 T3 都执行完毕以后才能开始执行。剩下的步骤以此类推,直到 END 作为整个过程的结束。当然,这只是个简略的示意图,可能面对的一个线程场景会有上百个线程。还有,你可以观察到这整个场景只有一个入口点和一个出口点,这意味着什么?在下文中为你解释。

    这其中涉及到了 Java 线程的同步互斥机制。例如如何让 T1 在 T2 和 T3 之前运行,如何让 T2 和 T3 都执行完毕之后开启 T4 线程。





    回页首


    模型的描述

    如何来描述图 1 中所示的场景呢?可以采用 XML 的格式来描述我们的模型。我定义一个“Thread” element 来表示线程。

    <ThreadList>
    <Thread ID = "thread-id" PRETHREAD = "prethread1, prethread2…"></Thread>
    <Thread ID = "thread-id" PRETHREAD = "prethread3, prethread4…"></Thread>
    </ThreadList>
    

    其中 ID 是线程的唯一标识符,PRETHREAD 便是该线程的直接先决线程的ID,每个线程 ID 之间用逗号隔开。

    在 Thread 这个 element 里面可以加入你想要该线程执行任务的具体信息。

    实际上模型的描述是解决问题非常重要的一个环节,整个线程场景可以用一种一致的形式来描述,作为 Java 多线程并发控制框架引擎的输入。也就是将线程运行的模式用 XML 来描述出来,这样只用改动 XML 配置文件就可以更改整个线程运行的模式,不用改动任何的源代码。





    回页首


    两种实现机制

    对于 Java 多线程的运行框架来说,我们将采用“外”和“内”的两种模式来实现。





    回页首


    “外” - 主线程轮询


    图 2. 静态类图
    图 2. 静态类图

    Thread 是工作线程。ThreadEntry 是 Thread 的包装类,prerequisite 是一个 HashMap,它含有 Thread 的先决线程的状态。如图1中显示的那样,T4 的先决线程是 T2 和 T3,那么 prerequisite 中就包含 T2 和 T3 的状态。TestScenario 中的 threadEntryList 中包含所有的 ThreadEntry。


    图 3. 线程执行场景
    图 3. 线程执行场景

    TestScenario 作为主线程,作为一个“外”在的监控者,不断地轮询 threadEntryList 中所有 ThreadEntry 的状态,当 ThreadEntry 接受到 isReady 的查询后查询自己的 prerequisite,当其中所有的先决线程的状态为“正常结束时”,它便返回 ready,那么 TestScenario 便会调用 ThreadEntry 的 startThread() 方法授权该 ThreadEntry 运行线程,Thread 便通过 run() 方法来真正执行线程。并在正常执行完毕后调用 setPreRequisteState() 方法来更新整个 Scenario,threadEntryList 中所有 ThreadEntry 中 prerequisite 里面含有该 Thread 的状态信息为“正常结束”。


    图 4. 状态更改的过程
    图 4. 状态更改的过程

    如图 1 中所示的 T4 的先决线程为 T2 和 T3,T2 和 T3 并行执行。如图 4 所示,假设 T2 先执行完毕,它会调用 setPreRequisteState() 方法来更新整个 Scenario, threadEntryList 中所有 ThreadEntry 中 prerequisite 里面含有该 T2 的状态信息为“正常结束”。此时,T4 的 prerequisite 中 T2 的状态为“正常结束”,但是 T3 还没有执行完毕,所以其状态为“未完毕”。所以 T4 的 isReady 查询返回为 false,T4 不会执行。只有当 T3 执行完毕后更新状态为“正常结束”后,T4 的状态才为 ready,T4 才会开始运行。

    其余的节点也以此类推,它们正常执行完毕的时候会在整个的 scenario 中广播该线程正常结束的信息,由主线程不断地轮询各个 ThreadEntry 的状态来开启各个线程。

    这便是采用主控线程轮询状态表的方式来控制 Java 多线程运行框架的实现方式之一。

    优点:概念结构清晰明了,实现简单。避免采用 Java 的锁机制,减少产生死锁的几率。当发生异常导致其中某些线程不能正常执行完毕的时候,不会产生挂起的线程。

    缺点:采用主线程轮询机制,耗费 CPU 时间。当图中的节点太多的(n>??? 而线程单个线程执行时间比较短的时候 t<??? 需要进一步研究)时候会产生线程启动的些微延迟,也就是说实时性能在极端情况下不好,当然这可以另外写一篇文章来专门探讨。





    回页首


    “内” - wait&notify

    相对于“外”-主线程轮询机制来说,“内”采用的是自我控制连锁触发机制。


    图 5. 锁机制的静态类图
    图 5. 锁机制的静态类图

    Thread 中的 lock 为当前 Thread 的 lock,lockList 是一个 HashMap,持有其后继线程的 lock 的引用,getLock 和 setLock 可以对 lockList 中的 Lock 进行操作。其中很重要的一个成员是 waitForCount,这是一个引用计数。表明当前线程正在等待的先决线程的个数,例如图 1 中所示的 T4,在初始的情况下,他等待的先决线程是 T2 和 T3,那么它的 waitForCount 等于 2。


    图 6. 锁机制执行顺序图
    图 6. 锁机制执行顺序图

    当整个过程开始运行的时候,我们将所有的线程 start,但是每个线程所持的 lock 都处于 wait 状态,线程都会处于 waiting 的状态。此时,我们将 root thread 所持有的自身的 lock notify,这样 root thread 就会运行起来。当 root 的 run 方法执行完毕以后。它会检查其后续线程的 waitForCount,并将其值减一。然后再次检查 waitForCount,如果 waitForCount 等于 0,表示该后续线程的所有先决线程都已经执行完毕,此时我们 notify 该线程的 lock,该后续线程便可以从 waiting 的状态转换成为 running 的状态。然后这个过程连锁递归的进行下去,整个过程便会执行完毕。

    我们还是以 T2,T3,T4 为例,当进行 initThreadLock 过程的时候,我们可以知道 T4 有两个直接先决线程 T2 和 T3,所以 T4 的 waitForCount 等于 2。我们假设 T3 先执行完毕,T2 仍然在 running 的状态,此时他会首先遍历其所有的直接后继线程,并将他们的 waitForCount 减去 1,此时他只有一个直接后继线程 T4,于是 T4 的 waitForCount 减去 1 以后值变为 1,不等于 0,此时不会将 T4 的 lock notify,T4 继续 waiting。当 T2 执行完毕之后,他会执行与 T3 相同的步骤,此时 T4 的 waitForCount 等于 0,T2 便 notify T4 的 lock,于是 T4 从 waiting 状态转换成为 running 状态。其他的节点也是相似的情况。

    当然,我们也可以将整个过程的信息放在另外的一个全局对象中,所有的线程都去查找该全局对象来获取各自所需的信息,而不是采取这种分布式存储的方式。

    优点:采用 wait&notify 机制而不采用轮询的机制,不会浪费CPU资源。执行效率较高。而且相对于“外”-主线程轮询的机制来说实时性更好。

    缺点:采用 Java 线程 Object 的锁机制,实现起来较为复杂。而且采取一种连锁触发的方式,如果其中某些线程异常,会导致所有其后继线程的挂起而造成整个 scenario 的运行失败。为了防止这种情况的发生,我们还必须建立一套线程监控的机制来确保其正常运行。





    回页首


    延伸

    下面的图所要表达的是这样一种递归迭代的概念。例如在图1 中展示的那样,T1 这个节点表示的是一个线程。现在,忘掉线程这样一个概念,将 T1 抽象为一个过程,想象它是一个银河系,深入到 T1 中去,它也是一个许多子过程的集合,这些子过程之间的关系模式就如图 1 所示那样,可以用一个图来表示。


    图 7. 嵌套子过程
    图 7. 嵌套子过程

    可以想象一下这是怎样的一个框架,具有无穷扩展性的过程框架,我们只用定义各个过程之间的关系,我们不用关心过程是怎样运行的。事实上,可以在最终的节点上指定一个实际的工作,比如读一个文件,或者submit一个JCL job,或者执行一条sql statement。

    其实,按照某种遍历规则,完全可以将这种嵌套递归的结构转化成为一个一层扁平结构的图,而不是原来的分层的网状结构,但是我们不这样做的原因是基于以下的几点考虑:

    1. 如果这样做,会导致图节点太多,边太多,令人眼花缭乱。
    2. 不这样做更主要的原因是每一个场景,如图 7 中的 T1,T13,是状态聚集的一个单元,具有高复用性和可靠性。
    3. 框架是高度抽象的,它实际的执行可以是分布式的,一个单元可以是一个系统,作为和其他系统的分界标志。

    实际上,这是一个状态聚集的层次控制框架,我们可以依赖此框架来执行自主运算。我们将在其它的文章中来讨论它的应用。





    回页首


    总结

    本文介绍了一种 Java 多线程并发控制的框架,并给出了其两种实现的模型,它们有各自的优缺点,有各自的适用范围。当需要进行 Java 线程的并发控制的时候,可以作为参考。



    参考资料

    展开全文
  • 主要介绍了使用java的HttpClient实现多线程并发的相关资料,需要的朋友可以参考下
  • 这篇文章主要介绍了python多线程并发及测试框架案例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1、循环创建多个线程,并通过循环启动执行 import ...
  • Java 多线程并发控制框架

    千次阅读 2009-09-16 23:25:00
    Java 提供了语言级别的线程支持,所以在 Java 中使用多线程相对于 C,C++ 来说更简单便捷,但...在本文中,我们将讨论如何实现一个 Java 多线程的运行框架以及我们是如何来控制线程的并发同步以及顺序执行的。 所面临

    Java 提供了语言级别的线程支持,所以在 Java 中使用多线程相对于 C,C++ 来说更简单便捷,但本文并不是介绍如何在 Java 中使用多线程来来解决诸如 Web services, Number crunching 或者 I/O processing 之类的问题。在本文中,我们将讨论如何实现一个 Java 多线程的运行框架以及我们是如何来控制线程的并发同步以及顺序执行的。

      所面临的问题

      图 1. 线程场景

      这幅图中节点代表一个 single Thread,边代表执行的步骤。

      整幅图代表的意思是,ROOT 线程执行完毕后执行 T1 线程,T1 执行完毕后并发的执行 T2 和 T3。而从 T2 和 T3 指向 T4 的两条边表示的是 T4 必须等 T2 和 T3 都执行完毕以后才能开始执行。剩下的步骤以此类推,直到 END 作为整个过程的结束。当然,这只是个简略的示意图,可能面对的一个线程场景会有上百个线程。还有,你可以观察到这整个场景只有一个入口点和一个出口点,这意味着什么?在下文中为你解释。

      这其中涉及到了 Java 线程的同步互斥机制。例如如何让 T1 在 T2 和 T3 之前运行,如何让 T2 和 T3 都执行完毕之后开启 T4 线程。

      模型的描述

      如何来描述图 1 中所示的场景呢?可以采用 XML 的格式来描述我们的模型。我定义一个“Thread” element 来表示线程。

      其中 ID 是线程的唯一标识符,PRETHREAD 便是该线程的直接先决线程的ID,每个线程 ID 之间用逗号隔开。

      在 Thread 这个 element 里面可以加入你想要该线程执行任务的具体信息。

      实际上模型的描述是解决问题非常重要的一个环节,整个线程场景可以用一种一致的形式来描述,作为 Java 多线程并发控制框架引擎的输入。也就是将线程运行的模式用 XML 来描述出来,这样只用改动 XML 配置文件就可以更改整个线程运行的模式,不用改动任何的源代码。

      两种实现机制

      对于 Java 多线程的运行框架来说,我们将采用“外”和“内”的两种模式来实现。

      “外” - 主线程轮询

      图 2. 静态类图

      Thread 是工作线程。ThreadEntry 是 Thread 的包装类,prerequisite 是一个 HashMap,它含有 Thread 的先决线程的状态。如图1中显示的那样,T4 的先决线程是 T2 和 T3,那么 prerequisite 中就包含 T2 和 T3 的状态。TestScenario 中的 threadEntryList 中包含所有的 ThreadEntry。

      图 3. 线程执行场景

      TestScenario 作为主线程,作为一个“外”在的监控者,不断地轮询 threadEntryList 中所有 ThreadEntry 的状态,当 ThreadEntry 接受到 isReady 的查询后查询自己的 prerequisite,当其中所有的先决线程的状态为“正常结束时”,它便返回 ready,那么 TestScenario 便会调用 ThreadEntry 的 startThread() 方法授权该 ThreadEntry 运行线程,Thread 便通过 run() 方法来真正执行线程。并在正常执行完毕后调用 setPreRequisteState() 方法来更新整个 Scenario,threadEntryList 中所有 ThreadEntry 中 prerequisite 里面含有该 Thread 的状态信息为“正常结束”。

      图 4. 状态更改的过程

      如图 1 中所示的 T4 的先决线程为 T2 和 T3,T2 和 T3 并行执行。如图 4 所示,假设 T2 先执行完毕,它会调用 setPreRequisteState() 方法来更新整个 Scenario, threadEntryList 中所有 ThreadEntry 中 prerequisite 里面含有该 T2 的状态信息为“正常结束”。此时,T4 的 prerequisite 中 T2 的状态为“正常结束”,但是 T3 还没有执行完毕,所以其状态为“未完毕”。所以 T4 的 isReady 查询返回为 false,T4 不会执行。只有当 T3 执行完毕后更新状态为“正常结束”后,T4 的状态才为 ready,T4 才会开始运行。

      其余的节点也以此类推,它们正常执行完毕的时候会在整个的 scenario 中广播该线程正常结束的信息,由主线程不断地轮询各个 ThreadEntry 的状态来开启各个线程。

      这便是采用主控线程轮询状态表的方式来控制 Java 多线程运行框架的实现方式之一。

      优点:概念结构清晰明了,实现简单。避免采用 Java 的锁机制,减少产生死锁的几率。当发生异常导致其中某些线程不能正常执行完毕的时候,不会产生挂起的线程。

      缺点:采用主线程轮询机制,耗费 CPU 时间。当图中的节点太多的(n>??? 而线程单个线程执行时间比较短的时候 t<!-- 需要进一步研究)时候会产生线程启动的些微延迟,也就是说实时性能在极端情况下不好,当然这可以另外写一篇文章来专门探讨。<-->

      “内” - wait¬ify

      相对于“外”-主线程轮询机制来说,“内”采用的是自我控制连锁触发机制。

      图 5. 锁机制的静态类图

      Thread 中的 lock 为当前 Thread 的 lock,lockList 是一个 HashMap,持有其后继线程的 lock 的引用,getLock 和 setLock 可以对 lockList 中的 Lock 进行操作。其中很重要的一个成员是 waitForCount,这是一个引用计数。表明当前线程正在等待的先决线程的个数,例如图 1 中所示的 T4,在初始的情况下,他等待的先决线程是 T2 和 T3,那么它的 waitForCount 等于 2。

      图 6. 锁机制执行顺序图

      当整个过程开始运行的时候,我们将所有的线程 start,但是每个线程所持的 lock 都处于 wait 状态,线程都会处于 waiting 的状态。此时,我们将 root thread 所持有的自身的 lock notify,这样 root thread 就会运行起来。当 root 的 run 方法执行完毕以后。它会检查其后续线程的 waitForCount,并将其值减一。然后再次检查 waitForCount,如果 waitForCount 等于 0,表示该后续线程的所有先决线程都已经执行完毕,此时我们 notify 该线程的 lock,该后续线程便可以从 waiting 的状态转换成为 running 的状态。然后这个过程连锁递归的进行下去,整个过程便会执行完毕。

      我们还是以 T2,T3,T4 为例,当进行 initThreadLock 过程的时候,我们可以知道 T4 有两个直接先决线程 T2 和 T3,所以 T4 的 waitForCount 等于 2。我们假设 T3 先执行完毕,T2 仍然在 running 的状态,此时他会首先遍历其所有的直接后继线程,并将他们的 waitForCount 减去 1,此时他只有一个直接后继线程 T4,于是 T4 的 waitForCount 减去 1 以后值变为 1,不等于 0,此时不会将 T4 的 lock notify,T4 继续 waiting。当 T2 执行完毕之后,他会执行与 T3 相同的步骤,此时 T4 的 waitForCount 等于 0,T2 便 notify T4 的 lock,于是 T4 从 waiting 状态转换成为 running 状态。其他的节点也是相似的情况。

      当然,我们也可以将整个过程的信息放在另外的一个全局对象中,所有的线程都去查找该全局对象来获取各自所需的信息,而不是采取这种分布式存储的方式。

      优点:采用 wait¬ify 机制而不采用轮询的机制,不会浪费CPU资源。执行效率较高。而且相对于“外”-主线程轮询的机制来说实时性更好。

      缺点:采用 Java 线程 Object 的锁机制,实现起来较为复杂。而且采取一种连锁触发的方式,如果其中某些线程异常,会导致所有其后继线程的挂起而造成整个 scenario 的运行失败。为了防止这种情况的发生,我们还必须建立一套线程监控的机制来确保其正常运行。

      延伸

      下面的图所要表达的是这样一种递归迭代的概念。例如在图1 中展示的那样,T1 这个节点表示的是一个线程。现在,忘掉线程这样一个概念,将 T1 抽象为一个过程,想象它是一个银河系,深入到 T1 中去,它也是一个许多子过程的集合,这些子过程之间的关系模式就如图 1 所示那样,可以用一个图来表示。

      图 7. 嵌套子过程

      可以想象一下这是怎样的一个框架,具有无穷扩展性的过程框架,我们只用定义各个过程之间的关系,我们不用关心过程是怎样运行的。事实上,可以在最终的节点上指定一个实际的工作,比如读一个文件,或者submit一个JCL job,或者执行一条sql statement。

      其实,按照某种遍历规则,完全可以将这种嵌套递归的结构转化成为一个一层扁平结构的图,而不是原来的分层的网状结构,但是我们不这样做的原因是基于以下的几点考虑:

      如果这样做,会导致图节点太多,边太多,令人眼花缭乱。

      不这样做更主要的原因是每一个场景,如图 7 中的 T1,T13,是状态聚集的一个单元,具有高复用性和可靠性。

      框架是高度抽象的,它实际的执行可以是分布式的,一个单元可以是一个系统,作为和其他系统的分界标志。

      实际上,这是一个状态聚集的层次控制框架,我们可以依赖此框架来执行自主运算。我们将在其它的文章中来讨论它的应用。

    总结

      本文介绍了一种 Java 多线程并发控制的框架,并给出了其两种实现的模型,它们有各自的优缺点,有各自的适用范围。当需要进行 Java 线程的并发控制的时候,可以作为参考。

    展开全文
  • 并发场景可能存在的需求之——任意编排 1 个执行单元的串行请求 2 个执行单元的并行请求 3 阻塞等待,串行的后面跟个并行 4 阻塞等待,个并行的执行完毕后才执行某个 5 串并行相互依赖...

    并发场景可能存在的需求之——任意编排

    1 多个执行单元的串行请求

     

    2 多个执行单元的并行请求

     

    3 阻塞等待,串行的后面跟多个并行

     

    4 阻塞等待,多个并行的执行完毕后才执行某个

     

    5 串并行相互依赖

     

    6 复杂场景

    并发场景可能存在的需求之——每个执行结果的回调

    传统的Future、CompleteableFuture一定程度上可以完成任务编排,并可以把结果传递到下一个任务。如CompletableFuture有then方法,但是却无法做到对每一个执行单元的回调。譬如A执行完毕成功了,后面是B,我希望A在执行完后就有个回调结果,方便我监控当前的执行状况,或者打个日志什么的。失败了,我也可以记录个异常信息什么的。

    此时,传统的就无能为力了。

    我的框架提供了这样的回调功能。并且,如果执行失败、超时,可以在定义这个执行单元时就设定默认值。

    并发场景可能存在的需求之——执行顺序的强依赖和弱依赖

    如上图的3,A和B并发执行,最后是C。

    有些场景下,我们希望A和B都执行完毕后,才能执行C,CompletableFuture里有个allOf(futures...).then()方法可以做到。

    有些场景下,我们希望A或者B任何一个执行完毕,就执行C,CompletableFuture里有个anyOf(futures...).then()方法可以做到。

    我的框架同样提供了类似的功能,通过设定wrapper里的addDepend依赖时,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。

    如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。

    并发场景可能存在的需求之——依赖上游的执行结果作为入参

    譬如A-B-C三个执行单元,A的入参是String,出参是int,B呢它需要用A的结果作为自己的入参。也就是说A、B并不是独立的,而是有结果依赖关系的。

    在A执行完毕之前,B是取不到结果的,只是知道A的结果类型。

    那么,我的框架也支持这样的场景。可以在编排时,就取A的结果包装类,作为B的入参。虽然此时尚未执行,必然是空,但可以保证A执行完毕后,B的入参会被赋值。

    并发场景可能存在的需求之——全组任务的超时

    一组任务,虽然内部的各个执行单元的时间不可控,但是我可以控制全组的执行时间不超过某个值。通过设置timeOut,来控制全组的执行阈值。

    并发场景可能存在的需求之——高性能、低线程数

    该框架全程无锁,没有一个加锁的地方。

    创建线程量少。

    如这样的,A会运行在B、C执行更慢的那个单元的线程上,而不会额外创建线程。

    总结

    该并发框架提供

    > 1 提供任何形式的串行、并行执行单元的组合。如a、b、c的串行,a、b的串行同时与c并行,a、b、c的并行
    > 2 为每个执行单元提供执行成功、失败、超时、异常的回调
    > 3 支持为单个执行单元设置异常、失败后的默认值
    > 4 支持为整个group(多个任意组合的执行单元)设置超时时间。单个执行单元失败,不影响其他单元的回调和最终结果获取。如果自己依赖的任务失败,则自己也立刻失败。
    > 5 整个group执行完毕或超时后,同步阻塞返回所有执行单元结果集,按添加的顺序返回list。也支持整个group的异步回调不阻塞主线程
    > 6 支持每个group独享线程池,或所有group共享线程池(默认)

    该并发框架已开源,https://gitee.com/tianyalei/asyncTool

     

    展开全文
  • 前言 经常会遇到一些性能问题,比如调用某个接口,可能要循环调用100...多线程常规的有两种实现方式,即继承Tread类,实现Runnable接口,但是这两种实现方式,有一个共同的问题,就是没有返回值,对于我们来说,获...

    前言
    经常会遇到一些性能问题,比如调用某个接口,可能要循环调用100次,并且需要拿到每一次调用的返回结果,通常我们都是放在for循环中一次次的串行调用,这种方式可想而知道有多慢,那怎么解决这个问题呢?

    多线程
    为了解决以上问题,我使用的方式是多线程。多线程常规的有两种实现方式,即继承Tread类,实现Runnable接口,但是这两种实现方式,有一个共同的问题,就是没有返回值,对于我们来说,获得每个线程的返回值,是个很困难的问题,因此不能用Tread类或Runnable接口,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允许有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到线程的执行结果

    案例
    假设需要给100个用户发送邮件,并需要每个用户的返回结果,先看下代码结构
    这里写图片描述

    CallableTemplate.java

    package com.mairuan.base.concurrent;
    
    import java.util.concurrent.Callable;
    
    /**
     * 多线程模板类
     * 
     * @author Administrator
     *
     * @param <V>
     */
    public abstract class CallableTemplate<V> implements Callable<V> {
        /**
         * 前置处理,子类可以Override该方法
         */
        public void beforeProcess() {
            System.out.println("before process....");
        }
        /**
         * 处理业务逻辑的方法,需要子类去Override
         * @return
         */
        public abstract V process();
        /**
         * 后置处理,子类可以Override该方法
         */
        public void afterProcess() {
            System.out.println("after process....");
        }
        @Override
        public V call() throws Exception {
            beforeProcess();
            V result = process();
            afterProcess();
            return result;
        }
    }
    

    CallableTemplate类实现了Callable接口,并实现了process方法,该类是一个抽象类,接收任意返回值的类型,beforeProcess方法为前置处理,afterProcess的后置处理,process为具体的业务逻辑抽象方法,该方法在子类中实现

    IConcurrentThreadPool.java

    package com.mairuan.base.concurrent;
    
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    
    public interface IConcurrentThreadPool {
        /**
         * 初始化线程池
         */
        void initConcurrentThreadPool();
    
        /**
         * 提交单个任务
         * 
         * @param task
         * @return
         */
        <V> V submit(CallableTemplate<V> task) throws InterruptedException,
                ExecutionException;
    
        /**
         * 提交多个任务
         * 
         * @param tasks
         * @return
         */
        <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
                throws InterruptedException, ExecutionException;
    }
    

    IConcurrentThreadPool是多线程接口类,声名了三个方法,initConcurrentThreadPool:初始化线程池,submit:提交单个任务的线程,并有返回值,invokeAll:提交多个任务的线程,并有返回值

    ConcurrentThreadPool.java

    package com.mairuan.base.concurrent.impl;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import com.mairuan.base.concurrent.CallableTemplate;
    import com.mairuan.base.concurrent.IConcurrentThreadPool;
    
    public class ConcurrentThreadPool implements IConcurrentThreadPool {
        private ThreadPoolExecutor threadPoolExecutor;
        // 核心线程数
        private int corePoolSize = 10;
        // 最大线程数
        private int maximumPoolSize = 20;
        // 超时时间30秒
        private long keepAliveTime = 30;
    
        @Override
        public void initConcurrentThreadPool() {
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>());
        }
    
        @Override
        public <V> V submit(CallableTemplate<V> task) throws InterruptedException,
                ExecutionException {
            Future<V> result = threadPoolExecutor.submit(task);
    
            return result.get();
        }
    
        @Override
        public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
                throws InterruptedException, ExecutionException {
    
            List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
            List<V> resultList = new ArrayList<V>();
    
            for (Future<V> future : tasksResult) {
                resultList.add(future.get());
            }
    
            return resultList;
        }
    
    }
    

    ConcurrentThreadPool是创建线程池的实现类,用到了ThreadPoolExecutor线程池类及这个类的invokeAll方法和submit方法,这两个方法的返回值,都可以通过Future类的get方法获得

    ICallableTaskFrameWork.java

    public interface ICallableTaskFrameWork {
        <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
                throws InterruptedException, ExecutionException;
    }

    ICallableTaskFrameWork是定义的线程任务框架接口,所有的多线程调用,都通过该接口发起

    CallableTaskFrameWork.java

    package com.mairuan.base.concurrent.impl;
    
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    
    import com.mairuan.base.concurrent.CallableTemplate;
    import com.mairuan.base.concurrent.ICallableTaskFrameWork;
    import com.mairuan.base.concurrent.IConcurrentThreadPool;
    
    public class CallableTaskFrameWork implements ICallableTaskFrameWork {
    
        private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();
    
        @Override
        public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
                throws InterruptedException, ExecutionException {
    
            concurrentThreadPool.initConcurrentThreadPool();
    
            return concurrentThreadPool.invokeAll(tasks);
        }
    
    }
    

    CallableTaskFrameWork是ICallableTaskFrameWork 的实现类,在submitsAll实现方法中,通过调用线程池对象IConcurrentThreadPool接口的invokeAll方法来发起多线程的调用,这里注意一个,在submitAll实现方法中,我手动的调用了初始化线程池的方法concurrentThreadPool.initConcurrentThreadPool(),在真实的项目上,应该在应用启动的时候就调用该方法来初始化线程池

    测试类代码
    SendMessageService.java,假设这是一个发送邮件信息的服务类

    package com.mairuan.base.concurrent.test;
    
    public class SendMessageService {
        public void sendMessage(String email,String content){
            System.out.println("发送邮件。。。");
        }
    }

    SendMessageHander.java,多线程发送邮件的处理类

    package com.mairuan.base.concurrent.test;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.mairuan.base.concurrent.CallableTemplate;
    
    public class SendMessageHander extends CallableTemplate<Map<String, String>> {
    
        private String email;
        private String content;
        public SendMessageHander(String email,String content) {
            this.email = email;
            this.content = content;
        }
    
        @Override
        public Map<String, String> process() {
            SendMessageService sendMessageService = new SendMessageService();
            sendMessageService.sendMessage(email, content);
            Map<String, String> map = new HashMap<String, String>();
            map.put(email, content);
            return map;
        }
    
    }

    这个类继承了上面的CallableTemplate,我们要的返回值是Map,因此泛型类型是Map,在类中还重写了process方法,在方法中调用发送邮件的业务逻辑接口SendMessageService.sendMessage,并将返回结果组装成Map返回,这里我就简单处理了,将邮件地址及内容放在Map中直接返回了;另外还要注意这个类有个有参构造器,通过构建器可以接收需要传递进来的参数

    SendMessageTest.java,测试类

    package com.mairuan.base.concurrent.test;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.ExecutionException;
    
    import com.mairuan.base.concurrent.CallableTemplate;
    import com.mairuan.base.concurrent.ICallableTaskFrameWork;
    import com.mairuan.base.concurrent.impl.CallableTaskFrameWork;
    
    public class SendMessageTest {
        public static void main(String[] args) throws InterruptedException,
                ExecutionException {
            ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();
            List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();
            SendMessageHander sendMessageHander = null;
            // 将需要发送邮件的邮件地址及内容组装好,放在一个集合中
            for (int i = 0; i < 1000; i++) {
                sendMessageHander = new SendMessageHander("email" + i, "content"
                        + i);
    
                tasks.add(sendMessageHander);
            }
            //通过多线程一次性发起邮件,并拿到返回结果集
            List<Map<String, String>> results = callableTaskFrameWork
                    .submitsAll(tasks);
            // 解析返回结果集
            for (Map<String, String> map : results) {
                for (Entry<String, String> entry : map.entrySet()) {
                    System.out.println(entry.getKey() + "\t" + entry.getValue());
                }
            }
    
        }
    }
    

    运行结果
    这里写图片描述

    展开全文
  • JAVA多线程并发

    千次阅读 多人点赞 2019-09-18 12:14:29
    JAVA多线程并发1 JAVA并发知识库2 JAVA 线程实现/创建方式2.1 继承 Thread 类2.2 实现 Runnable 接口2.3 Callable 、Future 、ExecutorService 有返回值线程2.4 基于线程池的方式2.4.1 4种线程池2.4.1.1 ...
  • 多线程并发内部实现机制

    千次阅读 2016-11-02 17:18:10
    多线程和共享内存线程模型争用及并发访问如何能够打破不变量作为争用标准解决方案的锁定何时需要锁定如何使用锁定;理解开销锁定如何能够各行其道   十年前,只有核心系统程序员会担心在多个执行线程的情况下...
  • Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括ThreadPool,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。并发编程的一种编程方式是把任务...
  • Python多线程并发接口自动化框架

    千次阅读 2019-12-12 11:38:39
    整体框架使用的是:Python+Unittest+Requests+PyMysql+HTMLReport 主要包含以下几个模块: 1. Business:与业务相关的公共模块 get_login_token:实时获取token from Business.url import url_login import ...
  • 多线程并发之Semaphore(信号量)使用详解

    万次阅读 多人点赞 2019-01-02 17:04:03
    专题相关文章: 从内存可见性看Volatile、原子变量和CAS算法 多线程并发之CountDownLatch(闭锁)使用详解 多线程并发之显示锁Lock与其通信...多线程并发之线程池Executor与Fork/Join框架 多线程并发之JUC 中的 Atomi...
  • Disruptor多线程并发构架

    千次阅读 2017-05-26 15:09:51
    Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其...Disruptor它是一个开源的并发框架,并获得20
  • 三、:locked_with_key:Java多线程并发框架:unlocked: Java多线程与并发框 (第 13 篇) 深入理解:Fork/Join框架 Java多线程与并发框 (第 14 篇) 深入理解:原子操作 Java多线程与并发框 (第 15 篇) 深入理解...
  • java高并发之有返回值的多线程并发

    千次阅读 2017-05-27 11:49:40
    需求: 工作流策略的一个节点,需要查询13个第三方的...以下是实现参考的例子JAVA多线程实现的四种方式Java多线程实现方式主要有四种:继承Thread类、实现Runnable接口、实现Callable接口通过FutureTask包装器来创建Th
  • libevent 多线程实现并发

    万次阅读 2017-12-01 11:23:16
    当你看到这篇文章时,想必你对libevent已经有了足够的了解,笔者在此就不做描述了,直接进入正题。...如果采用中间件的方式管理个Redis实例,不仅可以避免单点机器内存不够用的情况,也使性能得到大幅提升。经过R
  • 多线程并发 (二) 了解 Thread

    千次阅读 多人点赞 2019-12-30 14:46:20
    章节:多线程并发 (一) 了解 Java 虚拟机 - JVM多线程并发 (二) 了解 Thread多线程并发 (三) 锁 synchronized、volatile多线程并发 (四) 了解原子类 AtomicXX 属性地址偏移量,CAS机制多线程并发 (五) ReentrantLock...
  • Java 多线程并发——CAS 操作和 AQS 框架

    万次阅读 多人点赞 2016-06-03 09:10:26
    所以多线程环境下,可以使用 synchronized 修饰,value++; 的操作变成了原子性操作,保证了线程安全: public class VolatileVisibility { private static volatile int value = 0; public synchronized static void...
  • 目录一, JAVA 多线程并发1,JAVA 并发知识库2,JAVA 线程实现/创建方式(1) 继承 Thread 类(2)实现 Runnable 接口。(3)ExecutorService、Callable、Future 有返回值线程(4)基于线程池的方式(5)4 种线程池...
  • Windows下的多线程编程
  • java多线程并发机制

    千次阅读 2017-04-27 10:59:33
    一、多线程 1、操作系统有两个容易混淆的概念,进程和线程。 进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种...
  • Java 多线程 并发编程

    万次阅读 多人点赞 2013-08-28 01:42:48
    一、多线程 1、操作系统有两个容易混淆的概念,进程和线程。 进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种...
  • 模拟多线程并发订单处理功能实现

    千次阅读 2019-09-17 11:59:35
    在目前的应用开发中,有一个需要自动处理某状态订单的功能,需要和手动处理功能并行不冲突和重复处理。目前大致写了一下demo实现,记录一下,方便以后有时间复习和改进。实现思路也比较简单,就是...
  • 多线程并发1.单线程执行2.多线程执行3.守护线程4.阻塞线程5.并发测试框架三级目录 1.单线程执行 Python的内置模块提供了两个线程模块:threading 和thread。 thread:是原生的 threading是扩展的 2.多线程执行 3.守护...
  • 一个多线程并发排队/** * 线程请求队列,用来处理多线程并发排队. * 实际用的是先进先出的堆栈 Queue,默认队大小为128 * @author guishuanglin 2008-11-3 * */public class ConcurrentQueue { ...
  • 研究了程序并发过程中的同步机制和交互通信机制,比较了基于操作系统级和基于Java多线程并发机制的实现结构,总结了并发程序中死锁预防的一些编程规则和策略。所构造的一个具有完全意义上的并发同步的框架实例有...
  • java有返回值的多线程并发

    千次阅读 2019-03-20 10:32:34
    Java多线程实现方式主要有四种:继承Thread类、实现Runnable接口、实现Callable接口通过FutureTask包装器来创建Thread线程、使用ExecutorService、Callable、Future实现有返回结果的多线程。 其中前两种方式线程执行...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 198,099
精华内容 79,239
关键字:

能实现多线程并发的框架