并行与并发程序设计
到目前为止,我们在CS2030中编写的程序是按顺序运行的。这意味着在任何时候,处理器上运行的程序只有一条指令。
什么是并发性?
一个单核处理器一次只能执行一条指令——这意味着一次只能运行一个进程(或者更确切地说,一个应用程序)。然而,当我们使用电脑时,感觉就像我们在同时运行多个进程。幕后的操作系统实际上是在不同进程之间切换,给用户一种同时运行的错觉。
我们可以编写一个程序,使其并行运行——将计算划分为称为线程的子任务。幕后的操作系统可以在不同的线程之间切换,给用户一种线程同时运行的错觉。这种多线程程序有两种用途:(i)它允许程序员将无关的任务分成线程,并分别编写每个线程(ii)提高了处理器的利用率。例如,如果I/O在一个线程中,而UI呈现在另一个线程中,那么当处理器等待I/O完成时,它可以切换到呈现线程,以确保缓慢的I/O不会影响UI的响应。
什么是并行性?
虽然并发给人一种子任务同时运行的假象,但并行计算指的是多个子任务真正同时运行的场景——要么我们有一个能够同时运行多条指令的处理器,或者我们有多个核心/处理器,并将指令分派给这些核心/处理器,以便它们同时执行。
所有的并行程序都是并发的,但不是所有的并发程序都是并行的。
现代计算机有不止一个核心/处理器1。因此,并行性和并发性之间的界限是模糊的。
并行计算
并行计算是计算机科学的一个重要课题。一个人可以单独教授整个模块(或重点领域)。本讲座的目的不是要深入讨论它,而是让CS2030的学生了解与Java8中的流抽象相关的并行计算概念。
Parallel Stream 并行流
我们已经看到,Java流类是一个强大而有用的类,用于以声明式方式处理数据。但是,我们还没有完全释放流的力量。流最整洁的地方是它允许在一行代码中对流的元素进行并行操作。
让我们考虑下面的程序,打印出2030000到2040000之间的所有素数。
IntStream.range(2_030_000, 2_040_000)
.filter(x -> isPrime(x))
.forEach(System.out::println);
我们可以通过将调用parallel()
添加到流中来并行化代码。
IntStream.range(2_030_000, 2_040_000)
.filter(x -> isPrime(x))
.parallel()
.forEach(System.out::println);
您可能会注意到输出已被重新排序,尽管仍在生成同一组数字。这是因为流已将数字分解为子序列,并对每个子序列并行运行filter
和forEach
。由于在打印顺序上并行任务之间没有协调,因此先完成的并行任务将首先将结果输出到屏幕,从而导致数字序列被重新排序。
如果要按输入的顺序生成输出,请使用forEachOrdered
而不是forEach
,这样会失去并行化的一些好处。
假设现在我们要计算2030000到2040000之间的素数。我们可以运行:
IntStream.range(2_030_000, 2_040_000)
.filter(x -> isPrime(x))
.parallel()
.count()
无论是否并行化,上面的代码都会产生相同的输出。
注意,上面的任务是无状态的,不会产生任何副作用。此外,每个元素单独处理而不依赖于其他元素。这种计算有时被称为令人尴尬的并行。每个并行子任务所需的唯一通信是将子任务的count()
结果合并到最终的count
中(这已经在流中为我们实现了)。
如何并行化流
您已经看到,向流中的调用链添加parallel()
可以实现流的并行处理。注意parallel()
是一个延迟操作——它只是标记要并行处理的流。因此,可以在链中的任何位置插入对parallel()
的调用。
有一个方法sequential()
,它将流标记为按顺序处理。如果在流中同时调用parallel()
和sequential()
,则最后一个调用“wins”。下面的示例按顺序处理流:
s.parallel().filter(x -> x < 0).sequential().forEach(..);
创建并行流的另一种方法是调用Collector
类的方法parallelStream()
,而不是stream()
。这样做将创建一个将从集合中并行处理的流。
什么可以并行化?
为了确保并行执行的输出是正确的,流操作不能干扰流数据,并且大多数时间必须是无状态的。副作用应控制在最低限度。
接口
接口意味着流操作之一在终端操作的执行期间修改流的源。例如:
List<String> list = new ArrayList<>(List.of("Luke", "Leia", "Han"));
list.stream()
.peek(name -> {
if (name.equals("Han")) {
list.add("Chewie"); // they belong together
}
})
.forEach(i -> {});
将导致引发ConcurrentModificationException
。请注意,即使我们使用stream()
而不是parallelStream()
,此非干扰规则也适用。
无状态的
有状态lambda是一种结果依赖于流执行期间可能更改的任何状态的lambda。
例如,下面的generate
和map
操作是有状态的,因为它们依赖于队列中的事件和商店的状态。并行化这可能会导致不正确的输出。为了确保输出是正确的,还需要做一些额外的工作来确保状态更新对所有并行子任务都是可见的。
Stream.generate(this.events::poll)
.takeWhile(event -> event != null)
.filter(event -> event.happensBefore(sim.expireTime()))
.peek(event -> event.log())
.map(event -> sim.handle(event))
.forEach(eventStream -> this.schedule(eventStream));
副作用
在并行执行中,副作用可能导致不正确的结果。考虑下面的代码:
List<Integer> list = new ArrayList<>(
Arrays.asList(1,3,5,7,9,11,13,15,17,19));
List<Integer> result = new ArrayList<>();
list.parallelStream()
.filter(x -> isPrime(x))
.forEach(x -> result.add(x));
forEach lambda生成了一个副作用——它修改了结果。ArrayList是我们称之为非线程安全的数据结构。如果两个线程同时操作它,可能会导致错误的结果。
有两种方法可以解决这个问题。第一,我们可以使用.collect
方法
list.parallelStream()
.filter(x -> isPrime(x))
.collect(Collectors.toList())
其次,我们可以使用线程安全的数据结构。Java在Java.util.concurrent
包中提供了几个示例,包括CopyOnWriteArrayList
。
List<Integer> result = new CopyOnWriteArrayList<>();
list.parallelStream()
.filter(x -> isPrime(x))
.forEach(x -> result.add(x));
结合性
reduce
操作本质上是可并行的,因为我们可以轻松地减少每个子流,然后使用组合器将结果组合在一起。考虑这个例子:
Stream.of(1,2,3,4).reduce(1, (x,y)->x*y, (x,y)->x*y);
然而,为了让我们并行运行reduce
,身份、累加器和组合器必须遵循几个规则:
combiner.apply(identity,i)
必须等于i
。- 组合器和累加器必须是联合的——应用的顺序必须无关紧要。
- 组合器和累加器必须兼容--
combiner.apply(u,accumulator.apply(identity,t))
必须等于accumulator.apply(u,t)
上面的乘法例子符合三个规则:
i*1
等于i
(x*y)*z
等于x*(y*z)
u*(1*t)
等于u*t
并行流性能
让我们回到:
IntStream.range(2_030_000, 2_040_000)
.filter(x -> isPrime(x))
.parallel()
.count()
将上面的代码并行化可以节省多少时间?
让我们使用Java中的Instant
和Duration
类来帮助我们:
Instant start = Instant.now();
long howMany = IntStream.range(2_000_000, 3_000_000)
.filter(x -> isPrime(x))
.parallel()
.count();
Instant stop = Instant.now();
System.out.println(howMany + " " + Duration.between(start,stop).toMillis() + " ms");
上面的代码大致测量了计算200万到300万素数所需的时间。在我的iMac上,只需要1秒钟多一点。如果我去掉parallel()
,大约需要450-550毫秒,所以我们可以获得大约50%的性能。
我们能再平行一些吗?还记得我们是如何实现isPrime
的吗
boolean isPrime(int n) {
return IntStream.range(2, (int)Math.sqrt(n) + 1)
.noneMatch(x -> n % x == 0);
}
让我们把它并行化,让它更快!
boolean isPrime(int n) {
return IntStream.range(2, (int)Math.sqrt(n) + 1)
.parallel()
.noneMatch(x -> n % x == 0);
}
但是,如果您运行上面的代码,您会发现代码并没有我们期望的那么快。在我的iMac上,大约需要18秒,大约慢了18倍!
并行化流并不总能提高性能。
为了理解原因,我们必须深入研究Java如何实现并行流。我们先绕道看一些并行编程概念和与并行编程相关的重要Java类。
Fork 和 Join
假设我们有以下代码:
b = f(a);
c = g(b);
d = h(b);
e = i(c,d);
我们可以将其可视化为一个计算图,如下所示:
很明显,f(a)
必须在g(b)
和h(b)
之前调用,i(c,d)
必须在g(b)
和h(b)
完成之后调用。那么g(b)
和h(b)
的顺序呢?
假设g()
和h()
是纯函数,即输出c和d只依赖于b而不依赖于其他函数,并且g()
和h()
不产生任何副作用,那么我们可以安全地得出结论:g(b)和h(b)可以按任何顺序调用。不仅如此,它们还可以相互独立地并行调用。
为了表示我们希望与h()并行运行g(),我们可以分叉任务g()——这意味着我们告诉JVM它可以3与h()同时执行g()。
然后我们可以重新加入任务g()。join操作使代码等待g()完成,从而确保在调用i(c,d)时c的更新值可用。
这种并行调用任务的模式称为fork/join框架。它通常涉及一些递归的分叉和连接,将一个巨大的任务分解为许多较小的任务(但不必)。下面我们将看到一个更具体的例子。
ForkJoinTask<V>抽象类
Java为可以分叉和联接的任务提供了一个抽象,恰当地称为ForkJoinTask。这是一个抽象类,我们不会直接使用它。这个类有很多方法,但是我们将使用的两个最重要的方法是fork()
和join()
。方法fork()
将此任务提交给JVM执行,可能是并行执行。方法join()
等待计算完成并返回V类型的值。
RecursiveTask<V>抽象类
ForkJoinTask<V>有一个子类RecursiveTask<V>,它也是抽象的。它有一个方法V compute()
,我们可以用compute所需的任务自定义它。
下面是一个如何使用RecursiveTask的示例任务<V>
static class Summer extends RecursiveTask<Integer> {
final int FORK_THRESHOLD = 2;
int low;
int high;
int[] array;
Summer(int low, int high, int[] array) {
this.low = low;
this.high = high;
this.array = array;
}
@Override
protected Integer compute() {
// stop splitting into subtask if array is already small.
if (high - low < FORK_THRESHOLD) {
int sum = 0;
for (int i = low; i < high; i++) {
sum += array[i];
}
return sum;
}
int middle = (low + high) / 2;
Summer left = new Summer(low, middle, array);
Summer right = new Summer(middle, high, array);
left.fork();
return right.compute() + left.join();
}
}
为了运行任务,我们调用compute()
Summer task = new Summer(0, array.length, array);
int sum = task.compute();
ForkJoinTask的另一个子类称为RecursiveAction
,它与RecursiveTask非常相似,只是RecursiveAction不返回值。
线程池和Fork/Join
在内部,Java维护一个工作线程池。工作线程是运行任务的抽象。我们可以提交一个任务到池中执行,该任务将加入一个队列。新提交的任务有一个全局队列。每个工人也有一个队列。从工作进程执行的另一个任务派生的任务将加入属于该工作进程的队列。
工作线程可以从队列中选择要执行的任务。完成后,它会选择另一个任务(如果队列中存在任务),以此类推——与服务器(工作线程)和客户(任务)没有什么不同。ForkJoinPool
是为ForkJoinTask
实现线程池的类。执行上述sumTask
的另一种方法是将任务提交到ForkJoinPool
,而不是直接调用它。
int sum = ForkJoinPool.commonPool().invoke(task);
调用invoke(task)
和task.compute()
之间的区别是巨大的,尽管两者都返回上面的正确结果。调用task.compute()
意味着我们将立即直接调用任务(就像任何方法调用一样);然而,调用invoke(task)
意味着我们请求任务加入一个队列,等待工作进程执行,并返回结果。如果我们有太多的递归任务,您可以看到这种效果,在这种情况下,如果我们调用task.compute
,我们将以堆栈溢出结束。
Fork/Join的开销
从上面的描述可以看出,分叉和连接实际上会产生额外的开销——我们首先需要将计算包装在一个对象中,将对象提交到一个任务队列中。有工人通过队列执行任务。您可以尝试使用FORK_TRESHOLD
的不同值来查看效果。以下是iMac上的结果:
fork阈值越小,我们创建的任务越多,每个任务就越小。如图所示,如果要并行化的任务太简单,那么由于开销的原因,并行化是不值得的。
使用Fork/Join的并行流
并行流是用Java中的fork/join实现的。在这里,fork在子流上创建运行相同操作链的子任务,完成后,运行join来合并结果(例如,reduce的combiner在join中运行)。fork和join可以是递归的——例如,fork操作可以将流拆分为两个子任务。子任务可以进一步将子流拆分为四个较小的子流,以此类推,直到子流的大小足够小,以至于实际调用任务为止。
在前面的isPrime示例中,该任务很简单(检查n%x==0),因此,通过并行化它,我们实际上为Java创建了更多的工作。如果我们只按顺序检查n%x==0,效率会更高。
这个故事的寓意是,如果任务足够复杂,并行化的好处大于开销,那么并行化是值得的。当我们在并行流的上下文中讨论这个问题时,这个原则适用于所有并行和并发程序。
有序源与无序源
流元素是有序的还是无序的,也影响着并行流操作的性能。流可以定义相遇顺序。从迭代、有序集合(例如,列表或数组)中创建的流是有序的。从生成或无序集合(例如,集合)创建的流是无序的。
一些流操作遵循相遇顺序。例如,distinct
和sorted
都保持元素的原始顺序(如果顺序保持不变,我们就说操作是稳定的)。
findFirst
、limit
和skip
的并行版本在有序流上可能代价高昂,因为它需要在流之间进行协调以维持顺序。
如果我们有一个有序的流,并且尊重原始顺序并不重要,那么我们可以调用unordered()
作为chain命令的一部分,以使并行操作更加高效。
例如,在我的iMac上,以下操作大约需要700毫秒:
Stream.iterate(0, i -> i + 7)
.parallel()
.limit(10_000_000)
.filter(i -> i % 64 == 0)
.forEachOrdered(i -> { });
但是,插入unordered()
后,大约需要350毫秒,速度提高了2倍!
Stream.iterate(0, i -> i + 7)
.parallel()
.unordered()
.limit(10_000_000)
.filter(i -> i % 64 == 0)
.forEachOrdered(i -> { });
原文地址:https://nus-cs2030.github.io/1718-s2/lec10/index.html
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2170.html
暂无评论