流(四)并行化处理

流(四)并行化处理

九. 并行化数据处理

在早期版本中,并行处理数据集合非常麻烦。第一,我们要把包含数据的数据结构分成若干子部分。第二,我们要给每个子部分分配一个独立的线程。第三,我们需要在恰当的时候对它们进行同步来避免竞争条件,等待所有线程完成,最后把这些结果合并。

Java 7时引入了分支/合并框架,使这些操作更稳定。

9.1 并行流

并行流就是一个把内容分成多个数据块,并用不同线程分别处理每个数据块的流。通过 parallelStream() 方法把集合转换为并行流。通过parallel()方法将顺序流转为并行流。sequential()方法可以使并行流转为顺序流。所以结合这两个方法可以细化的控制哪些操作并行执行哪些操作顺序执行,但最后一次会决定整个流水线何种执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 计算从1到给定数字的和
* @param n 值
* @return
*/
public static long sequentialSum(long n){
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}

/**
* 迭代计算从1到给定数字的和
* @param n 值
* @return
*/
public static long iterativeSum(long n){
long result = 0;
for(long i = 1L;i <= n;i++){
result += i;
}
return result;
}

/**
* 并行计算从1到给定数字的和
* @param n 值
* @return
*/
public static long parallelSum(long n){
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //将流转为并行流
.reduce(0L, Long::sum);
}

同一个归纳操作会把各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果,并行归纳操作如下图所示。

并行归纳操作

parallel()只是改变了一个布尔标识,流本身不会有其他变化。

并行流的内部使用了默认的ForkJoinPool,默认线程数量为处理器数量( Runtime.getRuntime().availableProcessors() 获取)。

对比并行流和顺序流性能

测试并行流性能,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    System.out.println("Sequential sum done in " + measureSumPerf(ParallelTest::sequentialSum, 10_000_000) + " mscs");
System.out.println("Iterative sum done in " + measureSumPerf(ParallelTest::iterativeSum, 10_000_000) + " mscs");
System.out.println("Parallel sum done in " + measureSumPerf(ParallelTest::parallelSum, 10_000_000) + " mscs");

public static long measureSumPerf(Function<Long, Long> adder, long n){
long fastest = Long.MAX_VALUE;
for(int i = 0;i < 10;i++){//测试运行10次
long start = System.nanoTime();
long sum = adder.apply(n);//执行函数
long duration = (System.nanoTime() - start) / 1_000_000;//取运行时间的毫秒值
if(duration < fastest)//检查执行是否是最快的一个
fastest = duration;
}
return fastest;
}

执行结果如下。传统for循环迭代版本相比顺序流要快很多,因为它更底层以及不需要对原始类型做任何装箱/拆箱操作。但并行流却意外的比顺序流还要多消耗一倍时间。

1
2
3
Sequential sum done in 181 mscs
Iterative sum done in 5 mscs
Parallel sum done in 378 mscs

并行版本慢的原因:

  • iterate生成的是装箱的对象,必须要拆箱才能求和
  • 很难吧iterate分成多个独立块执行(在实现斐波那契数列有提到iterate不利于并行化)

iterate迭代操作很难并行化,每次应用时都要依赖于前一次应用的结果。所以我们实际上并没有使操作并行,每次还要把求和操作分配到另一个线程,额外增加了开销。

如何优化上述代码使其达到并行化效果呢?首先我们通过LongStream.rangeClosed方法直接生成原始类型long数值,对比顺序流效率可以感受拆箱的开销。

1
2
3
4
5
6
7
8
/**
* 没有装箱的求和函数
* @param n 值
* @return
*/
public static long rangedSum(long n){
return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
}

执行结果如下,数值流避免了非针对性流这些没必要的自动装箱/拆箱操作,最终明显的降低了开销,选择合适的数据结构比并行化算法更显得重要。

1
Range sum done in 7 mscs

尝试把数值流应用于并行流,再观察开销。

1
2
3
4
5
6
7
8
9
10
/**
* 并行没有装箱的计算从1到给定数字的和
* @param n 值
* @return
*/
public static long parallelRangedSum(long n){
return LongStream.rangeClosed(1, n)
.parallel()
.reduce(0L, Long::sum);
}

执行结果如下,终于并行结果要快于顺序执行了,因为这次终于分开了数据块进行了并行运算。

1
Parallel range sum done in 2 mscs

并行化其实有额外的代价,需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,把操作的结果合并到一个值等。而且在多核之间移动数据也有很大的开销,所以最重要的是要保证在内核中并行执行工作的时间要比在内核间传输数据的时间长,要合理的选择并行化。

正确使用并行流

错误的运用并行流的主要原因是使用的算法改变了某些共享状态。

下述代码sideEffectSum方法类似于我们常用的指令式迭代求和,其算法本质上就是顺序的,因为累加器的total变量每次访问都会出现竞争,我们可以尝试并行化它,看最终执行效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    /**
* 通过累加器类对前n个自然数求和
* @param n 值
* @return
*/
public static long sideEffectSum(long n){
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}

/**
* 通过累加器类对前n个自然数求和(并行化)
* @param n 值
* @return
*/
public static long sideEffectParallelSum(long n){
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}

/**
* 累加器
*/
public class Accumulator {
public long total = 0;
public void add(long value){
total += value;
}
}

执行结果如下,并行时每次执行结果都是错误值,因为我们并行化的多线程处理操作都不是原子操作,根源就在于forEach所调用的函数会改变多线程共享变量的状态,所以并行化时应该避免共享可变状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Result<1>: 50000005000000
Result<2>: 50000005000000
Result<3>: 50000005000000
Result<4>: 50000005000000
Result<5>: 50000005000000
Result<6>: 50000005000000
Result<7>: 50000005000000
Result<8>: 50000005000000
Result<9>: 50000005000000
Result<10>: 50000005000000
Side effect sum done in 7 mscs
Result<1>: 8130267751928
Result<2>: 7068326376926
Result<3>: 8906998285116
Result<4>: 8213474338667
Result<5>: 9907376000941
Result<6>: 8850837255635
Result<7>: 7447244528787
Result<8>: 8092934175612
Result<9>: 8285103974383
Result<10>: 7964325408454
Side effect parallel sum done in 88 mscs

高效使用并行流

给出准确的使用并行流建议是不现实的,不同的场景和需求都会有无法定量的差异存在。

  • 多做测试:顺序流转为并行流是很容易的,但并行流未必比顺序流更快,所以请多测试。
  • 注意装箱:自动装箱和拆箱操作会大大降低性能,请多留意。
  • 避免有序:一些操作注定在并行流上效率要差于顺序流,特别是limit和findFirst等依赖于元素顺序的操作,如findAny因为不需要有序就要比findFirst快,unordered方法可以把有序流转为无序流,在满足有些需求的情况下能提高这些操作的性能。
  • 单次成本:假设N是元素总数,Q是单个元素通过流水线的成本,Q值越大则表示并行化更高效的可能性更大,我们要注意流水线的总计算成本。
  • 避免小数据:小量数据对于并行化总不会是一个好的应用场景,带来的好处抵不上额外开销。
  • 数据结构的分解效率:一些数据结构的拆分效率需要额外注意,如ArrayList的拆分就比LinkedList高效,实现Spliterator可以掌握分解流程。
  • 流与中间操作的影响:流自身的特点,以及流水线的中间操作修改流的方式,都会影响到分解过程的性能。如一个SIZED流可以分为大小相等的两块,每块都可以高效并行处理,但筛选操作丢弃的个数却无法预测,会导致流自身大小未知。
  • 终端合并的代价:要考虑终端操作合并步骤的代价,如果最后代价太大,可能会超过并行带来的性能提升。
可分解性
ArrayList 极佳
LinkedList
IntStream.range 极佳
Stream.iterate
HashSet
TreeSet

9.2 分支/合并框架

分支/合并框架的目的是以递归的方式将可以并行的任务拆分为更小的子任务,将每个子任务的结果合并成整体结果。它是ExecutorService接口的一个实现,把子任务分配给线程池(ForkJoinPool)中的工作线程。

RecursiveTask

R是并行化任务产生的结果类型,若任务不返回结果,则是RecursiveAction类型。

定义RecursiveTask只须实现compute方法,此方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。

此方法的实现如下述伪代码,只不过是著名的分治算法的并行版本。

1
2
3
4
5
6
7
if(任务足够小或不可分){
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}

递归的任务拆分过程如下图所示。

分支/合并过程

我们可以实现一个分支合并任务进行并行求和,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 分支合并框架进行并行求和
*/
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;//要进行求和的数组
private final int start;//子任务处理的数组起始位置
private final int end;//子任务处理的数组终止位置

public static final long THRESHOLD = 10_000;//停止分解子任务的数组大小

@Override
protected Long compute() {
int length = start - end;//要进行求和的部分大小
if(length <= THRESHOLD){//如果大小小于或等于阈值,就执行顺序计算结果
return computeSequentially();
}
//创建一个子任务为数组的前一半求和
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers,start,start + length/2);
//利用另一个ForkJoinPool线程异步执行新创建的子任务
leftTask.fork();
//创建一个子任务为数组的后一半求和
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers,start + length/2,end);
//同步执行第二个子任务,有可能允许进一步递归划分
Long rightResult = rightTask.compute();
//读取第一个子任务的结果,如果未完成则等待
Long leftResult = leftTask.join();
//任务结果为两个子任务结果合并
return leftResult + rightResult;
}

/**
* 顺序求和
* @return
*/
private long computeSequentially(){
long sum = 0;
for(int i = start;i < end;i++){
sum += numbers[i];
}
return sum;
}

public ForkJoinSumCalculator(long[] numbers){//公共构造器用于创建主任务
this(numbers,0,numbers.length);
}

private ForkJoinSumCalculator(long[] numbers, int start, int end) {//私有构造器用于以递归的方式创建子任务
this.numbers = numbers;
this.start = start;
this.end = end;
}
}

然后再通过forkJoinSum方法调用算法进行并行计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 通过分支合并框架对前n个自然数求和
* @param n 值
* @return
*/
public static long forkJoinSum(long n){
//LongStream生成前n个自然数的数组
long[] numbers = LongStream.rangeClosed(1, n).toArray();
//根据数组实例化ForkJoinTask
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
//实例化ForkJoinPool并把任务交给其调用方法
return new ForkJoinPool().invoke(task);
}

执行结果如下,相比并行流版本要慢一些,因为要把整个数字流都放入一个long[]后才可以在ForkJoinSumCalculator任务中调用数组。实际应用中,使用多个ForkJoinPool没有什么意义,所以一般将其实例静态化成为单例,方便在软件中重用。

1
Fork join sum done in 47 mscs

当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就有池中的一个线程执行,这个线程会调用任务的compute方法。该方法会检查任务十分小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的ForkJoinSumCalculator,而它们也由ForkJoinPool安排执行。因此这一过程可以递归重复,把原任务分成更小的任务,直到满足不方便或不可能再进一步拆分的条件。这时顺序计算每个任务的结果,然后由分支过程创建的任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。这个过程如下图所示。

分支/合并算法

正确使用分支/合并框架

  • 阻塞:对一个任务调用join方法会阻塞调用者,直到任务执行出结果。所以最好在两个任务的计算都开始后再调用它,否则每个子任务间要相互等待。
  • invoke:不要在RecursiveTask内部使用ForkJoinPool的invoke方法,只有顺序代码才应该由invoke方法启动来进行并行计算,应该始终直接调用compute或fork方法。
  • fork:对子任务调用fork方法可以把它加入ForkJoinPool,但要注意同时对两个子任务调用fork效率会比直接对一个调用compute低。这样可以为一个子任务重用同一线程,从而避免了线程池中多分配一个任务所造成的开销。
  • 测试:对分支合并框架的并行计算进行测试会有些麻烦,比如我们常用IDE的栈跟踪来寻找问题,但分支合并计算中调用compute的线程并不是概念上的调用方,后者是调用fork的那个。
  • 合理使用:和并行流一样,多核环境使用分支合并框架不一定会比顺序计算快。只有一个任务可以分解成多个独立的子任务,才能使并行化时性能有提升。子任务的运行时间应该大于拆分子任务所消耗的时间。比如将输入输出放到一个子任务,把计算放到另一个子任务,这样就可以同时进行二者。还有分支合并框架需要“预热”或者说执行几遍才会被JIT编译器优化,所以测量性能时要注意多跑几遍。编译器内置的优化可能会给顺序的版本带来一些优势。

使用分支合并框架,我们需要制定一个策略:决定任务是进一步拆分还是已小到可以顺序求值。

工作窃取

在求和例子中,我们设定求和数组达到1万个项目就不再创建子任务,这个策略比较随意,假设我们有一个1000万项目的数组,那么就需要开辟1000个子任务,而我们的PC可能只有4核。理想的情况下,我们划分的每个子任务都有相同的运行时间,但实际会因为划分策略效率低,或其他不可预知的原因导致子任务间运行差别很大。

分支合并框架采用一种叫工作窃取的技术来解决这一问题。子任务差不多会被平均分配到ForkJoinPool的所有线程上,每个线程都会为分配的任务保存一个双向链式队列,每完成一个任务就会从队列取出下一个任务执行,所以有些线程会提前完成分配的任务,这时此线程会随机选择一个线程,然后从其队列尾巴上“偷走”一个任务。所以这种模式下,更多的子任务有利于工作线程间平衡负载。

工作窃取被用来在线程池中的工作线程间重新分配和平衡任务。闲置线程会不断地尝试窃取别的线程的任务,一个线程中的任务切分为两个子任务时就会被一个闲置线程拿走一个,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。

分支/合并框架使用的工作窃取算法

9.3 Spliterator

我们在使用并行流时并没有指定拆分多个任务的逻辑,所以应该有一种自动机制来帮助我们做了这一工作。可分迭代器-Spliterator,和迭代器-Iterator一样都用于遍历数据源中的元素,区别就是Spliterator专门为并行执行而设计。Java 8为集合框架包含的所有数据结构提供了一个默认的Spliterator实现。

1
2
3
4
5
6
7
8
9
10
11
public interface Spliterator<T> {//T是Spliterator遍历元素的类型
//类似于Iterator,顺序的遍历Spliterator中的元素,如果还有其他元素需要遍历返回true
boolean tryAdvance(Consumer<? super T> action);
//可以把一些元素划分给另一个Spliterator,并使其并行处理
Spliterator<T> trySplit();
//估计还需要遍历的元素个数,未必准确
long estimateSize();
//此迭代器本身特性ID
int characteristics();
......
}

Spliterator还有一个功能是在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数据源,而不是在创建时绑定。这样的Spliterator被称为延迟绑定的Spliterator。

拆分过程

将Stream拆分为多个部分的算法是一个递归过程。

  1. 对一个Spliterator调用trySplit,生成第二个Spliterator
  2. 对这两个Spliterator调用trySplit,就有了四个Spliterator。
  3. 对每一个Spliterator不断地分割,直到trySplit返回null表示数据结构不能再分割了。
  4. 最终所有的Spliterator调用trySplit都返回null,终止。

递归拆分过程

拆分过程受Spliterator的特性的影响,而特性由characteristics方法声明。

Spliterator的特性

特性 含义
ORDERED 元素有既定的顺序(如List),因此Spliterator在遍历和划分时也会遵守这一顺序
DISTINCT 对于任意一对遍历过的元素x和y,x.equals(y)返回false
SORTED 遍历的元素按照一个预定义的顺序排序
SIZED 该Spliterator由一个已知大小的源建立(如Set),因此estimatedSize()返回的是准确值
NONNULL 保证遍历的元素不会为null
IMMUTABLE Spliterator的数据源不能修改,意味着遍历时不能添加、删除或修改任何元素
CONCURRENT Spliterator的数据源可以被其他线程同时修改而无需同步
SUBSIZED Spliterator和所有由它拆分出来的Spliterator都是SIZED

自定义Spliterator

假设需要统计一个字符串的单词数(任意空格为分隔符),我们首先用传统的迭代版本实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 统计字符串中单词个数
* @param s 字符串
* @return 单词个数
*/
public static int countWordsIteratively(String s){
int counter = 0;
boolean lastSpace = true;
for(char c : s.toCharArray()){//逐个遍历字符串的所有字符
if(Character.isWhitespace(c)){
lastSpace = true;
}else {
if(lastSpace) counter++;//上个字符是空格,而当前字符不是空格,计数器+1
lastSpace = false;
}
}
return counter;
}

//传统迭代版本
final String SENTENCE = " Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura" +
" che la dritta via era smarrita";
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");//Found 19 words

然后我们尝试用函数式的风格来重写单词计数器,因为归约时必须要保存两个状态,所以只能创建一个WordCounter类来封装状态,每次遍历到一个元素会调用accumulate方法,定义了WordCounter如何更新状态(我们并不做状态修改,而选择了用新的WordCounter来存储新的状态),combine方法合并流的两个子部分的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class WordCounter {
private final int counter;//当前统计数目
private final boolean lastSpace;//上个字符是否空格

public WordCounter accumulate(Character c){
if(Character.isWhitespace(c)){//如果此字符是空格
//上个字符是空格,返回当前计数器;
// 上个字符不是空格,返回新的计数器,并标记lastSpace
return lastSpace ? this : new WordCounter(counter, true);
}else {//此字符不是空格
//上个字符是空格,返回新计数器,统计+1,重置lastSpace;
// 上个字符不是空格,返回当前计数器
return lastSpace ? new WordCounter(counter+1, false) : this;
}
}

public WordCounter combine(WordCounter wordCounter){
return new WordCounter(counter + wordCounter.counter,wordCounter.lastSpace);//lastSpace不重要
}
}

/**
* 通过自定义WordCounter对字符流进行计算
* @param stream 流
* @return 单词个数
*/
public static int countWords(Stream<Character> stream){
WordCounter wordCounter = stream.reduce(new WordCounter(0,true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}

//函数式版本
//首先把字符串转换为流
Stream<Character> stream = IntStream.range(0,SENTENCE.length()).mapToObj(SENTENCE::charAt);
//然后进行归约计算字数
//保留两个变量状态:当前统计数目,上个字符是否空格
System.out.println("Found " + countWords(stream) + " words");//Found 19 words

现在我们尝试把WordCounter并行的工作,但结果却不正确,原因就是字符串会在任意位置拆分,所以有些词会被拆分为两个词,所以并行流导致了结果出错。

1
2
Stream<Character> stream1 = IntStream.range(0,SENTENCE.length()).mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream1.parallel()) + " words");//Found 41 words

解决这个问题我们需要确保String只在词尾切分,所以要改变流的切分方式,只能自定义Spliterator,让这个迭代器只会在两个词间进行切分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 单词切分迭代器
*/
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;//当前坐标

@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++));//处理当前字符串
return currentChar < string.length();//若还有字符串要处理,返回true
}

@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if(currentSize < 10){//此时切分的已足够小,返回null进行顺序处理
return null;
}
//开始试探拆分的位置为当前字符串中间
for(int splitPos = currentSize / 2 + currentChar; splitPos < string.length() ; splitPos++){
//让拆分位置前进到下一个空格
if(Character.isWhitespace(string.charAt(splitPos))){
//创建一个新的WordCounterSpliterator来解析开始到拆分位置的部分
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar,splitPos));
//再把此Spliterator的起始位置设置为拆分位置
currentChar = splitPos;
return spliterator;
}
}
return null;
}

@Override
public long estimateSize() {
return string.length() - currentChar;
}

@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}

public WordCounterSpliterator(String string) {
this.string = string;
}
}

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
//第二个参数为true表示创建并行流
Stream<Character> stream2 = StreamSupport.stream(spliterator, true);
System.out.println("Found " + countWords(stream2) + " words");//Found 19 words

tryAdvance方法把字符串当前字符传给了Consumer,并使当前坐标+1。Consumer把要处理的字符传给了归约函数accumulate。

trySplit方法设定了下限10,然后把试探的拆分位置设在当前字符串中间,然后找到空格就创建一个新的Spliterator来遍历从当前位置到拆分位置的子串;把当前位置设置为拆分位置,因为之前的部分将由新的Spliterator处理。


参考:

🔗 《Java8实战》