精华内容
下载资源
问答
  • Executor到底是什么时候启动的?

    千次阅读 2017-06-18 21:16:10
    6.4.1 Executor到底是什么时候启动的? 在SparkContext启动之后,StandaloneSchedulerBackend中会new出一个StandaloneAppClient,StandaloneAppClient中有一个名叫ClientEndPoint的内部类,在创建ClientEndpoint会...

    6.4.1 Executor到底是什么时候启动的?

    在SparkContext启动之后,StandaloneSchedulerBackend中会new出一个StandaloneAppClient,StandaloneAppClient中有一个名叫ClientEndPoint的内部类,在创建ClientEndpoint会传入Command来指定具体为当前应用程序启动的Executor进行的入口类的名称为CoarseGrainedExecutorBackend ClientEndPoint继承自ThreadSafeRpcEndpoint,其通过RPC机制完成和Master的通信。在ClientEndPoint的start方法中,会通过registerWithMaster方法向Master发送RegisterApplication请求,Master收到该请求消息之后,首先通过registerApplication方法完成信息登记,之后将会调用schedule方法,在Worker上启动Executor,Master对RegisterApplication请求处理源代码如下所示。

    Master.scala源码:

    1.              caseRegisterApplication(description, driver) =>

    2.             // TODO Prevent repeatedregistrations from some driver

    3.          //Master处于STANDBY(备用)状态,不作处理

    4.             if (state ==RecoveryState.STANDBY) {

    5.               // ignore, don't sendresponse

    6.             } else {

    7.               logInfo("Registeringapp " + description.name)

    8.           //由description描述,构建ApplicationInfo

    9.               val app =createApplication(description, driver)

    10.            registerApplication(app)

    11.            logInfo("Registeredapp " + description.name + " with ID " + app.id)

    12.         //在持久化引擎中加入application

    13.            persistenceEngine.addApplication(app)

    14.            driver.send(RegisteredApplication(app.id,self))

    15.      //调用schedule方法,在worker节点上启动Executor   

    16.            schedule()

    17.          }

    在上面的代码中,Master匹配到RegisterApplication请求,先判断Master的状态是否为STANDBY(备用)状态,如果不是说明Master为ALIVE状态,在这种状态下,调用createApplication(description,sender)方法创建ApplicationInfo,完成之后调用persistenceEngine.addApplication(app)方法,将新创建的ApplicationInfo持久化,以便错误恢复。完成这两步操作之后,通过driver.send(RegisteredApplication(app.id,self))向StandaloneAppClient返回注册成功后ApplicationInfo的Id和master的url地址。

    ApplicationInfo对象是对application的描述,先来看一下createApplication这个方法的源代码,源代码如下所示。

    Master.scala源码:

    1.           private def createApplication(desc:ApplicationDescription, driver: RpcEndpointRef):

    2.             ApplicationInfo = {

    3.        //ApplicationInfo创建时间

    4.           val now =System.currentTimeMillis()

    5.           val date = new Date(now)

    6.         //由date生成application id

    7.           val appId =newApplicationId(date)

    8.        //创建ApplicationInfo

    9.           new ApplicationInfo(now,appId, desc, date, driver, defaultCores)

    10.      }

    上面代码中,createApplication方法接收ApplicationDescription和ActorRef两种类型的参数。并调用newAppicationId方法生成appId,关键代码如下所示。

    1. val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)

    由代码所决定,appid的格式形如:app-20160429101010-0001。在desc这个对象中,包含一些基本的配置,包括从系统中传入的一些配置信息,如appname、maxCores、memoryPerExecutorMB等。最后使用desc、date、driver、defaultCores等做为参数构造一个ApplicatinInfo对象并返回。函数返回之后,调用registerApplication方法,完成application的注册,该方法是如何完成注册的呢?方法代码如下所示。

    Master.scala源码:

    1.         private def registerApplication(app:ApplicationInfo): Unit = {

    2.         //Driver的地址,用于Master和Driver通信

    3.           val appAddress =app.driver.address

    4.         //如果addressToApp中已经有了该Driver地址,说明该Driver已经注册过了,直接return

    5.        

    6.           if(addressToApp.contains(appAddress)) {

    7.             logInfo("Attempted tore-register application at same address: " + appAddress)

    8.             return

    9.           }

    10.     //向度量系统注册

    11.        applicationMetricsSystem.registerSource(app.appSource)

    12.     //apps是一个HashSet,保存数据不能重复,向HashSet中加入app

    13.        apps += app

    14.    //idToApp是一个HashMap,该HashMap用于保存id和app的对应关系

    15.        idToApp(app.id) = app

    16.    //endpointToApp是一个HashMap, driver和app的对应关系

    17.        endpointToApp(app.driver) =app

    18.    //addressToApp是一个HashMap,记录app Driver的地址和app的对应关系

    19.        addressToApp(appAddress) = app

    20.    /waitingApps是一个数组,记录等待调度的app记录

    21.        waitingApps += app

    22.        if (reverseProxy) {

    23.          webUi.addProxyTargets(app.id,app.desc.appUiUrl)

    24.        }

    25.      }

     

    上面代码中,首先通过app.driver.path.address得到driver的地址,然后查看appAdress映射表中是否已经存在这个路径,如果存在表示该application已经注册,直接返回;如果不存在,则在waitingApps数组中加入该application,同时在idToApp、endpointToApp、addressToApp映射表中加入映射关系。加入waitingApps数组中的application等待schedule方法的调度。

    schedule方法有两个作用,第一,完成Driver的调度,将waitingDrivers数组中的Driver发送到满足运行条件的Worker上运行。第二,在满足条件的Worker节点上为application启动Executor。schedule方法源代码如下所示。

    Master.scala的schedule方法源码:

    1.       private def schedule(): Unit = {

    2.       …….

    3.            launchDriver(worker, driver)

    4.           …….

    5.           startExecutorsOnWorkers()

    6.         }

    在Master中,schedule方法是一个很重要的方法,每一次新的Driver的注册application的注册或者可用资源发生变动,都将调用schedule方法。Schedule方法用于为当前等待调度的application调度可用的资源,在满足条件的Worker节点上启动Executor。这个方法还有另外一个作用,就是当有Driver提交的时候,负责将Driver发送到一个可用资源满足Driver需求的Worker节点上运行,launchDriver(worker,driver)方法负责完成这一任务。

    application调度成功之后,Master将会为appication在Worker节点上启动Executors,调用startExecutorsOnWorkers方法完成此操作,其源代码如下所示。

    Master.scala源码:

    1.          privatedef startExecutorsOnWorkers(): Unit = {

    2.           // Right now this is a verysimple FIFO scheduler. We keep trying to fit in the first app

    3.           // in the queue, then thesecond app, etc.

    4.           for (app <- waitingApps ifapp.coresLeft > 0) {

    5.             val coresPerExecutor:Option[Int] = app.desc.coresPerExecutor

    6.             // Filter out workers thatdon't have enough resources to launch an executor

    7.             val usableWorkers =workers.toArray.filter(_.state == WorkerState.ALIVE)

    8.               .filter(worker =>worker.memoryFree >= app.desc.memoryPerExecutorMB &&

    9.                 worker.coresFree >=coresPerExecutor.getOrElse(1))

    10.            .sortBy(_.coresFree).reverse

    11.          val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    12.     

    13.          // Now that we've decidedhow many cores to allocate on each worker, let's allocate them

    14.          for (pos <- 0 untilusableWorkers.length if assignedCores(pos) > 0) {

    15.            allocateWorkerResourceToExecutors(

    16.              app, assignedCores(pos), coresPerExecutor,usableWorkers(pos))

    17.          }

    18.        }

    19.      }

    在scheduleExecutorsOnWorkers方法中,有两种启动Executor的策略,第一种是轮流均摊策略(round-robin),采用圆桌算法依次轮流均摊,直到满足资源需求,轮流均摊策略通常会有更好的数据本地性,因此它是默认的选择策略。第二种是依次全占,在usableWorkers中,依次获取每个Worker上的全部资源,直到满足资源需求。

    scheduleExecutorsOnWorkers方法为application分配好逻辑意义上的资源后,还不能真正在Worker 节点为application 分配出资源,需要调用动作函数为application真正的分配资源,allocateWorkerResourceToExecutors 方法的调用,将会在Worker节点上实际分配资源,下面是allocateWorkerResourceToExecutors的源代码。

    Master.scala源码:

    1. private def allocateWorkerResourceToExecutors(

    2.    ……

    3.       launchExecutor(worker, exec)

    4.    …….

    上面代码调用了launchExecutor(worker,exec)方法,这个方法有两个参数,第一个参数是满足条件的WorkerInfo信息,第二个参数是描述Executor的ExecutorDesc对象。这个方法将会向Worker节点发送LaunchExecutor的请求,Worker节点收到该请求之后,将会负者启动Executor。launchExecutor方法代码清单如下所示。

    Master.scala源码:

    1.           private def launchExecutor(worker: WorkerInfo,exec: ExecutorDesc): Unit = {

    2.           logInfo("Launchingexecutor " + exec.fullId + " on worker " + worker.id)

    3.       //向WorkerInfo中加入exec这个描述Executor的ExecutorDesc对象

    4.           worker.addExecutor(exec)

    5.       //向worker发送LaunchExecutor消息,加载Executor消息中携带了masterUrl地址、application id、Executor id、Executor描述desc、Executor核的个数、Executor分配的内存大小

    6.        

    7.           worker.endpoint.send(LaunchExecutor(masterUrl,

    8.             exec.application.id,exec.id, exec.application.desc, exec.cores, exec.memory))

    9.         //向Driver发回ExecutorAdded消息,消息携带worker的id号,worker的host和port,分配的核的个数和内存大小

    10.        exec.application.driver.send(

    11.          ExecutorAdded(exec.id,worker.id, worker.hostPort, exec.cores, exec.memory))

    12.      }      

     launchExecutor有两个参数,第一个参数是worker:WorkerInfo,代表着Worker的基本信息,第二个参数是exec:ExecutorDesc,这个参数保存了Executor的基本配置信息,如memory、cores等。此方法中,有worker.endpoint.send(LaunchExecutor(...)),向Worker发送LaunchExecutor请求,Worker收到该请求之后将会调用方法启动Executor。

    向Worker发送LaunchExecutor消息的同时,通过exec.application.driver.send(ExecutorAdded(…))向Driver发送ExecutorAdded消息,该消息为Driver反馈Master都在哪些Worker上启动了Executor,Executor的编号是多少,为每个Executor分配了多少个核,多大的内存以及Worker的联系hostport等消息。

    Worker收到LaunchExecutor消息会做相应的处理,在Worker节点中,LaunchExecutor处理逻辑源代码如下所示。

    Worker.scala源码:

    1.           caseLaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>

    2.        //若masterUrl和activeMasterUrl不是同一个url,说明非法的Master尝试加载Executor,打印错误信息

    3.             if (masterUrl != activeMasterUrl) {

    4.               logWarning("InvalidMaster (" + masterUrl + ") attempted to launch executor.")

    5.             } else {

    6.               try {

    7.                 logInfo("Asked tolaunch executor %s/%d for %s".format(appId, execId, appDesc.name))

    8.        

    9.               //在workDir/appId/目录下创建以execId为名的Executor工作目录

    10.              val executorDir = newFile(workDir, appId + "/" + execId)

    11.        //调用mkdirs创建目录

    12.              if(!executorDir.mkdirs()) {

    13.                throw newIOException("Failed to create directory " + executorDir)

    14.              }

    15.     

    16.             //为Executor创建本地目录,该目录通过变量SPARK_EXECUTOR_DIRS设置并传递,该目录在application运行结束时由Worker负责删除

    17.              val appLocalDirs =appDirectories.getOrElse(appId,

    18.                Utils.getOrCreateLocalRootDirs(conf).map{ dir =>

    19.                  val appDir =Utils.createDirectory(dir, namePrefix = "executor")

    20.                  Utils.chmod700(appDir)

    21.                  appDir.getAbsolutePath()

    22.                }.toSeq)

    23.              //在哈希表appDirectories中加入appId和appLocalDirs的对应关系

    24.              appDirectories(appId) =appLocalDirs

    25.      //创建ExecutorRunner

    26.              val manager = newExecutorRunner(

    27.                appId,

    28.                execId,

    29.                appDesc.copy(command =Worker.maybeUpdateSSLSettings(appDesc.command, conf)),

    30.                cores_,

    31.                memory_,

    32.                self,

    33.                workerId,

    34.                host,

    35.                webUi.boundPort,

    36.                publicAddress,

    37.                sparkHome,

    38.                executorDir,

    39.                workerUri,

    40.                conf,

    41.                appLocalDirs,ExecutorState.RUNNING)

    42.      //在哈希表executors中加入appId+”/”+execId和ExecutorRunner的对应关系

    43.              executors(appId + "/" + execId)= manager

    44.        //启动ExecutorRunner

    45.              manager.start()

    46.              coresUsed += cores_

    47.       //Worker上已经使用的核增加cores_个,cores_分配给Executor的核的个数

    48.              memoryUsed += memory_

    49.      //向Master发送ExecutorStateChanged消息,该消息携带appId,exeId,ExecutorRunner的状态

    50.              sendToMaster(ExecutorStateChanged(appId,execId, manager.state, None, None))

    51.            } catch {

    52.              case e: Exception =>

    53.                logError(s"Failedto launch executor $appId/$execId for ${appDesc.name}.", e)

    54.                if (executors.contains(appId + "/"+ execId)) {

    55.                  executors(appId +"/" + execId).kill()

    56.                  executors -= appId +"/" + execId

    57.                }

    58.                sendToMaster(ExecutorStateChanged(appId,execId, ExecutorState.FAILED,

    59.                  Some(e.toString), None))

    60.            }

    61.          }

      

    上面代码中,首先判断传过来的masterUrl是否和activeMasterUrl相同,如果不相同,说明收到的不是处于ALIVE状态的Master发送过来的请求,这种情况直接打印警告信息。如果相同,则说明该请求来自ALIVE Master,于是为Executor创建工作目录,创建好工作目录之后,使用appid、execid、appDes等参数创建ExecutorRunner,顾名思义,ExecutorRunner是Executor运行的地方,在ExecutorRunner中,有一个工作线程,这个线程负责下载依赖的文件,并启动CoarseGaindExecutorBackend进程,该进程单独在一个JVM上运行。下面是ExecutorRunner中的线程启动的源代码。

    ExecutorRunner.scala源码:

    1.        private[worker] def start() {

    2.            //创建线程

    3.           workerThread = newThread("ExecutorRunner for " + fullId) {

    4.         //线程run方法中调用fetchAndRunExcutor

    5.             override def run() {fetchAndRunExecutor() }

    6.           }

    7.          //启动线程

    8.           workerThread.start()

    9.           

    10.       // 终止回调函数,用于杀死进程

    11.        shutdownHook =ShutdownHookManager.addShutdownHook { () =>

    12.          // It's possible that wearrive here before calling `fetchAndRunExecutor`, then `state` will

    13.          // be `ExecutorState.RUNNING`.In this case, we should set `state` to `FAILED`.

    14.          if (state ==ExecutorState.RUNNING) {

    15.            state =ExecutorState.FAILED

    16.          }

    17.          killProcess(Some("Worker shuttingdown")) }

    18.      }

    上面代码中,定义了一个Thread,这个Thread的run方法中调用fetchAndRunExecutor方法,fetchAndRunExecutor负责以进程的方式启动ApplicationDescription中携带的org.apache.spark.executor.CoarseGrainedExecutorBackend进程。fetchAndRunExecutor方法源代码如下所示。

    ExecutorRunner.scala源码:

    1.         private def fetchAndRunExecutor() {

    2.           try {

    3.             // Launch the process

    4.             val builder =CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),

    5.               memory,sparkHome.getAbsolutePath, substituteVariables)

    6.             val command =builder.command()

    7.             val formattedCommand =command.asScala.mkString("\"", "\" \"","\"")

    8.             logInfo(s"Launchcommand: $formattedCommand")

    9.        

    10.          builder.directory(executorDir)

    11.          builder.environment.put("SPARK_EXECUTOR_DIRS",appLocalDirs.mkString(File.pathSeparator))

    12.          // In case we are runningthis from within the Spark Shell, avoid creating a "scala"

    13.          // parent process for theexecutor command

    14.          builder.environment.put("SPARK_LAUNCH_WITH_SCALA","0")

    15.     

    16.          // Add webUI log urls

    17.          val baseUrl =

    18.            if(conf.getBoolean("spark.ui.reverseProxy", false)) {

    19.              s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="

    20.            } else {

    21.              s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="

    22.            }

    23.          builder.environment.put("SPARK_LOG_URL_STDERR",s"${baseUrl}stderr")

    24.          builder.environment.put("SPARK_LOG_URL_STDOUT",s"${baseUrl}stdout")

    25.     

    26.          process = builder.start()

    27.          val header = "SparkExecutor Command: %s\n%s\n\n".format(

    28.            formattedCommand, "=" * 40)

    29.     

    30.          // Redirect its stdout andstderr to files

    31.          val stdout = newFile(executorDir, "stdout")

    32.          stdoutAppender =FileAppender(process.getInputStream, stdout, conf)

    33.     

    34.          val stderr = newFile(executorDir, "stderr")

    35.          Files.write(header, stderr,StandardCharsets.UTF_8)

    36.          stderrAppender =FileAppender(process.getErrorStream, stderr, conf)

    37.     

    38.          // Wait for it to exit;executor may exit with code 0 (when driver instructs it to shutdown)

    39.          // or with nonzero exit code

    40.          val exitCode =process.waitFor()

    41.          state = ExecutorState.EXITED

    42.          val message = "Commandexited with code " + exitCode

    43.          worker.send(ExecutorStateChanged(appId,execId, state, Some(message), Some(exitCode)))

    44.        } catch {

    45.          case interrupted:InterruptedException =>

    46.            logInfo("Runnerthread for executor " + fullId + " interrupted")

    47.            state =ExecutorState.KILLED

    48.            killProcess(None)

    49.          case e: Exception =>

    50.            logError("Errorrunning executor", e)

    51.            state =ExecutorState.FAILED

    52.            killProcess(Some(e.toString))

    53.        }

    54.      }

    其中fetchAndRunExecutor()方法中的CommandUtils.buildProcessBuilder(appDesc.command,传入的入口类是:"org.apache.spark.executor.CoarseGrainedExecutorBackend",当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。

    CoarseGrainedExecutorBackend的onStart方法源码:

    1.             override def onStart() {

    2.         …….

    3.               driver = Some(ref)

    4.          //向driver发送ask请求,等待driver的回应

    5.               ref.ask[Boolean](RegisterExecutor(executorId,self, hostname, cores, extractLogUrls))

    6.           ……

    Driver端收到注册请求,将会注册Executor的请求,

    CoarseGrainedSchedulerBackend.scala的 receiveAndReply方法源码:

    1.             override def receiveAndReply(context:RpcCallContext): PartialFunction[Any, Unit] = {

    2.          

    3.               caseRegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>

    4.          …….

    5.                   executorRef.send(RegisteredExecutor)

    6.           ……

     如上面代码所示,Driver向CoarseGrainedExecutorBackend发送RegisteredExecutor消息后,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息源代码如下所示。

    CoarseGrainedExecutorBackend.scala的receive源码:

    1.          override def receive:PartialFunction[Any, Unit] = {

    2.             case RegisteredExecutor =>

    3.               logInfo("Successfullyregistered with driver")

    4.               try {

    5.           //收到RegisteredExecutor消息,立即创建Executor

    6.                 executor = newExecutor(executorId, hostname, env, userClassPath, isLocal = false)

    7.               } catch {

    8.                 case NonFatal(e) =>

    9.                   exitExecutor(1,"Unable to create executor due to " + e.getMessage, e)

    10.            }

    上面代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新创建一个org.apache.spark.executor.Executor对象,至此Executor创建完毕。

     

    展开全文
  • 执行器框架(Executor Framework),围绕着Executor接口和它的子接口ExecutorService,及实习这两个接口的ThreadPoolExecutor类展开。  这套机制分离了任务的创建和执行。通过使用执行器,仅需要在实现Runnable接口...

    执行器框架(Executor Framework),围绕着Executor接口和它的子接口ExecutorService,及实习这两个接口的ThreadPoolExecutor类展开。

     这套机制分离了任务的创建和执行。通过使用执行器,仅需要在实现Runnable接口的对象,然后将这些对象发送给执行器即可。执行器通过创建所需的线程,来负责这些Runnalbe对象的创建,实例化及运行。执行器使用了线程池来提高应用程序的性能。当发送一个任务给执行器,执行器会尝试使用线程池中的线程对象来执行这个任务,避免了不断地创建和销毁线程而导致性能下降。

       执行器框架另一个重要的优势是Calllable接口。它类似于Runnable接口,但是却提供了两个方面的增强。

          这个接口的主要方法名为call(),可以返回结果。

          当发送一个Callable对象给执行器,将获得一个实现Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果。

    展开全文
  • 线程池-Executor框架

    2020-12-17 13:04:21
    文章目录Executor是什么Executor是什么? Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。 主要包括什么?...

    什么是Executor?

    Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

    Executor框架的结构

    Executor框架主要由3大部分组成如下:
    1、任务
    包括被执行任务需要实现的接口:Runnable接口或Callable接口。
    2、任务的执行
    包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
    3、异步计算的结果。包括接口Future和实现Future的FutureTask类。

    Executor框架主要成员简介

    • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开。只有一个execute方法,void execute(Runnable command)

    • ExecutorService继承于Executor接口, 他提供了更为丰富的线程实现方法。
      ExecutorService有三种状态:运行、关闭、终止。
      创建后便进入运行状态
      当调用了shutdown()方法时,便进入了关闭状态,此时意味着ExecutorService不再接受新的任务,但是他还是会执行已经提交的任务,
      当所有已经提交了的任务执行完后,便达到终止状态。
      如果不调用shutdown方法,ExecutorService方法会一直运行下去,系统一般不会主动关闭。

    • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

    • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

    • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

    • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

    Executor框架主要成员结构图

    在这里插入图片描述

    Executor框架使用结构图

    在这里插入图片描述

    1、主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。
    2、然后可以把Runnable对象直接交给 ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象 提交给ExecutorService执行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callabletask))。
    3、如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
    4、最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

    Executor框架的两级调度模型

    在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

    在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器Executor框架将这些任务映射为固定数量的线程;
    在底层,操作系统内核将这些线程映射到硬件处理器上。

    这种两级调度模型的示意图如图如下:
    应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
    在这里插入图片描述

    ThreadPoolExecutor和ScheduledThreadPoolExecutor

    ThreadPoolExecutor
    ScheduledThreadPoolExecutor

    CompletionService接口

    在这里插入图片描述
    CompletionService与ExecutorService类似,都可以用来执行线程池的任务
    ExecutorService继承了Executor接口,而CompletionService没有继承Executor接口。why?
    主要是Executor的特性决定的,Executor框架不能完全保证任务执行的异步性,那就是如果需要实现任务task的异步性,只要为每个task创建一个线程就实现了任务的异步性。代码往往包含new Thread(task).start()。这种方式的问题在于,它没有限制可创建线程的数量(在ExecutorService可以限制),不过,这样最大的问题是在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。

    CompletionService想要完全的异步,就不能受到Executor的影响。

    一般情况下,如果需要判断任务是否完成,思路是得到Future列表的每个Future,然后反复调用其get方法,并将timeout参数设为0,从而通过轮询的方式判断任务是否完成。
    为了更精确实现任务的异步执行,可以使用CompletionService。

    接口内的几个方法:

    Future<V> submit(Callable<V> task);
    

    用于向服务中提交有返回结果的任务,并返回Future对象

    Future<V> submit(Runnable task, V result);
    

    用户向服务中提交有返回值的任务去执行,并返回Future对象

    Future<V> take() throws InterruptedException;
    

    从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。

    Future<V> poll();
    

    从服务中返回并移除一个已经完成的任务,如果内部没有已经完成的任务,则返回空,此方法会立即响应。

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    

    尝试在指定的时间内从服务中返回并移除一个已经完成的任务,等待的时间超时还是没有获取到已完成的任务,则返回空。此方法会响应线程中断

    ExecutorCompletionService类

    ExecutorCompletionService类是CompletionService接口的具体实现。
    在这里插入图片描述
    说一下其内部原理,ExecutorCompletionService创建的时候会传入一个线程池,调用submit方法传入需要执行的任务,任务由内部的线程池来处理;ExecutorCompletionService内部有个阻塞队列,任意一个任务完成之后,会将任务的执行结果(Future类型)放入阻塞队列中,然后其他线程可以调用它take、poll方法从这个阻塞队列中获取一个已经完成的任务,**获取任务返回结果的顺序和任务执行完成的先后顺序一致,所以先完成的任务会先返回。**能最大限度的提升性能。

    看一下构造方法:

    public ExecutorCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }
    

    构造方法需要传入一个Executor对象,这个对象表示任务执行器,所有传入的任务会被这个执行器执行。

    completionQueue是用来存储任务结果的阻塞队列,默认用采用的是LinkedBlockingQueue,也支持开发自己设置。通过submit传入需要执行的任务,任务执行完成之后,会放入completionQueue中。

    在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
    QueueingFuture 源码:

     private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    

    demo:

    public class Demo14 {
        static class GoodsModel {
            //商品名称
            String name;
            //购物开始时间
            long startime;
            //送到的时间
            long endtime;
    
            public GoodsModel(String name, long startime, long endtime) {
                this.name = name;
                this.startime = startime;
                this.endtime = endtime;
            }
    
            @Override
            public String toString() {
                return name + ",下单时间[" + this.startime + "," + endtime + "],耗时:" + (this.endtime - this.startime);
            }
        }
    
        /**
         * 将商品搬上楼
         *
         * @param goodsModel
         * @throws InterruptedException
         */
        static void moveUp(GoodsModel goodsModel) throws InterruptedException {
            //休眠5秒,模拟搬上楼耗时
            TimeUnit.SECONDS.sleep(5);
            System.out.println("将商品搬上楼,商品信息:" + goodsModel);
        }
    
        /**
         * 模拟下单
         *
         * @param name     商品名称
         * @param costTime 耗时
         * @return
         */
        static Callable<GoodsModel> buyGoods(String name, long costTime) {
            return () -> {
                long startTime = System.currentTimeMillis();
                System.out.println(startTime + "购买" + name + "下单!");
                //模拟送货耗时
                try {
                    TimeUnit.SECONDS.sleep(costTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long endTime = System.currentTimeMillis();
                System.out.println(endTime + name + "送到了!");
                return new GoodsModel(name, startTime, endTime);
            };
        }
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long st = System.currentTimeMillis();
            System.out.println(st + "开始购物!");
            ExecutorService executor = Executors.newFixedThreadPool(5);
    
            //创建ExecutorCompletionService对象
            ExecutorCompletionService<GoodsModel> executorCompletionService = new ExecutorCompletionService<>(executor);
            //异步下单购买冰箱
            executorCompletionService.submit(buyGoods("冰箱", 5));
            //异步下单购买洗衣机
            executorCompletionService.submit(buyGoods("洗衣机", 2));
            executor.shutdown();
    
            //购买商品的数量
            int goodsCount = 2;
            for (int i = 0; i < goodsCount; i++) {
                //可以获取到最先到的商品
                GoodsModel goodsModel = executorCompletionService.take().get();
                //将最先到的商品送上楼
                moveUp(goodsModel);
            }
    
            long et = System.currentTimeMillis();
            System.out.println(et + "货物已送到家里咯,哈哈哈!");
            System.out.println("总耗时:" + (et - st));
        }
    }
    

    输出:

    1564653208284开始购物!
    1564653208349购买冰箱下单!
    1564653208349购买洗衣机下单!
    1564653210349洗衣机送到了!
    1564653213350冰箱送到了!
    将商品搬上楼,商品信息:洗衣机,下单时间[1564653208349,1564653210349],耗时:2000
    将商品搬上楼,商品信息:冰箱,下单时间[1564653208349,1564653213350],耗时:5001
    1564653220350货物已送到家里咯,哈哈哈!
    总耗时:12066
    

    洗衣机先到的,然后被送上楼了,冰箱后到被送上楼,先到先上楼=先执行先返回。不用程序员操心。

    推荐阅读这个ExecutorCompletion

    展开全文
  • Executor

    2016-08-08 17:22:00
    一、为什么需要Executor?为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制。他们都在java.util.concurrent包中,JDK并发包的核心。其中有一个比较重要的类:Executors,他...

    一、为什么需要Executor?
    为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员
    有效的进行线程控制。他们都在java.util.concurrent包中,是JDK并发包的
    核心。其中有一个比较重要的类:Executors,他扮演着线程工厂的角色,我
    们通过Executors可以创建特定功能的线程池。

    newFixedThreadPool()方法,该方法返回一个固定数量的线程池,该方法的线程数始
    终不变。当已有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在
    一个任务队列中等待有空闲的线程去执行。
    newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有
    空闲线程则暂缓在任务队列中。
    newCachedThreadPool()方法,返回一个可根据实际情况调整线程个数的线程池,不
    限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程。并且
    每一个空闲线程会在60秒后自动回收。
    newScheduledThreadPool()方法,该方法返回一个SchededExecutorService对象,但
    该线程池可以指定线程的数量。

    二、对上面的几个JDK提供的线程池做分析?
    四个方法底层代码都是创建了一个ThreadPoolExecutor对象返回的。
    自定义线程池
    若Executor工厂类无法满足我们的需求,可以自己去创建自定义的线程池,其实
    Executors工程类里面的创建线程方法其内部实现均是用了ThreadPoolExecutor这个
    类,这个类可以自定义线程。构造方法如下:
    public ThreadPoolExecutor(
    int corePoolSize, //核心线程数,new的时候直接初始化的线程数量
    int maxinumPoolSize, //最大线程数
    long keppAliveTime, //空闲时间
    TimeUnit unit, //时间单位
    BlockingQueue<Runnable> workQueue, //任务队列
    ThreadFactory threadFactory, //
    RejectedExecutionHandler handler //
    )

    对于newFixedThreadPool(10)
    ThreadPoolExecutor(10, 10,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());

    对于newSingleThreadExecutor()
    new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>())

    对于newCachedThreadPool()
    ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    这里需要注意了,刚创建完CacheThreadPool对象,池中是没有任何线程的,
    也就是说来一个任务会创建一个线程去执行;并且如果一个线程空闲时间大于60s了
    那么他就会被释放掉了。为什么会来一个任务就创建一个线程,是因为使用了
    SynchronousQueue的原因。

    对于newScheduledThreadPool(10)
    super(10, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
    new DelayedWorkQueue());
    说明这个线程池也是不限制任务数量的,默认池初始化有10个线程,
    并且使用的是DelayedWorkQueue()说明是带有定时任务的一些任务。

    三、newScheduledThreadPool的使用
    class Temp extends Thread{
    @Override
    public void run() {
    System.out.println("temp run...");
    }
    }

    public class TestFixedThreadPool {
    public static void main(String[] args) {
    /**
    ExecutorService pool1 = Executors.newFixedThreadPool(10);
    ExecutorService pool2 = Executors.newSingleThreadExecutor();
    ExecutorService pool3 = Executors.newCachedThreadPool();
    **/
    ScheduledExecutorService pool4 = Executors.newScheduledThreadPool(10);
    //其中的5是初始化时间
    //其中的1是轮询时间
    //下面这句代码的执行结果是,程序启动后延迟5s的时间来初始化任务,然后每隔一秒钟就执行一次run方法
    pool4.scheduleWithFixedDelay(new Temp(), 5, 1,TimeUnit.SECONDS);
    }
    }

    四、自定义线程池
    public ThreadPoolExecutor(
    int corePoolSize, //核心线程数,new的时候直接初始化的线程数量
    int maxinumPoolSize, //最大线程数
    long keppAliveTime, //空闲时间
    TimeUnit unit, //时间单位
    BlockingQueue<Runnable> workQueue, //任务队列
    ThreadFactory threadFactory, //
    RejectedExecutionHandler handler //
    )
    这个构造方法对于队列是什么类型的比较关键
    在使用有界队列时,若有新的任务需要执行,如果线程池实际线程小于corePoolSize,
    则优先创建线程;若大于corePoolSize,则会将任务加入队列,若队列已满,则在总
    线程数不大于maxnumPoolSize的前提下,创建新的线程,若线程数大于maxmunPoolSize,
    则执行拒绝策略。或其他自定义方式。
    无界的任务队列时:LinkedBlockingQueue。与有界队列相比,除非系统资源耗尽,否则
    无界的任务队列不存在任务入队失败的情况。当有新任务到来,系统的线程数小于
    corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。
    若后续仍有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务
    创建和处理的速度差异很大,无界队列会保持快速增长,直至耗尽系统内存。
    JDK拒绝策略:
    AbortPolicy:直接抛出异常组织系统正常工作
    CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
    DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务。
    DiscardPolocy:丢弃无法处理的任务,不给于任何处理。
    如果需要自定义拒绝策略可以实现RejectExecutionHandler接口。

    4.1有界队列的例子
    public class UseSizeQueue {
    public static void main(String[] args) {
    /**
    * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
    * 若大于corePoolSize,则会将任务加入队列, 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
    * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
    *
    */
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, // coreSize
    2, // MaxSize
    60, // 60
    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) // 指定一种队列(有界队列)
    , new MyRejected()
    );

    MyTask mt1 = new MyTask(1, "任务1");
    MyTask mt2 = new MyTask(2, "任务2");
    MyTask mt3 = new MyTask(3, "任务3");
    MyTask mt4 = new MyTask(4, "任务4");
    MyTask mt5 = new MyTask(5, "任务5");
    MyTask mt6 = new MyTask(6, "任务6");

    pool.execute(mt1);
    pool.execute(mt2);
    pool.execute(mt3);
    pool.execute(mt4);
    pool.execute(mt5);
    pool.execute(mt6);

    //调用线程池的shutdown方法,并不是直接这个线程池就销毁了
    //而是等到所有任务运行结束
    pool.shutdown();
    }
    }
    1.如果上述代码只有pool.execute(mt1);的话,那么输出是:run taskId =1,并且在5s之后程序停止,这就是第一种情况:
    在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程。
    2.如果上述代码有
    pool.execute(mt1);
    pool.execute(mt2);
    两行,那么输出是:
    run taskId =1
    run taskId =2
    并且先输出run taskId=1等待5s之后输出run taskId =2,再等待5s之后程序停止,这就是第二种情况:
    若大于corePoolSize,则会将任务加入队列,
    3.如果上述代码有5行
    pool.execute(mt1);
    pool.execute(mt2);
    pool.execute(mt3);
    pool.execute(mt4);
    pool.execute(mt5);
    则输出是这样的:
    run taskId =1
    run taskId =5
    run taskId =2
    run taskId =3
    run taskId =4
    首先是1和5执行,然后经过5s后,2和3执行,在经过5s后4执行,然后经过5s后程序结束。
    这就是第三种情况:
    若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
    4.如果代码有6行:
    pool.execute(mt1);
    pool.execute(mt2);
    pool.execute(mt3);
    pool.execute(mt4);
    pool.execute(mt5);
    pool.execute(mt6);
    执行效果是:
    自定义处理..
    run taskId =1
    当前被拒绝任务为:6
    run taskId =5
    run taskId =2
    run taskId =3
    run taskId =4
    也就是说,当6来的时候被拒绝了,这就是第四种情况:
    若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。

    4.2无界队列的例子
    public class NoSizeQueue implements Runnable{

    private static AtomicInteger count = new AtomicInteger(0);

    public void run() {
    try {
    int temp = count.incrementAndGet();
    System.out.println("任务" + temp);
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    public static void main(String[] args) throws Exception{
    BlockingQueue<Runnable> queue =
    new LinkedBlockingQueue<Runnable>();
    ExecutorService executor = new ThreadPoolExecutor(
    5, //core
    10, //max
    120L, //2fenzhong
    TimeUnit.SECONDS,
    queue);

    for(int i = 0 ; i < 20; i++){
    executor.execute(new NoSizeQueue());
    }
    Thread.sleep(1000);
    System.out.println("queue size:" + queue.size()); //10
    Thread.sleep(2000);
    }

    }
    执行的结果是:
    任务1
    任务2
    任务3
    任务5
    任务4
    queue size:15
    任务6
    任务7
    任务8
    任务9
    任务10
    任务11
    任务12
    任务13
    任务14
    任务15
    任务16
    任务17
    任务18
    任务19
    任务20
    先执行了前5个任务,然后休息了时间。然后又取出了5个任务,然后又取出了5个任务,最后又取出了5个任务
    最先前的5个任务到位后,因为当前线程数小于coreSize,所以就直接新建线程执行了。
    其他的后面的15个任务会加入到队列中,等待被执行。

    五、拒绝策略
    AbortPolicy:直接抛出异常组织系统正常工作,当前任务丢失了,但是队列中等到的任务继续执行。
    CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
    如果需要自定义拒绝策略可以实现RejectExecutionHandler接口。
    public class MyRejected implements RejectedExecutionHandler{


    public MyRejected(){
    }
    //其中r就是线程对象MyTask的一个实例
    //executor就是当前的线程池对象
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    System.out.println("自定义处理..");
    System.out.println("当前被拒绝任务为:" + r.toString());

    }

    }
    常用的拒绝策略是:写日志,然后写一个定时的job,读取log日志,然后重跑。
    或者向数据源发送拒绝消息

    转载于:https://www.cnblogs.com/dongdone/p/5750272.html

    展开全文
  • 戳蓝字“CSDN云计算”关注我们哦!容器技术这两年热门的话题,因为容器技术给我们带来了很多方便的地方,节约了不少成本,不管在运维还是开发上。今天,就让我们来看看关于D...
  • SpringBoot整合SpringCloud实战开发系列教程(精品不容错过) 在实际程序设计中,如果不用并发,问题很难解决,比如仿真系统。在仿真程序中,我们通常...线程机制原理是什么? 在实战中,我们进行并发编程,通常..
  • 任务执行器(Executor)一个接口,位于java.util.concurrent包下,它的作用主要为我们提供任务与执行机制(包括线程使用和调度细节)之间的解耦。比如我们定义了一个任务,我们通过线程池来执行该任务,还是直接创...
  • Executor框架

    2019-03-31 15:06:03
    Executor是一个灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程...
  • 1.mybatis有三种executor执行器,分别为simpleexecutor、reuseexecutor、batchexecutor。 simpleexecutor执行器:在每执行一次update或select,就开启一个statement对象,用完后就关闭。 reuseexecutor执行器:在...
  • 每次执行任务创建线程 new Thread()比较消耗性能,创建一个线程比较耗时、耗资源的。 调用 new Thread()创建的线程缺乏管理,被称为野线程,而且可以无限制的创建,线程之间的相互竞争会导致过多占用系统资源而...
  • 在实际程序设计中,如果不用并发,问题很难解决,比如仿真系统。在仿真程序中,我们通常将每个元素都设置问一个任务,通过一个个线程去执行,这样就容易实现...线程机制原理是什么?在实战中,我们进行并发编程,通...
  • 非常常见的情况,单个的Executor被用来创建和管理系统中的任务。shutdown()方法可以防止新的任务被提交给这个Executor。如果在shutdown()方法之后提交新任务,则会抛出java.util.concurrent....
  • Executor 线程池

    2018-10-11 08:48:00
    1、什么是线程池: java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,...
  • Executor并发框架

    2021-01-06 14:23:21
    1. Executor 框架是什么Executor 框架包含一组用于有效管理工作线程的组件。Executor API 通过 Executors 将任务的执行与要执行的实际任务解耦。 这是 生产者-消费者 模式的一种实现。 java.util.concurrent....
  • 1. 究竟是怎么运行的?... 但是究竟那些Executor是怎么运行你提交的代码段的?下面是一个日志分析的例子,来自Spark的example def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Log Quer
  • dynamic allocate executor 感觉好拽的能力,动态分配哦,好腻害的样子噢。万事大家,妈妈再也不用担心我到底该配置多少 资源合适了,又保住了好几根银发。面试官再问我我如何估计资源的,我就把这个参数甩她胸口~...
  • 1、什么是Executor框架? 我们知道线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等。线程用于执行异步任务,单个的线程既是工作单元也执行机制,从JDK1.5开始,为了...
  • Spark Executor内幕

    2016-12-14 09:25:00
    Spark Executor工作原理ExecutorBackend注册Executor实例...Worker为什么要启动另外一个进程?Worker本身管理当前机器上的资源,变动资源的时候向Master汇报。有很多应用程序,就需要很多Executor。这样程序之间不...
  • 一、什么是Executor框架?我们知道线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等。线程用于执行异步任务,单个的线程既是工作单元也执行机制,从JDK1.5开始,为了把...
  • 一、介绍 主线程和线程池之间没有直接关系,... shutdown() 作为函数,当然立即执行,也即不再接受新任务了;但是它既不会强行终止正在执行的任务,也不会取消已经提交的任务。也就是说之前提交的5个任务,
  • Executor线程池

    2019-08-07 21:47:59
    一、为什么要用线程池? 1)、系统启动一个新线程的成本比较高的,因为它涉及与操作系统交互。在这种情形下,使用线程池可以很好地提高性能,尤其当程序中需要创建大量生存期很短的线程时,更应该考虑使用线程池...
  • Executor具体如何工作的 [引言部份:你希望读者看完这篇博客后有那些启发、学到什么样的知识点] 更新中...... Spark Executor 工作原理图 第一步:Master 发指令给 Worker 启动 Exec...
  • Java Executor 框架

    2019-09-16 15:37:02
    学习Java的同学注意了!!! 学习过程中遇到什么问题或者想获取学习资源的话,欢迎加入Java学习交流群,群号码:183993990 我们一起学Java!... Executor框架指java5中引入的一系列并发库中与exe...
  • Executor框架指JDK 1.5中引入的一系列并发库中与Executor相关的功能类,包括Executor、Executors、ExecutorService、Future、Callable等。 一、为什么要引入Executor框架? 1、如果使用new Thread(…).start()的...
  • 首先concurrent里面最简单也最基础的一个接口 Executor,先在这里卖个关子,为什么这个接口只有一个方法。为什么ExecutorService(点击查看源码分析),会去继承这个接口,而不是直接把这个方法纳入到自己的实现中...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,254
精华内容 501
关键字:

executor是什么