精华内容
下载资源
问答
  • 异步处理
    千次阅读
    2020-02-27 11:22:06

    异步指一个执行中的任务,拆开成多个子任务各自执行。异步与同步处理相对,同步指多个执行中的子任务,等待所有子任务执行完再继续执行。异步的作用是使有I/O操作的任务最大化利用处理器的计算,以达到缩短任务的完成时间。

    异步可以用多进程、多线程或其它技术实现,常见实现技术有:

    语言异步技术多异步依赖顺序表达返回值函数表达返回值直接表达处理非阻塞
    javascriptsetTimeout/setInterval
    异步请求
    Promise
    generator
    async/await
    web worker




















    javaThread
    Executor




    C#Thread
    Task
    async/await








    多异步依赖顺序表达,指后一个异步处理需要前一个异步处理的返回值作为参数,这种多异步依赖有两种表达,一种是嵌套表达,一种是顺序表达。

    嵌套表达:

    myAsyncMethod1().then(function(result1){
        myAsyncMethod2(result1).then(function(result2){
            console.log(result2);
        })
    })
    

    嵌套表达会造成异步嵌套地狱的现象,即形容代码可读性很混乱。

    顺序表达:

    var result1 = await myAsyncMethod1();
    var result2 = await myAsyncMethod2(result1);
    console.log(result2);
    

    返回值函数表达,指返回值通过函数的参数返回,如:

    myAsyncMethod1().then(function(result1){
        console.log(result1);
    })
    

    返回值直接表达,指返回值通过赋值表达式直接表达,如:

    var result1 = await myAsyncMethod1();
    console.log(result1);
    

    处理非阻塞,指处理过程中,是否会占用处理,而无法处理其它任务。阻塞表现在假如异步中有死循环或计算量需要很长时间,软件应用的显示画面卡住无法展示新的内容以及无法操作。

    更多相关内容
  • 本文涉及Java编程中异步处理机制的简单介绍和一个相关实例,相信通过这篇文章,大家能对异步处理有更多的了解。
  • 主要介绍了详解Servlet 3.0/3.1 中的异步处理,实例分析了servlet 3.0异步处理的技巧,非常具有实用价值,需要的朋友可以参考下
  • asp.net中大任务异步处理功能实现的代码,亲测能用但有两个小bug需自行修复
  • 异步处理websocketpp模型,基于websocketpp,编写异步处理消息模型
  • 今天再带大家学习汇总一下SpringBoot中异步处理的4种形式,下面开始正文: 前言 在网络上有关于SpringBoot的异步请求和异步调有两种说法,经过调用这两种说法本质上就是一回事,在《异步请求和异步调用有区别?》一...

    最近更新了一系列关于异步和回调的文章,比如《一篇文章,搞明白异步和多线程的区别》、《两个经典例子让你彻底理解java回调机制》、《异步请求和异步调用有区别?》,大家感兴趣的话可温习一下。

    今天再带大家学习汇总一下SpringBoot中异步处理的4种形式,下面开始正文:

    前言

    在网络上有关于SpringBoot的异步请求和异步调有两种说法,经过调用这两种说法本质上就是一回事,在《异步请求和异步调用有区别?》一种,已经做过解释了。

    同时,我们也知道了“服务实现的异步与同步特性完全独立于客户端调用的异步和同步特性。也就是说客户端可以异步的去调用同步服务,而且客户端也可以同步的去调用异步服务。”

    本篇文章我们以SpringBoot中异步的使用(包括:异步调用和异步方法两个维度)来进行讲解。

    异步请求与同步请求

    我们先通过一张图来区分一下异步请求和同步请求的区别:

    在这里插入图片描述

    在上图中有三个角色:客户端、Web容器和业务处理线程。

    两个流程中客户端对Web容器的请求,都是同步的。因为它们在请求客户端时都处于阻塞等待状态,并没有进行异步处理。

    在Web容器部分,第一个流程采用同步请求,第二个流程采用异步回调的形式。

    通过异步处理,可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加了服务器对客户端请求的吞吐量。但并发请求量较大时,通常会通过负载均衡的方案来解决,而不是异步。

    Servlet3.0中的异步

    Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由一个线程从头到尾处理。当涉及到耗时操作时,性能问题便比较明显。

    Servlet 3.0中提供了异步处理请求。可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加服务的吞吐量。

    Servlet 3.0的异步是通过AsyncContext对象来完成的,它可以从当前线程传给另一个线程,并归还初始线程。新的线程处理完业务可以直接返回结果给客户端。

    AsyncContext对象可以从HttpServletRequest中获取:

    @RequestMapping("/async")
    public void async(HttpServletRequest request) {
        AsyncContext asyncContext = request.getAsyncContext();
    }
    

    在AsyncContext中提供了获取ServletRequest、ServletResponse和添加监听(addListener)等功能:

    public interface AsyncContext {
    
        ServletRequest getRequest();
    
        ServletResponse getResponse();
    
        void addListener(AsyncListener var1);
        
        void setTimeout(long var1);
    
        // 省略其他方法
    }
    

    不仅可以通过AsyncContext获取Request和Response等信息,还可以设置异步处理超时时间。通常,超时时间(单位毫秒)是需要设置的,不然无限等下去不就与同步处理一样了。

    通过AsyncContext的addListener还可以添加监听事件,用来处理异步线程的开始、完成、异常、超时等事件回调。

    addListener方法的参数AsyncListener的源码如下:

    public interface AsyncListener extends EventListener {
        // 异步执行完毕时调用
        void onComplete(AsyncEvent var1) throws IOException;
        // 异步线程执行超时调用
        void onTimeout(AsyncEvent var1) throws IOException;
        // 异步线程出错时调用
        void onError(AsyncEvent var1) throws IOException;
        // 异步线程开始时调用
        void onStartAsync(AsyncEvent var1) throws IOException;
    }
    

    通常,异常或超时时返回调用方错误信息,而异常时会处理一些清理和关闭操作或记录异常日志等。

    基于Servlet方式实现异步请求

    下面直接看一个基于Servlet方式的异步请求示例:

    @GetMapping(value = "/email/send")
    public void servletReq(HttpServletRequest request) {
        AsyncContext asyncContext = request.startAsync();
        // 设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onTimeout(AsyncEvent event) {
                System.out.println("处理超时了...");
            }
    
            @Override
            public void onStartAsync(AsyncEvent event) {
                System.out.println("线程开始执行");
            }
    
            @Override
            public void onError(AsyncEvent event) {
                System.out.println("执行过程中发生错误:" + event.getThrowable().getMessage());
            }
    
            @Override
            public void onComplete(AsyncEvent event) {
                System.out.println("执行完成,释放资源");
            }
        });
        //设置超时时间
        asyncContext.setTimeout(6000);
        asyncContext.start(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println("内部线程:" + Thread.currentThread().getName());
                    asyncContext.getResponse().getWriter().println("async processing");
                } catch (Exception e) {
                    System.out.println("异步处理发生异常:" + e.getMessage());
                }
                // 异步请求完成通知,整个请求完成
                asyncContext.complete();
            }
        });
        //此时request的线程连接已经释放了
        System.out.println("主线程:" + Thread.currentThread().getName());
    }
    

    启动项目,访问对应的URL,打印日志如下:

    主线程:http-nio-8080-exec-4
    内部线程:http-nio-8080-exec-5
    执行完成,释放资源
    

    可以看出,上述代码先执行完了主线程,也就是程序的最后一行代码的日志打印,然后才是内部线程的执行。内部线程执行完成,AsyncContext的onComplete方法被调用。

    如果通过浏览器访问对应的URL,还可以看到该方法的返回值“async processing”。说明内部线程的结果同样正常的返回到客户端了。

    基于Spring实现异步请求

    基于Spring可以通过Callable、DeferredResult或者WebAsyncTask等方式实现异步请求。

    基于Callable实现

    对于一次请求(/email),基于Callable的处理流程如下:

    1、Spring MVC开启副线程处理业务(将Callable提交到TaskExecutor);

    2、DispatcherServlet和所有的Filter退出Web容器的线程,但是response保持打开状态;

    3、Callable返回结果,SpringMVC将原始请求重新派发给容器(再重新请求一次/email),恢复之前的处理;

    4、DispatcherServlet重新被调用,将结果返回给用户;

    代码实现示例如下:

    @GetMapping("/email")
    public Callable<String> order() {
        System.out.println("主线程开始:" + Thread.currentThread().getName());
        Callable<String> result = () -> {
            System.out.println("副线程开始:" + Thread.currentThread().getName());
            Thread.sleep(1000);
            System.out.println("副线程返回:" + Thread.currentThread().getName());
            return "success";
        };
    
        System.out.println("主线程返回:" + Thread.currentThread().getName());
        return result;
    }
    

    访问对应URL,控制台输入日志如下:

    主线程开始:http-nio-8080-exec-1
    主线程返回:http-nio-8080-exec-1
    副线程开始:task-1
    副线程返回:task-1
    

    通过日志可以看出,主线程已经完成了,副线程才进行执行。同时,URL返回结果“success”。这也说明一个问题,服务器端的异步处理对客户端来说是不可见的。

    Callable默认使用SimpleAsyncTaskExecutor类来执行,这个类非常简单而且没有重用线程。在实践中,需要使用AsyncTaskExecutor类来对线程进行配置。

    这里通过实现WebMvcConfigurer接口来完成线程池的配置。

    @Configuration
    public class WebConfig implements WebMvcConfigurer {
    
        @Resource
        private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
    
        /**
         * 配置线程池
         */
        @Bean(name = "asyncPoolTaskExecutor")
        public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(2);
            taskExecutor.setMaxPoolSize(10);
            taskExecutor.setQueueCapacity(25);
            taskExecutor.setKeepAliveSeconds(200);
            taskExecutor.setThreadNamePrefix("thread-pool-");
            // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            taskExecutor.initialize();
            return taskExecutor;
        }
    
        @Override
        public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
            // 处理callable超时
            configurer.setDefaultTimeout(60 * 1000);
            configurer.setTaskExecutor(myThreadPoolTaskExecutor);
            configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
        }
    
        @Bean
        public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
            return new TimeoutCallableProcessingInterceptor();
        }
    }
    

    为了验证打印的线程,我们将实例代码中的System.out.println替换成日志输出,会发现在使用线程池之前,打印日志如下:

    2021-02-21 09:45:37.144  INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主线程开始:http-nio-8080-exec-1
    2021-02-21 09:45:37.144  INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主线程返回:http-nio-8080-exec-1
    2021-02-21 09:45:37.148  INFO 8312 --- [         task-1] c.s.learn.controller.AsynController      : 副线程开始:task-1
    2021-02-21 09:45:38.153  INFO 8312 --- [         task-1] c.s.learn.controller.AsynController      : 副线程返回:task-1
    

    线程名称为“task-1”。让线程池生效之后,打印日志如下:

    2021-02-21 09:50:28.950  INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主线程开始:http-nio-8080-exec-1
    2021-02-21 09:50:28.951  INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主线程返回:http-nio-8080-exec-1
    2021-02-21 09:50:28.955  INFO 8339 --- [  thread-pool-1] c.s.learn.controller.AsynController      : 副线程开始:thread-pool-1
    2021-02-21 09:50:29.956  INFO 8339 --- [  thread-pool-1] c.s.learn.controller.AsynController      : 副线程返回:thread-pool-1
    

    线程名称为“thread-pool-1”,其中前面的“thread-pool”正是我们配置的线程池前缀。

    除了线程池的配置,还可以配置统一异常处理,这里就不再演示了。

    基于WebAsyncTask实现

    Spring提供的WebAsyncTask是对Callable的包装,提供了更强大的功能,比如:处理超时回调、错误回调、完成回调等。

    @GetMapping("/webAsyncTask")
    public WebAsyncTask<String> webAsyncTask() {
        log.info("外部线程:" + Thread.currentThread().getName());
        WebAsyncTask<String> result = new WebAsyncTask<>(60 * 1000L, new Callable<String>() {
            @Override
            public String call() {
                log.info("内部线程:" + Thread.currentThread().getName());
                return "success";
            }
        });
        result.onTimeout(new Callable<String>() {
            @Override
            public String call() {
                log.info("timeout callback");
                return "timeout callback";
            }
        });
        result.onCompletion(new Runnable() {
            @Override
            public void run() {
                log.info("finish callback");
            }
        });
        return result;
    }
    

    访问对应请求,打印日志:

    2021-02-21 10:22:33.028  INFO 8547 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 外部线程:http-nio-8080-exec-1
    2021-02-21 10:22:33.033  INFO 8547 --- [  thread-pool-1] c.s.learn.controller.AsynController      : 内部线程:thread-pool-1
    2021-02-21 10:22:33.055  INFO 8547 --- [nio-8080-exec-2] c.s.learn.controller.AsynController      : finish callback
    

    基于DeferredResult实现

    DeferredResult使用方式与Callable类似,但在返回结果时不一样,它返回的时实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去。

    DeferredResult的这个特性对实现服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等高级应用非常重要。

    关于DeferredResult的使用先来看一下官方的例子和说明:

    @RequestMapping("/quotes")
    @ResponseBody
    public DeferredResult<String> quotes() {
      DeferredResult<String> deferredResult = new DeferredResult<String>();
      // Save the deferredResult in in-memory queue ...
      return deferredResult;
    }
    
    // In some other thread...
    deferredResult.setResult(data);
    

    上述示例中我们可以发现DeferredResult的调用并不一定在Spring MVC当中,它可以是别的线程。官方的解释也是如此:

    In this case the return value will also be produced from a separate thread. However, that thread is not known to Spring MVC. For example the result may be produced in response to some external event such as a JMS message, a scheduled task, etc.

    也就是说,DeferredResult返回的结果也可能是由MQ、定时任务或其他线程触发。来个实例:

    @Controller
    @RequestMapping("/async/controller")
    public class AsyncHelloController {
    
        private List<DeferredResult<String>> deferredResultList = new ArrayList<>();
    
        @ResponseBody
        @GetMapping("/hello")
        public DeferredResult<String> helloGet() throws Exception {
            DeferredResult<String> deferredResult = new DeferredResult<>();
    
            //先存起来,等待触发
            deferredResultList.add(deferredResult);
            return deferredResult;
        }
    
        @ResponseBody
        @GetMapping("/setHelloToAll")
        public void helloSet() throws Exception {
            // 让所有hold住的请求给与响应
            deferredResultList.forEach(d -> d.setResult("say hello to all"));
        }
    }
    

    第一个请求/hello,会先将deferredResult存起来,前端页面是一直等待(转圈)状态。直到发第二个请求:setHelloToAll,所有的相关页面才会有响应。

    整个执行流程如下:

    • controller返回一个DeferredResult,把它保存到内存里或者List里面(供后续访问);
    • Spring MVC调用request.startAsync(),开启异步处理;
      与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态;
    • 应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult#setResult值。然后SpringMVC会把这个请求再次派发给servlet容器;
    • DispatcherServlet再次被调用,然后处理后续的标准流程;

    通过上述流程可以发现:利用DeferredResult可实现一些长连接的功能,比如当某个操作是异步时,可以先保存对应的DeferredResult对象,当异步通知回来时,再找到这个DeferredResult对象,在setResult处理结果即可。从而提高性能。

    SpringBoot中的异步实现

    在SpringBoot中将一个方法声明为异步方法非常简单,只需两个注解即可@EnableAsync和@Async。其中@EnableAsync用于开启SpringBoot支持异步的功能,用在SpringBoot的启动类上。@Async用于方法上,标记该方法为异步处理方法。

    需要注意的是@Async并不支持用于被@Configuration注解的类的方法上。同一个类中,一个方法调用另外一个有@Async的方法,注解也是不会生效的。

    @EnableAsync的使用示例:

    @SpringBootApplication
    @EnableAsync
    public class App {
    
        public static void main(String[] args) {
            SpringApplication.run(App.class, args);
        }
    }
    

    @Async的使用示例:

    @Service
    public class SyncService {
        
        @Async
        public void asyncEvent() {
            // 业务处理
        }
    }
    

    @Async注解的使用与Callable有类似之处,在默认情况下使用的都是SimpleAsyncTaskExecutor线程池,可参考Callable中的方式来自定义线程池。

    下面通过一个实例来验证一下,启动类上使用@EnableAsync,然后定义Controller类:

    @RestController
    public class IndexController {
        
        @Resource
        private UserService userService;
        
        @RequestMapping("/async")
        public String async(){
            System.out.println("--IndexController--1");
            userService.sendSms();
            System.out.println("--IndexController--4");
            return "success";
        }
    }
    

    定义Service及异步方法:

    @Service
    public class UserService {
    
        @Async
        public void sendSms(){
            System.out.println("--sendSms--2");
            IntStream.range(0, 5).forEach(d -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            System.out.println("--sendSms--3");
        }
    }
    

    如果先注释掉@EnableAsync和@Async注解,即正常情况下的业务请求,打印日志为:

    --IndexController--1
    --sendSms--2
    --sendSms--3
    --IndexController--4
    

    使用@EnableAsync和@Async注解时,打印日志如下:

    --IndexController--1
    --IndexController--4
    --sendSms--2
    --sendSms--3
    

    通过日志的对比我们可以看出,使用了@Async的方法,会被当成一个子线程。所以,整个sendSms方法会在主线程执行完了之后执行。

    这样的效果是不是跟我们上面使用的其他形式的异步异曲同工?所以在文章最开始已经说到,网络上所谓的“异步调用与异步请求的区别”是并不存储在的,本质上都是一回事,只不过实现形式不同而已。这里所提到异步方法,也就是将方法进行异步处理而已。

    @Async、WebAsyncTask、Callable、DeferredResult的区别

    所在的包不同:

    • @Async:org.springframework.scheduling.annotation;
    • WebAsyncTask:org.springframework.web.context.request.async;
    • Callable:java.util.concurrent;
    • DeferredResult:org.springframework.web.context.request.async;

    通过所在的包,我们应该隐隐约约感到一些区别,比如@Async是位于scheduling包中,而WebAsyncTask和DeferredResult是用于Web(Spring MVC)的,而Callable是用于concurrent(并发)处理的。

    对于Callable,通常用于Controller方法的异步请求,当然也可以用于替换Runable的方式。在方法的返回上与正常的方法有所区别:

    // 普通方法
    public String aMethod(){
    }
    
    // 对照Callable方法
    public Callable<String>  aMethod(){
    }
    

    而WebAsyncTask是对Callable的封装,提供了一些事件回调的处理,本质上区别不大。

    DeferredResult使用方式与Callable类似,重点在于跨线程之间的通信。

    @Async也是替换Runable的一种方式,可以代替我们自己创建线程。而且适用的范围更广,并不局限于Controller层,而可以是任何层的方法上。

    当然,大家也可以从返回结果,异常处理等角度来分析一下,这里就不再展开了。

    小结

    经过上述的一步步分析,大家应该对于Servlet3.0及Spring中对异步从处理有所了解。当了解了这些基础理论,实战实例,使用方法及注意事项之后,想必更能够对网络上的相关知识能够进一步的去伪存真。

    尽信书则不如无书,带大家一起学习,一起研究,一起去伪存真,追求真正有用的知识。

    原文链接:《SpringBoot的四种异步处理,写这篇文章,我自己先学到了


    程序新视界

    公众号“ 程序新视界”,一个让你软实力、硬技术同步提升的平台,提供海量资料

    微信公众号:程序新视界

    展开全文
  • python 异步处理 异步编程(简称异步)是许多现代语言的功能,它使程序可以处理多个操作,而无需等待或挂断其中的任何一个。 这是一种有效处理网络或文件I / O等任务的明智方法,其中程序的大部分时间都花在等待任务...

    python 异步处理

    异步编程(简称异步)是许多现代语言的功能,它使程序可以处理多个操作,而无需等待或挂断其中的任何一个。 这是一种有效处理网络或文件I / O等任务的明智方法,其中程序的大部分时间都花在等待任务完成上。

    考虑一个打开100个网络连接的Web抓取应用程序。 您可以打开一个连接,等待结果,然后打开下一个连接并等待结果,依此类推。 程序运行的大部分时间都花在等待网络响应上,而不是在做实际的工作。

    [ 同样在InfoWorld上:每个Python开发人员都有24个Python库 ]

    异步为您提供了一种更有效的方法:一次打开所有100个连接,然后在返回结果时在每个活动连接之间切换。 如果一个连接没有返回结果,请切换到下一个,依此类推,直到所有连接都返回了它们的数据。

    异步语法现在已成为Python的标准功能,但是习惯于一次做一件事的长期Pythonista使用者可能难以解决。 在本文中,我们将探讨异步编程如何在Python中工作以及如何使用它。

    请注意,如果要在Python中使用异步,最好使用Python 3.7或Python 3.8(撰写本文时的最新版本)。 我们将使用该语言的这些版本中定义的Python异步语法和辅助函数。

    何时使用异步编程

    通常,使用异步的最佳时间是当您尝试执行具有以下特征的工作时:

    • 这项工作需要很长时间才能完成。
    • 延迟涉及等待I / O(磁盘或网络)操作,而不是计算。
    • 这项工作涉及一次执行许多I / O操作, 或者当您还试图完成其他任务时进行一项或多项I / O操作。

    异步允许您并行设置多个任务并有效地遍历它们,而不会阻塞应用程序的其余部分。

    可以很好地与异步工作的一些任务示例:

    • 网页抓取,如上所述。
    • 网络服务(例如,Web服务器或框架)。
    • 协调来自多个源的结果的程序,这些结果需要很长时间才能返回值(例如,同时进行的数据库查询)。

    重要的是要注意,异步编程不同于多线程或多处理。 异步操作都在同一个线程中运行,但是它们根据需要相互转化,这使得异步处理比线程或多处理多种任务的效率更高。 (更多有关此内容。)

    [ 同样在InfoWorld上:Python 3.8中最好的新功能 ]

    Python async awaitasyncio

    Python最近添加了两个关键字asyncawait ,用于创建异步操作。 考虑以下脚本:

    def get_server_status(server_addr)
        # A potentially long-running operation ...
        return server_status
    
    def server_ops()
        results = []
        results.append(get_server_status('addr1.server')
        results.append(get_server_status('addr2.server')
        return results

    相同脚本的异步版本(不起作用,仅足以让我们了解语法的工作原理)可能看起来像这样。

    async def get_server_status(server_addr)
        # A potentially long-running operation ...
        return server_status
    
    async def server_ops()
        results = []
        results.append(await get_server_status('addr1.server')
        results.append(await get_server_status('addr2.server')
        return results

    async关键字为前缀的函数成为异步函数,也称为coroutines 。 协程的行为与常规函数不同:

    • 协程可以使用另一个关键字await ,它允许协程等待来自另一个协程的结果而不会阻塞。 在await协程返回结果之前,Python在其他正在运行的协程中自由切换。
    • 协程只能从其他async函数中调用。 如果您从脚本主体中按原样运行server_ops()get_server_status() ,则不会得到它们的结果。 您将获得一个Python协程对象,该对象不能直接使用。

    所以,如果我们不能称之为async非异步函数的功能,我们不能运行的async功能直接,我们如何使用它们? 答:通过使用asyncio库,它将async与Python的其余部分联系起来。

    [ 同样在InfoWorld上:每种编程需要12个Python ]

    Python async awaitasyncio示例

    这是一个示例(再次,不是功能性的,而是说明性的)如何使用asyncasyncio编写Web抓取应用程序。 该脚本获取URL列表,并使用来自外部库( read_from_site_async() )的async函数的多个实例来下载它们并汇总结果。

    import asyncio
    from web_scraping_library import read_from_site_async
    
    async def main(url_list):
        return await asyncio.gather(*[read_from_site_async(_) for _ in url_list])
    
    urls = ['http://site1.com','http://othersite.com','http://newsite.com']
    results = asyncio.run(main(urls))
    print (results)

    在上面的示例中,我们使用两个常见的asyncio函数:

    • asyncio.run()用于从代码的非异步部分启动async功能,从而启动所有progam的异步活动。 (这就是我们运行main() 。)
    • asyncio.gather()接受一个或多个异步装饰的函数(在这种情况下,是我们假设的网络抓取库中的read_from_site_async()几个实例),全部运行它们,然后等待所有结果输入。

    这里的想法是,我们立即开始所有站点的读取操作,然后在它们到达时收集结果(因此asyncio.gather() )。 在进行下一个操作之前,我们不等待任何一项操作完成。

    [ 同样在InfoWorld上:Python virtualenv和venv做和不做 ]

    Python异步应用程序的组件

    我们已经提到了Python异步应用程序如何使用协程作为主要成分,并利用asyncio库运行它们。 其他一些要素也是Python异步应用程序的关键:

    事件循环

    asyncio库创建和管理事件循环 ,即运行协程直到完成的机制。 在Python进程中,一次只能运行一个事件循环,如果这样做只是为了使程序员更容易跟踪其中的内容。

    任务

    将协程提交到事件循环以进行处理时,可以获取Task对象,该对象提供了一种从事件循环外部控制协程行为的方法。 例如,如果需要取消正在运行的任务,可以通过调用任务的.cancel()方法来完成。

    这是站点抓取脚本的略有不同的版本,该脚本显示了事件循环和正在执行的任务:

    import asyncio
    from web_scraping_library import read_from_site_async
    
    tasks = []
    
    async def main(url_list):    
        for n in url_list:
            tasks.append(asyncio.create_task(read_from_site_async(n)))
        print (tasks)
        return await asyncio.gather(*tasks)
    
    urls = ['http://site1.com','http://othersite.com','http://newsite.com']
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(urls))
    print (results)

    该脚本更明确地使用事件循环和任务对象。

    • .get_event_loop()方法为我们提供了一个对象,该对象使我们可以通过.run_until_complete()以编程方式向事件循环提交异步函数,从而直接控制事件循环。 在先前的脚本中,我们只能使用asyncio.run()运行单个顶级异步函数。 顺便说一句, .run_until_complete()完全按照其说的去做:它运行所有提供的任务直到完成,然后分批返回其结果。
    • .create_task()方法使用一个要运行的函数(包括其参数),并给我们提供了一个Task对象来运行它。 在这里,我们将每个URL作为单独的Task提交到事件循环,并将Task对象存储在列表中。 请注意,我们只能在事件循环内(即,在async函数内)执行此操作。

    您需要对事件循环及其事件进行多少控制,将取决于您所构建的应用程序的复杂程度。 如果您只想提交一组固定的作业以同时运行,就像使用我们的Web抓取工具一样,您将不需要太多控制权-仅足以启动作业和收集结果。

    相比之下,如果您要创建一个完善的Web框架,则需要对协程和事件循环的行为进行更多控制。 例如,在应用程序崩溃的情况下,您可能需要优雅地关闭事件循环 ,或者如果从另一个线程调用事件循环, 则以线程安全的方式运行任务

    [ 也在InfoWorld上:Anaconda入门,Anaconda是数据科学的Python发行版 ]

    异步与线程与多处理

    在这一点上,您可能想知道,为什么使用异步而不是线程或多处理,而这在Python中早已可用?

    首先,异步和线程或多处理之间有一个关键区别,即使这些内容是如何在Python中实现的也是如此。 异步与并发有关,而线程和多处理则与并行有关。 并发涉及一次在多个任务之间高效地分配时间,例如,在杂货店等待注册时检查电子邮件。 并行涉及多个代理并排处理多个任务,例如,在杂货店打开五个单独的寄存器。

    大多数情况下,异步是线程的良好替代品, 因为线程是在Python中实现的 。 这是因为Python不使用OS线程,而是使用自己的协作线程,在解释器中一次仅运行一个线程。 与协作线程相比,异步提供了一些关键优势:

    • 异步函数比线程轻得多。 一次运行的成千上万的异步操作将比成千上万的线程少得多的开销。
    • 异步代码的结构使人们更容易推断任务从何处提取。 这意味着数据争用和线程安全性不再是问题。 由于异步事件循环中的所有任务都在单个线程中运行,因此Python(和开发人员)可以更轻松地序列化它们访问内存中对象的方式。
    • 与线程相比,可以更轻松地取消和操作异步操作。 我们从asyncio.create_task()返回的Task对象为我们提供了一种方便的方法。

    另一方面,Python中的多处理最适合CPU密集而不是I / O密集的作业。 异步实际上与多处理并驾齐驱,因为您可以使用asyncio.run_in_executor()从中央进程将CPU密集型作业委派给进程池,而不会阻塞该中央进程。

    [ 通过InfoWorld的App Dev Report新闻通讯了解软件开发中的热门话题 ]

    Python异步的后续步骤

    最好的第一件事是构建自己的一些简单异步应用程序。 现在有很多很好的例子 ,Python中的异步编程已经经历了几个版本,并且花了几年时间才逐渐发展起来,并得到更广泛的使用。 即使您不打算使用其所有功能asyncio的官方文档也值得阅读以了解其功能。

    您可能还会探索越来越多的异步驱动的库和中间件,其中许多提供了数据库连接器,网络协议等的异步,非阻塞版本。 aio-libs存储库具有一些关键的存储库 ,例如用于Web访问的aiohittp库。 也值得在Python软件包索引中搜索带有async关键字的库。 对于异步编程之类的东西,最好的学习方法是看别人如何使用它。

    翻译自: https://www.infoworld.com/article/3454442/get-started-with-async-in-python.html

    python 异步处理

    展开全文
  • 一、异步处理1、异步概念异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。必须强调一个基础逻辑,异步是一种设计理念,异步操作不等于多线程,MQ中间件,...

    一、异步处理

    1、异步概念

    异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。

    必须强调一个基础逻辑,异步是一种设计理念,异步操作不等于多线程,MQ中间件,或者消息广播,这些是可以实现异步处理的方式。

    同步处理和异步处理相对,需要实时处理并响应,一旦超过时间会结束会话,在该过程中调用方一直在等待响应方处理完成并返回。同步类似电话沟通,需要实时对话,异步则类似短信交流,发送消息之后无需保持等待状态。

    2、异步处理优点

    虽然异步处理不能实时响应,但是处理复杂业务场景,多数情况都会使用异步处理。

    异步可以解耦业务间的流程关联,降低耦合度;

    降低接口响应时间,例如用户注册,异步生成相关信息表;

    异步可以提高系统性能,提升吞吐量;

    流量削峰即把请求先承接下来,然后在异步处理;

    异步用在不同服务间,可以隔离服务,避免雪崩;

    异步处理的实现方式有很多种,常见多线程,消息中间件,发布订阅的广播模式,其根据逻辑在于先把请求承接下来,放入容器中,在从容器中把请求取出,统一调度处理。

    注意:一定要监控任务是否产生积压过度情况,任务如果积压到雪崩之势的地步,你会感觉每一片雪花都想勇闯天涯。

    3、异步处理模式

    异步流程处理的实现有好多方式,但是实际开发中常用的就那么几种,例如:

    基于接口异步响应,常用在第三方对接流程;

    基于消息生产和消费模式,解耦复杂流程;

    基于发布和订阅的广播模式,常见系统通知

    异步适用的业务场景,对数据强一致性的要求不高,异步处理的数据更多时候追求的是最终一致性。

    二、接口响应异步

    1、流程描述

    基于接口异步响应的方式,有一个本地业务服务,第三方接口服务,流程如下:

    a0f1cf6136a0fde07aef8c3bd47e8280.png

    本地服务发起请求,调用第三方服务接口;

    请求包含业务参数,和成功或失败的回调地址;

    第三方服务实时响应流水号,作为该调用的标识;

    之后第三方服务处理请求,得到最终处理结果;

    如果处理成功,回调本地服务的成功通知接口;

    如果处理失败,回调本地服务的失败通知接口;

    整个流程基于部分异步和部分实时的模式,完整处理;

    注意:如果本地服务多次请求第三方服务,需要根据流水号判断该请求的状态,业务的状态设计也是极其复杂,要根据流水号和状态追溯整个流程的执行进度,避免错乱。

    2、流程实现案例

    模拟基础接口

    @RestController

    public class ReqAsyncWeb {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReqAsyncWeb.class);

    @Resource

    private ReqAsyncService reqAsyncService ;

    // 本地交易接口

    @GetMapping("/tradeBegin")

    public String tradeBegin (){

    String sign = reqAsyncService.tradeBegin("TradeClient");

    return sign ;

    }

    // 交易成功通知接口

    @GetMapping("/tradeSucNotify")

    public String tradeSucNotify (@RequestParam("param") String param){

    LOGGER.info("tradeSucNotify param={"+ param +"}");

    return "success" ;

    }

    // 交易失败通知接口

    @GetMapping("/tradeFailNotify")

    public String tradeFailNotify (@RequestParam("param") String param){

    LOGGER.info("tradeFailNotify param={"+ param +"}");

    return "success" ;

    }

    // 第三方交易接口

    @GetMapping("/respTrade")

    public String respTrade (@RequestParam("param") String param){

    LOGGER.info("respTrade param={"+ param +"}");

    reqAsyncService.respTrade(param);

    return "NO20200520" ;

    }

    }

    模拟第三方处理

    @Service

    public class ReqAsyncServiceImpl implements ReqAsyncService {

    private static final String serverUrl = "http://localhost:8005" ;

    @Override

    public String tradeBegin(String param) {

    String orderNo = HttpUtil.get(serverUrl+"/respTrade?param="+param);

    if (StringUtils.isEmpty(orderNo)){

    return "Trade..Fail...";

    }

    return orderNo ;

    }

    @Override

    public void respTrade(String param) {

    try {

    Thread.sleep(10000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    Thread thread01 = new Thread(

    new RespTask(serverUrl+"/tradeSucNotify?param="+param),"SucNotify");

    Thread thread02 = new Thread(

    new RespTask(serverUrl+"/tradeFailNotify?param="+param),"FailNotify");

    thread01.start();

    thread02.start();

    }

    }

    三、生产消费异步

    1、流程描述

    这里基于Kafka中间件,演示流程消息生成,消息处理的异步解耦流程,基本步骤:

    80f46fc1abf428096aed1b566a4030af.png

    消息生成之后,写入Kafka队列 ;

    消息处理方获取消息后,进行流程处理;

    消息在中间件提供的队列中持久化存储 ;

    消息发起方如果挂掉,不影响消息处理 ;

    消费方如果挂掉,不影响消息生成;

    基于这种消息中间件模式,完成业务解耦,提高系统吞吐量,是架构中常用的方式。

    2、流程实现案例

    消息发送

    @Service

    public class KafkaAsyncServiceImpl implements KafkaAsyncService {

    @Resource

    private KafkaTemplate kafkaTemplate;

    @Override

    public void sendMsg(String msg) {

    // 这里Topic如果不存在,会自动创建

    kafkaTemplate.send("kafka-topic", msg);

    }

    }

    消息消费

    @Component

    public class KafkaConsumer {

    private static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "kafka-topic")

    public void listenMsg (ConsumerRecord,String> record) {

    String value = record.value();

    LOGGER.info("KafkaConsumer01 ==>>"+value);

    }

    }

    注意:这里就算有多个消息消费方,也只会在一个消费方处理消息,这就是该模式的特点。

    四、发布订阅异步

    1、流程描述

    这里基于Redis中间件,说明消息广播模式流程,基本步骤:

    c50630c3c06d2132c978b95d3899bc3b.png

    提供一个消息传递频道channel;

    多个订阅频道的客户端client;

    消息通过PUBLISH命令发送给频道channel ;

    客户端就会收到频道中传递的消息 ;

    之所以称为广播模式,该模式更注重通知下发,流程交互性不强。实际开发场景:运维总控系统,更新了某类服务配置,通知消息发送之后,相关业务线上的服务在拉取最新配置,更新到服务中。

    2、流程实现案例

    发送通知消息

    @Service

    public class RedisAsyncServiceImpl implements RedisAsyncService {

    @Resource

    private StringRedisTemplate stringRedisTemplate ;

    @Override

    public void sendMsg(String topic, String msg) {

    stringRedisTemplate.convertAndSend(topic,msg);

    }

    }

    客户端接收

    @Service

    public class ReceiverServiceImpl implements ReceiverService {

    private static final Logger LOGGER = LoggerFactory.getLogger("ReceiverMsg");

    @Override

    public void receiverMsg(String msg) {

    LOGGER.info("Receiver01 收到消息:msg-{}",msg);

    }

    }

    配置广播模式

    @Configuration

    public class SubMsgConfig {

    @Bean

    RedisMessageListenerContainer container(RedisConnectionFactory factory,

    MessageListenerAdapter msgListenerAdapter,

    MessageListenerAdapter msgListenerAdapter02){

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();

    container.setConnectionFactory(factory);

    //注册多个监听,订阅一个主题,实现消息广播

    container.addMessageListener(msgListenerAdapter, new PatternTopic("topic:msg"));

    container.addMessageListener(msgListenerAdapter02, new PatternTopic("topic:msg"));

    return container;

    }

    @Bean

    MessageListenerAdapter msgListenerAdapter(ReceiverService receiverService){

    return new MessageListenerAdapter(receiverService, "receiverMsg");

    }

    @Bean

    MessageListenerAdapter msgListenerAdapter02(ReceiverService02 receiverService02){

    return new MessageListenerAdapter(receiverService02, "receiverMsg");

    }

    @Bean

    ReceiverService receiverService(){

    return new ReceiverServiceImpl();

    }

    @Bean

    ReceiverService02 receiverService02(){

    return new ReceiverServiceImpl02();

    }

    }

    这里配置了多个订阅的客户端。

    五、任务积压监控

    生成一个消息,就因为有一个处理该消息的任务要执行,这就导致任务可能出现积压的情况,常见原因大致有如下几个:

    任务产生的服务过多,任务处理的服务过少,不均衡;

    任务处理时间太长,也导致生产过剩;

    中间件本身容量偏小,需要扩容或集群化管理;

    如果任务积压过多,可能要对任务生成进行流量控制,或者提升任务的处理能力,从而避免雪崩情况。

    六、源代码地址

    GitHub·地址

    https://github.com/cicadasmile/data-manage-parent

    GitEE·地址

    https://gitee.com/cicadasmile/data-manage-parent

    推荐阅读:《架构设计系列》,萝卜青菜,各有所需

    展开全文
  • Django-异步处理

    千次阅读 2021-01-22 23:06:02
    需求:前端给个form表单填写参数,保存后数据存入DB(一条任务数据),状态默认为未处理;调用异步方法,执行脚本,脚本执行成功后修改任务状态。 技术栈 celery urls.py from django.conf.urls import url from ...
  • PHP之异步处理

    千次阅读 2019-05-13 09:34:50
    1.把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 2.实现数据顺序排列获取 redis 队列 实现 异步处理的效果 <?php /** * 请求过来以后,如果数据过大,导致响应速度过慢,则可以先把要处理的数据保存到...
  • java如何实现异步处理

    千次阅读 2021-02-12 12:03:06
    2.2.1线程之间的协调通信线程之间的通信是线程编程中的一个重要内容。例如多线程中某个线程需要等待其他线程结束才继续执行、某个线程需要从其他线程获取数据等,都需要用到线程之间的通信。同步也可以看成是线程...
  • Java 异步处理 三种实现

    千次阅读 2021-02-12 12:03:06
    new Thread((new Runnable() {@Overridepublic void run() {// 批量同步数据try {logger.info("^^^^^^^^^^^^^^^^^ sync start ^^^^^^^^^^^^^^^^^ ");logger.info("^^^^^^^^^^^^^^^^^ sync end ^^^^^^^^^^^^^^^^^ ");...
  • C#异步操作 异步查询数据库 异步处理一行一行加载数据
  • 异步处理的几种方式

    2022-01-04 11:03:43
    1.CompletableFuture.supplyAsync异步 public static CompletableFuture supplyAsync(Supplier supplier) public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) Executor executor可...
  • Servlet3.0 异步处理 页面推送 Comet 实例
  • pb-异步处理

    2018-08-27 09:27:05
    PB通过异步处理实现伪多线程取数,这是一个小工具,有问题的地方请多担待
  • 本文实例讲述了Android编程实现异步消息处理机制的几种方法。分享给大家供大家参考,具体如下: 1、概述 Android需要更新ui的话就必须在ui线程上进行操作。否则就会抛异常。 假如有耗时操作,比如:在子线程中下载...
  • 开始使用Python进行异步处理

    千次阅读 2020-05-21 06:53:54
    异步编程(简称异步)是许多现代语言的功能,它使程序可以处理多个操作,而无需等待或挂断其中的任何一个。 这是一种有效处理网络或文件I / O等任务的明智方法,其中程序的大部分时间都花在等待任务完成上。 考虑...
  • 本文实例讲述了JavaScript异步操作的几种常见处理方法。分享给大家供大家参考,具体如下: 引言 js的异步操作,已经是一个老生常谈的话题,关于这个话题的文章随便google一下都可以看到一大堆。那么为什么我还要写这...
  • Web请求异步处理和海量数据即时分析在淘宝开放平台的实践
  • 异步则是可以提高效率了,现在cpu都是双核,四核,异步处理的话可以同时做多项工作,当然必须保证是可以并发处理的。 这些都是对的。 同步和异步最大的区别就在于。一个需要等待,一个不需要等待。 比如广播,就是一...
  • 本文我们来聊聊秒杀系统中的订单异步处理。 本篇文章主要内容 为何我们需要对下订单采用异步处理 简单的订单异步处理实现 非异步与异步下单接口的性能对比 一个用户抢购体验更好的实现方式 前文回顾 零基础实现...
  • 详解 redux 中的异步处理

    千次阅读 2021-03-09 16:10:15
    redux中的异步处理流程通常如下: 在异步操作开始前,发送action,用来表示要发起异步操作,用户界面应该有所提示 在异步操作结束后,发送action,用来表示异步操作结束,根据异步操作的结果,对store中的数据和...
  • 本程序提供了一个多任务多线程异步处理框架。该框架使用简单,用户只需要继承抽象类Task,构建自己的任务类,再构造一个任务源,就可以轻松使用这个框架。 程序包里提供了一个例子Mytask 和MyTaskGenerator, 用户只...
  • 接口业务异步处理

    千次阅读 2020-08-13 15:40:52
    比如我们在项目中的一个前端接口在每次调用的时候需要处理比较耗时且对结果不需要直接反馈的业务(比如需要多次调用第三方的http接口同步一些信息等),此时我们就需要将其设计成异步,以此来提高接口的响应速度。...
  • controller异步处理请求

    千次阅读 2020-06-10 16:44:18
    2,处理器异步处理请求 @RestController @RequestMapping("/pulldata") @Slf4j public class PullOrderController { @Autowired private TestService testService; /** * 定义任务线程池,手动处理的情况比较少,...
  • java编写异步处理方法提升系统性能

    千次阅读 2021-02-28 15:00:23
    系统性能优化时我们会对一些耗时操作进行异步处理,最常见的就是http请求,但java中怎么编写异步操作呢,下面我们就来介绍几种方法。一、使用接口1.1 编写接口,用于接收异步线程回调的数据public interface ...
  •  大四保研到实验室正好碰到师兄师姐们找工作,听到的一些面试常问的内容就是“跨时钟域”、”异步处理“、”异步FIFO“等。然而我看的一些经典的书籍都是这样说的”异步电路很难设计,全部使用同步技术进行设计,...
  • JavaWeb之异步处理请求

    千次阅读 2019-10-04 23:20:33
    1、servlet3.0-异步请求:   引用:在Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由某一个线程从头到尾负责处理,当过来一个请求之后,会从tomcat的线程池中拿出一个线程...
  • laravel 事件处理&异步处理事件

    千次阅读 2020-12-28 13:07:23
    如果没有成功或者想知道更多事件的处理,比如,延时队列,判断是否要加入事件,处理失败之后的处理等等:参考文档 事件的队列处理 如果没有接触laravel 的队列的小伙伴,先参考: Laravel Jobs 只需要在 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 859,186
精华内容 343,674
关键字:

异步处理