J.U.C之AQS—CyclicBarrier

概述

CyclicBarrier也是JDK提供的辅助类。它允许一组线程相互等待,直到到达一个公共的屏障点(CommonBarrierPoint)。通过它可以使多个线程间相互等待,只有当每个线程都准备就绪后才能各自继续执行后面的操作。

原理图

与CountDownLatch的比较:

  1. CountDownLatch是实现一个或多个线程需要等待其他线程完成某项操作之后才会才会继续执行。而CyclicBarrier是实现多个线程间相互等待,直到所有线程都满足条件后继续执行后续操作。
  2. 同时通过计数器实现,该类不过是加一操作;
  3. 当线程调用await()时,进入等待状态;
  4. 当达到指定值时都会执行一定的操作:CountDownLatch是计数到达0或超时时间过期会执行下一步操作;CyclicBarrier是加一操作计数达到设定初始值或超时时间过期时,等待的线程会继续执行后续的操作。
  5. 当CountDownLatch计数到0后,就不可再使用了;但CyclicBarrier计数到设定值后,可以复用,可以reset重新计数使用。(呼应名字:循环屏障)
  6. 使用场景相似:CyclicBarrier和CountDownLatch都可用于多线程计算数据,最后汇总计算。但CyclicBarrier可以处理对付更加复杂的场景,例如:若一次并发执行后出现错误,接着可reset计数器,重新执行一次。

演示例子-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class CyclicBarrierExample1 {

private static CyclicBarrier barrier = new CyclicBarrier(2);

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 6; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
19:12:14.192 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 0 is ready
19:12:15.190 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 1 is ready
19:12:15.190 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 1 continue
19:12:15.190 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 0 continue
19:12:16.190 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 2 is ready
19:12:17.191 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 3 is ready
19:12:17.191 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 3 continue
19:12:17.191 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 2 continue
19:12:18.191 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 4 is ready
19:12:19.192 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 5 is ready
19:12:19.192 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 5 continue
19:12:19.192 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample1 - 4 continue
结果分析:
  1. 当线程0和1都已经到达ready后,才会continue。
  2. 当上一层执行结束后,紧接着开始下一层的执行。

源码分析

看一下该类的内部成员:

图示

带有阻塞是操作指令参数的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

带有超时时间参数的await():

1
2
3
4
5
6
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

设置当前屏障无效,并唤醒每个线程。(持锁时有效)

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

重置当前屏障计数:先加锁,再设置当前屏障代无效并创建新一代。

1
2
3
4
5
6
7
8
9
10
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

创建新一代屏障:更新屏障闸的state字段并唤醒每个线程(持锁时有效)。

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

await()中真正的实现方法==>dowait():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;//使用重入锁。
lock.lock();//锁内操作保安全
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//count为parties即聚集个数(互相等待的个数)
//当index减为0,即所有线程都执行了该dowait()方法,都已执行完毕。
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {//如果有执行命令则运行,并返回方法。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)//执行命令为null,则破除屏障
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();//操作结束释放锁
}
}

Conditin:放闸前所处的等待状态

1
private final Condition trip = lock.newCondition();

演示例子-2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class CyclicBarrierExample2 {

private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
//只等待2000ms,之后的线程就会被抛弃了。
// 因线程的condition改变为broken(因超时而强行打破屏障继续执行)而产生了BrokenBarrierException
// 或因线程被中断而产生线程中断异常而产生的需要try-catch
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
运行结果:(部分截取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
20:45:32.745 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample2 - 0 is ready
20:45:33.742 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample2 - 1 is ready
20:45:34.742 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample2 - 2 is ready
20:45:34.755 [pool-1-thread-2] WARN com.mmall.concurrency.aqs.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
trueat java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
trueat java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
trueat com.mmall.concurrency.aqs.CyclicBarrierExample2.race(CyclicBarrierExample2.java:40)
trueat com.mmall.concurrency.aqs.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:24)
trueat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
trueat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
trueat java.lang.Thread.run(Thread.java:748)
20:45:34.755 [pool-1-thread-3] WARN com.mmall.concurrency.aqs.CyclicBarrierExample2 - BarrierException
java.util.concurrent.BrokenBarrierException: null
trueat java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
trueat java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
trueat com.mmall.concurrency.aqs.CyclicBarrierExample2.race(CyclicBarrierExample2.java:40)
trueat com.mmall.concurrency.aqs.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:24)
trueat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
trueat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
trueat java.lang.Thread.run(Thread.java:748)

演示例子-3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class CyclicBarrierExample3 {

//有参的构造函数,在dowait()方法执行时若存在则优先执行该Runnable指令再返
// 回(该时执行不到打破屏障代码段),之后再打破屏障并继续执行后续操作。
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
运行结果:(截取部分)
1
2
3
4
5
6
7
8
9
10
11
12
13
20:48:26.018 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 0 is ready
20:48:27.015 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 1 is ready
20:48:28.015 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 2 is ready
20:48:29.015 [pool-1-thread-4] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 3 is ready
20:48:30.016 [pool-1-thread-5] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 4 is ready
20:48:30.016 [pool-1-thread-5] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - callback is running
20:48:30.016 [pool-1-thread-5] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 4 continue
20:48:30.016 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 0 continue
20:48:30.016 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 1 continue
20:48:30.016 [pool-1-thread-4] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 3 continue
20:48:30.016 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 2 continue
20:48:31.016 [pool-1-thread-6] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 5 is ready
20:48:32.017 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.CyclicBarrierExample3 - 6 is ready
SupriseMF wechat
欢迎关注微信订阅号【星球码】,分享学习编程奇淫巧技~
喜欢就支持我呀(*^∇^*)~

热评文章