精华内容
下载资源
问答
  • 主线程在接受连接与处理相关请求等操作之间不断交替运行。当服务器正在处理请求是时,新到来的连接必须等待直到请求处理完成,然后服务器将再次调用accept。 显示的为任务创建线程 通过为每个请求创建一个新的线程来...

    第六章 任务执行(Task Execution)

    在线程中执行任务

    最简单策略就是在单个线程中串行地执行任务。
    主线程在接受连接与处理相关请求等操作之间不断交替运行。当服务器正在处理请求是时,新到来的连接必须等待直到请求处理完成,然后服务器将再次调用accept。

    显示的为任务创建线程
    通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。
    对于每个连接,主循环都将创建一个新的线程来处理请求,而不是在主循环内进行处理。

    • 任务处理过程中从主线程中分离出来,使得主循环能够更快的重新等待下一个到来的连接。这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。
    • 任务可以并行处理,从而能同时服务多个请求。如果有多个处理器,或者任务由于某种原因被阻塞,例如等待I/O完成、
    • 任务处理代码必须是线程安全的,因为当有多个任务时会并发的调用这段代码。

    无限制创建线程的不足
    线程生命周期的开销非常高。
    资源消耗。
    稳定性。

    Executor 框架

    该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用管理机制和性能监视等机制。

    线程池

    线程池从字面含义,指一组同构工作线程的资源池。线程池与工作队列(work queue)密切相关,其中工作队列中保存了所有等待执行的任务。工作者线程的任务:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

    优点:

    • 可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
    • 当请求到达的时候,工作线程已将存在,不用等待创建线程。
    • 适当调整线程池大小,可以创建足够多的线程以变使处理器保持子啊忙碌状态
    • 防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

    调用Executor是中的静态工厂方法之一来创建一个线程池:
    newFixedThreadPool。newFIxedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这是线程池的规模不再变化。
    newCachedThreadPool。newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程池,线程池的规模不存在任何限制。
    newSIngleThreadExector。newSingleThreadExecutor是一单线程的Executor,它将创建单个工作线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中顺序来串行执行。
    newScheduledThreadPool。newScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或者定时的方式来执行任务,类似于Timer。

    Executor的生命周期

    ExecutorServce 的生命周期有三种状态:运行、关闭和已终止。ExecutorService在出事创建是处于运行状态。shutdow方法将执行平缓的关闭过程:不再接受新的任务,同时等待已提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动尚未开始执行的任务。

    延迟任务与周期任务

    TImer类负责管理延迟任务(“在100ms后执行该任务”)以及周期任务(“每10ms执行依次该任务”)。
    然而Timer支持给予绝对时间而不是相对时间的调度机制,因此任务的执行系统时钟变化很敏感,而ScheduledThreadPoolExecutor只支持基于相对时间的调度。可以通过ScheduledThreadPoolExecutor的构造函数或newScheduledThreadPool工厂方法来创建该类的对象。

    找出可利用的并行性

    携带结果的任务Callable 与 Future
    Executor框架使用Runnable作为其基本任务的表现形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
    许多任务实际都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象:它认为主入口点将返回一个值,并可能抛出一个异常。
    Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
    在Future规范中包含的隐含意义是:任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在“完成”的状态。
    get方法的行为取决与任务的状态(尚未开始,正在运行,已完成)。

    • 任务已完成,get会立即返回或者抛出一个Exception。
    • 任务未完成,get将阻塞并直到任务完成。
    • 任务抛出异常,get将该异常封装成ExecutionException并重新抛出。并且可以通过getCause来获得被封装的初始异常。
    • 任务被取消,get将抛出CancellationException。
    public interface Callable<V> {
    	V call() throws Exception;
    }
    
    public interface Future<V> {
    	boolean cancel(boolean mayInterruptIfRunning);
    	boolean isCancelled();
    	boolean isDone();
    	V get() throws InterruptedException,ExecutionException,
    					CancellationException;
    	V get(long timeout, TimeUnit unit) throws InterruptedException,
    					ExecutionException, CancellationException,TimeoutException;
    }
    

    单页地渲染页面元素

    public class SingleThreadRenderer {
    	void renderPage(CharSequence source) {
    		renderText(source);
    		List<ImagerData> imageData = new ArrayList<ImageData>();
    		for (ImageInfo imageInfo : scanForImageInfo(source))
    			imageData.add(imageInfo.downloadImage());
    		for (ImageData data : imageData)
    			renderImage(data);
    	 }
    }
    

    使用Future等待图像下载

    public class FutureRenderer {
    	private fianl ExecutorService executor = ...;
    	
    	void renderPage(CharSequence source) {
    		final List<ImageInfo> imageInfos = scanForImagerInfo(source);
    		Callable<List<ImageData>>task = new Callable<List<ImageData>>() {
    			public List<ImageData> call() {
    				List<ImageData> result = new ArrayList<ImageData>();
    				for (ImageInfo imageInfo : imageInfos)
    					result.add(imageInfo.downloadImageA());
    					return result;
    			}
    	};
    	Future<List<ImageData>> future = executor.submit(task);
    	renderText(source);
    
    	try {
    		List<ImageData> imageData = future.get();
    		for (ImageData data : imageData)
    			renderImage(data);
    		} catch (InterruptedException e ) {
    			Thread.currentThread().interrupt();
    			future.cancel(ture);
    		} catch (ExecutionException e) {
    			throw lauderThrowable(e.getCause());
    		}
    	}
    }			
    

    在异构任务并行中存在的局限

    通过对异构任务进行并行化来获得重大的性能提升是很困难的。
    只用当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

    CompletionService: Executor 与 BlockingQueue
    完成服务(CompletionService) 将Executor和BlockingQueue的功能融合在一起。可以寄将Callable任务提交给它来执行,然后使用类队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExcutorCompletionService实现了COmpletionService,并将计算部分委托给一个Executor。
    ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用Future-Task的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。如下面程序,take和poll方法委托给了BlockingQueue,这些方法会在得出结果之前阻塞。

    由ExecutorCompletionService使用的QueueingFuture类

    private class QueueingFuture<V> extends FutureTask<V> {
    	QueueingFuture(Callable<V> c) {
    		super(c);
    	}
    	QueueingFuture(Runnable t, V r) {
    		super(t, r);
    	}
    	protected void done() {
    		completionQueue.add(this);
    	}
    }
    

    为任务设置时限

    例子:给出了限时Future.get的一种典型应用。在它生成的页面中包括响应用户请求的内容以及从广告服务器上获得的广告。它将获取广告的任务提交给一个Executor,然后计算剩余的文本页面内容,最后等待广告信息,直到超出指定的时间。如果get超时,那么将取消广告获取任务,并转而使用默认的广告信息。

    Page renderPageWithAd() throws InterruptedException {
    	long endNanos = System.nanoTime() + TIME_BUDGET;
    	Future<Ad> f = exec.submit (new FetchAdTask());
    	// 在等待广告的同时显示页面
    	Page page = renderPageBody();
    	Ad ad;
    	try {
    		// 只等待指定的时间长度
    		long timeLeft = endNanos - System.nanoTime();
    		ad = f.get(timeLeft, NANOSECONDS);
    	} catch (ExecutionException e) {
    		ad = DEFAULT_AD;
    	} catch (TimeoutException e) {
    		ad = DEFAULT_AD;
    		f.cancel(true);
    	}
    	page.setAd(ad);
    	return page;
    }
    
    展开全文
  • 对于synchronized来说,如果一个线程在 等待锁,那么结果只有两种情况,要么它获得这把锁继续执行,要么它 就保持等待。而使用重入锁,提供了另外一种可能,那就是线程可以被中断。 package com.thread.t01; ...
    • 重入锁(Re-Entrant-Lock)
    1. 重入锁的中断响应(lock.lockInterruptibly())

    对于synchronized来说,如果一个线程在 等待锁,那么结果只有两种情况,要么它获得这把锁继续执行,要么它 就保持等待。而使用重入锁,提供了另外一种可能,那就是线程可以被中断。

    package com.thread.t01;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    public class IntLock implements Runnable{
    
    	public static ReentrantLock lock1 = new ReentrantLock();
    	public static ReentrantLock lock2 = new ReentrantLock();
    	int lock;
    	
    	public IntLock(int lock) {
    		super();
    		this.lock = lock;
    	}
    
    	@Override
    	public void run() {
    		try {
    			if(lock==1){
    				//lockInterruptibly()可以对中断进行响应
    				lock1.lockInterruptibly();
    				Thread.sleep(500);
    				lock2.lockInterruptibly();
    			}else{
    				lock2.lockInterruptibly();
    				Thread.sleep(500);
    				lock1.lockInterruptibly();
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally {
    			if(lock1.isHeldByCurrentThread()){
    				lock1.unlock();
    			}
    			if(lock2.isHeldByCurrentThread()){
    				lock2.unlock();
    			}
    			System.out.println(Thread.currentThread().getId()+":线程退出");
    		}
    	}
    	
    	public static void main(String[] args) throws InterruptedException {
    		IntLock r1 = new IntLock(1);
    		IntLock r2 = new IntLock(2);
    		Thread t1 = new Thread(r1);
    		Thread t2 = new Thread(r2);
    		t1.start();
    		t2.start();
    		Thread.sleep(1000);
    		t2.interrupt();
    	}
    	
    }
    

        2.锁申请等待限时(lock.tryLock())

    lock.tryLock(5,TimeUnit.SECONDS)

    表示线程在锁的请求中,最多等待5秒,如果超过5s还没有得到锁,就会返回false,如果成功获得锁,则返回true。

    package com.thread.t02;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TimeLock implements Runnable{
    
    	public static ReentrantLock lock = new ReentrantLock();
    	@Override
    	public void run() {
    		try {
    			if(lock.tryLock(5,TimeUnit.SECONDS)){
    				Thread.sleep(6000);
    			}else{
    				System.out.println("get Lock failed");
    			}
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}finally {
    			if(lock.isHeldByCurrentThread())
    				lock.unlock();
    		}
    	}
    
    	public static void main(String[] args) {
    		TimeLock t1 = new TimeLock();
    		Thread r1 = new Thread(t1);
    		Thread r2 = new Thread(t1);
    		r1.start();
    		r2.start();
    	}
    }
    

     

    转载于:https://my.oschina.net/projerry/blog/865071

    展开全文
  • 1.在任务与执行策略之间的隐性耦合 有些类型的任务需要明确地指定执行策略,包括: 依赖性任务。大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果。当在线程池中执行独立的任务...

    第八章 线程池的使用

    1.在任务与执行策略之间的隐性耦合

    有些类型的任务需要明确地指定执行策略,包括:
    依赖性任务。大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果。当在线程池中执行独立的任务时,可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。如果提交给线程池的任务需要依赖其他任务,那么就隐含了给执行策略带来了约束,此时必须小心维持这些执行策略以避免产生活跃性问题。
    使用线程封闭机制的任务。与线程池相比,单线程的Executor能够对并发性做出更强的承诺。对象可以封闭在任务线程中,使得在该线程中执行的任务在访问对象时不需要同步,即使这些资源不是线程安全也没问题。这种情形将在任务与执行策略形成隐性耦合——任务要求其执行所在的Executor是单线程的(确保任务不会并发执行,并提供足够的同步机制,使得一个任务对内存的作用对于下一个任务一定是可见的)。
    对响应时间敏感的任务。GUI应用对响应时间是敏感的:如果用户在点击按钮后需要很长延迟才能得到可见的反馈,那么他们会感到不满。如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少量线程池的线程池中,那么将降低由该Executor管理的服务的响应性。
    使用ThreadLocal的任务。ThreadLocal使得每个线程都可以拥有某个变量的一个私有“版本”。只要条件允许,Executor可以自由地重用这些线程。

    1.1.线程饥饿死锁

    线程饥饿死锁(Thread Starvation Deadlock):所有正在执行任务的线程都由于等待其他仍然处于工作队列中的任务而阻塞。

    1.2.运行时间较长的任务

    在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等。如果等待时间超时,那么可以把任务标识为失败,这种办法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能够更快完成的任务。
    如果总是线程池充满了被阻塞的任务,那么也可能表明线程池规模太小。

    2.设置线程池的大小

    线程池的大小通过某种配置机制来提供或者根据Runtime.availableProcessors来动态计算。
    cup周期不是唯一影响线程池的大小的资源,还有内存、文件句柄、套接字句柄和数据库连接。计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。
    当任务需要某种通过资源池来管理的资源时,例如数据库连接,线程池和资源池的大小将会相互影响。

    3.配置ThreadPoolExecutor

    ThreadPoolExecutor通用构造函数

    public ThreadPoolExecutor(int corePoolSize,
    						  int maximumPoolSize,
    						  long keepAliveTime,
    						  TimeUnit unit,
    						  BlockingQueue<Runnable> workQueue,
    						  ThreadFactory threadFactory,
    						  RejectedExecutionHandler handler) {...}
    					
    

    3.1.线程的创建与销毁

    线程池的基本大小(CorePoolSize)最大大小(MaximumPoolSize)以及存活时间等因素共同负责线程的创建与销毁。
    基本大小:线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了才会创建超出这个数量的线程。当线程池的当前大小超过了基本大小,这个线程将会被终止。
    最大大小:可同时活动的线程数量的上限。
    存活时间:某个线程的空闲时间超过了存活时间,那么将被标记为可回收的。

    • newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
    • newCachedThreadPool工厂方法将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。
    • 其他形式的线程池可以通过显示的ThreadP构造函数来构造。

    3.2.管理队列任务

    如果新的请求的到达速率超过了线程池的处理速率,那么新到达的请求将累积起来。子啊线程池中,这些请求会在一个由Executor管理的Runnable队列中等待,而不会像线程那样去竞争CPU资源。通过一个Runnable和一个链表节点来表示一个等待中的任务。

    基本的任务排列方法有三种:无界队列、有界队列和同步移交(Synchronous Handoff)。

    • newFixedThreadpool和newSingleThreadExecutor在默认的情况下将使用一个无界的LinkedBlockingQueue。如果所有工作线程都在忙碌中,那么任务将在队列中等候,如果任务持续快速到达,而且超过了线程池处理它们的速度,那么队列将无限制增加。
    • 有界队列ArrayBlockingQueue,有界的LinkedBlockingQueue、PriorityBlockingQueue。在使用有界的工作队列时,队列大小和线程池大小必须一起调节。线程池小而队列大,有助于减少内存使用量,降低cpu的使用率,同时减少上下文切换,但是限制吞吐量。
    • 非常大或者无界的线程池,使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作着线程。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等地啊,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交更高效,因为任务会直接移交给执行它的线程,而不是首先放在队列中,然后由工作者线程从队列中提取任务。只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue。
    • 使用PriorityBlockingQueue,这个队列通过优先级安排任务。任务优先级通过自然顺序或Comparator(如果任务实现了Comparable)来定义的。

    只有当任务是相互独立时,为线程池设置界限才是合理的。如果任务之间存在依赖性,有界的线程池或队列可能导致线程的饥饿死锁,应用无界线程池例如newCacheThreadPool。

    3.3.饱和策略

    当有界队列被填满后,饱和策略开始发挥作用。
    jdk提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy和DiscardOldestPolicy。

    • 中止(Abort)策略是默认的饱和策略,该策略将抛出未检查的RejectedExecution-Exception。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
    • 当新提交的任务无法保存在队列中等待时,抛弃(Discard)策略会悄悄抛弃该任务。
    • Discard-Oldest策略会抛弃下一个即将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么抛弃最旧的策略会抛弃优先级最高的任务,不适合一起使用)。
    • 调用者运行(Caller-Runs)策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在调用了execute的线程中执行该任务。我们可以将WebServer示例修改为使用有界队列和调用者运行饱和策略,当线程池中所有的线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中执行。由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理正在执行的任务。在此期间,主线程不会调用accept,因此到达的请求将被保护在tcp层的队列中而不是在应用程序队列中。如果持续过载,那么tcp层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,那么tcp层将最终发现他的请求队列被填满,因此同样会开始抛弃请求,当服务器过载时,这种过载情况会逐渐向外蔓延——从线程池到工作队列到应用程序再到tcp层,最终到达客户端,导致服务器在高负载下实现一种平缓的性能降低。

    3.4.线程工厂

    每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory中只定义一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。

    3.5.在调用构造函数后再定制ThreadPoolExecutor

    在调用玩ThreadPoolExecutor的构造函数后,仍然可以通过设置函数Setter来修改大多数传递给它的构造函数(线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器(Rejected Execution Handler))如果Executor是通过Executors中的某个工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor以访问设置器。

    4.扩展ThreadPoolExecutor

    ThreadPoolExecutor是扩展的,它提供了几个可以子啊子类中改写的方法:beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
    在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常返回,afterExecute都会被调用。(除了完成后带着一个Error)
    如果beforeExecute抛出一个RuntimeException,那么任务将不会被执行,并且afterExecute也不会被调用。
    在线程池完成关闭操作时调用teminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

    5.递归算法的并行化

    如果循环中迭代操作都是独立的,并且不需要等待所有迭代操作都完成在继续执行,那么就可以使用Executor将串行循环转化为并行循环。
    调用processInParallel比调用processSequentially更快地返回,因为processInParallel会在所有下载任务都进入了Executor的队列后就立即返回,而不会等待这些任务全部完成。如果需要提交一个任务集并等待它们完成,那么可以使用ExecutorService。invokeAll,并且在所有任务都执行完成后调用CompletionService来获取结果。

    展开全文
  • 前言  在上一篇的博客中,我们...同时,因为OJ业务要求的限制,每一道题目的答案代码运行都应该是限时的(比如限时1000毫秒内出结果),对于运行超过指定时间还未出结果的,我们就应该终止运行的线程,并判定这个测

     前言

        在上一篇的博客中,我们通过设置SecurityManager已经实现了大部分的安全措施。这里我们将实现最后的安全措施,防止用户提交死循环的代码无止境的消耗服务器的CPU,以及防止用户恶意破坏沙箱运行代码的能力。同时,因为OJ业务要求的限制,每一道题目的答案代码运行都应该是限时的(比如限时1000毫秒内出结果),对于运行超过指定时间还未出结果的,我们就应该终止运行的线程,并判定这个测试用例这份代码超时了。

    无法中断的线程

        我们应该都知道JAVA的线程是协作式而非抢占式的。也就是说对于运行已经超时的线程,或者因为死循环而一直在工作的线程,我们是无法使用JAVA自带的中断API(如Thread类的interrupt方法)进行有效停止的,线程还会依旧运行下去。由于我们无法限制用户的算法,而有些算法可能本身因为出现了漏洞,导致了死循环的出现。总而言之,对于我们通过反射调用main方法运行起来的用户代码,一旦死循环了或者超时了,我们是无法通过JAVA推荐的中断的方式,去终止运行用户代码的线程。

    废弃的stop方法

        要解决上述问题,需要用到一个JAVA已经废弃的方法:Thread类的stop方法。使用这个方法就可以强制杀掉一个超时运行的线程。JAVA要废弃stop方法的最大一个原因是因为它是不安全的。为什么说不安全呢?大致就是因为:用 Thread.stop 来终止线程将释放它已经锁定的所有监视器(作为沿堆栈向上传播的未检查 ThreadDeath 异常的一个自然后果)。如果以前受这些监视器保护的任何对象都处于一种不一致的状态,则损坏的对象将对其他线程可见,这有可能导致任意的行为。

        用人话来讲,强行使用stop方法后,将可能导致我们的程序产生不一致性。这还是很好理解的,比如你做事情做到一半,就被别人强行叫停了,本身就很有问题。举一个不太适当例子就是,比如A线程和B线程共用一把锁,A线程先获得锁然后做某些业务操作,B线程在等待A线程释放锁并根据A线程操作的结果,做出相应的操作,这个时候就会出事了。

        例子代码如下:(仅仅是为了说明效果,用synchronized关键字也行,并且synchronized会自己释放锁

    import java.util.concurrent.locks.ReentrantLock;
    
    public class Main {
    	volatile static int aa = 1;
    	volatile static boolean isHaveAdd = false;
    	volatile static int cc = 1;
    	volatile static ReentrantLock lock = new ReentrantLock();
    
    	public static void main(String[] args) throws InterruptedException {
    		Thread thread = new Thread() {
    			public void run() {
    				System.out.println("第一个线程在等待锁");
    				try {
    					lock.lock();
    					System.out.println("第一个线程获得锁");
    					aa++;
    					// 模拟一个超级耗时而且无法中断的操作
    					for (long i = 0; i < Long.MAX_VALUE; i++) {
    						// 假设有这么一个业务需求,当耗时操作快完成时,需要isHaveAdd要变为true告诉别人aa被加过了
    						if (i == Long.MAX_VALUE - 1) {
    							isHaveAdd = true;
    						}
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    				} finally {
    					System.out.println("进入finally方法");
    					// 需要注意的是,跟synchronized不同的是,这里我们需要自己手动释放锁,synchronized会自己释放锁
    					lock.unlock();
    				}
    
    			};
    		};
    		thread.start();
    
    		// 确保上面的线程先运行
    		Thread.sleep(10);
    
    		new Thread() {
    			public void run() {
    				System.out.println("第二个线程在等待锁");
    				try {
    					lock.lock();
    					System.out.println("第二个线程获得锁");
    					System.out.println(aa);
    					System.out.println(isHaveAdd);
    					System.out.println(cc);
    					if (isHaveAdd) {
    						cc++;
    					}
    					System.out.println(aa);
    					System.out.println(isHaveAdd);
    					System.out.println(cc);
    				} catch (Exception e) {
    					e.printStackTrace();
    				} finally {
    					lock.unlock();
    				}
    
    			};
    		}.start();
    
    		// 稍微等待一下,然后杀死线程
    		Thread.sleep(500);
    		System.out.println("杀死线程");
    		thread.stop();
    	}
    }
    运行结果将会是:

    第一个线程在等待锁
    第一个线程获得锁
    第二个线程在等待锁
    杀死线程
    进入finally方法
    第二个线程获得锁
    2
    false
    1
    2
    false
    1

    可以看出,因为线程A的逻辑还未执行完,导致aa虽然加1了,但是cc并未能加1,业务出现了一定的问题。再一次说明,我这个例子是不太恰当的,但是为了说明问题,我也一时想不出一个简单的代码例子,所以就只好这样了。感兴趣朋友,可以将ReentrantLock这种锁的方式换回synchronized的方式,再试试效果。

    使用stop方法

        关于stop方法等内容,我这里不继续探讨了,因为本文并不是主要讲述这个的。虽然上面说了那么多,但是因为业务需求,我们最终还是使用了stop方法。而且,因为OJ业务的特殊性,stop方法的危害对于我们来说,基本上都不是危害。

        为什么这么说呢?因为当代码运行超时时,其实我们已经得出了该代码对于这份测试用例的结果了,再由于我们的代码不会跟用户提交的代码涉及到任何相关的监视器、锁。因此,放心大胆的使用stop方法吧!

        下面给出,在本系统沙箱端中,调用杀死线程的主要函数(需要主要的是,因为我们采用了future模式,因此需要用到反射,拿到真正运行改任务的工作线程,顺便说明的是submit.cancel(true)其实也是利用中断机制的,面对死循环等情况,也是无力的,这里使用,仅仅是因为~我就是想调用一下而已0.0):

    @SuppressWarnings("deprecation")
    	private void killThread(FutureTask<ProblemResultItem> submit) {
    		try {
    			submit.cancel(true);
    			// 利用反射,强行取出正在运行该任务的线程
    			Field runner = submit.getClass().getDeclaredField("runner");
    			runner.setAccessible(true);
    			Thread execThread = (Thread) runner.get(submit);
    			execThread.stop();
    			submit.cancel(true);
    		} catch (Exception e) {
    			System.err.println(e);
    		}
    
    	}

        为了让代码看起来稍微完整一点,我下面再贴出对于一份用户代码,运行对应题目的所有测试数据时的任务类:ProblemCallable,如果留意代码的话,还会发现一个很相近的类:ProblemItemCallable。ProblemItemCallable是运行每一个测试用例的任务类,而ProblemCallable是对该代码相应题目运行的测试类。 简单来说,就是一道题目有5个测试用例时(一份标准输入数据和一份标准输出数据构成一个测试用例),就会产生5个ProblemCallable对象。主要是用于减少跑测试用例的时间,详细的内容会在后面博文:《并行运行测试》中提及。CacheOutputStream类和ThreadInputStream类将会在后面博文:《并行运行测试》中的输入输出分流部分提及。

    package cn.superman.sandbox.callable;
    
    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    import cn.superman.sandbox.core.systemInStream.ThreadInputStream;
    import cn.superman.sandbox.core.systemOutStream.CacheOutputStream;
    import cn.superman.sandbox.dto.Problem;
    import cn.superman.sandbox.dto.ProblemResultItem;
    
    public class ProblemCallable implements Callable<List<ProblemResultItem>> {
    	private Method mainMethod;
    	private Problem problem;
    	private CacheOutputStream resultBuffer;
    	private Runtime run = null;
    	private CountDownLatch countDownLatch = null;
    	private ThreadInputStream threadSystemIn;
    	private static final ExecutorService itemGetThreadPool = Executors
    			.newCachedThreadPool(new ThreadFactory() {
    				@Override
    				public Thread newThread(Runnable r) {
    					Thread thread = new Thread(r);
    					thread.setName("itemGetThreadPool id "
    							+ System.currentTimeMillis());
    					return thread;
    				}
    			});
    	private static final ExecutorService itemExecThreadPool = Executors
    			.newCachedThreadPool(new ThreadFactory() {
    				@Override
    				public Thread newThread(Runnable r) {
    					Thread thread = new Thread(r);
    					thread.setName("itemExecThreadPool id "
    							+ System.currentTimeMillis());
    					return thread;
    				}
    			});
    
    	public ProblemCallable(Method mainMethod, Problem problem,
    			CacheOutputStream resultBuffer, ThreadInputStream threadSystemIn) {
    		this.mainMethod = mainMethod;
    		this.problem = problem;
    		this.resultBuffer = resultBuffer;
    		this.threadSystemIn = threadSystemIn;
    		run = Runtime.getRuntime();
    	}
    
    	@Override
    	public List<ProblemResultItem> call() throws Exception {
    		List<String> paths = problem.getInputDataFilePathList();
    		final List<ProblemResultItem> resultItems = new ArrayList<ProblemResultItem>();
    		countDownLatch = new CountDownLatch(paths.size());
    		// 为了内存使用比较准确,先大概的执行一次回收吧
    		run.gc();
    
    		for (int i = 0; i < paths.size(); i++) {
    			final String path = paths.get(i);
    			itemExecThreadPool.execute(new Runnable() {
    				@Override
    				public void run() {
    					resultItems.add(process(path));
    				}
    			});
    		}
    
    		// 阻塞线程,等待所有结果都计算完了,再返回
    		countDownLatch.await();
    		return resultItems;
    	}
    
    	private ProblemResultItem process(String inputFilePath) {
    		ProblemResultItem item = null;
    		ProblemItemCallable itemCallable = null;
    		long beginMemory = 0;
    		long beginTime = 0;
    		long endTime = 0;
    		long endMemory = 0;
    		Future<ProblemResultItem> submit = null;
    
    		try {
    			itemCallable = new ProblemItemCallable(mainMethod, inputFilePath,
    					resultBuffer, threadSystemIn);
    
    			submit = itemGetThreadPool.submit(itemCallable);
    			beginMemory = run.totalMemory() - run.freeMemory();
    			beginTime = System.nanoTime();
    
    			item = submit
    					.get(problem.getTimeLimit() + 2, TimeUnit.MILLISECONDS);
    
    			if (item == null) {
    				killThread((FutureTask<ProblemResultItem>) submit);
    				throw new TimeoutException();
    			}
    
    			endTime = System.nanoTime();
    			endMemory = run.totalMemory() - run.freeMemory();
    		} catch (Exception e) {
    			// 出现了意外,先关闭资源再说(如已经打开的流等)
    			itemCallable.colseResource();
    			killThread((FutureTask<ProblemResultItem>) submit);
    			item = new ProblemResultItem();
    			item.setNormal(false);
    			if (e instanceof CancellationException
    					|| e instanceof TimeoutException) {
    				// 超时了,会进来这里
    				item.setMessage("超时");
    			} else {
    				item.setMessage(e.getMessage());
    			}
    			endTime = System.nanoTime();
    			endMemory = run.totalMemory() - run.freeMemory();
    		}
    		// 时间为毫微秒,要先转变为微秒再变为毫秒
    		item.setUseTime((endTime - beginTime) / 1000 / 1000);
    		item.setUseMemory(endMemory - beginMemory);
    		item.setInputFilePath(inputFilePath);
    		if (item.getUseMemory() > problem.getMemoryLimit()) {
    			item.setNormal(false);
    			item.setMessage("超出内存限制");
    		}
    		// 无论怎么样,这里必须最后都要进行减一,不然将会一直阻塞线程,最终无法返回结果
    		countDownLatch.countDown();
    		return item;
    	}
    
    	/**
    	 * 需要注意的是,这里将会调用线程stop方法,因为只有这样才能强行终止超时的线程,而又因为这里并不需要保证什么原子性以及一致性的业务要求,
    	 * 所以用stop方法是没什么大问题的
    	 * 
    	 * @param submit
    	 * @throws NoSuchFieldException
    	 * @throws SecurityException
    	 * @throws IllegalArgumentException
    	 * @throws IllegalAccessException
    	 */
    	@SuppressWarnings("deprecation")
    	private void killThread(FutureTask<ProblemResultItem> submit) {
    		try {
    			submit.cancel(true);
    			// 利用反射,强行取出正在运行该任务的线程
    			Field runner = submit.getClass().getDeclaredField("runner");
    			runner.setAccessible(true);
    			Thread execThread = (Thread) runner.get(submit);
    			execThread.stop();
    			submit.cancel(true);
    		} catch (Exception e) {
    			System.err.println(e);
    		}
    
    	}
    
    	public Problem getProblem() {
    		return problem;
    	}
    
    	public void setProblem(Problem problem) {
    		this.problem = problem;
    	}
    }
    

    预告

        在本篇博文中,我们已经实现了限时运行。下一篇博文中,我们将开始利用多线程的方式,加速答案代码的测试,以及如何解决多线程所带来的资源冲突问题。

        PS:为什么要加速测试呢?我们试想一下,假设一道题目有5份测试用例(数据),也就是5份标准输入数据,5份标准输出数据。用户的代码跑一份测试用例(数据)平均需要消耗500毫秒的话,如果我们是串行的方式进行的话,跑完5份就需要2.5秒了。但是,如果我们利用多线程,同时进行5份测试的话,我们就只需要500毫秒(实际上会多一点点时间)就可以了。但是,因为涉及到多线程了,就涉及到了冲突等问题~

    展开全文
  • Java并发执行任务的几种方式

    万次阅读 2018-05-06 08:33:59
    在编写业务代码时经常遇到并发执行多个任务的需求,因为串行执行太慢,会影响业务代码性能。特别对于直接面向普通用户的业务来说用户体验至关重要,保证用户体验重要的一点是要“快”。业务代码中经常需要调用其它...
  • 一.在线程中执行任务 顺序的执行任务 显式的为任务创建线程 无限制创建线程的缺点 二.Executor框架 使用Executor实现的Web Server 执行策略 ...并行运行异类任务的局限性 CompletionService 为任务设置时限
  • Java 线程

    2020-07-08 09:23:07
    进程:指一个内存中运行的应用程序(进入到内存中) 线程:是进程中的一个执行单元 多线程好处:效率高 各个单元互不影响 线程调度 分时调度:所有线程轮流使用CPU,平均分配每个线程占用CPU的时间 抢占式调度:优先...
  • 通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。 6.1 在线程中执行任务 当围绕 任务执行 来设计应用...
  • 任务通过是一些抽象的且离散的工作单元。...独立性有助于实现并发,因为如果存在足够多的处理资源,那么这些独立的任务都可以并行执行。 服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。...
  • Java高并发程序设计

    2018-09-16 16:06:38
    第一章 走入并行世界   关于并行的两个定律: Amdahl...第二章 Java并行程序基础    线程创建: new Thread(Runable).start()  线程终止: stop 方法 会立即释放锁,导致数据不一致问题,已经废弃  线程中断...
  • java 线程

    2021-03-04 10:50:15
    进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。进程是一种抽象的概念,从来没有统一的标准定义。进程一般由程序,数据...
  • java进阶知识

    2021-06-03 11:02:07
    缓存一致性问题:多核多线程并行执行导致L1,L2缓存数据不一致 举个栗子: 结果分析: 编译器优化和指令重排: 上述1属于编译器重排序,2,3属于处理器重排序。重排序会导致多线程程序出现内存可见性问题。 ...
  • java并发编程

    2021-05-13 22:26:21
    java并发编程 为了充分利用计算机的资源,吧计算机的性能发挥到最大. 什么是高并发 并发和并行的区别 并发 concurrency 单核 线程”同时“操作(交替) 并行:多核同时执行。 高并发是指我们设计的程序,可以支持...
  • Java问答题

    2019-10-29 21:32:09
    1. JDK 和 JRE 有什么区别? JDK:Java Development Kit 的简称,java ...JRE:Java Runtime Environment 的简称,java 运行时环境,为 java 程序的运行提供了所需环境。jre包含jvm。 2. == 和 equals 的区别是...
  • 日常Java练习题(每天进步一点点系列)

    千次阅读 多人点赞 2021-06-03 22:54:10
    来迟了,但是不是缺席,以后会争取保证每天更Java练习题,文章末尾有为大家争取到的CSDN官方独家出品的【Java工程师知识图谱】限时福利优惠,为成长助力!加油!天天进步一点点! 1、main 方法是 Java Application ...
  • java知识

    2019-03-10 14:30:20
    目录 一、线程知识 1、进程间通信的方式? 2、线程间通信的方式? 3、可以有几种方式实现生产者-消费者模型 ...4、讲一下Java内存模型 ...8、Java线程生命周期的...9、被notify()唤醒的线程可以立即得到执行吗? 10、s...
  • 6.1 在线程中执行任务 在理想情况下,各个任务之间时相互独立的:任务并不依赖于其他任务的状态、结果、边界效应。 应用程序性希望支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应。 ...
  • Java并发编程

    2020-11-21 13:21:35
    之前学习的程序在没有跳转语句的前提下都是由上到下依次执行,那现在想要设计一个程序,边打游戏边听歌,该怎么设计? 要解决上述问题,咱们得使用多进程或多线程来解决。 14.1并发与并行 并发:指两个或多个事件在...
  • java知识点

    千次阅读 2018-12-30 10:44:56
    必会知识点 是什么 在哪里用,怎么用,遇到了什么问题 ...jre:Java 运行时环境(JVM+类库) jvm:Java 虚拟机 2.环境变量的解释: 目的:在任意路径下能够直接执行相关命令。 原理:在命令行中执行某...
  • java高并发实战

    2019-09-03 23:20:58
    目录 一 走入并行世界 1 并行概念 1.1.1 同步和异步 1.1.2 并发与并行 1.1.3 临界区 1.1.4 阻塞和非阻塞 1.1.5 死锁、饥饿和活锁 1.2 并发级别 1.3 回到java:JMM ...二 JAVA并行程序基础 1 .1并...
  • java并发编程实战笔记》第六章 结构化并发应用程序-----任务执行介绍Executor框架前的背景提到的概念 第六章 结构化并发应用程序-----任务执行 介绍Executor框架前的背景   大多数应用程序都是围绕“任务执行...
  • 楠哥Java SE总结详细笔记

    千次阅读 多人点赞 2021-02-19 17:26:30
    Java SE总结 一、Java体系 1、Java 基础 1、跨平台 2、面向对象编程语言 3、分布式计算 2、Java运行机制 编程 Java 程序 ...JRE:Java Runtime Environment Java 运行环境 JDK:Java Devlopment K
  • java详细版

    千次阅读 2020-07-04 17:34:57
    第一部分 java编程基础 第一章 java语言概述 1.jdk的安装配置 此电脑–>属性 -->高级系统设置 -->环境变量 JAVA_HOME: C:\Program Files\Java\jdk1.8.0_171 path: %JAVA_HOME%\bin %JAVA_HOME%\jre\...
  • Java线程池原理

    2021-03-22 07:53:14
      现在的服务器基本都是多个核心,多个核心可以达到并行运算。对于我们常见的web服务,有cpu密集型计算,也有IO密集型计算,线程数设置过多,会导致线程上下切换过于频繁,消耗性能,设置过少,不能充分利用cpu
  • Java 任务

    2021-03-23 15:18:19
    目录任务执行在线程中执行任务单个线程串行执行多个任务单线程+IO多路复用每个线程执行一个任务Executor框架(异步的任务执行框架)Executor生命周期携带结果的任务Callable与FutureCompletionService:Executor与...
  • 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统地称作“运行中” BLOCKED 阻塞状态,表示线程阻塞于锁 WAITING 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定...
  • 前面第2~5章是本书的第二部分,介绍了线程安全的基本知识,以及常用的java线程安全组件。这一章开始,进入第二部分,从更高的层次,介绍如何通过结构化的方式,来设计、构建基于多线程的应用系统。 第6章:任务及其...
  • Java 线程池原理

    2021-08-26 18:11:59
    J.U.C 提供的线程池 ThreadPoolExecutor 类,帮助开发人员管理线程并方便地执行并行任务。了解并合理使用线程池,是一个开发人员必修的基本功。 本文开篇简述了线程池概念和用途,接着结合线程池的源码,帮助大家...
  • java:多线程

    2018-08-13 19:54:07
    进程、线程都可以并行执行,  操作系统中有一个组件叫做任务调度器,将cpu的时间片分给不同的程序使用, 微观串行(单核),宏观并行.(多核cpu可以并行) 好处: 1) 多进程,多线程可以让程序不被阻塞. 2) 充分...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,674
精华内容 669
关键字:

java并行执行限时

java 订阅