J.U.C之FutureTask

概述

由于线程是实现了Runnable接口或继承了Thread类,其执行后无法回调其线程的执行结果,因此JDK 1.5提供了一些新机制:接口Callable和Future。通过他们可以得到线程的执行结果。

FutureTask间接实现了Future接口,并提供了一个基础的实现:开始/取消一个计算的方法,计算结果的查询。FutureTask可以被用来去包装一个Callable或Runnable对象,并可以被提交给Executor执行器去执行。除了被用作一个独立的服务类,它还提供了protected的功能,在定制任务类时会十分有用。(好吧,被你发现了,这是我翻译的源码中的类注释,哭唧唧~)

Runnable与Callable对比

  1. Runnable接口只有一个run()方法;
  2. Callable是一个泛型接口,其中也只有V call()函数,其返回类型即传入的参数泛型。
  3. 两个接口功能相似,但后者因可以返回执行信息且支持泛型而更强大一些。

Future接口

通过实现该类可以得到一个异步计算的返回结果。并提供方法接口:检查是否计算完成、等待计算的完成、取回计算的结果、判断计算是否在完成前被正常地取消。

FutureTask类

它实现了RunnableFuture接口,而RunnableFuture接口同时继承了Runnable接口和Future接口。即FutureTask类最终也是执行的Callable的方法。组合Runnable和Future的好处:可以另起线程去专门检查并调取最终计算的结果,而其他的线程可以继续其他任务(只需监听该跑腿线程即可)。

演示例子-Callable和Future结合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class FutureExample {

static class MyCallable implements Callable<String> {

@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
运行结果
1
2
3
11:29:00.230 [main] INFO com.mmall.concurrency.aqs.FutureExample - do something in main
11:29:00.230 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.FutureExample - do something in callable
11:29:05.234 [main] INFO com.mmall.concurrency.aqs.FutureExample - resultDone
例子分析
  1. 结合Callable和Future两者。
  2. 实现的Callable类可以被Executors的submit(xxx)提交给线程池运行。(之前提到过)
  3. 通过Future的get()查询并取回执行结果;若未执行结束则阻塞查询。(前面有说到)

演示例子-FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class FutureTaskExample {

public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});

new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}
运行结果

与上个例子结果一模一样,运行速度快了25%左右:Callable和Future结合使用耗时2s 450ms,FutureTask耗时1s 890ms。

看下源码

get()方法的内核:返回执行完成的结果或抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

还有一个可以指定返回执行结果的构造函数:(内部都是调用Executors的callable方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

还有一个指定超时时间的get():

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

当计算失败时(不包括计算被取消)被内部的run()调用:报告其中的异常

1
2
3
4
5
6
7
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

一旦计算完成,该计算就不能被重启或取消了,除非调用runAndReset()方法:(之前概述提到)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

SupriseMF wechat
欢迎关注微信订阅号【星球码】,分享学习编程奇淫巧技~
喜欢就支持我呀(*^∇^*)~

热评文章