精华内容
下载资源
问答
  • 2020-07-02 22:07:21

     

    # -*- coding: utf-8 -*-
    from concurrent.futures import wait
    from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor
    
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import HiveContext
    conf = SparkConf()
    conf.set("spark.yarn.queue", "queue_name") \
        .set("spark.app.name", "application_name") \
        .set("spark.driver.memory", "20g") \
        .set("spark.executor.cores", "1") \
        .set("spark.executor.memory", "10g") \
        .set("spark.executor.memoryOverhead", "5g") \
        .set("spark.executor.instances", "400") \
        .set("spark.dynamicAllocation.enabled", "true") \
        .set("spark.dynamicAllocation.maxExecutors", "500") \
        .set("spark.rdd.compress", "true") \
        .set("spark.shuffle.compress", "true")
    
    sc = SparkContext(conf=conf)
    spark = HiveContext(sc)
    
    
    def features_gps(param):
        sql_query = """select 1/0""".format(**param)
        try:
            print(sql_query)
            spark.sql(sql_query).show()
        except Exception as e:
            info = u"任务失败"
            print(info, e)
            raise RuntimeError(info)
        return 0
    
    
    def main():
        params = list()
        for tail in range(10):
            params.append(tail)
    
        pool = ThreadPoolExecutor(5)
        task = list(map(lambda x: pool.submit(features_gps, x), params))
        # ALL_COMPLETED全部完成时退出,FIRST_EXCEPTION有异常就退出
        wait(task, return_when=FIRST_EXCEPTION)
        pool.shutdown()
        r = sum([_.result() for _ in task])
        if r != 0:
            info = u"任务失败"
            print(info)
            raise RuntimeError(info)
        return r
    
    
    if __name__ == "__main__":
        main()
    

     

     

    更多相关内容
  • 主要介绍了Java线程池ThreadPoolExecutor原理及使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  •  private ThreadPoolExecutor threadpool;     /**   * Param:   * corePoolSize - 池中所保存的线程数,包括空闲线程。   * maximumPoolSize - 池中允许的最大线程数(采用LinkedBl
    1. public class Test1 {  
    2.   
    3.      private ThreadPoolExecutor threadpool;  
    4.        
    5.     /** 
    6.      * Param:  
    7.      * corePoolSize - 池中所保存的线程数,包括空闲线程。  
    8.      * maximumPoolSize - 池中允许的最大线程数(采用LinkedBlockingQueue时没有作用)。  
    9.      * keepAliveTime -当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间,线程池维护线程所允许的空闲时间。  
    10.      * unit - keepAliveTime参数的时间单位,线程池维护线程所允许的空闲时间的单位:秒 。  
    11.      * workQueue - 执行前用于保持任务的队列(缓冲队列)。此队列仅保持由execute 方法提交的 Runnable 任务。  
    12.      * RejectedExecutionHandler -线程池对拒绝任务的处理策略(重试添加当前的任务,自动重复调用execute()方法) 
    13.      */  
    14.       public Test1(){   
    15.         threadpool=new ThreadPoolExecutor(21020, TimeUnit.SECONDS, new ArrayBlockingQueue(10),   
    16.         new ThreadPoolExecutor.DiscardOldestPolicy());   
    17.       }   
    18.         
    19.       //add task into thread pool   
    20.       public void submit(final int flag){   
    21.         threadpool.execute(new Runnable(){   
    22.           public void run(){   
    23.             try {   
    24.               Thread.sleep(2000);   
    25.             } catch (InterruptedException e) {   
    26.               e.printStackTrace();   
    27.             }   
    28.             System.out.println(flag + "   Hello");   
    29.           }  
    30.         });       
    31.       }   
    32.           
    33.     /** 
    34.      * close thread pool 
    35.      */  
    36.     public void shutdown() {  
    37.         threadpool.shutdown();  
    38.     }  
    39.   
    40.     public static void main(String[] args) {  
    41.         Test1 t = new Test1();  
    42.         for (int i = 0; i < 20; i++) {  
    43.             System.out.println("time:" + i);  
    44.             t.submit(i);  
    45.         }  
    46.           
    47.         System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");  
    48.     }  
    49.       
    50.     /** 
    51.      * 当一个任务通过execute(Runnable)方法欲添加到线程池时: 
    52.      * 1.如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 
    53.      * 2.如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 
    54.      * 3.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 
    55.      * 4.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 
    56.      * handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize 
    57.      * ,如果三者都满了,使用handler处理被拒绝的任务。  
    58.      *  
    59.      * 5.当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 
    60.      */  
    61. }  
    附上一篇详细讲解使用threadpool的博客:

    java自带线程池和队列详细讲解

    http://www.oschina.net/question/565065_86540
    展开全文
  • 今天小编就为大家分享一篇关于线程池ThreadPoolExecutor使用简介与方法实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
  • ThreadPoolExecutor 例子

    2022-03-06 23:06:57
    1. 重写RejectedExecutionHandler 线程池在BlockingQueue用完的情况下,会执行这里。...import java.util.concurrent.ThreadPoolExecutor; public class UserRejectHandler implements RejectedExecutionHandle

    1. 重写RejectedExecutionHandler

    线程池在BlockingQueue用完的情况下,会执行这里。可以利用这个方法把数据存下来。等空闲的时候在运行。

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    public class UserRejectHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
           try {
    				System.out.println("task submission rejected because of queue being full. So may block the thread ");
    				e.getQueue().put(r);
    				System.out.println("Queue size:"+e.getQueue().size());
    				System.out.println("Block released. task submitted");
    			} catch (InterruptedException ie) {
    				Thread.currentThread().interrupt();
    				throw new RejectedExecutionException(ie);
    			}
        }
    }
    
    
    

    2. 创建ThreadFactory 。生产Thread

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    // UserThreadFactory.java
    public class UserThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);
        UserThreadFactory(String whatFeatureOfGroup) {
            this.namePrefix = "UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";
        }
    
        public Thread newThread(Runnable runnable) {
            String name = this.namePrefix + nextId.getAndIncrement();
            Thread thread = new Thread(null, runnable, name, 0);
            System.out.println(thread.getName());
            return thread;
        }
    }
    class Task implements Runnable {
        private final AtomicLong count = new AtomicLong(0L);
        public void run() {
            System.out.println("Starting to run running_" + count.getAndIncrement());
            try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
            System.out.println("End to run running_" + count.getAndIncrement());
        }
    }
    
    
    

    3. 执行15个任务。

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class UserThreadPool {
        public static void main(String[] args) {
            BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>(13);
            UserThreadFactory f1 = new UserThreadFactory("UserThreadFactory 1");
            UserRejectHandler handler = new UserRejectHandler();
            ThreadPoolExecutor threadPoolFirst = new ThreadPoolExecutor(1, 2,
                    60, TimeUnit.SECONDS, blockingDeque, f1, handler);
            Runnable task = new Task();
            for (int i = 0; i < 15; i++) {
                threadPoolFirst.execute(task);
                System.out.println("threadPoolFirst.execute.."+i);
            }   
        }
    }
    

    执行结果

    UserThreadFactory's UserThreadFactory 1-Worker-1
    threadPoolFirst.execute..0
    threadPoolFirst.execute..1
    threadPoolFirst.execute..2
    Starting to run running_0
    threadPoolFirst.execute..3
    threadPoolFirst.execute..4
    threadPoolFirst.execute..5
    threadPoolFirst.execute..6
    threadPoolFirst.execute..7
    threadPoolFirst.execute..8
    threadPoolFirst.execute..9
    threadPoolFirst.execute..10
    threadPoolFirst.execute..11
    threadPoolFirst.execute..12
    threadPoolFirst.execute..13
    UserThreadFactory's UserThreadFactory 1-Worker-2
    threadPoolFirst.execute..14
    Starting to run running_1
    End to run running_2
    End to run running_3
    Starting to run running_4
    Starting to run running_5
    End to run running_6
    End to run running_7
    Starting to run running_8
    Starting to run running_9
    End to run running_11
    End to run running_10
    Starting to run running_12
    Starting to run running_13
    End to run running_14
    End to run running_15
    Starting to run running_16
    Starting to run running_17
    End to run running_18
    Starting to run running_20
    End to run running_19
    Starting to run running_21
    End to run running_22
    End to run running_23
    Starting to run running_24
    Starting to run running_25
    End to run running_26
    Starting to run running_27
    End to run running_28
    End to run running_29
     

    展开全文
  • 主要介绍了Python线程池模块ThreadPoolExecutor用法,结合实例形式分析了Python线程池模块ThreadPoolExecutor的导入与基本使用方法,需要的朋友可以参考下
  • Python有GIL,故多线程最好只用于io密集型程序,比如需要网络下载很多个文件,下面截取部分代码来展示如何使用: from concurrent.futures import ThreadPoolExecutor, as_completed # 导入需要的包 threadPool = ...

    官网文档:https://docs.python.org/zh-cn/3/library/concurrent.futures.html

    Python有GIL,故多线程最好只用于io密集型程序,比如需要网络下载很多个文件,下面截取部分代码来展示如何使用:

    from concurrent.futures import ThreadPoolExecutor, as_completed # 导入需要的包
    threadPool = ThreadPoolExecutor(max_workers=140) # 线程池的池子大小,最多同时执行多少个任务,submit的任务数超过这个数字后后续任务会自行等待进入线程池
    all_task = [threadPool.submit(self.download_01, url, path, delay * 1.5)
                for delay, url in enumerate(urls)] # 使用线程池ThreadPoolExecutor的submit方法发起任务,列表生成式里面装了所有任务的返回句柄。self.download_01是我的函数名称,url, path, delay * 1.5 全是我的self.download_01函数的参数。
    for task in as_completed(all_task): # as_completed 可以判断列表里的线程是否完成
        data = task.result()
        print("任务{} down load success".format(data)) # 打印结果
    threadPool.shutdown(wait=True) # 释放线程池资源
    
    展开全文
  • 【Java】 之 ThreadPoolExecutor 使用案例

    千次阅读 2019-09-14 14:56:43
    // 定义线程组名称,在使用 jstack 来排查线程问题时,非常有帮助 UserThreadFactory ( String whatFeatureOfGroup ) { this . namePrefix = "UserThreadFactory's " + whatFeatureOfGroup + "-...
  • ThreadPoolExecutor使用实例

    千次阅读 2018-05-31 16:25:01
    创建ThreadPoolExecutor,切记创建在类的方法之外,这样就不会调用一次方法,创建一个ThreadPoolExecutorprivate final static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 2000, TimeUnit....
  • 1、 ExecutorService,它是一个接口,其实如果要从真正意义上来说,它可以叫做线程池的服务,因为它提供了众多接口api来控制线程池中的线程,而真正意义上的线程池就是:ThreadPoolExecutor,它...直接使用线程的弊端:
  • 主要介绍了java ThreadPoolExecutor 并发调用实例详解的相关资料,需要的朋友可以参考下
  • 1.示例代码: Runnable + ThreadPoolExecutor 首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区 别。) MyRunnable.java import java.util.Date; public class...
  • ThreadPoolExecutor使用示例

    千次阅读 2016-03-27 13:50:53
    Java 5之后,Java并发API提供了Executor框架,主要包括Executor接口,它的子接口ExecutorService,以及实现上述两个接口的ThreadPoolExecutor类。   这种机制使得任务的创建和任务的执行分离,使用executor,...
  • package multiThread;...import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorTest { public static void main(String[] args) { Thread.
  • 【强制】 新建线程时,必须通过线程池提供(AsyncTask 或者 ThreadPoolExecutor 或者其他形式自定义的线程池),不允许在应用中自行显式创建线程。 说明: 使用线程池的好处是减少在创建和销毁线程上所花的时间...
  • 我们可以使用ThreadPoolExecutor在Java中创建线程池。Java线程池管理Runnable线程的集合。工作线程从队列中执行Runnable线程。java.util.concurrent.Executors为java.util.concurrent.Executor接口提供工厂和支持...
  • 线程池ThreadPoolExecutor使用简介

    千次阅读 2017-01-26 14:53:17
    线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueueworkQueue, RejectedExecut
  • 使用线程池的好处 二 Executor 框架 2.1 简介 2.2 Executor 框架结构(主要由三大部分组成) 1) 任务(Runnable /Callable) 2) 任务的执行(Executor) ...3.2 推荐使用 ThreadPoolExecutor 构造函数创建线程池 四 ..
  •  ThreadPoolExecutor简单示例 在阿里巴巴java开发规范手册中,并不推荐使用Executors工具去创建线程池,而是使用ThreadPoolExecutor创建线程池,至于原因,在下面再讨论。那么,先看看ThreadPoolExecutor是怎么创建...
  • ThreadPoolExecutor简单使用示例

    千次阅读 2019-08-01 20:29:56
    在编程规范中,不建议使用Executors去创建线程池,而是推荐使用ThreadPoolExecutorThreadPoolExecutor会更明确运行规则,避免资源耗尽的风险。 因为Executors返回线程池有弊端: 1)FixedThreadPool和...
  • django中使用ThreadPoolExecutor实现异步视图并监控线程 众所周知,django3.1版本以下是不支持异步view的。如果在一个请求中需要做一些耗时操作,或者不确定需要多久才能完成的操作的时候,大部分都会选择使用celery...
  • ThreadPoolExecutor使用 ThreadPoolExecutor提供了四个构造方法: 以最后一个构造方法(参数最多的那个),对其参数进行解释: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long ...
  • Executors类提供简单实现的ExecutorService的使用ThreadPoolExecutorThreadPoolExecutor的提供了更多的功能不止于此。 我们可以指定创建ThreadPoolExecutor实例时仍处于活动状态的线程数,并且可以限制线程池的...
  • 使用ThreadPoolExecutor,您只需实现Runnable对象并将它们发送给执行程序。它负责它们的执行、实例化和使用必要的线程运行。 它超越了这一点,并使用线程池提高了性能。当您将任务发送给执行程序时,它会尝试使用池...
  • NULL 博文链接:https://bijian1013.iteye.com/blog/2284676
  • 1. 通过Executors创建线程池的弊端 在创建线程池的时候,大部分人还是会选择使用Executors去创建。 下面是创建定长线程池(FixedThreadPool)的...线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor...
  • java 线程池ThreadPoolExecutor简介与实例

    千次阅读 2016-07-22 17:31:23
    java 线程池ThreadPoolExecutor使用简介

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 19,762
精华内容 7,904
关键字:

threadpoolexecutor使用例子