Java基础系列-Stream简明教程

Stream 是 Java8 中一个重大的更新。Stream 为Java 真正带来了函数式编程的特性。对函数式编程不了解的人往往不知道如何动手,通过Benjamin 的教程来完整的学习一下 Java 的这个特性,学会这些技能会让你的代码看起来更酷。


这是一个通过代码示例来深度讲解 Java8 Stream 的教程。当我第一次看到 Stream 的 API 时,我感到很迷惑,因为这个名称听起来和 Java I/O 包中的 InputStreamOutputStream 有关系。但是实际上它们是完全不同的东西。 Stream 是 Monad(函数式编程),它为 Java 带来了函数式编程的特性,下面是维基百科对 Monad 的解释:

In functional programming, a monad is a structure that represents computations defined as sequences of steps. A type with a monad structure defines what it means to chain operations, or nest functions of that type together.

这份教程会讲解 Java8 Stream 的原理以及不同操作之间的区别。你将会学习到 Stream 操作的处理顺序以及不同的顺序对性能的影响。还会对常用的操作如 ReducecollectflatMap 进行详细讲解。在教程的最后会说明并行 Stream 的优点。

注:Stream 中的 API 称之为操作

如果你还不熟悉 Java8 的 lambda 表达式、函数式接口以及方法引用,可以先去读一下这份Java8 教程

Stream 原理

一个 Stream 代表着一组元素以及支持对这些元素进行计算的不同操作

1
2
3
4
5
6
7
8
9
10
11
12
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);

// C1
// C2

Stream 操作分为中间操作终端操作。中间操作会返回一个 Stream 对象,所以可以对中间操作进行链式操作。终端操作会返回一个 void 或者非 Stream 的对象。在上面的例子中,filtermapsorted 都是中间操作,而 forEach 则是一个终端操作。Stream 完整的操作 API 可以查看文档。Stream 链式操作可以查看上面的例子,链式操作也称之为管道操作

许多 Stream 操作接受 Lambda 或者函数式接口来限定操作范围。这些操作中绝大多数都必须是non-interfering无状态的,这是什么意思呢?

注:在函数式编程中,函数本身是可以作为参数的

non-interfering 表示方法在执行的过程中不会改动流中原数据,比如在前面的例子中没有 lambda 表达式修改了 myList 中的元素。

无状态表示方法多次执行的结果是确定的,比如前面的例子中没有 lambda 表达式会依赖在执行过程中会被修改的外部作用域中的变量。

不同种类的 Stream

Stream 可以通过多种方式创建,尤其是各种容器对象。List 和 Set 都支持 stream()parallelStream() 方法来创建串行或者并行的 Stream。并行 Stream 可以同时运行在多个线程上,在下文会详细讲解,当前先通过串行 Stream 来演示:

1
2
3
4
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); //a1

调用 List 的 stream() 方法会返回一个 Stream 对象。但是得到 Stream 对象不一定要创建 Collection 对象,看下面的代码:

1
2
3
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out.println);

只需要通过 Stream.of() 就可以把一堆对象创建为 Stream。

另外在 Java8 还可以通过 IntStreamLongStreamDoubleStream 等来操作原生数据类型 intlongdouble

IntStream 通过 range() 方法可以替代 for 循环:

1
2
3
4
5
IntStream.range(1,4)
.forEach(System.out::println);
// 1
// 2
// 3

所有的原生类型都可以和其他对象一样使用 Stream,但是所有的原生类型 Stream 都使用专门的 lambda 表达式,比如 int 使用 IntFunction 而不是 Function,使用 IntPredicate 而不是 Predicate

并且原生类型 Stream 还另外支持终端聚合操作 sum() 以及 average():

1
2
3
4
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0

这些操作在将对象转化为原生类型的时候非常有用,反之亦然。出于这个目的,普通 Stream 支持特别的 map 操作,比如 mapToInt()mapToLong()mapToDouble()

1
2
3
4
5
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3

原生数据类型可以通过 mapToObj() 转化为对象:

1
2
3
4
5
6
7
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);

// a1
// a2
// a3

下面这个例子是一个组合操作:double Stream 的元素首先被转成 int 最后被转化成 String:

1
2
3
4
5
6
7
8
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);

// a1
// a2
// a3

处理次序

上文中已经详细描述了如何创建和使用不同类型的 Stream,下面会深入研究 Stream 的操作是如何进行的。

中间操作的一个重要特征是延迟,看下面这个没有终端操作的例子:

1
2
3
4
5
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});

当上面的代码片段执行完成的时候,控制台并没有输出任何东西。这是因为中间操作在有终端操作的时候才会执行。

给上面的例子加上终端操作 forEach:

1
2
3
4
5
6
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));

执行这段代码会有如下的输出:

1
2
3
4
5
6
7
8
9
10
filter:  d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c

输出结果的顺序可能会让人惊讶。之前你可能会认为 Stream 中的元素会在一个操作中全部处理完之后才会进入到下一个操作。但实际的情况是一个元素在所有的操作执行完成之后才会轮到下一个元素。"d2" 首先被 filterforEach 的处理,然后 "a2" 才会被处理。

这样可以减少每个操作实际处理元素的个数,看下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2

这个 anyMatch 操作只在输入元素满足条件的情况下才会返回 true。在上面的例子中,运行到第二个元素 "a2" 时就会返回 true,然后就会停止处理其他元素,所以 map 操作也只是执行了两次,这正是得益于 Stream 的链式处理次序。

为什么次序很关键

下面的这个例子由两个中间操作 mapfilter 以及一个终端操作 forEach 组成。再看一下这些操作是如何执行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C

正如上面的例子所分析,map 和 filter 对每个字符串各执行了 5 次,而 forEach 仅仅执行了一次。

可以简单的调整操作的顺序来减少操作执行的总次数,下面的例子中把 filter 操作放到了 map 前面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c

调整后,map 只执行了一次,整个操作管道在输入大量元素时的执行速度会快很多。如果 Stream 有很多的操作,时序考虑一下能不能通过调整持续来优化。

在上面的例子中另外加上 sorted 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

sotred 是一个另类的中间操作,它是有状态的。因为在排序的过程中必须要维护数据的状态。

执行上面的例子会产生如下输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sort:    a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2

首先,sorted 会把输入的所有元素排好序之后才会进入下一个操作,和其他操作不同,sorted 是水平执行的。所以在上面的例子中 sorted 才会被执行 8 次。

通过调整操作的次序可以再一次提升执行的性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2

在这个例子中 sorted 永远也不会被执行,在 filter 执行完了之后就剩下一个元素,也就没有排序的必要。在输入大量元素的情况下,性能也会得到极大的提升。

重用 Stream

Java8 中的 Stream 是不能被重用的。一旦执行了终端操作,那么 Stream 就会被关闭:

1
2
3
4
5
6
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception

在 anyMatch 之后调用 noneMatch 会产生如下的异常:

1
2
3
4
5
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

如果需要解决这一点,可以为每一个终端操作创建一个新的 Stream,比如可以使用 Supplier 来创建所有中间操作已经执行完成的 Stream:

1
2
3
4
5
6
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok

每调用一次 get() 方法都会创建一个新的 Stream,然后就可以执行需要执行的终端操作了。

进阶操作

Stream 支持大量不同的操作,在上面的例子中已经介绍了最重要的操作如 filtermap。完整的操作可以在官方文档中查看。下面会重点介绍更加复杂的操作 collectflatMapreduce

这节绝大部分的代码例子都会使用下面 Person list 作为演示数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Person {
String name;
int age;

Person(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public String toString() {
return name;
}
}

List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));

Collect

如果需要将 Stream 中运行的结果转成一个不同的类型,比如 List、Set 或者 Map,collect 就非常有用。collect 操作接受由 suppileraccumulatorcombinerfinisher 等四个部分组成的 Collector 对象。听起来很复杂,但 java8 中 Collectors 类中的大量方法开箱即用,对很多通用的操作并不需要自己去实现:

注:suppiler, accumulator, combiner, finisher 都是函数式接口

1
2
3
4
5
6
7
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());

System.out.println(filtered); // [Peter, Pamela]

很简单就可以从 Stream 中获取一个 List,如果需要一个 Set,调用 Collectors.toSet() 就行了。

下面的这个例子是通过年龄来给 person 分组:

1
2
3
4
5
6
7
8
9
10
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));

personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors 功能很多,还可以用来对 Stream 中的元素做聚合操作,比如计算所有 person 的平均年龄:

1
2
3
4
5
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge); // 19.0

还可以用来做统计,summarizing 会返回一个内建的统计对象,通过这个对象可以很方便的得到最大年龄、最小年龄、平均年龄等统计结果:

1
2
3
4
5
6
7
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子中把所有 person 的名字拼成了一个字符串:

1
2
3
4
5
6
7
8
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

joining 接收一个间隔符和可选的前缀、后缀字符串。

为了输出 map 结果。必须指定 map 的 key 和 value。需要注意 key 必须是唯一的,否则会报 IllegalStateException 异常。可以通过传入另外一个合并方法作为参数来避免这个异常:

1
2
3
4
5
6
7
8
9
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

上面介绍了一些很强大 Collectors 的内建方法。下面来实现一个自定义的 collector。将所有 Person 的名称转成大写并输入到一个字符串中,每个名字使用 | 来隔开。自定义的 collecotr 使用 Collecotr.of() 来实现,需要实现其中的四个部分:supplieraccumulatorcombinerfinisher

1
2
3
4
5
6
7
8
9
10
11
12
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher

String names = persons
.stream()
.collect(personNameCollector);

System.out.println(names); // MAX | PETER | PAMELA | DAVID

在 Java 中,String 对象是不可变的。所以需要一个 StringJoiner 来组合字符串,suppiler 实例化一个带 | 分隔符的 StringJoiner 对象。accumulator 把字符串转成大写并且放进 StringJoiner 对象,combiner 将两个 StringJoiner 对象合成一个,最后 finisher 把 StringJoiner 对象输出为 String 对象。

flatMap

在上面已经介绍了如何使用 map 将 Stream 中的对象转成另外一种类型的对象。map 只能把一种类型转成另外一种特定的类型,在把一种类型转成任意种类型的情况下,map 就有点受限制了。而 flatMap 正是来解决这个问题的。

flatMap 会把 Stream 中的每个元素转成另一个 Stream 中的其他对象。所以每个元素依赖 STream 会被转成 0 个,1 个或者多个其他对象。这些生成的新的 stream 会在 flatMap 操作结束的时候返回。

在使用 flatMap 之前,需要定义以下的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Foo {
String name;
List<Bar> bars = new ArrayList<>();

Foo(String name) {
this.name = name;
}
}

class Bar {
String name;

Bar(String name) {
this.name = name;
}
}

接下来,利用 Stream 初始化一些对象:

1
2
3
4
5
6
7
8
9
10
11
12
List<Foo> foos = new ArrayList<>();

// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

现在,生成了包含三个 foo 对象的 list,每个 foo 对象中又包含三个 bar 对象。

flatMap 接收一个返回 Stream 对象的方法作为参数,为了分解 foo 中的每个 bar 对象,传入一个合适的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

上面那个例子成功的把一个包含三个 foo 对象的 Stream 转成了包含 9 个 bar 对象的 Stream。

而且,上面的那些代码可以被简化成一个 Stream 管道操作:

1
2
3
4
5
6
7
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));

flatMap 操作对 java8 中的 Optional 对象也有用,Optional 对象的操作会返回另一个类型的 Optional 对象。所以这个特性可以用来消除空指针检查。

定义类的抽象层次如下:

1
2
3
4
5
6
7
8
9
10
11
class Outer {
Nested nested;
}

class Nested {
Inner inner;
}

class Inner {
String foo;
}

为了从 Outer 对象中调用 Inner 对象中的 foo 字符串,需要做很多的空指针检查来避免潜在的空指针异常:

1
2
3
4
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}

这些操作可以通过 flatMap 来进行优化:

1
2
3
4
5
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);

每调用一次都会返回一个 Optional 对象,对象中包裹着目标对象或者 null。

Reduce

Reduce 组合 Stream 中所有的元素,然后产生一个单独的结果。Java8 支持三种 reduce 方法。第一种 reduce 对于每个 Stream 只会返回一个元素。下面这个例子计算除了年龄最大的人的名字:

1
2
3
4
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela

reduce 方法接受一个 BinaryOperator 函数。在 Person 这个例子中,实际上是一个 BiFunction,两个操作数的类型都是一致的。BiFunction 与 Function 很像,但是前者接收两个参数。这个例子中比较所有 person 的年龄属性来找出最大年龄的 person。

第二种 reduce 方法接受一个目标对象和一个 BinaryOperator。下面这个方法可以聚合所有的 person 属性来创建一个新的 person:

1
2
3
4
5
6
7
8
9
10
11
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三种 reduce 接受三个参数:一个目标对象、一个 BiFunction、一个 BinaryOperator 类型的 combiner。因为这个传入的值不一定是 Person 类型,所以我们可以利用这个特性来计算所有 Person 年龄的总和:

1
2
3
4
5
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum); // 76

最后的结果是 76,那么中间的计算过程是什么样的的呢?下面 debug 了计算的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

可以看到 accumulator 函数完成了所有的计算,调用的第一次得到的是初始值 0 和 Max person。然后后续的三步完成了所有年龄的的累加。在最后一步得到了所有年龄的累加结果 76。

但是上面的例子看起来稍微有点问题,因为 combiner 函数根本没有执行,但是真的是这样的吗?看下面的代码我们就能发现秘密所在:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

在并行执行的情况下有着完全不同的执行行为。在这里 combiner 执行了,accumulator 在并行情况下被执行的时候,combiner 用来累加 accumulator 的执行结果。

在下一节会详细分析并行 Stream。

并行 Stream

在输入元素数量很多的情况下,通过并行执行 Stream 可以提升执行性能。并行 Stream 使用了 ForkJoinPool,这个对象可以通过 ForkJoinPool.commonPool() 来得到。底层的线程池最多可以有五个线程,取决于物理机器可以用的 CPU 有几个。

1
2
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3

在我的机器上这个线程的数量被设定为 3。这个值可以通过 JVM 的参数来进行调整:

1
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collection 对象可以通过 parallelStream() 来创建一个并行的 Stream。或者也可以对一个串行的 Stream 对象调用 parallel() 来转成并行 Stream。

为了理解 Stream 是如何并行执行的,下面这个例子把线程的情况都打印出来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));

通过研究 debug 输出,可以看到 Stream 执行过程中哪些线程确实用到了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
filter:  b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

并行 Stream 执行操作的过程中用到了线程池中所有的线程。上面输出的结果顺序可能每次都是不一样的,这是因为线程执行的顺序本身就是不一样的。

给上面的例子加上 sort 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));

执行的结果看起来有点奇怪:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

可以看到了 sort 操作只会在主线程中执行。并行 Stream 中的 sort 操作实际用到了 Java8 中的新接口 Arrays.parallelSort()。在 Javadoc 中说明了数组的长度决定了这个排序操作是串行还是并行执行:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上面的例子,可以发现 combiner 函数只会在并行情况下执行。下面来看一下哪些线程确实执行了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));

persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});

上面例子的输出说明了 accumulator 和 combiner 在并行 Stream 中会在所有的可用线程上执行:

1
2
3
4
5
6
7
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]

所有在输入元素的量很大的情况下,并行 Stream 会带来很大的性能提升。但是需要注意一些操作比如 reducecollect 需要额外的 combine 操作,但是在串行 Stream 中并不需要。

此外,所有的并行 Stream 都依赖 ForkJoinPool 线程池。所以应当尽量避免实现一些阻塞 Stream 的操作,因为这样会降低那些依赖并行 Stream 的程序的性能。

(完)

@2020 rayjun