一、正文前
Java技术书籍看了不少,其中最让我喜欢的就是《Java并发编程的艺术》。现在在读第五遍,书不厚,知识点也不难理解,但是要把他们真正融会贯通需要反复琢磨。
本文是对书中第八章:并发编程工具类做一个自己的总结,希望能在以后的工作中巧妙的使用他们。
二、并发工具类总结
在多核时代,几乎已经没有单线程的程序了。用Java写个Hello World程序,后台都有类似于垃圾收集器这种线程在工作,可以说多线程无处不在。
Java中的并发工具类(CountDownLatch,CyclicBarrier,Semaphore和Exchanger)能提供多线程编程这一块方便安全的功能。现在就通过功能简介,应用场景,底层原理三个方面了解它们吧。
- CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作
简单来说就是现在有5个线程PA,PB,PC,PD,PE。其中PA,PB,PC,PD四个线程在统计2018年度四个季度的总收入,PE线程则是要将四个线程的结果汇总并进行其他处理。这就要求PE要等待其他四个线程处理完毕才能进行操作。CountDownLatch就提供了这种场景的解决方案。
CountDownLatch在初始化时能设定一个“倒计时”,让归并线程(PE)在倒计时为0之前阻塞。线程PA,PB,PC,PD每执行完毕就给倒计时-1。
示例代码
public class CountDownLatchTEST {public static void main(String[] args) {//设置倒计时为3CountDownLatch countDownLatch = new CountDownLatch(4);//显示当前倒计时System.out.println(countDownLatch.getCount());new Thread(new MergeTask(countDownLatch)).start();new Thread(new WorkTask(countDownLatch)).start();new Thread(new WorkTask(countDownLatch)).start();new Thread(new WorkTask(countDownLatch)).start();new Thread(new WorkTask(countDownLatch)).start();}//工作线程,相当于PA,PB,PC,PDstatic class WorkTask implements Runnable {CountDownLatch countDownLatch;public WorkTask(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {System.out.println(Thread.currentThread() + " :任务结束");//任务结束,倒计时-1countDownLatch.countDown();try {//为了更好的显示效果Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}//归并线程,相当于PEstatic class MergeTask implements Runnable {CountDownLatch countDownLatch;public MergeTask(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {//等待倒计时为0countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread() + " :归并结束");//显示当前倒计时System.out.println(countDownLatch.getCount());}}
}
CountDownLatch底层实现原理
进入CountDownLatch类中,它的方法很少。核心功能的实现依赖于继承AQS的内部类Sync。
关于AQS这里就不再说了,各种书籍和网上有很多的介绍。AQS是并发包中的基础组件,很多并发类都是基于它实现,建议大家去学习一下。我自己也写过一篇关于AQS工作原理的文章:深入理解AQS(AbstractQueueSynchronizer)
CountDownLatch核心方法await(阻塞线程等待倒计时为0),countDown(倒计时数字-1)也是基于Sync。
private static final class Sync extends AbstractQueuedSynchronizer {//初始化时设置的倒计时就是AQS中的count,通过count来控制线程Sync(int count) {setState(count);}int getCount() {return getState();}//await方法会调用该方法protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}//CountDown方法会调用该方法protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
在CountDownLatch构造时传入的倒计时就是state的值,整个运行流程就是归并线程通过await方法(调用tryAcquireShared)去判断state是否为0,非0就阻塞。
工作线程则是在工作完毕时调用CountDown方法(调用tryReleaseShared)去通过CAS的方法让state-1。如果state已经是0就什么也不做。因此要注意的是如果倒计时初始值是3,然后用4个线程去调用CountDown方法,三个线程调用完毕之后state值就是0了,await阻塞的线程就会继续运行了。所以一定要严格让初始state值和工作线程数相等。
- CyclicBarrier
CyclicBarrier让一组线程到达一个屏障时阻塞,直到最后一个线程到达时解除屏障,所有阻塞的线程继续运行
举例来说:现在有个秒杀系统,用户要秒杀一个游戏机,但是为了保证用户的公平性,达到100个用户时再由这些用户去抢游戏机。这里就可以用CyclicBarrier让准备完毕的用户线程阻塞在屏障处,等阻塞的用户线程达到100时打开屏障,让100个用户去秒杀游戏机。
示例代码
public class CyclicBarrierTEST {static Boolean gameBoy = new Boolean(false);static CyclicBarrier cb = new CyclicBarrier(10);//这里就用10个用户来演示public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {threadPool.submit(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread() + " : 准备OK ");try {//等待屏障开启cb.await();} catch (Exception e) {e.printStackTrace();}//双重检查锁秒杀游戏机if(!gameBoy) {synchronized (gameBoy) {if(!gameBoy) {gameBoy = true;System.out.println(Thread.currentThread() + " : gameBoy是我的啦!! ");}}}}});}Thread.sleep(1000);threadPool.shutdown();}
}
CyclicBarrier底层实现原理
CyclicBarrier类中有一个ReentrantLcok,在拦截方法await方法中通过ReentrantLcok判断count(预设的开启屏障的阈值)是否到达0,非0就通过内部的Condtion阻塞线程。count为0时就唤醒所有线程继续运行。
核心方法是await方法调用的dowait方法,它给count-1并根据count去控制线程。
//有删减,只贴出核心逻辑
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException{final ReentrantLock lock = this.lock;final Generation g = generation;lock.lock();try {//count递减,直到为0开启屏障int index = --count;if (index == 0) { boolean ranAction = false;try {//barrierCommand可以作为CyclicBarrier的构造参数传入。可以在开启屏障时执行一些工作final Runnable command = barrierCommand;if (command != null)command.run();//该方法中调用trip.signAll()方法唤醒所有线程nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}for (;;) {try {//阻塞线程trip.await();} catch (InterruptedException ie) {....}if (g != generation)return index;}} finally {lock.unlock();}}
在dowait方法中通过ReentrantLock同步去缩减count的值,并把线程阻塞,直到最后的线程到来,就开启屏障,解放所有线程。
CyclicBarrier和CountDownLatch的异同
CyclicBarrier和CountDownLatch都可以通过一个条件去控制多个线程,然后在条件满足时做一些操作。CyclicBarrier在条件满足时可以让所有线程都继续运行,而CountDownLatch只能让一开始就在等待的线程去运行。
CyclicBarrier中为了满足条件而工作的线程和满足条件后工作的线程是同一组线程。CountDownLatch中为了满足条件而工作的线程和满足条件后工作的线程不是一组线程(也可以是一组,不过写起来比较麻烦)。
CyclicBarrier可以复用,开启屏障之后count会回归到初始值,可以进行下一次屏障拦截线程。而CountDownLatch只能使用一次,倒计时完毕后count的值不会变化。
CyclicBarrier底层实现复杂,用了ReentrantLock和Condtion去控制线程。CountDownLatch比较简单,方法也很少,底层实现AQS进行线程管理。其实ReentrantLock底层也是通过AQS来实现功能。可以说二者的实现原理都是基于AQS的。
总体来说就是CyclicBarrier实现复杂,功能强大,可复用。CountDownLatch实现简洁,功能精简。二者功能相近却有不同的地方。要根据业务场景合理选择。
- Semaphore
Semaphroe可以控制同时运行的线程数。
Semaphore可以说是应用最广泛的并发工具类了,现在很多高并发解决方案的技术都依赖于Semaphore。比如在一个工程中A服务要调用B服务完成一个请求,B服务比较脆弱,只能同时满足10个线程要求。因此A服务同一时间请求B服务的线程不能多于10个。Semaphore就是用来实现这样的场景,控制同一时刻运行的线程数,协调各个线程,合理使用公共资源。
示例代码
public class SemaphoreTEST {static Semaphore semaphore = new Semaphore(3);//为了更好演示,同时访问B服务的次数设置为3public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();for(int i = 0; i < 12; i++) {executorService.submit(new Runnable() {@Overridepublic void run() {try {//acquire和release之间的代码会严格按照Semaphore的线程调度去执行semaphore.acquire();System.out.println(Thread.currentThread() + " : 我正在请求B服务,当前时间是" + System.currentTimeMillis());Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {semaphore.release();}}});}Thread.sleep(5000);executorService.shutdown();}
}
可以从结果中看到,最多只有三个线程是在同一秒中同时运行的。
Semaphore的底层实现原理
Semaphore类中也是有一个实现AQS的内部类Sync,通过AQS进行线程控制,保证同一时刻只有N个线程在运行,其他的都进行阻塞。并且Semaphore基于Sync又实现了NonfairSync和FiarSync两个子类,这两个子类分别可以实现公平等待和非公平等待。
Semphore中的核心方法acquire和release调用了Sync子类的方法。Sync子类中的方法通过操纵count值去判断线程应该阻塞还是放行,count值是Semaphore的构造函数中传入的值,表示同时运行的线程数。
这里就不贴出代码了,就是很常规的AQS实现代码,通过state(count)来控制线程数。(AQS真的是万物的心脏啊)
4.Exchanger
Exchanger是一个线程资源交换器,提供一个同步点,在同步点两个线程可以交换数据。
Exchanger顾名思义,就是能在一个时间点让线程互换数据。例如线程A持有一个安全令牌,其他线程(假设是线程B)在获取安全令牌时必须提供自己的信息以留作记录。这个场景就可以用Exchanger实现,两个线程在一个同步点互换数据,线程A拿到线程B的信息做记录,并且把令牌给到线程B。
示例代码
public class ExchangerTEST {public static void main(String[] args) {Exchanger<String> exchanger = new Exchanger<>();new Thread(new TokenHolder(exchanger)).start();new Thread(new Woker(exchanger)).start();}static class TokenHolder implements Runnable{Exchanger<String> exchanger;public TokenHolder(Exchanger<String> exchanger) {this.exchanger = exchanger;}@Overridepublic void run() {try {//给出令牌,拿回线程IDString threadID = exchanger.exchange("token");System.out.println(Thread.currentThread() + " 线程: " + threadID + " 使用了令牌");} catch (InterruptedException e) {e.printStackTrace();}}}static class Woker implements Runnable{Exchanger<String> exchanger;public Woker(Exchanger<String> exchanger) {this.exchanger = exchanger;}@Overridepublic void run() {try {//给出线程ID,拿回令牌String token = exchanger.exchange(Thread.currentThread().toString());System.out.println(Thread.currentThread() + " 拿到了令牌:" + token);} catch (InterruptedException e) {e.printStackTrace();}}}
}
两个线程成功交换了数据。
Exchanger的底层实现原理
Exchanger和其他并发工具类不同,它协调的只有两个线程。其底层并没有用到AQS。核心方法也只有exchange方法。在exchange方法中调用了slotExchange和arenaExchange方法,这两个方法是不对外开放的。
slotExchange方法主要逻辑的是把两个线程与其提供的资源记录下来,放入交换槽中,并且两个交换线程还没有全部抵达交换点时阻塞先到达的线程,直到两个线程都到达时唤醒。
arenaExchange方法主要是对交换槽中的数据进行交换。
Exchanger对交换数据的安全性作了保证,并且因为其中并没有使用AQS,所以对线程的操作也是Exchanger自己实现的。
简单来说Exchanger就是持有两个盒子,两个线程到来时把交换的数据放到盒子里,然后Exchanger交换两个盒子的数据,返回给线程。
三、 小结
以上就是四个并发工具类的介绍。
总体概括一下:
CountDownLatch可以提供一个倒计时,在倒计时为0时触发逻辑。
CyclicBarrier用屏障拦截到来的线程,在线程数达到一定要求时开启屏障,释放线程。
Semaphore能控制同一时刻运行的线程的数量。
Exchanger能让两个线程在一个时间点交换数据。
四个并发工具中CyclicBarrier和Semaphore使用的最多,前三个类的底层实现用到了AQS。
并发工具类提供了不同的功能,在日常工作和学习时要根据具体的场景去灵活使用他们,而且这些工具都是已经经过很多人的使用检验的,安全性肯定是没问题的。