概述
由于线程是实现了Runnable接口或继承了Thread类,其执行后无法回调其线程的执行结果,因此JDK 1.5提供了一些新机制:接口Callable和Future。通过他们可以得到线程的执行结果。
FutureTask间接实现了Future接口,并提供了一个基础的实现:开始/取消一个计算的方法,计算结果的查询。FutureTask可以被用来去包装一个Callable或Runnable对象,并可以被提交给Executor执行器去执行。除了被用作一个独立的服务类,它还提供了protected的功能,在定制任务类时会十分有用。(好吧,被你发现了,这是我翻译的源码中的类注释,哭唧唧~)
Runnable与Callable对比
- Runnable接口只有一个run()方法;
- Callable是一个泛型接口,其中也只有V call()函数,其返回类型即传入的参数泛型。
- 两个接口功能相似,但后者因可以返回执行信息且支持泛型而更强大一些。
Future接口
通过实现该类可以得到一个异步计算的返回结果。并提供方法接口:检查是否计算完成、等待计算的完成、取回计算的结果、判断计算是否在完成前被正常地取消。
FutureTask类
它实现了RunnableFuture接口,而RunnableFuture接口同时继承了Runnable接口和Future接口。即FutureTask类最终也是执行的Callable的方法。组合Runnable和Future的好处:可以另起线程去专门检查并调取最终计算的结果,而其他的线程可以继续其他任务(只需监听该跑腿线程即可)。
演示例子-Callable和Future结合
1 | @Slf4j |
运行结果
1 | 11:29:00.230 [main] INFO com.mmall.concurrency.aqs.FutureExample - do something in main |
例子分析
- 结合Callable和Future两者。
- 实现的Callable类可以被Executors的submit(xxx)提交给线程池运行。(之前提到过)
- 通过Future的get()查询并取回执行结果;若未执行结束则阻塞查询。(前面有说到)
演示例子-FutureTask
1 | @Slf4j |
运行结果
与上个例子结果一模一样,运行速度快了25%左右:Callable和Future结合使用耗时2s 450ms,FutureTask耗时1s 890ms。
看下源码
get()方法的内核:返回执行完成的结果或抛出异常。1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
"unchecked") (
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
还有一个可以指定返回执行结果的构造函数:(内部都是调用Executors的callable方法)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
还有一个指定超时时间的get():1
2
3
4
5
6
7
8
9
10public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
当计算失败时(不包括计算被取消)被内部的run()调用:报告其中的异常1
2
3
4
5
6
7protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
一旦计算完成,该计算就不能被重启或取消了,除非调用runAndReset()方法:(之前概述提到)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}