Streams (Part 2): Using and Building Streams

5. Using Streams

5.1 Filtering and Slicing

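How streams select elements: filtering with a predicate, keeping only distinct elements, skipping the first few elements, or truncating the stream to a given length.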

5.1.1 Filtering with a Predicate

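The Stream interface provides a filter method, which takes a Predicate and returns a stream containing the elements of type T that match the predicate.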

Stream<T> filter(Predicate<? super T> predicate);

Figure: Filtering a stream with a predicate
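A minimal runnable sketch of filter, assuming a plain list of integers in place of the book's menu data (the class FilterDemo and the method evens are hypothetical names):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterDemo {
    // Keeps only the elements for which the predicate returns true
    static List<Integer> evens(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(Arrays.asList(1, 2, 3, 4, 5, 6))); // [2, 4, 6]
    }
}
```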

5.1.2 Selecting Distinct Elements

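The Stream interface provides a distinct method, which returns a stream with unique elements and so removes duplicates (uniqueness is determined by the elements' hashCode and equals implementations).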

Stream<T> distinct();

Figure: Filtering distinct elements in a stream
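A sketch combining filter and distinct, with hypothetical names. Integer implements equals and hashCode by value, so the repeated 2 is dropped:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class DistinctDemo {
    // Keeps the even numbers, then removes duplicates (decided by equals/hashCode)
    static List<Integer> distinctEvens(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(distinctEvens(Arrays.asList(1, 2, 1, 3, 3, 2, 4))); // [2, 4]
    }
}
```

For an ordered source such as a List, distinct keeps the first occurrence of each element, so the result preserves encounter order.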

5.1.3 Truncating a Stream

The Stream interface provides a limit method, which returns a stream that is no longer than the given size.

Stream<T> limit(long maxSize);

Figure: Truncating a stream
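A small sketch of limit under hypothetical names: it returns at most maxSize elements, returns a shorter source whole, and turns an infinite Stream.iterate into a finite stream:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LimitDemo {
    // Returns at most maxSize elements; a shorter source is returned whole
    static List<Integer> firstN(List<Integer> numbers, long maxSize) {
        return numbers.stream().limit(maxSize).collect(Collectors.toList());
    }

    // Stream.iterate is infinite; limit truncates it so collect can finish
    static List<Integer> firstNaturals(long n) {
        return Stream.iterate(1, k -> k + 1).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(Arrays.asList(5, 6, 7, 8), 2)); // [5, 6]
        System.out.println(firstN(Arrays.asList(5, 6, 7), 10));   // [5, 6, 7]
        System.out.println(firstNaturals(3));                     // [1, 2, 3]
    }
}
```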

5.1.4 Skipping Elements

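The Stream interface provides a skip method, which returns a stream with the first n elements discarded; if the stream has fewer than n elements, an empty stream is returned.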

Stream<T> skip(long n);

Figure: Skipping elements in a stream
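A sketch of skip, plus a hypothetical paging helper showing that skip and limit are complementary (skip drops a prefix, limit keeps a prefix):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SkipDemo {
    // Drops the first n elements; fewer than n elements gives an empty result
    static List<Integer> dropFirst(List<Integer> numbers, long n) {
        return numbers.stream().skip(n).collect(Collectors.toList());
    }

    // Hypothetical paging helper: skip whole pages, then keep one page
    static List<Integer> page(List<Integer> numbers, int pageIndex, int pageSize) {
        return numbers.stream()
                .skip((long) pageIndex * pageSize)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(dropFirst(numbers, 2)); // [3, 4, 5]
        System.out.println(dropFirst(numbers, 9)); // []
        System.out.println(page(numbers, 1, 2));   // [3, 4]
    }
}
```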

5.2 Mapping

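Much like selecting particular columns from a table in a database query, a stream can extract information from the objects it contains.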

5.2.1 Applying a Function to Each Element of a Stream

The Stream interface provides a map method, which takes a Function, applies it to each element to map it to a new element, and returns a stream of the results.

<R> Stream<R> map(Function<? super T, ? extends R> mapper);
List<String> words = Arrays.asList("Java 8","Lambdas","In","Action");
//apply a function to each element of the stream
List<Integer> wordLengths = words.stream().map(String::length).collect(Collectors.toList());
System.out.println(wordLengths);//[6, 7, 2, 6]

5.2.2 Flattening Streams

Suppose that, given a list of words, we want to return a list of all the distinct characters they contain. A first attempt might use map as follows.

words.stream().map(word -> word.split("")).distinct().collect(Collectors.toList());

The lambda passed to map returns a String[] for each word, so the stream returned by map is a Stream<String[]>, whereas what we want is a Stream<String>.

Figure: Incorrect use of map to find the distinct characters in a list of words

//流的扁平化
//First map each word to an array of characters, then apply distinct; but the result is a Stream<String[]>
List<String[]> wordCharsArray = words.stream().map(word -> word.split("")).distinct().collect(Collectors.toList());
wordCharsArray.forEach(wordChar -> System.out.println(Arrays.toString(wordChar)));

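//We want a Stream<String>, not a Stream<String[]>. Arrays::stream turns an array into a stream, but map(Arrays::stream) only turns each character array into its own stream, giving a Stream<Stream<String>>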
words.stream().map(word -> word.split("")).map(Arrays::stream).distinct().collect(Collectors.toList());

//Use flatMap instead
List<String> wordChars = words.stream()
.map(word -> word.split("")) //convert each word into an array of its letters
.flatMap(Arrays::stream) //flatten the generated streams into a single stream
.distinct()
.collect(Collectors.toList());
System.out.println(wordChars);

The output is as follows.

[J, a, v, a,  , 8]
[L, a, m, b, d, a, s]
[I, n]
[A, c, t, i, o, n]

[J, a, v,  , 8, L, m, b, d, s, I, n, A, c, t, i, o]

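What flatMap() does: instead of mapping each array to a separate stream, it maps each array to the contents of a stream. All the streams generated by map(Arrays::stream) are merged, that is, flattened into a single stream. Put simply, flatMap() replaces each value of a stream with another stream and then concatenates all of those streams into one.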

Figure: Using flatMap to find the distinct characters in a list of words

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

5.3 Finding and Matching

Another common data-processing idiom is finding out whether some elements in a data set match a given property.

5.3.1 Checking Whether a Predicate Matches at Least One Element

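The Stream interface provides an anyMatch method, which takes a Predicate and returns a boolean indicating whether at least one element of the stream matches the given predicate.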

boolean anyMatch(Predicate<? super T> predicate);

5.3.2 Checking Whether a Predicate Matches All Elements

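The Stream interface provides an allMatch method, which takes a Predicate and returns a boolean indicating whether all elements of the stream match the given predicate.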

boolean allMatch(Predicate<? super T> predicate);

The opposite of allMatch is noneMatch, which checks that no element of the stream matches the given predicate.

boolean noneMatch(Predicate<? super T> predicate);

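You have probably met short-circuit evaluation before: some logical expressions can be decided without evaluating every operand. Streams work the same way: anyMatch, allMatch, noneMatch, findFirst and findAny are short-circuiting operations, and so is limit. They do not need to process the whole stream to produce a result, so they can produce a finite result even from an infinite stream.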
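A sketch of the three matching methods and of short-circuiting on an infinite stream. The names are hypothetical, and peek is used here only to count how many elements anyMatch actually pulls from the source:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class MatchDemo {
    // Counts how many elements of an infinite stream are examined before anyMatch succeeds
    static int examinedUntilSquareAbove(int threshold) {
        AtomicInteger examined = new AtomicInteger();
        Stream.iterate(1, n -> n + 1)
                .peek(n -> examined.incrementAndGet())
                .anyMatch(n -> n * n > threshold);
        return examined.get();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 8, 15, 4);
        System.out.println(numbers.stream().anyMatch(n -> n > 10)); // true
        System.out.println(numbers.stream().allMatch(n -> n > 0));  // true
        System.out.println(numbers.stream().noneMatch(n -> n < 0)); // true
        // anyMatch stops at 8 (8 * 8 = 64 > 50), so the infinite stream is never exhausted
        System.out.println(examinedUntilSquareAbove(50));           // 8
    }
}
```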

5.3.3 Finding an Element

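The Stream interface provides a findAny method, which returns an arbitrary element of the current stream. Its Optional return type avoids returning null when nothing is found.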

Optional<T> findAny();

5.3.4 Finding the First Element

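For ordered streams you may want the first element. The Stream interface provides a findFirst method, which returns the first element of the current stream; again, the Optional return type avoids returning null when nothing is found.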

Optional<T> findFirst();

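findAny() places fewer constraints on parallel execution than findFirst(), because finding the first element in encounter order is harder to parallelize. So if you need an element but do not care which one, findAny() is the better choice.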
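A sketch contrasting findFirst and findAny, with hypothetical names. findFirst respects encounter order; on a parallel stream findAny may return any matching element, so its result is only printed, not predicted:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FindDemo {
    // First square divisible by 3, respecting encounter order
    static Optional<Integer> firstSquareDivisibleBy3(List<Integer> numbers) {
        return numbers.stream()
                .map(n -> n * n)
                .filter(n -> n % 3 == 0)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(firstSquareDivisibleBy3(numbers)); // Optional[9]
        // orElse supplies a default instead of returning null
        System.out.println(firstSquareDivisibleBy3(Arrays.asList(1, 2)).orElse(-1)); // -1

        // findAny on a parallel stream may return any matching element
        numbers.parallelStream()
                .filter(n -> n > 2)
                .findAny()
                .ifPresent(n -> System.out.println("found " + n));
    }
}
```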

5.4 Reduction

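Queries such as "calculate the total calories of the dishes on the menu" or "which dish on the menu has the most calories" require combining the elements of a stream repeatedly to produce a single value. Such queries are called reduction operations (they reduce a stream to a value); in functional-programming terms this is called a fold.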

5.4.1 Summing the Elements

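reduce() takes two arguments: identity is the initial value, and accumulator is a BinaryOperator, a functional interface that combines two operands of the same type and produces a result of that same type.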

T reduce(T identity, BinaryOperator<T> accumulator);

Optional<T> reduce(BinaryOperator<T> accumulator);

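For example, we can compute the total calories of the dishes on the menu. The lambda repeatedly combines each element with the accumulated result until the stream has been reduced to a single value.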

int sum = menu.stream().map(Dish::getCalories).reduce(0,(a, b) -> a + b);
System.out.println("sum: " + sum);//sum: 4200

Figure: Using reduce to sum the numbers in a stream

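reduce() also has an overloaded variant that takes no initial value and returns an Optional: without an initial value the stream might contain no elements at all, in which case there is no result to return.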
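A sketch of both reduce overloads on a plain list of integers (hypothetical names, standing in for the menu). With an identity, an empty stream simply yields the identity; without one, the result is an Optional that is empty for an empty stream:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class ReduceDemo {
    // With an identity: an empty stream yields 0
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // Without an identity: the stream may be empty, so the result is wrapped in Optional
    static Optional<Integer> sumOptional(List<Integer> numbers) {
        return numbers.stream().reduce(Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(4, 5, 3, 9);
        System.out.println(sum(numbers));                                   // 21
        System.out.println(sum(Collections.<Integer>emptyList()));          // 0
        System.out.println(sumOptional(Collections.<Integer>emptyList()));  // Optional.empty
    }
}
```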

5.4.2 Maximum and Minimum

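Passing reduce() a lambda (or method reference) that returns the larger or smaller of two values is enough to compute the maximum or minimum of a stream.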

Figure: Computing the maximum with a reduce operation

Optional<Integer> max = menu.stream().map(Dish::getCalories).reduce(Integer::max);
System.out.println("max: " + max);//max: Optional[800]
Optional<Integer> min = menu.stream().map(Dish::getCalories).reduce(Integer::min);
System.out.println("min: " + min);//min: Optional[120]

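Chaining map and reduce is commonly called the map-reduce pattern; it became well known because Google used it for web search, and it is easy to parallelize.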

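The mutable-accumulator pattern is a dead end for parallelization: an iterative sum has to update a shared variable sum, which does not parallelize well, and once synchronization is added, thread contention cancels out the performance gain that parallelism should bring. Reduction has constraints of its own, of course: the lambdas must not change shared state, and the operation must be associative so that it can be applied in any order.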
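To make the contrast concrete, a minimal sketch under assumed names: loopSum uses the mutable-accumulator style, while parallelSum expresses the same sum as a reduction over a parallel stream. Because addition is associative and the lambdas share no state, the runtime can split the range, sum the pieces on different threads and combine the partial results. A non-associative operation such as subtraction would give unpredictable results if reduced in parallel.

```java
import java.util.stream.IntStream;

class ParallelSumDemo {
    // Mutable-accumulator style: one shared variable updated in a loop (sequential only)
    static long loopSum(int n) {
        long sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    // Reduction style: no shared state and an associative operation, safe to run in parallel
    static long parallelSum(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .asLongStream()
                .reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(loopSum(1000));     // 500500
        System.out.println(parallelSum(1000)); // 500500
    }
}
```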

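Operations such as map and filter take each element from the input stream and produce zero or one result in the output stream; these operations are generally stateless. Operations such as reduce, sum and max need internal state to accumulate the result, but that state is bounded (for example, a single int) no matter how many elements are processed. Operations such as sorted and distinct look like map, but with a key difference: sorting or removing duplicates requires knowing the history. Sorting, for instance, must buffer all elements before it can emit a single item to the output stream, so its storage requirement is unbounded, which becomes a problem if the stream is large or infinite. These are called stateful operations.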

5.5 Exercises

Figure: Intermediate and terminal operations

Given a trader class Trader and a transaction class Transaction, answer the following questions.

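  1. Find all transactions in 2011 and sort them by value (ascending).
  2. What are all the unique cities where the traders work?
  3. Find all traders from Cambridge and sort them by name.
  4. Return a string of all traders' names sorted alphabetically.
  5. Are any traders based in Milan?
  6. Print the values of all transactions from the traders living in Cambridge.
  7. What is the highest value of all the transactions?
  8. Find the transaction with the smallest value.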
public class Trader {
private final String name;
private final String city;
......
}

public class Transaction {
private final Trader trader;
private final int year;
private final int value;
......
}

//test data
Trader raoul = new Trader("Raoul","Cambridge");
Trader mario = new Trader("Mario","Milan");
Trader alan = new Trader("Alan","Cambridge");
Trader brian = new Trader("Brian","Cambridge");

List<Transaction> transactions = Arrays.asList(
new Transaction(brian,2011,300),
new Transaction(raoul,2012,1000),
new Transaction(raoul,2011,400),
new Transaction(mario,2012,710),
new Transaction(mario,2012,700),
new Transaction(alan,2012,950)
);

Solutions

The commented-out parts were replaced because the book offers better solutions.

public class Practice {
public static void main(String[] args) {
Trader raoul = new Trader("Raoul","Cambridge");
Trader mario = new Trader("Mario","Milan");
Trader alan = new Trader("Alan","Cambridge");
Trader brian = new Trader("Brian","Cambridge");

List<Transaction> transactions = Arrays.asList(
new Transaction(brian,2011,300),
new Transaction(raoul,2012,1000),
new Transaction(raoul,2011,400),
new Transaction(mario,2012,710),
new Transaction(mario,2012,700),
new Transaction(alan,2012,950)
);

//Question 1: find all transactions in 2011 and sort them by value (ascending)
List<Transaction> result1 = transactions.stream()
.filter(t -> 2011 == t.getYear())
.sorted(Comparator.comparing(Transaction::getValue))
.collect(Collectors.toList());
System.out.println("------------------------------\nQuestion 1 : ");
System.out.println(result1);

//Question 2: what are all the unique cities where the traders work?
// List<String> result2 = transactions.stream()
// .map(Transaction::getTrader)
// .map(Trader::getCity)
// .distinct()
// .collect(Collectors.toList());
Set<String> result2 = transactions.stream()
.map(t -> t.getTrader().getCity())
.collect(Collectors.toSet());
System.out.println("------------------------------\nQuestion 2 : ");
System.out.println(result2);

//Question 3: find all traders from Cambridge and sort them by name
List<Trader> result3 = transactions.stream()
.map(Transaction::getTrader)
.filter(t -> "Cambridge".equals(t.getCity()))
.distinct()
.sorted(Comparator.comparing(Trader::getName))
.collect(Collectors.toList());
System.out.println("------------------------------\nQuestion 3 : ");
System.out.println(result3);

//Question 4: return a string of all traders' names sorted alphabetically
// String result4 = transactions.stream()
// .map(t -> t.getTrader().getName())
// .distinct()
// .sorted()
// .reduce("",(s1,s2) -> s1 + s2);
String result4 = transactions.stream()
.map(t -> t.getTrader().getName())
.distinct()
.sorted()
.collect(Collectors.joining());
System.out.println("------------------------------\nQuestion 4 : ");
System.out.println(result4);

//Question 5: are any traders based in Milan?
boolean result5 = transactions.stream()
.map(Transaction::getTrader)
.anyMatch(t->"Milan".equals(t.getCity()));
System.out.println("------------------------------\nQuestion 5 : ");
System.out.println(result5);

//Question 6: print the values of all transactions from the traders living in Cambridge
System.out.println("------------------------------\nQuestion 6 : ");
transactions.stream()
.filter(t -> "Cambridge".equals(t.getTrader().getCity()))
.map(Transaction::getValue)
.forEach(System.out::println);

//Question 7: what is the highest value of all the transactions?
Optional<Integer> result7 = transactions.stream()
.map(Transaction::getValue)
.reduce(Integer::max);
System.out.println("------------------------------\nQuestion 7 : ");
System.out.println(result7);

//Question 8: find the transaction with the smallest value
Optional<Transaction> result8 = transactions.stream()
.min(Comparator.comparing(Transaction::getValue));
System.out.println("------------------------------\nQuestion 8 : ");
System.out.println(result8);
}
}

The output is as follows.

------------------------------
Question 1 :
[{Brian,2011,300}, {Raoul,2011,400}]
------------------------------
Question 2 :
[Milan, Cambridge]
------------------------------
Question 3 :
[{Alan,Cambridge}, {Brian,Cambridge}, {Raoul,Cambridge}]
------------------------------
Question 4 :
AlanBrianMarioRaoul
------------------------------
Question 5 :
true
------------------------------
Question 6 :
300
1000
400
950
------------------------------
Question 7 :
Optional[1000]
------------------------------
Question 8 :
Optional[{Brian,2011,300}]

Common usage:

// Flatten a List<List<String>> into a List<String>
List<String> collect = list1.stream().flatMap(item -> item.stream()).collect(Collectors.toList());
// Equivalent version using a method reference
List<String> collect2 = list1.stream().flatMap(Collection::stream).collect(Collectors.toList());
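A self-contained sketch of the flattening idiom above; the class FlattenDemo is hypothetical, and an empty inner list simply contributes nothing to the result:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class FlattenDemo {
    // Each inner list becomes a stream; flatMap concatenates them into one stream
    static List<String> flatten(List<List<String>> nested) {
        return nested.stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nested = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"), Collections.<String>emptyList());
        System.out.println(flatten(nested)); // [a, b, c]
    }
}
```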

6. Numeric Streams

6.1 Primitive Stream Specializations

Sometimes we need to aggregate a numeric property of objects. Using reduction, we could write:

int sum = menu.stream().map(Dish::getCalories).reduce(0,(a, b) -> a + b);

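This implementation has a hidden boxing cost: each Integer must be unboxed to an int before it can be added. You might wonder why we cannot simply call sum(). The reason is that Stream<T> works with arbitrary objects, and summing arbitrary objects (dishes, for example) makes no sense. For this case the Stream API provides primitive stream specializations, which offer methods dedicated to processing numeric streams.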

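| Operation | Type | Return type | Argument type | Function descriptor |
|---|---|---|---|---|
| mapToInt | Intermediate | IntStream | ToIntFunction<T> | T -> int |
| mapToLong | Intermediate | LongStream | ToLongFunction<T> | T -> long |
| mapToDouble | Intermediate | DoubleStream | ToDoubleFunction<T> | T -> double |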

So the code can be rewritten as follows:

int sumSpec = menu.stream()
.mapToInt(Dish::getCalories) //first map to an IntStream
.sum(); //then call the sum() provided by IntStream

A numeric stream can be converted back into an object stream with boxed(), which boxes each value.

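sum() returns 0 for an empty stream, but operations such as max have no sensible default, so they return primitive specializations of Optional: OptionalInt, OptionalDouble and OptionalLong.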
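A sketch of mapToInt, sum, max and boxed. The calorie values are hypothetical stand-ins for the book's menu, and Integer::intValue replaces Dish::getCalories so the example is self-contained:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;

class NumericStreamDemo {
    // Hypothetical calorie values standing in for the menu
    static final List<Integer> CALORIES = Arrays.asList(800, 700, 400, 530, 350, 120);

    // mapToInt unboxes once into an IntStream; sum() then works on primitive ints
    static int totalCalories(List<Integer> calories) {
        return calories.stream().mapToInt(Integer::intValue).sum();
    }

    // max() has no default for an empty stream, so it returns OptionalInt
    static OptionalInt maxCalories(List<Integer> calories) {
        return calories.stream().mapToInt(Integer::intValue).max();
    }

    // boxed() turns the IntStream back into a Stream<Integer>
    static List<Integer> inTens(List<Integer> calories) {
        return calories.stream()
                .mapToInt(Integer::intValue)
                .map(c -> c / 10)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(totalCalories(CALORIES));                                  // 2900
        System.out.println(maxCalories(CALORIES).getAsInt());                         // 800
        System.out.println(maxCalories(Collections.<Integer>emptyList()).orElse(1));  // 1
        System.out.println(inTens(CALORIES));                         // [80, 70, 40, 53, 35, 12]
    }
}
```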

6.2 Numeric Ranges

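range() and rangeClosed() generate ranges of numbers. Both take a start value and an end value; range() excludes the end value, whereas rangeClosed() includes it.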

public static IntStream range(int startInclusive, int endExclusive) {
if (startInclusive >= endExclusive) {
return empty();
} else {
return StreamSupport.intStream(
new Streams.RangeIntSpliterator(startInclusive, endExclusive, false), false);
}
}

public static IntStream rangeClosed(int startInclusive, int endInclusive) {
if (startInclusive > endInclusive) {
return empty();
} else {
return StreamSupport.intStream(
new Streams.RangeIntSpliterator(startInclusive, endInclusive, true), false);
}
}
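A sketch of the difference, with hypothetical method names. Following the source above, range(5, 5) is empty because start >= end, while rangeClosed(5, 5) contains the single value 5:

```java
import java.util.stream.IntStream;

class RangeDemo {
    // range(start, end) excludes end
    static long countExclusive(int start, int end) {
        return IntStream.range(start, end).count();
    }

    // rangeClosed(start, end) includes end
    static long countInclusive(int start, int end) {
        return IntStream.rangeClosed(start, end).count();
    }

    // Number of even values in [1, 100]
    static long evensUpTo100() {
        return IntStream.rangeClosed(1, 100).filter(n -> n % 2 == 0).count();
    }

    public static void main(String[] args) {
        System.out.println(countExclusive(1, 100)); // 99
        System.out.println(countInclusive(1, 100)); // 100
        System.out.println(evensUpTo100());         // 50
        System.out.println(countExclusive(5, 5));   // 0
        System.out.println(countInclusive(5, 5));   // 1
    }
}
```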

6.3 Application

Let's generate Pythagorean triples: if a² + b² = c² and all three are integers, then a, b and c form a Pythagorean triple.

We need a way to represent the triple (a, b, c); an int[] array will do.

//Pythagorean triples
Stream<int[]> pythagoreanTriples = IntStream.rangeClosed(1,100) //a takes values from 1 to 100
.boxed() //box the IntStream into a Stream<Integer>
.flatMap(a -> //flatten the generated streams of triples into a single stream of triples
IntStream.rangeClosed(a,100) //b takes values from a to 100, avoiding duplicate triples
.filter(b -> //filter out (a, b) pairs that do not yield an integer c
Math.sqrt(a * a + b * b) % 1 == 0) //Math.sqrt(a * a + b * b) % 1 == 0 checks whether c is an integer
.mapToObj(b -> //map each valid b to its triple
new int[]{a, b, (int) Math.sqrt(a * a + b * b)}));
System.out.println("++++++++++++++++++++");
pythagoreanTriples.limit(5).forEach(t -> System.out.println(t[0] + ", " + t[1] + ", " + t[2]));

//Optimization: compute the square root only once per (a, b) pair
Stream<double[]> pythagoreanTriples2 = IntStream.rangeClosed(1,100)
.boxed()
.flatMap(a ->
IntStream.rangeClosed(a, 100)
.mapToObj(b -> new double[]{a, b, Math.sqrt(a * a + b * b)}) //generate all triples first
.filter(t -> t[2] % 1 == 0)); //then filter out the invalid ones

7. Building Streams

7.1 Streams from Values

The static method Stream.of() creates a stream from explicit values.

public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
//create a stream from values
Stream<String> stringStream = Stream.of("a","b");

7.2 Streams from Arrays

The static method Arrays.stream() creates a stream from an array.

public static IntStream stream(int[] array) {
return stream(array, 0, array.length);
}
//create a stream from an array
int[] a = {1,2,3};
Stream<Integer> integerStream = Arrays.stream(a).boxed();
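A sketch of both factory methods, plus Stream.empty() for an explicitly empty stream; all class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BuildStreamsDemo {
    // Stream.of builds a stream from explicit values
    static List<String> upperCasedValues() {
        return Stream.of("java", "stream", "api")
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Arrays.stream(int[]) yields an IntStream, so sum() is available without boxing
    static int sumOf(int[] numbers) {
        return Arrays.stream(numbers).sum();
    }

    // Stream.empty() creates a stream with no elements
    static long emptyCount() {
        return Stream.empty().count();
    }

    public static void main(String[] args) {
        System.out.println(upperCasedValues());                      // [JAVA, STREAM, API]
        System.out.println(sumOf(new int[]{2, 3, 5, 7, 11, 13}));    // 41
        System.out.println(emptyCount());                            // 0
    }
}
```

Note that Stream.of(a) with an int[] produces a Stream<int[]> holding a single element (the array itself), because a primitive array cannot be spread into T... varargs; that is why the example above uses Arrays.stream(a) for primitive arrays.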

7.3 Streams from Files

//create a stream from a file
long uniqueWords = 0;
try(Stream<String> lines = Files.lines(Paths.get("d:/data.txt"), Charset.defaultCharset())){
uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" ")))
.distinct()
.count();
}catch (IOException ex){
ex.printStackTrace();
}
System.out.println("Unique words in the file: " + uniqueWords);
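A runnable variant of the example above. As an assumption for portability, it writes a temporary file instead of reading the hard-coded d:/data.txt, uses UTF-8 instead of the platform default charset, and wraps IOException in UncheckedIOException; the class and method names are hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class FileStreamDemo {
    // Counts distinct space-separated words; try-with-resources closes the file
    static long countUniqueWords(Path file) throws IOException {
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.flatMap(line -> Arrays.stream(line.split(" ")))
                    .distinct()
                    .count();
        }
    }

    // Writes the given lines to a temporary file, counts unique words, then deletes the file
    static long countUniqueWordsOf(List<String> content) {
        try {
            Path file = Files.createTempFile("words", ".txt");
            try {
                Files.write(file, content, StandardCharsets.UTF_8);
                return countUniqueWords(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> content = Arrays.asList("the quick brown fox", "the lazy dog");
        System.out.println(countUniqueWordsOf(content)); // 6
    }
}
```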

7.4 Streams from Functions: Creating Infinite Streams

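Stream.iterate() and Stream.generate() create streams from functions, which lets you build so-called infinite streams: streams without a fixed size. Remember to apply limit, otherwise printing the stream would never end.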

(1) Iterate

//iterate
Stream.iterate(0, n -> n + 2)
.limit(10)
.forEach(System.out::println);

This keeps adding 2 to the previous value in sequence; without a limit it would run forever. Such a stream is unbounded.

(2) Generate

//generate
Stream.generate(Math::random)
.limit(5)
.forEach(System.out::println);

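The source used here (a method reference to Math.random) is stateless: it does not record any value for later use. A source can also be stateful, updating its state so that the stream can use it when producing the next value.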

(3) Fibonacci Sequence

//Fibonacci sequence: each number is the sum of the two preceding ones
//print the pairs
Stream.iterate(new int[]{0, 1}, temp -> new int[]{temp[1], temp[0] + temp[1]})
.limit(20)
.forEach(t -> System.out.println("(" + t[0] + ", " + t[1] + ")"));

//print the Fibonacci sequence
Stream.iterate(new int[]{0, 1}, temp -> new int[]{temp[1], temp[0] + temp[1]})
.limit(20)
.map(t -> t[0])
.forEach(System.out::println);

//implement IntSupplier with mutable state to produce the Fibonacci sequence
IntSupplier fib = new IntSupplier() {
private int pre = 0;
private int current = 1;
@Override
public int getAsInt() {
int old = this.pre;
int next = this.pre + this.current;
this.pre = this.current;
this.current = next;
return old;
}
};
IntStream.generate(fib).limit(10).forEach(System.out::println);

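Comparing the two implementations: the iterate version never mutates state, but it creates a new pair at every step. The generate version implements the IntSupplier interface and changes the object's state on every call to getAsInt. The immutable approach is friendlier to parallel processing.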

8. Problems

8.1 A Non-conforming Comparator When Sorting

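An exception encountered in production: java.lang.IllegalArgumentException: Comparison method violates its general contract! In other words, the comparison method breaks the contract that every comparator is required to obey.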

java.lang.IllegalArgumentException: Comparison method violates its general contract!
at java.util.TimSort.mergeHi(TimSort.java:899)
at java.util.TimSort.mergeAt(TimSort.java:516)
at java.util.TimSort.mergeForceCollapse(TimSort.java:457)
at java.util.TimSort.sort(TimSort.java:254)
at java.util.Arrays.sort(Arrays.java:1512)
at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:348)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
......

The corresponding code:

xxxList = xxxList.stream().collect(Collectors.toMap(XXXDTO::getName, s -> s, (s1, s2) -> {
if (s1.getSortNum() < s2.getSortNum()) {
return s1;
} else if (Long.parseLong(s1.getTaskId()) < Integer.parseInt(s2.getTaskId())) {
return s1;
} else if (Long.parseLong(s1.getSortId()) <= Integer.parseInt(s2.getSortId())) {
return s1;
} else {
return s2;
}
})).values().stream().collect(Collectors.toList());

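A Stack Overflow search shows the cause: since JDK 7, the sorting algorithm used by java.util.Arrays.sort and (indirectly) by java.util.Collections.sort has been replaced with TimSort, which throws an exception in situations that earlier versions silently ignored. When the comparison contract is violated, the system property java.util.Arrays.useLegacyMergeSort restores the old merge sort, as the JDK 7 compatibility notes state: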

Area: API: Utilities
Synopsis: Updated sort behavior for Arrays and Collections may throw an IllegalArgumentException
Description: The sorting algorithm used by java.util.Arrays.sort and (indirectly) by java.util.Collections.sort has been replaced. The new sort implementation may throw an IllegalArgumentException if it detects a Comparable that violates the Comparable contract. The previous implementation silently ignored such a situation.
If the previous behavior is desired, you can use the new system property, java.util.Arrays.useLegacyMergeSort, to restore previous mergesort behavior.
Nature of Incompatibility: behavioral
RFE: 6804124

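So a temporary workaround that does not change the code is to add -Djava.util.Arrays.useLegacyMergeSort=true to the startup script, or to set the property in code as shown below. This only hides the broken comparator rather than fixing it, and because the JDK reads the property once when its sorting code is first used, setting it in code only takes effect if it happens before any sort: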

System.setProperty("java.util.Arrays.useLegacyMergeSort", "true");

Reproducing the exception:

import java.util.Arrays;
import java.util.List;

public class StreamUtil {

public static void main(String[] args) {
System.out.println(sortError(1, 2, 3, 4, 5, 6, 7, 8, 9));

System.out.println(sortError(9, 8, 7, 6, 5, 4, 3, 2, 1));
System.out.println(sortError(8, 9, 5, 5, 1, 1, 2, 2, 4, 7, 9, 5, 3, 3, 3, 2, 1, 1));

// exception
System.out.println(sortError(1, 2, 3, 3, 2, 1, 1, 2, 3, 3, 2, 1, 1, 2, 3, 3, 2, 1, 1, 2, 3, 3, 2, 1, 1, 2, 3, 3, 2, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
}

// Reproduces java.lang.IllegalArgumentException: Comparison method violates its general contract!
private static List<Integer> sortError(Integer... ints) {
List<Integer> list = Arrays.asList(ints);
list.sort((o1, o2) -> {
if (o1 < o2) {
return -1;
} else{
return 1;
}
});
return list;
}
}

Returning 0 when the two values are equal fixes the comparator:


private static List<Integer> sort(Integer... ints) {
List<Integer> list = Arrays.asList(ints);
list.sort((o1, o2) -> {
if (o1 < o2) {
return -1;
} else if (o1.equals(o2)) {
return 0;
} else{
return 1;
}
});
return list;
}
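Rather than hand-writing if/else chains, a comparator can be composed from the JDK's Comparator factory methods, which return 0 for equal keys and therefore satisfy the contract by construction. The sketch below rests on assumptions: the Task class and its fields (name, sortNum, and taskId as a long) are hypothetical stand-ins for the XXXDTO in the production code, and BinaryOperator.minBy is used as the toMap merge function so that the "keep the smaller one" rule comes from a single well-behaved comparator:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

class ComparatorFixDemo {
    // Hypothetical stand-in for XXXDTO; fields and types are assumptions
    static final class Task {
        private final String name;
        private final int sortNum;
        private final long taskId;

        Task(String name, int sortNum, long taskId) {
            this.name = name;
            this.sortNum = sortNum;
            this.taskId = taskId;
        }

        String getName() { return name; }
        int getSortNum() { return sortNum; }
        long getTaskId() { return taskId; }
    }

    // Order by sortNum, then by taskId; each step yields <0, 0 or >0
    static final Comparator<Task> ORDER =
            Comparator.comparingInt(Task::getSortNum).thenComparingLong(Task::getTaskId);

    // For duplicate names, keep the task that ORDER ranks first
    static Map<String, Task> dedupeByName(List<Task> tasks) {
        return tasks.stream()
                .collect(Collectors.toMap(Task::getName, t -> t, BinaryOperator.minBy(ORDER)));
    }

    // Integer::compare returns 0 for equal values, unlike the broken comparator above
    static List<Integer> sortInts(Integer... ints) {
        List<Integer> list = Arrays.asList(ints);
        list.sort(Integer::compare);
        return list;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
                new Task("a", 2, 10), new Task("a", 1, 20), new Task("b", 1, 5));
        System.out.println(dedupeByName(tasks).get("a").getTaskId()); // 20
        System.out.println(sortInts(3, 1, 2, 1));                     // [1, 1, 2, 3]
    }
}
```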

A utility method for verifying that a comparator obeys the contract:

import java.util.Collection;
import java.util.Comparator;

public class Comparators {

/**
* Verify that a comparator is transitive.
*
* @param <T> the type being compared
* @param comparator the comparator to test
* @param elements the elements to test against
* @throws AssertionError if the comparator is not transitive
*/
public static <T> void verifyTransitivity(Comparator<T> comparator, Collection<T> elements)
{
for (T first: elements)
{
for (T second: elements)
{
int result1 = comparator.compare(first, second);
int result2 = comparator.compare(second, first);
if (result1 != -result2)
{
// Uncomment the following line to step through the failed case
//comparator.compare(first, second);
throw new AssertionError("compare(" + first + ", " + second + ") == " + result1 +
" but swapping the parameters returns " + result2);
}
}
}
for (T first: elements)
{
for (T second: elements)
{
int firstGreaterThanSecond = comparator.compare(first, second);
if (firstGreaterThanSecond <= 0)
continue;
for (T third: elements)
{
int secondGreaterThanThird = comparator.compare(second, third);
if (secondGreaterThanThird <= 0)
continue;
int firstGreaterThanThird = comparator.compare(first, third);
if (firstGreaterThanThird <= 0)
{
// Uncomment the following line to step through the failed case
//comparator.compare(first, third);
throw new AssertionError("compare(" + first + ", " + second + ") > 0, " +
"compare(" + second + ", " + third + ") > 0, but compare(" + first + ", " + third + ") == " +
firstGreaterThanThird);
}
}
}
}
}

/**
* Prevent construction.
*/
private Comparators()
{
}
}

Since JDK 7, a Comparator must satisfy the following rules:

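  • Antisymmetry: sgn(compare(x, y)) == -sgn(compare(y, x)) for all x and y; in particular, compare(x, x) must return 0.
  • Transitivity: if compare(x, y) > 0 and compare(y, z) > 0, then compare(x, z) > 0.
  • Consistency of equal elements: if compare(x, y) == 0, then sgn(compare(x, z)) == sgn(compare(y, z)) for all z.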
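The following JDK 8 source excerpts (Collectors.toMap, ReferencePipeline.collect, AbstractPipeline.evaluate, ReduceOp.evaluateSequential, AbstractPipeline.wrapAndCopyInto and copyInto, and SortedOps$SizedRefSortingSink.end) follow the path in the stack trace: collect evaluates the pipeline, copyInto pushes every element into the sink chain and then calls end(), and the sorting sink's end() calls Arrays.sort with the comparator, which is where TimSort detects the contract violation.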
/**
* Returns a {@code Collector} that accumulates elements into a
* {@code Map} whose keys and values are the result of applying the provided
* mapping functions to the input elements.
*
* <p>If the mapped
* keys contains duplicates (according to {@link Object#equals(Object)}),
* the value mapping function is applied to each equal element, and the
* results are merged using the provided merging function. The {@code Map}
* is created by a provided supplier function.
*
* @implNote
* The returned {@code Collector} is not concurrent. For parallel stream
* pipelines, the {@code combiner} function operates by merging the keys
* from one map into another, which can be an expensive operation. If it is
* not required that results are merged into the {@code Map} in encounter
* order, using {@link #toConcurrentMap(Function, Function, BinaryOperator, Supplier)}
* may offer better parallel performance.
*
* @param <T> the type of the input elements
* @param <K> the output type of the key mapping function
* @param <U> the output type of the value mapping function
* @param <M> the type of the resulting {@code Map}
* @param keyMapper a mapping function to produce keys
* @param valueMapper a mapping function to produce values
* @param mergeFunction a merge function, used to resolve collisions between
* values associated with the same key, as supplied
* to {@link Map#merge(Object, Object, BiFunction)}
* @param mapSupplier a function which returns a new, empty {@code Map} into
* which the results will be inserted
* @return a {@code Collector} which collects elements into a {@code Map}
* whose keys are the result of applying a key mapping function to the input
* elements, and whose values are the result of applying a value mapping
* function to all input elements equal to the key and combining them
* using the merge function
*
* @see #toMap(Function, Function)
* @see #toMap(Function, Function, BinaryOperator)
* @see #toConcurrentMap(Function, Function, BinaryOperator, Supplier)
*/
public static <T, K, U, M extends Map<K, U>>
Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
}


/**
* {@code BinaryOperator<Map>} that merges the contents of its right
* argument into its left argument, using the provided merge function to
* handle duplicate keys.
*
* @param <K> type of the map keys
* @param <V> type of the map values
* @param <M> type of the map
* @param mergeFunction A merge function suitable for
* {@link Map#merge(Object, Object, BiFunction) Map.merge()}
* @return a merge function for two maps
*/
private static <K, V, M extends Map<K,V>>
BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) {
return (m1, m2) -> {
for (Map.Entry<K,V> e : m2.entrySet())
m1.merge(e.getKey(), e.getValue(), mergeFunction);
return m1;
};
}
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}



/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;

return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);

if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}

@Override
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}

References:

🔗 Java 8 in Action (《Java8实战》)

🔗 “Comparison method violates its general contract!” - Stack Overflow

🔗 “Comparison method violates its general contract!”