java-concurrent
  • 前言
  • Java多线程基础
    • 线程简介
      • 什么是线程
      • 为什么要使用多线程/线程使用的好处
      • 线程的优先级
      • 线程的状态
      • Daemon线程
    • 启动和终止线程
      • 构造线程
      • 启动线程
      • 中断线程
      • 过期的suspend()、resume()和stop()
      • 安全地终止线程
    • 多线程实现方式
    • 多线程环境下,局部变量和全局变量都会共享吗?
    • Java线程间的协助和通信
      • Thread.join的使用
      • volatile、ThreadLocal、synchronized3个关键字区别
      • volatile关键字
      • ThreadLocal关键字
      • synchronized关键字
      • Java线程等待和通知的相关方法
    • 实战应用
      • 连接池
      • 线程池
      • 如何计算合适的线程数
  • Java线程池与框架
    • Executor 框架
    • 自定义线程池——ThreadPoolExecutor
    • 线程池工具类(单例模式)
    • 关闭线程池
    • 合理地配置线程池
    • 线程池的监控
    • RejectedExecutionException产生的原因
    • SpringBoot配置线程池工具类
    • FutureTask详解
    • CompletionService讲解
    • Future、FutureTask、CompletionService、CompletableFuture区别
  • Java内存模型
    • Java 内存模型的基础
      • 并发编程模型的两个关键问题
      • Java内存模型的抽象结构
      • 从源代码到指令序列的重排序
      • 并发编程模型的分类
    • 重排序
      • 数据依赖性
      • as-if-serial语义
      • 程序顺序规则
      • 重排序对多线程的影响
    • 顺序一致性
      • 数据竞争与顺序一致性
      • 顺序一致性内存模型
      • 同步程序的顺序一致性效果
      • 未同步程序的执行特性
    • volatile内存语义
      • volatile的特性
      • volatile写-读建立的happens-before关系
      • volatile写-读的内存语义
      • volatile内存语义的实现
      • JSR-133为什么要增强volatile的内存语义
    • 锁内存定义
      • 锁的释放-获取建立的happens-before关系
      • 锁的释放和获取的内存语义
      • 锁内存语义的实现
      • concurrent包的实现
    • final域内存语义
      • final域的重排序规则
      • 写final域的重排序规则
      • 读final域的重排序规则
      • final域为引用类型
      • 为什么final引用不能从构造函数内“溢出”
      • final语义在处理器中的实现
      • JSR-133为什么要增强final的语义
    • happens-before
    • 双重检查锁定与延迟初始化
      • 双重检查锁定的由来
      • 问题的根源
      • 基于volatile的解决方案
      • 基于类初始化的解决方案
    • Java内存模型综述
      • 处理器的内存模型
      • 各种内存模型之间的关系
      • JMM的内存可见性保证
      • JSR-133对旧内存模型的修补
  • HashMap实现原理
    • 讲解(一)
    • 讲解(二)
    • HashMap原理(面试篇)
    • HashMap原理(面试篇二)
  • ConcurrentHashMap的实现原理与使用
    • 为什么要使用ConcurrentHashMap
    • ConcurrentHashMap的结构
    • ConcurrentHashMap的初始化
    • 定位Segment
    • ConcurrentHashMap的操作
    • ConcurrentHashMap讲解(一)
  • Java中的阻塞队列
    • 什么是阻塞队列
    • Java里的阻塞队列
    • 阻塞队列的实现原理
  • Fork/Join框架
    • 什么是Fork/Join框架
    • 工作窃取算法
    • Fork/Join框架的设计
    • 使用Fork/Join框架
    • Fork/Join框架的异常处理
    • Fork/Join框架的实现原理
    • ForkJoinPool的commonPool相关参数配置
  • java.util.concurrent包讲解
    • 线程安全AtomicInteger的讲解
    • CompletableFuture讲解
      • CompletableFuture接口详解
      • CompletableFuture与parallelStream()性能差异
      • CompletableFuture接口详解2
  • Java线程安全
    • 性能与可伸缩性
    • 解决死锁
    • 死锁定义
    • 如何让多线程下的类安全
    • 类的线程安全性定义
    • 实战:实现一个线程安全的单例模式
  • Java常用并发开发工具和类的源码分析
    • CountDownLatch
    • CyclicBarrier
    • Semaphore
    • Exchange
    • ConcurrentHashMap
    • ConcurrentSkipListMap
    • HashMap
      • HashMap源码实现及分析
      • HashMap的一些面试题
    • List
  • Java中的锁
    • 基础知识
    • 番外篇
    • synchronized 是可重入锁吗?为什么?
    • 自旋锁
  • Java多线程的常见问题
    • 常见问题一
Powered by GitBook
On this page

Was this helpful?

  1. java.util.concurrent包讲解
  2. CompletableFuture讲解

CompletableFuture与parallelStream()性能差异

parallelStream() 与 CompletableFuture 的性能差异假设一个这样的耗时 1000 毫秒的计算任务

private static int getJob() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    return 50;
}

分别用下面两个方法来测试,任务数可以通过参数来控制

private static long testParallelStream(int jobCount) {
    List<Supplier<Integer>> tasks = new ArrayList<>();
    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob));

    long start = System.currentTimeMillis();
    int sum = tasks.parallelStream().map(Supplier::get).mapToInt(Integer::intValue).sum();
    return System.currentTimeMillis() - start;
}

private static long testCompletableFutureDefaultExecutor(int jobCount) {
    List<CompletableFuture<Integer>> tasks = new ArrayList<>();
    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob)));

    long start = System.currentTimeMillis();
    int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
    return System.currentTimeMillis() - start;
}

写下本文的测试机器用Runtime.getRuntime().availableProcessors()得到的 CPU 内核数是 4, 下面是两个方法在不同的并发任务数时一组耗时对照表

从上面数据来看,默认情况下,随着任务数的增加,CompletableFuture反而比最简单的parallelStream()操作方式性能显得越发差一些,是不是有些失望了。慢着,我们是说默认情况下, parallelStream()和CompletableFuture.supplyAsync(job)的 job 都会分配给默认 ForkJoinPool.commonPool()去执行,而这个线程池的大小是 CPU 的内核数,所以它们没有多大区别,甚至CompletableFuture的方式比parallelStream()更快达到线程的饱和,性能还略微差一些。

可是,不用急,我们还有大招,因为parallelStream()不能灵活的干预线程池的大小(默认为 CPU 内核数),要改的话会影响到系统全局的ForkJoinPool.commonPool()的大小。可通过指定系统属性 java.util.concurrent.ForkJoinPool.common.parallelism来指定这个线程池大小。然而CompletableFuture还有第二个版本的 supplyAsync(supplier, executor)方法,由第二个参数来指定所使用的线程池,所以我们再定义一个方法,把自定义线程池大小调到 20, 然后重新进行测试并与前面数据对比

private static long testCompletableFutureDefaultExecutor(int jobCount) {
    List<CompletableFuture<Integer>> tasks = new ArrayList<>();
    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob)));

    long start = System.currentTimeMillis();
    int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
    return System.currentTimeMillis() - start;
}

测试的结果如下

现在 CompletableFuture的优越性就体现出来了,任务数在没有超过线程池大小 20 的时候,仿佛在执行一个任务一样,所需耗时一保持在 1000 毫秒左右,任务数在 21 的时候才发生第一次线程等待的情况,所以耗时为2005毫秒。

继续观察上面那个耗时对照表,结合执行每个方法时的线程池大小来解读上面的测试结果,简单点来讲上面标记为绿色的数字开始出现了任务等待线程池中的可用线程,当把线程池调大一些并发性能就越高。当然不是线程数越大越好,因为我们这儿例子中的计算任务很简单,所以线程再大些也无妨。

对于如何选择合适的线程池大小,这里不深究了,需权衡到计算任务的耗时,是否有 I/O 的等,有一个公式是

Nthreads = Ncpu * Ucpu * (1 + W/C)

Ncpu 是 CPU 内核数,通过 Runtime.getRuntime().availableProcessors() 得到的值
Ucpu 是期望的 CPU 的使用率(介于 0 与 1 之间)
W/C 是等待时间与任务执行时间的比率

下面是本篇测试用的完整代码:

package cc.unmi;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static java.lang.String.format;

public class Main {

    private static int PROCESSORS = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) {
        System.out.println("Processors: " + PROCESSORS);

        Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17).forEach(offset -> {
            int jobNum = PROCESSORS + offset;
            System.out.println(
                format("When %s tasks => stream: %s, parallelStream: %s, future default: %s, future custom: %s",
                    jobNum, testStream(jobNum), testParallelStream(jobNum),
                    testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum)));
        });

    }

    private static long testStream(int jobCount) {
        List<Supplier<Integer>> tasks = new ArrayList<>();
        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob));

        long start = System.currentTimeMillis();
        int sum = tasks.stream().map(Supplier::get).mapToInt(Integer::intValue).sum();
        checkSum(sum, jobCount);
        return System.currentTimeMillis() - start;
    }

    private static long testParallelStream(int jobCount) {
        List<Supplier<Integer>> tasks = new ArrayList<>();
        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob));

        long start = System.currentTimeMillis();
        int sum = tasks.parallelStream().map(Supplier::get).mapToInt(Integer::intValue).sum();
        checkSum(sum, jobCount);
        return System.currentTimeMillis() - start;
    }

    private static long testCompletableFutureDefaultExecutor(int jobCount) {
        List<CompletableFuture<Integer>> tasks = new ArrayList<>();
        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob)));

        long start = System.currentTimeMillis();
        int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
        checkSum(sum, jobCount);
        return System.currentTimeMillis() - start;
    }

    private static long testCompletableFutureCustomExecutor(int jobCount) {
        Executor executor = new ForkJoinPool(20);

        List<CompletableFuture<Integer>> tasks = new ArrayList<>();
        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob, executor)));

        long start = System.currentTimeMillis();
        int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
        checkSum(sum, jobCount);
        return System.currentTimeMillis() - start;
    }

    private static int getJob() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

        return 50;
    }

    private static void checkSum(int sum, int jobNum) {
        if(sum != 50 * jobNum) {
            throw new AssertionError("Result Error");
        }
    }
}

运行后控制台输出

Processors: 4
When 1 tasks => stream: 1007, parallelStream: 1009, future default: 1005, future custom: 1004
When 3 tasks => stream: 3014, parallelStream: 1004, future default: 1005, future custom: 1003
When 4 tasks => stream: 4010, parallelStream: 1002, future default: 2005, future custom: 1004
When 5 tasks => stream: 5013, parallelStream: 2005, future default: 2002, future custom: 1002
When 6 tasks => stream: 6019, parallelStream: 2006, future default: 2008, future custom: 1005
When 8 tasks => stream: 8026, parallelStream: 2006, future default: 3008, future custom: 1001
When 9 tasks => stream: 9028, parallelStream: 3012, future default: 3008, future custom: 1003
When 14 tasks => stream: 14045, parallelStream: 4014, future default: 5016, future custom: 1002
When 20 tasks => stream: 20049, parallelStream: 5020, future default: 7025, future custom: 1003
When 21 tasks => stream: 21057, parallelStream: 6018, future default: 7026, future custom: 2005

不同机器上跑的需依据实际 CPU 内核和所定制的线程池大小数来调整每次测试的任务数,即须修改

Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17)

中的测试参数来覆盖某些边界值

PreviousCompletableFuture接口详解NextCompletableFuture接口详解2

Last updated 5 years ago

Was this helpful?