Java CompletableFuture

As software systems grow more complex, the usual way to manage that complexity is to divide and conquer. A system is often split into many small services, each exposing a Web API and each developed, deployed, and maintained independently. An e-commerce system, for example, may have product, order, user, recommendation, discount, and search services. Rendering a single page may require calls to several of them, and some calls depend on others. To display a product page you need to call the product service; you may also call the recommendation service for related products and the discount service for the promotions that apply to the product. To call the discount service, you may first need to call the user service to obtain the user’s membership level.

In addition, modern software often depends on many third-party services, such as map services, SMS services, weather services, exchange rate services, etc. When implementing a specific function, you may need to access multiple such services, and there may be dependencies.

To improve performance and make full use of system resources, calls to external services should generally be asynchronous and run as concurrently as possible. With ExecutorService it is easy to submit a single, independent asynchronous task and to fetch its result later through the Future interface. That support falls short, however, when there are multiple asynchronous tasks, especially tasks that depend on one another.

This is where CompletableFuture comes in. It is a concrete class that implements two interfaces, Future and CompletionStage. Future represents the result of an asynchronous task. CompletionStage represents, as the name suggests, one stage of a computation: multiple stages can be combined into a pipeline, and each stage has its own task but may have to wait for one or more other stages to complete before it starts. When it completes, it may in turn trigger other stages. CompletionStage provides a large number of methods for reacting to task events, building task pipelines, and composing asynchronous operations. We will go through them step by step. Because CompletableFuture is also a Future, let us first look at what it shares with Future.

Future is an interface added in Java 5 to describe the result of an asynchronous computation. You can call isDone() to check whether the computation has completed, get() to block the calling thread until it completes and then return the result, or cancel() to attempt to cancel the task.

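import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BasicFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        try {
            Future<Integer> f = es.submit(() -> {
                // long-running asynchronous task
                // ...
                return 100;
            });
            // get() blocks the calling thread until the task has finished
            System.out.println(f.get());
        } finally {
            // without shutdown() the pool's non-daemon threads keep the JVM alive
            es.shutdown();
        }
    }
}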

Future and its related methods make it possible to execute tasks asynchronously, but retrieving the result is inconvenient: it can only be obtained by blocking or by polling. Blocking runs counter to the whole idea of asynchronous programming, while polling wastes CPU time and still does not deliver the result as soon as it is ready. Why not use the observer pattern and notify a listener as soon as the result is available?

Java 8 added a new class with about 50 methods, CompletableFuture. It is a powerful extension of Future that simplifies asynchronous programming: it lets you process results in a functional style and provides methods for transforming and combining CompletableFuture instances.

Let’s take a look at its function below.

import java.util.concurrent.CompletableFuture;

public class CompletableFutureSimple {
    public static void main(String[] args) {
        CompletableFuture<Double> futurePrice = getPriceAsync();

        // do anything you want; the current thread is not blocked
        System.out.println(111);

        // register a callback for when the task completes; this does not block subsequent operations
        futurePrice.whenComplete((aDouble, throwable) -> {
            System.out.println(aDouble);
            // do something else
        });

        System.out.println(222);
    }

    static CompletableFuture<Double> getPriceAsync() {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();

        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            futurePrice.complete(23.55);
        }).start();

        return futurePrice;
    }
}

getPriceAsync() is an asynchronous method: it returns futurePrice immediately. A background thread simulates a time-consuming operation with Thread.sleep(5000) and then calls complete(), which moves futurePrice to the completed state and supplies the result.

whenComplete() does not block either. It only registers the callback, which runs once the result is available, so the output is as follows:

111
222
23.55

Completion of calculations

The CompletableFuture class implements the CompletionStage and Future interfaces, so you can still get the results by blocking or polling as before, although this method is not recommended.

public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()

getNow() is a bit special: it never blocks. If the computation has already completed, it returns the result or throws the exception; otherwise it returns the given valueIfAbsent.

join() returns the result or throws an unchecked exception (CompletionException). This differs from get(), which declares the checked exceptions InterruptedException and ExecutionException.
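
The differences between these accessors are easiest to see side by side. The following is a minimal sketch rather than a recommended pattern; the class and helper names (ResultAccessSketch, readNow, describeJoin, describeGet) are made up for illustration, and the futures are completed by hand so that the outcome is predictable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class ResultAccessSketch {
    // getNow() never blocks: it falls back to the supplied default while the future is incomplete.
    static String readNow(CompletableFuture<String> future) {
        return future.getNow("not ready");
    }

    // join() reports a failure as an unchecked CompletionException.
    static String describeJoin(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            return "join: CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // get() reports a failure as a checked ExecutionException and can also be interrupted.
    static String describeGet(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            return "get: ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            return "get: interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(readNow(pending)); // not ready
        pending.complete("done");
        System.out.println(readNow(pending)); // done

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describeJoin(failed));
        System.out.println(describeGet(failed));
    }
}
```

In both failure cases the original IllegalStateException is available through getCause(); only the wrapper type differs.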

Create CompletableFuture

CompletableFuture.completedFuture() is a static helper method used to return an already calculated CompletableFuture.

public static <U> CompletableFuture<U> completedFuture(U value)

The following four static methods create a CompletableFuture for code that runs asynchronously:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

The overloads that do not take an Executor run the task in ForkJoinPool.commonPool(), unless that pool does not support a parallelism level of at least two, in which case a new thread is created for each task.

runAsync() takes a Runnable, which returns nothing, so the result type of the resulting CompletableFuture is Void.

supplyAsync() takes a Supplier<U>, so the result type of the resulting CompletableFuture is U.

Because the parameter types of methods are all functional interfaces, lambda expressions can be used to implement asynchronous tasks, such as:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "100";
});
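
The overloads that take an Executor work the same way but let you choose where the task runs. Here is a small sketch under stated assumptions: the class name AsyncFactorySketch, the helper names, and the pool size of 2 are all illustrative, not anything prescribed by the API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class AsyncFactorySketch {
    // Runs a Supplier on the given executor and waits for its value.
    static int supplyOn(ExecutorService executor) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 6 * 7, executor);
        return future.join();
    }

    // Runs a Runnable on the given executor; the future carries no value (Void).
    static boolean runOn(ExecutorService executor) {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> ran.set(true), executor);
        future.join(); // returns null once the Runnable has finished
        return ran.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2); // hypothetical pool size
        try {
            System.out.println(supplyOn(pool)); // 42
            System.out.println(runOn(pool));    // true
        } finally {
            pool.shutdown(); // otherwise the pool's threads keep the JVM alive
        }
    }
}
```

A dedicated executor is worth considering for blocking work such as remote calls, so that it does not occupy the shared common pool.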

Processing when the calculation is completed

When the calculation of CompletableFuture is completed, or when an exception is thrown, we can execute a specific Action. The main methods are as follows:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

The action is a BiConsumer<? super T,? super Throwable>, so it can handle both a normal result and an exception.

For the methods whose names do not end in Async, the action runs on the thread that completes the CompletableFuture, or on the calling thread if the CompletableFuture is already complete when the action is registered. The Async variants submit the action to a thread pool, so it may run on a different thread (with a shared pool, the same thread may happen to be picked).

Note that these methods also return a CompletableFuture. Once the action has run, the returned CompletableFuture completes with the same result or exception as the original one.

The exceptionally() method returns a new CompletableFuture. If the original CompletableFuture completes with an exception, the function fn is called with that exception and its return value becomes the result of the new CompletableFuture. If the original completes normally, the new CompletableFuture completes with the same value. exceptionally() is therefore the method for recovering from failures.
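
To make the interplay of whenComplete() and exceptionally() concrete, here is a minimal sketch. The division task, the fallback value, and the names ExceptionallySketch, divideAsync, and divideOrFallback are invented for this example.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {
    // Divides asynchronously; a zero divisor makes the supplier throw ArithmeticException.
    static CompletableFuture<Integer> divideAsync(int dividend, int divisor) {
        return CompletableFuture.supplyAsync(() -> dividend / divisor);
    }

    // whenComplete() observes the outcome; exceptionally() swaps a failure for a fallback value.
    static int divideOrFallback(int dividend, int divisor, int fallback) {
        return divideAsync(dividend, divisor)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        System.out.println("failed: " + error);
                    } else {
                        System.out.println("succeeded: " + value);
                    }
                })
                .exceptionally(error -> fallback)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divideOrFallback(10, 2, -1)); // 5
        System.out.println(divideOrFallback(10, 0, -1)); // -1
    }
}
```

whenComplete() only looks at the outcome and passes it on unchanged, which is why the failure is still there for exceptionally() to replace.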

The following group of methods also returns a CompletableFuture, but its value can differ from that of the original. When the original CompletableFuture completes, whether normally or with an exception, the BiFunction is called with the result and the exception, and its return value becomes the result of the new CompletableFuture. These methods therefore combine the behavior of whenComplete() with a conversion.

public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

As before, the method without the Async suffix runs fn on the thread that completes the original CompletableFuture (or on the calling thread if it is already complete), while the Async variants run it in the default pool, ForkJoinPool.commonPool(), or in the given executor.
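
As an illustration of handle(), the sketch below turns both outcomes of a parsing task into a String. The parsing task and the names HandleSketch and parseAndDescribe are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    // handle() sees both outcomes and always produces a value of the new type (String here).
    static String parseAndDescribe(String text) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                .handle((value, error) -> error == null
                        ? "parsed " + value
                        : "could not parse '" + text + "'")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseAndDescribe("42"));   // parsed 42
        System.out.println(parseAndDescribe("oops")); // could not parse 'oops'
    }
}
```

Exactly one of the two arguments is meaningful on each call: error is null on success, and value is null on failure.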

Conversion

CompletableFuture can be used like a monad or a functor. Thanks to its callback style, we do not have to block the calling thread while waiting for a computation to finish; instead, we tell the CompletableFuture which function to run when the computation completes. These operations can be chained together, and several CompletableFuture instances can be combined.

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

These methods pass the result of the original CompletableFuture to the function fn once it is available and use the return value of fn as the result of the new CompletableFuture. In effect, they convert a CompletableFuture<T> into a CompletableFuture<U>.

The three variants differ in the same way as described above: the method without the Async suffix runs fn on the thread that completes the original CompletableFuture (or on the calling thread if it is already complete), while the Async variants run it in ForkJoinPool.commonPool() or in the given executor. The CompletableFuture class follows this naming convention throughout.

Example:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);

CompletableFuture<String> conversion = future.thenApplyAsync(i -> i * 10).thenApply(Object::toString);

System.out.println(conversion.get());

Note that these conversions do not run immediately and do not block; each one runs after the previous stage has completed.

The difference from handle() is that handle() receives both normal results and exceptions, so it can absorb an exception and keep it from propagating. thenApply() only processes normal results: if the previous stage fails, fn is skipped and the exception propagates to the new CompletableFuture.
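
The following sketch shows what happens to thenApply() when the previous stage has failed. The future is failed by hand to keep the outcome predictable, and the names ThenApplyFailureSketch and transformRanAfterFailure are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class ThenApplyFailureSketch {
    // Returns true if the thenApply() function ran even though the upstream stage failed.
    static boolean transformRanAfterFailure() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Integer> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalArgumentException("bad input"));

        CompletableFuture<Integer> transformed = failing.thenApply(i -> {
            ran.set(true); // never reached: the upstream stage failed
            return i * 10;
        });

        try {
            transformed.join();
        } catch (CompletionException e) {
            // The upstream exception arrives wrapped in a CompletionException.
            System.out.println("propagated: " + e.getCause());
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(transformRanAfterFailure()); // false
    }
}
```

If the pipeline should continue with a value after a failure, put handle() or exceptionally() in front of the conversion.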

Pure consumption (executing an action)

The methods above either produce a new result when the computation completes (thenApply(), handle()) or pass the original result through (whenComplete()). CompletableFuture also provides methods that only run an action on the result and do not produce a new value, so the resulting CompletableFuture has the type CompletableFuture<Void>:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

The parameter type makes this clear: Consumer is a functional interface that takes an input and returns nothing.

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100;
});

CompletableFuture<Void> f = future.thenAccept(System.out::println);

System.out.println(f.get());

thenAcceptBoth() and its variants work similarly, but they combine the result with that of another stage: the action runs when both CompletionStage instances have completed normally and receives both results.

runAfterBoth() runs a Runnable when both CompletionStage instances have completed normally. The Runnable does not receive either result.

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

Example:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);

CompletableFuture<Void> f = future.thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y));

System.out.println(f.get()); // null
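
runAfterBoth() can be sketched in the same spirit. The two stages and the names RunAfterBothSketch and runsAfterBoth are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class RunAfterBothSketch {
    // Returns how many times the Runnable ran after both stages had completed.
    static int runsAfterBoth() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<Integer> stock = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<String> label = CompletableFuture.supplyAsync(() -> "ready");

        // The Runnable takes no arguments, so neither 100 nor "ready" is visible to it.
        CompletableFuture<Void> both = stock.runAfterBoth(label, () -> runs.incrementAndGet());
        both.join();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsAfterBoth()); // 1
    }
}
```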

Going one step further, the following methods run a Runnable when the computation completes. Unlike thenAccept(), the Runnable does not receive the result of the CompletableFuture.

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

The result of the previous CompletableFuture calculation is ignored, and this method returns an object of type CompletableFuture<Void>.

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Void> f = future.thenRun(() -> System.out.println("finished"));
System.out.println(f.get()); // null

A simple way to remember these methods is by parameter type. A Runnable ignores the result. A Consumer consumes the result without returning a value. A BiConsumer consumes the result together with the result of another CompletionStage. A Function converts the result. A BiFunction converts the result together with the result of another CompletionStage. (In whenComplete() and handle(), the second argument is the exception rather than another stage’s result.)

Compose

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

This group of methods takes a Function whose input is the result of the current CompletableFuture and whose return value is itself a CompletionStage. thenCompose() returns a new CompletableFuture that chains the two: it completes with the result of the stage returned by the function.

Keep in mind that the object returned by thenCompose() is not the object returned by fn. It is a new CompletableFuture that completes with the same value once the stage returned by fn has completed.

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);

CompletableFuture<String> f = future.thenCompose( i -> CompletableFuture.supplyAsync(() -> (i * 10) + ""));

System.out.println(f.get()); //1000
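
It can help to compare thenCompose() with thenApply() when the function itself returns a CompletableFuture. The sketch below assumes a hypothetical asynchronous lookup, levelOfUser(); it and the other names are invented for this example.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVersusApplySketch {
    // Hypothetical lookup that is itself asynchronous.
    static CompletableFuture<String> levelOfUser(int userId) {
        return CompletableFuture.supplyAsync(() -> "level-" + (userId % 3));
    }

    // thenCompose() flattens the result: the type stays CompletableFuture<String>.
    static String lookupWithCompose(int userId) {
        CompletableFuture<Integer> id = CompletableFuture.supplyAsync(() -> userId);
        CompletableFuture<String> flat = id.thenCompose(ComposeVersusApplySketch::levelOfUser);
        return flat.join();
    }

    // thenApply() with the same function nests the futures and needs a second join().
    static String lookupWithApply(int userId) {
        CompletableFuture<Integer> id = CompletableFuture.supplyAsync(() -> userId);
        CompletableFuture<CompletableFuture<String>> nested =
                id.thenApply(ComposeVersusApplySketch::levelOfUser);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(lookupWithCompose(7)); // level-1
        System.out.println(lookupWithApply(7));   // level-1
    }
}
```

Both variants produce the same value, but only thenCompose() keeps the pipeline flat, which matters once more stages are appended.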

The next group, thenCombine(), combines the result with that of another CompletionStage.

The two stages do not depend on each other and can run in parallel: other does not wait for the current CompletableFuture to complete before it runs.

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

Functionally, these methods resemble thenAcceptBoth(). The difference is that thenAcceptBoth() is pure consumption, since its action returns nothing, whereas the function fn passed to thenCombine() returns a value.
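
A short sketch of thenCombine(), loosely modeled on the price and discount lookups from the introduction. The two suppliers stand in for independent service calls, and the names ThenCombineSketch and discountedPrice are assumptions.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {
    // Starts two independent stages and merges their results into one value.
    static double discountedPrice(double listPrice, double discountRate) {
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> listPrice);
        CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> discountRate);

        // fn runs once both stages have completed and its return value is the new result.
        return price.thenCombine(discount, (p, d) -> p * (1 - d)).join();
    }

    public static void main(String[] args) {
        System.out.println(discountedPrice(200.0, 0.25)); // 150.0
    }
}
```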

Either

thenAcceptBoth() and runAfterBoth() act when both CompletableFuture instances have completed. The methods below act as soon as either one has completed.

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

With acceptEither(), the action is run with the result of whichever CompletionStage completes first. The method returns a CompletableFuture<Void>.

With applyToEither(), fn is run with the result of whichever CompletionStage completes first, and its return value becomes the result of the new CompletableFuture<U>.

The following example prints 100 on some runs and 200 on others, depending on which future completes first.

Random rand = new Random();

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000 + rand.nextInt(1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000 + rand.nextInt(1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 200;
});

CompletableFuture<String> f = future.applyToEither(future2, i -> i.toString());

// join() waits for the winner and, unlike get(), throws no checked exception
System.out.println(f.join());
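
For a deterministic variant, one of the two futures can be left incomplete on purpose, so the other always wins. This is only a sketch; the names EitherSketch, firstSource, and firstSeen, and the primary/backup scenario, are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class EitherSketch {
    // applyToEither(): transform whichever result arrives first.
    static String firstSource(CompletableFuture<String> primary, CompletableFuture<String> backup) {
        return primary.applyToEither(backup, source -> "served from " + source).join();
    }

    // acceptEither(): consume whichever result arrives first; the new future carries no value.
    static String firstSeen(CompletableFuture<String> primary, CompletableFuture<String> backup) {
        AtomicReference<String> seen = new AtomicReference<>();
        primary.acceptEither(backup, seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> primary = new CompletableFuture<>(); // deliberately never completed
        CompletableFuture<String> backup = CompletableFuture.completedFuture("backup");
        System.out.println(firstSource(primary, backup)); // served from backup
        System.out.println(firstSeen(primary, backup));   // backup
    }
}
```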

CompletableFuture exception handling

When you complete a CompletableFuture by hand, as getPriceAsync() does above, an exception thrown before complete(value) is reached leaves the future incomplete forever: get() blocks indefinitely and callbacks such as whenComplete() never run.

One remedy is to pass a timeout to get(timeout, unit), which throws a TimeoutException if no result arrives within the given time. Another is to propagate the exception from the worker thread by calling completeExceptionally(ex).
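
The second approach might look like the sketch below, a variation of the earlier getPriceAsync() example. The failing loadPrice() helper and its empty-name rule are hypothetical and exist only to trigger an exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompleteExceptionallySketch {
    // Hypothetical price lookup that fails for an empty product name.
    static double loadPrice(String product) {
        if (product.isEmpty()) {
            throw new IllegalArgumentException("no product given");
        }
        return 23.55;
    }

    static CompletableFuture<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                futurePrice.complete(loadPrice(product));
            } catch (RuntimeException e) {
                // Without this call the future would stay incomplete forever.
                futurePrice.completeExceptionally(e);
            }
        }).start();
        return futurePrice;
    }

    // join() surfaces the failure as a CompletionException wrapping the original exception.
    static String describe(String product) {
        try {
            return "price = " + getPriceAsync(product).join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("book")); // price = 23.55
        System.out.println(describe(""));     // failed: no product given
    }
}
```

Tasks started with supplyAsync() or runAsync() do not need this: an exception thrown by the supplied code completes the future exceptionally on its own.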

Helper methods allOf() and anyOf()

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf() returns a CompletableFuture<Void> that completes when all of the given CompletableFuture instances have completed.

anyOf() returns a CompletableFuture<Object> that completes as soon as any one of the given CompletableFuture instances completes, with the same result.

anyOf() and applyToEither() differ in two ways. anyOf() accepts any number of CompletableFuture instances, while applyToEither() involves exactly two. The result of the CompletableFuture returned by anyOf() is the result of one of its arguments, whereas the result of the one returned by applyToEither() has been transformed by fn.
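
Because allOf() yields Void, a common idiom is to read the individual results from the input futures once it has completed. The sketch below shows one way to do that, alongside anyOf(); the names AllOfAnyOfSketch, collectAll, and firstOf are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfAnyOfSketch {
    // allOf() carries no value of its own, so the results are read from the inputs once it completes.
    static List<Integer> collectAll(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletableFuture<List<Integer>> results = all.thenApply(ignored ->
                futures.stream()
                        .map(CompletableFuture::join) // does not block: every input is already complete
                        .collect(Collectors.toList()));
        return results.join();
    }

    // anyOf() completes with the result of whichever input finishes first, typed as Object.
    static Object firstOf(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> prices = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        System.out.println(collectAll(prices)); // [1, 2, 3]

        CompletableFuture<String> never = new CompletableFuture<>(); // deliberately never completed
        System.out.println(firstOf(never, CompletableFuture.completedFuture("fast"))); // fast
    }
}
```

The results come back in the order of the input list, regardless of the order in which the futures completed.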
