CompletableFuture用法

CompletableFuture是JDK1.8新增的工具类。它继承了Future和CompletionStage接口,里面提供了大量的链式调用的API。下面简单的介绍一下其用法:

异步执行任务

CompletableFuture对接口的封装,使得调用者很容易实现Future模式的异步调用。如下Demo:

public class CalcTest {
	
	public static Integer calc(Integer para) {
		try {
			Thread.sleep(1000);
		} catch (Exception e) {
		}
		return para * para;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> calc(50));
		System.out.println(future.get());
	}
}

首先CompletableFuture.supplyAsync()方法构造一个CompletableFuture对象返回。在supplyAsync()方法内部,会创建一个线程,然后执行传入的参数。这里面执行的就是calc(50)这个方法,通常情况下,这个执行体是比较耗时的,但是CompletableFuture的返回是立即的。但是当调用get()方法时,就会等待异步任务的执行完成。

在CompletableFuture中还有一些类似的方法:

// 传入一个函数接口(Supplier),例如lambda表达式的:() -> calc(50)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 比上面方法多一个Executor(线程池),所提交的任务会在executor中执行
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
// 传入一个Runnable线程异步执行
static CompletableFuture<Void> runAsync(Runnable runnable);
// 比上面方法多一个Executor(线程池),所提交的任务会在executor中执行
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

从上述提交异步任务的API看来,CompletableFuture可以构造(提交)Supplier和Runnable类型的异步处理任务。

流式调用

CompletableFuture除了提供上述异步执行之外,还提供了处理完任务之后要执行的东西。也就是说CompletableFuture可以通过链式调用,让异步任务串联起来。参见如下Demo:

public class CalcTest {
	
	public static Integer calc(Integer para) {
		try {
			Thread.sleep(1000);
		} catch (Exception e) {
		}
		return para * para;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50))
				.thenApply((i) -> Integer.toString(i))
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}
}

如上代码所示,创建完CompletableFuture对象之后,调用了thenApply()方法对上一个方法处理完成的结果进行处理。上一个异步任务处理完成的出参,就是下一个异步任务的入参,并最终把结果打印出来。

注意:当创建CompletableFuture的时候,如果没有传入Executor,那么默认情况下,异步任务会提交到ForkJoinPool.commonPool()的线程池中去执行。因为提交到这个线程池中的任务都会被设置成daemon模式,所以如果异步任务提交后,不调用get()方法等待的话,就很有可能造成异步任务没有执行完,系统就退出了。

异常处理

在使用CompletableFuture处理异步任务时,可以通过链式调用优雅的处理异常。如下所示:

public class CalcTest2 {
	
	public static Integer calc(Integer para) {
		return para / 0;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50))
				.exceptionally(ex -> {
					System.out.println(ex.toString());
					return 0;
				})
				.thenApply((i) -> Integer.toString(i))
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}
}

组合多个CompletableFuture

CompletableFuture可以在执行完成后,将结果通过Function传递给下一个CompletionStage进行处理。(Function接口返回的是新的CompletableFuture)如下:

public class CalcCompose {
	
	public static Integer calc(Integer para) {
		return para / 2;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50))
				.thenCompose((i) -> CompletableFuture.supplyAsync(() -> calc(i)) )
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}
}

另外一种组合的签名如下:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn);

thenCombine()首先完成当前CompletableFuture和other的执行。然后将两者的执行结果传递给BiFunction,并返回一个合并后的CompletableFuture。如下所示:

public class CalcCombine {
	
	public static Integer calc(Integer para) {
		return para / 2;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Integer> fu1 = CompletableFuture.supplyAsync(() -> calc(50));
		CompletableFuture<Integer> fu2 = CompletableFuture.supplyAsync(() -> calc(25));
		
		CompletableFuture<Void> fu = fu1.thenCombine(fu2, (i, j) -> (i + j))
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}
}


参考:《Java高并发程序设计》