精华内容
下载资源
问答
  • Java接口异步调用

    2020-08-26 00:11:43
    主要介绍了Java接口异步调用,下面我们来一起学习一下吧
  • java异步调用外部接口

    2020-12-24 14:42:35
    通过SpringBoot-RestTemplate方式调用 1.RestTemplate工具类 package com.singhand.companyadressredis.main.config; import org.apache.http.impl.client.CloseableHttpClient; import org.springframework....

    通过SpringBoot-RestTemplate方式调用

    1.RestTemplate工具类

    package com.singhand.companyadressredis.main.config;
    
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
    import org.springframework.http.converter.HttpMessageConverter;
    import org.springframework.http.converter.StringHttpMessageConverter;
    import org.springframework.scheduling.TaskScheduler;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
    import org.springframework.web.client.RestTemplate;
    
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.List;
    
    @Configuration
    public class RestTemplateConfig {
    
        @Autowired
        CloseableHttpClient httpClient;
    
        @Bean
        public RestTemplate restTemplate() {
            RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory());
            return restTemplate;
        }
    
        @Bean
        public HttpComponentsClientHttpRequestFactory clientHttpRequestFactory() {
            HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory();
            clientHttpRequestFactory.setHttpClient(httpClient);
            return clientHttpRequestFactory;
        }
    }
    

    上面是通过设置httpClient,我们也可以直接

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.client.ClientHttpRequestFactory;
    import org.springframework.http.client.SimpleClientHttpRequestFactory;
    import org.springframework.web.client.RestTemplate;
    
    @Configuration
    public class RestTemplateConfig {
    
        @Bean
        public RestTemplate restTemplate(ClientHttpRequestFactory factory){
            return new RestTemplate(factory);
        }
    
        @Bean
        public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
            SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
            factory.setConnectTimeout(15000);
            factory.setReadTimeout(5000);
            return factory;
        }
    }

    如果通过httpClient,需要加入httpClient配置,如下

    2.httpClient

    package com.singhand.companyadressredis.main.config;
    
    import com.singhand.sysdev.utils.log4wk.KLog;
    import com.singhand.sysdev.utils.tools.ExceptionUtils;
    import org.apache.http.HeaderElement;
    import org.apache.http.HeaderElementIterator;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.config.Registry;
    import org.apache.http.config.RegistryBuilder;
    import org.apache.http.conn.ConnectionKeepAliveStrategy;
    import org.apache.http.conn.socket.ConnectionSocketFactory;
    import org.apache.http.conn.socket.PlainConnectionSocketFactory;
    import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
    import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
    import org.apache.http.message.BasicHeaderElementIterator;
    import org.apache.http.protocol.HTTP;
    import org.apache.http.protocol.HttpContext;
    import org.apache.http.ssl.SSLContextBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import java.security.KeyManagementException;
    import java.security.KeyStoreException;
    import java.security.NoSuchAlgorithmException;
    
    /**
     * - Supports both HTTP and HTTPS
     * - Uses a connection pool to re-use connections and save overhead of creating connections.
     * - Has a custom connection keep-alive strategy (to apply a default keep-alive if one isn't specified)
     * - Starts an idle connection monitor to continuously clean up stale connections.
     */
    @Configuration
    @EnableScheduling
    public class HttpClientConfig {
        /*
        * Determines the timeout in milliseconds until a connection is established
        * */
        private static final int CONNECT_TIMEOUT = 30000;
    
        /*
        * The timeout when requesting a connection from the connection manager
        */
        private static final int REQUEST_TIMEOUT = 30000;
    
        /*
         * The timeout for waiting for data
        */
        private static final int SOCKET_TIMEOUT = 60000;
    
        private static final int MAX_TOTAL_CONNECTIONS = 50;
        private static final int DEFAULT_KEEP_ALIVE_TIME_MILLIS = 20 * 1000;
        private static final int CLOSE_IDLE_CONNECTION_WAIT_TIME_SECS = 30;
    
        @Bean
        public PoolingHttpClientConnectionManager poolingConnectionManager() {
            SSLContextBuilder builder = new SSLContextBuilder();
            try {
                builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
            } catch (NoSuchAlgorithmException | KeyStoreException e) {
                KLog.error("poolingConnectionManager Exception","Pooling Connection Manager Initialisation failure because of " + ExceptionUtils.errInfo(e));
            }
    
            SSLConnectionSocketFactory sslsf = null;
            try {
                sslsf = new SSLConnectionSocketFactory(builder.build());
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                KLog.error("poolingConnectionManager Exception","Pooling Connection Manager Initialisation failure because of " + ExceptionUtils.errInfo(e));
            }
    
            Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
                    .<ConnectionSocketFactory>create().register("https", sslsf)
                    .register("http", new PlainConnectionSocketFactory())
                    .build();
    
            PoolingHttpClientConnectionManager poolingConnectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
            poolingConnectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
            return poolingConnectionManager;
        }
    
        @Bean
        public ConnectionKeepAliveStrategy connectionKeepAliveStrategy() {
            return new ConnectionKeepAliveStrategy() {
                @Override
                public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
                    HeaderElementIterator it = new BasicHeaderElementIterator
                            (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
                    while (it.hasNext()) {
                        HeaderElement he = it.nextElement();
                        String param = he.getName();
                        String value = he.getValue();
    
                        if (value != null && param.equalsIgnoreCase("timeout")) {
                            return Long.parseLong(value) * 1000;
                        }
                    }
                    return DEFAULT_KEEP_ALIVE_TIME_MILLIS;
                }
            };
        }
    
        @Bean
        public CloseableHttpClient httpClient() {
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectionRequestTimeout(REQUEST_TIMEOUT)
                    .setConnectTimeout(CONNECT_TIMEOUT)
                    .setSocketTimeout(SOCKET_TIMEOUT).build();
    
            return HttpClients.custom()
                    .setDefaultRequestConfig(requestConfig)
                    .setConnectionManager(poolingConnectionManager())
                    .setKeepAliveStrategy(connectionKeepAliveStrategy())
                    .build();
        }
    
    }
    

    3.接下来t通过CompletableFuture异步调用接口

    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.*;
    import org.springframework.stereotype.Component;
    import org.springframework.web.client.RestTemplate;
    
    import java.lang.reflect.Array;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    
    @Component
    public class ReptileUtils {
        @Autowired
        RestTemplate restTemplate;
        public void sendReptile(String keyword){
    //CompletableFuture实现异步调用
            CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    //你自己的逻辑代码,通过restTemplate.exchange调用外部接口
                    HttpHeaders header = new HttpHeaders();
                    header.setContentType(MediaType.APPLICATION_JSON);
                    List<ReptileInput> params= new ArrayList<>();
                    HttpEntity<Object> requestEntity = new HttpEntity<>(params, header);
                    ResponseEntity<Object> response = restTemplate.exchange(url, HttpMethod.POST,requestEntity,Object.class);
                    return response;
            }).thenAccept(e->System.out.println(e));//thenAccept接口完成处理结果
    
        }
    }
    

     

    展开全文
  • Java异步调用模式

    千次阅读 2017-01-11 16:36:58
    Java异步调用模式 在长期的Java客户端开发中,最常见的一个客户端调用模式就是Java的异步调用。所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在Java语言中,简单的讲就是另启...

    Java异步调用模式

    在长期的Java客户端开发中,最常见的一个客户端调用模式就是Java的异步调用。所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在Java语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。虽然在1.5以前从异步线程中取得返回结果需要自己精心设计,但从JDK1.5开始引入了Future接口(FutureTask类)从异步执行的线程中取得返回值。
    Future 表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。FutureTask类是Future接口方法的一个基本实现,是一种可以取消的异步计算任务,计算是通过Callable接口来实现的。

    FutureTask有下面几个重要的方法:

           1. get()   阻塞一直等待执行完成拿到结果

           2. get(int timeout, TimeUnit timeUnit)  阻塞一直等待执行完成拿到结果,如果在超时时间内,没有拿到抛出异常

           3. isCancelled()  是否被取消

           4. isDone()   是否已经完成

           5. cancel(boolean mayInterruptIfRunning)  试图取消正在执行的任务

    Callable和Runnable有几点不同:

    • Callable规定的方法是call(),而Runnable规定的方法是run().
    • Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
    • call()方法可抛出异常,而run()方法是不能抛出异常的。
    运行Callable任务可拿到一个Future对象,通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
    举一个例子说明如何使用Future对象,如下:

    public class MyFutureTaskTest {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newCachedThreadPool();
            FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
                public String call() throws Exception{ //建议抛出异常
                    try {
                        Thread.sleep(5* 1000);
                        return "Hello Welcome!";
                    }
                    catch(Exception e) {
                        throw new Exception("Callable terminated with Exception!"); // call方法可以抛出异常
                    }
                }
            });
            executor.execute(future);
            long t = System.currentTimeMillis();
            try {

    //            String result = future.get(3000, TimeUnit.MILLISECONDS); //取得结果,同时设置超时执行时间为5秒。
                String result = future.get(); //取得结果,同时设置超时执行时间为5秒。
                System.err.println("result is " + result + ", time is " + (System.currentTimeMillis() - t));
            } catch (InterruptedException e) {
                future.cancel(true);
                System.err.println("Interrupte time is " + (System.currentTimeMillis() - t));
            } catch (ExecutionException e) {
                future.cancel(true);
                System.err.println("Throw Exception time is " + (System.currentTimeMillis() - t));
    //        } catch (TimeoutException e) {
    //            future.cancel(true);
    //            System.err.println("Timeout time is " + (System.currentTimeMillis() - t));
            } finally {
                executor.shutdown();
            }

        }
    }

    运行结果如下:

     result is Hello Welcome!, time is 5000

    如果设置了超时时间,则运行结果如下:

    Timeout time is 3000

     可以看出设置超时时间的影响。

    再如一个多个运行任务的例子:

    public class MyAsyncExample implements Callable {
        private int num;

        public MyAsyncExample(int aInt) {
            this.num = aInt;
        }

        public String call() throws Exception {
            boolean resultOk = false;
            if (num == 0) {
                resultOk = true;
            } else if (num == 1) {
                while (true) { //infinite loop
                    System.out.println("looping....");
                    Thread.sleep(3000);
                }
            } else {
                throw new Exception("Callable terminated with Exception!"); 
            }
            if (resultOk) {
                return "Task done.";
            } else {
                return "Task failed";
            }
        }

        public static void main(String[] args) {
            //定义几个任务
            MyAsyncExample call1 = new MyAsyncExample(0);
            MyAsyncExample call2 = new MyAsyncExample(1);
            MyAsyncExample call3 = new MyAsyncExample(2);
            //初始任务执行工具。
            ExecutorService es = Executors.newFixedThreadPool(3);
            //执行任务,任务启动时返回了一个Future对象,
            Future future1 = es.submit(call1);
            Future future2 = es.submit(call2);
            Future future3 = es.submit(call3);
            try {
                //任务1正常执行完毕,future1.get()会返回线程的值
                System.out.println(future1.get());
                //任务2进行一个死循环,调用future2.cancel(true)来中止此线程。
                Thread.sleep(3000);
                System.out.println("Thread 2 terminated? :" + future2.cancel(true));
                //任务3抛出异常,调用future3.get()时会引起异常的抛出
                System.out.println(future3.get());
            } catch (ExecutionException ex) {
                ex.printStackTrace();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

     运行结果如下:

    looping....
    Task done.
    java.util.concurrent.ExecutionException: java.lang.Exception: Callable terminated with Exception!
     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    looping....
     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    Thread 2 terminated? :true
     at org.jevo.future.sample.MyAsyncExample.main(MyAsyncExample.java:57)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
     at java.lang.reflect.Method.invoke(Method.java:597)
     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:90)
    Caused by: java.lang.Exception: Callable terminated with Exception!
     at org.jevo.future.sample.MyAsyncExample.call(MyAsyncExample.java:30)
     at org.jevo.future.sample.MyAsyncExample.call(MyAsyncExample.java:13)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
     at java.lang.Thread.run(Thread.java:662)

    以上是对Future模型的例子。异步调用在Swing中应该十分广泛,当客户端调用一个'重'的服务端操作时,我们常采用这种方式。Swing中存在一个Future的实现——SwingWorker,这使我们十分方便地在客户端开发中使用异步调用,详细使用参见API文档。下面附一个不使用Future来实现取得异步调用的代码,如下:

    public abstract class AsyncWorker {
        private Object value;  //the running result
        private boolean finished = false;

        private static class ThreadVar {
            private Thread thread;

            ThreadVar(Thread t) {
                thread = t;
            }

            synchronized Thread get() {
                return thread;
            }

            synchronized void clear() {
                thread = null;
            }
        }

        private ThreadVar threadVar;

        /**
         * 返回当前线程运行结果。
         */
        protected synchronized Object getValue() {
            return value;
        }

        /**
         * 设置当前线程运行结果
         */
        private synchronized void setValue(Object x) {
            value = x;
        }

        /**
         * 调用都创建计算逻辑,将运算结果返回
         */
        public abstract Object construct();

        public void finished() {
            finished = true;
        }

        public boolean isFinished() {
            return finished;
        }

        public void interrupt() {
            Thread t = threadVar.get();
            if (t != null) {
                t.interrupt();
            }
            threadVar.clear();
        }

        public void stop() {
            Thread t = threadVar.get();
            if(t!=null) {
                t.stop();
            }
            threadVar.clear();
        }

        /**
         * 返回 construct方法运行结果。
         */
        public Object get() {
            while (true) {
                Thread t = threadVar.get();
                if (t == null) {
                    return getValue();
                }
                try {
                    t.join();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }


        public AsyncWorker() {
            final Runnable doFinished = new Runnable() {
                public void run() {
                    finished();
                }
            };

            Runnable doConstruct = new Runnable() {
                public void run() {
                    try {
                        setValue(construct());
                    }
                    finally {
                        threadVar.clear();
                    }

                    SwingUtilities.invokeLater(doFinished);
                }
            };

            Thread t = new Thread(doConstruct);
            threadVar = new ThreadVar(t);
        }

        /**
         * Start the worker thread.
         */
        public void start() {
            finished = false;
            Thread t = threadVar.get();
            if (t != null) {
                t.start();
            }
        }

        public static void main(String[] args) {
            AsyncWorker worker = new AsyncWorker() {
                public Object construct() {
                    try {
                        Thread.sleep(3*1000);
                    }
                    catch(Exception e){}
                    return "hello world";

                }

                public void finished() {
                    super.finished();
                    //取线程运行返回的结果
    //                Object obj = this.get();
    //                System.err.println("return is " + obj);
                }
            };

            long t = System.currentTimeMillis();
            worker.start();
            Object obj = worker.get(); //取得运行结果
            System.err.println("return is " + obj + ", time = " + (System.currentTimeMillis() - t));

        }
    }

    在上述代码中,调用者只需要扩展AsyncWorker类定义可计算的逻辑,并将逻辑结果返回。返回结果会保存在一变量中。当调用者调用返回结果时,如果计算还未完成,将调用Thread.join()阻塞线程,直到计算结果返回。用法上是不是与FutureTask相似?在Swing异步调用中,还需要结合等待对话框来表示计算运行进程,从而使运行界面显示更加友好。 

    再看一下线程的join方法,我们知道线程可被Object.wait、Thread.join和Thread.sleep三种方法之一阻塞,当接收到一个中断异常(InterruptedException)时,可提早地终结被阻塞状态。Thread.join的使用情况却有所不同:我们对一些耗时运算,常启用一个主线程来生成并启动一些子线程,在子线程中进行耗时的运算,当主线程继续处理完其他的事务后,需要调用子线程的处理结果,这个时候就要使用join();。Joint方法将使主线程等待子线程运行结束,即join()方法后的代码,只有等到子线程运行结束后才能被执行。参考下例:

    public class ChildThread extends Thread {
        public ChildThread() {
            super("ChildThread");
        }

        public void run() {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " start.");
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println(threadName + " loop at " + i);
                    Thread.sleep(1000);
                }
                System.out.println(threadName + " end.");
            } catch (Exception e) {
                System.out.println("Exception from " + threadName + ".run");
            }
        }
    }

     

    public class ParentThread extends Thread {
        ChildThread t1;

        public ParentThread(ChildThread t1) {
            super("ParentThread");
            this.t1 = t1;
        }

        public void run() {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " start.");
            try {
                t1.join();   //ChildThread 线程t1结束后,才能运行此行代码后的代码。
                System.out.println(threadName + " end.");
            } catch (Exception e) {
                System.out.println("Exception from " + threadName + ".run");
            }
        }

        public static void main(String[] args) {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " start.");
            ChildThread t1 = new ChildThread();
            ParentThread t = new ParentThread(t1);
            try {
                t1.start();
                Thread.sleep(2000);
                t.start();
                t.join();//此处注释后,将直接运行到结束代码. 注释此处代码,比较运行结果
            } catch (Exception e) {
                System.out.println("Exception from main");
            }
            System.out.println(threadName + " end!");
        }

    }

    在t.join()被注释前运行结果如下:

    main start.
    ChildThread start.
    ChildThread loop at 0
    ChildThread loop at 1
    ParentThread start.
    ChildThread loop at 2
    ChildThread loop at 3
    ChildThread loop at 4
    ChildThread end.
    ParentThread end.
    main end!

    当t.join()被注释后运行结果如下: 

    main start.
    ChildThread start.
    ChildThread loop at 0
    ChildThread loop at 1
    main end!
    ParentThread start.
    ChildThread loop at 2
    ChildThread loop at 3
    ChildThread loop at 4
    ChildThread end.
    ParentThread end.

    可见ParentThread线程仍等待ChildThread线程运行结束后才运行完毕,而Main线程与ParentThread线程的运行并没有保持等待。

    展开全文
  • A和B之间是同步调用 B和C是异步调用 现在需要A同步请求B时 B根据A的请求去异步请求C A等待 B要等到C异步返回数据之后 在将C返回的数据同步返回给A 。 有没有好的解决方案 。 感谢各位大神给我解答一下。
  • AsyncTask -- Java异步调用框架

    千次阅读 2017-02-28 17:40:09
    原创文章,转载请注明...  AsyncTask是个人编写的一个Java异步调用框架,支持用户: 1)自定义Task,并可设置Task的类型(Type), 子类型(subType),超时时间(TImeout),标识(Flag-可用来区分不同的Task),

    原创文章,转载请注明作者:jmppok及出处:http://blog.csdn.net/jmppok/article/details/44590991



    AsyncTask是个人编写的一个Java异步调用框架,支持用户:

    1)自定义Task,并可设置Task的类型(Type), 子类型(subType),超时时间(TImeout),标识(Flag-可用来区分不同的Task),Task的输入参数(input)等。

    2)可通过submitTask,提交 到框架中异步执行,框架查找对应的TaskExectuor,多线程执行。

    3)可自定义对应TaskExecutor,通过配置添加到框架中。TaskExecutor支持Execotor Chain, 多个Executor可以组合在一起顺序执行。并且支持在Task执行过程中,实时通知任务调用者Task的状态,进度等。

    4)用户可使用TaskCollector通过TaskManager查询所有的Task,支持按Task Id,Task Type, Task SubType, Task State, Task Flag, Task beginTIme, Task finishTime等多种方式的组合查询。

    5)支持持久化,用户提交的Task可以被存储在数据库中。即使Task在执行过程中被中断,重新启动后会从数据库中恢复后继续执行。

    6)用户可通过查询接口可获取Task的引用ITaskReference,通过ITaskReference可实时获取Task的状态(State)和进度Progress。

    7)用户可定义Task的FinishedCallBack回调,在Submit Task时传入,在Task完成后自动回调。

    8)通过ITaskReference的waitForTask,支持用户以同步方式使用。

    9)用户可通过ITaskReference获取Task的执行结果或错误信息。


    代码:https://git.oschina.net/jmpp/AsyncTask


    1.为什么需要AsyncTask?与Asyn4J 区别?

    1.1Java传统的Thread 和 Runable功能不足

    Java提供了Thread,ThreadPool等多线程编程接口。但这些都是基础接口,虽然使用方便,但功能比较简单,很多场景下都无法满足需求。

    比如下面的几个场景:

    1)我需要提交一个任务,改任务在后台异步执行,同时我要实时观察任务的状态,进度等信息。

    2)在提交任务时希望传入参数,任务完成后能主动通知我,并能获取结果。

    3)任务持久化,我希望在任务执行完毕后,可以查询到执行的任务列表。或者任务失败后能重新执行。

    如果要实现这些场景,Java本身自带的接口显然无法满足,必须要有一个新的框架来实现。


    1.2 Asyn4J

    Asyn4J也是一个类似的框架,但它目前还不支持任务的超时设置,持久化,任务回调等功能。



    2.设计及实现

    2.1接口设计

    直接上图


    2.2 代码实现

    具体实现代码见 Git@OSChttps://git.oschina.net/jmpp/AsyncTask

    代码结构如下:


    这里简单说一下实现思路:

    1) 整个实现还是基于Java的Thread和ThreadPool,没有用第三方框架。

    2)持久化基于MySQL,只有一个数据库表tasks,见tasks.sql.

    3)持久化层的实现用到了Mybatis,给予Mybatis的代码生成工具,直接生成了tasks表对应的数据结构

    4)要持久化必然还要用到对象序列化,这里使用了Kryo。为啥用Kryo,见我的另一篇文章:Java对象序列化小结

    5)日志使用了Log4j。


    3.测试

    具体可见代码:https://git.oschina.net/jmpp/AsyncTask

    3.1自定义MyTask

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. package test.mytask;  
    2.   
    3. import com.lenovo.asynctask.Task;  
    4.   
    5. /** 
    6.  * 类 MyTask 的实现描述:TODO 类实现描述 
    7.  *  
    8.  * @author ligh4 2015年3月12日下午2:42:56 
    9.  */  
    10. public class MyTask extends Task {  
    11.   
    12.     /** 
    13.      * @param taskType 
    14.      * @param inputParam 
    15.      * @param timeoutMills 
    16.      */  
    17.     public MyTask(Object inputParam, int timeoutMills) {  
    18.         super(MyTask.class.getSimpleName(), inputParam, timeoutMills);  
    19.         setNeedPersistence(true);  
    20.     }  
    21.   
    22. }  


    3.2自定义MyTaskExecutor

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. package test.mytask;  
    2.   
    3. import com.lenovo.asynctask.ITaskExecutor;  
    4. import com.lenovo.asynctask.ITaskReferenceInternal;  
    5. import com.lenovo.asynctask.TaskState;  
    6. import com.lenovo.asynctask.util.LogHelper;  
    7.   
    8. /** 
    9.  * 类 TestTaskExecutor 的实现描述:TODO 类实现描述 
    10.  *  
    11.  * @author ligh4 2015年3月12日下午2:43:19 
    12.  */  
    13. public class MyTaskExecutor extends ITaskExecutor {  
    14.   
    15.     /** 
    16.      * @author ligh4 2015年3月12日下午2:46:51 
    17.      */  
    18.     @Override  
    19.     public Object execute(ITaskReferenceInternal taskRef) {  
    20.         LogHelper.debug("begin execute MyTask...");  
    21.   
    22.         for (int i = 0; i < 100; i++) {  
    23.             try {  
    24.                 Thread.sleep(1000);  
    25.             } catch (Exception e) {  
    26.                 LogHelper.exception(e);  
    27.             }  
    28.             taskRef.setProgress(i + 1);  
    29.         }  
    30.   
    31.         return taskRef.getInput().toString().toUpperCase();  
    32.     }  
    33.   
    34.     /** 
    35.      * @author ligh4 2015年3月12日下午2:46:51 
    36.      */  
    37.     @Override  
    38.     public Object continue_execute(ITaskReferenceInternal taskRef) {  
    39.         if (taskRef.getState() == TaskState.running) {  
    40.             int i = taskRef.getProgress();  
    41.             for (; i < 100; i++) {  
    42.                 try {  
    43.                     Thread.sleep(1000);  
    44.                 } catch (Exception e) {  
    45.                     LogHelper.exception(e);  
    46.                 }  
    47.                 taskRef.setProgress(i + 1);  
    48.             }  
    49.   
    50.             return taskRef.getInput().toString().toUpperCase();  
    51.         } else {  
    52.             taskRef.setState(TaskState.failed, "");  
    53.             return null;  
    54.         }  
    55.   
    56.     }  
    57.   
    58. }  


    3.3配置MyTaskExecutor

    taskexecutors.properties中添加:

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. MyTask = test.mytask.MyTaskExecutor  

    其实是task的type     =     task的Executor     

    3.4提交Task并实时监听进度

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. package test.mytask;  
    2.   
    3. import java.util.List;  
    4.   
    5. import com.lenovo.asynctask.ITaskFinishedCallback;  
    6. import com.lenovo.asynctask.ITaskReference;  
    7. import com.lenovo.asynctask.TaskCollector;  
    8. import com.lenovo.asynctask.TaskManager;  
    9. import com.lenovo.asynctask.TaskState;  
    10. import com.lenovo.asynctask.util.DateUtil;  
    11. import com.lenovo.asynctask.util.LogHelper;  
    12.   
    13. /** 
    14.  * 类 TestContinueTask 的实现描述:TODO 类实现描述 
    15.  *  
    16.  * @author ligh4 2015年3月23日上午9:42:14 
    17.  */  
    18. public class TestContinueTask {  
    19.   
    20.     /** 
    21.      * @author ligh4 2015年3月12日下午2:52:45 
    22.      * @param args 
    23.      */  
    24.     public static void main(String[] args) throws Exception {  
    25.         TaskManager.instance().start();  
    26.   
    27.         List<ITaskReference> tasks = queryRunningTasks();  
    28.         if (tasks == null || tasks.size() == 0) {  
    29.             submitAndWaitTask();  
    30.         } else {  
    31.             for (ITaskReference taskReference : tasks) {  
    32.                 queryTaskProgress(taskReference);  
    33.             }  
    34.         }  
    35.   
    36.         TaskManager.instance().stop();  
    37.     }  
    38.   
    39.     public static void submitAndWaitTask() throws Exception {  
    40.         MyTask task = new MyTask("liguanghui"200000);  
    41.         ITaskReference taskReference = TaskManager.instance().submitTask(task,  
    42.                 new ITaskFinishedCallback() {  
    43.   
    44.                     @Override  
    45.                     public void onTaskFinished(ITaskReference taskRef) {  
    46.                         LogHelper.debug(taskRef.getId() + ";" + taskRef.getState().toString() + ";"  
    47.                                 + DateUtil.format(taskRef.getStartedTime()) + "  "  
    48.                                 + DateUtil.format(taskRef.getFinishedTime()) + ";"  
    49.                                 + taskRef.getResult().toString());  
    50.   
    51.                     }  
    52.                 });  
    53.   
    54.         queryTaskProgress(taskReference);  
    55.     }  
    56.   
    57.     public static void queryTaskProgress(ITaskReference taskReference) throws Exception {  
    58.         String taskID = taskReference.getId();  
    59.         while (!taskReference.isFinished()) {  
    60.             LogHelper.debug(taskID + ": progress " + taskReference.getProgress());  
    61.             Thread.sleep(1000);  
    62.         }  
    63.         LogHelper.debug(taskID + ": finished. ");  
    64.     }  
    65.   
    66.     public static List<ITaskReference> queryRunningTasks() {  
    67.         TaskCollector collector = new TaskCollector();  
    68.         collector.setTaskStateFilter(new TaskState[] { TaskState.running });  
    69.         collector.setTaskTypeFilter(new String[] { MyTask.class.getSimpleName() });  
    70.         return TaskManager.instance().findTasks(collector);  
    71.   
    72.     }  
    73. }  


    3.5终止Task执行然后重新启动程序,进行Task恢复测试

    还是同3.4一样的代码

    1)第一次运行没有Task,会提交一个Task。  submitAndWaitTask();

    2)如果改Task没有被执行完毕就被终止,第二次启动后该Task就会恢复。

    3)这时queryRunningTasks就会查询到正在运行的Task,并且进度到等待该Task的分支。

    4)当然如果你停止的时间很长才重新启动,会发现Task超时。

    4.不足

    1.整体实现比较简单,特别是数据库表中存储Java对象序列化的字段,偷懒用的varchar(2000),可能超出,最好改为Blob。(为啥当时不用Blob,因为偷懒,如果Blob的话mybatis生成的代码就比较复杂了,会有一个XXwithBlob,调用不方便....)

    2.线程池个数写死了,应该可以配置。

    3.测试比较简单,可能有未知bug。



    展开全文
  • Java 实现异步调用

    万次阅读 热门讨论 2018-04-03 15:27:12
    首先 我遇到的问题是 接口调用时需要更新缓存 而更新缓存又是个说快不快的过程 所以打算做异步调用 返回我所需要的结果即可 ,至于缓存什么时候更新完 就不是我所需要关注的了废话不多说 上代码public class ...

    首先 我遇到的问题是 接口调用时需要更新缓存 而更新缓存又是个说快不快的过程 所以打算做异步调用 返回我所需要的结果即可 ,至于缓存什么时候更新完 就不是我所需要关注的了

    废话不多说 上代码

    public class MyExecutor {

        private ExecutorService executor = Executors.newCachedThreadPool() ;

        public void fun() throws Exception {

            executor.submit(new Runnable(){

                @override

                    public void run() {

                        try {

                            //要执行的业务代码,我们这里没有写方法,可以让线程休息几秒进行测试

                            Thread.sleep(10000);

                            System.out.print("睡够啦~");

                        }catch(Exception e) {

                            throw new RuntimeException("报错啦!!");

                        }

                    }

            });

        }

    }

    public class Demo{

        

        public static void main(String[] args) {

            

             MyExecutor  myExecutor = new MyExecutor();

             try {

                myExecutor.fun();

                System.our.print("你先睡着,我先回家啦~");

            }catch(Exception e) {

                 throw new RuntimeException("业务程序报错啦!!");

            }

        }

    }

    好啦 代码到此结束 (ps:纯手打 若有错 请见谅) 

    运行主方法 

    会先打印(你先睡着,我先回家啦~)

    然后(睡够啦~)

    也就是说 在需要异步执行的方法未执行完毕时 主程序已经返回结果了  不需要继续等待 这样可以保证程序先返回结果 再继续执行不需要等待的繁琐的任务  当然也可以加一些方法去判断异步方法是否执行完毕。

    说一下Executors类

    这个类是用来创建线程池的

    有这么几个方法

    1、newFixedThreadPool() 创建固定大小的线程池 线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程

    2、newCachedThreadPool() 创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60s不执行任务)的线程,当任务数量增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于系统(JVM)能够创建的最大线程大小

    3、newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行

    4、newScheduledThreadPool() 创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求

    5、newSingleThreadScheduledExecutor() 创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求


    展开全文
  • 异步调用 在网上看了半天文章,现在对异步调用的理解就是在一个进程执行的过程中,有一个执行很长时间的方法,这时候可以创建一个线程去异步调用这个方法,然后在方法执行完成之后调用回调函数告诉主进程他执行完了...
  • java线程异步调用

    千次阅读 2018-11-07 09:54:13
    本文主要讲解生产环境中遇到的异步加载数据问题以及相应的解决思路。 系统登录的时候,需要根据用户ID生成一些和当前...在Java程序中,如果想实现异步调用方法的功能,需要通过线程方式实现,即实现java.lang.Ru...
  • JAVA异步调用

    万次阅读 2017-12-11 10:14:53
    那么后台的异步调用改怎么实现呢?1、 使用callable接口,加线程池 2、 使用JAVA的回调机制 3、 使用spring框架默认的异步调用方式一、第一种方式是比较简单,也比较常用的方式,下面看一段示例代码:public void ...
  • 创建线程执行FutureTask并通过FutureTask得到异步结果; public static void main(String[] args) throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ...
  • Java中的异步调用

    万次阅读 2015-11-29 17:00:32
    Java中的异步调用java中可以通过接口的多态特性来达到调用外部实现类的接口方法的效果,比如我们经常使用的框架,驱动程序等,都会定义许多的接口,向外部暴露统一的接口方法,内部则调用这些接口来达到业务逻辑的实现....
  • 现在比如说我有10笔交易,我在执行第二笔交易的时候需要根据第一笔交易的结果来判断是否需要做该笔交易。注:如果是同步调用,应该很简单,但是现在因为业务需要不能改成同步调用。求大神支招。
  • Java调用第三方接口示范

    万次阅读 多人点赞 2018-10-08 15:03:53
    人工智能,零基础入门!...【1】准备工作:在项目的工具包下导入HttpClientUtil这个工具类,或者也可以使用Spring框架的restTemplate来调用,上面有调用接口的方法【分为Get和Post方式的有参和无参调用】: pa...
  • webservice接口异步调用

    千次阅读 2018-07-25 12:19:34
    webservice异步调用其实很简单,当客户端添加了webservice的引用之后会由系统自动生成一段代码,这段自动生成的代码中有同步方法和异步方法,还会提供一个异步完成后事件,以便在异步结束后做相关处理。贴一段代码...
  • 1、不存在继发关系的异步操作,同时触发 例1: let [foo, bar] = Promise.all([getFoo(), getBar()]); 例2: let fooPromise = getFoo(); let barPromise = getBar(); let foo = await fooPromise(); let Bar =...
  • 教你如何用 Java 实现异步调用

    千次阅读 2019-09-12 12:16:39
    导读 本教程教你如何使用Java实现异步调用。 一、创建线程 @Test public void test0() throws Exception { System.out.println("main函数开始执行"); Thr...
  • 最近研究了一下异步调用,接下来几篇博客是跟异步调用相关的,首先使用@FunctionalInterface接口实现一个简单的步调用,也就是本篇博客主要内容。 然后再加上ZMQ,实现一个带网络通信的异步调用。再下一步就是复杂...
  • 异步调用实现java

    万次阅读 2015-12-05 21:53:25
    异步调用主要用于当前程序的执行不用等待调用方法执行结束就可以继续执行。用一个最简单的例子来说,当前的方法要调用一个发送短信的方法,但是发送短信的方法调用了外部的接口,这样就导致短信发送方法耗费的时间很...
  • Java方法异步调用,并行

    千次阅读 2019-12-27 15:58:36
    Java同一个方法在调用多个方法时默认是串行的方式,我的这个业务调用6个方法串行需要4秒左右,由于需要处理数据量比较多,需要优化 原来的逻辑: //裁判文书对象转换 List<JudicialUpdateItem> ...
  • 异步调用实现对比 java

    千次阅读 2014-07-06 22:13:17
    异步调用主要用于当前程序的执行不用等待调用方法执行结束就可以继续执行。用一个最简单的例子来说,当前的方法要调用一个发送短信的方法,但是发送短信的方法调用了外部的接口,这样就导致短信发送方法耗费的时间很...
  • SpringBoot 异步执行方法,在接口调用时开启一个新线程做一些操作,这个操作结果,不影响返回值 项目启动类或者任意被Spring容器管理的类,必须要有一个`@EnableAsync`注解,标识项目开启异步功能 然后在需要异步...
  • java实现异步调用的方法

    万次阅读 2018-03-08 00:19:29
    概念的理解同步/异步:关于同步,我们知道jvm解释执行class文件时候,就是按照代码从上到下的顺序执行的,这里我们正常的程序开发流程,如果一个方法中执行了doSomething1,doSomething2两个方法,正常情况下...
  • java8多线程异步调用 CompletableFuture 详解
  • java的同步调用、回调和异步调用

    万次阅读 2012-04-11 17:48:26
    软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。   同步调用:一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用; 回 调:一种双向调用模式...
  • Java future 到 Guava ListenableFuture实现异步调用

    万次阅读 多人点赞 2016-04-24 09:11:14
    针对于开发人员来说,可能之前的一个业务只需要调取一次第三方接口以获取数据,而如今随着需求的增加,该业务需调取多个不同的第三方接口。通常,我们处理方法是让代码同步顺序的去调取这些接口。显然,调取接口数量...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 175,764
精华内容 70,305
关键字:

java异步调用接口

java 订阅