异步 订阅
异步:一种通讯方式,对设备需求简单。我们的PC机提供的标准通信接口都是异步的。异步双方不需要共同的时钟,也就是接收方不知道发送方什么时候发送,所以在发送的信息中就要有提示接收方开始接收的信息,如开始位,同时在结束时有停止位。异步的另外一种含义是计算机多线程的异步处理。与同步处理相对,异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。但此处需要明确的是:异步与多线程与并行不是同一个概念. 展开全文
异步:一种通讯方式,对设备需求简单。我们的PC机提供的标准通信接口都是异步的。异步双方不需要共同的时钟,也就是接收方不知道发送方什么时候发送,所以在发送的信息中就要有提示接收方开始接收的信息,如开始位,同时在结束时有停止位。异步的另外一种含义是计算机多线程的异步处理。与同步处理相对,异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。但此处需要明确的是:异步与多线程与并行不是同一个概念.
信息
外文名
Asynchronous
中文名
异步
异步C#中的异步
异步的概念和同步相对。异步,线程,并行.三个概念是不同的.线程是进程中某个单一顺序的控制流。也被称为轻量进程(lightweight processes).计算机科学术语,指运行中的程序的调度单位. [1]  当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。以CAsyncSocket类为例(注意,CSocket从CAsyncSocket派生,但是其功能已经由异步转化为同步),当一个客户端通过调用Connect函数发出一个连接请求后,调用者线程立刻可以朝下运行。当连接真正建立起来以后,socket底层会发送一个消息通知该对象。这里提到执行部件和调用者通过三种途径返回结果:状态、通知和回调。可以使用哪一种依赖于执行部件的实现,除非执行部件提供多种选择,否则不受调用者控制。如果执行部件用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一种很严重的错误)。如果是使用通知的方式,效率则很高,因为执行部件几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。一般指并行计算,是说同一时刻有多条指令同时被执行,这些指令可能执行于同一CPU的多核上,或者多个CPU上,或者多个物理主机甚至多个网络中. [2]  与同步相对应,异步指的是让CPU暂时搁置当前请求的响应,处理下一个请求,当通过轮询或其他方式得到回调通知后,开始运行。多线程将异步操作放入另一线程中运行,通过轮询或回调方法得到完成通知,但是完成端口,由操作系统接管异步操作的调度,通过硬件中断,在完成时触发回调方法,此方式不需要占用额外线程。 [3] 
收起全文
精华内容
下载资源
问答
  • 异步
    千次阅读
    2021-11-14 19:52:04

    JavaScript实现异步的五种实现方法



    前言

    一、同步和异步是什么?

    1.同步任务:

    js中的同步任务就是按照顺序执行(可以理解为A------>B,只有一条道路可以到达B)

    2.异步任务:

    异步肯定与同步不一样.简单来说异步任务就像:到达终点的路多了几条(A地点------->B地点,中间有很多条路可以走.不会造成A----->B地点的主路压力增大)
    异步任务 setTimeout setInterval xmlHttprequest 等

    二、异步方法有哪些(5种):

     	1.webwork 为了让任务在多线程去执行,防止复杂逻辑阻塞线程
        2.promise 为了让异步任务顺序执行,解决地狱回调
        3.window.fetch H5新增的拉取资源的 api  1.通过.then() 2.数据序列化 3.通过.then()获取数据
        4.jquery.deffered对象 为了让创建的deffered对象的时候和异步任务属于同步代码的结果,并且能够保证deffered和promise进行转换
        5.async和awit 实质上是对Promise()对象提供的 -语法糖-. 让代码更简洁.通过同步代码的形式来实现异步操作
    
    

    1.webWork(创建分线程)

    index.js为加载到html页面中的主线程(js文件)

    work.js为在index中创建的分线程


     1.werbwork的实现方法
        ①在html中引入index.js主线程
        ②在index.js中 创建分线程  var w =new webwork('work.js')
        ③在index.js中 通过 w.postmesage('数据') 向子线程发送数据
        ④在work.js中  通过onmessage=function(ev){ev.data  postmessage(a)} 接受主线程发送过来的ev.data数据
        ⑤在index.js中 通过onmessage=function(ev){ev.data} ev.data 接受  a 的值.
    

    index.js(主)代码如下(示例):

    //创建一个分线程 work.js
    var w = new Worker('work.js')
    
    //主线程向分线程发送数据
    w.postMessage(40);
    
    //接受分线程转递过来的数据
    w.onmessage = function(ev) {
        console.log(ev);
        console.log(ev.data);
    }
    

    work.js(分线程)代码如下(示例):

    function fibo(n) {
        var num = 1;
        var num1 = 1;
        var sum = 0;
        for (let i = 2; i < n; i++) {
            sum = num + num1;
            num = num1;
            num1 = sum;
    
        }
        return sum;
    }
    
    onmessage = function(ev) {
        console.log(ev);
        this.postMessage(fibo(ev.data))
    }
    

    2.promise(es6方法)

    介绍步骤 如下:

    2.promise的实现方法(处理异步),解决地狱回调
        ①创建promise实例   var p =new promise((reslove,reject)=>{reslove('res')});
        ②p.then(res=>  console.log(res))// reslove(变量) 对应的是 then(变量 => {变量的操作})
    
    
    	//promise 是对异步任务处理的对象,判断异步任务有没有成功执行的状态
        //resolve , reject 也是两个参数
        //resolve 是调用成功的函数  reject 是调用失败的函数
        //如果不使用 resove 和 reject 两个函数 状态为pendding ---resolve之后变成了 成功状态 fulfilled
        //promise 的then()  在成功状态的时候被触发  resolve(res)  res可以传递参数
        //promise 的catch()  在失败状态的时候被触发 reject(res)  res可以传递参数
        
    

    代码如下(示例):

     var promise = new Promise((reslove, reject) => {
            setTimeout(() => {
                console.log('执行大量代码');
            }, 2000);
            //resolve('我成功了'); //resolve 之后变成了 成功状态 fulfilled
            // reject('我失败了'); //reject 之后promise状态变为 失败状态
        });
        //promise 的then()  在成功状态的时候被触发  resolve(res)  res可以传递参数
        //promise 的catch()  在失败状态的时候被吃法 reject(err)  err可以传递参数
        promise.then((res) => {
            console.log(res);
        }).catch((err) => {
            console.log(err);
        })
    

    3.Async await 异步的实现方法

    介绍步骤 如下(相当于promise用法的语法糖):

    //async 和 await 的使用场景
        //async 与 await 必须连用 , await只能用在async函数里面
        //await作用等待异步任务完成,如果请求数据成功了,执行then函数
        //  async function getData() {
        //      console.log('000');
        //      var book1 = await window.fetch('https://autumnfish.cn/top/artists');
        //      //第二个请求
        //      console.log('1111');
        //      var book2 = await $.get('https://autumnfish.cn/top/artists');
        //      console.log('2222');
        //      return [book1.json(), book2]
        //  };
        //  console.log('aaa');
        //  getData().then(res => {
        //      return res[0];
        //  }).then(res => console.log(res)).catch(err => {
        //      console.log(err)
        //  })
    

    4.window.fetch() 的实现方法

      <script>
        //拉取资源 fetch是 es6新增的一个解决异步的方案, 拉取网络资源
    
        var url = 'http://123.207.32.32:8000/home/data?type=new&page1';
        //fetch() 返回响应的promise 
        // 第一层then返回的是:响应的报文对象   
        //第二层:如果想要使用第一层的数据,必须把数据序列化  res.json() 返回的对象是一个 Promise对象再进行then()
    
        window.fetch(url).then(res => {
            console.log(res);
            //将数据序列化
            return res.json()
        }).then(res => console.log(res));
    </script>
    

    5.jquery中的deffered对象 的实现方法

    <script>
        function cook() {
            console.log('开始做饭');
            var def = $.Deferred() //创建一个jquery延时对象
            setTimeout(() => {
                console.log('做完了');
                /*   def.resolve('烩面'); */
                def.reject('做饭失败')
            }, 2000);
            //返回Deferred()  吧def实例转化成promise实例返回
            //返回可以利用then catch finally 等这些函数
            return def.promise()
        }
        cook().then(res => {
            console.log(res);
        }).catch(err => {
            console.log(err);
        });
    
        /*    $.get('http://123.207.32.32:8000/home/data?type=new&page1').then(res => {
               console.log(res);
           }).catch(err => console.log(err)); */
    
        $.get('http://123.207.32.32:8000/home/data?type=new&page1').done(res => {
            console.log(res);
        }).fail(err => console.log(err)).then(() => alert(1213))
    </script>
    

    总结

    提示:这里对文章进行总结:

    异步处理方法

    1.webwork 为了让任务在多线程去执行,防止复杂逻辑阻塞线程
    2.promise 为了让异步任务顺序执行,解决地狱回调
    3.window.fetch H5新增的拉取资源的 api  1.通过.then() 2.数据序列化 3.通过.then()获取数据
    4.jquery.deffered对象 为了让创建的deffered对象的时候和异步任务属于同步代码的结果,并且能够保证deffered和promise进行转换
    5.async和awit 实质上是对Promise()对象提供的 -语法糖-. 让代码更简洁.通过同步代码的形式来实现异步操作
    
    
    1.werbwork的实现方法
    ①在html中引入index.js主线程
    ②在index.js中 创建分线程  var w =new webwork('work.js')
    ③在index.js中 通过 w.postmesage('数据') 向子线程发送数据
    ④在work.js中  通过onmessage=function(ev){ev.data  postmessage(a)} 接受主线程发送过来的ev.data数据
    ⑤在index.js中 通过onmessage=function(ev){ev.data} ev.data 接受  a 的值.
    
    2.promise的实现方法(处理异步),解决地狱回调
    ①创建promise实例   var p =new promise((reslove,reject)=>{reslove('res')});
    ②p.then(res=>  console.log(res))
    ③  // reslove(变量) 对应的是 then(变量 => {变量的操作})
    
    3.window.fetch() 的实现方法
    ①window.fetch('url').then(
         console.log(res)
         return res.json() //必须序列化  返回的是一个respones对象
    )
    4.async 和 awit的使用方法:
     async function f2() {
        var books = await $.get('https://autumnfish.cn/top/artists');
        return books
    };
    f2().then(res => {
        console.log(res);
    }) 
    5.jquery的jquery.deffered对象上面有介绍,就不做总结了
    
    更多相关内容
  • 使用Win32API实现Windows下异步串口通讯:目录:1. 异步非阻塞串口通讯的优点2. 异步非阻塞串口通讯的基本原理3. 异步非阻塞串口通讯的基础知识4. 异步非阻塞串口通讯的实现步骤
  • 小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小...
  • 本文以实例的方式讲解了异步清零和同步清零置数区别以及区分方法,希望对你有所帮助。
  • 自己搭建的异步电机矢量控制仿真,由于异步电机的动态数学模型是一个高阶、非线性、强耦合的多变量系统。上世纪60年代末由达姆斯塔特工业大学(TU Darmstadt)的K.Hasse提出。在70年代初由西门子工程师F.Blaschke在...
  • 异步调用流程图

    2019-08-09 01:40:18
    NULL 博文链接:https://m635674608.iteye.com/blog/2198181
  • Labview异步调用示例

    2019-03-31 20:58:53
    异步调用示例,Labview异步调用示例
  • winform的一个简单易用异步加载等待效果,调用方式: this.AsyncWaitDo(delegate{ //异步等待操作}, delegate{//回调操作}); this是要进行异步等待的窗体
  • 异步FIFO,Verilog源码

    2018-10-31 10:22:54
    异步FIFO,Verilog源码实现异步FIFO,异步FIFO的原理,
  • gohttp:支持异步请求的Go的HTTP客户端
  • 在Matlab中simulink环境中搭建了基于SVPWM的三相异步电机矢量控制系统在Matlab中simulink环境中搭建了基于SVPWM的三相异步电机矢量控制系统在Matlab中simulink环境中搭建了基于SVPWM的三相异步电机矢量控制系统在...
  • C#异步回调

    2016-09-16 15:28:40
    .net的一个同步和异步的例子,不是用Task等线程,而是用AsyncResult实现的异步方法。
  • 易语言同步异步套接字模块源码 系统结构:Call,取字节集指针,异步客户_初始,异步客户_销毁,异步客户_连接,异步客户_断开,异步客户_发送数据,异步客户_取回数据,异步客户_回调函数,同步
  • labview2015关于异步调用功能的研究工程,主要用于解决主线程负责快速轮询,多个子线程负责具体的任务处理并反馈给主线程(子线程需要长时间处理任务),解决单纯的可重复VI不能实现并行的功能。
  • C#编写的TCP异步通信,采用Socket实现,(还可用C#对Socket的封装TcpClient和TcpListener实现) 包含服务端及客户端完整代码
  • 主要介绍了Springboot @Async 异步方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下
  • C# 异步刷新控件

    2016-09-30 08:53:16
    异步刷新textbox和datagridview控件
  • 异步写数据库

    2015-01-29 19:30:04
    该代码,描述如何异步对数据库进行异步操作,读和修改操作也类似,知识给到一个简单的代码实例
  • 异步刷新进度条.rar

    2019-07-26 17:51:04
    异步更新UI进度条,刷新UI进度条,线程异步刷新进度条
  • 〖Python语法进阶篇⑧〗- 异步关键字与gevent包

    千次阅读 多人点赞 2022-04-13 23:42:28
    在之前的章节中,我们学习了多线程与多线程的使用方法。它们有一个共同的特点,就是在主进程或者说主线程中创建多个子进程或子线程,子进程与子线程的...今天我们就来学习多进程与多线程的另一个知识点 ---> `异步` 。
    万叶集
    🎉 隐约雷鸣,阴霾天空。 🎉
    🎉 但盼风雨来,能留你在此。 🎉

    前言
    ✌ 作者简介:渴望力量的哈士奇 ✌,大家可以叫我 🐶哈士奇🐶 ,一位致力于 TFS 赋能的博主 ✌
    🏆 CSDN博客专家认证、新星计划第三季全栈赛道 top_1 、华为云享专家、阿里云专家博主 🏆
    📫 如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
    💬 人生格言:优于别人,并不高贵,真正的高贵应该是优于过去的自己。💬
    🔥 如果感觉博主的文章还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦


    专栏系列(点击解锁)学习路线指引知识定位
    🔥Python全栈白皮书🔥 零基础入门篇 以浅显易懂的方式轻松入门,让你彻底爱上Python的魅力。
    语法进阶篇 主要围绕多线程编程、正则表达式学习、含贴近实战的项目练习 。
    自动化办公篇 实现日常办公软件的自动化操作,节省时间、提高办公效率。
    自动化测试实战篇 从实战的角度出发,先人一步,快速转型测试开发工程师。
    数据库开发实战篇 更新中
    爬虫入门与实战 更新中
    数据分析篇 更新中
    前端入门+flask 全栈篇 更新中
    django+vue全栈篇 更新中
    拓展-人工智能入门 更新中
    网络安全之路 踩坑篇 记录学习及演练过程中遇到的坑,便于后来居上者
    网安知识扫盲篇 三天打鱼,不深入了解原理,只会让你成为脚本小子。
    vulhub靶场漏洞复现 让漏洞复现变得简单,让安全研究者更加专注于漏洞原理本身。
    shell编程篇 不涉及linux基础,最终案例会偏向于安全加固方向。 [待完结]
    WEB漏洞攻防篇 2021年9月3日停止更新,转战先知社区等安全社区及小密圈
    渗透工具使用集锦 2021年9月3日停止更新,转战先知社区等安全社区及小密圈
    点点点工程师 测试神器 - Charles 软件测试数据包抓包分析神器
    测试神器 - Fiddler 一文学会 fiddle ,学不会倒立吃翔,稀得!
    测试神器 - Jmeter 不仅是性能测试神器,更可用于搭建轻量级接口自动化测试框架。
    RobotFrameWork Python实现的自动化测试利器,该篇章仅介绍UI自动化部分。
    Java实现UI自动化 文档写于2016年,Java实现的UI自动化,仍有借鉴意义。
    MonkeyRunner 该工具目前的应用场景已不多,文档已删,为了排版好看才留着。



    在之前的章节中,我们学习了多线程与多线程的使用方法。它们有一个共同的特点,就是在主进程或者说主线程中创建多个子进程或子线程,子进程与子线程的运行不会影响主进程或主线程的代码执行。从而使得代码在可以在多个进程或多个线程的工作下提高工作效率。今天我们就来学习多进程与多线程的另一个知识点 —> 异步

    异步 其实在我们前面章节的进程池与线程池的时候已经间接的接触过了,今天就好好的了解一下,看看 异步 在平时的工作中都有哪些优势和使用场景。

    🐳 初探异步

    🐬 什么是异步与异步的好处

    我们曾经在 Python 的脚本结构介绍过,python脚本的执行顺序是自上而下逐行执行的(处于下方的代码会等待上方的代码执行完之后才会执行)。

    不过 异步 则有些不同,举个例子。比如我们当前的脚本中有5个任务,其中第二个任务需要执行比较长的时间才会结束,当第二个任务执行完毕之后才会去执行后面的任务,这样必然会消耗比较长的时间。

    当我们希望任务二虽然需要很长的一段运行时间却又不影响后续的任务执行的时候,就需要将任务二从同步的状态变为 异步 执行的状态。可以认为 异步的执行 不会影响或者说不会阻塞主程序的执行。

    正常状态下,任务二的执行需要任务一完成之后才会开始。相对于任务二来说,任务一等于阻塞了任务二的执行。而如果任务二是异步的,当任务二执行完毕之后,任务四不会受到任务三的影响。也就是说任务三不会阻塞任务四 ,可以继续的正常执行。

    这就是 异步 ,听起来好像与 多进程、多线程 类似,的确如此。多进程、多线程与异步之间的关系有点类似于兄弟之间的关系似的。接下来让我们看一下 异步与多进程、多线程之间有何相同又有何不同。

    🐬 异步与多线程、多进程的对比

    首先要确定一下身份,异步 也属于一种线程,只不过它属于一种轻量级的线程,我们也把它叫做 协程通过前面章节的学习我们知道线程是进程下的一部分,同样的 "协程" 也是进程下的一部分。 而与多进程、多线程不同的是,多进程与多线程无法获取函数的返回值,但是 “异步” 是可以获取到函数的返回值的(之前章节的进程池与线程池的演示效果,其实就是 "异步" 的效果,所以它们可以获取返回值。)。

    听起来似乎 “异步” 更适合平时的使用,其实不然,它们都有各自擅长使用的场景和条件。

    比如 “多进程与多线程” 可以随时创建使用,虽然可能 多进程 的每次创建都会消耗一定的资源。相比于 “多进程与多线程” 来说, “异步” 的执行就有一些苛刻了,他们必须保证主进程是在异步的情况下,才可以使用。比如后续的 WEB 开发阶段,如果启动WEB服务是一个异步的服务,我们才可以比较轻松的使用异步。

    另外,异步的要求是执行的过程中所有的程序都需要是异步才可以使用。(这对于初学者来说不太好理解,没关系。现阶段只需要对此有一个了解即可,等到 WEB开发阶段的时候可能就会豁然开朗…[不开朗就重新学一遍!])

    可能大家会有个疑问,我们目前还没有接触 WEB 开发,是不是就没有办法使用异步了呢?其实不是,今天的章节也会涉及到如何在脚本中使用异步的案例。因为 异步可以获取返回值 ,所以 异步在平时的工作中更适合于文件读写相关需要获取返回值的场景 ;而多进程与多线程则更适合业务方向的处理,不需要返回值的相关工作。

    🐳 异步的关键字与模块

    现在我们对 异步的相关知识有一个大体上的了解,接下来就学习一下 Python 中的关键字 async 与 await,它们是在 Python 3.4之后才引进的新的功能。

    🐬 异步关键字 - async 与 await

    • async 的功能介绍:代表定义异步;无论是多线程、多进程还是异步,都是对函数进行操作。所以如果想要函数编程异步,就需要在函数的前面加上 async 关键字。 示例如下:

      • async def test():
            return 'test'
        
    • await 的 功能介绍:当定义了异步函数之后,需要使用 await 关键字来执行异步函数。示例如下:

      • async def deal ():
            result = await test()	# 在 test() 函数之前添加关键字 await
        
      • 这里需要注意的是:如果 test() 没有使用 async 关键字进行异步声明,则无法通过使用 await 进行异步的调用。

      • 这个时候大家可能又会有个疑问了,main 主函数作为程序的入口它无法定义 async,这个时候又怎么能执行异步函数呢?如果是在异步的 WEB服务中自然不需要有这些顾虑,但现阶段我们是在脚本中运行就需要一个帮手的帮助了,就是 Python 中的内置模块 — async 模块。通过 async 模块 来调用异步函数的执行,async 模块有很多异步的功能,这里仅介绍两个功能,来帮助我们认识异步与其使用方法。

    🐬 asyncio模块 - gather 与 run 函数的使用

    函数名介绍参数返回值
    gather将异步函数批量执行asyncfunc…List - 函数的返回结果
    run执行主异步函数[task]执行函数的返回结果
    • gather 函数:可以将异步函数一起执行,不论数量是多少,并且可以按照顺序获取这些异步函数的返回值:它的返回值是一个列表;它的参数可以传入多个声明了 async 关键字的函数。
    • run 函数:可以执行一个单独的声明了 async 关键字的函数,并且获取这个异步函数的返回值

    PS:需要注意的是如果想要使用异步那么我们的整个环境都需要是异步才行。

    接下来看一个小案例:

    # coding:utf-8
    
    
    import time
    import random
    import asyncio
    
    
    async def test_a():   # 定义一个测试函数 test_a
        for i in range(5):
            _start_time = time.time()
            time.sleep(random.random() * 2)
            _end_time = time.time()
            print('这是 \'test_a()\' 函数的第 {} 循环,随机休眠时间为:{}'.format(i, _end_time - _start_time))
    
    async def test_b():   # 定义一个测试函数 test_a
        for i in range(5):
            _start_time = time.time()
            time.sleep(random.random() * 2)
            _end_time = time.time()
            print('这是 \'test_b()\' 函数的第 {} 循环,随机休眠时间为:{}'.format(i, _end_time - _start_time))
    
    async def main():   # 利用 async 关键字 异步声明一个 main 函数
        result = await asyncio.gather(      # 调用 asyncio模块的gather函数 并传入 test_a 与 test_b 函数,并获取返回值。
            test_a(),
            test_b()
        )
        print(result)
    
    
    if __name__ == '__main__':      # 在脚本主函数入口,调用 asyncio模块的run函数 启动上面 异步声明的main函数
        asyncio.run(main())
    

    该脚本等于说模拟 main 函数是一个主进程,并且该主进程已经通过 async 关键字声明了异步;接下来使用 asyncio模块的run函数来调用,从而达到进程下都是一个异步环境的效果。

    运行效果如下:


    这里发现一个问题,脚本的运行结果依然是同步运行的,并没有达到我们想要的异步效果。这是为什么呢?其实问题就出在了 time.sleep() 上面,因为 time.sleep() 实际上是 CPU级别的阻塞 ,如果CPU阻塞了也是无法实现异步效果的。 所以这里就需要改为 asyncio.sleep(random.random() ,但由于 asyncio.sleep() 也是异步的,所以还需要加上 await 才可以。


    修改后的脚本如下:

    # coding:utf-8
    
    
    import time
    import random
    import asyncio
    
    
    async def test_a():   # 定义一个测试函数 test_a
        for i in range(5):
            _start_time = time.time()
            await asyncio.sleep(random.random() * 2)    # time.sleep() 是CPU级别的阻塞所以需要修改为 asyncio.sleep() 实现异步
                                                        # 但由于 asyncio.sleep() 也是异步的,所以还需要加上 await 才可以
            _end_time = time.time()
            print('这是 \'test_a()\' 函数的第 {} 次循环,随机休眠时间为:{} 秒'.format(i+1, _end_time - _start_time))
        return '\'test_a\' 函数运行结束'
    
    async def test_b():   # 定义一个测试函数 test_a
        for i in range(5):
            _start_time = time.time()
            await asyncio.sleep(random.random() * 2)
            _end_time = time.time()
            print('这是 \'test_b()\' 函数的第 {} 次循环,随机休眠时间为:{} 秒'.format(i+1, _end_time - _start_time))
        return '\'test_b\' 函数运行结束'
    
    async def main():   # 利用 async 关键字 异步声明一个 main 函数
        result = await asyncio.gather(      # 调用 asyncio模块的gather函数 并传入 test_a 与 test_b 函数,并获取返回值。
            test_a(),
            test_b()
        )
        print(result)
    
    
    if __name__ == '__main__':      # 在脚本主函数入口,调用 asyncio模块的run函数 启动上面 异步声明的main函数
        start_time = time.time()
        asyncio.run(main())
        end_time = time.time()
        print('共计耗时为:%s' % (end_time - start_time))
    

    运行效果如下:

    这次的运行效果我们可以看到已经实现了异步的效果,而且耗时上也比上次的执行减少到了 5秒钟。

    这里大家可以将异步的进程号和主函数的进程号也打印出来,届时会发现异步的进程号与主函数的进程号是一样的。也就证明了异步实际上一种类似于线程的存在,就像文章开头说的,它是一个轻量级的微小线程(我们也把异步叫做协程),其实协程还有更多的执行的原理,我们会在未来学习了更高级的知识之后再去理解。


    🐳 第三方异步包 - gevent

    接下里让我们继续看一下今天的第二个异步包 ---> gevent 。 gevent 为 Python 提供异步支持的时间要早于 async ,早在 Python 2.x 的时代它就已经存在了。

    首先,来看一下如何使用 gevent

    通过 pip install gevent 来安装 gevent 包。

    如果是使用 Windows 系统的话,还需要安装 Microsoft Visual C++ 的依赖

    有些 Linux 环境也会需要依赖 wheel 的包,pip install wheel

    🐬 gevent 模块的常用方法

    来看一下 gevent 常用的两个函数

    函数名介绍参数返回值
    spawn创建协程对象Func, args协程对象
    joinall批量处理协程对象[spawnpbj][spawnpbj]


    接下来我们根据上文的异步脚本修改为通过 gevent 实现的方式:

     # coding:utf-8
    
    
    import os
    import time
    import random
    import asyncio
    import gevent
    
    
    def gevent_test_a():   # 定义一个测试函数 test_a
        for i in range(5):
            _start_time = time.time()
            gevent.sleep(random.random() * 2)    # time.sleep() 是CPU级别的阻塞所以需要修改为 gevent.sleep() 实现异步
            _end_time = time.time()
            print('这是 \'gevent_test_a\' 函数的第 {} 次循环, 随机休眠时间为:{} 秒, 进程号为:{} 。'.format(i+1, _end_time - _start_time, os.getpid()))
        return '\'gevent_test_a\' 函数运行结束'
    
    def gevent_test_b():   # 定义一个测试函数 test_a
        for i in range(5):
            _start_time = time.time()
            gevent.sleep(random.random() * 2)
            _end_time = time.time()
            print('这是 \'gevent_test_b\' 函数的第 {} 次循环, 随机休眠时间为:{} 秒, 进程号为:{} 。'.format(i+1, _end_time - _start_time, os.getpid()))
        return '\'gevent_test_b\' 函数运行结束'
    
    
    if __name__ == '__main__':      # 在脚本主函数入口,调用 asyncio模块的run函数 启动上面 异步声明的main函数
        start_time = time.time()
    
        gevent_test_a = gevent.spawn(gevent_test_a)
        gevent_test_b = gevent.spawn(gevent_test_b)
        gevent_lists = [gevent_test_a, gevent_test_b]
        result = gevent.joinall(gevent_lists)
        print(result)
    
        end_time = time.time()
        print('共计耗时为:%s' % (end_time - start_time), '主函数的进程号为:%s' % os.getpid())
    

    运行结果如下:



    🐳 异步案例 - 捡豆子

    🐬 async 关键字实现 “捡豆子”

    代码示例如下:

    # coding:utf-8
    
    import random
    import asyncio
    
    """
        async 定义异步函数
        await 执行异步
        gather 将异步函数批量执行
        run 执行主异步函数
    """
    
    
    beans = list(range(1, 51))      # 豆子总数
    
    
    async def child_a():
        child_a_beans = []
        while 1:
            date = random.choice(beans)
            child_a_beans.append(date)
            beans.remove(date)
            await asyncio.sleep(random.random())
            if not beans:
                break
        # global child_a_beans
        return 'child_a捡到的豆子有: {} ,共计: {} 个'.format(child_a_beans, len(child_a_beans)), len(child_a_beans)
    
    
    async def child_b():
        child_b_beans = []
        while 1:
            date = random.choice(beans)
            child_b_beans.append(date)
            beans.remove(date)
            await asyncio.sleep(random.random())
            if not beans:
                break
        # global child_b_beans
        return 'child_a捡到的豆子有: {} ,共计: {} 个'.format(child_b_beans, len(child_b_beans)), len(child_b_beans)
    
    
    async def main():
        result = await asyncio.gather(
            child_a(),
            child_b())
        return result
    
    
    if __name__ == '__main__':
        result = asyncio.run(main())
        # print(type(result[0][-1]))
        # print(result[1][-1])
        print("a_beans:", result[0])
        print("b_beans:", result[1])
    
    
        if result[0][1] > result[1][1]:
            print('child_a赢了')
        elif result[0][1] < result[1][1]:
            print('child_b赢了')
        else:
            print('child_a 与 child_b 平局')
    

    执行结果如下:



    🐬 gevent 包实现 “捡豆子”

    代码示例如下:

    # coding:utf-8
    
    import random
    import gevent
    
    """
        spawn 创建协程对象
        joinall 批量处理协程对象
    """
    
    # 豆子总数
    beans = list(range(1, 51))
    
    
    def child_a():
        a_beans = []
        while 1:
            date = random.choice(beans )  # 随机获取豆子
            a_beans.append(date)  # 把随机获取到的豆子添加到a_beans列表中
            beans.remove(date)  # 并删除获取到的豆子
            gevent.sleep(random.random())  # 执行随机阻塞时间   注意:随机时间越长获取到的豆子就越多
            #gevent.sleep(0.1) 阻塞时间相等获取到的豆子数量也相等
            if not beans:
                break
    
        return 'child_a捡到的豆子有%s,总共%s个' % (a_beans, len(a_beans ))
    
    
    def child_b():
        b_beans = []
        while 1:
            date = random.choice(beans)
            b_beans.append(date)
            beans.remove(date)
            gevent.sleep(random.random())
            # gevent.sleep(0.1) 阻塞时间相等获取到的豆子数量也相等
            if not beans:
                break
        return 'child_b捡到的豆子有%s,总共%s个' % (b_beans, len(b_beans))
    
    
    if __name__ == '__main__':
        g_a = gevent.spawn(child_a)
        g_b = gevent.spawn(child_b)
        result = gevent.joinall([g_a, g_b])
        print("a_beans", result[0].value)
        print("b_beans", result[1].value)
    

    执行结果如下:



    🐳 总结

    通过 gevent 的这种方式实现的异步,几乎与 async 的效果完全是一致的(它们都是将协程放在一起去批量的执行)。

    今天作为异步的启蒙,我们也仅仅是针对异步做了初步的了解,以及常用模块的基本使用方法。知识的积累也是伴随着经验和成长的积累,很多只是和经验都是在适当的时间获取才会更加合适,今天关于异步的内容就讲到这里了。

    (累了...)

    展开全文
  • 常用设计编程工具 单相异步电机设计 7.1.zip常用设计编程工具 单相异步电机设计 7.1.zip常用设计编程工具 单相异步电机设计 7.1.zip常用设计编程工具 单相异步电机设计 7.1.zip常用设计编程工具 单相异步电机设计 ...
  • c#Winform异步线程刷新UI

    千次下载 热门讨论 2015-12-27 10:25:24
    使用开发工具为VS2013;.net:4.0 多线程异步刷新ui界面,实时获取任务进度并进行反馈。
  • SpringBoot之异步方法

    千次阅读 2022-04-27 11:12:50
    异步方法

    1、Future

    Future代表异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法。只有在计算完成后,才能使用方法get检索结果,如有必要,将其阻塞,直到准备就绪。取消是通过取消方法执行的。还提供了其他方法来确定任务是否正常完成或被取消。

    	//等待异步任务完成,然后检索其结果
        V get() throws InterruptedException, ExecutionException;
    	//最多等待给定的时间以完成计算,然后检索其结果
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    	//如果此任务已完成,则返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true
    	boolean isDone();
    
    	private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
            int i = 0;
    
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "test-" + i++);
            }
        });
    	
        public static void demo01() {
            log.info("创建异步任务");
            Future<String> submit = executor.submit(new Callable<String>() {
                @Override
                public String call() {
                    String result = "fail";
                    try {
                        log.info("开始执行异步任务");
                        // 执行任务耗时
                        Thread.sleep(10000);
                        result = "success";
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result;
                }
            });
    
            try {
                String result = submit.get();
                log.info("获取异步任务结果 " + result);
            } catch (InterruptedException e) {
                System.out.println("中断异常");
            } catch (ExecutionException e) {
                System.out.println("执行异常");
            }
    
            log.info("Future的get方法,会使当前线程阻塞");
        }
    

    在这里插入图片描述

        public static void demo02() throws InterruptedException, ExecutionException {
            log.info("创建异步任务");
            Future<String> submit = executor.submit(new Callable<String>() {
                @Override
                public String call() {
                    String result = "fail";
                    try {
                        log.info("开始执行异步任务");
                        // 执行任务耗时
                        Thread.sleep(10000);
                        result = "success";
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result;
                }
            });
    
            log.info("轮询调用isDone方法查询异步任务是否完成");
            while (true) {
                if (submit.isDone()) {
                    String result = submit.get();
                    log.info(result);
                    break;
                } else {
                    log.info("异步任务还未完成,先干点别的事");
                    Thread.sleep(1000);
                }
            }
    
            log.info("Future的get方法,会使当前线程阻塞");
        }
    

    在这里插入图片描述
    使用Future,并不能实现真正的异步,要么需要阻塞的获取结果,要么不断的轮询

    2、CompletableFuture

    CompletableFuture实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

    	//创建带返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定		
    	public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
            return asyncSupplyStage(screenExecutor(executor), supplier);
        }
        
    	//创建无返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定	
    	public static CompletableFuture<Void> runAsync(Runnable runnable) {
            return asyncRunStage(asyncPool, runnable);
        }
        public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
            return asyncRunStage(screenExecutor(executor), runnable);
        }
    
    	//如果以任何方式完成,则返回true:正常、异常或通过取消
    	public boolean isDone() {
            return result != null;
        }
    	//等待此任务完成,然后返回其结果
        public T get() throws InterruptedException, ExecutionException {
            Object r;
            return reportGet((r = result) == null ? waitingGet(true) : r);
        }
    	//最多等待给定的时间,以完成此任务,然后返回其结果
        public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            Object r;
            long nanos = unit.toNanos(timeout);
            return reportGet((r = result) == null ? timedGet(nanos) : r);
        }
        //如果任务完成则返回结果集,否则返回给定的valueIfAbsent
        public T getNow(T valueIfAbsent) {
            Object r;
            return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
        }
    

    thenApply / thenAccept / thenRun

    在流式处理中,等待上层任务正常执行完成后,再执行回调方法;
    thenApply:上层任务的结果值作为回调方法的入参值,该回调方法有返回值
    thenAccept:上层任务的结果值作为回调方法的入参值,该回调方法没有返回值
    thenRun:没有入参也没有返回值的回调方法

        public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
            return uniApplyStage(null, fn);
        }
        public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
            return uniApplyStage(asyncPool, fn);
        }
        public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
            return uniApplyStage(screenExecutor(executor), fn);
        }
    
    	public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
            return uniAcceptStage(null, action);
        }
        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
            return uniAcceptStage(asyncPool, action);
        }
        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
            return uniAcceptStage(screenExecutor(executor), action);
        }
    
    
    	public CompletableFuture<Void> thenRun(Runnable action) {
            return uniRunStage(null, action);
        }
        public CompletableFuture<Void> thenRunAsync(Runnable action) {
            return uniRunStage(asyncPool, action);
        }
        public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
            return uniRunStage(screenExecutor(executor), action);
        }
    
        public static void demo03() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "success";
                }
            }, fixedThreadPool).thenApplyAsync((result) -> {
                log.info("上层任务结果: " + result);
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "over";
            }, fixedThreadPool);
    
            log.info("最终结果 = " + finalResult.get());
        }
    

    在这里插入图片描述
    如果上层任务抛异常则不会进入回调方法中

        public static void demo03() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).thenApplyAsync((result) -> {
                log.info("上层任务结果: " + result);
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "over";
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    

    在这里插入图片描述

    exceptionally

    上层任务执行中,若抛出异常可被该方法接收,异常即该方法的参数;
    若无异常,不会进入该方法并将上层的结果值继续下传。

        public static void demo03() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).exceptionally((exception) -> {
                try {
                    log.info("异常处理 " + exception);
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "exception";
            }).thenApplyAsync((result) -> {
                log.info("上层任务结果: " + result);
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "over";
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    

    异常情况
    在这里插入图片描述
    正常情况
    在这里插入图片描述

    whenComplete

    接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法无返回值

        public CompletableFuture<T> whenComplete(
            BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(null, action);
        }
        public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(asyncPool, action);
        }
        public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action, Executor executor) {
            return uniWhenCompleteStage(screenExecutor(executor), action);
        }
    
        public static void demo04() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    log.info("上层任务无异常,获取到上层结果为:" + result);
                } else {
                    log.info("上层任务有异常,获取到上层结果为:" + result);
                }
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    

    无异常
    在这里插入图片描述
    有异常
    在这里插入图片描述

    handle

    接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法有返回值

        public <U> CompletableFuture<U> handle(
            BiFunction<? super T, Throwable, ? extends U> fn) {
            return uniHandleStage(null, fn);
        }
        public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn) {
            return uniHandleStage(asyncPool, fn);
        }
        public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
            return uniHandleStage(screenExecutor(executor), fn);
        }
    
        public static void demo04() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<String> finalResult = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    //if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).handleAsync((result, exception) -> {
                if (exception == null) {
                    log.info("上层任务无异常,获取到上层结果为:" + result);
                } else {
                    log.info("上层任务有异常,获取到上层结果为:" + result, exception);
                }
                return "handle " + result;
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    

    无异常
    在这里插入图片描述
    有异常
    在这里插入图片描述

    thenCombine / thenAcceptBoth / runAfterBoth

    将两个CompletableFuture组合起来,当这两个future都正常执行完了才会执行回调任务
    thenCombine:2个future的返回值作为回调方法的入参值,该回调方法有返回值
    thenAcceptBoth:2个future的返回值作为回调方法的入参值,该回调方法没有返回值
    runAfterBoth:没有入参也没有返回值

        public <U,V> CompletableFuture<V> thenCombine(
            CompletionStage<? extends U> other,
            BiFunction<? super T,? super U,? extends V> fn) {
            return biApplyStage(null, other, fn);
        }
        public <U,V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T,? super U,? extends V> fn) {
            return biApplyStage(asyncPool, other, fn);
        }
        public <U,V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
            return biApplyStage(screenExecutor(executor), other, fn);
        }
    
        public <U> CompletableFuture<Void> thenAcceptBoth(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
            return biAcceptStage(null, other, action);
        }
        public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
            return biAcceptStage(asyncPool, other, action);
        }
        public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action, Executor executor) {
            return biAcceptStage(screenExecutor(executor), other, action);
        }
    
        public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
            return biRunStage(null, other, action);
        }
        public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
            return biRunStage(asyncPool, other, action);
        }
        public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
            return biRunStage(screenExecutor(executor), other, action);
        }
    
        public static void demo05() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("开始执行异步任务");
                        Thread.sleep((long) (Math.random() * 5000));
                        i = 1;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture<Integer> supplyAsync2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("开始执行异步任务");
                        Thread.sleep((long) (Math.random() * 8000));
                        i = 2;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture<Integer> thenCombineAsync = supplyAsync.thenCombineAsync(supplyAsync2, (a, b) -> {
                log.info("a = " + a + ", b = " + b);
                return a + b;
            }, fixedThreadPool);
            log.info("thenCombineAsync = " + thenCombineAsync.get());
    
        }
    
    

    在这里插入图片描述
    其中任意一个有异常都会导致thenCombineAsync方法不执行

    applyToEither / acceptEither / runAfterEither

    将两个CompletableFuture组合起来,只要有一个future正常执行完了就可以执行回调任务
    applyToEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法有返回值
    acceptEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法没有返回值
    runAfterEither:只要有任务执行完就调用回调方法

        public <U> CompletableFuture<U> applyToEither(
            CompletionStage<? extends T> other, Function<? super T, U> fn) {
            return orApplyStage(null, other, fn);
        }
        public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn) {
            return orApplyStage(asyncPool, other, fn);
        }
        public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
            return orApplyStage(screenExecutor(executor), other, fn);
        }
    
        public CompletableFuture<Void> acceptEither(
            CompletionStage<? extends T> other, Consumer<? super T> action) {
            return orAcceptStage(null, other, action);
        }
        public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action) {
            return orAcceptStage(asyncPool, other, action);
        }
        public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) {
            return orAcceptStage(screenExecutor(executor), other, action);
        }
    
        public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) {
            return orRunStage(null, other, action);
        }
        public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) {
            return orRunStage(asyncPool, other, action);
        }
        public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor) {
            return orRunStage(screenExecutor(executor), other, action);
        }
    
        public static void demo06() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("执行异步任务");
                        Thread.sleep((long) (Math.random() * 5000));
                        i = 1;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture<Integer> supplyAsync2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("执行异步任务");
                        Thread.sleep((long) (Math.random() * 5000));
                        i = 2;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture<Integer> thenCombineAsync = supplyAsync.applyToEitherAsync(supplyAsync2, (result) -> {
                log.info("result " + result);
                return 3;
            }, fixedThreadPool);
    
            log.info("final result = " + thenCombineAsync.get());
        }
    

    在这里插入图片描述
    在这里插入图片描述
    任意一个任务有异常,都不会进入applyToEitherAsync方法

    3、@Async

    基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作

    启用@Async注解

    package com.yzm.thread.async;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    @EnableAsync // 开启异步调用功能,即使@Async注解生效
    @Slf4j
    public class AsyncConfig implements AsyncConfigurer {
    
        @Bean(name = "default_async_pool", destroyMethod = "shutdown")
        public ThreadPoolTaskExecutor defaultAsyncPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            // 设置线程池前缀:方便排查
            executor.setThreadNamePrefix("default-async-");
            // 设置线程池的大小
            executor.setCorePoolSize(10);
            // 设置线程池的最大值
            executor.setMaxPoolSize(15);
            // 设置线程池的队列大小
            executor.setQueueCapacity(250);
            // 设置线程最大空闲时间,单位:秒
            executor.setKeepAliveSeconds(3000);
            // 饱和策略
            // AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
            // CallerRunsPolicy:若已达到待处理队列长度,将由主线程直接处理请求
            // DiscardOldestPolicy:抛弃旧的任务;会导致被丢弃的任务无法再次被执行
            // DiscardPolicy:抛弃当前任务;会导致被丢弃的任务无法再次被执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            return executor;
        }
    
        @Bean(name = "another_async_pool", destroyMethod = "shutdown")
        public ThreadPoolTaskExecutor anotherAsyncPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setThreadNamePrefix("another-task-");
            executor.setCorePoolSize(3);
            executor.setMaxPoolSize(6);
            executor.setQueueCapacity(5);
            executor.setKeepAliveSeconds(10);
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            return executor;
        }
    
        /**
         * 自定义异步线程池,若不重写,则使用默认的
         */
        @Override
        public Executor getAsyncExecutor() {
            return defaultAsyncPool();
        }
    
        /**
         * 1.无参无返回值方法
         * 2.有参无返回值方法
         * 返回值为void的, 通过IllegalArgumentException异常, AsyncUncaughtExceptionHandler处理异常
         * 3.有参有返回值方法
         * 返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
         * 或者在调用方在调用Future.get时捕获异常进行处理
         */
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            System.out.println("正在处理无返回值的@Async异步调用方法");
            return (throwable, method, objects) -> {
                log.info("Exception message - " + throwable.getMessage());
                log.info("Method name - " + method.getName());
                for (Object param : objects) {
                    log.info("Parameter value - " + param);
                }
            };
        }
    
    }
    
    package com.yzm.thread.async;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.Future;
    
    @Slf4j
    @Component
    public class AsyncService {
    
        /**
         * 1.无参无返回值方法
         * 最简单的异步调用,返回值为void
         */
        @Async
        public void async() {
            log.info("无参无返回值方法,通过观察线程名称以便查看效果");
    //        int a = 1 / 0;
        }
    
        /**
         * 2.有参无返回值方法
         * 指定线程池
         *
         * @param i 传入参数
         */
        @Async("another_async_pool")
        public void async(int i) {
            log.info("有参无返回值方法, 参数={}", i);
        }
    
        /**
         * 3.有参有返回值方法
         *
         * @param i 传入参数
         * @return Future
         */
        @Async
        public Future<String> asyncReturn(int i) throws InterruptedException {
            log.info("有参有返回值方法, 参数={}", i);
    //        int a = 1 / 0;
            Thread.sleep(100);
            return new AsyncResult<String>("success:" + i);
        }
    
        /**
         * @Async  必须不同类间调用:
         */
        public void D() {
            log.info("在同类下调用 @Async 方法是同步执行的");
            async();
        }
    }
    

    调用无参无返回值的异步方法

    @Component
    public class AsyncDemo {
    
        private final AsyncService asyncService;
    
        public AsyncDemo(AsyncService asyncService) {
            this.asyncService = asyncService;
        }
    
        @PostConstruct
        public void demo() {
            asyncA();
        }
    
        public void asyncA() {
            asyncService.async();
        }
    }
    

    在这里插入图片描述

    调用有参无返回值的异步方法并指定线程池

    AsyncService类

        /**
         * 2.有参无返回值方法
         * 指定线程池
         *
         * @param i 传入参数
         */
        @Async("another_async_pool")
        public void async(int i) {
            log.info("有参无返回值方法, 参数={}", i);
        }
    

    AsyncDemo类

        @PostConstruct
        public void demo() {
    //        asyncA();
            asyncB(1);
        }
    
        public void asyncA() {
            asyncService.async();
        }
    
        public void asyncB(int i) {
            asyncService.async(i);
        }
    

    在这里插入图片描述

    调用有参有返回值的异步方法

        public void asyncC(int i) {
            try {
                Future<String> future = asyncService.asyncReturn(i);
                // 这里使用了循环判断,等待获取结果信息
                while (true) {
                    // 判断是否执行完毕
                    if (future.isDone()) {
                        System.out.println("执行完毕,结果为:" + future.get());
                        break;
                    }
                    System.out.println("还未执行完毕,请稍等。。。");
                    Thread.sleep(1000);
                }
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("异步调用失败");
                e.printStackTrace();
            }
        }
    

    在这里插入图片描述

    调用方法内部调用一个异步方法是不行的,仍是同步调用

    AsyncService类

        /**
         * @Async  必须不同类间调用:
         */
        public void D() {
            log.info("在同类下调用 @Async 方法是同步执行的");
            // 调用本类的异步方法
            async();
        }
    

    AsyncDemo类

    	public void asyncD() {
            asyncService.D();
        }
    

    在这里插入图片描述

    异常处理

    AsyncConfig类

    	// 可处理无返回值的异步方法异常
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            System.out.println("正在处理无返回值的@Async异步调用方法");
            return (throwable, method, objects) -> {
                log.info("Exception message - " + throwable.getMessage());
                log.info("Method name - " + method.getName());
                for (Object param : objects) {
                    log.info("Parameter value - " + param);
                }
            };
        }
    

    AsyncService类

        /**
         * 1.无参无返回值方法
         * 最简单的异步调用,返回值为void
         */
        @Async
        public void async() {
            log.info("无参无返回值方法,通过观察线程名称以便查看效果");
            int a = 1 / 0;
        }
    

    AsyncDemo类

        @PostConstruct
        public void demo() {
            asyncA();
    //        asyncB(1);
    //        asyncC(11);
    //        asyncD();
        }
    
        public void asyncA() {
            asyncService.async();
        }
    

    在这里插入图片描述
    有返回值的异步方法异常,需要手动try{}catch(){}处理

    事务处理机制

    在@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
    那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.
    例如:
    方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
    方法B,使用了@Async来标注, B中调用了C,C使用@Transactional做了标注,则可实现事务控制的目的。

    展开全文
  • 在实际的开发过程中,有些业务逻辑使用异步的方式处理更为合理。比如在某个业务逻辑中,需要把一些数据存入到redis缓存中,这个操作只是一个辅助的功能,成功或者失败对主业务并不会产生根本影响,这个过程可以通过...

    在实际的开发过程中,有些业务逻辑使用异步的方式处理更为合理。比如在某个业务逻辑中,需要把一些数据存入到redis缓存中,这个操作只是一个辅助的功能,成功或者失败对主业务并不会产生根本影响,这个过程可以通过异步的方法去进行。

    Spring中通过在方法上设置@Async注解,可使得方法被异步调用。也就是说该方法会在调用时立即返回,而这个方法的实际执行交给Spring的TaskExecutor去完成。

    异步执行的使用

    配置类

    使用@EnableAsync注解开启异步功能。

    package com.morris.spring.config;
    
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    
    @Configuration
    @EnableAsync // 开启Async
    public class AsyncConfig implements AsyncConfigurer {
    
    	@Override
    	public Executor getAsyncExecutor() {
    		// 自定义线程池
    		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    		executor.setCorePoolSize(2);
    		executor.setMaxPoolSize(4);
    		executor.setQueueCapacity(10);
    		executor.setThreadNamePrefix("MyExecutor-");
    		executor.initialize();
    		return executor;
    	}
    
    }
    

    service层的使用

    在需要异步执行的方法上面加上@Async注解。

    package com.morris.spring.service;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class AsyncService {
    
    	@Async
    	public void noResult() {
    		log.info("execute noResult");
    	}
    
    	@Async
    	public Future<String> hasResult() throws InterruptedException {
    		log.info("execute hasResult");
    		TimeUnit.SECONDS.sleep(5);
    		return new AsyncResult<>("hasResult success");
    	}
    
    	@Async
    	public CompletableFuture<String> completableFuture() throws InterruptedException {
    		log.info(" execute completableFuture");
    		TimeUnit.SECONDS.sleep(5);
    		return CompletableFuture.completedFuture("completableFuture success");
    	}
    
    }
    

    测试类

    package com.morris.spring.demo.async;
    
    import com.morris.spring.config.AsyncConfig;
    import com.morris.spring.service.AsyncService;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * 异步调用的演示
     */
    @Slf4j
    public class AsyncDemo {
    	@Test
    	public void test() throws ExecutionException, InterruptedException {
    		AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
    		applicationContext.register(AsyncService.class);
    		applicationContext.register(AsyncConfig.class);
    		applicationContext.refresh();
    
    		AsyncService asyncService = applicationContext.getBean(AsyncService.class);
    		asyncService.noResult(); // 无结果
    
    		Future<String> future = asyncService.hasResult();
    		log.info("hasResult: {}", future.get()); // 有结果
    
    		CompletableFuture<String> completableFuture = asyncService.completableFuture();
    		completableFuture.thenAcceptAsync(System.out::println);// 异步回调
    		log.info("completableFuture call down");
    	}
    }
    

    运行结果如下:

    INFO  MyExecutor-1 AsyncService:16 - execute noResult
    INFO  MyExecutor-2 AsyncService:21 - execute hasResult
    INFO  main AsyncDemo:29 - hasResult: hasResult success
    INFO  MyExecutor-1 AsyncService:28 -  execute completableFuture
    INFO  main AsyncDemo:33 - completableFuture call down
    

    通过日志可以发现AsyncService的方法都是通过线程名为MyExecutor-1的线程执行的,这个名称的前缀是在AsyncConfig中指定的,而不是通过main线程执行的。

    两个疑问:

    • 是否可以不配置Executor线程池,Spring会默认创建默认的Executor,还是会报错?
    • Executor线程池中执行任务时如果抛出了异常,可否自定义异常的处理类对异常进行捕获处理?

    源码分析

    @EnableAsync

    @EnableAsync主要是向Spring容器中导入了AsyncConfigurationSelector类。

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {
    

    AsyncConfigurationSelector

    AsyncConfigurationSelector的主要方法当然是selectImports(),注意这里会先调用父类的selectImports()
    org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)

    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
    	Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
    	Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
    
    	AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
    	if (attributes == null) {
    		throw new IllegalArgumentException(String.format(
    				"@%s is not present on importing class '%s' as expected",
    				annType.getSimpleName(), importingClassMetadata.getClassName()));
    	}
    
    	AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
    	// 模板方法模式,回调子类的selectImports
    	String[] imports = selectImports(adviceMode);
    	if (imports == null) {
    		throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
    	}
    	return imports;
    }
    

    org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports

    public String[] selectImports(AdviceMode adviceMode) {
    	switch (adviceMode) {
    		case PROXY:
    			// 奇怪???@Transaction、@EnableCaching都是注入两个类,一个config,一个registrar导入aop的入口类
    			// 而这里只有一个config类ProxyAsyncConfiguration
    			return new String[] {ProxyAsyncConfiguration.class.getName()};
    		case ASPECTJ:
    			return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
    		default:
    			return null;
    	}
    }
    

    AsyncConfigurationSelector又导入了配置类ProxyAsyncConfiguration。

    ProxyAsyncConfiguration

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    
    	/**
    	 * 先看父类AbstractAsyncConfiguration
    	 * @return
    	 */
    	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    		// 实例化AsyncAnnotationBeanPostProcessor
    		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    		bpp.configure(this.executor, this.exceptionHandler);
    		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
    			bpp.setAsyncAnnotationType(customAsyncAnnotation);
    		}
    		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    		return bpp;
    	}
    
    }
    

    ProxyAsyncConfiguration向容器中注入了一个AsyncAnnotationBeanPostProcessor。

    疑问:这里为啥是BeanPostProcessor,不应该像事务切面或者缓存切面一样,注入一个Advisor和XxxxInterceptor(Advice)吗?

    AbstractAsyncConfiguration

    AbstractAsyncConfiguration是ProxyAsyncConfiguration的父类。

    @Configuration
    public abstract class AbstractAsyncConfiguration implements ImportAware {
    
    	@Nullable
    	protected AnnotationAttributes enableAsync;
    
    	@Nullable
    	protected Supplier<Executor> executor;
    
    	@Nullable
    	protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
    
    	/**
    	 * 实现了ImportAware.setImportMetadata
    	 * 在ProxyAsyncConfiguration初始化后被调用
    	 * @param importMetadata
    	 */
    	@Override
    	public void setImportMetadata(AnnotationMetadata importMetadata) {
    		// 取得@EnableAsync注解
    		this.enableAsync = AnnotationAttributes.fromMap(
    				importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
    		if (this.enableAsync == null) {
    			throw new IllegalArgumentException(
    					"@EnableAsync is not present on importing class " + importMetadata.getClassName());
    		}
    	}
    
    	/**
    	 * Collect any {@link AsyncConfigurer} beans through autowiring.
    	 */
    	@Autowired(required = false)
    	void setConfigurers(Collection<AsyncConfigurer> configurers) {
    		// configurers默认为空,除非手动注入AsyncConfigurer
    		if (CollectionUtils.isEmpty(configurers)) {
    			return;
    		}
    		if (configurers.size() > 1) {
    			throw new IllegalStateException("Only one AsyncConfigurer may exist");
    		}
    		AsyncConfigurer configurer = configurers.iterator().next();
    		this.executor = configurer::getAsyncExecutor;
    		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    	}
    
    }
    

    从这里可以看出,可以通过向spring容器中注入AsyncConfigurer来指定执行异步任务的线程池和异常处理器。

    AsyncAnnotationBeanPostProcessor

    AsyncAnnotationBeanPostProcessor的继承结构图:
    20220424174516480.png
    AsyncAnnotationBeanPostProcessor主要实现了BeanFactoryAware和BeanPostProcessor接口。

    org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory

    public void setBeanFactory(BeanFactory beanFactory) {
    	super.setBeanFactory(beanFactory);
    
    	// 实例化Advisor
    	AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    	if (this.asyncAnnotationType != null) {
    		advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    	}
    	advisor.setBeanFactory(beanFactory);
    	this.advisor = advisor;
    }
    

    在AsyncAnnotationBeanPostProcessor实例化时实例化了切面AsyncAnnotationAdvisor。

    每个bean实例化完后都会调用AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization()判断是否要生成代理对象。

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
    	... ...
    	/**
    	 * @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String)
    	 */
    	// isEligible会判断哪些bean要生成代理
    	// 就是使用advisor中的pointcut进行匹配
    	if (isEligible(bean, beanName)) {
    		// 创建代理
    		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
    		if (!proxyFactory.isProxyTargetClass()) {
    			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
    		}
    		proxyFactory.addAdvisor(this.advisor);
    		customizeProxyFactory(proxyFactory);
    		return proxyFactory.getProxy(getProxyClassLoader());
    	}
    	// No proxy needed.
    	return bean;
    }
    

    AsyncAnnotationAdvisor

    切面AsyncAnnotationAdvisor包括通知AnnotationAsyncExecutionInterceptor和切点ComposablePointcut。

    public AsyncAnnotationAdvisor(
    		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    
    	Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    	asyncAnnotationTypes.add(Async.class);
    	try {
    		asyncAnnotationTypes.add((Class<? extends Annotation>)
    				ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    	}
    	catch (ClassNotFoundException ex) {
    		// If EJB 3.1 API not present, simply ignore.
    	}
    	this.advice = buildAdvice(executor, exceptionHandler); // 创建AnnotationAsyncExecutionInterceptor
    	this.pointcut = buildPointcut(asyncAnnotationTypes); // 创建ComposablePointcut
    }
    
    protected Advice buildAdvice(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    	interceptor.configure(executor, exceptionHandler);
    	return interceptor;
    }
    
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    	ComposablePointcut result = null;
    	for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
    		Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // 类
    		Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // 方法
    		if (result == null) {
    			result = new ComposablePointcut(cpc);
    		}
    		else {
    			result.union(cpc); // 类和方法的组合切点
    		}
    		result = result.union(mpc);
    	}
    	return (result != null ? result : Pointcut.TRUE);
    }
    

    AnnotationMatchingPointcut切面其实就是查看类或者方法上面有没有@Async注解。

    AnnotationAsyncExecutionInterceptor

    AnnotationAsyncExecutionInterceptor类主要负责增强逻辑的实现。

    org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

    public Object invoke(final MethodInvocation invocation) throws Throwable {
    	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    	Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    	final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
    	// 获得线程池
    	AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    	if (executor == null) {
    		throw new IllegalStateException(
    				"No executor specified and no default executor set on AsyncExecutionInterceptor either");
    	}
    
    	// 将目标方法的执行封装为Callable,方便提交到线程池
    	Callable<Object> task = () -> {
    		try {
    			// 执行目标方法
    			Object result = invocation.proceed();
    			if (result instanceof Future) {
    				return ((Future<?>) result).get();
    			}
    		}
    		catch (ExecutionException ex) {
    			handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    		}
    		catch (Throwable ex) {
    			handleError(ex, userDeclaredMethod, invocation.getArguments());
    		}
    		return null;
    	};
    
    	// 提交任务
    	return oSubmit(task, executor, invocation.getMethod().getReturnType());
    }
    

    org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    	AsyncTaskExecutor executor = this.executors.get(method);
    	if (executor == null) {
    		Executor targetExecutor;
    		/**
    		 * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier(java.lang.reflect.Method)
    		 */
    		// 获得@Async注解中的value属性中指定的taskExecutor名称
    		String qualifier = getExecutorQualifier(method);
    		if (StringUtils.hasLength(qualifier)) {
    			targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
    		}
    		else {
    			// 获取默认的taskExecutor
    			targetExecutor = this.defaultExecutor.get();
    		}
    		if (targetExecutor == null) {
    			return null;
    		}
    		executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
    				(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
    		this.executors.put(method, executor);
    	}
    	return executor;
    }
    

    determineAsyncExecutor()负责获取异步任务执行的线程池,线程池的查找步骤如下:

    1. 从spring容器中寻找@Async注解中的value属性中指定的taskExecutor
    2. 寻找默认的defaultExecutor

    默认的defaultExecutor是怎么来的?

    org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure

    public void configure(@Nullable Supplier<Executor> defaultExecutor,
    		@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    	// defaultExecutor默认为从beanFactory获取TaskExecutor或者bean名字为taskExecutor的Executor,beanFactory.getBean(TaskExecutor.class)
    	this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    	// exceptionHandler默认为SimpleAsyncUncaughtExceptionHandler
    	this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
    }
    

    defaultExecutor首先取参数传入的defaultExecutor,这个参数来自接口AsyncConfigurer.getAsyncExecutor(),如果参数为null,那么就调用getDefaultExecutor(),注意这个方法子类AsyncExecutionInterceptor重写了:

    org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor

    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }
    

    如果找不到defaultExecutor就会创建一个SimpleAsyncTaskExecutor。

    再来看看父类的AsyncExecutionAspectSupport#getDefaultExecutor:
    org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor

    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    	if (beanFactory != null) {
    		try {
    			// Search for TaskExecutor bean... not plain Executor since that would
    			// match with ScheduledExecutorService as well, which is unusable for
    			// our purposes here. TaskExecutor is more clearly designed for it.
    			return beanFactory.getBean(TaskExecutor.class);
    		}
    		catch (NoUniqueBeanDefinitionException ex) {
    			logger.debug("Could not find unique TaskExecutor bean", ex);
    			try {
    				// 找名为taskExecutor的Executor
    				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
    			}
    			catch (NoSuchBeanDefinitionException ex2) {
    				if (logger.isInfoEnabled()) {
    					logger.info("More than one TaskExecutor bean found within the context, and none is named " +
    							"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
    							"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
    				}
    			}
    		}
    		catch (NoSuchBeanDefinitionException ex) {
    			logger.debug("Could not find default TaskExecutor bean", ex);
    			try {
    				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
    			}
    			catch (NoSuchBeanDefinitionException ex2) {
    				logger.info("No task executor bean found for async processing: " +
    						"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
    			}
    			// Giving up -> either using local default executor or none at all...
    		}
    	}
    	return null;
    }
    

    先从beanFactory中获取TaskExecutor类型的对象,然后再找名为taskExecutor的Executor对象。

    org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit

    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    	// 执行任务
    	if (CompletableFuture.class.isAssignableFrom(returnType)) {
    		return CompletableFuture.supplyAsync(() -> {
    			try {
    				return task.call();
    			}
    			catch (Throwable ex) {
    				throw new CompletionException(ex);
    			}
    		}, executor);
    	}
    	else if (ListenableFuture.class.isAssignableFrom(returnType)) {
    		return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    	}
    	else if (Future.class.isAssignableFrom(returnType)) {
    		return executor.submit(task);
    	}
    	else {
    		executor.submit(task);
    		return null;
    	}
    }
    

    doSubmit()负责将任务提交至线程池中,并对各种方法的返回值进行处理。

    展开全文
  • c# 异步编程详解

    千次阅读 2022-03-13 10:57:46
    要使用异步,就是用委托进行处理,如果委托对象在调用列表中只有一个方法,它就可以异步执行这个方法。委托类有两个方法,叫做BeginInvoke和EndInvoke,它们是用来异步执行使用。BeginInvoke方法可以使用线程异步地...
  • 多图片js异步上传带预览demo源码

    热门讨论 2017-02-24 20:23:58
    多图片js异步上传带预览demo源码,实现图片预览,预览图片移除,任意张数异步上传,上传进度条指示,已选中并且上传的图片不会重复上传,且不能移除
  • SVPWM异步电机矢量控制模型,适合初学者学习的异步电机矢量控制模型
  • MFC异步串口类

    2018-04-04 12:15:31
    工程可以直接运行,由于微软官方在win7系统及以上已不直接支持MFC串口控件,所有自己基于WIN32 串口API 封装了一个基于串口事件的C++类,基于消息控制主要是用于高效开发窗口端程序
  • javaweb异步导出

    2014-01-20 09:42:16
    javaweb异步导出

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,463,527
精华内容 585,410
关键字:

异步

友情链接: EMailReporter.zip