1、多线程开发中,经常会遇到一个线程等待一个或多个线程的情况,遇到这样的场景如何解决?
一个线程等待一个线程:可以通过wait和notify实现 一个线程等待多个线程:可以通过CountDownLatch实现 多个线程之间互相等待:可以通过CyclicBarrier实现2、countDownLatch和CyclicBarrier的区别:
countDownLatch减计数方式
调用countDown()计数减1,调用await()只阻塞线程,对计数无影响 计数为0时,释放所有等待的线程 计数为0时,不可重复利用
CyclicBarrier
加计数方式
调用await方法,计数加1,若加1后的值不等于构造方法的值,则线程阻塞 计数达到指定值时,计数重新置为0,释放所有阻塞线程 计数达到指定值时,可重复利用.
3、CountDownLatch的伪代码如下所示:
//Main thread start
//Create CountDownLatch for N threads
//Create and start N threads
//Main thread wait on latch
//N threads completes there tasks are returns
//Main thread resume execution
4、CountDownLatch如何工作
CountDownLatch.java类中定义的构造函数:
//Constructs a CountDownLatch initialized with the given count.
public
void
CountDownLatch(
int
count) {...}
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
5、CountDownLatch使用例子
在这个例子中,模拟了一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。
BaseHealthChecker.java:这个类是一个Runnable,负责所有特定的外部服务健康的检测。
public abstract class BaseHealthChecker implements Runnable{ private CountDownLatch countDownLatch; private String serviceName; private boolean serviceUp; public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch){ this.serviceName = serviceName; this.countDownLatch = countDownLatch; this.serviceUp = false; } @Override public void run() { try { serviceVerify(); serviceUp = true; } catch (Exception e) { e.printStackTrace(); serviceUp = false; } finally { if (null != countDownLatch) { countDownLatch.countDown(); } } } public String getServiceName() { return serviceName; } public boolean isServiceUp() { return serviceUp; } public abstract void serviceVerify() throws Exception; }
NetworkHealthChecker.java:这个类继承了BaseHealthChecker,实现了verifyService()方法。DatabaseHealthChecker.java和CacheHealthChecker.java除了服务名和休眠时间外,与NetworkHealthChecker.java是一样的。
public class NetworkHealthChecker extends BaseHealthChecker{ public NetworkHealthChecker(CountDownLatch countDownLatch) { super("Network Service", countDownLatch); } @Override public void serviceVerify() throws InterruptedException { System.out.println("Checking " + this.getServiceName()); Thread.sleep(3000); System.out.println(this.getServiceName() + " is checked"); }}
ApplicationStartupUtil.java:这个类是一个主启动类,它负责初始化闭锁,然后等待,直到所有服务都被检测完。
public class ApplicatiionStartupUtil { private static Listcheckers; private ApplicatiionStartupUtil(){ } private final ApplicatiionStartupUtil INSTANCE = new ApplicatiionStartupUtil(); public ApplicatiionStartupUtil getInstance(){ return INSTANCE; } public static boolean checkExternalService() throws InterruptedException{ CountDownLatch countDownLatch = new CountDownLatch(3); checkers = new ArrayList (); checkers.add(new NetworkHealthChecker(countDownLatch)); checkers.add(new CacheHealthChecker(countDownLatch)); checkers.add(new DateBasekHealthChecker(countDownLatch)); ExecutorService executor = Executors.newFixedThreadPool(checkers.size()); for(final BaseHealthChecker checker : checkers){ executor.execute(checker); } countDownLatch.await(); executor.shutdown(); for(final BaseHealthChecker checker : checkers){ if (!checker.isServiceUp()) { return false; } } return true; }}
测试代码:
public class Main { public static void main(String[] args) { boolean result = false; try { result = ApplicatiionStartupUtil.checkExternalService(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("External services validation completed !! Result was :: "+ result); }}
运行结果
Checking Network Service Checking DateBase Service Checking Cache Service Network Service is checked Cache Service is checked DateBase Serviceis Checked External services validation completed !! Result was :: true
6、CyclicBarrier使用的例子
public class CyclicBarrierTest { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { public void run() { try { Thread.sleep(new Random().nextInt(5) * 1000); System.out.println("thread is prepared...." + new Date()); cyclicBarrier.await(); System.out.println(1); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }).start(); try { Thread.sleep(new Random().nextInt(5) * 1000); System.out.println("main is prepared...." + new Date()); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e ) { e.printStackTrace(); } System.out.println(2); } static class A implements Runnable{ @Override public void run() { System.out.println("A......."); } }}
运行结果:
thread is prepared....Fri Oct 27 11:08:44 CST 2017
main is prepared....Fri Oct 27 11:08:47 CST 2017 A....... 1 2