In modern software development, system functions are becoming more and more complex, and the method of managing complexity is to divide and conquer. Many functions of the system may be divided into small services, providing Web APIs for external development, deployment, and maintenance. For example, in an e-commerce system, there may be product services, order services, user services, recommendation services, preferential services, search services, etc. When displaying a page externally, multiple services may be called. There may be certain dependencies between calls, for example, to display a product page, you need to call the product service. You may also need to call the recommendation service to obtain other recommendations related to the product, and you may need to call the discount service to obtain the promotional offers related to the product. In order to call the preferential service, you may need to call the user service to obtain the user’s membership level.
In addition, modern software often depends on many third-party services, such as map services, SMS services, weather services, exchange rate services, etc. When implementing a specific function, you may need to access multiple such services, and there may be dependencies.
In order to improve performance and make full use of system resources, these calls to external services should generally be asynchronous
and as concurrent
as possible. Asynchronous task execution service, using ExecutorService
can easily submit a single independent asynchronous task, you can easily obtain the result of the asynchronous task through the Future
interface when needed, but for multiple asynchronous tasks, especially with certain dependencies, this support is not enough.
So, there comes CompletableFuture
, which is a concrete class that implements two interfaces, one is Future
and the other is CompletionStage
. Future
represents the result of an asynchronous task, and CompletionStage
literally means the completion stage, multiple CompletionStage
can be pipelined in combination, for one of the CompletionStage
, it has a calculation task, but it may need to wait for one or more other stages to complete before it can start. After it is completed, it may trigger other stages to start running. CompletionStage
provides a large number of methods. Using them, you can easily respond to task events, build a task pipeline, and implement combined asynchronous programming. How to use it? Below we will explain step by step, CompletableFuture
is also a Future
, let us first look at the similarities with Future
.
Future
is a class added by Java5 to describe the result of an asynchronous calculation. You can use the isDone()
method to check whether the calculation is complete, or use get()
to block the calling thread until the calculation is completed and return the result, or you can use the cancel()
method to stop the execution of the task.
1 | public class BasicFuture { |
Future
and related methods provide the ability to execute tasks asynchronously, but it is inconvenient to obtain results, and the task results can only be obtained by blocking or polling. The blocking method is contrary to the asynchronous programming we understand, and polling consumes unnecessary CPU resources. Moreover, the calculation result cannot be obtained in time. Why can’t the observer design mode be used to notify the listener in time when the calculation result is completed?
In Java 8, a new class with about 50 methods has been added: CompletableFuture, which provides a very powerful extension of Future
, which can help us simplify the complexity of asynchronous programming, provides the ability of functional programming, and the way to deal with the calculation results, and provides a method to convert and combine CompletableFuture
.
Let’s take a look at its function below.
1 | public class CompletableFutureSimple { |
getPriceAsync()
is an asynchronous method. It returns a futurePrice
immediately after the call. It is simulated as a time-consuming operation with Thread.sleep(5000)
. After the thread is executed, it sets futurePrice
to the completion state and gives the result.
CompletableFuture
‘s whenComplete()
is also asynchronous, so we can see the output as follows:
1 | 111 |
Completion of calculations
The CompletableFuture
class implements the CompletionStage and Future interfaces, so you can still get the results by blocking or polling as before, although this method is not recommended.
1 | public T get() |
getNow()
is a bit special. If the result has been calculated, it will return the result or throw an exception, otherwise return the given valueIfAbsent
value.
join()
returns the result of the calculation or throws an unchecked exception(CompletionException
), which is slightly different from get()
‘s handling of the thrown exception.
Create CompletableFuture
CompletableFuture.completedFuture()
is a static helper method used to return an already calculated CompletableFuture
.
1 | public static <U> CompletableFuture<U> completedFuture(U value) |
The following four static methods are used to create a CompletableFuture
object for an asynchronously executed code:
1 | public static CompletableFuture<Void> runAsync(Runnable runnable) |
Methods that end in Async
and do not specify Executor
will use ForkJoinPool.commonPool()
as its thread pool to execute asynchronous code.
The runAsync()
method is also easy to understand. It takes the Runnable
functional interface type as a parameter, so the CompletableFuture
calculation result is empty.
The supplyAsync
method takes the Supplier<U>
functional interface type as a parameter, and the CompletableFuture
calculation result type is U
.
Because the parameter types of methods are all functional interfaces, lambda
expressions can be used to implement asynchronous tasks, such as:
1 | CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { |
Processing when the calculation is completed
When the calculation of CompletableFuture
is completed, or when an exception is thrown, we can execute a specific Action
. The main methods are as follows:
1 | public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) |
We can see that the type of Action
is BiConsumer<? Super T ,? super Throwable>
, which can handle normal calculation results or abnormal exceptions.
The methods that do not end with Async
means that Action
uses the same thread to execute, and Async
may use other threads to execute (if the same thread pool is used, it may also be selected for execution by the same thread).
Note that these methods will return CompletableFuture
. When the Action
is executed, its result will return to the original CompletableFuture
calculation result or return an exception.
The exceptionally()
method returns a new CompletableFuture
. When the original CompletableFuture
throws an exception, it will trigger the calculation of the CompletableFuture
and call the function to calculate the value, otherwise if the original CompletableFuture
is calculated normally, the new CompletableFuture
is also calculated, The value is the same as the calculated value of the original CompletableFuture
. So, this exceptionally()
method is used to handle abnormal situations.
Although the following set of methods also returns a CompletableFuture
object, the value of the object is different from the value calculated by the original CompletableFuture
. When the original CompletableFuture
value calculation is completed or an exception is thrown, the CompletableFuture
object calculation is triggered, and the result is calculated by the BiFunction
parameter. Therefore, this group of methods has both functions of whenComplete
and conversion
.
1 | public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) |
Similarly, the method that does not end with Async
is calculated by the original thread, and the method that ends with Async
is run by the default thread pool ForkJoinPool.commonPool()
or the specified thread pool executor.
Conversion
CompletableFuture
can be used as a monad
and functor
. Due to the implementation of the callback style, we do not have to block the calling thread just waiting for a calculation to complete, but rather tell CompletableFuture
to perform a certain function when the calculation is completed. And we can also chain these operations together, or combine CompletableFuture
.
1 | public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) |
The function of these methods is to pass the result to the function fn
after the original CompletableFuture
is calculated, and use the result of fn
as the new CompletableFuture
calculation result. Therefore, its function is equivalent to converting CompletableFuture<T>
into CompletableFuture<U>
.
The difference between these three functions is the same as described above. The method that does not end with Async
is calculated by the original thread, and the method that ends with Async
is run by the default thread pool ForkJoinPool.commonPool()
or the specified thread pool executor
. Java’s CompletableFuture
class always follows this principle.
Example:
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); |
It should be noted that these conversions are not executed immediately and will not block, but will continue to execute after the previous stage is completed.
The difference between them and the handle()
method is that the handle()
method will handle normal calculated values and exceptions, so it can shield exceptions and prevent them from being thrown. The thenApply()
method is only used to handle normal values, so it will be thrown once there is an exception.
Pure consumption (execution Action)
The above methods are that when the calculation is completed, a new calculation result(thenApply()
, handle()
) will be generated, or the same calculation result is returned whenComplete()
. CompletableFuture
also provides a method of processing the result, only execute Action
on the result, and not return new calculated value, so the calculated value is Void
:
1 | public CompletableFuture<Void> thenAccept(Consumer<? super T> action) |
It is clear by looking at its parameter types. They are functional interface consumers. This interface has only inputs and no return values.
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { |
thenAcceptBoth()
and related methods provide similar functions. When the two CompletionStage
complete the calculation normally, the provided action will be executed, which is used to combine another asynchronous result.
runAfterBoth()
is to execute a Runnable
when both CompletionStage
finishes the calculation normally. This Runnable
does not use the calculation result.
1 | public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) |
Example:
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); |
More thoroughly, the following set of methods will execute a Runnable
when the calculation is complete. Unlike thenAccept()
, Runnable
does not use the results of CompletableFuture
calculations.
1 | public CompletableFuture<Void> thenRun(Runnable action) |
The result of the previous CompletableFuture
calculation is ignored, and this method returns an object of type CompletableFuture<Void>
.
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); |
A simple way to memorize the methods based on the type of parameters.
Runnable
type parameters will ignore the calculation results.Consumer
is the pure consumption calculation result (not return value).BiConsumer
will combine anotherCompletionStage
.Function
will convert the calculation result.BiFunction
will combine anotherCompletionStage
calculation result to perform conversion.
Compose
1 | public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) |
This set of methods accepts a Function
as a parameter. The input of this Function
is the current CompletableFuture
calculation value, and the returned result will be a new CompletableFuture
, which will combine the original CompletableFuture
and the CompletableFuture
returned by the function.
Remember, the objects returned by thenCompose()
are not the objects returned by the function fn
. If the original CompletableFuture
has not been calculated, it will generate a new combined CompletableFuture
.
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); |
The following set of methods thenCombine()
is used to compound the results of another CompletionStage
.
The two CompletionStage
are executed in parallel, and there is no order of dependency between them. The other does not wait for the previous CompletableFuture
to complete before executing.
1 | public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) |
In fact, in terms of functionality, their function is more similar to thenAcceptBoth()
, except that thenAcceptBoth()
is pure consumption, its function parameter has no return value, and thenCombine()
‘s function parameter fn
has a return value.
Either
thenAcceptBoth()
and runAfterBoth()
are calculated when both CompletableFutures
are completed, and the method we want to understand below is when any CompletableFuture
is calculated.
1 | public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
The acceptEither()
method is that when any CompletionStage
is completed, the action consumer will be executed. This method returns CompletableFuture<Void>
The applyToEither()
method is when any CompletionStage
is completed, fn
will be executed, and its return value will be used as the calculation result of the new CompletableFuture<U>
.
The following example sometimes outputs 100 and sometimes 200. Which Future
is completed first will be calculated based on its result.
1 | Random rand = new Random(); |
CompletableFuture exception handling
If an exception occurs before setting CompletableFuture.complete(value)
, then get()
or other callback functions like whenComplete()
will wait indefinitely.
One method is to give a timeout when calling get(timeout)
, and get a TimeoutException
if no result is obtained within the specified time. Another way is to spread the exception through completeExceptionally(ex)
in the thread
Helper methods allOf() and anyOf()
1 | public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) |
The allOf()
method is to perform calculations after all CompletableFuture
have been executed.
The anyOf()
method is to execute a calculation after any CompletableFuture
is executed, and the calculation result is the same.
anyOf()
and applyToEither()
are different. anyOf()
accepts any number of CompletableFuture
but applyToEither()
only has two CompletableFuture
. The calculation result of the return value of anyOf()
is one of the CompletableFuture
calculation parameters. The calculation result of the return value of applyToEither()
is processed by fn
.