精华内容
下载资源
问答
  • 2016-12-23 09:26:48
    1 I/O处理比较花费时间,故把执行I/O处理和非IO处理的线程分开。CPU执行速度很快、而内存的写入、读取很慢,所以有关CPU和内存交互会降低指令的速度。
    2 start方法运行有2个步骤
    启动新的线程
    运行new对象的run方法
    3 所有线程结束,程序结束。  主线程可以先结束,但并不表示程序结束。
    4 一个线程已经获得锁定,其他线程都不能执行该实例的synchronized方法。
    5 wait set 线程休息区,当线程调用wait()方法时,线程自动进入wait set 区等待,直到被唤醒。
    6 wait notify notifyAll方法都是在锁定时调用,唤醒的线程重新等待锁。并且都有对象obj.wait(),否则默认this.wait()
    7 线程好坏评判标准
       安全性和生存性为必考虑
    安全性    ---不损坏对象
    生存性    ---进行必要处理 , 防止线程挂掉
    复用性    ---可在利用类
    性能      ---能快速、大量的进行处理
    吞吐量越大、响应性越快、容量越大,性能越好
    8 可能发生死锁的3个条件
    a、具有多个sharedResourse参与者,即共享资源
    b、线程锁定一个sharedSourse时,还没解除前就去锁定另一个sharedSourse.
    c、获取sharedSourse的顺序不固定
    只要破坏a b c中的任意一个条件,就可以避免死锁的发生。
    9 临界区的大小与执行性能
    获取锁定需要花时间
    线程冲突时必须等待
    10 有锁定时一定要记得解锁,中间不能有return 或是异常。
    11 要在线程中共享long 和 double 的字段,必须在synchronized中操作,或是声明成volatile
    12 Immutable Thread(不可变线程)
    字段为 final私有
    没有setter方法
      优点:不需要synchronized字段,频繁访问的情况下可以大大提高性能。
    13 被阻挡而等待
    等待
    while(!ready){
    wait();
    }


    唤醒
    ready=true;
    notify()/notifyAll()
    14 ReadWriteLock
    Read  Read    不冲突
    Read  Write   冲突
    Write  Write   冲突
    15 进程与线程的区别
    a 进程的内存是独立的、线程的内存是共享的

    16 interrupt() 方法会唤醒sleep/wait/join ,但会直接到catch语句,而不是运行其后面的语句。
    17 join()等待线程结束










    疑问:
    1 同一个类中,2个方法用了synchronized, 调用其中的一个方法,另一个方法也被锁住了?
    答:是










    1、多核、多CPU线程
    Lock
    lock.lockInterruptibly(),其实和lock.lock()效果一样,只有当调用interrupt()方法时,前面的会先运行catch里代码。
    Condition 
    Timer
    TimerTask
    Callable<V>
    2 WeakReference弱引用,防止内存泄露,将弱引用对象占用空间释放。
    3  Futrue<V>  FutrueTask<V>
    4 JAVA netive关键字
    native关键字说明其修饰的方法是一个原生态方法,方法对应的实现不是在当前文件,而是在用其他语言(如C和C++)实现的文件中。
    Java语言本身不能对操作系统底层进行访问和操作,但是可以通过JNI接口调用其他语言来实现对底层的访问。
    5 public ThreadPoolExecutor(int corePoolSize,  
                                  int maximumPoolSize,  
                                  long keepAliveTime,  
                                  TimeUnit unit,  
                                  BlockingQueue<Runnable> workQueue,  
                                  ThreadFactory threadFactory,  
                                  RejectedExecutionHandler handler)
    a.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。 
    b.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 
    c.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 
    d.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 
    e.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 
    f.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭 
      





    更多相关内容
  • JAVA多线程设计模式(带完整书签清晰扫描版).pdf 高清带完整书签清晰扫描版
  • Java多线程设计模式 清晰完整PDF版 Java多线程设计模式源代码
  • Java多线程设计模式_清晰完整PDF版

    千次下载 热门讨论 2015-04-22 17:37:18
    Java多线程设计模式_清晰完整PDF版,带有源码。绝对的清晰版,并且内容完整。
  • Java多线程设计模式(PDF清晰扫描版带完整书签);多多学习,共同提高
  • 书中包含Java线程的介绍导读、12个重要的线程设计模式和全书总结以及丰富的附录内容。第一章相关线程设计模式的介绍,都举一反三使读者学习更有效。最后附上练习问题,让读者可以温故而知新,能快速地吸收书中的精华...
  • 《JAVA多线程设计模式》(结城浩著2005).pdf,多线程里面最经典通俗易懂的一本。
  • 十一、流水线模式(Pipeline) 1、核心思想 将一个任务处理分解为若干个处理阶段,其中每个处理阶段的输出作为下一个处理阶段的输入,并且各个处理阶段都有相应的工作者线程去执行相应的计算。 2、评价: 充分...

    十一、流水线模式(Pipeline)
    1、核心思想
    将一个任务处理分解为若干个处理阶段,其中每个处理阶段的输出作为下一个处理阶段的输入,并且各个处理阶段都有相应的工作者线程去执行相应的计算。
    2、评价:
    充分利用CPU,提高其计算效率。
    允许子任务间存在依赖关系的条件下实现并行计算。
    非常便于采用单线程模型实现对子任务的处理。
    有错误处理 PipeContext
    3、适用场景
    a、适合于处理规模较大的任务,否则可能得不偿失。各个处理阶段所使用的工作者线程或者线程池、输入输出对象的创建和转移都有自身的时间和空间消耗。

    /**
     * 对处理阶段的抽象。
     * 负责对输入进行处理,并将输出作为下一处理阶段的输入
     * @author huzhiqiang
     *
     * @param <IN>
     * @param <OUT>
     */
    public interface Pipe<IN, OUT> {
        /**
         * 设置当前Pipe实例的下个Pipe实例
         * @param nextPipe
         */
        public void setNextPipe(Pipe<?,?> nextPipe);
    
        /**
         * 对输入的元素进行处理,并将处理结果作为下一个Pipe实例的输入
         * @param input
         * @throws InterruptedException
         */
        public void process(IN input) throws InterruptedException;
    
        public void init(PipeContext pipeCtx);
        public void shutdown(long timeout, TimeUnit unit);
    }
    
    
    /**
     * 对复合Pipe的抽象。一个Pipeline实例可包含多个Pipe实例
     * @author huzhiqiang
     *
     * @param <IN>
     * @param <OUT>
     */
    public interface PipeLine<IN, OUT> extends Pipe<IN, OUT> {
        void addPipe(Pipe<?,?> pipe);
    }
    
    
    
    public abstract class AbsractPipe<IN, OUT> implements Pipe<IN, OUT> {
        protected volatile Pipe<?, ?> nextPipe = null;
        protected volatile PipeContext PipeCtx = null;
    
        @Override
        public void setNextPipe(Pipe<?, ?> nextPipe) {
            this.nextPipe = nextPipe;
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public void process(IN input) throws InterruptedException {
            try {
                OUT out = doProcess(input);
                if(null != nextPipe){
                    if(null != out){
                        ((Pipe<OUT, ?>) nextPipe).process(out);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (PipeException e) {
                PipeCtx.handleError(e);
            }
    
        }
    
        @Override
        public void init(PipeContext pipeCtx) {
            this.PipeCtx = pipeCtx;
        }
    
        @Override
        public void shutdown(long timeout, TimeUnit unit) {
            //什么也不做
        }
    
        /**
         * 留给子类实现,用于子类实现其任务处理逻辑
         */
        public abstract OUT doProcess(IN input) throws PipeException;
    }
    
    
    public abstract class AbstractParallePipe<IN, OUT, V> extends AbsractPipe<IN, OUT> {
        private final ExecutorService executorService;
    
        public AbstractParallePipe(BlockingQueue<IN> queue, ExecutorService executorService) {
            super();
            this.executorService = executorService;
        }
    
        /**
         * 留给子类实现,用于根据指定的输入元素input构造一组子任务
         * @param input
         * @return
         * @throws Exception
         */
        protected abstract List<Callable<V>> buildTasks(IN input) throws Exception;
    
        /**
         * 留给子类实现,对各个子任务的处理结果进行合并,形成相应输入元素的输出结果
         * @param subTaskResults
         * @return
         * @throws Exception
         */
        protected abstract OUT combineResults(List<Future<V>> subTaskResults) throws Exception;
    
        /**
         * 以并行的方式执行一组子任务
         * @param tasks
         * @return
         * @throws Exception
         */
        protected List<Future<V>> invokeParallel(List<Callable<V>> tasks) throws Exception{
            return executorService.invokeAll(tasks);
        }
    
    
        @Override
        public OUT doProcess(IN input) throws PipeException {
            OUT out = null;
            try {
                out = combineResults(invokeParallel(buildTasks(input)));
            } catch (Exception e) {
                throw new PipeException(this, input, "Task failed", e);
            }
            return out;
        }
    
    }
    
    
    public class SimplePipeline<IN, OUT> extends AbsractPipe<IN, OUT> implements PipeLine<IN, OUT> {
        private final Queue<Pipe<?, ?>> pipes = new LinkedList<Pipe<?, ?>>();
        private final ExecutorService helperService;
    
        public SimplePipeline() {
            //创建固定线程数为1的线程池,整型的最大数的LinkedBlockingQueue的缓存队列
            this(Executors.newSingleThreadExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "SimplePpeLine-Helper");
                    t.setDaemon(true);
                    return t;
                }
            }));
        }
    
        public SimplePipeline(final ExecutorService helperService) {
            super();
            this.helperService = helperService;
        }
    
        @Override
        public void shutdown(long timeout, TimeUnit unit) {
            Pipe<?,?> pipe;
    
            while(null != (pipe = pipes.poll())){
                pipe.shutdown(timeout, unit);
            }
    
            helperService.shutdown();
        }
    
        @Override
        public void addPipe(Pipe<?, ?> pipe) {
            pipes.add(pipe);
        }
    
        @Override
        public OUT doProcess(IN input) throws PipeException {
            // TODO Auto-generated method stub
            return null;
        }
    
        @Override
        public void process(IN input) throws InterruptedException {
            @SuppressWarnings("unchecked")
            Pipe<IN, ?> firstPipe = (Pipe<IN, ?>) pipes.peek();
    
            firstPipe.process(input);
        }
    
        @Override
        public void init(PipeContext pipeCtx) {
            LinkedList<Pipe<?, ?>> pipesList = (LinkedList<Pipe<?, ?>>) pipes;
            Pipe<?, ?> prevPipe = this;
            //设置处理任务的先后顺序
            for(Pipe<?, ?> pipe: pipesList){
                prevPipe.setNextPipe(pipe);
                prevPipe = pipe;
            }
    
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    for(Pipe<?, ?> pipe: pipes){
                        pipe.init(pipeCtx);
                    }
                }
            };
    
            helperService.submit(task);
        }
    
        public <INPUT, OUTPUT> void addAsWorkerThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, int workCount){
            addPipe(new WorkThreadPipeDecorator<INPUT, OUTPUT>(delegate, workCount));
        }
    
        public <INPUT, OUTPUT> void addAsThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, ExecutorService executorService){
            addPipe(new ThreadPoolPipeDecorator<INPUT, OUTPUT>(delegate, executorService));
        }
    
        public PipeContext newDefaultPipeContext(){
            return new PipeContext() {
                @Override
                public void handleError(PipeException exp) {
                    helperService.submit(new Runnable() {
                        @Override
                        public void run() {
                            exp.printStackTrace();
                        }
                    });
                }
            };
        }
    }
    
    
    
    public class ThreadPoolPipeDecorator<IN, OUT> implements Pipe<IN, OUT> {
        private final Pipe<IN, OUT> delegate;
        private final TerminationToken terminationToken;
        private final ExecutorService executorService;
        private final CountDownLatch stageProcessDoneLatch = new CountDownLatch(1);
    
        public ThreadPoolPipeDecorator(Pipe<IN, OUT> delegate, ExecutorService executorService) {
            super();
            this.delegate = delegate;
            this.executorService = executorService;
            terminationToken = TerminationToken.newInstance(executorService);
        }
    
        @Override
        public void setNextPipe(Pipe<?, ?> nextPipe) {
            delegate.setNextPipe(nextPipe);
        }
    
        @Override
        public void process(IN input) throws InterruptedException {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    int remainingReservations = -1;
                    try {
                        delegate.process(input);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        remainingReservations = terminationToken.reservations.decrementAndGet();
                    }
    
                    if(terminationToken.isToShutDown()  &&  0 == remainingReservations){
                        //最后一个任务执行结束
                        stageProcessDoneLatch.countDown();
                    }
                }
            };
    
            executorService.submit(task);
            terminationToken.reservations.incrementAndGet();
        }
    
        @Override
        public void init(PipeContext pipeCtx) {
            delegate.init(pipeCtx);
        }
    
        @Override
        public void shutdown(long timeout, TimeUnit unit) {
            terminationToken.setIsToShutdown();
            if(terminationToken.reservations.get() > 0){
                try {
                    if(stageProcessDoneLatch.getCount() > 0){
                        //保证线程池中的所有任务都已经执行结束才delegate.shutdown
                        stageProcessDoneLatch.await(timeout, unit);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            delegate.shutdown(timeout, unit);
        }
    
        private static class TerminationToken extends com.threadDesign.twoPhase.TerminationToken{
            private final static ConcurrentHashMap<ExecutorService, TerminationToken> 
                INSTANCE_MAP = new ConcurrentHashMap<ExecutorService, TerminationToken>();
    
            private TerminationToken(){
    
            }
    
            void setIsToShutdown(){
                this.toShutDown = true;
            }
    
            static TerminationToken newInstance(ExecutorService executorService){
                TerminationToken token = INSTANCE_MAP.get(executorService);
                if(null == token){
                    token = new TerminationToken();
                    TerminationToken existingToken = INSTANCE_MAP.putIfAbsent(executorService, token);
    
                    if(null != existingToken){
                        token = existingToken;
                    }
                }
    
                return token;
            }
        }
    }
    
    
    /**
     * 基于工作者线程的Pipe实现类
     * 提交到该Pipe的任务由指定个数的工作者线程共同处理
     * @author huzhiqiang
     *
     * @param <IN>
     * @param <OUT>
     */
    public class WorkThreadPipeDecorator<IN, OUT> implements Pipe<IN, OUT> {
        protected final BlockingQueue<IN> workQueue;
        protected final Set<AbstractTerminatableThread> workerThreads = new HashSet<AbstractTerminatableThread>();
        protected final TerminationToken terminationToken = new TerminationToken();
    
        private final Pipe<IN, OUT> delegate;
    
        public WorkThreadPipeDecorator(Pipe<IN, OUT> delegate, int workerCount){
            this(new SynchronousQueue<IN>(), delegate, workerCount);
        }
    
        public WorkThreadPipeDecorator(BlockingQueue<IN> workQueue, Pipe<IN, OUT> delegate, int workerCount) {
            if(workerCount <= 0){
                throw new IllegalArgumentException("workerCount should be positive!");
            }
    
            this.workQueue = workQueue;
            this.delegate = delegate;
            for(int i=0; i<workerCount; i++){
                workerThreads.add(new AbstractTerminatableThread() {
    
                    @Override
                    protected void doRun() throws Exception {
                        try {
                            dispatch();
                        }finally {
                            terminationToken.reservations.decrementAndGet();
                        }
                    }
                });
            }
        }
    
        private void dispatch() throws InterruptedException {
            IN input = workQueue.take();
            delegate.process(input);
        }
    
        @Override
        public void setNextPipe(Pipe<?, ?> nextPipe) {
            delegate.setNextPipe(nextPipe);
        }
    
        @Override
        public void process(IN input) throws InterruptedException {
            workQueue.put(input);
            terminationToken.reservations.incrementAndGet();
        }
    
        @Override
        public void init(PipeContext pipeCtx) {
            delegate.init(pipeCtx);
            for(AbstractTerminatableThread thread : workerThreads){
                thread.start();
            }
        }
    
        @Override
        public void shutdown(long timeout, TimeUnit unit) {
            for(AbstractTerminatableThread thread : workerThreads){
                thread.terminate();
                try {
                    thread.join(TimeUnit.MILLISECONDS.convert(timeout, unit));
                } catch (InterruptedException e) {
                }
            }
            delegate.shutdown(timeout, unit);
        }
    
    }
    
    
    public class PipeException extends Exception {
    
        private static final long serialVersionUID = 8647786507719222800L;
    
        /**
         * 抛出异常的Pipe实例
         */
        public final Pipe<?, ?> sourcePipe;
    
        public final Object input;
    
        public PipeException(Pipe<?, ?> sourcePipe, Object input, String message) {
            super(message);
            this.sourcePipe = sourcePipe;
            this.input = input;
        }
    
        public PipeException(Pipe<?, ?> sourcePipe, Object input, String message, Throwable cause) {
            super(message, cause);
            this.sourcePipe = sourcePipe;
            this.input = input;
        }
    }
    
    
    /**
     * 对各个处理阶段的计算环境进行抽象,主要用于异常处理
     * @author huzhiqiang
     */
    public interface PipeContext {
        public void handleError(PipeException exp);
    }
    
    
    
    /**
     * 测试代码
     * @author huzhiqiang
     *
     */
    public class ThreadPoolBasedPipeExample {
        public static void main(String[] args) {
            final ThreadPoolExecutor threadPoolExecutor;
            threadPoolExecutor = new ThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors()*2, 60, TimeUnit.MINUTES,
                    new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
    
            final SimplePipeline<String, String> pipeLine = new SimplePipeline<String, String>();
    
            Pipe<String, String> pipe = new AbsractPipe<String, String>() {
                @Override
                public String doProcess(String input) throws PipeException {
                    String result = input + "->[pipe1, " + Thread.currentThread().getName() + "]";
                    System.out.println(result);
                    return result;
                }
            };
    
            pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);
    
            pipe = new AbsractPipe<String, String>() {
                @Override
                public String doProcess(String input) throws PipeException {
                    String result = input + "->[pipe2, " + Thread.currentThread().getName() + "]";
                    System.out.println(result);
    
                    try {
                        Thread.sleep(new Random().nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    return result;
                }
            };
    
            pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);
    
            pipe = new AbsractPipe<String, String>() {
                @Override
                public String doProcess(String input) throws PipeException {
                    String result = input + "->[pipe3, " + Thread.currentThread().getName() + "]";
                    System.out.println(result);
    
                    try {
                        Thread.sleep(new Random().nextInt(200));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    return result;
                }
    
                @Override
                public void shutdown(long timeout, TimeUnit unit) {
                    threadPoolExecutor.shutdown();
    
                    try {
                        threadPoolExecutor.awaitTermination(timeout, unit);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);
    
            pipeLine.init(pipeLine.newDefaultPipeContext());
    
            int N = 10;
    
            try {
                for(int i=0; i<N; i++){
                    pipeLine.process("Task-" + i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            pipeLine.shutdown(10, TimeUnit.SECONDS);
        }
    }
    展开全文
  • Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...
  • Producer-Consumer生产者消费者模式

    这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。

    模式名称

    Producer-Consumer生产者消费者模式

    模式面对的问题

    有的线程的结果是另外一些线程的原料,也就是说,一些线程是生产者,另外一些线程是消费者,消费者需要生产者生产的东西才能正常运行,协调两者的关系成了一个大的问题。

    解决思路

    有一个中间的存储位置,用来存储生产者生产出来的东西,称之为通道。
    Producer生产者
    Product生产者所生产的任务
    Channel通道的抽象
    BlockingQueueChannel基于阻塞队列的Channel实现
    Consumer消费者

    Created with Raphaël 2.1.0 Client Client Producter Producter product product Channel Channel 1service() 2creat 3put() 4 5

    例子代码

    某内容管理系统需要支持对文档附件中的文件进行全文检索。改系统中,附件会被上传到专用的文件服务器上,对附件进行全文检索的功能模块也是部署在文件服务器上的。
    模式主类

    public class AttachmentProcessor {
        private final String ATTACHMENT_STORE_BASE_DIR = "/home/viscent/tmp/attachments/";
    
        //模式角色Producer-Consumer.Channer
        priuvate final Channer<File> channer = new BlockingQueueChannel<File>(
                new ArrayBlockingQueue<File>(200));
        //模式角色Producer-Consumer.Consumer
        private final AbstractTerminatableThread indexingThread = new
            AbstractTerminatableThread(){
            @Override
            protected void doRun()throws Exception{
                File file =null;
    
                file = channel.take();
                try{
                    indexFile(file);
                }catch(Exception e){
                    e.printStackTrace();
                }finally{
                    terminationToken.reservations.decrementAndGet();
                }
            }
    
            //根据制定文件生成全文搜索所需的索引文件
            private void indexFile(File file) throws Exception{
                //省略与模式无关的代码
    
                //模拟生成索引文件的时间消耗
                Radom rnd =new Random();
                try{
                    Thread.sleep(rnd.nextInt(100));
                }catch(InterruptedException e){
                    ;
                }
            }
        };
    
        public void init(){
            indexingThread.start();
        }
    
        public void shutdown(){
            indexingThread.terminate();
        }
    
        public void saveAttachment(InputStream in,
                String documentId,String originalFileName)throws IOException{
            File file = saveAsFile(in,documentId,originalFileName);
            try{
                channel.put(file);
            }catch(InterruptedException e){
                ;
            }
            indexingTread.terminationToken.reservations.incrementAndGet();
        }
    
        private FTPClient initFTPClient(String ftpServer,String userName,
                String password) throws Exception{
            FTPClient ftpClient = new FTPClient();
    
            FTOClientConfig config = new FTPClientConfig();
            ftpClient.config(config);
    
            int reply;
            ftpClient.connect(ftpServer);
    
            System.out.print(ftpClient.getReplyString());
    
            reply = ftpClient.getReplyCode();
    
            if(!dirName.equals(file.getCanoicalFile().getParent())){
                throw new SecurityException("Invalid originalFileName:"+originalFileName);
            }
    
            BufferedOutputStream bos =null;
            BufferedInputStream bis = new BufferedInputStream(in);
            byte[]buf = new byte[2048];
            int len = -1;
            try{
                bos =new BufferedOutputStream(new FileOutputSteram(file));
                while((len = bis.read(buf) > 0)){
                    bos.write(buf,0,len);
                }
                bos.flush();
            }finally{
                try{
                    bis.close();
                }catch(IOException e){
                    ;
                }
                try{
                    if(null != bos){
                        bos.close();
                    }
                }catch(IOException e){
                    ;
                }
            }
    
            ftpClient.setFileType(FTP.ASCII_FILE_TYPE);
            return ftpClient;
        }
    }

    Channel接口

    public interface Channel<P> {
        //从通道中取一个"产品"。
        P take() throws InterruptedException;
    
        //往通道里面存储一个"产品"。
        void put(P product) throws InterruptedException;
    }

    BlockingQueueChannel类

    public class BlockingQueueChannel<P> implements Channel<P> {
        private final BlockingQueue<P>queue;
        public BlockingQueueChannel(BlockingQueue<P>queue){
            this.queue = queue;
        }
    
        @Override
        public P take() throws InterruptedException{
            return queue.take();
        }
    
    
        @Override
        public void put(P product) throws InterruptedException{
            queue.put(product);
        }
    }

    模式的评价与实现考量

    生产这消费者模式是一个经典的线程模式吗,但是它也有一些容易出现的问题:
    1. 管道积压:生产者消费者模式中消费者的处理能力往往低于生产这的处理能力,会出现管道挤压的现象。处理这种现象,有集中方法:使用有界阻塞队列,队列到一定数量就不在生产,等待消费;使用有流量控制的无界阻塞队列,在线程的时间分配时对生产者的时间进行限制来平衡。
    2. 工作窃取算法:如果是多个消费者从管道中取得产品,会出现线程安全的问题,所以会有一个通道实例对应多个队列实例来处理。
    3. 线程的停止:整个模式也可以看做一个线程,这个线程的停止会比一般的线程要复杂一些,需要注意处理。
    4. 高性能高可靠性:这里的示例代码是一个比较一般的实现,如果有较高的要求,可以考虑Producer-Consumer模式实现库LMAX Disruptor:https://github.com/LMAX-Exchange/disruptor

    展开全文
  • Java多线程编程实战指南(设计模式篇) 黄文海著 中文pdf扫描版
  • 多线程设计模式之保护性暂停模式

    万次阅读 2020-09-14 14:56:30
    多线程设计模式之保护性暂停模式 定义 保护性暂停模式(Guarded Suspension Design Pattern):当线程在访问某个对象时,发现条件不满足时,就暂时挂起等待条件满足时再次访问。 如果某个结果需要在多线程之间传递,...

    多线程设计模式之保护性暂停模式

    定义

    保护性暂停模式(Guarded Suspension Design Pattern):当线程在访问某个对象时,发现条件不满足时,就暂时挂起等待条件满足时再次访问。

    如果某个结果需要在多线程之间传递,则可以让这些线程关联到一个对象GuardedObject,但是如果这个结果需要不断的从一个线程到另一个线程那么可以使用消息队列(生产者/消费者)

    Thread.join()、Future就采用了保护性暂停模式。

    简单实现

    package com.morris.concurrent.pattern.guardedsuspension.v2;
    
    public class GuardedObject<V> {
    
        private final Object lock = new Object();
    
        private V v;
    
        private boolean isFinished;
    
        public void set(V v) {
            synchronized (lock) {
                this.v = v;
                isFinished = true;
                lock.notifyAll();
            }
        }
    
        public V get() throws InterruptedException {
            synchronized (lock) {
                while (!isFinished) {
                    lock.wait();
                }
            }
            return v;
        }
    }
    

    超时实现

    public V get(long millis) {
        synchronized (lock) {
            while (!isFinished) {
                try {
                    lock.wait(millis);
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return v;
    }
    

    上面的超时实现有点小问题。

    分析:假如超时时间是5s,但是在第2s的时候别人把这个线程中断了但是此时还没有结果,那么这个线程就会进入下次循环继续等待5s,这样这个线程就等待7s了。

    最终实现如下:

    public V get(long millis) {
        long begin = System.currentTimeMillis(); // 等待开始时间
        long waitedTime = 0; // 等待了多长时间
        synchronized (lock) {
            while (!isFinished) {
                if(waitedTime >= millis) {
                    System.out.println("超时了,不等了");
                    break;
                }
                try {
                    lock.wait(millis - waitedTime); // 继续等待剩余时间
                    break;
                } catch (InterruptedException e) {
                    waitedTime = System.currentTimeMillis() - begin;
                    System.out.println("被中断了,等了" + waitedTime + "ms");
                }
            }
        }
        return v;
    }
    

    使用保护性暂停模式实现Future和Callable

    public interface Callable<T> {
        T call();
    }
    
    public interface Future<T> {
        T get();
    }
    
    public class FutureTask<T> implements Future<T> {
    
        private boolean isFinish;
    
        private T t;
    
        public synchronized void set(T t) {
            this.t = t;
            this.isFinish = true;
            this.notifyAll();
        }
    
        @Override
        public synchronized T get() {
            while (!isFinish) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return t;
        }
    }
    
    public abstract class Executors {
    
        public static <T> Future<T> newSingleThreadPool(Callable<T> callable) {
            FutureTask<T> future = new FutureTask<>();
            new Thread(()->{
                T t = callable.call();
                future.set(t);
            }).start();
            return future;
        }
    
    }
    
    展开全文
  • 多线程设计模式.rar

    2010-07-13 14:15:25
    多线程设计模式 多线程设计模式 多线程设计模式 多线程设计模式 多线程设计模式
  • 多线程设计模式

    千次阅读 2012-04-09 20:42:49
    多线程设计模式: 1.Single Threaded Execution Pattern  [同一时刻只允许一个线程操作]    比喻:三个挑水的和尚,只能同一时间一个人过桥,不然都掉河里喂鱼了。  总结:在多个线程同时要访问的方法上...
  • Java多线程设计模式(带源码) Java多线程设计模式(带源码)Java多线程设计模式(带源码)
  • 图解Java多线程设计模式

    千次阅读 2017-09-12 16:18:39
    图解Java多线程设计模式
  • 多线程设计模式 - Future模式

    千次阅读 2019-11-24 21:57:14
    一起来看看多线程设计模式中的Future模式吧~概述Future模式是多线程开发中非常常见的一种设计模式,它的核心思想是异步调用。这类似我们日常生活中的在线购物流程,带在...
  • Java,多线程设计模式,电子书,阅读文档 Java,多线程设计模式,电子书,阅读文档 Java,多线程设计模式,电子书,阅读文档 Java,多线程设计模式,电子书,阅读文档 Java,多线程设计模式,电子书,阅读...
  • 多线程设计模式——Pipeline(流水线)模式

    万次阅读 多人点赞 2016-07-14 19:34:07
    Pipeline(流水线)模式
  • 多线程设计模式——最后总结

    千次阅读 2016-07-16 14:55:20
    多线程设计模式 最后总结
  • java多线程设计模式详解PDF及源码,java多线程设计模式详解PDF及源码,java多线程设计模式详解PDF及源码
  • java多线程设计模式详解 java多线程设计模式详解
  • 《Java多线程编程实战指南(设计模式篇)》采用Java(JDK1.6)语言和UML 为描述语言,并结合作者多年工作经历的相关实战案例,介绍了多线程环境下常用设计模式的来龙去脉:各个设计模式是什么样的及其典型的实际应用...
  • 虽然设计模式我们一般中用的很少,但是作为程序员设计模式是我们自我修养的一部分,so最近学习了一个设计模式.记下来喽: 观察者模式(有时又被称为模型-视图(View)模式、源-收听者(Listener)模式或从属者模式)是...
  • java多线程12种设计模式

    千次阅读 2014-07-09 22:22:15
    1、Single Threaded Execution Pattern(单线程执行模式)
  • 模式名称Promise(承诺)模式
  • Thread Specific Storage(线程特有存储)模式
  • 国内首部Java多线程设计模式原创作品《Java多线程编程实战指南(设计模式篇)》已由电子工业出版社出版。本书从理论到实战,用生活化的实例和通俗易懂的语言全面介绍Java多线程编程的"三十六计"——多线程设计模式

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 473,589
精华内容 189,435
关键字:

多线程的设计模式

友情链接: manual.rar