闭锁/栅栏/信号量/FutureTask分析及使用
1、闭锁
用途:可用于命令一组线程在同一个时刻开始执行某个任务,或者等待一组相关的操作结束。尤其适合计算并发执行某个任务的耗时。
public class CountDownLatchTest { public void timeTasks(int nThreads, final Runnable task) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for(int i = 0; i < nThreads; i++){ Thread t = new Thread(){ public void run(){ try{ startGate.await(); try{ task.run(); }finally{ endGate.countDown(); } }catch(InterruptedException ignored){ } } }; t.start(); } long start = System.nanoTime(); System.out.println("打开闭锁"); startGate.countDown(); endGate.await(); long end = System.nanoTime(); System.out.println("闭锁退出,共耗时" + (end-start)); } public static void main(String[] args) throws InterruptedException{ CountDownLatchTest test = new CountDownLatchTest(); test.timeTasks(5, test.new RunnableTask()); } class RunnableTask implements Runnable{ @Override public void run() { System.out.println("当前线程为:" + Thread.currentThread().getName()); } }
执行结果为: 打开闭锁 当前线程为:Thread-0 当前线程为:Thread-3 当前线程为:Thread-2 当前线程为:Thread-4 当前线程为:Thread-1 闭锁退出,共耗时1109195
2、栅栏
用途:用于阻塞一组线程直到某个事件发生。所有线程必须同时到达栅栏位置才能继续执行下一步操作,且能够被重置以达到重复利用。而闭锁式一次性对象,一旦进入终止状态,就不能被重置
public class CyclicBarrierTest { private final CyclicBarrier barrier; private final Worker[] workers; public CyclicBarrierTest(){ int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable(){ @Override public void run() { System.out.println("所有线程均到达栅栏位置,开始下一轮计算"); } }); this.workers = new Worker[count]; for(int i = 0; i< count;i++){ workers[i] = new Worker(i); } } private class Worker implements Runnable{ int i; public Worker(int i){ this.i = i; } @Override public void run() { for(int index = 1; index < 3;index++){ System.out.println("线程" + i + "第" + index + "次到达栅栏位置,等待其他线程到达"); try { //注意是await,而不是wait barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); return; } catch (BrokenBarrierException e) { e.printStackTrace(); return; } } } } public void start(){ for(int i=0;i<workers.length;i++){ new Thread(workers[i]).start(); } } public static void main(String[] args){ new CyclicBarrierTest().start(); } }
执行结果为: 线程0第1次到达栅栏位置,等待其他线程到达 线程1第1次到达栅栏位置,等待其他线程到达 线程2第1次到达栅栏位置,等待其他线程到达 线程3第1次到达栅栏位置,等待其他线程到达 所有线程均到达栅栏位置,开始下一轮计算 线程3第2次到达栅栏位置,等待其他线程到达 线程2第2次到达栅栏位置,等待其他线程到达 线程0第2次到达栅栏位置,等待其他线程到达 线程1第2次到达栅栏位置,等待其他线程到达 所有线程均到达栅栏位置,开始下一轮计算
3、信号量
用途:用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量可以用来实现某种资源池,或者对容器施加边界。
public class SemaphoreTest<T> { private final Set<T> set; private final Semaphore sema; public SemaphoreTest(int bound){ this.set = Collections.synchronizedSet(new HashSet<T>()); this.sema = new Semaphore(bound); } public boolean add(T o) throws InterruptedException{ sema.acquire(); boolean wasAdded = false; try{ wasAdded = set.add(o); return wasAdded; }finally{ if(!wasAdded){ sema.release(); } } } public boolean remove(T o){ boolean wasRemoved = set.remove(o); if(wasRemoved){ sema.release(); } return wasRemoved; } public static void main(String[] args) throws InterruptedException{ int permits = 5; int elements = permits + 1; SemaphoreTest<Integer> test = new SemaphoreTest<Integer>(permits); for(int i = 0;i < elements; i++){ test.add(i); } } }
输出结果:由于实际待添加的元素个数大于信号量所允许的数量,因此最后一次添加时,会一直阻塞。
4、巧用FutureTask缓存计算过程
当一个计算的代价比较高,譬如比较耗时,或者耗资源,为了避免重复计算带来的浪费,当第一次计算后,通常会将结果缓存起来。比较常见的方式就是使用synchronized进行同步,但该方式带来的代价是被同步的代码只能被串行执行,如果有多个线程在排队对待计算结果,那么针对最后一个线程的计算时间可能比没有使用缓存的时间会更长。
第二种方式是采用ConcurrentHashMap,但对于耗时比较长的计算过程来说,该方式也存在一个漏洞。如果在第一个线程正在计算的过程中,第二个线程开始获取结果,会发现缓存里没有缓存结果,因此第二个线程又启动了同样的计算,这样就导致重复计算,违背了缓存的初衷。计算过程越长,则出现这种重复计算的几率就会越大。
通过第二种方式的缺点分析,得知真正要缓存的应该是计算是否已被启动,而不是等待漫长的计算过程结束后,再缓存结果。一旦从缓存中得知某个计算过程已被其他线程启动,则当前线程不需要再重新启动计算,只需要阻塞等待计算结果的返回。FutureTask就是实现该功能的最佳选择。
public class Memoizer<A, V> implements Computable<A, V> { private ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } @Override public V compute(final A a) { while (true) { Future<V> f = cache.get(a); if (null == f) { Callable<V> eval = new Callable<V>() { @Override public V call() throws Exception { return c.compute(a); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(a, ft); if (null == f) { f = ft; ft.run(); } } try { return f.get(); } catch (InterruptedException e) { e.printStackTrace(); cache.remove(a, f); } catch (ExecutionException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException, ExecutionException { Computable<Integer, String> c = new Computable<Integer, String>() { @Override public String compute(Integer a) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String result = "由线程:" + Thread.currentThread().getName() + "计算得到" + a + "的结果"; return result; } }; Memoizer<Integer, String> memoizer = new Memoizer<Integer, String>(c); ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { Task<Integer, String> task = new Task<Integer, String>(memoizer, 10); String result = executorService.submit(task).get(); System.out.println(result); } executorService.shutdown(); } }
结果如下: 由线程:pool-1-thread-1计算得到10的结果 由线程:pool-1-thread-1计算得到10的结果 由线程:pool-1-thread-1计算得到10的结果 当然如果仅仅只是采用FutureTask,仅仅只是减小了启动同一个计算过程的概率。当两个线程同时经过compute方法时,还是会出现重复启动同一个计算的情况,上面的例子通过结合ConcurrentHashMap 的putIfAbsent方法解决了这个问题
相关推荐
主要介绍了futuretask用法及使用场景介绍,小编觉得挺不错的,这里分享给大家,供大家参考。
最代码,http://www.zuidaima.com/share/1724478138158080.htm 的代码及例子
主要介绍了Java FutureTask类使用案例解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值析,需要的朋友可以参考下
主要介绍了futuretask源码分析(推荐),小编觉得还是挺不错的,这里给大家分享下,供各位参考。
FutureTask底层实现分析,有了FutureTask主线程要想获得工作线程(异步计算线程)的结果变得比较简单
NULL 博文链接:https://dingran.iteye.com/blog/1864962
主要为大家详细介绍了Java中Future、FutureTask原理以及与线程池的搭配使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
主要介绍了Java中的Runnable,Callable,Future,FutureTask的比较的相关资料,需要的朋友可以参考下
FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...
主要介绍了Java线程池FutureTask实现原理详解,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
在本篇文章里我们给大家分享了java中Future与FutureTask之间的关系的内容,有需要的朋友们可以跟着学习下。
主要介绍了简谈java并发FutureTask的实现,FutureTask都是用于获取线程执行的返回结果。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下
JAVA线程总结,包含线程池,显示使用线程实现异步编程,基于JDK中的Future实现异步编程,JDK中的FutureTask等
Semaphore(信号量,流量控制) ReentrantReadWriteLock (读写锁) BlockingQueue(阻塞队列) 线程池 池化技术 线程池的优势 线程池的特点 线程池三大方法 线程池七大参数 线程池四种拒绝策略 ForkJoin 异步回调 ...
Future Future是一个接口,它定义了5个方法: boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException;...
Java 多线程与并发(17_26)-JUC线程池_ FutureTask详解
(3)使用信号量将任何一种容器变成有界阻塞容器 栅栏 能够阻塞一组线程直到某个事件发生 栅栏和闭锁的区别 所有线程必须同时到达栅栏位置,才能继续执行 闭锁用于等待事件,而栅栏...
通过合规策略对服务器进行监控,确保服务器的运行、帐号在服务器上的操作符合预设的规则。日志:收集、整理服务器的日志信息,提供给管理员查看,并作为异常判断、故障排查的依据。进程:监控服务器上的进程,并对...
2 如何使用FutureTask 、Future、Callable、线程池实现线程2.1 FutureTask + Callable实现多线程2.2 线程池+Future+Callable 实现多线程3 Runnable、Callable、Future和FutureTask之间的关系3.1 整体关系介绍3.2 ...
主要介绍了多线程返回值使用示例(callable与futuretask),需要的朋友可以参考下