Future模式

Future模式是一种让调用线程可以异步处理任务的设计模式,它可以让调用线程利用异步处理的时间去执行其他任务,从而使硬件资源达到一个更好的利用率。参考以下时序图:

下面介绍一下各个成员的作用:

MainTest:主线程入口,调用Client发出请求。

Client:开启异步线程,并提交任务,返回FutureData。

FutureData:同步返回的对象,对RealData做了包装,提供了阻塞的getResult()方法。

RealData:异步任务执行的地方,完成后将结果返回给FutureData。

代码实现

Data

public interface Data {

	String getResult();
	
}

RealData

public class RealData implements Data {

	protected final String result;

	public RealData(String para) {
		// RealData构造可能很慢,这里使用sleep模拟
		StringBuffer sb = new StringBuffer("");
		for (int i = 0; i < 10; i++) {
			sb.append(para);
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
			}
		}
		result = sb.toString();
	}

	@Override
	public String getResult() {
		return result;
	}

}

FutureData


public class FutureData implements Data {
	
	protected RealData realData = null;// FutureData是RealData的包装
	protected boolean isReady = false;
	
	public synchronized void setRealData(RealData realData){
		if(isReady) {
			return ;
		}
		this.realData = realData;
		isReady = true;
		notifyAll();	// 通知getResult()
	}
	
	@Override
	public synchronized String getResult(){
		while(!isReady) {
			try {
				wait();	// 一直等待,直到RealData被注入
			} catch (Exception e) {
			}
		}
		return realData.result;
	}

}

Client

public class Client {

	public Data request(final String queryStr) {
		final FutureData future = new FutureData();
		new Thread() {// ReadData构建很慢,所以在单独的线程执行
			@Override
			public void run() {
				RealData realData = new RealData(queryStr);
				future.setRealData(realData);
			}
		}.start();
		return future;
	}

}

MainTest

public class MainTest {

	public static void main(String[] args) {
		Client client = new Client();
		// 下面会立即返回
		Data data = client.request("name");
		System.out.println("请求完毕");
		try {
			// 业务处理
			Thread.sleep(2000);
		} catch (Exception e) {
		}
		System.out.println("数据=" + data.getResult());
	}

}

JDK中的Future模式

JDK中的Future类和接口的结构如下所示:

从上面类图可以看出,我们普通会用Future接口来接返回的Future,但是其具体的实现是FutureTask。FutureTask是往线程池里面提交任务的封装,其中聚合了Callable的对象,下面具体的介绍一下。

Future接口

// 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。
// 此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。
// 参数:mayInterruptIfRunning - 如果应该中断执行此任务的线程,则为 true;否则允许正在运行的任务运行完成
// 返回:如果无法取消任务,则返回 false,这通常是由于它已经正常完成;否则返回 true
boolean cancel(boolean mayInterruptIfRunning);
// 如果在任务正常完成前将其取消,则返回 true。
boolean isCancelled();
// 如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
boolean isDone();
// 等待计算完成,然后获取其结果。
V get();
// 超时等待
V get(long timeout, TimeUnit unit);

FutureTask

// 当前FutureTask的状态
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// 线程的封装
private Callable<V> callable;
// 要返回的数据
private Object outcome;
// 执行run()的线程
private volatile Thread runner;
// 存放调用get()等待的线程队列
private volatile WaitNode waiters;

对于FutureTask的流转状态,会有以下几种情况:

1、正常执行
NEW -> COMPLETING -> NORMAL
2、执行异常
NEW -> COMPLETING -> EXCEPTIONAL
3、不允许执行中的任务取消
NEW -> CANCELLED
4、允许执行中的取消
NEW -> INTERRUPTING -> INTERRUPTED

get()

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) // 如未完成,则进入循环等待,当前线程挂起
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {// 如被中断,则抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {// 如果完成,则返回完成状态
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 正在完成中,当前线程让出时间片
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {// 超时等待处理
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

从上面代码看来,在执行get()的时候,此时如果任务尚未完成,则会进入线程挂起状态。当run()方法执行完成之后,get()方法才会返回结果。