# CompletableFuture接口详解

CompletableFuture实现了CompletionStage接口，其中的主要方法如下：

```
    // Synchronous variant: delegates to uniAcceptStage with a null executor.
    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    // Synchronous variant: delegates to uniApplyStage with a null executor.
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

    // Synchronous variant: delegates to uniRunStage with a null executor.
    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

    // Synchronous variant: delegates to biApplyStage with a null executor, the other stage and fn.
    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

    // Synchronous variant: delegates to biAcceptStage with a null executor, the other stage and action.
    public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

    // Synchronous variant: delegates to biRunStage with a null executor, the other stage and action.
    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) {
        return biRunStage(asyncPool, other, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action, Executor executor) {
        return biRunStage(screenExecutor(executor), other, action);
    }
    // Synchronous variant: delegates to orApplyStage with a null executor, the other stage and fn.
    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

    // Synchronous variant: delegates to orAcceptStage with a null executor, the other stage and action.
    public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action,
        Executor executor) {
        return orAcceptStage(screenExecutor(executor), other, action);
    }

    // Synchronous variant: delegates to orRunStage with a null executor, the other stage and action.
    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action,
                                                       Executor executor) {
        return orRunStage(screenExecutor(executor), other, action);
    }

    // Synchronous variant: delegates to uniComposeStage with a null executor.
    // fn maps this stage's result to another CompletionStage<U>.
    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

    // Synchronous variant: delegates to uniWhenCompleteStage with a null executor.
    // The action receives both the result and the Throwable; the returned future keeps type T.
    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }

    // Synchronous variant: delegates to uniHandleStage with a null executor.
    // fn receives both the result and the Throwable and produces the new result of type U.
    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    // Async variant without an explicit executor: delegates with asyncPool.
    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

    // Async variant with a caller-supplied executor, which is first passed through screenExecutor.
    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return uniHandleStage(screenExecutor(executor), fn);
    }
```

首先说明一下，以Async结尾的方法都是可以异步执行的：如果指定了线程池，会在指定的线程池中执行；如果没有指定，默认会在ForkJoinPool.commonPool()中执行。下文中将会有好多类似的，都不详细解释了。以thenApply为例，关键的入参只有一个Function，它是函数式接口，所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果，返回值是经过转化后的结果。

* 初始化CompletableFuture的几种方式:

```
// Demonstrates four ways of obtaining a CompletableFuture.
public static void init() {
    // 1: create an incomplete future (diamond operator instead of a raw type)
    CompletableFuture<String> completableFuture1 = new CompletableFuture<>();
    // 2: create an already-completed future from a known value
    CompletableFuture<Integer> completableFuture2 = CompletableFuture.completedFuture(111);
    CompletableFuture<String> completableFuture3 = CompletableFuture.completedFuture("123");
    // 3: run a supplier asynchronously (no executor given)
    CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> "123");
    CompletableFuture<Integer> completableFuture5 = CompletableFuture.supplyAsync(() -> 123);
    // When the element type does not matter, use a wildcard rather than a raw type
    CompletableFuture<?> completableFuture6 = CompletableFuture.supplyAsync(() -> 123);
    CompletableFuture<?> completableFuture7 = CompletableFuture.supplyAsync(() -> "123");

    // 4: run a supplier on a custom executor
    ExecutorService executor = Executors.newFixedThreadPool(1);
    CompletableFuture.supplyAsync(() -> "123", executor);
    try {
        // shutdown() lets the already-submitted task finish;
        // shutdownNow() could cancel it before it ever runs.
        executor.shutdown();
        // Wait at most 5 seconds for the submitted task to finish
        executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption
        Thread.currentThread().interrupt();
    }
}
```

* thenAccept、thenAcceptAsync 是针对结果进行消耗，因为它的入参是Consumer，有入参无返回值。示例如下：

```
    // Consumes the result of an async computation with two chained thenAccept stages.
    public static void thenAccept() {
        CompletableFuture.supplyAsync(() -> "hello")
                .thenAccept(s -> System.out.println(s + " world"))
                .thenAccept(v -> System.out.println("done"))
                // Wait for the chain; otherwise the method may return (and the JVM exit)
                // before the stages have printed anything.
                .join();

        //hello world
        //done
    }

    // Consumes the result asynchronously; no executor is given to thenAcceptAsync.
    public static void thenAcceptAsync() {
        CompletableFuture<Void> pending = CompletableFuture.supplyAsync(() -> "hello")
                .thenAcceptAsync(s -> System.out.println(s + " world "));
        System.out.println("异步任务执行");
        // Wait for the async stage so its output is not lost when the caller returns.
        pending.join();

        // Typical output (the relative order of the two lines is not guaranteed):
        //异步任务执行
        //hello world
    }

    // Consumes the result asynchronously on a custom executor.
    public static void thenAcceptAsyncOfExecutor() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            CompletableFuture.supplyAsync(() -> "hello")
                    .thenAcceptAsync(s -> System.out.println(s + " world "), executor)
                    // Wait before shutting down: the consumer is only handed to the executor
                    // once the supplier has completed, so an earlier shutdown would reject it.
                    .join();
        } finally {
            // Always release the pool's threads, even if the chain failed
            executor.shutdown();
        }
        System.out.println("异步任务执行");
    }
```

* thenApply、thenApplyAsync 是针对结果进行转换，有返回值，他的入参是Function，可以使用lambda表达式，示例如下：

```
   // Transforms the async result twice with thenApply and prints the final string.
   public static void thenApply() {
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> extended = greeting.thenApply(s -> s + " world");
        CompletableFuture<String> finished = extended.thenApply(v -> v + " done");
        String result = finished.join();
        System.out.println("result = " + result);
        //hello world done
    }

    // Transforms the result with thenApplyAsync; no executor is given.
    // (The unused ExecutorService local of the earlier version was removed: it was never
    // passed to any stage and never shut down.)
    public static void thenApplyAsync() {
        String result = CompletableFuture.supplyAsync(() -> "hello")
                .thenApplyAsync(s -> s + " world")
                .thenApplyAsync(v -> v + ", 异步线程名：" + Thread.currentThread().getName())
                .join();
        System.out.println("result = " + result);
        System.out.println("主线程名：" + Thread.currentThread().getName());
        //result = hello world, 异步线程名：ForkJoinPool.commonPool-worker-1
        //主线程名：main
    }

    public static void thenApplyOfExecutor(){
        String result = CompletableFuture.supplyAsync(()->"hello")
                .thenApplyAsync(s -> s + " world")
                .thenApplyAsync(v-> v + ", 异步线程名：" + Thread.currentThread().getName()).join();
        System.out.println("result = " + result);
        System.out.println("主线程名：" + Thread.currentThread().getName());
        //result = hello world, 异步线程名：ForkJoinPool.commonPool-worker-1
        //主线程名：main
```

* thenRun、thenRunAsync对上一步的计算结果不关心，执行下一个操作，他的入参是Runnable,无返回值，示例如下：

```
// Runs a countdown Runnable after the async supplier finishes; the supplier's result is ignored.
public static void thenRun() {
    IntStream stream = IntStream.of(10,9,8,7,6,5,4,3,2,1);
    Runnable runnable = () -> {
        stream.forEach(i -> {
            try {
                Thread.sleep(1000);
                System.out.format("线程名称：%s, 倒计时：%d\n", Thread.currentThread().getName(), i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    };

    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
            System.out.format("线程名称：%s\n" , Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenRun(runnable)
      // Block until the countdown is done instead of spinning in an endless
      // while (true) loop, which burned a CPU core and never returned.
      .join();
}

    // Runs a countdown via thenRunAsync (no executor given) after the supplier finishes.
    public static void thenRunAsync() {
        IntStream stream = IntStream.of(10,9,8,7,6,5,4,3,2,1);
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.format("线程名称：%s\n" , Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRunAsync(() -> {
            stream.forEach(i -> {
                try {
                    Thread.sleep(1000);
                    System.out.format("线程名称：%s, 倒计时：%d\n", Thread.currentThread().getName(), i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        })
        // Block until the countdown is done instead of spinning in an endless
        // while (true) loop, which burned a CPU core and never returned.
        .join();
    }

    // Runs a countdown via thenRunAsync on a custom executor after the supplier finishes.
    public static void thenRunAsyncOfExecutor() {
        ExecutorService executor = Executors.newCachedThreadPool();
        IntStream stream = IntStream.of(10,9,8,7,6,5,4,3,2,1);
        try {
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.format("线程名称：%s\n" , Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello";
            }).thenRunAsync(() -> {
                stream.forEach(i -> {
                    try {
                        Thread.sleep(1000);
                        System.out.format("线程名称：%s, 倒计时：%d\n", Thread.currentThread().getName(), i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, executor)
            // Wait for completion by blocking rather than polling isTerminated() in a tight loop.
            .join();
        } finally {
            // Shut down here, not inside the task: if an upstream stage failed the task
            // would never run and the executor would never be shut down.
            executor.shutdown();
        }
        System.out.println("线程任务都已经完成");
    }
```

* thenCombine、thenCombineAsync 它需要原来的处理返回值，并且other代表的CompletionStage也要返回值之后，利用这两个返回值，进行转换后返回指定类型的值。

```
// Combines the results of two independent async computations into one string.
public static void thenCombine() {
    // First computation: sleeps 2s, then prints its thread name (only if not interrupted)
    CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("线程名称：" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    });
    // Second computation: sleeps 3s, then always prints its thread name
    CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程名称：" + Thread.currentThread().getName());
        return "world";
    });
    // Join the two results with a space once both are available
    CompletableFuture<String> combined = helloFuture.thenCombine(worldFuture, (s1, s2) -> s1 + " " + s2);
    String result = combined.join();
    System.out.println(result);
}


    // Combines two async results, running the combining function on a custom executor.
    public static void thenCombineAsync() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            String result = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程名称：" + Thread.currentThread().getName());
                return "hello";
            }).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程名称：" + Thread.currentThread().getName());
                return "world";
            }), (s1, s2) -> s1 + " " + s2, executor).join();
            System.out.println(result);
        } finally {
            // Shut down in finally so the pool thread is released even when join() throws
            executor.shutdown();
        }
    }
```

* thenAcceptBoth、thenAcceptBothAsync 它需要原来的处理返回值，并且other代表的CompletionStage也要返回值之后，利用这两个返回值，进行消耗

```
// Consumes the results of two async computations together, on a custom executor.
public static void thenAcceptBoth() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2), executor)
        // Wait by blocking rather than polling isTerminated() in a tight loop.
        .join();
    } finally {
        // Shut down here, not inside the callback: if either stage failed the callback
        // would never run and the executor would never be shut down.
        executor.shutdown();
    }
    System.out.println("线程任务都已经完成");
}
```

* runAfterBoth、runAfterBothAsync 不关心这两个CompletionStage的结果，只关心这两个CompletionStage执行完毕，之后再进行操作（Runnable）。

```
// Runs an action on a custom executor once both async computations have finished.
public static void runAfterBoth() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"), executor)
        // Wait by blocking rather than polling isTerminated() in a tight loop.
        .join();
    } finally {
        // Shut down here, not inside the action: if either stage failed the action
        // would never run and the executor would never be shut down.
        executor.shutdown();
    }
    System.out.println("线程任务都已经完成");
}

    //hello world
    //线程任务都已经完成
```

* applyToEither、applyToEitherAsync 两个CompletionStage，谁计算的快，我就用那个CompletionStage的结果进行下一步的转化操作。我们现实开发场景中，总会碰到有两种渠道完成同一个事情，所以就可以调用这个方法，找一个最快的结果进行处理。acceptEither、acceptEitherAsync 与之类似，只是对最快的那个结果进行消耗（Consumer），没有返回值。
* 示例（以acceptEither为例）如下：

```
// Prints the result of whichever of the two async computations finishes first.
public static void acceptEither() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).acceptEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }), System.out::println)
    // Block until the consumer has run instead of spinning in an endless
    // while (true) loop, which burned a CPU core and never returned.
    .join();
}
```

* runAfterEither、runAfterEitherAsync 两个CompletionStage，任何一个完成了都会执行下一步的操作（Runnable）示例如下：

```
// Runs an action as soon as either of the two async computations has finished.
public static void runAfterEither() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).runAfterEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s2";
    }), () -> System.out.println("hello world"))
    // Block until the action has run instead of spinning in an endless
    // while (true) loop, which burned a CPU core and never returned.
    .join();
}
```

* 当运行时出现了异常，可以通过exceptionally进行补偿

```
// Delegates to uniExceptionallyStage; fn maps a Throwable to a replacement value of type T.
public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }
```

```
// Shows exceptionally() supplying a fallback value when the async computation throws.
public static void exceptionally() {
    CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // The always-true condition keeps the trailing return reachable for the compiler
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    });
    // Log the failure message and substitute a fallback value
    CompletableFuture<String> recovered = failing.exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    });
    String result = recovered.join();
    System.out.println(result);
}

    //java.lang.RuntimeException: 测试一下异常情况
    //hello world
```

* whenComplete、whenCompleteAsync  当运行完成时，对结果的记录。这里的完成时有两种情况，一种是正常执行，返回值。另外一种是遇到异常抛出造成程序的中断。这里为什么要说成记录，因为这几个方法都会返回CompletableFuture，当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。所以不会对结果产生任何的作用。

  示例如下：

```
// Shows whenComplete() observing the outcome (value or exception) without changing it,
// followed by exceptionally() supplying a fallback value.
public static void whenComplete() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // The always-true condition keeps the trailing return reachable for the compiler
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    }).whenComplete((s, t) -> {
        System.out.println("s：" + s);
        // t is null when the stage completed normally; guard against a
        // NullPointerException so the callback is safe for both outcomes.
        if (t != null) {
            System.out.println(t.getMessage());
        }
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}
```

这里也可以看出，如果使用了exceptionally，就会对最终的结果产生影响；但它只处理出现异常的情况，没有办法同时对没有异常时的正常返回值进行处理，这也就引出下面我们要介绍的handle。

* handle、handleAsync 运行完成时，对结果的处理。这里的完成时有两种情况，一种是正常执行，返回值。另外一种是遇到异常抛出造成程序的中断。示例如下：

```
// Shows handle() when the async computation throws: the handler supplies a fallback value.
public static void handle() {
    CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Always throws; the always-true condition keeps the trailing return reachable
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    });
    // Fallback when an exception is present, otherwise pass the value through
    CompletableFuture<String> handled = failing.handle((s, t) -> t != null ? "hello world" : s);
    String result = handled.join();
    System.out.println(result);
}


// Shows handle() when the async computation succeeds: the handler passes the value through.
public static void handle1() {
    CompletableFuture<String> succeeding = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    });
    // Fallback when an exception is present, otherwise pass the value through
    CompletableFuture<String> handled = succeeding.handle((s, t) -> t != null ? "hello world" : s);
    String result = handled.join();
    System.out.println(result);
}
```
