J.U.C: ForkJoin

Overview

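The ForkJoin framework, introduced in JDK 1.7, executes tasks in parallel. It resembles Hadoop's MapReduce: a large task is split (fork) into many smaller tasks that run independently, and their results are then combined (join).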
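To make the MapReduce comparison concrete, here is a minimal sketch (not from the original article) that counts words across an array of lines. Each leaf task counts the words in a small slice (the "map" side), and join merges the partial counts (the "reduce" side). The class name `ForkJoinWordCount`, the helper `countWords`, and the threshold of 2 lines are hypothetical choices for illustration. The sketch uses `ForkJoinPool.commonPool()`, which requires Java 8 or later.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: MapReduce-style word count built on fork/join.
public class ForkJoinWordCount extends RecursiveTask<Map<String, Integer>> {
    private static final int THRESHOLD = 2; // slices of at most 2 lines are counted directly (assumed value)
    private final String[] lines;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinWordCount(String[] lines, int from, int to) {
        this.lines = lines;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Map<String, Integer> compute() {
        if (to - from <= THRESHOLD) {
            // "map" step: count the words of a small slice sequentially
            Map<String, Integer> counts = new HashMap<>();
            for (int i = from; i < to; i++) {
                for (String w : lines[i].trim().split("\\s+")) {
                    if (!w.isEmpty()) {
                        counts.merge(w, 1, Integer::sum);
                    }
                }
            }
            return counts;
        }
        // fork: split the slice in half, queue the left half, do the right half here
        int mid = (from + to) >>> 1;
        ForkJoinWordCount left = new ForkJoinWordCount(lines, from, mid);
        ForkJoinWordCount right = new ForkJoinWordCount(lines, mid, to);
        left.fork();
        Map<String, Integer> result = right.compute();
        // join: "reduce" step, merge the left half's counts into the right half's map
        left.join().forEach((w, c) -> result.merge(w, c, Integer::sum));
        return result;
    }

    public static Map<String, Integer> countWords(String[] lines) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinWordCount(lines, 0, lines.length));
    }

    public static void main(String[] args) {
        String[] lines = {"to be or", "not to be", "that is", "the question"};
        System.out.println(countWords(lines));
    }
}
```

Each partial result is a fresh `HashMap` owned by one task, so merging needs no locking.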

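The framework uses a work-stealing algorithm: when a thread has finished all the tasks in its own queue, it steals tasks from the tail of another thread's queue, while that queue's owner keeps taking tasks from the other end. Because the two threads work from opposite ends, they rarely collide, and no thread sits idle waiting while others still have work, which improves throughput.
Each thread's task queue is implemented as a double-ended queue (deque).
Drawbacks: 1. When a deque holds only one task, the owner and a thief can still compete for it. 2. Because every worker thread has its own deque, the framework consumes more system resources.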
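The deque behaviour described above can be modelled roughly as follows. This is a simplified, hypothetical model built on `java.util.concurrent.ConcurrentLinkedDeque`, not the JDK's internal work-queue implementation: the owner pushes and pops tasks at the head, while a thief takes from the tail. `raceForLastTask` illustrates drawback 1: when only one task is left, both ends compete for it and exactly one of them gets it.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of per-worker deques; NOT the JDK's actual ForkJoinPool internals.
public class WorkStealingDequeDemo {

    public static final class Worker {
        final String name;
        // Task ids stand in for real tasks (assumed representation).
        final ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();

        public Worker(String name) { this.name = name; }

        // The owner pushes and pops at the head (LIFO), like a stack.
        public void push(int task) { deque.addFirst(task); }
        public Integer popOwn() { return deque.pollFirst(); }

        // A thief takes from the tail, the opposite end of the victim's deque.
        public Integer stealFrom(Worker victim) { return victim.deque.pollLast(); }
    }

    // Owner and thief race for a single remaining task; returns how many got it.
    public static int raceForLastTask() {
        Worker owner = new Worker("owner");
        Worker thief = new Worker("thief");
        owner.push(42);
        AtomicInteger taken = new AtomicInteger();
        Thread t1 = new Thread(() -> { if (owner.popOwn() != null) taken.incrementAndGet(); });
        Thread t2 = new Thread(() -> { if (thief.stealFrom(owner) != null) taken.incrementAndGet(); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken.get(); // the deque hands the last task to only one of them
    }

    public static void main(String[] args) {
        Worker busy = new Worker("busy");
        Worker idle = new Worker("idle");
        for (int t = 1; t <= 4; t++) busy.push(t); // head -> tail: 4 3 2 1

        System.out.println("owner pops " + busy.popOwn());         // owner pops 4
        System.out.println("thief steals " + idle.stealFrom(busy)); // thief steals 1
        System.out.println("winners of last task: " + raceForLastTask()); // 1
    }
}
```

The owner working LIFO keeps recently forked (usually smaller) subtasks local, while thieves take the oldest (usually larger) chunks from the far end.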

Limitations on tasks

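  1. Tasks should use only fork and join as their synchronization mechanism;
  2. Tasks in the thread queues should not perform I/O (blocking operations);
  3. Tasks cannot throw checked exceptions, because compute() does not declare any; such exceptions must be caught and handled in code.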
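Limitation 3 in practice: a minimal sketch, assuming a hypothetical checked exception `InvalidInputException` and a hypothetical helper `parsePositive` that declares it. Since `compute()` cannot declare checked exceptions, the task catches it and wraps it in an unchecked exception; the caller of `invoke` then finds the original cause in the exception chain. The chain is walked rather than checked for an exact type because `ForkJoinTask` may rethrow a reconstructed copy of the exception when it crosses threads.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CheckedExceptionInTask {

    // Hypothetical checked exception, for illustration only.
    static class InvalidInputException extends Exception {
        InvalidInputException(String msg) { super(msg); }
    }

    // Hypothetical helper that declares a checked exception.
    static int parsePositive(String s) throws InvalidInputException {
        int v = Integer.parseInt(s);
        if (v <= 0) throw new InvalidInputException("not positive: " + s);
        return v;
    }

    static class ParseTask extends RecursiveTask<Integer> {
        private final String input;

        ParseTask(String input) { this.input = input; }

        @Override
        protected Integer compute() {
            // compute() has no throws clause, so the checked exception is wrapped here.
            try {
                return parsePositive(input);
            } catch (InvalidInputException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    public static int parse(String input) {
        return ForkJoinPool.commonPool().invoke(new ParseTask(input));
    }

    // True if the wrapped checked exception shows up anywhere in the cause chain.
    public static boolean failsWithInvalidInput(String input) {
        try {
            parse(input);
            return false;
        } catch (RuntimeException e) {
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof InvalidInputException) return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("7"));                  // 7
        System.out.println(failsWithInvalidInput("-3")); // true
    }
}
```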

ForkJoinPool

Here is its Javadoc comment:

/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
* <em>work-stealing</em>: all threads in the pool attempt to find and
* execute tasks submitted to the pool and/or created by other active
* tasks (eventually blocking waiting for work if none exist). This
* enables efficient processing when most tasks spawn other subtasks
* (as do most {@code ForkJoinTask}s), as well as when many small
* tasks are submitted to the pool from external clients. Especially
* when setting <em>asyncMode</em> to true in constructors, {@code
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*/

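In short, ForkJoinPool is an ExecutorService for running ForkJoinTasks. It provides the entry point for submissions from clients that are not themselves ForkJoinTasks, along with management and monitoring operations. It differs from other ExecutorServices mainly because it employs work stealing: every thread in the pool tries to find and run tasks submitted to the pool or created by other active tasks. In particular, when asyncMode is set to true in the constructor, ForkJoinPool may also be suitable for event-style tasks that are never joined.
ForkJoinPool carries the implementation: the work-stealing algorithm, management of worker threads, task state management, task execution information, and so on.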
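A minimal sketch of the asyncMode case from the Javadoc, using the four-argument `ForkJoinPool` constructor (parallelism, thread factory, uncaught-exception handler, asyncMode). The parallelism of 2, the 10-second timeout, and the names `newEventPool`/`handleEvents` are assumptions for illustration. The "events" are fire-and-forget `Runnable`s passed to `execute` and never joined. The pool is drained with `shutdown()` plus `awaitTermination`.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncModePoolDemo {

    // A pool configured for event-style tasks (hypothetical helper).
    public static ForkJoinPool newEventPool() {
        return new ForkJoinPool(
                2,                                               // parallelism (assumed value)
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // standard worker threads
                null,                                            // no UncaughtExceptionHandler
                true);                                           // asyncMode: FIFO order for local tasks
    }

    // Submits `events` fire-and-forget tasks and returns how many of them ran.
    public static int handleEvents(int events) {
        ForkJoinPool pool = newEventPool();
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < events; i++) {
            pool.execute(() -> handled.incrementAndGet()); // submitted, never joined
        }
        pool.shutdown(); // tasks already submitted still run to completion
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled: " + handleEvents(1000));
    }
}
```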

ForkJoinTask

Its Javadoc comment:

/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* <p>A "main" {@code ForkJoinTask} begins execution when it is
* explicitly submitted to a {@link ForkJoinPool}, or, if not already
* engaged in a ForkJoin computation, commenced in the {@link
* ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
* related methods. Once started, it will usually in turn start other
* subtasks. As indicated by the name of this class, many programs
* using {@code ForkJoinTask} employ only methods {@link #fork} and
* {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
* advanced usages, as well as extension mechanics that allow support
* of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable)
* reflecting their main use as computational tasks calculating pure
* functions or operating on purely isolated objects. The primary
* coordination mechanisms are {@link #fork}, that arranges
* asynchronous execution, and {@link #join}, that doesn't proceed
* until the task's result has been computed. Computations should
* ideally avoid {@code synchronized} methods or blocks, and should
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling.
*/

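ForkJoinTask mainly provides the fork/join mechanism for tasks: fork() arranges asynchronous execution, and join() does not return until the task's result has been computed.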

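Me: You'll get it intuitively, just don't ask me to put it into words~ It's really easy to understand~
Reader: emmmmm, sure, it's easy all right, so easy it's a pain in the neck……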
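The fork/join mechanism can also be driven through `invokeAll`, which the Javadoc above mentions as a derivative of fork and join. Below is a minimal sketch, assuming a hypothetical task `DoubleInPlace` that doubles every element of an array in place. It extends `RecursiveAction`, the `ForkJoinTask` subclass for tasks that return no result. The threshold of 4 elements is an arbitrary assumption. `ForkJoinPool.commonPool()` needs Java 8 or later.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: a result-less fork/join task.
public class DoubleInPlace extends RecursiveAction {
    private static final int THRESHOLD = 4; // assumed cut-off for sequential work
    private final int[] data;
    private final int lo; // inclusive
    private final int hi; // exclusive

    DoubleInPlace(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) {
                data[i] *= 2; // small enough: do the work directly
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks the subtasks and returns only after both have completed
        invokeAll(new DoubleInPlace(data, lo, mid), new DoubleInPlace(data, mid, hi));
    }

    // Returns a doubled copy; the two halves never overlap, so no locking is needed.
    public static int[] doubled(int[] input) {
        int[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})));
        // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```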

Example

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    // Ranges with (end - start) <= threshold are summed directly instead of being split
    public static final int threshold = 2;
    private final int start;
    private final int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        // If the task is small enough, compute it directly
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // If the task exceeds the threshold, split it into two subtasks
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // Execute the subtasks asynchronously
            leftTask.fork();
            rightTask.fork();

            // Wait for the subtasks to finish and collect their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // Combine the subtask results
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        // Create a computation task that sums 1 + 2 + ... + 100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        // Submit the task for execution
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        } finally {
            forkjoinPool.shutdown();
        }
    }
}
Output:
14:58:35.442 [main] INFO com.mmall.concurrency.aqs.ForkJoinTaskExample - result:5050
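Analysis of the example
  1. The class extends RecursiveTask (the ForkJoinTask subclass for tasks that return a result), and its compute() method splits the task recursively;
  2. ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); specifies start and end. The constructor only stores the range; compute() does the actual work, splitting the range, forking the subtasks, then joining their results and returning the sum.
  3. Finally, the task is executed by calling submit() on the ForkJoinPool.
  4. In the splitting (fork) step, a threshold is declared: ranges whose size is at or below it are not split further.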
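A common refinement of the example, sketched here as an alternative rather than the article's own code, forks only one half and computes the other half in the current thread before joining. This keeps the calling worker busy instead of having it queue both halves and then block. It has the same shape as the Fibonacci example in the `RecursiveTask` Javadoc. The class name `SumTask` is hypothetical. The sketch sums into `long` so that larger ranges do not overflow `int`, and it calls `invoke` on `ForkJoinPool.commonPool()` (Java 8+) instead of `submit` plus `get`.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant: fork one half, compute the other half directly.
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 2; // same cut-off as the example above
    private final int start;
    private final int end; // inclusive, as in the example above

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = start + (end - start) / 2; // avoids overflow of start + end
        SumTask left = new SumTask(start, middle);
        SumTask right = new SumTask(middle + 1, end);
        left.fork();                         // queue the left half; an idle worker may steal it
        long rightResult = right.compute();  // compute the right half in the current thread
        return left.join() + rightResult;    // wait for the left half, then combine
    }

    public static long sum(int start, int end) {
        return ForkJoinPool.commonPool().invoke(new SumTask(start, end));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100)); // 5050
    }
}
```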
SupriseMF wechat
Follow the WeChat subscription account 【星球码】 for more programming tips and tricks~