精华内容
下载资源
问答
  • Exchanger

    2018-09-27 13:42:53
    Exchanger

    1 Exchange概述

    Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据。

    当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。
    如下示例:

    public class ExchangeThread {
    
    	private static AtomicInteger count = new AtomicInteger(0);
    	
    	static class ProducerExchange {
    		private Exchanger<Integer> exchanger;
    		
    		private Thread thread = new Thread() {
    			public void run() {
    				while(!this.isInterrupted()) {
    					try {
    						Integer tmpCount = count.getAndAdd(2);
    						System.out.println("Producer before : "+tmpCount);
    						tmpCount = exchanger.exchange(tmpCount);
    						System.out.println("Producer after : "+tmpCount);
    						Thread.sleep(2000);
    					} catch (InterruptedException e) {
    						System.out.println("线程中断");
    						this.isInterrupted();// 重新设置中断标志
    					} finally {
    						System.out.println("===========================");
    					}
    				}
    			};
    		};
    
    		public void setExchanger(Exchanger<Integer> exchanger) {
    			this.exchanger = exchanger;
    		}
    		
    		public void start() {
    			thread.start();
    		}
    	}
    	static class ConsumerExchange {
    		private Exchanger<Integer> exchanger;
    		
    		private Thread thread = new Thread() {
    			public void run() {
    				while(!this.isInterrupted()) {
    					try {
    						Thread.sleep(4000);
    						Integer tmpCount = count.decrementAndGet();
    						System.out.println("Consumer before : " + tmpCount);
    						tmpCount = exchanger.exchange(tmpCount);
    						System.out.println("Consumer after : "+ tmpCount);
    					} catch (InterruptedException e) {
    						this.isInterrupted();
    					}
    				}
    			};
    		};
    		
    		public void setExchanger(Exchanger<Integer> exchanger) {
    			this.exchanger = exchanger;
    		}
    		
    		public void start() {
    			thread.start();
    		}
    	}
    	
    	public static void main(String[] args) {
    		Exchanger<Integer> exchanger = new Exchanger<Integer>();
    		ProducerExchange producerExchange = new ProducerExchange();
    		ConsumerExchange consumerExchange = new ConsumerExchange();
    		producerExchange.setExchanger(exchanger);
    		consumerExchange.setExchanger(exchanger);
    		producerExchange.start();
    		consumerExchange.start();
    	}
    }
    
    
    

    2 Exchanger源码分析:

     private Object doExchange(Object item, boolean timed, long nanos) {
            Node me = new Node(item);                 
            int index = hashIndex();                  //根据thread id计算出自己要去的那个交易位置(slot)
            int fails = 0;                            
    
            for (;;) {
                Object y;                             
                Slot slot = arena[index];
                if (slot == null)                    
                    createSlot(index);     //slot = null,创建一个slot,然后会回到for循环,再次开始
                else if ((y = slot.get()) != null &&  //slot里面有人等着(有Node),则尝试和其交换
                         slot.compareAndSet(y, null)) { //关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
                    Node you = (Node)y;               
                    if (you.compareAndSet(null, item)) {//把Node里面的东西,换成自己的
                        LockSupport.unpark(you.waiter); //唤醒对方
                        return you.item; //自己把对方的东西拿走
                    } //关键点2:如果你运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始                          
                }
                else if (y == null &&                 //slot里面为空(没有Node),则自己把位置占住
                         slot.compareAndSet(null, me)) {
                    if (index == 0)                   //如果是0这个位置,自己阻塞,等待别人来交换
                        return timed? awaitNanos(me, slot, nanos): await(me, slot);
                    Object v = spinWait(me, slot);    //不是0这个位置,自旋等待
                    if (v != CANCEL)  //自旋等待的时候,运气好,有人来交换了,返回
                        return v;
                    me = new Node(item);     //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环    
                    int m = max.get();
                    if (m > (index >>>= 1))           
                        max.compareAndSet(m, m - 1); 
                }
                else if (++fails > 1) { //失败 case1: slot有人,要交互,但被人家抢了  case2: slot没人,自己要占位置,又被人家抢了    
                    int m = max.get();
                    if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                        index = m + 1;   //3次匹配失败,把index扩大,再次开始for循环             
                    else if (--index < 0)
                        index = m;                   
                }
            }
        }
    
    
    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,710
精华内容 684
关键字:

exchanger