流(一)入门介绍

流(一)入门介绍

一. 引文

1.1 为什么有了集合还需要流?

流的出现是为了在某些场景下能够代替集合,集合的业务中常常涉及到类似数据库的操作,当有一些复杂需求需要嵌套遍历运算时,集合就需要公式似的写一串代码,很难让人一眼看明白代码代表的意思。如下案例实现在集合中筛选金额较高的交易,并按货币分组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//创建累积交易分组的Map
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
//遍历交易List
for(Transaction transaction : transactions){
//筛选金额较高的交易
if(transaction.getPrice() > 1000){
//提取交易货币
Currency currency = transaction.getCurrency();
List<Transaction> transactionForCurrency = transactionsByCurrencies.get(currency);
//如果此货币分组为空,就新建一个
if(transactionForCurrency == null){
transactionForCurrency = new ArratList();
transactionsByCurrencies.put(currency,transactionForCurrency);
}
//将当前遍历的交易添加到具有同一货币的交易List中
transactionForCurrency.add(transaction);
}
}

如果通过Stream API,可以改写上述代码如下。

1
2
import static java.util.stream.Collectors.toList;
Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().filter((Transaction t) -> t.getPrice() > 1000).collect(groupingBy(Transaction::getCurrency));

流允许我们以声明性方式处理数据几个(通过查询语句实现,而不是临时编写一个实现)。

1.2 流解决了集合的两个问题

两个问题是:集合处理时的固定套路和晦涩,以及难以有效利用多核性能。

集合的主要目的是为了存储和访问数据,而Stream则主要用于描述对数据的计算。

集合需要手动实现迭代和运算处理,这种数据迭代叫外部迭代,而流式的迭代完全在库内部实现,叫做内部迭代。流则和SQL的思路一样,采用了更高级的语言表达,由实现(Streams库)来选择最佳的低级执行机制。这样就可以避免用synchronized编写代码,它不仅容易出错,且在多核环境下需要很大的成本。筛选一个集合的最快方案就是先转为Stream,再进行并行处理,最后再转换为List。编程中往往会有一些反复出现的数据处理模式,可以根据标准进行筛选数据提取数据给数据分组等会很方便。

这些操作往往可以并行化。比如在两个CPU上筛选集合,可以一个处理列表前半部,一个处理后半部,这叫做分支步骤(1);然后CPU对各自半个集合进行筛选(2);最后(3),一个CPU将两个结果合并。

1.3 流的优点

流相较集合的优点:

  • 声明性:如果使用SQL这种数据库语言会发现一些查询语句转化为Java要变得很长,Java8的流支持这种简明的数据库查询式编程且用的是Java语法,我们的目标是说明要做什么,而不是要如何来实现这个操作。
  • 可复合:可以对流做一些集合不能做的优化操作,如可以将对同一个流的若干操作组合起来,从而只遍历一次数据,而不是花大代价的去多次遍历。
  • 可并行:Java可以自动的将流操作并行化,而集合则不行。
  • 流被设计为无需同时将数据调入内存,这样就可以处理无法装入内存的流数据了。

二. 流简介

2.1 stream和parallelStream

集合接口Collection在Java 8中增加了默认方法stream()和parallelStream(),分别代表着流的顺序和并行处理

1
2
3
4
5
6
7
8
9
10
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
//顺序处理
List<Student> result = students.stream().filter((Student s) -> s.getAge() > 18).collect(Collectors.toList());
//并行处理
List<Student> result = students.parallelStream().filter((Student s) -> s.getAge() > 18).collect(Collectors.toList());

2.2 流是什么

简单的定义,流是从支持数据处理操作的源生成的元素序列

  • 元素序列:和集合一样,流也提供了接口Stream,可以访问特定元素类型的一组有序值。集合是数据结构,其目的是以特定的时间/空间复杂度存储和访问元素。流的目的是表达计算。集合表述数据,流表述计算。
  • :流会使用一个提供数据的源,如集合、数组或输入/输出资源。从有序集合生成流时会保留顺序。
  • 数据处理操作:流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执行也可以并行执行。
  • 流水线:很多流操作本身会返回一个流,所以这些操作就可以链接起来,形成一个大的流水线。也为我们做如延迟和短路这些优化提供了可能。
  • 内部迭代:与使用迭代器进行显示迭代的集合不同,流的迭代操作是在背后进行的。

将流操作链接起来构成流的流水线

如下代码中,数据源是菜肴列表,它提供给流一个元素序列。一系列数据处理操作构成了流水线,最后的collect操作并没有返回流,而是返回List结果。在collect操作被调用前,流水线上其他操作都在排队等待。

1
2
3
4
5
6
List<String> threeHighCaloricDishNames = menu.stream() //从集合获得流,准备创建流水线
.filter(d -> d.getCalories() > 300) //筛选高热量的菜肴
.map(Dish::getName) //获取菜名
.limit(3) //只选择头三个
.collect(Collectors.toList()); //将结果存在另一个List中
System.out.println(threeHighCaloricDishNames);

使用流来筛选菜单找出三个热量最高的菜肴名字

流水线类似于构建器模式,一个调用链来设置一套配置,再调用build()方法。


三. 流和集合

我们可以把DVD中存储的电影(基于字节或帧无所谓)看作是一个集合,而网络中通过视频流看同一部电影看作是流。视频流不需要用户等待加载完整个视频,而是只要下载当前观看位置的那几帧就可以了。

流与集合

这样看,流和集合的差异就在于何时进行计算,集合是一个内存中的数据结构,它包含了数据结构中目前的所有值,集合中的所有元素都要先计算出来才能添加到集合中,你可以对集合元素做增删操作,但每个元素都必须算出后存放在内存里。

流则只是概念上的固定数据结构,你不能增删元素,它的元素是按需计算的。这是一种生产者——消费者关系,流也可以看作是一个延迟创建的集合:只有消费者要求时才会计算值(需求驱动,实时制造)。集合则是急切创建的(供应商驱动)。比如我们需要创建一个所有质数的集合,质数是无穷无尽的,所以这个质数集合我们永远也拿不到。另一个例子是查询分页,集合就是我把所有元素都返回给用户,流则是根据需求返回优先级最高的一页数据。

3.1 只能遍历一次

流和迭代器一样,只能遍历一次。如果还需要重新遍历就需要从数据源再重新获取一个新流,当然这里指的是集合之类可以重复的源,而不是I/O通道。

1
2
3
Stream s = menu.stream();
s.forEach(System.out::println);
s.forEach(System.out::println);

执行后抛出异常: stream has already been operated upon or closed

1
2
3
4
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at categories.java.a7stream.Test.main(Test.java:31)

3.2 外部迭代和内部迭代

集合Collection需要开发者自行去做迭代,叫外部迭代。流则是内部迭代,Stream库帮你做了迭代,并把得到的流值存放起来,你只要声明要做什么即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
List<String> names = new ArrayList<>();
//集合通过for-each循环外部迭代
for(Dish dish : menu){
names.add(dish.getName());
}
//集合通过迭代器做外部迭代
Iterator<Dish> iterator = menu.iterator();
while (iterator.hasNext()){
Dish d = iterator.next();
names.add(d.getName());
}
//流内部迭代
names = menu.stream().map(Dish::getName).collect(Collectors.toList());

我们设想一个应用场景,你需要女儿把地上的玩具收回箱子。如果是外部迭代,你需要一个个的指示她去整理所有玩具,需要显式的取出每个项目进行处理。如果是内部迭代,她可以选择一手拿一个,也可以自己决定先拿离盒子较近的玩具,而你只需要告诉她把所有的玩具收回箱子即可。流的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现,而集合就需要你自己手动来处理所有的并行问题。

内部迭代与外部迭代


四. 流的操作

可以连接起来的流操作称为中间操作,关闭流的操作叫终端操作

  • 一个数据源来执行一个查询
  • 一个中间操作链,形成一条流水线
  • 一个终端操作,执行流水线,并能生成结果

4.1 中间操作

除非流水线触发一个终端操作,否则中间操作不会执行任何处理,中间操作可以合并起来,在终端操作时一次性处理。

中间操作与终端操作

可以尝试在中间操作中添加打印,观察执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
//打印每个操作
List<String> delayNames = menu.stream() //从集合获得流,准备创建流水线
.filter(d -> {
System.out.println("filtering" + d.getName());
return d.getCalories() > 300;
}) //筛选高热量的菜肴
.map(d -> {
System.out.println("maping" + d.getName());
return d.getName();
}) //获取菜名
.limit(3) //只选择头三个
.collect(Collectors.toList()); //将结果存在另一个List中
System.out.println(threeHighCaloricDishNames);

打印结果如下,可以发现Java通过流的延迟性质进行了多个优化:(1)通过limit操作和短路技巧可以只取前3个元素。(2)循环合并技术把filter和map操作合并到同一次遍历中了。

1
2
3
4
5
6
7
filtering pork
maping pork
filtering beef
maping beef
filtering chicken
maping chicken
[pork, beef, chicken]
操作 类型 返回类型 操作参数 函数描述符
filter 中间 Stream Predicate T -> boolean
map 中间 Stream Function<T, R> T -> R
limit 中间 Stream long maxSize
sorted 中间 Stream Comparator (T, T) -> int
peek 中间 Stream Consumer T -> void
distinct 中间 Stream
skip 中间 Stream long n
mapToInt 中间 IntStream ToIntFunction T -> int
mapToLong 中间 LongStream ToLongFunction T -> long
mapToDouble 中间 DoubleStream ToDoubleFunction T -> double
flatMap 中间 Stream Function<T, Stream> T -> R
flatMapToInt 中间 IntStream Function<T, IntStream> T -> int
flatMapToLong 中间 LongStream Function<T, LongStream> T -> long
flatMapToDouble 中间 DoubleStream Function<T, DoubleStream> T -> double

4.2 终端操作

终端操作会从流水线生成非流的结果,如List、Integer、void等。

操作 类型 目的
forEach 终端 消费流中的每个元素并对其应用Lambda,返回void
count 终端 返回流中元素的个数,返回long
collect 终端 把流归约为集合,如List、Map甚至Integer

参考:

🔗 《Java8实战》