精华内容
下载资源
问答
  • 原标题:多线程切换的几种方法,你知道几种?刘望舒”,马上关注,每天早上8:42准时推送作者:蓝灰_qhttps://www.jianshu.com/p/31d0852c0760我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有...

    原标题:多线程切换的几种方法,你知道几种?

    刘望舒”,马上关注,每天早上8:42准时推送

    作者:蓝灰_q

    https://www.jianshu.com/p/31d0852c0760

    我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有多线程的内容,这里简单总结和探讨一下常见的多线程切换方式。

    我们先回顾一下Java多线程的几个基础内容,然后再分析总结一些经典代码中对于线程切换的实现方式。

    几点基础

    多线程切换,大概可以切分为这样几个内容:如何开启多个线程,如何定义每个线程的任务,如何在线程之间互相通信。

    Thread

    Thread可以解决开启多个线程的问题。

    Thread是Java中实现多线程的线程类,每个Thread对象都可以启动一个新的线程,注意是可以启动,也可以不启动新线程:

    thread.run; //不启动新线程,在当前线程执行

    thread.start; //启动新线程。

    另外就是Thread存在线程优先级问题,如果为Thread设置较高的线程优先级,就有机会获得更多的CPU资源,注意这里也是有机会,优先级高的Thread不是必然会先于其他Thread执行,只是系统会倾向于给它分配更多的CPU。

    默认情况下,新建的Thread和当前Thread的线程优先级一致。

    设置线程优先级有两种方式:

    thread.setPriority(Thread.MAX_PRIORITY); //1~10,通过线程设置

    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //-20~19,通过进程设置

    这两种设置方式是相对独立的,在Android中,一般建议通过Process进程设置优先级。

    ThreadPool

    Thread本身是需要占用内存的,开启/销毁过量的工作线程会造成过量的资源损耗,这种场景我们一般会通过对资源的复用来进行优化,针对IO资源我们会做IO复用(例如Http的KeepAlive),针对内存我们会做内存池复用(例如Fresco的内存池),针对CPU资源,我们一般会做线程复用,也就是线程池。

    所以,在Android开发中,一般不会直接开启大量的Thread,而是会使用ThreadPool来复用线程。

    Runnable

    Runnable主要解决如何定义每个线程的工作任务的问题。

    Runnable是Java中实现多线程的接口,相对Thread而言,Runnable接口更容易扩展(不需要单继承),而且,Thread本身也是一种Runnable:

    publicclassThreadimplementsRunnable{

    相比Thread而言,Runnable不关注如何调度线程,只关心如何定义要执行的工作任务,所以在实际开发中,多使用Runnable接口完成多线程开发。

    Callable

    Callable和Runnable基本类似,但是Callable可以返回执行结果。

    线程间通信

    Thread和Runnable能实现切换到另一个线程工作(Runnable需要额外指派工作线程),但它们完成任务后就会退出,并不注重如何在线程间实现通信,所以切换线程时,还需要在线程间通信,这就需要一些线程间通信机制。

    Future

    一般来说,如果要做简单的通信,我们最常用的是通过接口回调来实现。

    Future就是这样一种接口,它可以部分地解决线程通信的问题,Future接口定义了done、canceled等回调函数,当工作线程的任务完成时,它会(在工作线程中)通过回调告知我们,我们再采用其他手段通知其他线程。

    mFuture = newFutureTask(runnable) {

    @Override

    protectedvoiddone{

    ... //还是在工作线程里

    }

    };

    Condition

    Condition其实是和Lock一起使用的,但如果把它视为一种线程间通信的工具,也说的通。

    因为,Condition本身定位就是一种多线程间协调通信的工具,Condition可以在某些条件下,唤醒等待线程。

    Locklock= newReentrantLock;

    Condition notFull = lock.newCondition; //定义 Lock的Condition

    ...

    while( count== items.length)

    notFull.await;//等待condition的状态

    ...

    notFull.signal;//达到condition的状态

    Handler

    其实,最完整的线程间通信机制,也是我们最熟悉的线程间通信机制,莫过于Handler通信机制,Handler利用线程封闭的ThreadLocal维持一个消息队列,Handler的核心是通过这个消息队列来传递Message,从而实现线程间通信。

    AsyncTask的多线程切换

    回顾完多线程的几个基础概念,先来看看简单的多线程切换,Android自带的AsyncTask。

    AsyncTask主要在doInBackground函数中定义工作线程的工作内容,在其他函数中定义主线程的工作内容,例如onPostExecute,这里面必然涉及两个问题:

    1.如何实现把doInBackground抛给工作线程

    2.如何实现把onPostExecute抛给主线程

    1.如何实现把doInBackground抛给工作线程

    在使用AsyncTask时,我们一般会创建一个基于AsyncTask的扩展类或匿名类,在其中实现几个抽象函数,例如:

    privateclassMyTaskextendsAsyncTask{

    @Override

    protectedvoidonPreExecute{... }

    @Override

    protectedLong doInBackground(String... params){... }

    @Override

    protectedvoidonProgressUpdate(Object... values){... }

    @Override

    protectedvoidonPostExecute(Long aLong){... }

    @Override

    protectedvoidonCancelled{... }

    然后,我们会实例化这个AsyncTask:

    MyTask mTask = newMyTask;

    在AsyncTask源码中,我们看到,构造函数里会创建一个WorkerRunnable:

    publicAsyncTask{

    mWorker = newWorkerRunnable { //这是一个Callable

    publicResult callthrowsException{

    ...

    result = doInBackground(mParams); //在工作线程中执行

    ...

    WorkerRunnable实际上是一个Callable对象,所以,doInBackground是被包在一个Callable对象中了,这个Callable还会被继续包装,最终被交给一个线程池去执行:

    Runnable mActive;

    ...

    if((mActive = mTasks.poll) != null) {

    THREAD_POOL_EXECUTOR.execute(mActive); //交给线程池执行

    }

    梳理一下,大致过程为:

    定义doInBackground-->被一个Callable调用-->层层包为一个Runnable-->交给线程池执行。

    这样就解决了第一个问题,如何实现把doInBackground抛给工作线程。

    2.如何实现把onPostExecute抛给主线程

    首先,我们要知道工作任务何时执行完毕,就需要在工作完成时触发一个接口回调,也就是前面说过的Future,还是看AsyncTask源码:

    publicAsyncTask{

    mWorker = newWorkerRunnable {

    publicResult callthrowsException{

    ...

    };

    mFuture = newFutureTask(mWorker) {

    @Override

    protectedvoiddone{ //Future的回调

    try{

    postResultIfNotInvoked(get); //get是FutureTask接口函数

    ...

    }

    };

    }

    这样,我们就知道可以处理onPostExecute函数了,但是,我们还需要把它抛给主线程,主要源码如下:

    //mWorker、mFuture和都会指向postResult函数

    privateResult postResult(Result result){

    @SuppressWarnings( "unchecked")

    Message message = getHandler.obtainMessage(MESSAGE_POST_RESULT,

    newAsyncTaskResult( this, result));

    message.sendToTarget;

    returnresult;

    }

    //getHandler会指向InternalHandler

    privatestaticclassInternalHandlerextendsHandler{

    publicInternalHandler{

    super(Looper.getMainLooper); //指向MainLooper

    }

    @SuppressWarnings({ "unchecked", "RawUseOfParameterizedType"})

    @Override

    publicvoidhandleMessage(Message msg){

    AsyncTaskResult> result = (AsyncTaskResult>) msg.obj;

    switch(msg.what) {

    caseMESSAGE_POST_RESULT:

    // There is only one result

    result.mTask.finish(result.mData[ 0]); //通过handler机制,回到主线程,调用finish函数

    ...

    }

    //在Handler中,最终会在主线程中调用finish

    privatevoidfinish(Result result){

    if(isCancelled) {

    onCancelled(result);

    } else{

    onPostExecute(result); //调用onPostExecute接口函数

    }

    mStatus = Status.FINISHED;

    }

    从源码可以看到,其实AsyncTask还是通过Handler机制,把任务抛给了主线程。

    总体来说,AsyncTask的多线程任务是通过线程池实现的工作线程,在完成任务后利用Future的done回调来通知任务完成,并通过handler机制通知主线程去执行onPostExecute等回调函数。

    EventBus的多线程切换

    EventBus会为每个订阅事件注册一个目标线程,所以需要从发布事件的线程中,根据注册信息,实时切换到目标线程中,所以,这是个很典型的多线程切换场景。

    根据EventBus源码,多线程切换的主要判断代码如下:

    switch(subion.subscriberMethod.threadMode) {

    casePOSTING:

    invokeSubscriber(subion, event); //直接在当前线程执行

    break;

    caseMAIN:

    if(isMainThread) {

    invokeSubscriber(subion, event); //在当前主线程执行

    } else{

    mainThreadPoster.enqueue(subion, event); //当然不是主线程,交给主线程执行

    }

    break;

    caseBACKGROUND:

    if(isMainThread) {

    backgroundPoster.enqueue(subion, event); //当前线程为主线程,交给工作线程

    } else{

    invokeSubscriber(subion, event); //直接在当前工作线程执行

    }

    break;

    caseASYNC:

    asyncPoster.enqueue(subion, event); //异步执行

    break;

    default:

    thrownewIllegalStateException( "Unknown thread mode: "+ subion.subscriberMethod.threadMode);

    }

    所以,在EventBus里,如果需要做线程间切换,主要是抛给不同的任务队列,实现线程间切换。

    从任务队列判断,切换目标包括主线程Poster、backgroundPoster和asyncPoster这样三种。

    我们先看任务队列的设计:

    任务队列

    因为EventBus不能判断有哪些任务会并行,所以它采用了队列的设计,多线程任务(EventBus的事件)会先进入队列,然后再处理队列中的工作任务,这是典型的生产--消费场景。

    主线程Poster、backgroundPoster和asyncPoster都是任务队列的不同实现。

    主线程Poster

    负责处理主线程的mainThreadPoster是Handler的子类:

    finalclassHandlerPosterextendsHandler{

    ...

    voidenqueue(Subion subion, Object event){

    ...

    synchronized( this) { //因为主线程只有一个,需要线程安全

    queue.enqueue(pendingPost);

    ...

    if(!sendMessage(obtainMessage)) { //作为handler发送消息

    ...

    //在主线程中处理消息

    @Override

    publicvoidhandleMessage(Message msg){

    ...

    }

    从源码可以看出,这个Poster其实是一个Handler,它采用了哪个线程的消息队列,就决定了它能和哪个线程通信,我们确认一下:

    EventBus(EventBusBuilder builder) {

    ...

    mainThreadPoster = newHandlerPoster( this, Looper.getMainLooper, 10); //获取主线程的MainLooper

    所以,EventBus是扩展了一个Handler,作为主线程的Handler,通过Handler消息机制实现的多线程切换。

    当然,这个Handler本事,又多了一层queue。

    backgroundPoster和asyncPoster

    backgroundPoster和asyncPoster其实都是使用了EventBus的线程池,默认是个缓存线程池:

    privatefinalstaticExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool;

    所以,backgroundPoster和asyncPoster都是把任务交给线程池处理,这样实现的多线程切换。

    不过,backgroundPoster和asyncPoster也有一些不同,我们知道,在newCachedThreadPool中,最大线程数就是Integer的最大值,相当于不设上限,所以可以尽可能多的启动线程,asyncPoster就是这样做的,enqueu和run都没做同步,为每个事件单独开启新线程处理。

    而在backgroundPoster中,可以尽量复用线程,主要方法是在run的时候,做个1秒的等待:

    @ Override

    publicvoidrun{

    ...

    PendingPost pendingPost = queue.poll( 1000); //允许等待1秒

    因为做了这一秒的挂起等待,在enqueue和run时,都需要用synchronized (this) 来确保线程安全。

    另外,其实这里面还有个很重要的用法,就是Executors.newCachedThreadPool中的SynchronousQueue:

    publicstaticExecutorService newCachedThreadPool{

    returnnewThreadPoolExecutor( 0, Integer.MAX_VALUE,

    60L, TimeUnit.SECONDS,

    newSynchronousQueue); //用于辅助线程切换的阻塞队列

    }

    这个SynchronousQueue,在OkHttp中也使用了:

    //okhttp3.Dispatcher源码

    publicsynchronizedExecutorService executorService{

    if(executorService == null) {

    executorService = newThreadPoolExecutor( 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,

    newSynchronousQueue, Util.threadFactory( "OkHttp Dispatcher", false)); //用于辅助线程切换的阻塞队列

    }

    returnexecutorService;

    }

    SynchronousQueue与普通队列不同,不是数据等线程,而是线程等数据,这样每次向SynchronousQueue里传入数据时,都会立即交给一个线程执行,这样可以提高数据得到处理的速度。

    总的来看,EventBus还是采用线程池实现工作线程,采用handler机制通知到主线程。不同在于,它采用的queue的队列方式来管理所有的跨线程请求,而且它利用了SynchronousQueue阻塞队列来辅助实现线程切换。

    RxJava的多线程切换

    其实在多线程管理这方面,RxJava的线程管理能力是非常令人赞叹的。

    RxJava的主要概念是工作流,它可以把一序列工作流定义在一个线程类型上:

    myWorkFlow.getActResponse(myParam)

    .subscribeOn(Schedulers.io) //指定线程

    .xxx //其他操作

    这个构建工作流的过程其实挺复杂的,不过如果我们只看线程操作这部分,其实流程非常清晰,我们追踪一下subscribeOn的源码(RxJava2):

    //进入subscribeOn

    publicfinalFlowable subscribeOn( @NonNullScheduler scheduler) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    returnsubscribeOn(scheduler, !( thisinstanceof FlowableCreate));

    }

    //继续进入subscribeOn

    publicfinalFlowable subscribeOn( @NonNullScheduler scheduler, boolean requestOn) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    returnRxJavaPlugins.onAssembly(new FlowableSubscribeOn( this, scheduler, requestOn));

    }

    然后,进入FlowableSubscribeOn类

    //进入FlowableSubscribeOn类

    publicFlowableSubscribeOn(Flowable source, Scheduler scheduler, booleannonScheduledRequests){

    ...

    this.scheduler = scheduler;

    ...

    }

    @Override

    publicvoidsubscribeActual(finalSubscriber superT> s){

    Scheduler.Worker w = scheduler.createWorker; //根据参数值,如Schedulers.io创建worker

    finalSubscribeOnSubscriber sos = newSubscribeOnSubscriber(s, w, source, nonScheduledRequests); //根据worker创建SubscribeOnSubscriber

    s.onSubscribe(sos);

    w.schedule(sos);

    }

    这个SubscribeOnSubscriber是个内部类:

    SubscribeOnSubscriber(Subscriber superT> actual, Scheduler.Worker worker, Publisher source, booleanrequestOn) {

    ...

    this.worker = worker;

    ...

    }

    ...

    voidrequestUpstream(finallongn, finalSubion s){

    ...

    worker.schedule( newRequest(s, n)); //worker会安排如何执行runnable(Request是一个runnable)

    ...

    }

    而这个worker,其实就是我们输入的线程参数,如Schedulers.io,这个io是这样定义的:

    //io.reactivex.schedulers.Schedulers源码

    static{

    SINGLE = RxJavaPlugins.initSingleScheduler( newSingleTask);

    COMPUTATION = RxJavaPlugins.initComputationScheduler( newComputationTask);

    IO = RxJavaPlugins.initIoScheduler( newIOTask);

    TRAMPOLINE = TrampolineScheduler.instance;

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler( newNewThreadTask);

    }

    ...

    staticfinalclassIOTaskimplementsCallable{

    @Override

    publicScheduler callthrowsException{

    returnIoHolder.DEFAULT;

    }

    }

    staticfinalclassNewThreadTaskimplementsCallable{

    @Override

    publicScheduler callthrowsException{

    returnNewThreadHolder.DEFAULT;

    }

    }

    staticfinalclassSingleTaskimplementsCallable{

    @Override

    publicScheduler callthrowsException{

    returnSingleHolder.DEFAULT;

    }

    }

    staticfinalclassComputationTaskimplementsCallable{

    @Override

    publicScheduler callthrowsException{

    returnComputationHolder.DEFAULT;

    }

    }

    ...

    staticfinalclassSingleHolder{

    staticfinalScheduler DEFAULT = newSingleScheduler;

    }

    staticfinalclassComputationHolder{

    staticfinalScheduler DEFAULT = newComputationScheduler;

    }

    staticfinalclassIoHolder{

    staticfinalScheduler DEFAULT = newIoScheduler;

    }

    staticfinalclassNewThreadHolder{

    staticfinalScheduler DEFAULT = newNewThreadScheduler;

    }

    这里的IO,最终会指向一个Scheduler,如IoScheduler:

    //io.reactivex.internal.schedulers.IoScheduler源码

    ...

    staticfinalclassEventLoopWorkerextendsScheduler.Worker{ //Scheduler.Worker的实现类

    ...

    @NonNull

    @Override

    publicDisposable schedule(@NonNull Runnable action, longdelayTime, @NonNull TimeUnit unit){

    if(tasks.isDisposed) {

    // don't schedule, we are unsubscribed

    returnEmptyDisposable.INSTANCE;

    }

    returnthreadWorker.scheduleActual(action, delayTime, unit, tasks); //交给线程池

    }

    这样,Scheculer中的具体任务就交给了某个线程池来处理。

    需要特别说明的是,RxJava中调用Android主线程(AndroidSchedulers.mainThread),其实还是使用了Handler机制:

    publicfinalclassAndroidSchedulers{

    ...

    staticfinalScheduler DEFAULT= newHandlerScheduler( newHandler(Looper.getMainLooper));

    这个HandlerScheduler其实就是实现了Scheduler和Scheduler.Worker内部类。

    finalclassHandlerSchedulerextendsScheduler{

    privatefinalHandler handler;

    HandlerScheduler(Handler handler) {

    this.handler = handler;

    }

    privatestaticfinalclassHandlerWorkerextendsWorker{

    ...

    @Override

    publicDisposable schedule(Runnable run, longdelay, TimeUnit unit){

    ...

    handler.sendMessageDelayed(message, Math.max( 0L, unit.toMillis(delay)));

    总的来看,RxJava的多线程切换其实是利用了Scheculer.Worker这个内部类,把任务交给Scheculer的Worker去做,而这个Scheculer的Worker是根据定义的线程来实现了不同的线程池,其实还是交给线程池去处理了。

    至于主线程,RxJava也是使用了Handler机制。

    总结

    小小总结一下,基本上来说,Android中的多线程切换,主要使用Runnable和Callable来定义工作内容,使用线程池来实现异步并行,使用Handler机制来通知主线程,有些场景下会视情况需要,使用Future的接口回调,使用SynchronousQueue阻塞队列等。

    说句题外话,有不少人想加我微信,这原本是我知识星球的福利,但是我姑且放出来吧,坑位是有限的哦。

    你好,我是刘望舒,十年经验的资深架构师,著有两本业界知名的技术畅销书。

    如果你喜欢我分享的文章,就给公众号加个星标吧,方便阅读。返回搜狐,查看更多

    责任编辑:

    展开全文
  • 窗口Thread-1票已经卖完 有的卖的有的卖的少。 由此我产生了以下几个疑问: 1,JAVA的线程是抢占式的,但又遵守支持时间片的操作系统的时间片调度,那么这里的抢占与时间片轮换之间又有怎样的关系,是怎么协调的?...

    如下程序publicclassThreadTest{publicstaticvoidmain(String[]args){MyThreadth=newMyThread();newThread(th).start();newThread(th).start();newThread(th).start();newThread(th)....

    如下程序

    public class ThreadTest {

    public static void main(String[] args) {

    MyThread th=new MyThread();

    new Thread(th).start();

    new Thread(th).start();

    new Thread(th).start();

    new Thread(th).start();

    }

    }

    class MyThread implements Runnable {

    int tickets = 20;

    Object obj = new Object();

    public void run(){

    while(true){

    synchronized(obj){

    if(tickets > 0){

    try {

    Thread.sleep(10);

    } catch (InterruptedException e) {

    // TODO 自动生成 catch 块

    e.printStackTrace();

    }

    System.out.println("现在卖第"+tickets+"张 买卖票窗口:"+Thread.currentThread().getName());

    tickets--;

    }

    else{

    System.out.println("窗口"+Thread.currentThread().getName()+"票已经卖完");

    break;

    }

    }

    }

    }

    }

    由时间片分配可知运行的结果应该是各个线程运行同样的时间,卖出同样的票啊,但结果却是这样的

    现在卖第20张 买卖票窗口:Thread-0

    现在卖第19张 买卖票窗口:Thread-3

    现在卖第18张 买卖票窗口:Thread-3

    现在卖第17张 买卖票窗口:Thread-3

    现在卖第16张 买卖票窗口:Thread-3

    现在卖第15张 买卖票窗口:Thread-3

    现在卖第14张 买卖票窗口:Thread-2

    现在卖第13张 买卖票窗口:Thread-2

    现在卖第12张 买卖票窗口:Thread-2

    现在卖第11张 买卖票窗口:Thread-2

    现在卖第10张 买卖票窗口:Thread-1

    现在卖第9张 买卖票窗口:Thread-2

    现在卖第8张 买卖票窗口:Thread-2

    现在卖第7张 买卖票窗口:Thread-2

    现在卖第6张 买卖票窗口:Thread-2

    现在卖第5张 买卖票窗口:Thread-2

    现在卖第4张 买卖票窗口:Thread-2

    现在卖第3张 买卖票窗口:Thread-3

    现在卖第2张 买卖票窗口:Thread-3

    现在卖第1张 买卖票窗口:Thread-3

    窗口Thread-3票已经卖完

    窗口Thread-0票已经卖完

    窗口Thread-2票已经卖完

    窗口Thread-1票已经卖完

    有的卖的多有的卖的少。

    由此我产生了以下几个疑问:

    1,JAVA的线程是抢占式的,但又遵守支持时间片的操作系统的时间片调度,那么这里的抢占与时间片轮换之间又有怎样的关系,是怎么协调的?

    2,每个线程的时间片分配大小是否相等?

    3,以上程序中的线程为什么不是依次的执行,比如Thread-0执行后应该是Thread-1执行,但结果确实Thread-3在执行,当Thread-0睡眠过后,执行剩余的代码,外面等待的应该是Thread-2啊,那怎么会执行Thread-3呢?

    4,为什么每个线程的执行时间不是一样的长?

    期盼大家能帮下我--一个渴望学习JAVA的初学者

    展开

    展开全文
  • 原标题:Android中多线程切换的几种方法作者:蓝灰_qhttps://www.jianshu.com/p/31d0852c0760我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有多线程的内容,这里简单总结和探讨一下常见的...

    原标题:Android中多线程切换的几种方法

    作者:蓝灰_q

    https://www.jianshu.com/p/31d0852c0760

    我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有多线程的内容,这里简单总结和探讨一下常见的多线程切换方式。

    我们先回顾一下Java多线程的几个基础内容,然后再分析总结一些经典代码中对于线程切换的实现方式。

    几点基础

    多线程切换,大概可以切分为这样几个内容:如何开启多个线程,如何定义每个线程的任务,如何在线程之间互相通信。

    Thread

    Thread可以解决开启多个线程的问题。

    Thread是Java中实现多线程的线程类,每个Thread对象都可以启动一个新的线程,注意是可以启动,也可以不启动新线程:

    thread.run(); //不启动新线程,在当前线程执行

    thread.start(); //启动新线程。

    另外就是Thread存在线程优先级问题,如果为Thread设置较高的线程优先级,就有机会获得更多的CPU资源,注意这里也是有机会,优先级高的Thread不是必然会先于其他Thread执行,只是系统会倾向于给它分配更多的CPU。

    默认情况下,新建的Thread和当前Thread的线程优先级一致。

    设置线程优先级有两种方式:

    thread.setPriority(Thread.MAX_PRIORITY); //1~10,通过线程设置

    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //-20~19,通过进程设置

    这两种设置方式是相对独立的,在Android中,一般建议通过Process进程设置优先级。

    ThreadPool

    Thread本身是需要占用内存的,开启/销毁过量的工作线程会造成过量的资源损耗,这种场景我们一般会通过对资源的复用来进行优化,针对IO资源我们会做IO复用(例如Http的KeepAlive),针对内存我们会做内存池复用(例如Fresco的内存池),针对CPU资源,我们一般会做线程复用,也就是线程池。

    所以,在Android开发中,一般不会直接开启大量的Thread,而是会使用ThreadPool来复用线程。

    Runnable

    Runnable主要解决如何定义每个线程的工作任务的问题。

    Runnable是Java中实现多线程的接口,相对Thread而言,Runnable接口更容易扩展(不需要单继承),而且,Thread本身也是一种Runnable:

    publicclassThreadimplementsRunnable{

    相比Thread而言,Runnable不关注如何调度线程,只关心如何定义要执行的工作任务,所以在实际开发中,多使用Runnable接口完成多线程开发。

    Callable

    Callable和Runnable基本类似,但是Callable可以返回执行结果。

    线程间通信

    Thread和Runnable能实现切换到另一个线程工作(Runnable需要额外指派工作线程),但它们完成任务后就会退出,并不注重如何在线程间实现通信,所以切换线程时,还需要在线程间通信,这就需要一些线程间通信机制。

    Future

    一般来说,如果要做简单的通信,我们最常用的是通过接口回调来实现。

    Future就是这样一种接口,它可以部分地解决线程通信的问题,Future接口定义了done、canceled等回调函数,当工作线程的任务完成时,它会(在工作线程中)通过回调告知我们,我们再采用其他手段通知其他线程。

    mFuture = newFutureTask(runnable) {

    @Override

    protectedvoiddone(){

    ... //还是在工作线程里

    }

    };

    Condition

    Condition其实是和Lock一起使用的,但如果把它视为一种线程间通信的工具,也说的通。

    因为,Condition本身定位就是一种多线程间协调通信的工具,Condition可以在某些条件下,唤醒等待线程。

    Locklock= newReentrantLock();

    Condition notFull = lock.newCondition(); //定义 Lock的Condition

    ...

    while( count== items.length)

    notFull.await();//等待condition的状态

    ...

    notFull.signal();//达到condition的状态

    Handler

    其实,最完整的线程间通信机制,也是我们最熟悉的线程间通信机制,莫过于Handler通信机制,Handler利用线程封闭的ThreadLocal维持一个消息队列,Handler的核心是通过这个消息队列来传递Message,从而实现线程间通信。

    AsyncTask的多线程切换

    回顾完多线程的几个基础概念,先来看看简单的多线程切换,Android自带的AsyncTask。

    AsyncTask主要在doInBackground函数中定义工作线程的工作内容,在其他函数中定义主线程的工作内容,例如onPostExecute,这里面必然涉及两个问题:

    1.如何实现把doInBackground抛给工作线程

    2.如何实现把onPostExecute抛给主线程

    其实非常简单,我们先看第一个

    1.如何实现把doInBackground抛给工作线程

    在使用AsyncTask时,我们一般会创建一个基于AsyncTask的扩展类或匿名类,在其中实现几个抽象函数,例如:

    privateclassMyTaskextendsAsyncTask{

    @Override

    protectedvoidonPreExecute(){... }

    @Override

    protectedLong doInBackground(String... params){... }

    @Override

    protectedvoidonProgressUpdate(Object... values){... }

    @Override

    protectedvoidonPostExecute(Long aLong){... }

    @Override

    protectedvoidonCancelled(){... }

    然后,我们会实例化这个AsyncTask:

    MyTask mTask = newMyTask();

    在AsyncTask源码中,我们看到,构造函数里会创建一个WorkerRunnable:

    publicAsyncTask(){

    mWorker = newWorkerRunnable() { //这是一个Callable

    publicResult call()throwsException{

    ...

    result = doInBackground(mParams); //在工作线程中执行

    ...

    WorkerRunnable实际上是一个Callable对象,所以,doInBackground是被包在一个Callable对象中了,这个Callable还会被继续包装,最终被交给一个线程池去执行:

    Runnable mActive;

    ...

    if((mActive = mTasks.poll()) != null) {

    THREAD_POOL_EXECUTOR.execute(mActive); //交给线程池执行

    }

    梳理一下,大致过程为:

    定义doInBackground-->被一个Callable调用-->层层包为一个Runnable-->交给线程池执行。

    这样就解决了第一个问题,如何实现把doInBackground抛给工作线程。

    我们再来看第二个问题。

    2.如何实现把onPostExecute抛给主线程

    首先,我们要知道工作任务何时执行完毕,就需要在工作完成时触发一个接口回调,也就是前面说过的Future,还是看AsyncTask源码:

    publicAsyncTask(){

    mWorker = newWorkerRunnable() {

    publicResult call()throwsException{

    ...

    };

    mFuture = newFutureTask(mWorker) {

    @Override

    protectedvoiddone(){ //Future的回调

    try{

    postResultIfNotInvoked(get()); //get()是FutureTask接口函数

    ...

    }

    };

    }

    这样,我们就知道可以处理onPostExecute函数了,但是,我们还需要把它抛给主线程,主要源码如下:

    //mWorker、mFuture和都会指向postResult函数

    privateResult postResult(Result result){

    @SuppressWarnings( "unchecked")

    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,

    newAsyncTaskResult( this, result));

    message.sendToTarget();

    returnresult;

    }

    //getHandler()会指向InternalHandler

    privatestaticclassInternalHandlerextendsHandler{

    publicInternalHandler(){

    super(Looper.getMainLooper()); //指向MainLooper

    }

    @SuppressWarnings({ "unchecked", "RawUseOfParameterizedType"})

    @Override

    publicvoidhandleMessage(Message msg){

    AsyncTaskResult> result = (AsyncTaskResult>) msg.obj;

    switch(msg.what) {

    caseMESSAGE_POST_RESULT:

    // There is only one result

    result.mTask.finish(result.mData[ 0]); //通过handler机制,回到主线程,调用finish函数

    ...

    }

    //在Handler中,最终会在主线程中调用finish

    privatevoidfinish(Result result){

    if(isCancelled()) {

    onCancelled(result);

    } else{

    onPostExecute(result); //调用onPostExecute接口函数

    }

    mStatus = Status.FINISHED;

    }

    从源码可以看到,其实AsyncTask还是通过Handler机制,把任务抛给了主线程。

    总体来说,AsyncTask的多线程任务是通过线程池实现的工作线程,在完成任务后利用Future的done回调来通知任务完成,并通过handler机制通知主线程去执行onPostExecute等回调函数。

    EventBus的多线程切换

    EventBus会为每个订阅事件注册一个目标线程,所以需要从发布事件的线程中,根据注册信息,实时切换到目标线程中,所以,这是个很典型的多线程切换场景。

    根据EventBus源码,多线程切换的主要判断代码如下:

    switch(subion.subscriberMethod.threadMode) {

    casePOSTING:

    invokeSubscriber(subion, event); //直接在当前线程执行

    break;

    caseMAIN:

    if(isMainThread) {

    invokeSubscriber(subion, event); //在当前主线程执行

    } else{

    mainThreadPoster.enqueue(subion, event); //当然不是主线程,交给主线程执行

    }

    break;

    caseBACKGROUND:

    if(isMainThread) {

    backgroundPoster.enqueue(subion, event); //当前线程为主线程,交给工作线程

    } else{

    invokeSubscriber(subion, event); //直接在当前工作线程执行

    }

    break;

    caseASYNC:

    asyncPoster.enqueue(subion, event); //异步执行

    break;

    default:

    thrownewIllegalStateException( "Unknown thread mode: "+ subion.subscriberMethod.threadMode);

    }

    所以,在EventBus里,如果需要做线程间切换,主要是抛给不同的任务队列,实现线程间切换。

    从任务队列判断,切换目标包括主线程Poster、backgroundPoster和asyncPoster这样三种。

    我们先看任务队列的设计:

    任务队列

    因为EventBus不能判断有哪些任务会并行,所以它采用了队列的设计,多线程任务(EventBus的事件)会先进入队列,然后再处理队列中的工作任务,这是典型的生产--消费场景。

    主线程Poster、backgroundPoster和asyncPoster都是任务队列的不同实现。

    主线程Poster

    负责处理主线程的mainThreadPoster是Handler的子类:

    finalclassHandlerPosterextendsHandler{

    ...

    voidenqueue(Subion subion, Object event){

    ...

    synchronized( this) { //因为主线程只有一个,需要线程安全

    queue.enqueue(pendingPost);

    ...

    if(!sendMessage(obtainMessage())) { //作为handler发送消息

    ...

    //在主线程中处理消息

    @Override

    publicvoidhandleMessage(Message msg){

    ...

    }

    从源码可以看出,这个Poster其实是一个Handler,它采用了哪个线程的消息队列,就决定了它能和哪个线程通信,我们确认一下:

    EventBus(EventBusBuilder builder) {

    ...

    mainThreadPoster = newHandlerPoster( this, Looper.getMainLooper(), 10); //获取主线程的MainLooper

    所以,EventBus是扩展了一个Handler,作为主线程的Handler,通过Handler消息机制实现的多线程切换。

    当然,这个Handler本事,又多了一层queue。

    backgroundPoster和asyncPoster

    backgroundPoster和asyncPoster其实都是使用了EventBus的线程池,默认是个缓存线程池:

    privatefinalstaticExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    所以,backgroundPoster和asyncPoster都是把任务交给线程池处理,这样实现的多线程切换。

    不过,backgroundPoster和asyncPoster也有一些不同,我们知道,在newCachedThreadPool中,最大线程数就是Integer的最大值,相当于不设上限,所以可以尽可能多的启动线程,asyncPoster就是这样做的,enqueu和run都没做同步,为每个事件单独开启新线程处理。

    而在backgroundPoster中,可以尽量复用线程,主要方法是在run的时候,做个1秒的等待:

    @ Override

    publicvoidrun(){

    ...

    PendingPost pendingPost = queue.poll( 1000); //允许等待1秒

    因为做了这一秒的挂起等待,在enqueue和run时,都需要用synchronized (this) 来确保线程安全。

    另外,其实这里面还有个很重要的用法,就是Executors.newCachedThreadPool()中的SynchronousQueue:

    publicstaticExecutorService newCachedThreadPool(){

    returnnewThreadPoolExecutor( 0, Integer.MAX_VALUE,

    60L, TimeUnit.SECONDS,

    newSynchronousQueue()); //用于辅助线程切换的阻塞队列

    }

    这个SynchronousQueue,在OkHttp中也使用了:

    //okhttp3.Dispatcher源码

    publicsynchronizedExecutorService executorService(){

    if(executorService == null) {

    executorService = newThreadPoolExecutor( 0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,

    newSynchronousQueue(), Util.threadFactory( "OkHttp Dispatcher", false)); //用于辅助线程切换的阻塞队列

    }

    returnexecutorService;

    }

    SynchronousQueue与普通队列不同,不是数据等线程,而是线程等数据,这样每次向SynchronousQueue里传入数据时,都会立即交给一个线程执行,这样可以提高数据得到处理的速度。

    总的来看,EventBus还是采用线程池实现工作线程,采用handler机制通知到主线程。不同在于,它采用的queue的队列方式来管理所有的跨线程请求,而且它利用了SynchronousQueue阻塞队列来辅助实现线程切换。

    RxJava的多线程切换

    其实在多线程管理这方面,RxJava的线程管理能力是非常令人赞叹的。

    RxJava的主要概念是工作流,它可以把一序列工作流定义在一个线程类型上:

    myWorkFlow.getActResponse(myParam)

    .subscribeOn(Schedulers.io()) //指定线程

    .xxx //其他操作

    这个构建工作流的过程其实挺复杂的,不过如果我们只看线程操作这部分,其实流程非常清晰,我们追踪一下subscribeOn的源码(RxJava2):

    //进入subscribeOn

    publicfinalFlowable subscribeOn( @NonNullScheduler scheduler) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    returnsubscribeOn(scheduler, !( thisinstanceof FlowableCreate));

    }

    //继续进入subscribeOn

    publicfinalFlowable subscribeOn( @NonNullScheduler scheduler, boolean requestOn) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    returnRxJavaPlugins.onAssembly(new FlowableSubscribeOn( this, scheduler, requestOn));

    }

    然后,进入FlowableSubscribeOn类

    //进入FlowableSubscribeOn类

    publicFlowableSubscribeOn(Flowable source, Scheduler scheduler, booleannonScheduledRequests){

    ...

    this.scheduler = scheduler;

    ...

    }

    @Override

    publicvoidsubscribeActual(finalSubscriber superT> s){

    Scheduler.Worker w = scheduler.createWorker(); //根据参数值,如Schedulers.io()创建worker

    finalSubscribeOnSubscriber sos = newSubscribeOnSubscriber(s, w, source, nonScheduledRequests); //根据worker创建SubscribeOnSubscriber

    s.onSubscribe(sos);

    w.schedule(sos);

    }

    这个SubscribeOnSubscriber是个内部类:

    SubscribeOnSubscriber(Subscriber superT> actual, Scheduler.Worker worker, Publisher source, booleanrequestOn) {

    ...

    this.worker = worker;

    ...

    }

    ...

    voidrequestUpstream(finallongn, finalSubion s){

    ...

    worker.schedule( newRequest(s, n)); //worker会安排如何执行runnable(Request是一个runnable)

    ...

    }

    而这个worker,其实就是我们输入的线程参数,如Schedulers.io(),这个io是这样定义的:

    //io.reactivex.schedulers.Schedulers源码

    static{

    SINGLE = RxJavaPlugins.initSingleScheduler( newSingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler( newComputationTask());

    IO = RxJavaPlugins.initIoScheduler( newIOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler( newNewThreadTask());

    }

    ...

    staticfinalclassIOTaskimplementsCallable{

    @Override

    publicScheduler call()throwsException{

    returnIoHolder.DEFAULT;

    }

    }

    staticfinalclassNewThreadTaskimplementsCallable{

    @Override

    publicScheduler call()throwsException{

    returnNewThreadHolder.DEFAULT;

    }

    }

    staticfinalclassSingleTaskimplementsCallable{

    @Override

    publicScheduler call()throwsException{

    returnSingleHolder.DEFAULT;

    }

    }

    staticfinalclassComputationTaskimplementsCallable{

    @Override

    publicScheduler call()throwsException{

    returnComputationHolder.DEFAULT;

    }

    }

    ...

    staticfinalclassSingleHolder{

    staticfinalScheduler DEFAULT = newSingleScheduler();

    }

    staticfinalclassComputationHolder{

    staticfinalScheduler DEFAULT = newComputationScheduler();

    }

    staticfinalclassIoHolder{

    staticfinalScheduler DEFAULT = newIoScheduler();

    }

    staticfinalclassNewThreadHolder{

    staticfinalScheduler DEFAULT = newNewThreadScheduler();

    }

    这里的IO,最终会指向一个Scheduler,如IoScheduler:

    //io.reactivex.internal.schedulers.IoScheduler源码

    ...

    staticfinalclassEventLoopWorkerextendsScheduler.Worker{ //Scheduler.Worker的实现类

    ...

    @NonNull

    @Override

    publicDisposable schedule(@NonNull Runnable action, longdelayTime, @NonNull TimeUnit unit){

    if(tasks.isDisposed()) {

    // don't schedule, we are unsubscribed

    returnEmptyDisposable.INSTANCE;

    }

    returnthreadWorker.scheduleActual(action, delayTime, unit, tasks); //交给线程池

    }

    这样,Scheculer中的具体任务就交给了某个线程池来处理。

    需要特别说明的是,RxJava中调用Android主线程(AndroidSchedulers.mainThread),其实还是使用了Handler机制:

    publicfinalclassAndroidSchedulers{

    ...

    staticfinalScheduler DEFAULT= newHandlerScheduler( newHandler(Looper.getMainLooper()));

    这个HandlerScheduler其实就是实现了Scheduler和Scheduler.Worker内部类。

    finalclassHandlerSchedulerextendsScheduler{

    privatefinalHandler handler;

    HandlerScheduler(Handler handler) {

    this.handler = handler;

    }

    privatestaticfinalclassHandlerWorkerextendsWorker{

    ...

    @Override

    publicDisposable schedule(Runnable run, longdelay, TimeUnit unit){

    ...

    handler.sendMessageDelayed(message, Math.max( 0L, unit.toMillis(delay)));

    总的来看,RxJava的多线程切换其实是利用了Scheculer.Worker这个内部类,把任务交给Scheculer的Worker去做,而这个Scheculer的Worker是根据定义的线程来实现了不同的线程池,其实还是交给线程池去处理了。

    至于主线程,RxJava也是使用了Handler机制。

    总结

    小小总结一下,基本上来说,Android中的多线程切换,主要使用Runnable和Callable来定义工作内容,使用线程池来实现异步并行,使用Handler机制来通知主线程,有些场景下会视情况需要,使用Future的接口回调,使用SynchronousQueue阻塞队列等。返回搜狐,查看更多

    责任编辑:

    展开全文
  • 我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有多线程的内容,这里简单总结和探讨一下常见的多线程切换方式。我们先回顾一下Java多线程的几个基础内容,然后再分析总结一些经典代码中对于...

    我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有多线程的内容,这里简单总结和探讨一下常见的多线程切换方式。

    我们先回顾一下Java多线程的几个基础内容,然后再分析总结一些经典代码中对于线程切换的实现方式。

    几点基础

    多线程切换,大概可以切分为这样几个内容:如何开启多个线程,如何定义每个线程的任务,如何在线程之间互相通信。

    Thread

    Thread可以解决开启多个线程的问题。

    Thread是Java中实现多线程的线程类,每个Thread对象都可以启动一个新的线程,注意是可以启动,也可以不启动新线程:

    thread.run();//不启动新线程,在当前线程执行

    thread.start();//启动新线程。

    另外就是Thread存在线程优先级问题,如果为Thread设置较高的线程优先级,就有机会获得更多的CPU资源,注意这里也是有机会,优先级高的Thread不是必然会先于其他Thread执行,只是系统会倾向于给它分配更多的CPU。

    默认情况下,新建的Thread和当前Thread的线程优先级一致。

    设置线程优先级有两种方式:

    thread.setPriority(Thread.MAX_PRIORITY);//1~10,通过线程设置

    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);//-20~19,通过进程设置

    这两种设置方式是相对独立的,在Android中,一般建议通过Process进程设置优先级。

    ThreadPool

    Thread本身是需要占用内存的,开启/销毁过量的工作线程会造成过量的资源损耗,这种场景我们一般会通过对资源的复用来进行优化,针对IO资源我们会做IO复用(例如Http的KeepAlive),针对内存我们会做内存池复用(例如Fresco的内存池),针对CPU资源,我们一般会做线程复用,也就是线程池。

    所以,在Android开发中,一般不会直接开启大量的Thread,而是会使用ThreadPool来复用线程。

    Runnable

    Runnable主要解决如何定义每个线程的工作任务的问题。

    Runnable是Java中实现多线程的接口,相对Thread而言,Runnable接口更容易扩展(不需要单继承),而且,Thread本身也是一种Runnable:

    public class Thread implements Runnable {

    相比Thread而言,Runnable不关注如何调度线程,只关心如何定义要执行的工作任务,所以在实际开发中,多使用Runnable接口完成多线程开发。

    Callable

    Callable和Runnable基本类似,但是Callable可以返回执行结果。

    线程间通信

    Thread和Runnable能实现切换到另一个线程工作(Runnable需要额外指派工作线程),但它们完成任务后就会退出,并不注重如何在线程间实现通信,所以切换线程时,还需要在线程间通信,这就需要一些线程间通信机制。

    Future

    一般来说,如果要做简单的通信,我们最常用的是通过接口回调来实现。

    Future就是这样一种接口,它可以部分地解决线程通信的问题,Future接口定义了done、canceled等回调函数,当工作线程的任务完成时,它会(在工作线程中)通过回调告知我们,我们再采用其他手段通知其他线程。

    mFuture = new FutureTask(runnable) {

    @Override

    protected void done() {

    ...//还是在工作线程里

    }

    };

    Condition

    Condition其实是和Lock一起使用的,但如果把它视为一种线程间通信的工具,也说的通。

    因为,Condition本身定位就是一种多线程间协调通信的工具,Condition可以在某些条件下,唤醒等待线程。

    Lock lock = new ReentrantLock();

    Condition notFull = lock.newCondition(); //定义Lock的Condition

    ...

    while (count == items.length)

    notFull.await();//等待condition的状态

    ...

    notFull.signal();//达到condition的状态

    Handler

    其实,最完整的线程间通信机制,也是我们最熟悉的线程间通信机制,莫过于Handler通信机制,Handler利用线程封闭的ThreadLocal维持一个消息队列,Handler的核心是通过这个消息队列来传递Message,从而实现线程间通信。

    AsyncTask的多线程切换

    回顾完多线程的几个基础概念,先来看看简单的多线程切换,Android自带的AsyncTask。

    AsyncTask主要在doInBackground函数中定义工作线程的工作内容,在其他函数中定义主线程的工作内容,例如

    onPostExecute,这里面必然涉及两个问题:

    1.如何实现把doInBackground抛给工作线程

    2.如何实现把onPostExecute抛给主线程

    其实非常简单,我们先看第一个

    1.如何实现把doInBackground抛给工作线程

    在使用AsyncTask时,我们一般会创建一个基于AsyncTask的扩展类或匿名类,在其中实现几个抽象函数,例如:

    private class MyTask extends AsyncTask {

    @Override

    protected void onPreExecute() {... }

    @Override

    protected Long doInBackground(String... params) {... }

    @Override

    protected void onProgressUpdate(Object... values) {... }

    @Override

    protected void onPostExecute(Long aLong) {... }

    @Override

    protected void onCancelled() {... }

    然后,我们会实例化这个AsyncTask:

    MyTask mTask = new MyTask();

    在AsyncTask源码中,我们看到,构造函数里会创建一个WorkerRunnable:

    public AsyncTask() {

    mWorker = new WorkerRunnable() {//这是一个Callable

    public Result call() throws Exception {

    ...

    result = doInBackground(mParams);//在工作线程中执行

    ...

    WorkerRunnable实际上是一个Callable对象,所以,doInBackground是被包在一个Callable对象中了,这个Callable还会被继续包装,最终被交给一个线程池去执行:

    Runnable mActive;

    ...

    if ((mActive = mTasks.poll()) != null) {

    THREAD_POOL_EXECUTOR.execute(mActive);//交给线程池执行

    }

    梳理一下,大致过程为:

    定义doInBackground-->被一个Callable调用-->层层包为一个Runnable-->交给线程池执行。

    这样就解决了第一个问题,如何实现把doInBackground抛给工作线程。

    我们再来看第二个问题。

    2.如何实现把onPostExecute抛给主线程

    首先,我们要知道工作任务何时执行完毕,就需要在工作完成时触发一个接口回调,也就是前面说过的Future,还是看AsyncTask源码:

    public AsyncTask() {

    mWorker = new WorkerRunnable() {

    public Result call() throws Exception {

    ...

    };

    mFuture = new FutureTask(mWorker) {

    @Override

    protected void done() {//Future的回调

    try {

    postResultIfNotInvoked(get());//get()是FutureTask接口函数

    ...

    }

    };

    }

    这样,我们就知道可以处理onPostExecute函数了,但是,我们还需要把它抛给主线程,主要源码如下:

    //mWorker、mFuture和都会指向postResult函数

    private Result postResult(Result result) {

    @SuppressWarnings("unchecked")

    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,

    new AsyncTaskResult(this, result));

    message.sendToTarget();

    return result;

    }

    //getHandler()会指向InternalHandler

    private static class InternalHandler extends Handler {

    public InternalHandler() {

    super(Looper.getMainLooper());//指向MainLooper

    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})

    @Override

    public void handleMessage(Message msg) {

    AsyncTaskResult> result = (AsyncTaskResult>) msg.obj;

    switch (msg.what) {

    case MESSAGE_POST_RESULT:

    // There is only one result

    result.mTask.finish(result.mData[0]);//通过handler机制,回到主线程,调用finish函数

    ...

    }

    //在Handler中,最终会在主线程中调用finish

    private void finish(Result result) {

    if (isCancelled()) {

    onCancelled(result);

    } else {

    onPostExecute(result);//调用onPostExecute接口函数

    }

    mStatus = Status.FINISHED;

    }

    从源码可以看到,其实AsyncTask还是通过Handler机制,把任务抛给了主线程。

    总体来说,AsyncTask的多线程任务是通过线程池实现的工作线程,在完成任务后利用Future的done回调来通知任务完成,并通过handler机制通知主线程去执行onPostExecute等回调函数。

    EventBus的多线程切换

    EventBus会为每个订阅事件注册一个目标线程,所以需要从发布事件的线程中,根据注册信息,实时切换到目标线程中,所以,这是个很典型的多线程切换场景。

    根据EventBus源码,多线程切换的主要判断代码如下:

    switch (subscription.subscriberMethod.threadMode) {

    case POSTING:

    invokeSubscriber(subscription, event);//直接在当前线程执行

    break;

    case MAIN:

    if (isMainThread) {

    invokeSubscriber(subscription, event);//在当前主线程执行

    } else {

    mainThreadPoster.enqueue(subscription, event);//当然不是主线程,交给主线程执行

    }

    break;

    case BACKGROUND:

    if (isMainThread) {

    backgroundPoster.enqueue(subscription, event);//当前线程为主线程,交给工作线程

    } else {

    invokeSubscriber(subscription, event);//直接在当前工作线程执行

    }

    break;

    case ASYNC:

    asyncPoster.enqueue(subscription, event);//异步执行

    break;

    default:

    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);

    }

    所以,在EventBus里,如果需要做线程间切换,主要是抛给不同的任务队列,实现线程间切换。

    从任务队列判断,切换目标包括主线程Poster、backgroundPoster和asyncPoster这样三种。

    我们先看任务队列的设计:

    任务队列

    因为EventBus不能判断有哪些任务会并行,所以它采用了队列的设计,多线程任务(EventBus的事件)会先进入队列,然后再处理队列中的工作任务,这是典型的生产--消费场景。

    主线程Poster、backgroundPoster和asyncPoster都是任务队列的不同实现。

    主线程Poster

    负责处理主线程的mainThreadPoster是Handler的子类:

    final class HandlerPoster extends Handler {

    ...

    void enqueue(Subscription subscription, Object event) {

    ...

    synchronized (this) {//因为主线程只有一个,需要线程安全

    queue.enqueue(pendingPost);

    ...

    if (!sendMessage(obtainMessage())) {//作为handler发送消息

    ...

    //在主线程中处理消息

    @Override

    public void handleMessage(Message msg) {

    ...

    }

    从源码可以看出,这个Poster其实是一个Handler,它采用了哪个线程的消息队列,就决定了它能和哪个线程通信,我们确认一下:

    EventBus(EventBusBuilder builder) {

    ...

    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);//获取主线程的MainLooper

    所以,EventBus是扩展了一个Handler,作为主线程的Handler,通过Handler消息机制实现的多线程切换。

    当然,这个Handler本事,又多了一层queue。

    backgroundPoster和asyncPoster

    backgroundPoster和asyncPoster其实都是使用了EventBus的线程池,默认是个缓存线程池:

    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    所以,backgroundPoster和asyncPoster都是把任务交给线程池处理,这样实现的多线程切换。

    不过,backgroundPoster和asyncPoster也有一些不同,我们知道,在newCachedThreadPool中,最大线程数就是Integer的最大值,相当于不设上限,所以可以尽可能多的启动线程,asyncPoster就是这样做的,enqueu和run都没做同步,为每个事件单独开启新线程处理。

    而在backgroundPoster中,可以尽量复用线程,主要方法是在run的时候,做个1秒的等待:

    @Override

    public void run() {

    ...

    PendingPost pendingPost = queue.poll(1000);//允许等待1秒

    因为做了这一秒的挂起等待,在enqueue和run时,都需要用synchronized (this) 来确保线程安全。

    另外,其实这里面还有个很重要的用法,就是Executors.newCachedThreadPool()中的SynchronousQueue:

    public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

    60L, TimeUnit.SECONDS,

    new SynchronousQueue());//用于辅助线程切换的阻塞队列

    }

    这个SynchronousQueue,在OkHttp中也使用了:

    //okhttp3.Dispatcher源码

    public synchronized ExecutorService executorService() {

    if (executorService == null) {

    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,

    new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false));//用于辅助线程切换的阻塞队列

    }

    return executorService;

    }

    SynchronousQueue与普通队列不同,不是数据等线程,而是线程等数据,这样每次向SynchronousQueue里传入数据时,都会立即交给一个线程执行,这样可以提高数据得到处理的速度。

    总的来看,EventBus还是采用线程池实现工作线程,采用handler机制通知到主线程。不同在于,它采用的queue的队列方式来管理所有的跨线程请求,而且它利用了SynchronousQueue阻塞队列来辅助实现线程切换。

    RxJava的多线程切换

    其实在多线程管理这方面,RxJava的线程管理能力是非常令人赞叹的。

    RxJava的主要概念是工作流,它可以把一序列工作流定义在一个线程类型上:

    myWorkFlow.getActResponse(myParam)

    .subscribeOn(Schedulers.io())//指定线程

    .xxx//其他操作

    这个构建工作流的过程其实挺复杂的,不过如果我们只看线程操作这部分,其实流程非常清晰,我们追踪一下subscribeOn的源码(RxJava2):

    //进入subscribeOn

    public final Flowable subscribeOn(@NonNull Scheduler scheduler) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    return subscribeOn(scheduler, !(this instanceof FlowableCreate));

    }

    //继续进入subscribeOn

    public final Flowable subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    return RxJavaPlugins.onAssembly(new FlowableSubscribeOn(this, scheduler, requestOn));

    }

    然后,进入FlowableSubscribeOn类

    //进入FlowableSubscribeOn类

    public FlowableSubscribeOn(Flowable source, Scheduler scheduler, boolean nonScheduledRequests) {

    ...

    this.scheduler = scheduler;

    ...

    }

    @Override

    public void subscribeActual(final Subscriber super T> s) {

    Scheduler.Worker w = scheduler.createWorker();//根据参数值,如Schedulers.io()创建worker

    final SubscribeOnSubscriber sos = new SubscribeOnSubscriber(s, w, source, nonScheduledRequests);//根据worker创建SubscribeOnSubscriber

    s.onSubscribe(sos);

    w.schedule(sos);

    }

    这个SubscribeOnSubscriber是个内部类:

    SubscribeOnSubscriber(Subscriber super T> actual, Scheduler.Worker worker, Publisher source, boolean requestOn) {

    ...

    this.worker = worker;

    ...

    }

    ...

    void requestUpstream(final long n, final Subscription s) {

    ...

    worker.schedule(new Request(s, n));//worker会安排如何执行runnable(Request是一个runnable)

    ...

    }

    而这个worker,其实就是我们输入的线程参数,如Schedulers.io(),这个io是这样定义的:

    //io.reactivex.schedulers.Schedulers源码

    static {

    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

    }

    ...

    static final class IOTask implements Callable {

    @Override

    public Scheduler call() throws Exception {

    return IoHolder.DEFAULT;

    }

    }

    static final class NewThreadTask implements Callable {

    @Override

    public Scheduler call() throws Exception {

    return NewThreadHolder.DEFAULT;

    }

    }

    static final class SingleTask implements Callable {

    @Override

    public Scheduler call() throws Exception {

    return SingleHolder.DEFAULT;

    }

    }

    static final class ComputationTask implements Callable {

    @Override

    public Scheduler call() throws Exception {

    return ComputationHolder.DEFAULT;

    }

    }

    ...

    static final class SingleHolder {

    static final Scheduler DEFAULT = new SingleScheduler();

    }

    static final class ComputationHolder {

    static final Scheduler DEFAULT = new ComputationScheduler();

    }

    static final class IoHolder {

    static final Scheduler DEFAULT = new IoScheduler();

    }

    static final class NewThreadHolder {

    static final Scheduler DEFAULT = new NewThreadScheduler();

    }

    这里的IO,最终会指向一个Scheduler,如IoScheduler:

    //io.reactivex.internal.schedulers.IoScheduler源码

    ...

    static final class EventLoopWorker extends Scheduler.Worker {//Scheduler.Worker的实现类

    ...

    @NonNull

    @Override

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

    if (tasks.isDisposed()) {

    // don't schedule, we are unsubscribed

    return EmptyDisposable.INSTANCE;

    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);//交给线程池

    }

    这样,Scheculer中的具体任务就交给了某个线程池来处理。

    需要特别说明的是,RxJava中调用Android主线程(AndroidSchedulers.mainThread),其实还是使用了Handler机制:

    public final class AndroidSchedulers {

    ...

    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

    这个HandlerScheduler其实就是实现了Scheduler和Scheduler.Worker内部类。

    ···

    final class HandlerScheduler extends Scheduler {

    private final Handler handler;

    HandlerScheduler(Handler handler) {

    this.handler = handler;

    }

    private static final class HandlerWorker extends Worker {

    ...

    @Override

    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

    ...

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    ···

    总的来看,RxJava的多线程切换其实是利用了Scheculer.Worker这个内部类,把任务交给Scheculer的Worker去做,而这个Scheculer的Worker是根据定义的线程来实现了不同的线程池,其实还是交给线程池去处理了。

    至于主线程,RxJava也是使用了Handler机制。

    总结

    小小总结一下,基本上来说,Android中的多线程切换,主要使用Runnable和Callable来定义工作内容,使用线程池来实现异步并行,使用Handler机制来通知主线程,有些场景下会视情况需要,使用Future的接口回调,使用SynchronousQueue阻塞队列等。

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    展开全文
  • 线程的内存模型32位操作系统的寻址空间为2的32次方,也就是4GB...用户线程需要访问硬件资源的时候需要委托内核线程进行访问,这就涉及到CPU上下文在用户模式和内核模式的切换。因此在使用线程或者进程的时候需要尽量...
  • python多线程

    2020-12-22 21:38:10
    虽然python能运行多线程,但是因为GIL所以同一时刻只有一条线程在python解释器运行。多线程下python虚拟机按以下方式执行:1. 设置GIL2. 切换到一条线程去运行3. 运行:a. 执行python2虚拟机运行1000字节指令 或者 ...
  • python如何切换线程

    2021-01-14 08:44:25
    条件对象能让一个线程 A 停下来,等待其他线程 B ,线程 B 满足了某个条件后通知(notify)线程 A 继续运行。线程首先获取一个条件变量锁,如果条件不足,则该线程等待(wait)并释放条件变量锁,如果满足就执行线程,也...
  • 多线程开得越多,cpu都忙在切换上面了,代码执行的时间就会越来越少,执行一条指令立马被人偷袭抢占切换,当前进程启动的多线程执行时间就会越来越少,等半天才执行几条指令,还原到各位写的源码估计一行都不到,...
  • Java线程状态及切换

    2021-03-05 17:13:13
    Java线程状态及切换一、什么是Java线程状态在Java程序中,用于描述Java线程的六种状态:新建(NEW):当前线程,刚刚新建出来,尚未启动。运行(RUNNABLE):当前线程,处于竞争CPU时间分片或已经获得CPU时间片的状态。...
  • 多线程上下文切换对于单核CPU来说(对于多核CPU,此处就理解为一个核),CPU在一个时刻只能运行一个线程,当在运行一个线程的过程中转去运行另外一个线程,这个叫做线程上下文切换(对于进程也是类似)。由于可能当前...
  • 1、右键断点,选择设置为thread 2、开始debug,假如t1先执行,到park时,无法继续debug了,打开左下角frames,选择切换到t2(前面有✔的那个),然后继续F8,断点调试又可以进行了
  • 自动切换账号是多线程模板的一个变化应用,你有100个账号,每次开始10个线程,当第一批账号完成任务(登录,刷任务,退出)时,...易语言大漠多线程循环登录自动切换账号 截图预览 设计目录 1.易语言大漠多线程
  • 点击上方“朱小厮的博客”,选择“设为星标”后台回复"书",获取后台回复“k8s”,可领取k8s资料进程切换分两步:1.切换页目录以使用新的地址空间2.切换内核栈和硬件上下文...
  • 无论是单核还是多核CPU都是支持多线程代码的,CPU通过给每个线程分配CPU时间片来实这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切换线程执行,让我们感觉多个线程是同时执行的...
  • 通过上一讲的讲解,相信你对上下文切换已经有了一定的了解了。...还有,在多线程中如果使用了竞争锁,当线程由于等待竞争锁而被阻塞时,JVM 通常会将这个锁挂起,并允许它被交换出去。如果频繁地发生...
  • 线程的上下文切换多线程频繁上下文切换利用时间片轮转的方式,CPU给每个任务都服务一定的时间,然后把当前任务的状态保存下来,在加载下一个任务后,继续服务下一个任务,这个过程叫做上下文切换。时间片轮转的...
  • 标签:记录分段运行数据性能思想返回切换cas什么是上下文:cpu会为每个线程分配一个时间片,这个时间片是非常短的,毫秒级别的,cup不断的切换线程执行任务时,会记录这个任务的状态,下次切换回来时,可以通过这个状态继续...
  • 目标:Number线程类执行数字的递增并输出,并且没增加两个就切换到另外一个线程,Letter线程类执行字母的递增并输出,并且没输出一个就切换到另外一个线程。主类:public static void main(String[] args) {Print ...
  • 为什么线程切换开销大

    千次阅读 2020-12-22 17:01:49
    线程切换的开销 我们都知道,线程切换会带来开销,如果频繁进行线程切换,所造成的开销是相当可观的。那么为什么线程切换会有开销呢,有哪些开销呢?这里涉及几个概念:CPU上下文切换、线程上下文切换、特权模式...
  • 我们在CSDN发布了多套多线程循环登录的模板,本套多线程模板控制的雷电模拟器自动切换账号的一套模板,适合在模拟器里控制手游,特点是任意多...语言大漠多线程雷电模拟器自动切换循环登录 课程目录 1.课程特点与雷
  • 前言 突发奇想想搞一个同步切换线程的Kotlin协程,而不用各种withContext(){},可以减少嵌套且逻辑更清晰,想实现的结果如下图: 分析 实现我们想要的结果,...其实控制线程切换是协程库内内置的一个拦截器类:Continu
  • 对于linux来说,线程和进程的最大区别就在于地址空间,对于线程切换,第1步是不需要做的,第2是进程和线程切换都要做的。 二、切换的性能消耗: 1、线程上下文切换和进程上下问切换一个最主要的区别是线程的切换虚拟...
  • 进程切换为什么比线程切换慢? 注意这里问的是为什么进程切换比线程慢,而不是问为什么进程比线程慢。当然这里的线程肯定指的是同一个进程中的线程。 解释下虚拟地址(逻辑地址)和物理地址的区别 下面这段 C 代码...
  • 线程切换时CPU在干嘛

    2021-08-11 00:12:49
    关注+星标公众号,不错过精彩内容来源 |码农的荒岛求生计算机系统中有很程序员习以为常但又十分神秘的存在:函数调用、系统调用、进程切换、线程切换以及中断处理。函数调用能让程序员提高代码可...
  • 该楼层疑似违规已被系统折叠隐藏此楼查看此楼Windows操作系统,C语言实现多线程:#include#includeDWORDAPIENTRYThreadOne(LPVOIDthreadArg){printf("线程开始啦,参数是:%s\n",(char*)threadArg);return0;}intmain...
  • Java多线程上下文切换

    2021-02-12 18:30:25
    时间片非常短,一般只有几十毫秒,所以CPU通过不停地切换线程执行时我们几乎感觉不到任务的停滞,让我们感觉是线程同时执行。CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务...
  • 切换内核栈和硬件上下文对于linux来说,线程和进程的最大区别就在于地址空间,对于线程切换,第1步是不需要做的,第2是进程和线程切换都要做的。切换的性能消耗:1、线程上下文切换和进程上下问切换一个最主要的区别...
  • 进程切换 1.1 相关背景知识 1.1.1 虚拟内存 1.1.1.1 虚拟内存是什么 虚拟内存别称虚拟存储器(Virtual Memory)。电脑中所运行的程序均需经由内存执行,若执行的程序占用内存很大或很,则会导致内存消耗殆尽。为...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 395,965
精华内容 158,386
关键字:

多线程切换