精华内容
下载资源
问答
  • java同步方法异步处理

    千次阅读 2015-12-29 12:17:27
    有时需要处理一个大任务,这个大任务处理完可能需要80秒,如果按照正常来访问就会出现超时,这时就会想着把这个大任务拆分,比如可以分成两个(根据任务性质来拆分)可以并行处理的子任务,第一个子任务用时30秒,第...

    有时需要处理一个大任务,这个大任务处理完可能需要80秒,如果按照正常来访问就会出现超时,这时就会想着把这个大任务拆分,比如可以分成两个(根据任务性质来拆分)可以并行处理的子任务,第一个子任务用时30秒,第二个子任务用时30秒,再花20秒来整合结果,这样只用了50秒就完成了,就不会超时了。

    下面是这个思路的一个简单实现:

    任务一代码:

    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    
    public class Task001  implements Callable<String>{
    
    	private String name;
    	private CountDownLatch countDown;
    	public Task001(String name){
    		this.name=name;
    	}
    	public Task001(String name,CountDownLatch countDown){
    		this(name);
    		this.countDown=countDown;
    	}
    	@Override
    	public String call() throws Exception {
    		long begin=System.currentTimeMillis();
    		System.out.println(name+" 任务一开始....");
    		Thread.sleep(3000);
    		System.out.println(name+" 任务一结束....");
    		System.out.println("任务一 用时:"+((System.currentTimeMillis()-begin)/1000)+"秒");		
    		if(this.countDown!=null)this.countDown.countDown();		
    		return "成功一";
    	}
    
    }

    任务二代码:

    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    
    public class Task002  implements Callable<String>{
    
    	private String name;
    	private CountDownLatch countDown;
    	
    	public Task002(String name){
    		this.name=name;
    	}
    	public Task002(String name,CountDownLatch countDown){
    		this(name);
    		this.countDown=countDown;
    	}
    	@Override
    	public String call() throws Exception {
    		long begin=System.currentTimeMillis();
    		System.out.println(name+" 第二个分任务开始....");
    		Thread.sleep(5000);
    		System.out.println(name+" 第二个分任务结束....");
    		System.out.println("任务二 用时:"+((System.currentTimeMillis()-begin)/1000)+"秒");
    		if(this.countDown!=null)this.countDown.countDown();	
    		return "成功二";
    	}
    
    }
    
    主方法的两种实现

    实现一:

    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    		long begin=System.currentTimeMillis();
    		ExecutorService  executorService =Executors.newFixedThreadPool(2);
    		Future<String> future=executorService.submit(new Task001("张三"));
    		Future<String> future2=executorService.submit(new Task002("李四"));
    		executorService.shutdown();
    		System.out.println("*-------------");
    		while(!executorService.isTerminated()){
    			Thread.sleep(100);
    		}
    		System.out.println("执行完善结果:"+future.get()+":"+future2.get()+" 总用时:"+((System.currentTimeMillis()-begin)/1000)+"秒");
    	}

    实现二:

    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    		CountDownLatch  countDown=new CountDownLatch(2);
    		long begin=System.currentTimeMillis();
    		ExecutorService  executorService =Executors.newFixedThreadPool(2);
    		Future<String> future=executorService.submit(new Task001("张三",countDown));
    		Future<String> future2=executorService.submit(new Task002("李四",countDown));
    		executorService.shutdown();
    		System.out.println("*-------------");
    		countDown.await();
    		System.out.println("执行完善结果:"+future.get()+":"+future2.get()+" 总用时:"+((System.currentTimeMillis()-begin)/1000)+"秒");
    	}
    运行结果:

    张三 任务一开始....
    李四 第二个分任务开始....
    *-------------
    张三 任务一结束....
    任务一 用时:3秒
    李四 第二个分任务结束....
    任务二 用时:5秒
    执行完善结果:成功一:成功二 总用时:5秒

    可以从结果看出:第一个任务用了3秒、第二个任务用了5秒。但是总的用时只有5秒而不是8秒。

    展开全文
  • 使用ExtJs处理批量数据时,使用for循环通过Ext Ajax调用后台,代码如下: for(var i=0; i; i++){ Ext.Ajax.request({ url:this.url, params:params[i], method:'POST', success:function(response) { },...
    【问题】 
    

    使用ExtJs处理批量数据时,使用for循环通过Ext Ajax调用后台,代码如下:

    for(var i=0; i<params.length; i++){
    	Ext.Ajax.request({
    		url:this.url,
    		params:params[i],
    		method:'POST',
    		success:function(response) {
    		},
    		scope:this
    	});
    }

    因为Ajax是异步处理的,导致后台接收到的数据无法保持正常的次序,后台的逻辑就会出现错乱。

    【思路】

    最简单的思路就是将异步调用转换为同步调用,但找了很多办法,Ext.Ajax.request要支持同步发送需要修改核心代码,这个才目前的产品上是不允许的;其它的同步调用的方法暂时没有找到,于是放弃了此思路;

    另外的思路就是Ext.Ajax.request真正处理结束后才执行下一次调用:

    1. 此时Ext.Ajax.request的success回调是不行的,因为只在成功的时候才会执行此回调,因此需要使用callback回调方法;

    2. 如果使用for循环,不会等到Ext.Ajax.request的回调,就进行下一个循环了,所以for循环不行;如果在Ext.Ajax.request没有回调前,在for中写一个等待的逻辑,例如while(isOk);显然也会有性能等诸多问题;

    3. 如何才能解决2的问题,最好的办法是使用递归模拟for循环;

    【执行】

    添加一个递归函数:

    batchProcess: function(index, length, params){
    	if(index >= length){
    		alert('处理结束');
    		return;
    	}else{
    		Ext.Ajax.request({
    			url:this.url,
    			params:params[index],
    			method:'POST',
    			success:function(response) {
    				batchProcess(++index, length, params);
    			},
    			scope:this
    		});	
    	}
    }

    然后使用batchProcess(0, params.length, params);进行调用。

    至此我们发现问题得到解决,此外还发现另外一个好处就是,我们可以控制批量执行时从第几条数据开始执行,例如从第3条开始执行:batchProcess(2, params.length, params);

    【扩展:使用Java模拟异步处理,并使用递归同步调用】

    【1:使用线程模拟Ext.Ajax.request调用】

    package sync;
    
    /**
     * 使用线程模拟异步处理逻辑
     * 
     * @author 李文锴
     * @since 2012-8-16 下午05:20:39
     * 
     */
    public abstract class Asyn extends Thread {
    
    	@Override
    	public void run() {
    		try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		onSuccess();
    	}
    
    	/**
    	 * 当异步处理成功时的回调
    	 * 
    	 * @Description:
    	 * @author 李文锴
    	 * @since:2012-8-16 下午05:21:32
    	 */
    	public abstract void onSuccess();
    }
    

     

    【2:使用Java模拟for循环调用异步逻辑】

    package sync;
    
    /**
     * for循环执行异步逻辑
     * 
     * @author 李文锴
     * @since 2012-8-17 上午08:46:07
     * 
     */
    public class AsynFor {
    
    	public static void main(String[] args) {
    		String[] params = { "aaa", "bbb", "ccc", "ddd" };
    		new AsynFor().asynProcess(params);
    	}
    
    	/**
    	 * 模拟对于异步方法执行for循环
    	 * 
    	 * @Description:
    	 * @param params
    	 * @author 李文锴
    	 * @since:2012-8-17 上午08:32:22
    	 */
    	public void asynProcess(String[] params) {
    		for (int i = 0; i < params.length; i++) {
    			doBusinessProcess(i, params[i]);
    		}
    	}
    
    	/**
    	 * 异步调用执行业务逻辑
    	 * 
    	 * @Description:
    	 * @param index
    	 * @author 李文锴
    	 * @since:2012-8-17 上午08:46:45
    	 */
    	private void doBusinessProcess(final int index, final String data) {
    		new Asyn() {
    
    			@Override
    			public void onSuccess() {
    				System.out.println("业务执行:" + index);
    				System.out.println(index + " : 业务完成后输出 => " + data + "\n");
    			}
    		}.start();
    	}
    
    }
    

    执行后输出如下:

    业务执行:2
    2 : 业务完成后输出 => ccc
    
    业务执行:0
    0 : 业务完成后输出 => aaa
    
    业务执行:1
    1 : 业务完成后输出 => bbb
    
    业务执行:3
    3 : 业务完成后输出 => ddd


    我们发现并没有按照正常的次序执行,通过上面的Java代码复现了在ExtJs中会遇到的问题

    【3:使用Java模拟递归处理批量异步逻辑同步执行问题】

    package sync;
    
    /**
     * 使用递归模拟for循环处理业务逻辑,同步执行
     * 
     * @author 李文锴
     * @since 2012-8-17 上午08:47:33
     * 
     */
    public class SyncFor {
    
    	public static void main(String[] args) {
    		String[] params = { "aaa", "bbb", "ccc", "ddd" };
    		new SyncFor().syncProcess(0, params.length, params);
    	}
    
    	/**
    	 * 使用递归进行同步输出
    	 * 
    	 * @Description:
    	 * @param index
    	 * @param length
    	 * @param params
    	 * @author 李文锴
    	 * @since:2012-8-17 上午08:29:10
    	 */
    	public void syncProcess(final int index, final int length, final String[] params) {
    		if (index >= length) {
    			return;
    		} else {
    
    			new Asyn() {
    
    				@Override
    				public void onSuccess() {
    					System.out.println("业务执行:" + index);
    					System.out.println(index + " : 业务完成后输出 => " + params[index] + "\n");
    
    					syncProcess(index + 1, length, params);
    				}
    			}.start();
    		}
    	}
    }
    


    执行输出如下:

    业务执行:0
    0 : 业务完成后输出 => aaa
    
    业务执行:1
    1 : 业务完成后输出 => bbb
    
    业务执行:2
    2 : 业务完成后输出 => ccc
    
    业务执行:3
    3 : 业务完成后输出 => ddd


    结果按照我们期望的异步逻辑同步处理了。

    【延伸】

    1. “同步、异步、线程、并发、锁、队列、点对点、发布订阅、主题、消息”这些都是同一个领域的相关概念,融汇贯通,其本质是一样的。

    2. 要分清楚什么时候需要使用异步、什么时候需要使用同步。如果不存在一段代码调用后立刻执行后面的逻辑(不需要等后面处理结束),还是建议使用同步处理,这样可能用户体验会有写问题,但产品质量不会存在问题。

    3. 用户体验和产品质量是个两难的问题,需要平衡。

     

     

     

     

     

    展开全文
  • JAVA同步转异步

    千次阅读 2013-10-29 18:07:20
    * 如果队列设置太大,导致即使处理不过来,也不会增加线程,更不会启动拒绝策略,意思就是:我现在处理不过来了,积压着吧,等我慢慢处理。 * 最终要么是队列增大,设置超过JVM内存,最终因为内存溢出而挂了...



    客户端使用,代码清洁,装饰器模式

     

    public class JobHandlerAsyncDemo {
    
        public static class JobHandlerDirect implements IJobHandler<String> {
            @Override
            public void doJob(String job) {
                System.out.println("do job ..."+job);
            }
        }
    
        public static void main(String[] args) {
            IJobHandler<String> jobHandlerDirect = new JobHandlerDirect();//同步的
            IJobHandler<String> jobHanderAsync = new JobHandlerAsync<String>(3,500,"JobHand",jobHandlerDirect);//同步转换成异步
            jobHanderAsync.doJob("1");
            jobHanderAsync.doJob("2");
            jobHanderAsync.doJob("3");
        }
    
    }
    

     

    先告诉我,如果不异步的时候,需要如何处理?实现IJobHandler接口

    public interface IJobHandler<T> {
    
    	public void doJob(T job);
    }
    


    装饰器JobHandlerAsync是如何把一个同步处理转换成异步处理的

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class JobHandlerAsync<T> implements JobHandlerAsyncMBean,IJobHandler<T> {
    
    	private static final Logger log = LoggerFactory.getLogger(JobHandlerAsync.class);
    
    	private int queueCapacity;
    	private BlockingQueue<Runnable> queue;
    
    	private ThreadPoolExecutor executor;
    
    	private IJobHandler<T> delegate;
    
    	private static final AtomicInteger instanceCount = new AtomicInteger(1);
    	private int instanceNum;
    
    	public JobHandlerAsync(int fixedThreadSize, int queueCapacity, String threadTag,IJobHandler<T> hbb) {
    		this(hbb, fixedThreadSize, fixedThreadSize, 1000*60*2, threadTag, fixedThreadSize, new LinkedBlockingQueue<Runnable>(queueCapacity));
    	}
    
    	public JobHandlerAsync(IJobHandler<T> hbb) {
    		/*
    		 * 队列大小不宜太大,积压超过50个,立马要增加线程来处理;如果线程增加了,还是处理不过了,基本表示系统后方出问题了。
    		 * 后方出问题了,应该尽管启动拒绝策略,决绝策略可以报警。
    		 * 如果队列设置太大,导致即使处理不过来,也不会增加线程,更不会启动拒绝策略,意思就是:我现在处理不过来了,积压着吧,等我慢慢处理。
    		 * 最终要么是队列增大,设置超过JVM内存,最终因为内存溢出而挂了;要么是客户端长时间等待,客户端认为超时,响应已经没有意义了。
    		 * */
    		this(hbb, 5, 10, 1000*60*2, "JobHandler", 50);
    	}
    
    	public JobHandlerAsync(IJobHandler<T> hbb, int coreThreadSize, int maxThreadSize, int keepAliveSec,String threadTag, int queueCapacity) {
    	    this(hbb, coreThreadSize, maxThreadSize, keepAliveSec, threadTag, queueCapacity, new LinkedBlockingQueue<Runnable>(queueCapacity));
    	}
    
    	protected JobHandlerAsync(IJobHandler<T> hbb, int coreThreadSize, int maxThreadSize, int keepAliveSec,String threadTag, int queueCapacity,BlockingQueue<Runnable> queue) {
    		this.threadTag = threadTag;
    
    		this.delegate = hbb;
    
    		this.queueCapacity = queueCapacity;
    		this.queue = queue;
    
    		RejectedExecutionHandler rejectedHandler = new AlertPolicy();
    		ThreadFactory threadFactory = new TagThreadFactory(threadTag);
    
    		this.executor = new ThreadPoolExecutor(coreThreadSize,maxThreadSize, keepAliveSec, TimeUnit.SECONDS, queue,threadFactory,rejectedHandler);
    		this.instanceNum = instanceCount.getAndIncrement();
    
    		/* 注册JMX */
    		String beanName = getMBeanName();
    	    if (JmxMBeanManager.getInstance().isRegistered(beanName)) {
    	        log.warn("MBean '{}' is already registered, removing...", beanName);
    	        JmxMBeanManager.getInstance().unregisterMBean(beanName);
    	    }
    
    	    log.info("Registering MBean '{}'...", beanName);
    	    JmxMBeanManager.getInstance().registerMBean(this, beanName);
    	}
    
    	private String threadTag;
    
    	 private String getMBeanName() {
    	    //return JMX_MBEAN_OBJ_NAME + "-" + instanceNum;
    		//"com.sohu.tv.live.counter:type=JobHandlerAsync"
    		 //com.sohu.tv.utils.threads
    		 return "com.sohu.tv.utils.threads:type="+threadTag + "-" + instanceNum;
    	 }
    
    	/** 拒绝策略自己实现的意义:系统处理不过来的时候,可以短信告警,即使发现系统的问题。*/
    	public static class AlertPolicy implements RejectedExecutionHandler  {
    		/* new ThreadPoolExecutor.AbortPolicy() Not Fit */
    		@Override
    		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    			log.warn("reject alert Job: {}", r);//目前登记日志报警,可以发短信的
    		}
    	}
    
    	/** 线程工厂自己实现的意义:可以统计程序在什么时间段增加了线程;还能给线程起名称(这样jstack看起来方便)。*/
    	public static class TagThreadFactory implements ThreadFactory {
    		/* 代码取自:  Executors.defaultThreadFactory(); */
    		static final AtomicInteger poolNumber = new AtomicInteger(1);
            final ThreadGroup group;
            final AtomicInteger threadNumber = new AtomicInteger(1);
            final String namePrefix;
    
            TagThreadFactory(String tagName) {
                SecurityManager s = System.getSecurityManager();
                group = (s != null)? s.getThreadGroup() :
                                     Thread.currentThread().getThreadGroup();
    
                namePrefix = tagName+"-P"+poolNumber.getAndIncrement() +"T";
            }
    
            /**线程名称:Tag-PnTm-timestamp*/
            @Override
            public Thread newThread(Runnable r) {
            	String namePostfix = "-"+System.currentTimeMillis();//系统调用,比较消耗性能,但是能标识线程是什么时候创建的
            	/*标识线程的创建时间,从某种意义上能看出这个线程被复用的程度*/
    
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement() + namePostfix,
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
    
    	}
    
    
    
    	@Override
    	public int getActiveThreadSize() {
    		return executor.getActiveCount();
    	}
    
    	@Override
    	public int getAliveThreadSize() {
    		return executor.getPoolSize();
    	}
    
    	@Override
    	public int getQueueSize() {
    		return queue.size();
    	}
    
    
    	@Override
    	public int getHistoricalPeekThreadSize() {
    		return executor.getLargestPoolSize();
    	}
    
    	@Override
    	public long getHistoricalTotalTaskCount() {
    		return executor.getTaskCount();
    	}
    
    	@Override
    	public long getHistoricalCompletedTaskCount() {
    		return executor.getCompletedTaskCount();
    	}
    
    
    	@Override
    	public int getCoreThreadSize() {
    		return executor.getCorePoolSize();
    	}
    
    	@Override
    	public int getMaxThreadSize() {
    		return executor.getMaximumPoolSize();
    	}
    
    	@Override
    	public int getQueueCapacity() {
    		return queueCapacity;
    	}
    
    	@Override
    	public void doJob(T job) {
    		executor.execute(new JobRunnable(job));
    	}
    
    	public class JobRunnable implements Runnable {
    
    		private final T job;
    
    		public JobRunnable(T attachment) {
    			this.job = attachment;
    		}
    
    		@Override
    		public void run() {
    			delegate.doJob(job);
    		}
    
    		@Override
    		public String toString() {
    			return job.toString();
    		}
    
    	}
    
    
    
    	@Override
    	public void shutdown() {
    		executor.shutdown();
    	}
    
    	@Override
    	public List<Runnable> shutdownNow() {
    		return executor.shutdownNow();
    	}
    
    
    }
    

     

    支持JMX管理的Bean,另外为了运维方便,给线程都打上Tag

    import java.util.List;
    
    public interface JobHandlerAsyncMBean {
    	/* 标准MBean条件:MBean接口和实现必须在同一个包下;实现类和MBean接口名称仅仅相差MBean后缀,接口:ABCMBean,那实现类必须是ABC */
    	String JMX_MBEAN_OBJ_NAME = "com.sohu.tv.live.counter:type=JobHandlerAsync";
    	
    	
    	/* 第一部分:当前信息部分 */
    	
    	/** 返回当前有多少个线程正在忙碌地处理报文 */
    	public int getActiveThreadSize();
    	
    	/** 返回当前有多少个线程是存活的。注意:存活的(Alive)-忙碌的(Active)=空闲的Idle */	
    	public int getAliveThreadSize();
    	
    	/** 返回当前队列中积压的作业 */
    	public int getQueueSize();
    	
    	
    	/* 第二部分: 统计信息部分  */
    	
    	/** 返回历史上最高峰值  */
    	public int getHistoricalPeekThreadSize();
    	
    	/** 返回累计到现在处理了多少个Job */
    	public long getHistoricalTotalTaskCount();
    	
    	/** 返回累计到现在处理成功了多少个Job */
    	public long getHistoricalCompletedTaskCount();
    	
    	
    	
    	/* 第三部分: 配置信息部分 */
    	
    	/** 返回最小设置(初始化不一定能到达最小设置的)*/
    	public int getCoreThreadSize();
    	
    	/** 返回最大设置*/
    	public int getMaxThreadSize();
    	
    	/** 返回当前队列的最大容量 */
    	public int getQueueCapacity();
    	
    	public void shutdown();
    	
    	public List<Runnable> shutdownNow();
    
    }
    


     

    import java.lang.management.ManagementFactory;
    
    import javax.management.MBeanServer;
    import javax.management.ObjectName;
    
    public class JmxMBeanManager {
        private static final Object mbsCreateMonitor = new Object();
        private static JmxMBeanManager thisObj;
    
        private final MBeanServer mbs;
    
        public JmxMBeanManager() {
            mbs = ManagementFactory.getPlatformMBeanServer();
        }
    
        public static JmxMBeanManager getInstance() {
            synchronized (mbsCreateMonitor) {
                if (null == thisObj) {
                    thisObj = new JmxMBeanManager();
                }
            }
    
            return thisObj;
        }
    
        public boolean isRegistered(String name) {
            try {
                ObjectName objName = new ObjectName(name);
                return mbs.isRegistered(objName);
            }
            catch (Exception e) {
                throw new RuntimeException("exception while checking if MBean is registered, " + name, e);
            }
        }
    
        public void registerMBean(Object theBean, String name) {
            try {
                ObjectName objName = new ObjectName(name);
                mbs.registerMBean(theBean, objName);
            }
            catch (Exception e) {
                throw new RuntimeException("exception while registering MBean, " + name, e);
            }
        }
    
        public void unregisterMBean(String name) {
            try {
                ObjectName objName = new ObjectName(name);
                mbs.unregisterMBean(objName);
            }
            catch (Exception e) {
                throw new RuntimeException("exception while unregistering MBean, " + name, e);
            }
        }
    
        public Object getAttribute(String objName, String attrName) {
            try {
                ObjectName on = new ObjectName(objName);
                return mbs.getAttribute(on, attrName);
            }
            catch (Exception e) {
                throw new RuntimeException("exception while getting MBean attribute, " + objName + ", " + attrName, e);
            }
        }
    
        public Integer getIntAttribute(String objName, String attrName) {
            return (Integer) getAttribute(objName, attrName);
        }
    
        public String getStringAttribute(String objName, String attrName) {
            return (String) getAttribute(objName, attrName);
        }
    }
    


     

     

     

     

     

    展开全文
  • java同步队列,包括非阻塞队列与阻塞队列

    在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。

    使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。

    非阻塞的实现方式则可以使用循环CAS的方式来实现。

    ConcurrentLinkedQueue

    我们一起来研究一下如何使用非阻的方式来实现线程安全队列ConcurrentLinkedQueue的
    ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

    ConcurrentLinkedQueue结构

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
        private transient volatile Node<E> head;//头指针
        private transient volatile Node<E> tail;//尾指针
        public ConcurrentLinkedQueue() {//初始化,head=tail=(一个空的头结点)
            head = tail = new Node<E>(null);
        }
        private static class Node<E> {
            volatile E item;
            volatile Node<E> next;//内部是使用单向链表实现
            ......
        }
        ......
    }

    入队

        public boolean offer(E e) {
            checkNotNull(e);
            final Node<E> newNode = new Node<E>(e);//入队前,创建一个新节点
    
            for (Node<E> t = tail, p = t;;) {//除非插入成功并返回,否则反复循环
                Node<E> q = p.next;
                if (q == null) {
                    // p is last node
                    if (p.casNext(null, newNode)) {//利用CAS操作,将p的next指针从旧值null更新为newNode 
                        if (p != t) // hop two nodes at a time
                            casTail(t, newNode);  // Failure is OK.利用CAS操作更新tail,如果失败说明其他线程添加了元素,由其他线程负责更新tail
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next 如果添加元素失败,说明其他线程添加了元素,p后移,并继续尝试
                }
                else if (p == q) //如果p被移除出链表,我们需要调整指针重新指向head,否则我们指向新的tail
                    p = (t != (t = tail)) ? t : head;
                else
                    //p指向tail或者q
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }

    casTail(cmp,value)方法用于更新tail节点。tail被设置为volatile保证可见性。

    p.casNext(cmp,value)方法用于将入队节点设置为当前队列尾节点的next节点。value也被设置为volatile。

    对于出队操作,也是使用CAS的方式循环尝试将元素从头部移除。

    因为采用CAS操作,允许多个线程并发执行,并且不会因为加锁而阻塞线程,使得并发性能更好。

    Java中的阻塞队列

    本节将介绍什么是阻塞队列,以及Java中阻塞队列的4种处理方式,并介绍Java 7中提供的7种阻塞队列,最后分析阻塞队列的一种实现方式。

    什么是阻塞队列

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
    1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    JDK 7提供了7个阻塞队列,如下。
    ·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    ·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
    ·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    ·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    ·SynchronousQueue:一个不存储元素的阻塞队列。
    ·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    ·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    内容来自:

    《并发编程的艺术》

    展开全文
  • 三、同步操作 1.创建节点 2.获取数据 3.设置数据 4.获取子节点 5.节点权限控制 6.节点存在 7.删除节点 一、zookeeper java api zookeeper 可以通过java api连接操作,进行ZNode的创建删除,数据的获取设置...
  • Java同步锁synchronized的最全总结

    千次阅读 2019-10-18 11:34:35
    一、并发同步问题   线程安全是Java并发编程中的重点,而造成线程安全问题的主要原因有两点,一是存在共享数据(也称临界资源),二是存在多条线程共同操作共享数据。因此,当存在多个线程操作共享数据时,需要保证...
  • Java同步框架AQS原文分析

    千次阅读 2017-04-06 09:04:10
    0、引言自J2SE1.5开始,java中的同步类(Lock,Semphore等等)都基于AbstractQueuedSynchronizer(后文简称AQS)。AQS提供了一种原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本文主要是分析此...
  • Java 同步和 异步 的区别、联系

    万次阅读 2018-02-25 23:34:25
    但话又说回来了,既然逃避掉,那我们就坦然面对吧~今天就让我们一起来研究一下常见的并发和同步吧。 为了更好的理解并发和同步,我们需要先明白两个重要的概念:同步和异步    2、如何处理并发和同步  今天...
  • http://www.iteye.com/topic/1131578#2399581 ...https://www.ibm.com/developerworks/cn/java/j-lo-javaio/ 同步和异步站在任务调度者看任务之间有无顺序关系; 阻塞和非阻塞是站在CPU角度看内设(cpu
  • JAVA 线程 同步 信号量

    千次阅读 2011-05-02 17:29:00
    JAVA 线程 同步 信号量
  • java同步(synchronized)详解

    千次阅读 2015-04-06 19:05:52
    Java中内置了语言级的同步原语--synchronized,这也大大简化了Java中多线程同步的使用。我们首先编写一个非常简单的多线程的程序,是模拟银行中的多个线程同时对同一个储蓄账户进行存款、取款操作的。 在程序中...
  • java实现同步的几种方式(总结)

    万次阅读 2018-05-09 11:53:05
     java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查), 将会导致数据准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用, 从而...
  • java io模型,及相关同步异步,阻塞非阻塞概念解析
  • 深入理解Java并发之synchronized实现原理

    万次阅读 多人点赞 2017-06-04 17:44:44
    【版权申明】未经博主同意,谢绝转载!(请尊重原创,博主保留追究权) ... 出自【zejian的博客】...深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深...
  • 多线程三个特征:原子性、可见性以及有序性. ...Java 虚拟机中的同步(Synchronization)基于进入和退出管程(Monitor)对象实现, 无论是显式同步(有明确的 monitorenter 和 monitorexit 指令,即同...
  • java 对两张表进行数据同步

    千次阅读 2021-01-06 17:39:53
    java对sqlserver两张表进行同步操作 功能描述 在大型项目中,我们经常会用到读写分离技术来进行优化,及一张主表仅用来查询。一张附表用来增删改。我们现在要做的功能就是当附表数据变动后,要同步更新主表。这里...
  • Java中的多线程与同步

    千次阅读 2016-04-24 11:38:06
    一、进程与线程  进程是可并发执行的程序在一个数据集上的一次执行过程,它是系统进行资源分配的基本单位。  线程为进程所有,作为调度执行的基本单位,...并发执行是为了增强计算机系统的处理能力和提高资源利用率,
  • java.util.concurrent 同步器框架详解

    千次阅读 多人点赞 2019-07-05 09:51:49
    一般的应用系统中,存在着大量的计算和大量的 I/O 处理,通过多线程可以让系统运行得更快。但在 Java 多线程编程中,会面临很多的难题,比如线程安全、上下文切换、死锁等问题。 线程安全 引用 《Java Concurrency ...
  • 要说明线程同步问题首先要说明Java线程的两个特性,可见性和有序性。多个线程之间是能直接传递数据交互的,它们之间的交互只能通过共享变量来实现。例如,假设在多个线程之间共享了Count类的一个对象,Count类有一...
  • Java中,最基本的互斥同步手段就是synchronized关键字,synchronized关键字经过编译 之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令,这两个字节 码都需要一个reference类型的参数来...
  • Java并发编程2-同步

    千次阅读 2014-03-27 10:57:44
    上一篇文章中介绍了原子性和可见性,让我们知道了为什么需要使用同步,这篇文章将介绍怎么在Java中使用同步。首先从Java中的同步机制原理开始,然后介绍同步可能导致的风险,最后介绍Java中常见的同步方法。 Java的...
  • java实现同步的几种方式

    千次阅读 2018-03-05 22:25:11
    java允许许多线程并发控制,当多个线程同时操作一个可共享的资源变量是(如数据的增、删、改、查),将会导致数据准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而...
  • java 分布式事务处理

    万次阅读 2007-09-04 14:15:00
    例如,考虑一个在三个分离的远程数据库上修改的 客户帐户平衡表,如果在事务写阶段,任何一个数据库连接失败,数据库之间就失去同步。怎样检测并更正这种情形呢?事务处理(TP)监示一个叫做两阶段提交 的过程并在某种...
  • java后台简单异步处理例子

    万次阅读 2017-06-30 01:08:13
    背景:最近项目的一个需求,要求新建一个演练任务向某个主机发起调用攻击指令,同时等待对方主机告警信息同步到本机数据库表上后,将当前演练任务必须关联上告警信息id思考:因为涉及多个动作,且中间等待对方主机...
  • Java 多线程同步的五种方法

    千次阅读 2016-05-23 21:06:07
    Java 多线程同步的五种方法 一、引言 前几天面试,被大师虐残了,好多基础知识必须得重新拿起来啊。闲话多说,进入正题。 二、为什么要线程同步 因为当我们有多个线程要同时访问一个变量或对象时,...
  • Java中的同步与异步

    千次阅读 2012-10-02 21:13:51
    经常看到介绍 ArrayList 和HashMap是异步,Vector和HashTable是同步,这里同步是线程安全的,异步不是线程安全的,举例说明:  当创建一个Vector对象时候,  Vector ve=new Vector();  ve.add("1");  当...
  • java中的同步与异步

    万次阅读 2012-04-30 23:14:44
    经常看到介绍 ArrayList 和HashMap是异步,Vector和HashTable是同步,这里同步是线程安全的,异步不是线程安全的,举例说明:  当创建一个Vector对象时候,  Vector ve=new Vector();  ve.add("1");  当...
  • 概述同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。本文就目前常用...
  • 如何解决Java线程同步中的阻塞问题

    千次阅读 2012-03-30 20:29:00
    Java线程同步需要我们不断的进行相关知识的学习,下面我们就来看看如何才能更好的在学习中掌握相关的知识讯息,来完善我们自身的编写手段。希望大家有所收获。  Java线程同步的优先级代表该线程的重要程度,当有多...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 185,721
精华内容 74,288
关键字:

java同步不成功的处理

java 订阅