概述
Semaphore字面意思就是信号量。它通过提供同步机制,控制资源可同时被并发访问的线程的个数。当信号量限定为1时,它就和单线程很相似了。Semaphore和CountDownLatch的使用有些相似,其中也有两个核心实现方法:acquire()和release()。
通过semaphore可以实现有限结点个数的链表,虽然可重入锁reentrant也可以实现,但是semaphore的实现更为简单。
使用场景
常适用于仅能提供有限资源访问的场景。
如:数据库链接数远远小于上层应用业务并发的数量,如果不对数据库的访问进行控制,很容易出现因有些线程因无法获取到数据库链接而导致的异常。
例子演示
1 | 4j |
运行结果:
1 | 14:00:22.444 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample1 - 1 |
例子说明:
仔细的童鞋应该已经根据运行结果发现了:相同的信号量的输出是在同一时刻的!这也就对应了semaphore的含义。semaphore的使用也十分简单,方法执行前声明并制定允许的并发访问的数量,并用semaphore.acquire()和semaphore.release()分别前后包裹着test()方法即可。
看一下semaphore的源码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
其实semaphore.acquire()和semaphore.release()都是可以制定获取/释放信号量的数量的。而且都是使用的sync的方法获取或释放。那再看一下sync是什么鬼?
读者:等等!!你不是说只是看一看源码吗?看看就得了,怎么还一直分析起来了?
我:大爷,来都来了,不进去逛逛?
1 | /** All mechanics via AbstractQueuedSynchronizer subclass */ |
sync是其中的内部类,semaphore果然是使用了AQS框架!!使用了AQS的state字段来实现信号量的允许值(state字段之前提到过)。还分为公平和非公平两个版本!!
那sync的方法是怎么实现的呢? 再看看源码:
读者:公子!停停停!STOP!别看了,我有点恶心了~
我:emmm?我裤子都脱了,你让我停?
1 | public final void acquireSharedInterruptibly(int arg) |
hasQueuedPredecessors()1
2
3
4
5
6
7
8
9
10public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
读者:公子!咱们走吧,咱不看了吧~
我:呕~ 呕呕呕~~扶我起来,我还能…(话音未落,倒地身亡~)
在acquire()
中,调用sync
的acquireSharedInterruptibly()
其中指定参数(默认为1):在共享模式中获取,首先检查线程状态,再至少调用一次tryAcquireShared()
查询判断当前对象的state
允许在共享模式中被获取,不允许则执行doAcquireSharedInterruptibly(args)
,其中调用addWaiter(Node.SHARED)
(参数指明为在共享模式下)进行声明:添加等待获取的结点,并循环遍历当前等待结点的前一结点,如果是head
结点且此时再次查询当前对象的state
允许在共享模式中被获取,那么设置队列的head
并检查是否成功的对象正处于队列中,若是则传递消息以尝试给下一个队列结点传信号。如果不是head
结点则调用cancelAcquire(node)
,取消获取信号量。
在release()
中,当参数存在时,调用sync.releaseShared(permits)
,再调用tryReleaseShared(args)
查询是否允许释放,若是则调用doReleaseShared()
:即使有其他进行中的请求/释放信号量的进程,也要确保释放的消息传递,循环遍历以防止在这个过程中有新的结点加入;一般释放信号量的过程是当head
需要信号量时,尝试释放head的继承结点。由于结合了CAS,需循环是否CAS重置状态失败了,若是则重新检查。
如果并发数太多了,但是资源还有限,这时候怎么搞?
semaphore还有一个叫做tryAcquire的方法。
看下例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 4j
public class SemaphoreExample3 {
private final static int threadCount = 2000;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
运行结果:1
2
316:00:49.454 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1
16:00:49.455 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 2
16:00:49.453 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 0
为什么只有三个线程执行了测试方法?是不是测试方法中的线程休眠时间太长了?现在改成500ms。
再次运行:1
2
316:03:13.925 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 0
16:03:13.944 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1
16:03:13.945 [pool-1-thread-3] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 2
再改小一些:==>Thread.sleep(10)1
2
3
4
5
6
7
816:04:12.730 [pool-1-thread-4] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1279
16:04:12.730 [pool-1-thread-5] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1278
16:04:12.742 [pool-1-thread-10] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 9
16:04:12.744 [pool-1-thread-11] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 10
16:04:12.744 [pool-1-thread-12] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 11
16:04:12.753 [pool-1-thread-49] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 75
16:04:12.755 [pool-1-thread-60] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 102
16:04:12.755 [pool-1-thread-62] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 106
其中只截取了一部分,总共应该有45个左右线程执行了。
我再改一下,去掉线程休眠,看看能不能全部2000个线程都执行测试方法:1
2
3
4
516:07:21.873 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1860
16:07:21.873 [pool-1-thread-66] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1856
16:07:21.873 [pool-1-thread-2] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1889
16:07:21.873 [pool-1-thread-9] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1886
16:07:21.873 [pool-1-thread-10] INFO com.mmall.concurrency.aqs.SemaphoreExample3 - 1888
在结果中截取了部分,并且只找到了最大的线程数为1889,即当线程不休眠时,有1900个线程执行了测试方法,即90%的线程可以得到运行。
既然测试了该方法,那就看看它的源码吧:
读者君:(~﹃~)~zZ 啊?啊啊??现在几点了?
我:坐正,抬头挺胸!小手放背后!(敲黑板)
1 | /** |
其实tryAcquire()还可以指定超时时间:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
例子就不演示了,通过指定时间,准确指定可以并发请求的数量,大大减轻了控制的操作。再配合在核心方法中修改线程休眠时间来控制线程并发访问数量,最少的数量是semaphore和并发请求中的最小值(但一般情况下还是semaphore小,即最小是semaphore声明值)