精华内容
下载资源
问答
  • Latch模式--多线程同步

    2020-04-12 23:34:57
    Java多线程---CountDownLatch用法实例 POI结合线程池批量处理导入减少导入时间 2,直接源码 package com.wayne.latchPattern; import java.sql.Time; import java.util.concurrent.TimeUnit; public abstract ...

    1,相关文章推荐

    Java多线程---CountDownLatch用法实例

    POI结合线程池批量处理导入减少导入时间

    2,直接源码

    package com.wayne.latchPattern;
    
    import java.sql.Time;
    import java.util.concurrent.TimeUnit;
    
    public abstract class Latch {
    
        protected int limit;
    
        public Latch(int limit){
            this.limit = limit;
        }
    
        public abstract void await() throws InterruptedException;
    
        public abstract void await(TimeUnit timeUnit,long time) throws InterruptedException, WaitTimeoutException;
    
        public abstract void countdown();
    
        public abstract int getUnArrived();
    }
    
    package com.wayne.latchPattern;
    
    import java.util.concurrent.TimeUnit;
    
    public class CountDownLatch extends Latch {
    
        public CountDownLatch(int limit){
            super(limit);
        }
    
        @Override
        public void await() throws InterruptedException {
            synchronized (this){
                while (limit >0) {
                    this.wait();
                }
            }
        }
    
        @Override
        public void await(TimeUnit timeUnit, long time) throws InterruptedException,WaitTimeoutException {
            if(time <= 0 ){
                throw new IllegalArgumentException("time is invaild.");
            }
    
            long remainingNanos = timeUnit.toNanos(time);
            final long endNanos = System.nanoTime() + remainingNanos;
            synchronized (this){
                while (limit > 0){
                    if(TimeUnit.NANOSECONDS.toMillis(remainingNanos)<= 0){
                        throw new WaitTimeoutException("The wait time over specify time.");
                    }
    
                    this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
                    remainingNanos = endNanos - System.nanoTime();
                }
            }
        }
    
        @Override
        public void countdown() {
            synchronized (this){
                if(limit <= 0){
                    throw new IllegalStateException(" all arrived");
                }
    
                limit--;
                this.notifyAll();
            }
        }
    
        @Override
        public int getUnArrived() {
            synchronized (this){
                return limit;
            }
        }
    }
    
    package com.wayne.latchPattern;
    
    import org.omg.CORBA.TIMEOUT;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    public class ProgrammerTravel extends Thread {
    
        private final Latch latch;
    
        private final String programmer;
    
        private final String transport;
    
        public ProgrammerTravel(Latch latch,String programmer,String transport){
            this.latch = latch;
            this.programmer = programmer;
            this.transport = transport;
        }
    
        @Override
        public void run() {
            try{
                System.out.println(programmer+" start to move with "+ transport);
                TimeUnit.SECONDS.sleep(new Random(System.currentTimeMillis()).nextInt(10));
            } catch (Exception e){
                e.printStackTrace();
            }
    
            System.out.println(programmer+" with "+transport+ " has arrived!");
            latch.countdown();
        }
    }
    
    package com.wayne.latchPattern;
    
    public class WaitTimeoutException extends Exception {
    
        public WaitTimeoutException(String message){
            super(message);
        }
    }
    
    package com.wayne.latchPattern;
    
    import java.util.concurrent.TimeUnit;
    
    public class LatchPatternTest {
    
        public static void main(String[] args) throws InterruptedException, WaitTimeoutException {
            Latch latch = new CountDownLatch(4);
            new ProgrammerTravel(latch,"Wayne","Bicycle").start();
            new ProgrammerTravel(latch,"Bruce","Bus").start();
            new ProgrammerTravel(latch,"Batman","Car").start();
            new ProgrammerTravel(latch,"IronMan","Flight").start();
            latch.await(TimeUnit.SECONDS,5);
    
            System.out.println("All arrived,let's start!");
        }
    }
    

     

    展开全文
  • 如果一个线程想要监视另一个线程的进度,在这个线程达到某个进度时就开始执行代码,这个时候就需要进行线程之间的通信,就需要用到wait()等方法, 如题:让线程2监视线程1在线程中计数达到5时,线程2进行打印通知。...

    线程之间的通信

    如果一个线程想要监视另一个线程的进度,在这个线程达到某个进度时就开始执行代码,这个时候就需要进行线程之间的通信,就需要用到wait()等方法,
    如题:让线程2监视线程1在线程中计数达到5时,线程2进行打印通知。代码如下:
    只有synchronized针对的对象才能调用wait()等方法
    wait()方法是让线程进入等待队列,
    需要注意的是:wait()方法会释放锁,但是notify()方法不会释放锁,所以需要

    public class T  {
        private volatile List<Object> list = new ArrayList<>();
        private void add(Object o){
            list.add(o);
        }
        private int size(){
            return list.size();
        }
    
        public static void main(String[] args) {
            T t = new T();
            final Object o = new Object();
            new Thread(() -> {
                synchronized (o) {
                    System.out.println("t2启动");
                    if (t.size() != 5) {
                        try {
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2结束");
                    o.notify();
                }
            }).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                System.out.println("t1启动");
                synchronized (o) {
                    for (int i = 0; i < 10; i++) {
                        t.add(new Object());
                        System.out.println("add"+i);
                        if(t.size()==5){
                            o.notify();
                            try {
                                o.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                    }
                }
            }).start();
        }
    }

    门闩

    代码如下,它可以在一定程度上代替wait()等方法,但是还是有缺陷,因为有可能再放开门闩后第二个线程,抢不过第一个线程,那么还是不能准确地监视线程1,所以在门闩打开后让线程1沉睡一会,保证线程2能够顺利执行,但是还是存在一定风险。

    public class T  {
        private volatile List<Object> list = new ArrayList<>();
        private void add(Object o){
            list.add(o);
        }
        private int size(){
            return list.size();
        }
    
        public static void main(String[] args) {
            T t = new T();
            final Object o = new Object();
            CountDownLatch latch = new CountDownLatch(1);
            new Thread(() -> {
                    System.out.println("t2启动");
                    if (t.size() != 5) {
                        try {
                            latch.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2结束");
            }).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                System.out.println("t1启动");
                    for (int i = 0; i < 10; i++) {
                        t.add(new Object());
                        System.out.println("add"+i);
                        if(t.size()==5){
                            latch.countDown();
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
            }).start();
        }
    }
    展开全文
  • Latch 模式背景释义: 有A、B、C、D若干个并行任务,现在F任务需要等ABCD全部完成之后再进行,只要其中任一一个并发任务未执行完F任务就阻塞或者抛出超时异常、取消任务 代码翻译: 抽象任务接口约束类 ...
    • Latch 模式背景释义:
    • 有A、B、C、D若干个并行任务,现在F任务需要等ABCD全部完成之后再进行,只要其中任一一个并发任务未执行完F任务就阻塞或者抛出超时异常、取消任务
    • 代码翻译:
      抽象任务接口约束类
    public abstract class Latch {
    
        protected int limit;
    
        public Latch(int limit){
            this.limit = limit;
        }
    
        /**
         * 阻塞当前调用者所在线程,阻塞的逻辑为,如果当前还有任务未完成则阻塞
         *
         * @throws InterruptedException
         */
        public abstract void await() throws InterruptedException;
    
        public abstract void await(TimeUnit unit,long time) throws InterruptedException, TimeoutException;
    
        /**
         * 谁执行完任务就将任务完成标志减1,当任务完成标志为0时表示所有任务均已完成
         * 本方法为同步方法,任务线程执行时需要先获取到本接口的锁,具体锁住的对象为
         * limit ,当前任务线程执行完任务之后将标志-1,同时释放锁
         */
        public abstract void countDown();
    
        /**
         * 获取剩下未完成任务的个数
         * @return
         */
        public abstract int getUnarrived();
    }
    
    

    具体任务实现类:

    public class CountDownLatch extends Latch {
    
        public CountDownLatch(int limit) {
            super(limit);
        }
    
        @Override
        public void await() throws InterruptedException {
    
            synchronized (this){
                while(limit > 0){
                    this.wait();
                }
            }
        }
    
        @Override
        public void await(TimeUnit unit, long time) throws InterruptedException, TimeoutException {
            if(time <=0){
                throw new IllegalArgumentException("the time is invalid");
            }
    
            //将时间转换为纳秒
            long remainingNanos = unit.toNanos(time);
            //等待任务将在endNanos 纳秒后 超时
            final long endNanos = System.nanoTime() + remainingNanos;
    
            synchronized (this){
                while(limit > 0){
                    //超时  直接抛出异常
                    if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0){
                        throw  new TimeoutException("time out");
                    }
    
                    //等待remainingNanos  在等待的过程中可能会被中断,需要重新计算remainingNanos时间
                    this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
    
                    //执行线程中断  时重新计算时间
                    remainingNanos = endNanos - System.nanoTime();
                }
            }
        }
    
    
        @Override
        public void countDown() {
    
            synchronized (this){
                if(limit <= 0){
                    throw new IllegalStateException("all of task has done");
                }
    
                limit --;
                notifyAll();
            }
        }
    
        @Override
        public int getUnarrived() {
            return limit;
        }
    }
    
    

    工作线程:

    public class LatchTaskThread extends Thread {
    
        private Latch latch;
    
        private String programmer;
    
        private String transportion;
    
        public LatchTaskThread(Latch latch,String programmer,String transportion){
            this.latch = latch;
            this.programmer = programmer;
            this.transportion = transportion;
        }
    
        @Override
        public void run() {
            super.run();
            System.out.println("26--------执行者:"+this.programmer + "  start task:"+transportion);
            try {
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("26--------执行者:"+this.programmer + "  finsh task:"+transportion);
            latch.countDown();
    
        }
    }
    
    
    • 测试用例:
     private void testLatch() throws InterruptedException, TimeoutException {
            latch = new CountDownLatch(4);
            new LatchTaskThread(latch,"A","Bus").start();
            new LatchTaskThread(latch,"B","Stock").start();
            new LatchTaskThread(latch,"C","Play Crad").start();
            new LatchTaskThread(latch,"D","Work").start();
            //latch.await();
    
            latch.await(TimeUnit.SECONDS,5);
            System.out.println("43-------所有任务均已经完成");
        }
    
    
    • 参考书籍《Java 高并发编程详解》
    展开全文
  • 我们在开发的过程中会遇到这种情况: 某一任务,需要等待多个任务执行结束后才能执行。或者说。 某一任务,需要等待前几个任务的执行结果。 我们可以采用Latch多线程设计架构模式

    背景

    生活实例:当我们公司进行团队建设的时候,要统一坐汽车,长途跋涉到 某 景区。那么就需要大家统一到某个地方集合,只有人到齐了,才能出发。

     

    我们在开发的过程中会遇到这种情况:

    某一任务,需要等待多个任务执行结束后才能执行。或者说。

    某一任务,需要等待前几个任务的执行结果。

    我们可以采用Latch的多线程设计架构模式

    设计与实现

    1.Latch.java

    首先定义抽象类,类中定义变量,只有该变量为0的时候,才能继续执行下一步。(也可以理解为,没来的人数为0了, 能开车了)

    package lanch;
    
    public abstract class Latch {
    	protected int limit;
    
    	public Latch(int limit) {
    		this.limit = limit;
    	}
    
    	public abstract void await() throws InterruptedException;
    
    	public abstract void countdown();
    
    	public abstract int getUnarrived();
    
    }
    

    2.CountDownLatch.java

    该类继承Latch类,对于父类中的方法进行实现。

    package lanch;
    
    public class CountDownLatch extends Latch {
    
    	public CountDownLatch(int limit) {
    		super(limit);
    	}
    
    	@Override
    	public void await() throws InterruptedException {
    		synchronized (this) {
    			// 当limit >0时,进入阻塞
    			while (limit > 0) {
    				this.wait();
    			}
    		}
    	}
    
    	@Override
    	public void countdown() {
    		synchronized (this) {
    			if (limit <= 0)
    				throw new IllegalStateException("all of task already arrived");
    			// 使limit减1,并通知阻塞进程。
    			limit--;
    			this.notifyAll();
    		}
    	}
    
    	@Override
    	public int getUnarrived() {
    		return limit;
    	}
    
    }
    

    3.GetBriefDeviceThread

    实现多线程的接口,本文实现的是Callable接口,如果不需要返回值,可以实现runable接口

    这个类就代表每个员工赶往乘车地点

    package lanch;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class GetBriefDeviceThread implements Callable<List<BaseDevice>> {
    	Latch lanch;
    	private String deviceName;
    
    	public GetBriefDeviceThread(String deviceName, Latch latch) {
    		this.deviceName = deviceName;
    		this.lanch = latch;
    	}
    
    	@Override
    	public List<BaseDevice> call() throws Exception {
    		BaseDevice baseDevice = new BaseDevice();
    		baseDevice.setNameKey(deviceName);
    		List<BaseDevice> deviceList = new ArrayList<>();
    		deviceList.add(baseDevice);
    
    		TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
    		this.lanch.countdown();
    		System.out.println("the device is :" + deviceName);
    		return deviceList;
    	}
    
    }
    

    4.DependedPath与BaseDevice

    两个存储数据的数据结构

    package lanch;
    
    public class BaseDevice {
    	private String nameKey;
    	private boolean exist;
    
    	public String getNameKey() {
    		return nameKey;
    	}
    
    	public void setNameKey(String nameKey) {
    		this.nameKey = nameKey;
    	}
    
    	public boolean isExist() {
    		return exist;
    	}
    
    	public void setExist(boolean exist) {
    		this.exist = exist;
    	}
    
    }
    package lanch;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class DependedPath {
    
    	private List<String> usedPath;
    	private List<String> usingPath;
    
    	public DependedPath(List<String> usedPath, List<String> usingPath, List<String> noDependedPath,
    			List<String> paramDependList) {
    		this.usedPath = usedPath;
    		this.usingPath = usingPath;
    
    	}
    
    	public DependedPath() {
    		usedPath = new ArrayList<>();
    		usingPath = new ArrayList<>();
    
    		usedPath.add("srcDevice");
    		usedPath.add("dstDevice");
    		usingPath.add("srcDevice1");
    		usingPath.add("dstDevice1");
    		usingPath.add("srcDevice2");
    		usingPath.add("dstDevice2");
    
    	}
    
    	public List<String> getUsedPath() {
    		return usedPath;
    	}
    
    	public void setUsedPath(List<String> usedPath) {
    		this.usedPath = usedPath;
    	}
    
    	public List<String> getUsingPath() {
    		return usingPath;
    	}
    
    	public void setUsingPath(List<String> usingPath) {
    		this.usingPath = usingPath;
    	}
    
    }
    

    5.Main类应用与测试

     

    package lanch;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class Main {
    
    	public static void main(String[] args) throws Throwable {
    		DependedPath dependedPath = new DependedPath();
    		List<String> usedPath = dependedPath.getUsedPath();// 被依赖的节点
    
    		List<String> usingPath = dependedPath.getUsingPath();// 依赖别人的节点
    
    		ExecutorService pool = Executors.newCachedThreadPool();// 线程池
    		Map<String, BaseDevice> devicesMap = new HashMap<>();// 结果存储器
    
    		Latch latch = new CountDownLatch(usedPath.size());// 第一个锁存器
    		List<Future<List<BaseDevice>>> usedPathResults = new ArrayList<>();
    		for (String path : usedPath) {
    			GetBriefDeviceThread getBriefDeviceThread = new GetBriefDeviceThread(path, latch);
    			Future<List<BaseDevice>> future = pool.submit(getBriefDeviceThread);
    			usedPathResults.add(future);
    		}
    
    		for (Future<List<BaseDevice>> future : usedPathResults) {
    			List<BaseDevice> deviceList = future.get();
    			for (BaseDevice baseDevice : deviceList) {
    				devicesMap.put(baseDevice.getNameKey(), baseDevice);
    			}
    
    		}
    		latch.await();
    
    		System.out.println(devicesMap);
    		System.out.println("usedPath finished");
    	
    
    		Latch latch2 = new CountDownLatch(usingPath.size());// 第二个锁存器
    		List<Future<List<BaseDevice>>> otherResults = new ArrayList<>();
    		for (String path : usingPath) {
    			GetBriefDeviceThread getBriefDeviceThread = new GetBriefDeviceThread(path, latch2);
    			Future<List<BaseDevice>> future = pool.submit(getBriefDeviceThread);
    			otherResults.add(future);
    		}
    
    		for (Future<List<BaseDevice>> future : otherResults) {
    			List<BaseDevice> deviceList = future.get();
    			for (BaseDevice baseDevice : deviceList) {
    				devicesMap.put(baseDevice.getNameKey(), baseDevice);
    			}
    
    		}
    		System.out.println(devicesMap);
    		latch2.await();
    
    		pool.shutdown();// 关闭线程池
    
    	}
    
    }
    

    结果 

    the device is :srcDevice
    the device is :dstDevice
    {srcDevice=lanch.BaseDevice@42a57993, dstDevice=lanch.BaseDevice@75b84c92}
    usedPath finished
    the device is :dstDevice1
    the device is :srcDevice2
    the device is :dstDevice2
    the device is :srcDevice1
    {dstDevice2=lanch.BaseDevice@4aa298b7, dstDevice1=lanch.BaseDevice@7d4991ad, srcDevice2=lanch.BaseDevice@28d93b30, srcDevice=lanch.BaseDevice@42a57993, srcDevice1=lanch.BaseDevice@1b6d3586, dstDevice=lanch.BaseDevice@75b84c92}
    the device is :dstDevice
    the device is :srcDevice
    {srcDevice=lanch.BaseDevice@42a57993, dstDevice=lanch.BaseDevice@75b84c92}
    usedPath finished
    the device is :dstDevice2
    the device is :dstDevice1
    the device is :srcDevice2
    the device is :srcDevice1
    {dstDevice2=lanch.BaseDevice@4aa298b7, dstDevice1=lanch.BaseDevice@7d4991ad, srcDevice2=lanch.BaseDevice@28d93b30, srcDevice=lanch.BaseDevice@42a57993, srcDevice1=lanch.BaseDevice@1b6d3586, dstDevice=lanch.BaseDevice@75b84c92}

    以上是两次执行结果,每组的输出序可能不一样,但是可以保证的是,后面四个一定在前面两个后面。

    展开全文
  • 最近项目中用到了多线程下载Excel,之前一直用闭锁,今天听网友建议用栅栏,就搜了下二者的区别: -闭锁(Latch) 闭锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的...
  • this.latch = latch; } @lombok.SneakyThrows @Override public void run() { int randomNum = ThreadLocalRandom.current().nextInt(1, 4); Thread.sleep(randomNum * 1000); log.info("HELLO WORLD: " + number);...
  • 1什么是Latch 若干线程并发执行某个特定的任务,然后等到所有的子任务都执行结束之后再统一汇总,比如用户想要查询自己三年以来银行账号的流水,为了保证运行数据库的数据量在一个恒定的范围之内,通常数据只会保存...
  • java多线程并发系列之闭锁(Latch)和栅栏(CyclicBarrier) 标签: java并发编程 ...
  • 使用场景: 个任务同时完成,才能进入下一步时使用; #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <random> #include &...
  • Latch设计模式场景描述CountDownLatch实现LatchCountDownLatch使用Reference 场景描述 程序员Alex叫上好朋友Jack、Gavin、Dillon去“同心湖”游玩。每个人前往目的地的方式不一样,到达的时间也不一样,需要等所有人...
  • Latch(阀门)设计模式也叫做 Count Down 设计模式。当若干个线程并发执行完某个特 定的任务,然后等到所有的子任务都执行结束之后再统一汇总。 CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再...
  • Java1.5提供了一个非常高效实用的多线程包:java.util.concurrent, 提供了大量高级工具,可以帮助开发者编写高效、易维护、结构清晰的Java多线程程序。从这篇blog起,我将跟大家一起共同学习这些新的Java多线程构件 ...
  • 学习java多线程并发系列之闭锁(Latch)和栅栏(CyclicBarrier) 转载于:https://www.cnblogs.com/jiyang2018/p/11283284.html
  • JAVA并发包中有三个类用于同步一批线程的行为,分别是闭锁(Latch),信号灯...闭锁(Latch) —— 确保线程在完成各自事务后,才会打开继续执行后面的内容,否则一直等待。 计数器闭锁(CountDownLa...
  • -闭锁(Latch) 闭锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,112
精华内容 4,444
关键字:

latch多线程