CompletableFuture 超时处理

1 背景

有时候,一个异步任务会分为多个阶段,一个阶段完成并拿到结果后才能执行后续阶段。如果用 FutureTask 来处理这种异步任务,则每个阶段完成的时候都要在主线程里通过调用 get 方法来阻塞获取(包括超时控制),降低了执行效率。

CompletableFuture 提供了流式操作,通过提供回调函数让这个任务一次性地完成,最后再通过 get 或 join 获取最终结果。同时,对于每个阶段的超时控制,可以将超时控制逻辑由主线程移动到单独的 ScheduledExecutorService 线程池进行处理,达到超时异步处理的效果。

2 CompletableFuture 中关键的字段和方法简介

2.1 result

保存计算结果,一般取值有三种类型:

  • null
  • AltResult
  • 其他

在没有其他操作的情况下(如 complete 等),如果一个 CompletableFuture 没有异步任务或者任务还未完成,则 result 为 null。

在没有其他操作的情况下,如果一个任务正常完成,则 result 的类型为传入的类型。

如果中间有异常发生,或者调用 complete、completeExceptionally 等操作,则 result 的类型为 AltResult。

2.2 UNSAFE.compareAndSwapObject

在 CompletableFuture 中,基本上很多操作都会使用 UNSAFE.compareAndSwapObject 方法(一种 CAS 操作)来更新 result。比如 supplyAsync、complete、completeExceptionally、applyToEither、acceptEither 等。

UNSAFE.compareAndSwapObject(Object obj, long offset, Object expect, Object update) 是一种内存级别的操作,会先找到 obj 中偏移量为 offset 的字段,然后比较该字段的值是否和 expect 的值相同。如果相同,则将其更新为 updaet 对应的值;否则不进行任何操作。

3 不存在异步任务的 CompletableFuture

1
2
3
4
CompletableFuture<Void> promise = new CompletableFuture<>();

// 永久阻塞.
promise.join();

由于该 completableFuture 中并没有异步任务,因此 completableFuture 中的 result 始终为 null(始终执行不完)。在调用 get 或类似获取结果的操作时,会永久阻塞。

4 complete

其内部实现如下:

1
2
3
4
5
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
1
2
3
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);
}

可以看出,complete 用于给 result 为 null 的任务注册一个返回结果。

当进行 join 或其他相关获取结果的操作时:

  • 如果任务并未执行完成,且此前已经通过 complete 对该任务注册了一个返回结果,则不会发生阻塞而立即返回 complete 指定的结果。
  • 如果任务并未执行完成,则会发生阻塞。如果此后的某一个时刻对该任务注册了一个返回结果,则会立即返回 complete 指定的结果。
  • 如果任务已经执行完成,则不会发生阻塞。无论此前是否用 complete 注册了返回结果,都会立即返回任务计算结果。

从 complete 的实现也可以看出,对于某个任务,complete 操作只能调用一次,后续的所有 complete 都无效。

4.1 获取结果时任务未完成 (提前注册 complete)

1
2
3
4
5
CompletableFuture<String> cf = new CompletableFuture<>();

cf.complete("default value");
// 不会阻塞, 立即返回.
System.out.println(cf.join());

out:

1
default value

4.2 获取结果时任务未完成 (后注册 complete)

1
2
3
4
5
6
7
8
9
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("scheduler-%d").build());

CompletableFuture<String> cf = new CompletableFuture<>();
// 延后 3 秒注册.
scheduler.schedule(()
-> cf.complete("delay value"), 3000, MILLISECONDS);

System.out.println("joining...");
System.out.println(cf.join());
1
2
joining...
delay value

4.3 获取结果时任务已完成

1
2
3
4
5
6
7
8
9
10
11
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello world");
// 保证任务在执行后续操作前已经完成.
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

cf.complete("default value");
// 由于任务在此之前已经完成, 因此并不会返回 complete 指定的值.
System.out.println(cf.join());

out:

1
hello world

5 completeExceptionally

其内部实现如下:

1
2
3
4
5
6
public boolean completeExceptionally(Throwable ex) {
if (ex == null) throw new NullPointerException();
boolean triggered = internalComplete(new AltResult(ex));
postComplete();
return triggered;
}
1
2
3
final boolean internalComplete(Object r) { // CAS from null to r
return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
}

可以看出,completeExceptionally 用于给 result 为 null 的任务注册一个异常。

当进行 join 或其他相关获取结果的操作时:

  • 如果任务并未执行完成,且此前已经通过 completeExceptionally 对该任务注册了一个异常,则不会发生阻塞而立即抛出 completeExceptionally 指定的异常。
  • 如果任务并未执行完成,则会发生阻塞。如果此后的某一个时刻对该任务注册了一个异常,则会立即抛出 completeExceptionally 指定的异常。
  • 如果任务已经执行完成,则不会发生阻塞。无论此前是否用 completeExceptionally 注册了异常,都会立即返回任务计算结果。

同样,对于某个任务,completeExceptionally 操作只能调用一次,后续的所有 completeExceptionally 会被忽略掉。

5.1 获取结果时任务未完成 (提前注册 completeExceptionally)

1
2
3
4
5
CompletableFuture<String> cf = new CompletableFuture<>();

cf.completeExceptionally(new TimeoutException("Timeout"));
// 抛出异常.
System.out.println(cf.join());

out:

1
2
3
4
5
6
7
8
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Timeout

at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
at com.netease.music.ad.CompletableFutureTest.TestCF(CompletableFutureTest.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

5.2 获取结果时任务未完成 (后注册 completeExceptionally)

1
2
3
4
5
6
7
8
9
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("scheduler-%d").build());

CompletableFuture<String> cf = new CompletableFuture<>();
// 延后 3 秒注册.
scheduler.schedule(()
-> cf.completeExceptionally(new TimeoutException("Timeout")), 3000, MILLISECONDS);

System.out.println("joining...");
System.out.println(cf.join());

out:

1
2
3
4
5
6
7
8
9
joining
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Timeout

at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
at com.netease.music.ad.CompletableFutureTest.TestCF(CompletableFutureTest.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

5.3 获取结果时任务已完成

1
2
3
4
5
6
7
8
9
10
11
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello world");
// 保证任务在执行后续操作前已经完成.
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

cf.completeExceptionally(new TimeoutException("Timeout"));
// 由于任务在此之前已经执行完成, 因此并不会抛出异常.
System.out.println(cf.join());

out:

1
hello world

6 基于 ScheduledExecutorService 和 completeExceptionally 的超时处理

1
2
3
4
5
6
7
8
9
10
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("scheduler-%d").build());

public static <T> CompletableFuture<T> failAfter(Duration duration) {
final CompletableFuture<T> promise = new CompletableFuture<>();
scheduler.schedule(() -> {
final TimeoutException ex = new TimeoutException("Timeout after " + duration);
return promise.completeExceptionally(ex);
}, duration.toMillis(), MILLISECONDS);
return promise;
}

该方法会返回一个 completableFuture。首先会通过 ScheduledExecutorService 开启一个延时线程池,每个延时线程会在 duration 对应时间后生成 result (一个包装了超时异常的 AltResult)。基于该方法,就可以实现超时处理了,如下所示:

1
2
3
4
public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {
final CompletableFuture<T> timeout = failAfter(duration);
return future.applyToEither(timeout, Function.identity());
}

该方法基于 applyToEither 返回一个新的 completableFuture. 如果传入的 future 在 duration 对应时间内没有返回结果(此时 timeout 任务已经有了结果), 则返回的新的 completableFuture 的结果为 timeout 中的结果(一个包装了超时异常的 AltResult)。当对该 completableFuture 调用 join 等获取结果的操作时,会抛出异常,从而达到超时控制的目的。


Reference