Jtoss Jtoss
首页
  • 数据结构与算法

    • 数据结构与算法 - 概述
    • 数据结构与算法 - 复杂度分析
    • 数据结构 - 线性表
    • 算法 - 常见排序算法
  • 代码规范

    • 代码简洁之道
    • 阿里巴巴开发手册
    • 谷歌Java编程风格指南
  • 设计模式

    • 编写高质量代码概述
    • 面向对象
    • 设计原则
    • 设计模式-创建型
    • 设计模式-结构型
    • 设计模式-行为型(上)
    • 设计模式-行为型(下)
    • 浅析框架源码中的设计模式
    • 业务框架实战案例
  • MySQL 基础

    • MySQL - 数据库设计规范
    • MySQL - 必知必会
  • MySQL 进阶

    • MySQL - 基础架构
    • MySQL - InnoDB存储引擎
    • MySQL - InnoDB缓冲池
    • MySQL - 事务与锁
    • MySQL - 索引
    • MySQL - 查询执行计划
    • MySQL - 性能优化
  • Redis 系列

    • Redis入门 - 基础相关
    • Redis进阶 - 数据结构
    • Redis进阶 - 持久化RDB和AOF
    • Redis进阶 - 事件机制
    • Redis进阶 - 事务
    • Redis进阶 - 高可用高可扩展
    • Redis进阶 - 缓存问题
    • Redis进阶 - 性能调优
  • Java 基础

    • Java 基础 - 知识点
    • Java 基础 - 面向对象
    • Java 基础 - Q/A
  • Java 进阶 - 集合框架

    • Java 集合框架详解
  • Java 进阶 - 多线程与并发

    • Java 并发 - 理论基础
    • Java 并发 - 线程基础
    • Java 并发 - 各种锁
    • Java 并发 - 关键字 volatile
    • Java 并发 - 关键字 synchronized
    • JUC - CAS与原子操作
    • JUC - 锁核心类AQS
    • JUC - 锁接口和类简介
    • JUC - 并发容器简介
    • JUC - 通信工具类
    • JUC - Fork-Join框架
    • JUC - 线程池
  • Java 进阶 - JVM

    • JVM - 概述
    • JVM - 类加载机制
    • JVM - 内存结构
    • JVM - 垃圾回收机制
    • JVM - 性能调优
  • Maven系列

    • Maven基础知识
    • Maven项目构建
    • Maven多模块配置
  • Spring 框架

    • Spring 框架 - 框架介绍
    • Spring 框架 - IOC详解
    • Spring 框架 - AOP详解
    • Spring 框架 - SpringMVC详解
  • Spring Boot 系列

    • Spring Boot - 开发入门
    • Spring Boot - 接口相关
  • Spring Cloud 系列
  • Mybatis 系列

    • Mybatis - 总体框架设计
    • Mybatis - 初始化基本过程
    • Mybatis - sqlSession执行过程
    • Mybatis - 插件机制
    • Mybatis - 事务管理机制
    • Mybatis - 缓存机制
  • 业务常见问题

    • Java 业务开发常见错误(一)
    • Java 业务开发常见错误(二)
    • Java 业务开发常见错误(三)
    • Java 业务开发常见错误(四)
    • Java 业务开发常见错误(五)
    • Java 业务开发常见错误(六)
  • IDEA系列

    • IDEA 2021开发环境配置
    • IDEA 快捷键
  • Git系列

    • git status中文乱码
  • 其他

    • Typora+Picgo 自动上传图片
    • hsdis 和 jitwatch
  • 实用技巧
  • 收藏
  • 摄影
  • 学习
  • 标签
  • 归档

Jason Huang

后端程序猿
首页
  • 数据结构与算法

    • 数据结构与算法 - 概述
    • 数据结构与算法 - 复杂度分析
    • 数据结构 - 线性表
    • 算法 - 常见排序算法
  • 代码规范

    • 代码简洁之道
    • 阿里巴巴开发手册
    • 谷歌Java编程风格指南
  • 设计模式

    • 编写高质量代码概述
    • 面向对象
    • 设计原则
    • 设计模式-创建型
    • 设计模式-结构型
    • 设计模式-行为型(上)
    • 设计模式-行为型(下)
    • 浅析框架源码中的设计模式
    • 业务框架实战案例
  • MySQL 基础

    • MySQL - 数据库设计规范
    • MySQL - 必知必会
  • MySQL 进阶

    • MySQL - 基础架构
    • MySQL - InnoDB存储引擎
    • MySQL - InnoDB缓冲池
    • MySQL - 事务与锁
    • MySQL - 索引
    • MySQL - 查询执行计划
    • MySQL - 性能优化
  • Redis 系列

    • Redis入门 - 基础相关
    • Redis进阶 - 数据结构
    • Redis进阶 - 持久化RDB和AOF
    • Redis进阶 - 事件机制
    • Redis进阶 - 事务
    • Redis进阶 - 高可用高可扩展
    • Redis进阶 - 缓存问题
    • Redis进阶 - 性能调优
  • Java 基础

    • Java 基础 - 知识点
    • Java 基础 - 面向对象
    • Java 基础 - Q/A
  • Java 进阶 - 集合框架

    • Java 集合框架详解
  • Java 进阶 - 多线程与并发

    • Java 并发 - 理论基础
    • Java 并发 - 线程基础
    • Java 并发 - 各种锁
    • Java 并发 - 关键字 volatile
    • Java 并发 - 关键字 synchronized
    • JUC - CAS与原子操作
    • JUC - 锁核心类AQS
    • JUC - 锁接口和类简介
    • JUC - 并发容器简介
    • JUC - 通信工具类
    • JUC - Fork-Join框架
    • JUC - 线程池
  • Java 进阶 - JVM

    • JVM - 概述
    • JVM - 类加载机制
    • JVM - 内存结构
    • JVM - 垃圾回收机制
    • JVM - 性能调优
  • Maven系列

    • Maven基础知识
    • Maven项目构建
    • Maven多模块配置
  • Spring 框架

    • Spring 框架 - 框架介绍
    • Spring 框架 - IOC详解
    • Spring 框架 - AOP详解
    • Spring 框架 - SpringMVC详解
  • Spring Boot 系列

    • Spring Boot - 开发入门
    • Spring Boot - 接口相关
  • Spring Cloud 系列
  • Mybatis 系列

    • Mybatis - 总体框架设计
    • Mybatis - 初始化基本过程
    • Mybatis - sqlSession执行过程
    • Mybatis - 插件机制
    • Mybatis - 事务管理机制
    • Mybatis - 缓存机制
  • 业务常见问题

    • Java 业务开发常见错误(一)
    • Java 业务开发常见错误(二)
    • Java 业务开发常见错误(三)
    • Java 业务开发常见错误(四)
    • Java 业务开发常见错误(五)
    • Java 业务开发常见错误(六)
  • IDEA系列

    • IDEA 2021开发环境配置
    • IDEA 快捷键
  • Git系列

    • git status中文乱码
  • 其他

    • Typora+Picgo 自动上传图片
    • hsdis 和 jitwatch
  • 实用技巧
  • 收藏
  • 摄影
  • 学习
  • 标签
  • 归档
  • Java 基础

  • Java 进阶 - 集合框架

  • Java 进阶 - 多线程与并发

    • Java 并发 - 概述
    • Java 并发 - 理论基础
    • Java 并发 - 线程基础
    • Java 并发 - 各种锁
    • Java 并发 - JVM 锁优化
    • Java 并发 - 关键字 volatile
    • Java 并发 - 关键字 synchronized
    • Java 并发 - syschronized 应用及死锁问题
    • Java 并发 - 关键字 final
    • JUC - CAS与原子操作
    • JUC - 锁核心类AQS
    • JUC - 锁接口和类简介
    • JUC - 并发容器简介
    • JUC - 阻塞队列
    • JUC - 通信工具类
    • JUC - Fork/Join框架
    • JUC - Stream并行计算原理
      • 1. Java 8 Stream 简介
      • 2. Stream 单线程串行计算
      • 3. Stream多线程并行计算
      • 4. 从源码看 Stream 并行计算原理
      • 5. Stream 并行计算的性能提升
      • 参考
    • JUC - 线程池
  • Java 进阶 - JVM

  • Java 进阶 - 版本特性

  • Java
  • Java 进阶 - 多线程与并发
Jason
目录

JUC - Stream并行计算原理

# JUC - Stream 并行计算原理

# 1. Java 8 Stream 简介

从Java 8 开始,我们可以使用Stream接口以及 lambda 表达式进行“流式计算”。它可以让我们对集合的操作更加简洁、更加可读、更加高效。

Stream 接口有非常多用于集合计算的方法,比如判空操作 empty、过滤操作 filter、求最 max 值、查找操作 findFirst和 findAny 等等。

# 2. Stream 单线程串行计算

Stream 接口默认是使用串行的方式,也就是说在一个线程里执行。下面举一个例子:

public class StreamDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .reduce((a, b) -> {
                    System.out.println(String.format("%s: %d + %d = %d",
                            Thread.currentThread().getName(), a, b, a + b));
                    return a + b;
                })
                .ifPresent(System.out::println);
    }
}
1
2
3
4
5
6
7
8
9
10
11

我们来理解一下这个方法。首先我们用整数 1~9 创建了一个Stream。这里的 Stream.of(T... values) 方法是 Stream接口的一个静态方法,其底层调用的是 Arrays.stream(T[] array) 方法。

然后我们使用了reduce方法来计算这个集合的累加和。reduce方法这里做的是:从前两个元素开始,进行某种操作(我这里进行的是加法操作)后,返回一个结果,然后再拿这个结果跟第三个元素执行同样的操作,以此类推,直到最后的一个元素。

我们来打印一下当前这个 reduce 操作的线程以及它们被操作的元素和返回的结果以及最后所有 reduce 方法的结果,也就代表的是数字1到9的累加和。

main: 1 + 2 = 3 main: 3 + 3 = 6 main: 6 + 4 = 10 main: 10 + 5 = 15 main: 15 + 6 = 21 main: 21 + 7 = 28 main: 28 + 8 = 36 main: 36 + 9 = 45 45

可以看到,默认情况下,它是在一个单线程运行的,也就是 main 线程。然后每次 reduce 操作都是串行起来的,首先计算前两个数字的和,然后再往后依次计算。

# 3. Stream多线程并行计算

我们思考上面一个例子,是不是一定要在单线程里进行串行地计算呢?假如我的计算机是一个多核计算机,我们在理论上能否利用多核来进行并行计算,提高计算效率呢?

当然可以,比如我们在计算前两个元素 1 + 2 = 3 的时候,其实我们也可以同时在另一个核计算 3 + 4 = 7。然后等它们都计算完成之后,再计算 3 + 7 = 10 的操作。

是不是很熟悉这样的操作手法?没错,它就是 ForkJoin 框架的思想。下面小小地修改一下上面的代码,增加一行代码,使 Stream 使用多线程来并行计算:

public class StreamParallelDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .parallel()
                .reduce((a, b) -> {
                    System.out.println(String.format("%s: %d + %d = %d",
                            Thread.currentThread().getName(), a, b, a + b));
                    return a + b;
                })
                .ifPresent(System.out::println);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

可以看到,与上一个案例的代码只有一点点区别,就是在 reduce 方法被调用之前,调用了 parallel() 方法。下面来看看这个方法的输出:

ForkJoinPool.commonPool-worker-1: 3 + 4 = 7 ForkJoinPool.commonPool-worker-4: 8 + 9 = 17 ForkJoinPool.commonPool-worker-2: 5 + 6 = 11 ForkJoinPool.commonPool-worker-3: 1 + 2 = 3 ForkJoinPool.commonPool-worker-4: 7 + 17 = 24 ForkJoinPool.commonPool-worker-4: 11 + 24 = 35 ForkJoinPool.commonPool-worker-3: 3 + 7 = 10 ForkJoinPool.commonPool-worker-3: 10 + 35 = 45 45

可以很明显地看到,它使用的线程是ForkJoinPool里面的commonPool里面的 worker 线程。并且它们是并行计算的,并不是串行计算的。但由于 Fork/Join 框架的作用,它最终能很好的协调计算结果,使得计算结果完全正确。

如果我们用 Fork/Join 代码去实现这样一个功能,那无疑是非常复杂的。但 Java8 提供了并行式的流式计算,大大简化了我们的代码量,使得我们只需要写很少很简单的代码就可以利用计算机底层的多核资源。

# 4. 从源码看 Stream 并行计算原理

上面我们通过在控制台输出线程的名字,看到了 Stream 的并行计算底层其实是使用的 Fork/Join 框架。那它到底是在哪使用 Fork/Join 的呢?我们从源码上来解析一下上述案例。

Stream.of方法就不说了,它只是生成一个简单的 Stream。先来看看parallel()方法的源码。这里由于我的数据是int类型的,所以它其实是使用的BaseStream接口的parallel()方法。而BaseStream接口的 JDK 唯一实现类是一个叫AbstractPipeline的类。下面我们来看看这个类的parallel()方法的代码:

public final S parallel() {
    sourceStage.parallel = true;
    return (S) this;
}
1
2
3
4

这个方法很简单,就是把一个标识sourceStage.parallel设置为true。然后返回实例本身。

接着我们再来看reduce这个方法的内部实现。

Stream.reduce() 方法的具体实现是交给了ReferencePipeline这个抽象类,它是继承了AbstractPipeline这个类的:

// ReferencePipeline抽象类的reduce方法
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    // 调用evaluate方法
    return evaluate(ReduceOps.makeRef(accumulator));
}

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel() // 调用isParallel()判断是否使用并行模式
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

@Override
public final boolean isParallel() {
    // 根据之前在parallel()方法设置的那个flag来判断。
    return sourceStage.parallel;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

从它的源码可以知道,reduce 方法调用了 evaluate 方法,而 evaluate 方法会先去检查当前的 flag,是否使用并行模式,如果是则会调用evaluateParallel方法执行并行计算,否则,会调用evaluateSequential方法执行串行计算。

这里我们再看看TerminalOp(注意这里是字母l O,而不是数字1 0)接口的evaluateParallel方法。TerminalOp接口的实现类有这样几个内部类:

  • java.util.stream.FindOps.FindOp
  • java.util.stream.ForEachOps.ForEachOp
  • java.util.stream.MatchOps.MatchOp
  • java.util.stream.ReduceOps.ReduceOp

可以看到,对应的是 Stream 的几种主要的计算操作。我们这里的示例代码使用的是 reduce 计算,那我们就看看ReduceOp 类的这个方法的源码:

// java.util.stream.ReduceOps.ReduceOp.evaluateParallel
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
1
2
3
4
5
6

evaluateParallel 方法创建了一个新的 ReduceTask 实例,并且调用了 invoke() 方法后再调用 get() 方法,然后返回这个结果。那这个 ReduceTask 是什么呢?它的 invoke 方法内部又是什么呢?

追溯源码我们可以发现,ReduceTask 类是 ReduceOps 类的一个内部类,它继承了 AbstractTask 类,而AbstractTask 类又继承了 CountedCompleter类,而 CountedCompleter 类又继承了 ForkJoinTask 类!

它们的继承关系如下:

ReduceTask -> AbstractTask -> CountedCompleter -> ForkJoinTask

这里的 ReduceTask 的 invoke 方法,其实是调用的 ForkJoinTask 的 invoke 方法,中间三层继承并没有覆盖这个方法的实现。

所以这就从源码层面解释了 Stream 并行的底层原理是使用了 Fork/Join 框架。

需要注意的是,一个 Java 进程的 Stream 并行计算任务默认共享同一个线程池,如果随意的使用并行特性可能会导致方法的吞吐量下降。我们可以通过下面这种方式来让你的某个并行Stream使用自定义的 ForkJoin 线程池:

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool
  .submit(() -> roster.parallelStream().reduce(0, Integer::sum)).get();
1
2
3

# 5. Stream 并行计算的性能提升

我们可以在本地测试一下如果在多核情况下,Stream 并行计算会给我们的程序带来多大的效率上的提升。用以下示例代码来计算一千万个随机数的和:

public class StreamParallelDemo {
    public static void main(String[] args) {
        System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));

        // 产生100w个随机数(1 ~ 100),组成列表
        Random random = new Random();
        List<Integer> list = new ArrayList<>(1000_0000);

        for (int i = 0; i < 1000_0000; i++) {
            list.add(random.nextInt(100));
        }

        long prevTime = getCurrentTime();
        list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
        System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));

        prevTime = getCurrentTime();
        list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
        System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));

    }

    private static long getCurrentTime() {
        return System.currentTimeMillis();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

输出:

本计算机的核数:8 495156156 单线程计算耗时:223 495156156 多线程计算耗时:95

所以在多核的情况下,使用 Stream 的并行计算确实比串行计算能带来很大效率上的提升,并且也能保证结果计算完全准确。

本文一直在强调的“多核”的情况。其实可以看到,我的本地电脑有 8 核,但并行计算耗时并不是单线程计算耗时除以 8,因为线程的创建、销毁以及维护线程上下文的切换等等都有一定的开销。所以如果你的服务器并不是多核服务器,那也没必要用 Stream 的并行计算。因为在单核的情况下,往往 Stream 的串行计算比并行计算更快,因为它不需要线程切换的开销。

# 参考

  • 转载自:http://concurrent.redspider.group/article/03/19.html
#多线程与并发#Stream
上次更新: 2023-03-21
JUC - Fork/Join框架
JUC - 线程池

← JUC - Fork/Join框架 JUC - 线程池→

最近更新
01
开始
01-09
02
AI工具分享
01-09
03
AI 导读
01-07
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Jason Huang | 闽ICP备2025088096号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式