CompletableFuture组合式异步编程

CompletableFuture组合式异步编程

一. 引文

相比直接使用线程的方式,使用分支/合并框架和并行流能以更简单、更有效的方式来实现并行的效果。如今的互联网程序通常是混聚的方式:使用来自多个来源的内容,将这些内容聚合在一起,来实现功能需求。

典型的混聚式应用

我们在应用中可能需要和互联网上的多个Web服务通信,但我们并不希望因为等待某些服务的响应而阻塞应用程序的运行,浪费掉数十亿宝贵的CPU时钟周期。

如分支/合并框架和并行流这些工具可以帮我们实现并行处理:将一个操作切分为多个子操作,在多个不同的核、CPU甚至是机器上并行地执行这些子操作

如果你的意图是想实现并发,而不是并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够的忙碌从而最大化程序的吞吐量,那么你真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费掉宝贵的计算资源。Future接口,尤其是其新版实现CompletableFuture是处理这种情况的利器。

并发和并行

二. Future接口

Java 5时引入了Future接口,设计初衷是对将来某个时刻会发生的结果进行建模。它模拟了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。所以在执行Future接口中的耗时运算时,调用线程可以被解放出来做其他工作,而不是一直等待运算完成。

如下代码,使用Future以异步的方式执行一个耗时的操作。我们在需要异步操作结果才能继续运行时再调用get方法获取操作的结果,如果耗时操作不能像我们期待那样顺利的得出结果,也可以通过参数设定等待时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
//创建一个ExecutorService,可以通过它向线程池提交任务
ExecutorService executor = Executors.newCachedThreadPool();
//向ExecutorService提交一个Callable对象
Future<Double> future = executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
//以异步方式在新的线程中执行耗时的操作
return doSomeLongComputation();
}
});
//异步操作进行的同时可以做其他事情
doSomethingElse();

try {
//获取异步操作的结果,如果最终被阻塞,无法得到结果,则最多等待1秒钟后退出
Double result = future.get(1, TimeUnit.SECONDS);
}catch (ExecutionException ex){
//计算抛出一个异常
}catch (InterruptedException ex){
//当前线程在等待过程中被中断
}catch (TimeoutException ex){
//在Future对象完成之前已过期
}
}

使用Future以异步方式执行长时间操作

2.1 Future简介

可以参考Future接口,常见的线程池中的FutureTask实现

2.2 Future接口的局限性

Future接口提供了方法(isDone)来检测异步计算是否已经结束,等待异步操作结束,以及获取计算的结果。但这些特性并不能让我们编写足够简洁的并发代码,比如我们很难表述Future结果间的依赖性:“当长时间计算任务完成时,请将该计算的结果通知另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。

Future不能满足的一些需求:

  • 将两个异步计算合并为一个,两个异步计算间相互独立,同时第二个又依赖于第一个的结果;
  • 等待Future集合中的所有任务都完成;
  • 仅等待Future集合中最快的任务完成,并返回其结果;
  • 仅通过编程方式完成一个Future任务的执行,以手工设定异步操作结果的方式;
  • 应对Future的完成事件,即当Future的完成事件发生时会收到通知,并能用Future计算的结果进行下一步的操作,而不是简单的阻塞等待结果。

三. 实现异步API

3.1 使用CompletableFuture构建异步应用

假设一个需求:最佳价格查询器,需要查询多个在线商店,依据给定的产品或服务找出最低的价格

  1. 学会如何为客户提供异步API
  2. 掌握如何让使用了同步API的代码变为非阻塞代码。了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。
  3. 学会如何以响应式的方式处理异步操作的完成事件

同步API和异步API?

  • 同步API:同步API是对传统方法调用的称呼,调用方会在被调用方运行时等待,被调用方运行结束返回,调用方取得返回值继续运行。即使两者在不同的线程中运行,调用方还是要等被调用方结束运行,这就是阻塞式调用
  • 异步API:异步API则会直接返回,或者至少在被调用方法计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的,这种方式是非阻塞式调用。执行剩余计算的线程会把计算结果返回给调用方,返回的方式要么是通过回调函数,要么是通过调用方再执行一个“等待直到计算完成”的方法调用。

3.2 同步API

我们首先根据需求用传统方式实现API,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Shop {
/**
* 获取价格
* @param product 产品
* @return 价格
*/
public double getPrice(String product){
//需要查询商店的数据库
//可能要联系其他外部服务,比如商店的供应商,和制造商相关的推广折扣
//ToDo
return calculatePrice(product);
}

/**
* 计算获取价格
* @param product 产品
* @return 价格
*/
public double calculatePrice(String product){
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}

/**
* 模拟耗时操作,1s延迟
*/
public static void delay(){
try{
Thread.sleep(1000L);
}catch (InterruptedException ex){
throw new RuntimeException();
}
}
}

3.3 将同步方法转换为异步方法

我们通过CompletableFuture来实现这一转换,如下代码。创建一个代表异步计算的CompletableFuture实例,它在计算完成时会包含计算的结果,接着调用fork创建了另一个线程去执行实际的价格计算工作,不等待计算完成,直接返回一个Future实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 获取价格—异步
* @param product 产品
* @return 价格-暂不可知
*/
public Future<Double> getPriceAsync(String product){
//创建CompletableFuture对象,它会包含计算的结果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//开启一个新的线程来执行计算
new Thread(() -> {
double price = calculatePrice(product);
//设置Future的返回值
futurePrice.complete(price);
}).start();
//无需等待还未结束的计算,直接返回Future<Double>对象
return futurePrice;
}

我们尝试调用异步API,代码如下。用户向商店查询了某种商品的价格,商店实现了异步API,所以直接取到了Future返回。在等待结果的过程中,用户可以做一些其他操作,等到客户要执行的操作依赖于商品价格时,再调用 futurePrice.get() 。执行此操作后,客户要么获得Future封装的值,要么发生阻塞直到异步任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Test {
public static void main(String[] args) {
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime + " msecs");

//执行更多任务,比如查询其他商店
doSomethingElse();
try {
double price = futurePrice.get();
System.out.printf("Price is %.2f%n",price);
}catch (Exception ex){
throw new RuntimeException(ex);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
}

public static void doSomethingElse(){
try{
System.out.println("doSomethingElse");
Thread.sleep(1000L);
}catch (InterruptedException ex){
throw new RuntimeException();
}
}
}

执行代码结果如下,异步API的返回远远早于价格计算完成的时间。

1
2
3
4
Invocation returned after 182 msecs
doSomethingElse
Price is 155.07
Price returned after 1224 msecs

3.4 错误处理

如果上述代码在计算过程中发生了错误呢?用于提示错误的异常会被限制在视图计算商品价格的当前线程内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久的封闭。

我们可以调用get方法的重载版本设置一个等待时间,防止永久的等待。但这种方法我们无法得知计算时到底发生了什么问题,如果我们想让客户端能了解商店无法提供请求商品价格的原因,可以使用CompletableFuture的completeExceptionally方法,将导致CompletableFuture发生问题的异常抛出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//设置一个等待时间
double price = futurePrice.get(1, TimeUnit.SECONDS);

public Future<Double> getPriceAsync(String product){
//创建CompletableFuture对象,它会包含计算的结果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//开启一个新的线程来执行计算
new Thread(() -> {
try{
double price = calculatePrice(product);
//设置Future的返回值
futurePrice.complete(price);
}catch (Exception ex){
//抛出导致失败的异常,完成此次Future操作
futurePrice.completeExceptionally(ex);
}
}).start();
//无需等待还未结束的计算,直接返回Future<Double>对象
return futurePrice;
}

3.5 通过工厂方法supplyAsync优化CompletableFuture

CompletableFuture提供了很多精巧的工厂方法来帮助我们更容易的实现流程,还不用操心实现的细节,可以重写getPriceAsync方法如下。此实现和上述是等价的,也具有错误处理机制。

1
2
3
public Future<Double> getPriceAsync(String product){
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync方法接受一个Supplier参数,返回CompletableFuture对象,此对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行。重载版本添加了参数Executor,表示你可以自行指定执行线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}

3.6 避免阻塞

如果我们无法控制API的实现,可能最终拿到的API都是同步阻塞式的方法,那么要如何以异步的方式来查询多个商店,避免被单一的请求阻塞?

假设我们需要根据产品名查询多个商店中对应的价格,首先可以试一下用Stream来实现这一需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));

/**
* 查询产品商店名和价格
* @param product 产品
* @return
*/
public static List<String> findPrices(String product){
return shops.stream()
.map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))
.collect(Collectors.toList());
}

测试一下findPrices性能,运行大概需要4s,对四个商店的查询是顺序执行的,一个查询操作会阻塞另一个,而每个查询大概都需要1s的时间计算产品价格。

1
2
3
4
5
6
7
8
9
long start = System.nanoTime();
List<String> result = findPrices("myPhone27S");
System.out.println(result);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");

//输出如下
[BestPrice price is 188.28, LetsSaveBig price is 207.11, MyFavoriteShop price is 223.33, BuyItAll price is 165.98]
Done in 4152 msecs

试着用并行流进行优化,避免顺序计算,结果总耗时降低到1s多,效果还是很明显的。

1
2
3
4
5
6
7
8
9
public static List<String> findPrices(String product){
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))
.collect(Collectors.toList());
}

//输出如下
[BestPrice price is 178.36, LetsSaveBig price is 162.40, MyFavoriteShop price is 160.91, BuyItAll price is 214.49]
Done in 1127 msecs

再试着用CompletableFuture将findPrices方法中对不同商店的同步调用替换为异步调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static List<String> findPricesAsync(String product){
//通过CompletableFuture以异步方式计算每种商品的价格
List<CompletableFuture<String>> priceFuture =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))
)).collect(Collectors.toList());
//等待所有异步操作结束,join方法与Future.get()含义相同,但不会抛出检测的异常
return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

//输出如下
[BestPrice price is 137.63, LetsSaveBig price is 128.50, MyFavoriteShop price is 146.31, BuyItAll price is 193.07]
Done in 1193 msecs

我们使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上链接两个map操作,考虑流操作之间的延迟性,如果在单一流水线中处理流,发像不同商家的请求只能以同步、顺序执行的方式才会成功。所以单一流水线时每个CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作,通知join方法返回计算结果。

下图显式了为什么Stream的延迟特性会引起顺序执行,以及如何避免。

为什么Stream的延迟特性会引起顺序执行,以及如何避免

使用CompletableFuture后的效率并不比并行流要高,究竟是因为什么?首先并行流能提高效率是因为它能并行的执行四个任务,那如果我们把商家增加到线程无法一一分配的数量呢?我在加到8个的时候还能保持在1s,但如果商家再多时间就会马上增加一倍。多出的商家只能等前面任务结束才能执行。如果是CompletableFuture版本呢?结果也会在达到线程上限时成倍增加耗时。

1
2
Done in 1191 msecs //8个商家
Done in 2131 msecs //9个商家

两种方法都采用同样的通用线程池,具体线程数取决于Runtime.getRuntime().availableProcessors()。CompletableFuture的优势在于允许对执行其(Executor)进行配置,特别是线程池的大小,这是并行流所无法提供的。

3.7 定制执行器

线程数量如何选择是一个问题,这里不作扩展,在并发编程部分再作整理。我们先用商店数目来作线程数,然后给一个上限以防服务器超负荷崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//创建一个线程数为100以及商店数目中较小数字数量的线程池
private final Executor executor = Executors.newFixedThreadPool(
Math.min(shops.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
//使用守护线程,不会阻止程序的关停
t.setDaemon(true);
return t;
}
}
);

List<CompletableFuture<String>> priceFuture =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " + shop.getPrice(product),executor)
).collect(Collectors.toList());

//输出如下
[BestPrice price is 159.6635803338069, LetsSaveBig price is 211.5815957999706, MyFavoriteShop price is 163.89033905158914, BuyItAll price is 207.90324683524352, aaa price is 228.41628707718928, bbb price is 217.44062760239154, ccc price is 228.53941558431086, ddd price is 124.72630278636122, ddd price is 207.93080231005356]
Done in 1074 msecs

经过线程优化后,耗时又降低到了1s。

怎样选择流和CompletableFuture?

  • 如果要进行的是计算密集型操作,且没有I/O,推荐使用流。因为实现简单,效率也可能是最高的,如果所有线程都是计算密集型,就没有必要创建大于CPU核心数的线程了。
  • 如果并行工作的单元还涉及到等待I/O的操作,则选择CompletableFuture灵活性会更好,还有一个原因是并行流不能清楚了判断触发等待的原因。

3.8 对多个异步任务进行流水线操作

3.8.1 实现折扣服务

我们假设所以商店都同意使用一个集中式的折扣服务,总共有五种不同的折扣代码,对应不同的折扣率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 折扣
*/
public class Discount {
public enum Code{
NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20);

private final int percentage;

Code(int percentage) {
this.percentage = percentage;
}
}

public static String applyDiscount(Quote quote){
//将折扣代码应用于商品原价
return quote.getShopName() + " price is " +
Discount.apply(quote.getPrice(),quote.getDiscountCode());
}

private static double apply(double price, Code code){
//模拟Discount服务的响应延迟
delay();
return price * (100 - code.percentage) / 100;
}
}

然后实现getPrice方法增加了返回随机折扣价格,更新了所有商家的价格返回格式。

1
2
3
4
5
6
7
8
9
10
/**
* 获取商品以及随机折扣
* @param product 产品
* @return 价格
*/
public String getPrice(String product){
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s",name,price,code);
}

实现了Quote类对新的返回格式字符串进行解析,静态工厂方法parse得到实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 解析商店返回字符串
*/
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;

public static Quote parse(String s){
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName,price,discountCode);
}
......
}

3.8.2 构造同步和异步操作

然后就是使用这些方法了,首先我们用最直接的顺序执行方式,实现getPrices方法。我们运行并观察性能,结果耗时10s,其中顺序查询5个商店耗时5s,又加上了Discount服务为5个商店返回价格申请折扣消耗5s。

1
2
3
4
5
6
7
8
9
10
11
12
public List<String> getPrices(String product){
return shops.stream().map(shop -> shop.getPrice(product))
//通过Quote对shop返回字符串进行解析
.map(Quote::parse)
//调用applyDiscount方法为每个Quote申请折扣
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}

//输出如下
[BestPrice price is 118.42199999999998, LetsSaveBig price is 166.13099999999997, MyFavoriteShop price is 136.0765, BuyItAll price is 166.00799999999998, ShopEasy price is 195.2365]
Done in 10088 msecs

直接转为并行流,耗时下降到2s。

1
Done in 2116 msecs

构造同步操作和异步任务

通过CompletableFuture以异步方式重新实现getPrices方法。大概流程:shop对象——(supplyAsync)——>shop.getPrice——(thenApply)——>new Quote——(thenCompose)——>applyDiscount——(join)——>price对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static List<String> getPricesAsync(String product){
List<CompletableFuture<String>> priceFuture =
shops.stream()
//以异步的方式取得每个shop中指定产品的价格
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product),executor))
//当Quote对象存在时,对返回值进行转换
.map(future -> future.thenApply(Quote::parse))
//使用另一个异步任务构造期望的Future,申请折扣
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote),executor)))
.collect(Collectors.toList());
//等待流中所有Future执行完毕,提取各自返回值
return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

获取价格,第一步以异步方式对shop查询,获得 Stream<CompletableFuture<String>> ,当操作结束每个CompletableFuture对象都包含着对应shop返回的字符串。

解析报价,第二步将字符串转为订单,因为解析过程一般不涉及远程服务或I/O操作,几乎都会在第一时间进行,可以采用同步操作。当CompletableFuture最终结束运行时,传递Lambda表达式给thenApply,将流中的每个 CompletableFuture<String> 转换为 CompletableFuture<Quote>

为计算折扣价格构造Future,第三步涉及到远程服务Discount,为从商店中获取的原价申请折扣率。这一步需要远程执行,所以应该异步执行。所以我们像第一步那样调用supplyAsync。最终实现了两步异步操作,用了两个CompletableFuture对象进行建模。我们想要把这两步操作以级联的方式串接起来工作。

从shop对象获取价格,把价格转换为Quote,拿到返回的Quote对象,参数传递给Discount服务,取得最终折扣价格。thenCompose方法允许我们对两个异步操作进行流水线,第一个操作完成时将结果作为参数传递给第二个操作。这样即使Future在向不同的商店收集报价时,主线程还能继续执行其他重要操作。thenCompose也有Async命名结尾版本,会为后续任务新开一个线程。这里选择thenCompose的原因是更高效一些,减少了线程切换的开销。

最后一步收集后,我们可以得到 List<CompletableFuture<String>> ,等待CompletableFuture对象都执行完毕,利用join方法取得返回值。

3.8.3 将两个CompletableFuture对象整合起来,无论它们是否存在依赖

getPricesAsync方法中我们对CompletableFuture对象调用了thenCompose,并传递了第二个CompletableFuture,第二个需要第一个的执行结果作为输入。有的时候我们需要将两个完全不相干的CompletableFuture对象的结果整合起来,且不希望二者是顺序执行。

thenCombine方法接受参数CompletionStage和BiFunction,BiFunction定义了两个CompletableFuture完成计算后如何合并。Async版本会把合并操作提交到线程池,由另一个任务异步执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}

public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}

private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.biApply(this, b, f, null)) {
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}

假设我们需要实现汇率转换的功能,以异步的方式来进行商店价格查询和远程的汇率查询,最后将二者结果进行合并,因为只是相乘合并,所以用Async版本新开线程有些浪费资源。

1
2
3
4
5
6
7
8
9
10
11
public Future<Double> a(Shop shop,String product){
Future<Double> futurePriceUSD =
CompletableFuture.supplyAsync(
() -> shop.getPrice(product)
).thenCombine(
CompletableFuture.supplyAsync(
() -> ExchangeService.getRate(ExchangeService.Money.EUR, ExchangeService.Money.USD)
), (price, rate) -> price * rate
);
return futurePriceUSD;
}

合并两个相互独立的异步任务

3.8.4 回顾Future和CompletableFuture

我们通过实践可以感受到CompletableFuture相比Future的巨大优势,CompletableFuture可以利用Lambda表达式以声明式的API用最简单有效的方式吧多个同步或一部操作结合起来。

如果我们只用Future来实现上个汇率转换会如何?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Future<Double> converterFuture(Shop shop,String product){
//创建ExecutorService将任务提交到线程池
ExecutorService executor = Executors.newCachedThreadPool();
//创建一个查询汇率转换的Future
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
return ExchangeService.getRate(ExchangeService.Money.EUR, ExchangeService.Money.USD);
}
});
Future<Double> futurePriceUSD = executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
double priceInEUR = shop.getPrice(product);
return priceInEUR * futureRate.get();
}
});
return futurePriceUSD;
}

对比两种实现,就可以对CompletableFuture的优势有直观感受。

3.9 响应CompletableFuture的completion事件

我们模拟远程服务调用都是通过1s的等待,但实际网络环境中延迟区别很大,可能因为服务器负荷、网络延迟等等。我们目前对价格的查询需要取得所有商店返回值才能显示,但实际上我们可能想要每个商店只要返回价格就第一时间显示返回值。

1
2
3
4
5
6
7
8
9
10
11
12
private static final Random random = new Random();
/**
* 模拟耗时操作,0.5s - 2.5s随机延迟
*/
public static void randomDelay(){
int delay = 500 + random.nextInt(2000);
try{
Thread.sleep(delay);
}catch (InterruptedException ex){
throw new RuntimeException();
}
}

所以首先我们不能用List来存放所有价格查询结果,应该直接处理CompletableFuture流,每个CompletableFuture对象都在为某个商店执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 重构findPrices方法返回一个Future构成的流
* @param product
* @return
*/
public Stream<CompletableFuture<String>> findPricesStream(String product){
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product),executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote),executor
)));
}

接下来我们要为CompletableFuture注册一个操作,此操作需要在CompletableFuture完成执行后使用其返回值。thenAccept方法接收CompletableFuture执行完毕后的返回值做参数。

1
findPricesStream("myPhone27S").map(f -> f.thenAccept(System.out::println));

当CompletableFuture计算结束,返回一个 CompletableFuture<Void> ,所以map方法返回 Stream<CompletableFuture<Void>> ,最终的返回我们能做的事情很有限,只能等待其运行结束。可能我们还希望让最慢的商店也可以打印价格,可以把Stream的所有 CompletableFuture<Void> 放到一个数组中,等待所有的任务执行完成。

1
2
3
4
5
6
CompletableFuture[] futures = 
findPricesStream("myPhone27S")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
//allOf方法接收一个由CompletableFuture构成的数组,数组对象执行完毕后,返回一个CompletableFuture<Void>对象
CompletableFuture.allOf(futures).join();

如果希望在有任何一个执行完就不再等待,比如查询多个汇率服务器,只要得到一个就满足需求,可以使用工厂方法anyOf,接收一个由CompletableFuture构成的数组,数组对象执行完毕后,返回一个 CompletableFuture<Object> 对象。


参考:

🔗 《Java8实战》