概述
CyclicBarrier也是JDK提供的辅助类。它允许一组线程相互等待,直到到达一个公共的屏障点(CommonBarrierPoint)。通过它可以使多个线程间相互等待,只有当每个线程都准备就绪后才能各自继续执行后面的操作。
与CountDownLatch的比较:
- CountDownLatch是实现一个或多个线程需要等待其他线程完成某项操作之后才会才会继续执行。而CyclicBarrier是实现多个线程间相互等待,直到所有线程都满足条件后继续执行后续操作。
- 同时通过计数器实现,该类不过是加一操作;
- 当线程调用await()时,进入等待状态;
- 当达到指定值时都会执行一定的操作:CountDownLatch是计数到达0或超时时间过期会执行下一步操作;CyclicBarrier是加一操作计数达到设定初始值或超时时间过期时,等待的线程会继续执行后续的操作。
- 当CountDownLatch计数到0后,就不可再使用了;但CyclicBarrier计数到设定值后,可以复用,可以reset重新计数使用。(呼应名字:循环屏障)
- 使用场景相似:CyclicBarrier和CountDownLatch都可用于多线程计算数据,最后汇总计算。但CyclicBarrier可以处理对付更加复杂的场景,例如:若一次并发执行后出现错误,接着可reset计数器,重新执行一次。
演示例子-1
1 | @Slf4j |
运行结果:
1 | 19:12:14.192 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 0 is ready |
结果分析:
- 当线程0和1都已经到达ready后,才会continue。
- 当上一层执行结束后,紧接着开始下一层的执行。
源码分析
看一下该类的内部成员:
带有阻塞是操作指令参数的构造函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
带有超时时间参数的await():1
2
3
4
5
6public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
设置当前屏障无效,并唤醒每个线程。(持锁时有效)1
2
3
4
5private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
重置当前屏障计数:先加锁,再设置当前屏障代无效并创建新一代。1
2
3
4
5
6
7
8
9
10public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
创建新一代屏障:更新屏障闸的state字段并唤醒每个线程(持锁时有效)。1
2
3
4
5
6
7private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
await()中真正的实现方法==>dowait():1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;//使用重入锁。
lock.lock();//锁内操作保安全
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//count为parties即聚集个数(互相等待的个数)
//当index减为0,即所有线程都执行了该dowait()方法,都已执行完毕。
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {//如果有执行命令则运行,并返回方法。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)//执行命令为null,则破除屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();//操作结束释放锁
}
}
Conditin:放闸前所处的等待状态1
private final Condition trip = lock.newCondition();
演示例子-2
1 | @Slf4j |
运行结果:(部分截取)
1 | 20:45:32.745 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample2 - 0 is ready |
演示例子-3
1 | @Slf4j |
运行结果:(截取部分)
1 | 20:48:26.018 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 0 is ready |