精华内容
下载资源
问答
  • java实现线程池

    2017-04-01 16:03:13
    线程池:  多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。    假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行...
    线程池:
        多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
        
        假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
        
        如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
                    一个线程池包括以下四个基本组成部分:
                    1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                    2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                    3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                    4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                    
        线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

        线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目


    线程池管理器以及工作线程

    package com.thread;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class DefaultThreadPool<Job extends Runnable> {
    
    	// 线程池维护工作者线程的最大数量
    	private static final int MAX_WORKER_NUMBERS = 10;
    	// 线程池维护工作者线程的默认值
    	private static final int DEFAULT_WORKER_NUMBERS = 5;
    	// 线程池维护工作者线程的最小数量
    	private static final int MIN_WORKER_NUMBERS = 1;
    	// 维护一个工作列表,里面加入客户端发起的工作
    	private final LinkedList<Job> jobs = new LinkedList<Job>();
    
    	// 工作者线程的列表
    	private final List<Worker> workers = Collections
    			.synchronizedList(new ArrayList<Worker>());
    	// 工作者线程的数量
    	private int workerNum;
    	// 每个工作者线程编号生成
    	private AtomicLong threadNum = new AtomicLong();
    
    	// 生成线程池
    	public DefaultThreadPool() {
    		this.workerNum = DEFAULT_WORKER_NUMBERS;
    		initializeWorkers(this.workerNum);
    	}
    
    	public DefaultThreadPool(int num) {
    		if (num < MIN_WORKER_NUMBERS) {
    			this.workerNum = DEFAULT_WORKER_NUMBERS;
    		} else {
    			this.workerNum = num;
    		}
    		initializeWorkers(this.workerNum);
    	}
    
    	// 初始化每个工作者线程
    	private void initializeWorkers(int num) {
    		for (int i = 0; i < num; i++) {
    			Worker worker = new Worker("编号"+i + ":");
    			// 添加到工作者线程的列表
    			workers.add(worker);
    			// 启动工作者线程
    			Thread thread = new Thread(worker);
    			thread.start();
    		}
    	}
    
    	public void execute(Job job) {
    		if (job != null) {
    			// 根据线程的"等待/通知机制"这里必须对jobs加锁
    			synchronized (jobs) {
    				jobs.addLast(job);
    				jobs.notify();
    			}
    		}
    	}
    
    	// 关闭线程池即关闭每个工作者线程
    	public void shutdown() {
    		for (Worker w : workers) {
    			w.shutdown();
    		}
    	}
    
    	// 增加工作者线程
    	public void addWorkers(int num) {
    		// 加锁,防止该线程还么增加完成而下个线程继续增加导致工作者线程超过最大值
    		synchronized (jobs) {
    			if (num + this.workerNum > MAX_WORKER_NUMBERS) {
    				num = MAX_WORKER_NUMBERS - this.workerNum;
    			}
    			initializeWorkers(num);
    			this.workerNum += num;
    		}
    	}
    
    	// 减少工作者线程
    	public void removeWorker(int num) {
    		synchronized (jobs) {
    			if (num >= this.workerNum) {
    				throw new IllegalArgumentException("超过了已有的线程数量");
    			}
    			for (int i = 0; i < num; i++) {
    				Worker worker = workers.get(i);
    				if (worker != null) {
    					// 关闭该线程并从列表中移除
    					worker.shutdown();
    					workers.remove(i);
    				}
    			}
    			this.workerNum -= num;
    		}
    	}
    
    	public int getJobSize() {
    		return workers.size();
    	}
    
    	// 定义工作者线程
    	class Worker implements Runnable {
    		// 表示是否运行该worker
    		private volatile boolean running = true;
    
    		private String name;
    
    		public Worker(String name) {
    			this.name = name;
    		}
    
    		public void run() {
    			System.out.println(name + "线程初始化");
    			while (running) {
    				Job job = null;
    				// 线程的等待/通知机制
    				synchronized (jobs) {
    					if (jobs.isEmpty()) {
    						try {
    							System.out.println(name + "jobs wait....");
    							jobs.wait();// 线程等待唤醒
    						} catch (InterruptedException e) {
    							// 感知到外部对该线程的中断操作,返回
    							Thread.currentThread().interrupt();
    							return;
    						}
    					}
    
    					if(!jobs.isEmpty()){
    						// 取出一个job
    						job = jobs.removeFirst();
    					}
    					
    
    				}
    
    				// 执行job
    				if (job != null) {
    					System.err.println(name + "线程执行操作");
    					job.run();
    				}
    			}
    		}
    
    		// 终止该线程
    		public void shutdown() {
    			running = false;
    		}
    	}
    }
    
    实现线程的任务类	

    package com.thread;
    
    public class RunableTest implements Runnable {
    	private String name;
    	public RunableTest(String name){
    		this.name = name;
    	}
    	public void run() {
    		System.out.println("RunableTest:"+name);
    
    	}
    }
    



    测试类

    package com.thread;
    
    public class MyThread{
    	public static void main(String args[]) throws Exception{
    		DefaultThreadPool<Runnable> dp = new DefaultThreadPool<Runnable>(10);
    		dp.execute(new RunableTest("AA"));
    		dp.execute(new RunableTest("BB"));
    		dp.execute(new RunableTest("CC"));
    		dp.execute(new RunableTest("DD"));
    		dp.execute(new RunableTest("EE"));
    		dp.execute(new RunableTest("FF"));
    		dp.execute(new RunableTest("GG"));
    		dp.execute(new RunableTest("HH"));
    		
    	}
    
    }
    

    测试结果

    编号10:线程初始化
    编号11:线程初始化
    编号12:线程初始化
    编号15:线程初始化
    编号13:线程初始化
    编号10:jobs wait....
    编号15:jobs wait....
    编号16:线程初始化
    编号16:jobs wait....
    编号17:线程初始化
    编号12:jobs wait....
    编号11:jobs wait....
    编号14:线程初始化
    编号19:线程初始化
    编号17:jobs wait....
    编号13:jobs wait....
    编号19:jobs wait....
    编号18:线程初始化
    编号18:jobs wait....
    编号14:jobs wait....
    编号10:线程执行操作
    RunableTest:AA
    编号10:线程执行操作
    编号19:线程执行操作
    RunableTest:CC
    编号19:线程执行操作
    RunableTest:BB
    编号16:jobs wait....
    编号15:jobs wait....
    编号12:线程执行操作
    RunableTest:HH
    编号11:线程执行操作
    RunableTest:GG
    RunableTest:EE
    编号17:线程执行操作
    RunableTest:FF
    编号13:线程执行操作
    RunableTest:DD
    编号10:jobs wait....
    编号13:jobs wait....
    编号17:jobs wait....
    编号19:jobs wait....
    编号11:jobs wait....
    编号12:jobs wait....




    展开全文
  • Java实现线程池遍历查找文件 个人理解: 线程池就好比是顺丰,老板接到了订单。一个线程就是一个工人。半成品丢入了流水想后,相当于给工人们分配了任务。工人们迅速拿走 如果对于线程池或文件的知识有所遗忘可以在...

    Java Thread pool 线程池简单实现线程池遍历查找文件

    个人理解:

    线程池就好比是顺丰,老板接到了订单。一个线程就是一个快递员。快递到物流点后,相当于给快递小哥分配了任务。小哥迅速拿走,进行投递。

    平常情况快递小哥派完一车,又回来拿件继续派送,足够人手就是定义的corepoolsize的线程数。
    特殊情况,到双十一,快递量激增,人手不够就得增加maximumpoolsize。当然过了双十一,就不需要这么多人手,就要适当裁员。

    如果对于线程池或文件的知识有所遗忘可以在这个网站学习:
    老师超级搞(笑)
    https://how2j.cn/

    话不多说,上代码:

    //给线程任务,快递到物流点:
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Findfile extends Thread {
        public static void main(String[] args)throws InterruptedException {
            ThreadPoolExecutor pool=new ThreadPoolExecutor(20,30,20, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
            //20:创建了20个线程   30:20个线程不够用时会自动增加到30个  20,TimeUnit.SECONDS:线程停用20s后被回收
            Mytask mytask0=new Mytask("D:\\QQ\\178");  //分别在这两个目录下寻找
            Mytask mytask1=new Mytask("D:\\QQ\\951");
            pool.execute(mytask1);    //设定了两个任务 mytask0 mytask1
            pool.execute(mytask0);    //mytask就是命令(任务)
    
        }
    }
    
    
    //线程解决任务,快递小哥派送
    package AT;
    
    import java.io.File;
    
    public class Mytask implements Runnable {
        String  name="";
        String  filename="fzk.txt";    //需要被查找出的文件(收件人地址)
        static int a=0;             //用于判断是哪个线程找到了文件
        public Mytask(String name){
            this.name=name;
        }
        public void run() {
            try {
                File file=new File(this.name);
                File[] fs= file.listFiles();           //返回所有的文件
                System.out.println("文件查找结果:");
                Find(file);
                if (a==1){
                    System.out.println("线程"+this.name+" found");
                }
                else{
                    System.out.println("线程"+this.name+" not found");
                }
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
        void Find(File fa){
            File[] fs=fa.listFiles();
            if (fa.isDirectory()){       //isDirectory方法是判断是否为文件夹 是的话深入遍历子文件
                if (null!=fa){
                    for (File file1:fs){
                        if (file1.getName().equals(filename)){
                            System.out.println("找到了,文件的绝对路径:");
                            System.out.println(file1.getAbsolutePath());  //文件的绝对路径
                            a=1;   //表示找到了
                        }
                        Find(file1);       //遍历子文件夹将file1传给fa
                    }
                }
            }
        }
    }
    
    

    运行结果:

    文件查找结果:
    文件查找结果:
    线程D:\QQ\178 not found
    找到了,文件的绝对路径:
    D:\QQ\951\QQ\fzk.txt
    线程D:\QQ\951 found
    
    

    这是我的第一篇博客,计算机小白。有很多不好的地方,希望大家指正。也希望这篇博客能给需要的人些许帮助。

    展开全文
  • 后参考网上优化建议,在Javafor循环中采用线程的方式进行速度优化。暂时查询速度是有所提高,只是不太后期会不会因为线程问题导致别的未知问题。暂时记录本次优化过程。 因正常的线程方式Thread、Runnab...

    项目中设计到,查询近12个月的数据成长轨迹,本想着直接通过数据库中查询得到,但实际的业务场景中涉及到关联表和统计查询等,使用时间匹配后无法使用时间索引进行优化查询等原因放弃数据库优化策略。后参考网上优化建议,在Java中for循环中采用线程的方式进行速度优化。暂时查询速度是有所提高,只是不太后期会不会因为线程问题导致别的未知问题。暂时记录本次优化过程。

    因正常的线程方式Thread、Runnable方式因不能获取返回值,所以采用callable实现该功能。

    1、创建对应的线程类实现Callable接口

    class CallableThread implements Callable<Map<String, Object>> {
        int i;
        int total;
        String orgId;
        String userId;
        String queryType;
        String quarter;
        private CustomerMarketingService marketingService;
        private Map<String, Object> map = new HashMap<>();
        private final CountDownLatch latch ;
    
        public CallableThread(int i,int total, String orgId,String userId,
                              String queryType,String quarter,
                              CustomerMarketingService marketingService, CountDownLatch latch) {
            this.i = i;
            this.total = total;
            this.orgId = orgId;
            this.userId = userId;
            this.queryType = queryType;
            this.quarter = quarter;
            this.marketingService = marketingService;
            this.latch = latch;
        }
    
        @Override
        public Map<String, Object> call() throws Exception {
            try{
                String time="";
                String resultTime="";
                if("day".equals(queryType)){
                    time = DateUtils.getDay(i-total+1);
                    resultTime=time.substring(5,time.length());
                }
                else if ("month".equals(queryType)) {
                    time = DateUtils.getLastMonth(total-i-1, new Date());
                    resultTime=time;
                } else if ("quarter".equals(queryType)) {
                    time = DateUtils.getLastMonth((total-i-1) * 3, new Date());
                    quarter = DateUtils.getSeason(time);
                    String year = DateUtils.getYear(time);
                    resultTime = year + "年" + quarter + "季度";
    
                }else if ("year".equals(queryType)) {
                    time = DateUtils.getLastMonth((total-i-1) * 12, new Date());
                    String year = DateUtils.getYear(time);
                    resultTime = year + "年";
                }
    
    
                //客户营销统计-(新客户/总数+具体客户营销方式统计)
                int newCount =0;
                int allCount =0;
                //EntityWrapper<CustomerMarketingEntity> ew = new EntityWrapper<>();
                Map<String,Object> params = new HashMap<>();
                if("day".equals(queryType)){
                    //ew.between("create_time", time+" 00:00:00",time+" 23:59:59");
                    params.put("beginTime",time+" 00:00:00");
                    params.put("endTime",time+" 23:59:59");
                }else{
                    Date beginTime = DateUtils.getBeginTime(queryType, time, quarter);
                    Date endTime = DateUtils.getEndTime(queryType, time, quarter);
                    params.put("beginTime",DateUtils.format(beginTime)+" 00:00:00");
                    params.put("endTime",DateUtils.format(endTime)+" 23:59:59");
                    //ew.between("create_time", DateUtils.format(beginTime)+" 00:00:00",DateUtils.format(endTime)+" 23:59:59");
                }
    
    
                if (UtilValidate.isNotEmpty(orgId)) {
                    //拼接权限的查询条件
                    params.put("orgId",orgId);
                    //ew.addFilter("FIND_IN_SET(create_org_id,getChildrenOrg({0}))", orgId);
                }
                List<CustomerMarketCount> list = marketingService.selectCountAONList(params);
                for(CustomerMarketCount count:list){
                    if("0".equals(count.getCode())){
                        newCount=count.getCount();
                    }
                    allCount=allCount+count.getCount();
                }
                //allCount = marketingService.selectCountM(params);
                //params.put("isNew","0");
                //ew.eq("is_new", "0");
                //newCount = marketingService.selectCountM(params);
                // 新客户
                map.put("resultTime", resultTime);
                map.put("time", resultTime);
                map.put("allCount", allCount);
                map.put("newCount", newCount);
                double ratio;
                if(allCount!=0){
                    ratio=(double)newCount/allCount;
                    map.put("ratio", CountUtil.formatDouble(ratio));
                }else{
                    map.put("ratio", 0.0);
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                latch.countDown();
            }
            return map;
        }
    }
    

    2、具体的业务逻辑中,通过线程池调用该线程

            List<Map<String, Object>> mapList = new ArrayList<>();        
            final CountDownLatch latch = new CountDownLatch(total);
            ExecutorService pool=Executors.newFixedThreadPool(5);
            List list=new ArrayList();
            try {
                for (int i = 0; i < total; i++) {//生长轨迹
                    Callable<Map<String, Object>> c1 = new CallableThread(i,total,orgId,userId,queryType,quarter,this,latch);
                    Future<Map<String,Object>> f1=pool.submit(c1);
                    list.add(f1);
                }
                latch.await(); //等待所有线程执行完成后 对返回值进行合并处理
                for(Object o:list){
                    Future<Map<String,Object>> f1=(Future<Map<String, Object>>) o;
                    mapList.add(f1.get());
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                return Result.error("未知异常");
            }

    通过线程池调用对应的线程,具体某个线程通过传值后处理单独的业务逻辑后,latch.await()等待所有的线程执行完成后,针对返回值进行数据合并后返回给前台。

    展开全文
  • 自定义java线程池 ThreadPoolExecutor是Java并发api添加的一项功能,可以有效地维护和重用线程,因此我们的程序不必担心创建和销毁线程,也不必关注核心功能。 我创建了一个自定义线程池执行程序,以更好地了解...

    自定义java线程池

    ThreadPoolExecutor是Java并发api添加的一项功能,可以有效地维护和重用线程,因此我们的程序不必担心创建和销毁线程,也不必关注核心功能。 我创建了一个自定义线程池执行程序,以更好地了解线程池执行程序的工作方式。

    功能性:

    • 它维护一个固定的线程池,即使没有任务提交也创建线程并启动线程,而ThreadPoolExecutor根据需要创建线程,即,每当将可运行对象提交给池且线程数小于核心池大小时。
    • 在ThreadPoolExecutor中,我们提供了一个等待队列,当所有线程忙于运行现有任务时,新的可运行任务将在该队列中等待。 队列填满后,将创建最大线程池大小的新线程。 在MyThreadPool中,我将可运行对象存储在链接列表中,因此每个任务都将在列表中等待并且不受限制,因此在此不使用maxPoolSize。
    • 在ThreadPoolExecutor中,我们使用Future Objects从任务中获取结果,如果结果不可用,则future.get()方法将阻塞,或者使用CompletionService。 在MyThreadPoolExecutor中,我创建了一个名为ResultListener的简单接口,用户必须提供对此的实现,如他希望如何处理输出。 每个任务完成后,ResultListener将获得带有任务输出的回调,或者在发生任何异常的情况下将调用error方法。
    • 调用shutdown方法时,MyThreadPoolExecutor将停止接受新任务并完成剩余任务。
    • 与ThreadPoolExecutor相比,我提供了非常基本的功能,我使用了简单的线程机制,如wait(),notify(),notifyAll()和join()。
    • 在性能方面,它类似于ThreadPoolExecutor,在某些情况下好一些。 如果您发现任何有趣的结果或改进方法,请告诉我。
    package com.util;
    
    import java.util.concurrent.Callable;
    
    /**
     * Run submitted task of {@link MyThreadPool} After running the task , It calls
     * on {@link ResultListener}object with {@link Output}which contains returned
     * result of {@link Callable}task. Waits if the pool is empty.
     * 
     * @author abhishek
     * 
     * @param 
    
     */
    
    import java.util.concurrent.Callable;
    /**
    * Run submitted task of {@link MyThreadPool} After running the task , It calls
    * on {@link ResultListener}object with {@link Output}which contains returned
    * result of {@link Callable}task. Waits if the pool is empty.
    *
    * @author abhishek
    *
    * @param <V>
    */
    public class MyThread<V> extends Thread {
        /**
        * MyThreadPool object, from which the task to be run
        */
        private MyThreadPool<V> pool;
        private boolean active = true;
        public boolean isActive() {
            return active;
        }
        public void setPool(MyThreadPool<V> p) {
            pool = p;
        }
        /**
        * Checks if there are any unfinished tasks left. if there are , then runs
        * the task and call back with output on resultListner Waits if there are no
        * tasks available to run If shutDown is called on MyThreadPool, all waiting
        * threads will exit and all running threads will exit after finishing the
        * task
        */
        public void run() {
            ResultListener<V> result = pool.getResultListener();
            Callable<V> task;
            while (true)
            {
                task = pool.removeFromQueue();
                if (task != null)
                {
                    try
                    {
                        V output = task.call();
                        result.finish(output);
                    } catch (Exception e)
                    {
                        result.error(e);
                    }
                } else
                {
                    if (!isActive())
                    break;
                    else
                    {
                        synchronized (pool.getWaitLock())
                        {
                            try
                            {
                                pool.getWaitLock().wait();
                            } catch (InterruptedException e)
                            {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
        void shutdown() {
            active = false;
        }
    }
    package com.util;
    import java.util.LinkedList;
    import java.util.concurrent.Callable;
    /**
    * This class is used to execute submitted {@link Callable} tasks. this class
    * creates and manages fixed number of threads User will provide a
    * {@link ResultListener}object in order to get the Result of submitted task
    *
    * @author abhishek
    *
    *
    */
    public class MyThreadPool<V> {
        private Object waitLock = new Object();
        public Object getWaitLock() {
            return waitLock;
        }
        /**
        * list of threads for completing submitted tasks
        */
        private final LinkedList<MyThread<V>> threads;
        /**
        * submitted task will be kept in this list untill they run by one of
        * threads in pool
        */
        private final LinkedList<Callable<V>> tasks;
        /**
        * shutDown flag to shut Down service
        */
        private volatile boolean shutDown;
        /**
        * ResultListener to get back the result of submitted tasks
        */
        private ResultListener<V> resultListener;
        /**
        * initializes the threadPool by starting the threads threads will wait till
        * tasks are not submitted
        *
        * @param size
        * Number of threads to be created and maintained in pool
        * @param myResultListener
        * ResultListener to get back result
        */
        public MyThreadPool(int size, ResultListener<V> myResultListener) {
            tasks = new LinkedList<Callable<V>>();
            threads = new LinkedList<MyThread<V>>();
            shutDown = false;
            resultListener = myResultListener;
            for (int i = 0; i < size; i++) {
                MyThread<V> myThread = new MyThread<V>();
                myThread.setPool(this);
                threads.add(myThread);
                myThread.start();
            }
        }
        public ResultListener<V> getResultListener() {
            return resultListener;
        }
        public void setResultListener(ResultListener<V> resultListener) {
            this.resultListener = resultListener;
        }
        public boolean isShutDown() {
            return shutDown;
        }
        public int getThreadPoolSize() {
            return threads.size();
        }
        public synchronized Callable<V> removeFromQueue() {
            return tasks.poll();
        }
        public synchronized void addToTasks(Callable<V> callable) {
            tasks.add(callable);
        }
        /**
        * submits the task to threadPool. will not accept any new task if shutDown
        * is called Adds the task to the list and notify any waiting threads
        *
        * @param callable
        */
        public void submit(Callable<V> callable) {
            if (!shutDown) {
                addToTasks(callable);
                synchronized (this.waitLock) {
                    waitLock.notify();
                }
                } else {
                System.out.println('task is rejected.. Pool shutDown executed');
            }
        }
        /**
        * Initiates a shutdown in which previously submitted tasks are executed,
        * but no new tasks will be accepted. Waits if there are unfinished tasks
        * remaining
        *
        */
        public void stop() {
            for (MyThread<V> mythread : threads) {
                mythread.shutdown();
            }
            synchronized (this.waitLock) {
                waitLock.notifyAll();
            }
            for (MyThread<V> mythread : threads) {
                try {
                    mythread.join();
                    } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    package com.util;
    
    /**
     * This interface imposes finish method 
     * which is used to get the {@link Output} object 
     * of finished task
     * @author abhishek
     *
     * @param 
    
     */
    
    public interface ResultListener 
    
                            {
    
     public void finish(T obj);
     public void error(Exception ex);
    
    }

    您可以根据需要实现此类并返回并处理任务返回的结果。

    package com.util;
    
    public class DefaultResultListener implements ResultListener{
    
     @Override
     public void finish(Object obj) {
    
     }
    
     @Override
     public void error(Exception ex) {
      ex.printStackTrace();
     }
    
    }

    例如,此类将添加task返回的数字。

    package com.util;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * ResultListener class to keep track of total matched count
     * @author abhishek
     * 
     * @param 
    
     */
    public class MatchedCountResultListener
    
                            implements ResultListener
    
                             {
    
    	/**
    	 * matchedCount to keep track of the number of matches returned by submitted
    	 * task
    	 */
    	AtomicInteger matchedCount = new AtomicInteger();
    
    	/**
    	 * this method is called by ThreadPool to give back the result of callable
    	 * task. if the task completed successfully then increment the matchedCount by
    	 * result count
    	 */
    	@Override
    	public void finish(V obj) {
    		//System.out.println('count is '+obj);
    		matchedCount.addAndGet((Integer)obj);
    	}
    
    	/**
    	 * print exception thrown in running the task
    	 */
    	@Override
    	public void error(Exception ex) {
    		ex.printStackTrace();
    	}
    
    	/**
    	 * returns the final matched count of all the finished tasks
    	 * 
    	 * @return
    	 */
    	public int getFinalCount() {
    		return matchedCount.get();
    	}
    }

    这是一个测试类,使用CompletionService和MyThreadPoolExecutor对循环运行简单

    package test;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.util.DefaultResultListener;
    import com.util.MyThreadPool;
    
    public class TestClass {
    
    	public static void main(String[] args) throws InterruptedException {
    		CompletionService
    
                           threadService;
    		ExecutorService service = Executors.newFixedThreadPool(2);
    		threadService = new ExecutorCompletionService
    
                           (service);
    
    		long b = System.currentTimeMillis();
    		for(int i =0;i<50000;i++){
    			threadService.submit(new MyRunable (i));
    		}
    
    		service.shutdown();
    		System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b));
    
    		DefaultResultListener result = new DefaultResultListener();
    		MyThreadPool
    
                             newPool = new MyThreadPool
    
                             (2,result);
    		long a = System.currentTimeMillis();
    
    		int cc =0;
    		for(int i =0;i<50000;i++)
    		{
    			cc = cc+i;
    		}
    		System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a));
    		a= System.currentTimeMillis();
    
    		for(int i =0;i<5000;i++){
    			newPool.submit(new MyRunable (i));
    		}
    
    		newPool.stop();
    		System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a));
    	}
    
    }
    
    class MyRunable implements Callable
    
    {
    	int index = -1;
    	public MyRunable(int index)
    	{
    		this.index = index;
    	}
    	@Override
    	public Integer call() throws Exception {
    		return index;
    	}
    
    }

    参考: JCG合作伙伴 Abhishek Somani在Java,J2EE和Server博客上的Java 自定义线程池执行程序

    翻译自: https://www.javacodegeeks.com/2013/03/my-custom-thread-pool-executor-in-java.html

    自定义java线程池

    展开全文
  • Java简单实现线程池

    2021-04-21 22:01:31
    相当于一个池子,里面存放大量已经创建好的线程,当有一个任务需要处理时, 可以直接从池子里面取一个线程去执行它。 包括内存池,很多缓冲的技术都是采用这种技术。 其实理解起来很简答! 为什么需要线程池,这种池...
  • Java线程池如何实现复用

    千次阅读 2019-05-29 15:36:56
    Java线程池优点: 降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的...
  • Java手写线程池实现

    千次阅读 2017-10-14 15:50:52
    手写线程池实现1.线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。2.线程池简易架构3.简易线程池代码(自行优化)import java.util.List;/** ...
  • java 简单实现线程池

    2018-06-29 10:27:38
    先上原理图:为了更好的在手机上显示,我重新把图画了一遍上代码之前,要先补充一下线程池构造的...只实现了BlockingQueue存放任务,然后每个worker取任务并执行,下面看代码首先定义一个线程池ThreadExcutorclass...
  • 本篇文章将追寻ThreadPoolExecutor类的代码思路模拟jdk下线程池的执行策略,实现一个自己的线程池,文章目的主要是为了研究jdk下ThreadPoolExecutor的设计思想,理解作者为什么要这样设计,这么设计有什么好处,而...
  • Java线程池是怎么实现复用的? Java线程池,维护了一个线程池,每一个新的任务都会提交到线程池,由线程池进行调度和资源释放,这样的好处: 1.通过线程池,可以限制线程创建的数量,当创建许多的任务时,任务...
  • 每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终...
  • 将es.execute放在两种循环中,看打印的情况,while控制台打印卡顿明显,for中打印的挺流畅的。。。是两种循环方式影响执行速度么? ExecutorService es = Executors.newFixedThreadPool(1000); while (true) { ...
  • Java线程池实现原理详解

    万次阅读 多人点赞 2018-03-16 21:57:32
    其实java线程池实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue...
  • Java线程池简介

    2021-03-28 11:03:50
    Java线程池简介1 常见的四种创建线程池方法1 newFixedThreadPool(固定大小的线程池)2 newCachedThreadPool(可缓存的线程池)3 newSingleThreadExecutor(单线程的线程池)4 newScheduledThreadPool(定时及周期性任务...
  • Java 线程池技术之一 自实现线程池

    万次阅读 2011-09-15 09:15:08
    尽管自jdk1.5,Java已经自带了线程池实现,了解如何自己实现Java线程池有助于加深对操作系统和Java虚拟机的理解。 一,线程池的基本要素 线程池一般需要一个线程管理类: ThreadPoolManager,其作用有: ...
  • 分析Java线程池执行原理
  • java线程池

    2020-12-06 21:01:56
    java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。 下面我们来看一下ThreadPoolExecutor类的具体实现源码。 1.1、构造方法 从上面的...
  • Java实现线程池

    2016-08-08 17:16:00
    由于最近开始学习java,用到了线程池,按照之前c++的写法写出此java版的线程池 TaskRunnale实现相关任务的接口,具体要实现什么任务在相应的run函数中实现。 package threadpool; import java.util....
  • 本例中使用Java自带方法实现线程池,并比较不使用多线程和使用多线程使用冒泡排序排序有50000个元素的数组10次所消耗的时间。 代码 import java.util.concurrent.CountDownLatch; import java.util.concurrent....
  • Java:线程池

    2019-09-19 15:42:02
    参考:干货,谈谈对java线程池的理解(面试必备) 为什么要用线程池? 使用线程池可以给我们带来很多好处,首先通过线程池中线程的重用,减少创建和销毁线程的性能开销。其次,能控制线程池中的并发数,否则会因为...
  • java 创建线程池

    2019-08-20 18:46:10
    线程池,其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作, 无需反复创建线程而消耗过多资源。 我们详细的解释一下为什么要使用线程池? 在java中,如果每个请求到达就...
  • java线程池

    2017-01-17 23:13:54
    我们的web项目都是部署在服务器上,浏览器端的每一个request就是一个线程,那么服务器需要并发的处理多个请求,就需要线程池技术,下面来看一下Java并发包下如何创建线程池。  1. 创建一个可重用固定线程集合的...
  • 给女朋友讲 : Java线程池的内部原理

    万次阅读 多人点赞 2019-11-03 15:09:57
    餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:“经常听你说线程池,到底线程池到底是个什么原理?”
  • 引言 线程池:可以理解为缓冲区,由于频繁的创建销毁线程会带来一定的成本,可以预先创建但不...当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性。线程是稀缺资源,如果无限制的创建...
  • Java线程池实现原理

    2017-12-06 20:42:18
    Java线程池实现原理
  • 文章目录Java线程池实现原理和源码分析前言外观线程池继承关系构造函数成员变量创建线程池任务阻塞队列SynchronousQueueArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue拒绝策略AbortPolicy 默认的拒绝...
  • java线程池运行原理

    2021-02-02 10:21:23
    一、线程池优点 重用线程 :线程若频繁的创建销毁会给线程调度...java中常用的线程池:ThreadPoolExecutor ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, // 核心线程数2,最大线程数3 10L, TimeUn
  • Java线程池

    2019-02-17 20:53:17
    创建线程4种方式:继承Thread、实现Runnable、Callable、创建线程池 一:线程池优点1.降低资源消耗:通过重复利用已创建的线程,降低线程创建与销毁带来的损耗,手动创建线程:假如线程执行结束,就会销毁线程,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 45,616
精华内容 18,246
关键字:

java实现线程池执行for循环

java 订阅