J.U.C之AQS—Semaphore

概述

Semaphore字面意思就是信号量。它通过提供同步机制,控制资源可同时被并发访问的线程的个数。当信号量限定为1时,它就和单线程很相似了。Semaphore和CountDownLatch的使用有些相似,其中也有两个核心实现方法:acquire()和release()。

通过semaphore可以实现有限结点个数的链表,虽然可重入锁reentrant也可以实现,但是semaphore的实现更为简单。


使用场景

常适用于仅能提供有限资源访问的场景。
如:数据库链接数远远小于上层应用业务并发的数量,如果不对数据库的访问进行控制,很容易出现因有些线程因无法获取到数据库链接而导致的异常。

例子演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class SemaphoreExample1 {

private final static int threadCount = 5;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(2);//信号量声明

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire();//信号量获取
test(threadNum);
semaphore.release();//信号量释放
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}

运行结果:

1
2
3
4
5
6
7
14:00:22.444 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample1 - 1
14:00:22.444 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample1 - 0
14:00:23.451 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.SemaphoreExample1 - 2
14:00:23.451 [pool-1-thread-4] INFO com.mmall.concurrency.aqs.SemaphoreExample1 - 3
14:00:24.452 [pool-1-thread-5] INFO com.mmall.concurrency.aqs.SemaphoreExample1 - 4

Process finished with exit code 0

例子说明:
仔细的童鞋应该已经根据运行结果发现了:相同的信号量的输出是在同一时刻的!这也就对应了semaphore的含义。semaphore的使用也十分简单,方法执行前声明并制定允许的并发访问的数量,并用semaphore.acquire()和semaphore.release()分别前后包裹着test()方法即可。

看一下semaphore的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void release() {
sync.releaseShared(1);
}

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

其实semaphore.acquire()和semaphore.release()都是可以制定获取/释放信号量的数量的。而且都是使用的sync的方法获取或释放。那再看一下sync是什么鬼?

读者:等等!!你不是说只是看一看源码吗?看看就得了,怎么还一直分析起来了?
我:大爷,来都来了,不进去逛逛?

1
2
3
4
5
6
7
8
9
10
11
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;

/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
xx省略一万字xx
}

sync是其中的内部类,semaphore果然是使用了AQS框架!!使用了AQS的state字段来实现信号量的允许值(state字段之前提到过)。还分为公平和非公平两个版本!!

那sync的方法是怎么实现的呢? 再看看源码:

读者:公子!停停停!STOP!别看了,我有点恶心了~
我:emmm?我裤子都脱了,你让我停?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}

hasQueuedPredecessors()

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

读者:公子!咱们走吧,咱不看了吧~
我:呕~ 呕呕呕~~扶我起来,我还能…(话音未落,倒地身亡~)

acquire()中,调用syncacquireSharedInterruptibly()其中指定参数(默认为1):在共享模式中获取,首先检查线程状态,再至少调用一次tryAcquireShared()查询判断当前对象的state允许在共享模式中被获取,不允许则执行doAcquireSharedInterruptibly(args),其中调用addWaiter(Node.SHARED)(参数指明为在共享模式下)进行声明:添加等待获取的结点,并循环遍历当前等待结点的前一结点,如果是head结点且此时再次查询当前对象的state允许在共享模式中被获取,那么设置队列的head并检查是否成功的对象正处于队列中,若是则传递消息以尝试给下一个队列结点传信号。如果不是head结点则调用cancelAcquire(node),取消获取信号量。

release()中,当参数存在时,调用sync.releaseShared(permits),再调用tryReleaseShared(args)查询是否允许释放,若是则调用doReleaseShared():即使有其他进行中的请求/释放信号量的进程,也要确保释放的消息传递,循环遍历以防止在这个过程中有新的结点加入;一般释放信号量的过程是当head需要信号量时,尝试释放head的继承结点。由于结合了CAS,需循环是否CAS重置状态失败了,若是则重新检查。

如果并发数太多了,但是资源还有限,这时候怎么搞?

semaphore还有一个叫做tryAcquire的方法。
看下例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class SemaphoreExample3 {

private final static int threadCount = 2000;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}

运行结果:

1
2
3
16:00:49.454 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1
16:00:49.455 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 2
16:00:49.453 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 0

为什么只有三个线程执行了测试方法?是不是测试方法中的线程休眠时间太长了?现在改成500ms。

再次运行:

1
2
3
16:03:13.925 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 0
16:03:13.944 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1
16:03:13.945 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 2

再改小一些:==>Thread.sleep(10)

1
2
3
4
5
6
7
8
16:04:12.730 [pool-1-thread-4] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1279
16:04:12.730 [pool-1-thread-5] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1278
16:04:12.742 [pool-1-thread-10] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 9
16:04:12.744 [pool-1-thread-11] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 10
16:04:12.744 [pool-1-thread-12] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 11
16:04:12.753 [pool-1-thread-49] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 75
16:04:12.755 [pool-1-thread-60] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 102
16:04:12.755 [pool-1-thread-62] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 106

其中只截取了一部分,总共应该有45个左右线程执行了。

我再改一下,去掉线程休眠,看看能不能全部2000个线程都执行测试方法:

1
2
3
4
5
16:07:21.873 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1860
16:07:21.873 [pool-1-thread-66] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1856
16:07:21.873 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1889
16:07:21.873 [pool-1-thread-9] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1886
16:07:21.873 [pool-1-thread-10] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1888

在结果中截取了部分,并且只找到了最大的线程数为1889,即当线程不休眠时,有1900个线程执行了测试方法,即90%的线程可以得到运行。

既然测试了该方法,那就看看它的源码吧:

读者君:(~﹃~)~zZ 啊?啊啊??现在几点了?
我:坐正,抬头挺胸!小手放背后!(敲黑板)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Acquires a permit from this semaphore, only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

其实tryAcquire()还可以指定超时时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

例子就不演示了,通过指定时间,准确指定可以并发请求的数量,大大减轻了控制的操作。再配合在核心方法中修改线程休眠时间来控制线程并发访问数量,最少的数量是semaphore和并发请求中的最小值(但一般情况下还是semaphore小,即最小是semaphore声明值)

SupriseMF wechat
欢迎关注微信订阅号【星球码】,分享学习编程奇淫巧技~
喜欢就支持我呀(*^∇^*)~

热评文章