JUC - Stream并行计算原理
# JUC - Stream 并行计算原理
# 1. Java 8 Stream 简介
从Java 8 开始,我们可以使用Stream
接口以及 lambda 表达式进行“流式计算”。它可以让我们对集合的操作更加简洁、更加可读、更加高效。
Stream 接口有非常多用于集合计算的方法,比如判空操作 empty、过滤操作 filter、求最 max 值、查找操作 findFirst和 findAny 等等。
# 2. Stream 单线程串行计算
Stream 接口默认是使用串行的方式,也就是说在一个线程里执行。下面举一个例子:
public class StreamDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
我们来理解一下这个方法。首先我们用整数 1~9 创建了一个Stream
。这里的 Stream.of(T... values) 方法是 Stream接口的一个静态方法,其底层调用的是 Arrays.stream(T[] array) 方法。
然后我们使用了reduce
方法来计算这个集合的累加和。reduce
方法这里做的是:从前两个元素开始,进行某种操作(我这里进行的是加法操作)后,返回一个结果,然后再拿这个结果跟第三个元素执行同样的操作,以此类推,直到最后的一个元素。
我们来打印一下当前这个 reduce 操作的线程以及它们被操作的元素和返回的结果以及最后所有 reduce 方法的结果,也就代表的是数字1到9的累加和。
main: 1 + 2 = 3 main: 3 + 3 = 6 main: 6 + 4 = 10 main: 10 + 5 = 15 main: 15 + 6 = 21 main: 21 + 7 = 28 main: 28 + 8 = 36 main: 36 + 9 = 45 45
可以看到,默认情况下,它是在一个单线程运行的,也就是 main 线程。然后每次 reduce 操作都是串行起来的,首先计算前两个数字的和,然后再往后依次计算。
# 3. Stream多线程并行计算
我们思考上面一个例子,是不是一定要在单线程里进行串行地计算呢?假如我的计算机是一个多核计算机,我们在理论上能否利用多核来进行并行计算,提高计算效率呢?
当然可以,比如我们在计算前两个元素 1 + 2 = 3 的时候,其实我们也可以同时在另一个核计算 3 + 4 = 7。然后等它们都计算完成之后,再计算 3 + 7 = 10 的操作。
是不是很熟悉这样的操作手法?没错,它就是 ForkJoin 框架的思想。下面小小地修改一下上面的代码,增加一行代码,使 Stream 使用多线程来并行计算:
public class StreamParallelDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.parallel()
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
12
可以看到,与上一个案例的代码只有一点点区别,就是在 reduce 方法被调用之前,调用了 parallel() 方法。下面来看看这个方法的输出:
ForkJoinPool.commonPool-worker-1: 3 + 4 = 7 ForkJoinPool.commonPool-worker-4: 8 + 9 = 17 ForkJoinPool.commonPool-worker-2: 5 + 6 = 11 ForkJoinPool.commonPool-worker-3: 1 + 2 = 3 ForkJoinPool.commonPool-worker-4: 7 + 17 = 24 ForkJoinPool.commonPool-worker-4: 11 + 24 = 35 ForkJoinPool.commonPool-worker-3: 3 + 7 = 10 ForkJoinPool.commonPool-worker-3: 10 + 35 = 45 45
可以很明显地看到,它使用的线程是ForkJoinPool
里面的commonPool
里面的 worker 线程。并且它们是并行计算的,并不是串行计算的。但由于 Fork/Join 框架的作用,它最终能很好的协调计算结果,使得计算结果完全正确。
如果我们用 Fork/Join 代码去实现这样一个功能,那无疑是非常复杂的。但 Java8 提供了并行式的流式计算,大大简化了我们的代码量,使得我们只需要写很少很简单的代码就可以利用计算机底层的多核资源。
# 4. 从源码看 Stream 并行计算原理
上面我们通过在控制台输出线程的名字,看到了 Stream 的并行计算底层其实是使用的 Fork/Join 框架。那它到底是在哪使用 Fork/Join 的呢?我们从源码上来解析一下上述案例。
Stream.of
方法就不说了,它只是生成一个简单的 Stream。先来看看parallel()
方法的源码。这里由于我的数据是int
类型的,所以它其实是使用的BaseStream
接口的parallel()
方法。而BaseStream
接口的 JDK 唯一实现类是一个叫AbstractPipeline
的类。下面我们来看看这个类的parallel()
方法的代码:
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
2
3
4
这个方法很简单,就是把一个标识sourceStage.parallel
设置为true
。然后返回实例本身。
接着我们再来看reduce
这个方法的内部实现。
Stream.reduce() 方法的具体实现是交给了ReferencePipeline
这个抽象类,它是继承了AbstractPipeline
这个类的:
// ReferencePipeline抽象类的reduce方法
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
// 调用evaluate方法
return evaluate(ReduceOps.makeRef(accumulator));
}
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel() // 调用isParallel()判断是否使用并行模式
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
@Override
public final boolean isParallel() {
// 根据之前在parallel()方法设置的那个flag来判断。
return sourceStage.parallel;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
从它的源码可以知道,reduce 方法调用了 evaluate 方法,而 evaluate 方法会先去检查当前的 flag,是否使用并行模式,如果是则会调用evaluateParallel
方法执行并行计算,否则,会调用evaluateSequential
方法执行串行计算。
这里我们再看看TerminalOp
(注意这里是字母l O,而不是数字1 0)接口的evaluateParallel
方法。TerminalOp
接口的实现类有这样几个内部类:
- java.util.stream.FindOps.FindOp
- java.util.stream.ForEachOps.ForEachOp
- java.util.stream.MatchOps.MatchOp
- java.util.stream.ReduceOps.ReduceOp
可以看到,对应的是 Stream 的几种主要的计算操作。我们这里的示例代码使用的是 reduce 计算,那我们就看看ReduceOp 类的这个方法的源码:
// java.util.stream.ReduceOps.ReduceOp.evaluateParallel
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
2
3
4
5
6
evaluateParallel 方法创建了一个新的 ReduceTask 实例,并且调用了 invoke() 方法后再调用 get() 方法,然后返回这个结果。那这个 ReduceTask 是什么呢?它的 invoke 方法内部又是什么呢?
追溯源码我们可以发现,ReduceTask 类是 ReduceOps 类的一个内部类,它继承了 AbstractTask 类,而AbstractTask 类又继承了 CountedCompleter类,而 CountedCompleter 类又继承了 ForkJoinTask 类!
它们的继承关系如下:
ReduceTask -> AbstractTask -> CountedCompleter -> ForkJoinTask
这里的 ReduceTask 的 invoke 方法,其实是调用的 ForkJoinTask 的 invoke 方法,中间三层继承并没有覆盖这个方法的实现。
所以这就从源码层面解释了 Stream 并行的底层原理是使用了 Fork/Join 框架。
需要注意的是,一个 Java 进程的 Stream 并行计算任务默认共享同一个线程池,如果随意的使用并行特性可能会导致方法的吞吐量下降。我们可以通过下面这种方式来让你的某个并行Stream使用自定义的 ForkJoin 线程池:
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool
.submit(() -> roster.parallelStream().reduce(0, Integer::sum)).get();
2
3
# 5. Stream 并行计算的性能提升
我们可以在本地测试一下如果在多核情况下,Stream 并行计算会给我们的程序带来多大的效率上的提升。用以下示例代码来计算一千万个随机数的和:
public class StreamParallelDemo {
public static void main(String[] args) {
System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));
// 产生100w个随机数(1 ~ 100),组成列表
Random random = new Random();
List<Integer> list = new ArrayList<>(1000_0000);
for (int i = 0; i < 1000_0000; i++) {
list.add(random.nextInt(100));
}
long prevTime = getCurrentTime();
list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));
prevTime = getCurrentTime();
list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));
}
private static long getCurrentTime() {
return System.currentTimeMillis();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
输出:
本计算机的核数:8 495156156 单线程计算耗时:223 495156156 多线程计算耗时:95
所以在多核的情况下,使用 Stream 的并行计算确实比串行计算能带来很大效率上的提升,并且也能保证结果计算完全准确。
本文一直在强调的“多核”的情况。其实可以看到,我的本地电脑有 8 核,但并行计算耗时并不是单线程计算耗时除以 8,因为线程的创建、销毁以及维护线程上下文的切换等等都有一定的开销。所以如果你的服务器并不是多核服务器,那也没必要用 Stream 的并行计算。因为在单核的情况下,往往 Stream 的串行计算比并行计算更快,因为它不需要线程切换的开销。
# 参考
- 转载自:http://concurrent.redspider.group/article/03/19.html