Future模式是一种让调用线程可以异步处理任务的设计模式,它可以让调用线程利用异步处理的时间去执行其他任务,从而使硬件资源达到一个更好的利用率。参考以下时序图:
下面介绍一下各个成员的作用:
MainTest:主线程入口,调用Client发出请求。
Client:开启异步线程,并提交任务,返回FutureData。
FutureData:同步返回的对象,对RealData做了包装,提供了阻塞的getResult()方法。
RealData:异步任务执行的地方,完成后将结果返回给FutureData。
代码实现
Data
public interface Data {
String getResult();
}
RealData
public class RealData implements Data {
protected final String result;
public RealData(String para) {
// RealData构造可能很慢,这里使用sleep模拟
StringBuffer sb = new StringBuffer("");
for (int i = 0; i < 10; i++) {
sb.append(para);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
result = sb.toString();
}
@Override
public String getResult() {
return result;
}
}
FutureData
public class FutureData implements Data {
protected RealData realData = null;// FutureData是RealData的包装
protected boolean isReady = false;
public synchronized void setRealData(RealData realData){
if(isReady) {
return ;
}
this.realData = realData;
isReady = true;
notifyAll(); // 通知getResult()
}
@Override
public synchronized String getResult(){
while(!isReady) {
try {
wait(); // 一直等待,直到RealData被注入
} catch (Exception e) {
}
}
return realData.result;
}
}
Client
public class Client {
public Data request(final String queryStr) {
final FutureData future = new FutureData();
new Thread() {// ReadData构建很慢,所以在单独的线程执行
@Override
public void run() {
RealData realData = new RealData(queryStr);
future.setRealData(realData);
}
}.start();
return future;
}
}
MainTest
public class MainTest {
public static void main(String[] args) {
Client client = new Client();
// 下面会立即返回
Data data = client.request("name");
System.out.println("请求完毕");
try {
// 业务处理
Thread.sleep(2000);
} catch (Exception e) {
}
System.out.println("数据=" + data.getResult());
}
}
JDK中的Future模式
JDK中的Future类和接口的结构如下所示:
从上面类图可以看出,我们普通会用Future接口来接返回的Future,但是其具体的实现是FutureTask。FutureTask是往线程池里面提交任务的封装,其中聚合了Callable的对象,下面具体的介绍一下。
Future接口
// 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。
// 此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。
// 参数:mayInterruptIfRunning - 如果应该中断执行此任务的线程,则为 true;否则允许正在运行的任务运行完成
// 返回:如果无法取消任务,则返回 false,这通常是由于它已经正常完成;否则返回 true
boolean cancel(boolean mayInterruptIfRunning);
// 如果在任务正常完成前将其取消,则返回 true。
boolean isCancelled();
// 如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
boolean isDone();
// 等待计算完成,然后获取其结果。
V get();
// 超时等待
V get(long timeout, TimeUnit unit);
FutureTask
// 当前FutureTask的状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 线程的封装
private Callable<V> callable;
// 要返回的数据
private Object outcome;
// 执行run()的线程
private volatile Thread runner;
// 存放调用get()等待的线程队列
private volatile WaitNode waiters;
对于FutureTask的流转状态,会有以下几种情况:
1、正常执行
NEW -> COMPLETING -> NORMAL
2、执行异常
NEW -> COMPLETING -> EXCEPTIONAL
3、不允许执行中的任务取消
NEW -> CANCELLED
4、允许执行中的取消
NEW -> INTERRUPTING -> INTERRUPTED
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如未完成,则进入循环等待,当前线程挂起
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {// 如被中断,则抛出异常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {// 如果完成,则返回完成状态
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 正在完成中,当前线程让出时间片
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {// 超时等待处理
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
从上面代码看来,在执行get()的时候,此时如果任务尚未完成,则会进入线程挂起状态。当run()方法执行完成之后,get()方法才会返回结果。