流(三)收集器

流(三)收集器

八. 用流收集数据

8.1 收集器

收集器简洁灵活的定义了collect用来生成结果集合的标准,collect操作本质是一个归约操作,遍历流中的每个元素,进行转换操作,将结果累积到一个数据结构中并最终输出。

1
2
3
4
5
<R, A> R collect(Collector<? super T, A, R> collector);

<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

Collector接口的实现决定了如何对流进行归约操作,Collectors提供了许多常用的静态工厂方法,主要提供了三个功能:将流元素归约为一个值,元素分组,元素分区。

8.2 归约和汇总

具体使用场景可以参考以下实例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//统计:统计菜单有多少道菜肴
long dishedNum = menu.stream().collect(Collectors.counting());
long dishedNum1 = menu.stream().count();

//求最大/最小值:找出热量最高的菜肴
Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCaloriesDish = menu.stream().collect(Collectors.maxBy(dishCaloriesComparator));
Optional<Dish> mostCaloriesDish1 = menu.stream().max(dishCaloriesComparator);

//汇总求和:求所有菜肴总热量
int totalCalories = menu.stream().collect(Collectors.summingInt(Dish::getCalories));
////这种写法最简洁,且性能最好,避免了自动拆箱
int totalCalories1 = menu.stream().mapToInt(Dish::getCalories).sum();

//汇总求平均:求所有菜肴热量平均值
double averageCalories = menu.stream().collect(Collectors.averagingInt(Dish::getCalories));

//一次遍历求所有汇总结果
IntSummaryStatistics statistics = menu.stream().collect(Collectors.summarizingInt(Dish::getCalories));
System.out.println(statistics);//IntSummaryStatistics{count=9, sum=4200, min=120, average=466.666667, max=800}

//连接字符串
String shortMenu = menu.stream().map(Dish::getName).collect(Collectors.joining());
System.out.println(shortMenu);//porkbeefchickenfrench friesriceseason fruitpizzaprawnssalmon

//加分隔符
String shortMenu1 = menu.stream().map(Dish::getName).collect(Collectors.joining(", "));
System.out.println(shortMenu1);//pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon

下图为summingInt收集器的累积过程。

summingInt收集器的累积过程

前面都是常用情况的便捷工具,Collectors提供的reducing()则需要三个参数(初始值,转换函数,操作)。有一个重载的单参数reducing(),流中第一个元素作为起点,转换函数为默认输入参数。

1
2
3
4
5
6
7
int totalCalories3 = menu.stream().collect(Collectors.reducing(0,Dish::getCalories,(i,j) -> i + j));
int totalCalories4 = menu.stream().map(Dish::getCalories).reduce(0,(i,j) -> i + j);

//单参数reducing(),
Optional<Dish> mostCaloriesDish2 = menu.stream()
.collect(Collectors.reducing(
(d1,d2) -> d1.getCalories() > d2.getCalories() ? d1 :d2));

下图为totalCalories3计算菜单总热量的归约过程。

计算菜单总热量的归约过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static <T, U>
Collector<T, ?, U> reducing(U identity,
Function<? super T, ? extends U> mapper,
BinaryOperator<U> op) {
return new CollectorImpl<>(
boxSupplier(identity),
(a, t) -> { a[0] = op.apply(a[0], mapper.apply(t)); },
(a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
a -> a[0], CH_NOID);
}

//单参数
public static <T> Collector<T, ?, Optional<T>>
reducing(BinaryOperator<T> op) {
class OptionalBox implements Consumer<T> {
T value = null;
boolean present = false;

@Override
public void accept(T t) {
if (present) {
value = op.apply(value, t);
}
else {
value = t;
present = true;
}
}
}

return new CollectorImpl<T, OptionalBox, Optional<T>>(
OptionalBox::new, OptionalBox::accept,
(a, b) -> { if (b.present) a.accept(b.value); return a; },
a -> Optional.ofNullable(a.value), CH_NOID);
}

collect和reduce的区别

二者往往可以实现相同的功能,reduce方法旨在把两个值结合起来生成一个新值,是一个不可变的归约。collect方法则相反,设计上就是要改变容器,从而累积要输出的结果。还有一点就是collect方法相比reduce更适合并行工作。

8.3 分组

Collectors提供了静态工厂方法groupingBy,参数Function叫分类函数,用来把流中元素分成不同的组。

1
2
3
4
5
6
7
8
Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(Collectors.groupingBy(Dish::getType));
dishesByType.forEach((k,v) -> System.out.println(k + "->" + v));

//groupingBy
public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier) {
return groupingBy(classifier, toList());
}

在分组的过程中对流中的项目进行分类

实际开发中,用来分类的Type可能不存在,但我们可以自定义,并用Lambda表达式来描述逻辑。

1
2
3
4
5
6
7
8
9
//以热量分组
Map<CaloricLevel,List<Dish>> dishesByCaloricLevel = menu.stream().collect(Collectors.groupingBy(
dish -> {
if(dish.getCalories() <= 400) return CaloricLevel.DIET;
else if(dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}
));
dishesByCaloricLevel.forEach((k,v) -> System.out.println(k + "->" + v));

多级分组需要调用双参数版本的groupingBy方法,可以扩展至任意层级的树形结构Map。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//多级分组
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
menu.stream().collect(
Collectors.groupingBy(Dish::getType,
Collectors.groupingBy(dish ->{
if(dish.getCalories() <= 400) return CaloricLevel.DIET;
else if(dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}))
);


public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream) {
return groupingBy(classifier, HashMap::new, downstream);
}

下图表示n层嵌套映射和n维分类表间的等价关系。

n层嵌套映射和n维分类表间的等价关系

按子组收集数据: groupingBy方法的第二个参数可以是任意Collector的实现,不一定非要是groupingBy。collectingAndThen方法则可以在收集器完成处理后转换返回类型,需要参数:收集器和转换函数,最后返回一个收集器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//分类汇总求和
Map<Dish.Type, Long> typesCount =
menu.stream().collect(
Collectors.groupingBy(Dish::getType,Collectors.counting()));

//分类汇总取最大值
Map<Dish.Type, Optional<Dish>> mostCaloricByType =
menu.stream().collect(
Collectors.groupingBy(Dish::getType,Collectors.maxBy(Comparator.comparing(Dish::getCalories))));

//收集后转换类型
Map<Dish.Type, Dish> mostCaloricByType1 =
menu.stream().collect(
Collectors.groupingBy(Dish::getType,Collectors.collectingAndThen(
Collectors.maxBy(Comparator.comparing(Dish::getCalories)),Optional::get)));
mostCaloricByType1.forEach((k,v) -> System.out.println(k + "->" + v));

原始流先根据分类函数分为多个子流,每个子流再由第二个收集器进行处理,归约收集器返回取得的最大值Optional<Dish>collectingAndThen 收集器返回从 Optional<Dish> 提取的值,最后第二级收集器的结果成为分组映射的值。

8.4 分区

分区是分组的特殊情况,由一个谓词作为分类函数,即分区函数。分区函数返回一个布尔值,意味着得到的分组Map的键值为Boolean,所以只能分为两组。

1
Map<Boolean, List<Dish>> partitionedMenu = menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian));

分区也可以像分组那样多级组合,可以看作是特殊的使用谓词来做分组函数的分组。

8.5 收集器接口

Collector接口为实现具体的归约操作(收集器)提供了范本,

  • T是流中要收集的项目的泛型。
  • A是累加器的类型,累加器在收集过程中用于累积部分结果的对象。
  • R是收集操作得到的对象类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public interface Collector<T, A, R> {
//创建并返回新的可变结果容器的函数
Supplier<A> supplier();

//将值折叠到可变结果容器中的函数
BiConsumer<A, T> accumulator();

//接受两个部分结果并将其合并的函数。
//组合器函数可以将状态从一个参数折叠到另一个参数并返回该参数,或者返回一个新的结果容器。
BinaryOperator<A> combiner();

//执行从中间累积类型A到最终结果类型R的最终转换
Function<A, R> finisher();

//返回Collector.Characteristics的Set,指示此收集器的特征。这个集合应该是不可变的。
Set<Characteristics> characteristics();

//返回由给定的supplier、accumulator和combiner函数描述的新Collector。生成的Collector具有Collector.Characteristics.IDENTITY_FINISH特征。
public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
BiConsumer<R, T> accumulator,
BinaryOperator<R> combiner,
Characteristics... characteristics) {
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
Objects.requireNonNull(characteristics);
Set<Characteristics> cs = (characteristics.length == 0)
? Collectors.CH_ID
: Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
characteristics));
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
}

//返回由给定的supplier、accumulator、combiner和finisher函数描述的新Collector。
public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Function<A, R> finisher,
Characteristics... characteristics) {
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
Objects.requireNonNull(finisher);
Objects.requireNonNull(characteristics);
Set<Characteristics> cs = Collectors.CH_NOID;
if (characteristics.length > 0) {
cs = EnumSet.noneOf(Characteristics.class);
Collections.addAll(cs, characteristics);
cs = Collections.unmodifiableSet(cs);
}
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
}

//表示Collector属性的特性,可用于优化缩减实现。
enum Characteristics {
//指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器同时调用的累加器函数。
//如果CONCURRENT收集器也不是UNORDERED,则仅当应用于无序数据源时,才应并发计算该收集器。
CONCURRENT,

//指示集合操作不承诺保留输入元素的相遇顺序。(如果结果容器没有内在顺序,例如Set,则可能是这样的。)
UNORDERED,

//表示分页装订器函数是标识函数,可以省略。如果设置了,则从A到R的未检查强制转换必须成功。
IDENTITY_FINISH
}
}

8.5.1 supplier

supplier方法返回一个结果为空的 Supplier<A> ,也就是一个无参数函数,被调用时会创建一个空的累加器实例,供数据收集过程使用。对于把累加器本身作为结果返回的收集器(如toList),在对空流操作时,这个空的累加器也代表着收集过程的结果。

8.5.2 accumulator

accumulator方法会返回执行归约操作的函数。当遍历到流的第n个元素时,此函数执行时会有两个参数:保存归约结果的累加器,和第n个元素。此函数会返回void,因为累加器是原位更新,即函数的执行改变了它的内部状态。

8.5.3 finisher

在遍历完流后,finisher方法返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果。

这三个方法已足够对流进行顺序归约,下图为顺序归约过程的逻辑步骤。

顺序归约过程的逻辑步骤

8.5.4 combiner

combiner方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得到的累加器要如何进行合并。

有了此方法就可以对流进行并行归约了,过程要用到分支/合并框架和Spliterator抽象。

  • 原始流会以递归的方式拆分为子流,直到定义流是否需要进一步拆分的一个条件为非。
  • 所以子流都可以并行处理,每个子流都可以进行顺序归约算法。
  • 使用收集器combiner返回的函数,将所有部分结果两两合并。

下图为使用combiner方法来并行化归约过程。

使用combiner方法来并行化归约过程

8.5.5 characteristics

characteristics会返回一个不可变的Characteristics集合,它定义了收集器的行为——尤其是关于流是否可以进行并行归约,以及可以使用哪些优化的提示。

枚举类Characteristics包含三个项目

  • UNORDERED:归约结果不受流中项目的遍历和累积顺序的影响。
  • CONCURRENT:accumulator函数可以从多个线程同时调用,且该收集器可以进行并行归约流。如果收集器没有标注UNORDERED,则仅当应用于无序数据源时才应并发归约。
  • IDENTITY_FINISH:表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象会直接用作归约过程的最终结果。也意味着将累加器A不加检查的转换为结果R是安全的。

8.5.6 融合实现ToListCollector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
@Override
public Supplier<List<T>> supplier() {//创建集合操作的起始点
return ArrayList::new;
}

@Override
public BiConsumer<List<T>, T> accumulator() {//累积遍历过的项目,原位修改累加器
return List::add;
}

@Override
public Function<List<T>, List<T>> finisher() {//恒等函数
return Function.identity();
}

@Override
public BinaryOperator<List<T>> combiner() {//修改第一个累加器,将其和第二个累加器内容合并
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}

@Override
public Set<Characteristics> characteristics() {//累加器不是UNORDERED的,因为我们想保留顺序
return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH,Characteristics.CONCURRENT));
}
}

我们实现的这个收集器和 Collectors.toList() 区别只是在一些优化上。collect()有一个三参数的重载方法(supplier,accumulator,combiner),

1
2
3
4
5
List<Dish> dishes = menu.stream().collect(Collectors.toList());
//toList是一个工厂方法由单例生成,ToListCollector则需要实例化
List<Dish> dishes1 = menu.stream().collect(new ToListCollector<>());
//简单但不易读,且收集器永远是IDENTITY_FINISH+CONCURRENT且非UNORDERED的
List<Dish> dishes2 = menu.stream().collect(ArrayList::new, List::add,List::addAll);

8.6 实现收集器

1
2
3
4
5
6
7
8
9
10
11
12
13
14

Map<Boolean, List<Integer>> primes = partitionPrimes(5);
primes.forEach((k,v) -> System.out.println(k + "->" + v));
//false->[4]
//true->[2, 3, 5]

public static Map<Boolean, List<Integer>> partitionPrimes(int n){
return IntStream.rangeClosed(2, n).boxed().collect(Collectors.partitioningBy(candidate -> isPrime(candidate)));
}

public static boolean isPrime(int candidate){
int candidateRoot = (int) Math.sqrt((double) candidate);
return IntStream.rangeClosed(2, candidateRoot).noneMatch(i -> candidate % i == 0);
}

若除数本身不是质数则无需测试,只需要用被测试数之前的质数来测试,但我们所使用的收集器在收集过程无法访问部分结果,也就是我们无法访问已经找到的质数列表。

1
2
3
public static boolean isPrime(List<Integer> primes, int candidate){
return primes.stream().noneMatch(i -> candidate % i == 0);
}

我们需要筛选已找到的质数中小于被测数的,通过filter只能对整个流进行筛选,如果流很大会造成问题,我们只要在质数大于被测数平方根的时候停下就行了。因此实现如下takeWhile方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static boolean isPrime(List<Integer> primes, int candidate){
int candidateRoot = (int) Math.sqrt((double) candidate);
return takeWhile(primes, i -> i <= candidateRoot).stream().noneMatch(i -> candidate % i == 0);
}

public static <A> List<A> takeWhile(List<A> list, Predicate<A> p){//给定一个排序集合和一个谓词,返回元素满足谓词的最长前缀
int i = 0;
for(A item : list){
if(!p.test(item)){//检查列表中的当前项目是否符合谓词
return list.subList(0, i);//不满足就返回该项目之前的前缀子列表
}
i++;
}
return list;
}

takeWhile方法是及时的,但我们需要一个延迟求值的方法来与noneMatch合并,需要熟悉Stream API的实现原理才可以实现。有了算法我们可以手动实现一个收集器了,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class PrimeNumbersCollector implements Collector<Integer,
Map<Boolean, List<Integer>>,
Map<Boolean, List<Integer>>> {
@Override
public Supplier<Map<Boolean, List<Integer>>> supplier() {
//创建了作为累加器的map,并为true和false键初始化了空列表。
return () -> new HashMap<Boolean, List<Integer>>(){
{
put(true,new ArrayList<>());
put(false,new ArrayList<>());
}
};
}

@Override
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
acc.get(isPrime(acc.get(true),candidate)) //根据isPrime结果获取质数或非质数列表
.add(candidate); //将被测数添加到相应列表
};
}

@Override
public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
//此收集器是无法并行运算的,因为算法本身是顺序的,所以当前combiner()无法被调用
return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
map1.get(true).addAll(map2.get(true));
map1.get(false).addAll(map2.get(false));
return map1;
};
}

@Override
public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
return Function.identity();
}

@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
}

/**
* 判断是否为质数
* @param primes 已有质数列表
* @param candidate 当前被测数
* @return 结果
*/
public boolean isPrime(List<Integer> primes, int candidate){
int candidateRoot = (int) Math.sqrt((double) candidate);
return takeWhile(primes, i -> i <= candidateRoot).stream().noneMatch(i -> candidate % i == 0);
}

/**
* 查找符合谓词的元素集合
* @param list 元素列表
* @param p 谓词
* @return 符合元素集合
*/
public <A> List<A> takeWhile(List<A> list, Predicate<A> p){
int i = 0;
for(A item : list){
if(!p.test(item)){//检查列表中的当前项目是否符合谓词
return list.subList(0, i);//不满足就返回该项目之前的前缀子列表
}
i++;
}
return list;
}
}

public static Map<Boolean, List<Integer>> partitionPrimesWithPrimeNumbersCollector(int n){
return IntStream.rangeClosed(2, n).boxed().collect(new PrimeNumbersCollector());
}

Map<Boolean, List<Integer>> primes1 = partitionPrimesWithPrimeNumbersCollector(5);

可以写一些测试代码对比一下性能提升比例,本机测试如下,大概提升了30%性能。

1
2
Fastest execution done in 626 msecs
Fastest execution done in 445 msecs

参考:

🔗 《Java8实战》