CompletableFuture接口详解

completableFuture实现了CompletionStage接口,如下:

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

    public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) {
        return biRunStage(asyncPool, other, action);
    }

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action, Executor executor) {
        return biRunStage(screenExecutor(executor), other, action);
    }
    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

    public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action,
        Executor executor) {
        return orAcceptStage(screenExecutor(executor), other, action);
    }

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action,
                                                       Executor executor) {
        return orRunStage(screenExecutor(executor), other, action);
    }

    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }

    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return uniHandleStage(screenExecutor(executor), fn);
    }

首先说明一下已Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行,下文中将会有好多类似的,都不详细解释了。关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。

  • 初始化CompletableFuture的几种方式:

public static void init(){
        //方式一
        CompletableFuture<String> completableFuture1 = new CompletableFuture();
        //方式二
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.completedFuture(111);
        CompletableFuture<String>  completableFuture3 = CompletableFuture.completedFuture("123");
        //方式三
        CompletableFuture<String>  completableFuture4 = CompletableFuture.supplyAsync(()->"123");
        CompletableFuture<Integer>  completableFuture5 = CompletableFuture.supplyAsync(()->123);
        CompletableFuture  completableFuture6 = CompletableFuture.supplyAsync(()->123);
        CompletableFuture  completableFuture7 = CompletableFuture.supplyAsync(()->"123");

        //方式四  创建自定义的executor
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CompletableFuture.supplyAsync(()->"123", executor);
        try {
            executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);//在指定时间内,终止任务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
  • thenAccpet、thenAcceptAsync 是针对结果进行消耗,因为他的入参是Consumer,有入参无返回值 示例如下:

    public static void thenAccept(){
        CompletableFuture.supplyAsync(()->"hello")
                .thenAccept(s -> System.out.println(s + " world"))
                .thenAccept(v-> System.out.println("done"));

        //hello world
        //done
    }

    public static void thenAcceptAsync(){//默认使用ForkJoinPool.commonPool()
        CompletableFuture.supplyAsync(()->"hello").thenAcceptAsync(s -> System.out.println(s + " world "));
        System.out.println("异步任务执行");

        //异步任务执行
        //hello world
    }

    public static void thenAcceptAsyncOfExecutor(){//创建自定义的executor
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CompletableFuture.supplyAsync(()->"hello").thenAcceptAsync(s -> System.out.println(s + " world "), executor);
        try {
            executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);//在指定时间内,终止任务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("异步任务执行");
    }
  • thenApply、thenApplyAsync 是针对结果进行转换,有返回值,他的入参是Function,可以使用lambda表达式,示例如下:

   public static void thenApply(){
        String result = CompletableFuture.supplyAsync(()->"hello")
                .thenApply(s -> s + " world")
                .thenApply(v-> v + " done").join();
        System.out.println("result = " + result);
        //hello world done
    }

    public static void thenApplyAsync(){
        ExecutorService executor = Executors.newSingleThreadExecutor();
        String result = CompletableFuture.supplyAsync(()->"hello")
                .thenApplyAsync(s -> s + " world")
                .thenApplyAsync(v-> v + ", 异步线程名:" + Thread.currentThread().getName()).join();
        System.out.println("result = " + result);
        System.out.println("主线程名:" + Thread.currentThread().getName());
        //result = hello world, 异步线程名:ForkJoinPool.commonPool-worker-1
        //主线程名:main
    }

    public static void thenApplyOfExecutor(){
        String result = CompletableFuture.supplyAsync(()->"hello")
                .thenApplyAsync(s -> s + " world")
                .thenApplyAsync(v-> v + ", 异步线程名:" + Thread.currentThread().getName()).join();
        System.out.println("result = " + result);
        System.out.println("主线程名:" + Thread.currentThread().getName());
        //result = hello world, 异步线程名:ForkJoinPool.commonPool-worker-1
        //主线程名:main
  • thenRun、thenRunAsync对上一步的计算结果不关心,执行下一个操作,他的入参是Runnable,无返回值,示例如下:

public static void thenRun(){
        IntStream stream = IntStream.of(10,9,8,7,6,5,4,3,2,1);
        Runnable runnable = ()->{
            stream.forEach(i->{
                try {
                    Thread.sleep(1000);
                    System.out.format("线程名称:%s, 倒计时:%d\n", Thread.currentThread().getName(), i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        };

        CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(3000);
                System.out.format("线程名称:%s\n" , Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(runnable);
        while (true){}

    }

    public static void thenRunAsync(){
        IntStream stream = IntStream.of(10,9,8,7,6,5,4,3,2,1);
        CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(3000);
                System.out.format("线程名称:%s\n" , Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRunAsync(()->{
            stream.forEach(i->{
                try {
                    Thread.sleep(1000);
                    System.out.format("线程名称:%s, 倒计时:%d\n", Thread.currentThread().getName(), i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        });
        while (true){}

    }

    public static void thenRunAsyncOfExecutor(){
        ExecutorService executor = Executors.newCachedThreadPool();
        IntStream stream = IntStream.of(10,9,8,7,6,5,4,3,2,1);
        CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(3000);
                System.out.format("线程名称:%s\n" , Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRunAsync(()->{
            stream.forEach(i->{
                try {
                    Thread.sleep(1000);
                    System.out.format("线程名称:%s, 倒计时:%d\n", Thread.currentThread().getName(), i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            executor.shutdown();

        }, executor);

        while(true){
            if(executor.isTerminated()){
                System.out.println("线程任务都已经完成");
                break;
            }
        }

    }
  • thenCombine、thenCombineAsync 它需要原来的处理返回值,利用这两个返回值,进行转换后返回指定类型的值。

public static void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("线程名称:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程名称:" + Thread.currentThread().getName());
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result);
    }


    public static void thenCombineAsync() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程名称:" + Thread.currentThread().getName());
            return "hello";
        }).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程名称:" + Thread.currentThread().getName());
            return "world";
        }), (s1, s2) -> s1 + " " + s2, executor).join();
        executor.shutdown();
        System.out.println(result);
    }
  • thenAcceptBoth、thenAcceptBothAsync 它需要原来的处理返回值,并且other代表的CompletionStage也要返回值之后,利用这两个返回值,进行消耗

public static void thenAcceptBoth() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "world";
        }), (s1, s2) ->{
            System.out.println(s1 + " " + s2);
            executor.shutdown();
        }, executor);

        while(true){
            if(executor.isTerminated()){
                System.out.println("线程任务都已经完成");
                break;
            }
        }
    }
  • runAfterBoth、runAfterBothAsync 不关心这两个CompletionStage的结果,只关心这两个CompletionStage执行完毕,之后在进行操作(Runnable)。

public static void runAfterBoth(){
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> {
            System.out.println("hello world");
            executor.shutdown();
        }, executor);

        while(true){
            if(executor.isTerminated()){
                System.out.println("线程任务都已经完成");
                break;
            }
        }
    }

    //hello world
    //线程任务都已经完成
  • applyToEither、applyToEitherAsync 两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的转化操作。我们现实开发场景中,总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。

  • 示例如下:

public static void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println);
        while (true){}
    }
  • runAfterEither、runAfterEitherAsync 两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)示例如下:

public static void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true) {
        }
    }
  • 当运行时出现了异常,可以通过exceptionally进行补偿

public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }
public static void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("测试一下异常情况");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

    //java.lang.RuntimeException: 测试一下异常情况
    //hello world
  • whenComplete、whenCompleteAsync 当运行完成时,对结果的记录。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断。这里为什么要说成记录,因为这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。所以不会对结果产生任何的作用。

    示例如下:

public static void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("测试一下异常情况");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println("s:" + s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

这里也可以看出,如果使用了exceptionally,就会对最终的结果产生影响,它没有口子返回如果没有异常时的正确的值,这也就引出下面我们要介绍的handle。

  • handle、handleAsync 运行完成时,对结果的处理。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断示例如下:

public static void handle() {
        //出现异常时
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出现异常
            if (1 == 1) {
                throw new RuntimeException("测试一下异常情况");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }


public static void handle1() {
    //未出现异常时
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world";
        }
        return s;
    }).join();
    System.out.println(result);
}

Last updated