-
2020-07-02 22:07:21
# -*- coding: utf-8 -*- from concurrent.futures import wait from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext conf = SparkConf() conf.set("spark.yarn.queue", "queue_name") \ .set("spark.app.name", "application_name") \ .set("spark.driver.memory", "20g") \ .set("spark.executor.cores", "1") \ .set("spark.executor.memory", "10g") \ .set("spark.executor.memoryOverhead", "5g") \ .set("spark.executor.instances", "400") \ .set("spark.dynamicAllocation.enabled", "true") \ .set("spark.dynamicAllocation.maxExecutors", "500") \ .set("spark.rdd.compress", "true") \ .set("spark.shuffle.compress", "true") sc = SparkContext(conf=conf) spark = HiveContext(sc) def features_gps(param): sql_query = """select 1/0""".format(**param) try: print(sql_query) spark.sql(sql_query).show() except Exception as e: info = u"任务失败" print(info, e) raise RuntimeError(info) return 0 def main(): params = list() for tail in range(10): params.append(tail) pool = ThreadPoolExecutor(5) task = list(map(lambda x: pool.submit(features_gps, x), params)) # ALL_COMPLETED全部完成时退出,FIRST_EXCEPTION有异常就退出 wait(task, return_when=FIRST_EXCEPTION) pool.shutdown() r = sum([_.result() for _ in task]) if r != 0: info = u"任务失败" print(info) raise RuntimeError(info) return r if __name__ == "__main__": main()
更多相关内容 -
Java线程池ThreadPoolExecutor原理及使用实例
2020-08-19 06:01:14主要介绍了Java线程池ThreadPoolExecutor原理及使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 -
ThreadPoolExecutor使用例子
2015-05-04 23:32:27private ThreadPoolExecutor threadpool; /** * Param: * corePoolSize - 池中所保存的线程数,包括空闲线程。 * maximumPoolSize - 池中允许的最大线程数(采用LinkedBl- public class Test1 {
- private ThreadPoolExecutor threadpool;
- /**
- * Param:
- * corePoolSize - 池中所保存的线程数,包括空闲线程。
- * maximumPoolSize - 池中允许的最大线程数(采用LinkedBlockingQueue时没有作用)。
- * keepAliveTime -当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间,线程池维护线程所允许的空闲时间。
- * unit - keepAliveTime参数的时间单位,线程池维护线程所允许的空闲时间的单位:秒 。
- * workQueue - 执行前用于保持任务的队列(缓冲队列)。此队列仅保持由execute 方法提交的 Runnable 任务。
- * RejectedExecutionHandler -线程池对拒绝任务的处理策略(重试添加当前的任务,自动重复调用execute()方法)
- */
- public Test1(){
- threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue(10),
- new ThreadPoolExecutor.DiscardOldestPolicy());
- }
- //add task into thread pool
- public void submit(final int flag){
- threadpool.execute(new Runnable(){
- public void run(){
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(flag + " Hello");
- }
- });
- }
- /**
- * close thread pool
- */
- public void shutdown() {
- threadpool.shutdown();
- }
- public static void main(String[] args) {
- Test1 t = new Test1();
- for (int i = 0; i < 20; i++) {
- System.out.println("time:" + i);
- t.submit(i);
- }
- System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
- }
- /**
- * 当一个任务通过execute(Runnable)方法欲添加到线程池时:
- * 1.如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
- * 2.如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
- * 3.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
- * 4.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过
- * handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize
- * ,如果三者都满了,使用handler处理被拒绝的任务。
- *
- * 5.当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
- */
- }
附上一篇详细讲解使用threadpool的博客:java自带线程池和队列详细讲解
http://www.oschina.net/question/565065_86540 -
线程池ThreadPoolExecutor使用简介与方法实例
2020-08-26 06:22:47今天小编就为大家分享一篇关于线程池ThreadPoolExecutor使用简介与方法实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧 -
ThreadPoolExecutor 例子
2022-03-06 23:06:571. 重写RejectedExecutionHandler 线程池在BlockingQueue用完的情况下,会执行这里。...import java.util.concurrent.ThreadPoolExecutor; public class UserRejectHandler implements RejectedExecutionHandle1. 重写RejectedExecutionHandler
线程池在BlockingQueue用完的情况下,会执行这里。可以利用这个方法把数据存下来。等空闲的时候在运行。
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class UserRejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { try { System.out.println("task submission rejected because of queue being full. So may block the thread "); e.getQueue().put(r); System.out.println("Queue size:"+e.getQueue().size()); System.out.println("Block released. task submitted"); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RejectedExecutionException(ie); } } }
2. 创建ThreadFactory 。生产Thread
import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; // UserThreadFactory.java public class UserThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(1); UserThreadFactory(String whatFeatureOfGroup) { this.namePrefix = "UserThreadFactory's " + whatFeatureOfGroup + "-Worker-"; } public Thread newThread(Runnable runnable) { String name = this.namePrefix + nextId.getAndIncrement(); Thread thread = new Thread(null, runnable, name, 0); System.out.println(thread.getName()); return thread; } } class Task implements Runnable { private final AtomicLong count = new AtomicLong(0L); public void run() { System.out.println("Starting to run running_" + count.getAndIncrement()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("End to run running_" + count.getAndIncrement()); } }
3. 执行15个任务。
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class UserThreadPool { public static void main(String[] args) { BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>(13); UserThreadFactory f1 = new UserThreadFactory("UserThreadFactory 1"); UserRejectHandler handler = new UserRejectHandler(); ThreadPoolExecutor threadPoolFirst = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, blockingDeque, f1, handler); Runnable task = new Task(); for (int i = 0; i < 15; i++) { threadPoolFirst.execute(task); System.out.println("threadPoolFirst.execute.."+i); } } }
执行结果
UserThreadFactory's UserThreadFactory 1-Worker-1
threadPoolFirst.execute..0
threadPoolFirst.execute..1
threadPoolFirst.execute..2
Starting to run running_0
threadPoolFirst.execute..3
threadPoolFirst.execute..4
threadPoolFirst.execute..5
threadPoolFirst.execute..6
threadPoolFirst.execute..7
threadPoolFirst.execute..8
threadPoolFirst.execute..9
threadPoolFirst.execute..10
threadPoolFirst.execute..11
threadPoolFirst.execute..12
threadPoolFirst.execute..13
UserThreadFactory's UserThreadFactory 1-Worker-2
threadPoolFirst.execute..14
Starting to run running_1
End to run running_2
End to run running_3
Starting to run running_4
Starting to run running_5
End to run running_6
End to run running_7
Starting to run running_8
Starting to run running_9
End to run running_11
End to run running_10
Starting to run running_12
Starting to run running_13
End to run running_14
End to run running_15
Starting to run running_16
Starting to run running_17
End to run running_18
Starting to run running_20
End to run running_19
Starting to run running_21
End to run running_22
End to run running_23
Starting to run running_24
Starting to run running_25
End to run running_26
Starting to run running_27
End to run running_28
End to run running_29
-
Python线程池模块ThreadPoolExecutor用法分析
2020-09-19 18:55:04主要介绍了Python线程池模块ThreadPoolExecutor用法,结合实例形式分析了Python线程池模块ThreadPoolExecutor的导入与基本使用方法,需要的朋友可以参考下 -
Python 线程池 ThreadPoolExecutor 使用教程 使用例子 使用示例
2021-12-13 15:53:27Python有GIL,故多线程最好只用于io密集型程序,比如需要网络下载很多个文件,下面截取部分代码来展示如何使用: from concurrent.futures import ThreadPoolExecutor, as_completed # 导入需要的包 threadPool = ...官网文档:https://docs.python.org/zh-cn/3/library/concurrent.futures.html
Python有GIL,故多线程最好只用于io密集型程序,比如需要网络下载很多个文件,下面截取部分代码来展示如何使用:
from concurrent.futures import ThreadPoolExecutor, as_completed # 导入需要的包 threadPool = ThreadPoolExecutor(max_workers=140) # 线程池的池子大小,最多同时执行多少个任务,submit的任务数超过这个数字后后续任务会自行等待进入线程池 all_task = [threadPool.submit(self.download_01, url, path, delay * 1.5) for delay, url in enumerate(urls)] # 使用线程池ThreadPoolExecutor的submit方法发起任务,列表生成式里面装了所有任务的返回句柄。self.download_01是我的函数名称,url, path, delay * 1.5 全是我的self.download_01函数的参数。 for task in as_completed(all_task): # as_completed 可以判断列表里的线程是否完成 data = task.result() print("任务{} down load success".format(data)) # 打印结果 threadPool.shutdown(wait=True) # 释放线程池资源
-
【Java】 之 ThreadPoolExecutor 使用案例
2019-09-14 14:56:43// 定义线程组名称,在使用 jstack 来排查线程问题时,非常有帮助 UserThreadFactory ( String whatFeatureOfGroup ) { this . namePrefix = "UserThreadFactory's " + whatFeatureOfGroup + "-... -
ThreadPoolExecutor使用实例
2018-05-31 16:25:01创建ThreadPoolExecutor,切记创建在类的方法之外,这样就不会调用一次方法,创建一个ThreadPoolExecutorprivate final static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 2000, TimeUnit.... -
Android线程池ThreadPoolExecutor使用
2020-10-24 12:08:491、 ExecutorService,它是一个接口,其实如果要从真正意义上来说,它可以叫做线程池的服务,因为它提供了众多接口api来控制线程池中的线程,而真正意义上的线程池就是:ThreadPoolExecutor,它...直接使用线程的弊端: -
java ThreadPoolExecutor 并发调用实例详解
2020-08-30 09:56:20主要介绍了java ThreadPoolExecutor 并发调用实例详解的相关资料,需要的朋友可以参考下 -
线程池ThreadPoolExecutor的使用demo
2021-11-27 16:08:101.示例代码: Runnable + ThreadPoolExecutor 首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区 别。) MyRunnable.java import java.util.Date; public class... -
ThreadPoolExecutor使用示例
2016-03-27 13:50:53Java 5之后,Java并发API提供了Executor框架,主要包括Executor接口,它的子接口ExecutorService,以及实现上述两个接口的ThreadPoolExecutor类。 这种机制使得任务的创建和任务的执行分离,使用executor,... -
线程池 ThreadPoolExecutor(推荐) 示例
2022-03-03 14:25:52package multiThread;...import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorTest { public static void main(String[] args) { Thread. -
ThreadPoolExecutor使用介绍
2018-03-22 17:39:23【强制】 新建线程时,必须通过线程池提供(AsyncTask 或者 ThreadPoolExecutor 或者其他形式自定义的线程池),不允许在应用中自行显式创建线程。 说明: 使用线程池的好处是减少在创建和销毁线程上所花的时间... -
Java线程池(ThreadPoolExecutor)示例
2021-03-17 20:08:17我们可以使用ThreadPoolExecutor在Java中创建线程池。Java线程池管理Runnable线程的集合。工作线程从队列中执行Runnable线程。java.util.concurrent.Executors为java.util.concurrent.Executor接口提供工厂和支持... -
线程池ThreadPoolExecutor使用简介
2017-01-26 14:53:17线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueueworkQueue, RejectedExecut -
java线程池使用详解ThreadPoolExecutor使用示例
2021-06-03 14:40:16一 使用线程池的好处 二 Executor 框架 2.1 简介 2.2 Executor 框架结构(主要由三大部分组成) 1) 任务(Runnable /Callable) 2) 任务的执行(Executor) ...3.2 推荐使用 ThreadPoolExecutor 构造函数创建线程池 四 .. -
Java线程池-ThreadPoolExecutor,Executors使用示例
2021-04-07 20:34:57ThreadPoolExecutor简单示例 在阿里巴巴java开发规范手册中,并不推荐使用Executors工具去创建线程池,而是使用ThreadPoolExecutor创建线程池,至于原因,在下面再讨论。那么,先看看ThreadPoolExecutor是怎么创建... -
ThreadPoolExecutor简单使用示例
2019-08-01 20:29:56在编程规范中,不建议使用Executors去创建线程池,而是推荐使用ThreadPoolExecutor。 ThreadPoolExecutor会更明确运行规则,避免资源耗尽的风险。 因为Executors返回线程池有弊端: 1)FixedThreadPool和... -
django中使用ThreadPoolExecutor实现异步视图
2020-08-07 17:30:03django中使用ThreadPoolExecutor实现异步视图并监控线程 众所周知,django3.1版本以下是不支持异步view的。如果在一个请求中需要做一些耗时操作,或者不确定需要多久才能完成的操作的时候,大部分都会选择使用celery... -
ThreadPoolExecutor 用法详解(附demo)
2020-11-27 18:06:21ThreadPoolExecutor的使用 ThreadPoolExecutor提供了四个构造方法: 以最后一个构造方法(参数最多的那个),对其参数进行解释: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long ... -
使用执行程序和ThreadPoolExecutor的Java线程池示例
2020-05-09 13:55:39Executors类提供简单实现的ExecutorService的使用的ThreadPoolExecutor但ThreadPoolExecutor的提供了更多的功能不止于此。 我们可以指定创建ThreadPoolExecutor实例时仍处于活动状态的线程数,并且可以限制线程池的... -
Java 线程池 – ThreadPoolExecutor 示例
2022-06-14 19:36:59使用ThreadPoolExecutor,您只需实现Runnable对象并将它们发送给执行程序。它负责它们的执行、实例化和使用必要的线程运行。 它超越了这一点,并使用线程池提高了性能。当您将任务发送给执行程序时,它会尝试使用池... -
线程池实例:使用Executors和ThreadPoolExecutor
2019-08-12 01:53:14NULL 博文链接:https://bijian1013.iteye.com/blog/2284676 -
ThreadPoolExecutor使用详解
2021-04-07 10:00:511. 通过Executors创建线程池的弊端 在创建线程池的时候,大部分人还是会选择使用Executors去创建。 下面是创建定长线程池(FixedThreadPool)的...线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor... -
java 线程池ThreadPoolExecutor简介与实例
2016-07-22 17:31:23java 线程池ThreadPoolExecutor使用简介