概述
ForkJoin框架是JDK 1.7提供的一种用于并行任务执行的框架。有些像Hadoop中的MapReduce机制。即将一个大任务分为(fork)多个小任务分别执行,最后将多个小任务的执行结果进行汇总(join)。
该框架采用的工作窃取算法,即当一个线程执行完它的任务后,可以从其他线程的任务队列尾部开始自行窃取任务进行执行,最后与该队列的另一个线程接头,以充分发挥该框架的优势(消除线程等待),提高效率,促进性能提升。
其中,每个线程的任务队列采用双端队列进行实现。
缺点:1. 当一个线程的双端队列中只有一个任务时,也会发生线程竞争。2. 由于使用双端队列,系统会分配更多的资源。
执行任务的局限性
- 任务只能使用Fork或Join作为同步机制;
- 线程队列中的任务不可以有IO操作;
- 任务不能抛出检查异常。(若有则需要必要的代码进行处理)
ForkJoinPool
看一下注释:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
* <em>work-stealing</em>: all threads in the pool attempt to find and
* execute tasks submitted to the pool and/or created by other active
* tasks (eventually blocking waiting for work if none exist). This
* enables efficient processing when most tasks spawn other subtasks
* (as do most {@code ForkJoinTask}s), as well as when many small
* tasks are submitted to the pool from external clients. Especially
* when setting <em>asyncMode</em> to true in constructors, {@code
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*/
实质上,ForkJoinPool是一个从非ForkJoinTask的请求中,为运行中的ForkJoinTask的子任务提供切入点的ExecutorService。它不同于其他的ExecutorService,主要是它虚拟部署了任务窃取。尤其当在构造器中设置asyncMode为true时,ForkJoinPool也可能适当地使用事件模式的任务(从未被汇总过??)。
ForkJoinPool主要做实现:包括工作窃取算法、工作线程的管理、任务的状态管理、任务的执行信息等。
ForkJoinTask
看下注释:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* <p>A "main" {@code ForkJoinTask} begins execution when it is
* explicitly submitted to a {@link ForkJoinPool}, or, if not already
* engaged in a ForkJoin computation, commenced in the {@link
* ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
* related methods. Once started, it will usually in turn start other
* subtasks. As indicated by the name of this class, many programs
* using {@code ForkJoinTask} employ only methods {@link #fork} and
* {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
* advanced usages, as well as extension mechanics that allow support
* of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable)
* reflecting their main use as computational tasks calculating pure
* functions or operating on purely isolated objects. The primary
* coordination mechanisms are {@link #fork}, that arranges
* asynchronous execution, and {@link #join}, that doesn't proceed
* until the task's result has been computed. Computations should
* ideally avoid {@code synchronized} methods or blocks, and should
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling.
*/
ForkJoinTask:主要提供任务中fork/join的机制。
我:大家自己意会一下,但可千万不要言传哦~ 真的很容易看懂的~
读者:emmmmm,是挺容易的,因为它容易就容易在它容易它奶奶的腿……
演示例子
1 | @Slf4j |
运行结果:
1 | 14:58:35.442 [main] INFO com.mmall.concurrency.aqs.ForkJoinTaskExample - result:5050 |
例子分析
- 该测试类需要继承
RecursiveTask
类,即在任务fork
中需要递归地拆分任务; ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
指定start
和end
,其构造函数完成其任务的fork
的具体实现,之后再将其子结果join
并返回。- 最后通过ForkJoinPool调用submit()执行该任务。
- 其中:在任务拆分(fork)中,声明了一个threshold(阈值),即指定任务不可拆分的界限。