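Java 8 introduced parallel streams to make parallel processing easier. Because hardware has become cheap, most machines now have several CPU cores, and parallel processing lets us use them to finish work faster.
Let's understand this with a simple example.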
package org.arpit.java2blog.java8;

import java.util.Arrays;
import java.util.stream.IntStream;

public class Java8ParallelStreamMain {

    public static void main(String[] args) {
        System.out.println("=================================");
        System.out.println("Using Sequential Stream");
        System.out.println("=================================");
        int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        // Sequential stream: every element is handled by the calling thread.
        IntStream intArrStream = Arrays.stream(array);
        intArrStream.forEach(s ->
                System.out.println(s + " " + Thread.currentThread().getName()));

        System.out.println("=================================");
        System.out.println("Using Parallel Stream");
        System.out.println("=================================");
        // Parallel stream: elements are spread across several threads.
        IntStream intParallelStream = Arrays.stream(array).parallel();
        intParallelStream.forEach(s ->
                System.out.println(s + " " + Thread.currentThread().getName()));
    }
}
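When you run the above program, you will get output like the following (the order of the lines in the parallel section varies from run to run):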
=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1
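As the output shows, with the sequential stream the main thread does all the work. It waits for the current iteration to finish before moving on to the next one.
With the parallel stream, four threads work at the same time (main plus three workers), and the fork/join framework creates and manages the worker threads internally. Parallel streams run on the shared pool returned by the static ForkJoinPool.commonPool() method.
A parallel stream takes advantage of all available CPU cores and processes tasks in parallel. If there are more tasks than cores, the remaining tasks wait until the currently running tasks complete.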
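To make the thread behavior described above easier to check on your own machine, here is a minimal sketch that collects the names of the threads that touched each element. The class name StreamThreadNames and the helper threadsUsed are made up for this illustration; they are not part of the original program.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class StreamThreadNames {

    // Runs the stream to completion and returns the names of all threads
    // that processed at least one element.
    static Set<String> threadsUsed(IntStream stream) {
        // Thread-safe set, so it can be filled from several threads at once.
        Set<String> names = ConcurrentHashMap.newKeySet();
        stream.forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Sequential: only the calling thread appears.
        System.out.println("sequential: " + threadsUsed(IntStream.rangeClosed(1, 10)));
        // Parallel: usually the calling thread plus some common-pool workers;
        // the exact set depends on the machine and the run.
        System.out.println("parallel:   " + threadsUsed(IntStream.rangeClosed(1, 10).parallel()));
    }
}
```

On a single-core machine the parallel set may contain very few names, so treat the result as an observation rather than a guarantee.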
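Parallel streams are cool, so should you always use them?
A big no!
You can turn a sequential stream into a parallel one just by adding .parallel(), but that does not mean you should always do so.
There are many factors to consider when using parallel streams; ignore them and parallelism will hurt rather than help.
A parallel stream has more overhead than a sequential one, and coordinating between threads takes a significant amount of time.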
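As a small illustration of the one-call conversion mentioned above, the sketch below computes the same sum both ways and checks the mode with isParallel(). The class and method names are invented for this example.

```java
import java.util.Arrays;
import java.util.List;

class ParallelSwitchDemo {

    static int sumSequential(List<Integer> data) {
        return data.stream().mapToInt(Integer::intValue).sum();
    }

    static int sumParallel(List<Integer> data) {
        // The only difference is the call to parallel().
        return data.stream().parallel().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(data.stream().isParallel());            // false
        System.out.println(data.stream().parallel().isParallel()); // true
        System.out.println(sumSequential(data) + " " + sumParallel(data)); // 55 55
    }
}
```

Both versions return the same result; whether the parallel one is faster depends on the factors discussed in this section.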
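You should consider a parallel stream if and only if:
You have a large dataset to process.
The source of your stream is splittable. As you know, Java uses ForkJoinPool to achieve parallelism; it forks the source stream into chunks and submits them for execution. For example, an ArrayList is easy to split because the middle element can be found by index, whereas a LinkedList is hard to split and in most cases does not perform well.
You actually have a performance problem.
All resources shared between threads are properly synchronized; otherwise you may get unexpected results.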
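The splittability point in the list above can be observed through the Spliterator that each collection hands to the stream framework. The following is a rough sketch, with the hypothetical names SplitDemo and firstSplitSize, that asks each source to split once and reports how many elements it gave away.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

class SplitDemo {

    // Size of the chunk handed off by the first trySplit(),
    // or 0 if the source refuses to split.
    static long firstSplitSize(Collection<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();
        return prefix == null ? 0 : prefix.estimateSize();
    }

    public static void main(String[] args) {
        List<Integer> arrayList = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            arrayList.add(i);
        }
        List<Integer> linkedList = new LinkedList<>(arrayList);

        // ArrayList splits at the middle index, so the first chunk is half the list.
        System.out.println("ArrayList first split:  " + firstSplitSize(arrayList)); // 5000
        // LinkedList cannot jump to the middle; it has to walk its nodes to
        // build a chunk, so the split is typically uneven.
        System.out.println("LinkedList first split: " + firstSplitSize(linkedList));
    }
}
```

An even split is what lets the fork/join framework keep all workers equally busy; an uneven one leaves some threads with much more work than others.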
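The simplest formula for estimating whether parallelism will pay off is the "NQ" model, which Brian Goetz presented in one of his talks.
NQ model:
N x Q > 10000
N = number of items in the dataset
Q = amount of work per item
This means that if you have a large dataset with little work per item (for example, a sum), parallelism may help you run the program faster. The reverse also holds: if you have a small dataset but a lot of work per item (some heavy computation), parallelism may also get you the result sooner.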
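The NQ rule of thumb is simple enough to write down as a helper. This is only a sketch: the class name NqModel and the method worthConsideringParallel are invented here, and Q is a rough cost estimate in whatever unit you choose, not something the JDK measures for you.

```java
class NqModel {

    // Threshold from the NQ model quoted above.
    static final long THRESHOLD = 10_000;

    // n = number of items, q = estimated amount of work per item.
    static boolean worthConsideringParallel(long n, long q) {
        return n * q > THRESHOLD;
    }

    public static void main(String[] args) {
        // Large dataset, trivial work per item (for example a sum).
        System.out.println(worthConsideringParallel(100_000, 1)); // true
        // Small dataset, little work per item.
        System.out.println(worthConsideringParallel(100, 10));    // false
        // Small dataset, heavy work per item.
        System.out.println(worthConsideringParallel(10, 5_000));  // true
    }
}
```

A result of true only means parallelism is worth measuring; it does not replace an actual benchmark.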
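Let's look at another example.
In this example, we will see how the CPU behaves when a long computation runs on a sequential stream and on a parallel stream. We do some arbitrary calculations to keep the CPU busy.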
package org.arpit.java2blog.java8;

import java.util.ArrayList;
import java.util.List;

public class PerformanceComparisonMain {

    public static void main(String[] args) {
        long currentTime = System.currentTimeMillis();
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 100000; i++) {
            data.add(i);
        }
        // Sequential pipeline: all work runs on the main thread.
        long sum = data.stream()
                .map(i -> (int) Math.sqrt(i))
                .map(number -> performComputation(number))
                .reduce(0, Integer::sum);
        System.out.println(sum);
        long endTime = System.currentTimeMillis();
        // Integer division: the elapsed time is rounded down to whole minutes.
        System.out.println("Time taken to complete:" + (endTime - currentTime) / (1000 * 60) + " minutes");
    }

    // Arbitrary CPU-bound work whose only purpose is to keep a core busy.
    public static int performComputation(int number) {
        int sum = 0;
        for (int i = 1; i < 1000000; i++) {
            int div = (number / i);
            sum += div;
        }
        return sum;
    }
}
When you run the above program, you will get the output below.
117612733
Time taken to complete:6 minutes
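But we are not interested in the output here; we are interested in how the CPU behaved while the operation ran.
With the sequential stream, the CPU was not fully utilized.
Let's change the statement that computes sum so that the stream is parallel, and run the program again.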
long sum = data.stream()
        .parallel()
        .map(i -> (int) Math.sqrt(i))
        .map(number -> performComputation(number))
        .reduce(0, Integer::sum);
When you run the stream in parallel, you will get the output below.
117612733
Time taken to complete:3 minutes
Let's check the CPU history while the program runs with the parallel stream.
The parallel stream used all four CPU cores to perform the computation.
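Custom thread pool in parallel streams
By default, a parallel stream uses ForkJoinPool.commonPool(), whose thread count is one less than the number of processors. This means a parallel stream can use all available processors, because the calling thread (main in the examples above) takes part in the work too.
If you run several parallel streams, they all share the same ForkJoinPool.commonPool(). This means a given parallel stream may not get to use all the processors you expected it to have.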
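To see the numbers behind the paragraph above on your own machine, you can print the processor count next to the common pool's parallelism. The class name CommonPoolInfo is made up for this sketch; the ForkJoinPool and Runtime calls are standard JDK API.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    // Target parallelism of the shared pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The common pool's size can also be set at JVM startup with the system property java.util.concurrent.ForkJoinPool.common.parallelism, although that changes it for every parallel stream in the process.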
To solve this, you can create your own thread pool for processing the stream:
ForkJoinPool fjp = new ForkJoinPool(parallelism);
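This creates a ForkJoinPool with the target parallelism level. If you do not pass a parallelism level, it defaults to the number of available processors.
Now you can submit the parallel stream to this custom pool. A parallel stream started from inside a task of a ForkJoinPool runs in that pool rather than in the common pool; the Stream API documentation does not promise this, so treat it as an implementation detail.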
ForkJoinPool fjp1 = new ForkJoinPool(5);
// data and sumFJ1 are declared elsewhere; see the complete example that follows.
Callable<Integer> callable1 = () -> data.parallelStream()
        .map(i -> (int) Math.sqrt(i))
        .map(number -> performComputation(number))
        .peek((i) -> {
            System.out.println("Processing with " + Thread.currentThread() + " " + i);
        })
        .reduce(0, Integer::sum);
try {
    // Submitting the task to fjp1 makes the stream run on fjp1's workers.
    sumFJ1 = fjp1.submit(callable1).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
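One thing the fragment above leaves open is what happens to the pool afterwards. A custom ForkJoinPool keeps its worker threads until it is shut down, so one possible way to package the pattern is a helper that always calls shutdown(). This is a sketch under that assumption; CustomPoolSum and sumInPool are hypothetical names, not part of the original article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

class CustomPoolSum {

    // Sums the list with a parallel stream that runs inside a dedicated pool,
    // then releases the pool's threads.
    static int sumInPool(List<Integer> data, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Callable<Integer> task =
                () -> data.parallelStream().mapToInt(Integer::intValue).sum();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while summing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Stream task failed", e.getCause());
        } finally {
            pool.shutdown(); // always release the worker threads
        }
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sumInPool(data, 3)); // 55
    }
}
```

Creating a pool per call is wasteful if the method is called often; in that case a long-lived pool shared by related tasks is the more usual choice.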
Let's understand this with a complete example.
package org.arpit.java2blog;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class PerformanceComparisonMain {

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            data.add(i);
        }

        // The first two streams run on the shared common pool.
        System.out.println("================");
        System.out.println("Parallel stream 1");
        System.out.println("================");
        long sum1 = data.parallelStream()
                .map(i -> (int) Math.sqrt(i))
                .map(number -> performComputation(number))
                .peek((i) -> {
                    System.out.println("Processing with " + Thread.currentThread() + " " + i);
                })
                .reduce(0, Integer::sum);
        System.out.println("Sum: " + sum1);

        System.out.println("================");
        System.out.println("Parallel stream 2");
        System.out.println("================");
        long sum2 = data.parallelStream()
                .map(i -> ((int) Math.sqrt(i) * 10))
                .map(number -> performComputation(number))
                .peek((i) -> {
                    System.out.println("Processing with " + Thread.currentThread() + " " + i);
                })
                .reduce(0, Integer::sum);
        System.out.println("Sum: " + sum2);

        // The next two streams are submitted to their own pools.
        System.out.println("================");
        System.out.println("Parallel stream with custom thread pool 1");
        System.out.println("================");
        ForkJoinPool fjp1 = new ForkJoinPool(5);
        long sumFJ1 = 0;
        Callable<Integer> callable1 = () -> data.parallelStream()
                .map(i -> (int) Math.sqrt(i))
                .map(number -> performComputation(number))
                .peek((i) -> {
                    System.out.println("Processing with " + Thread.currentThread() + " " + i);
                })
                .reduce(0, Integer::sum);
        try {
            sumFJ1 = fjp1.submit(callable1).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        fjp1.shutdown(); // release the pool's worker threads
        System.out.println("Sum: " + sumFJ1);

        System.out.println("================");
        System.out.println("Parallel stream with custom thread pool 2");
        System.out.println("================");
        Callable<Integer> callable2 = () -> data.parallelStream()
                .map(i -> (int) Math.sqrt(i) * 10)
                .map(number -> performComputation(number))
                .peek((i) -> {
                    System.out.println("Processing with " + Thread.currentThread() + " " + i);
                })
                .reduce(0, Integer::sum);
        long sumFJ2 = 0;
        ForkJoinPool fjp2 = new ForkJoinPool(4);
        try {
            sumFJ2 = fjp2.submit(callable2).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        fjp2.shutdown(); // release the pool's worker threads
        System.out.println("Sum: " + sumFJ2);
    }

    // Arbitrary CPU-bound work whose only purpose is to keep a core busy.
    public static int performComputation(int number) {
        int sum = 0;
        for (int i = 1; i < 100000; i++) {
            int div = (number / i);
            sum += div;
        }
        return sum;
    }
}
Output:
================
Parallel stream 1
================
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 5
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[main,5,main] 3
Sum: 23
================
Parallel stream 2
================
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 111
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[main,5,main] 66
Sum: 522
================
Parallel stream with custom thread pool 1
================
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 5
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 0
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Sum: 23
================
Parallel stream with custom thread pool 2
================
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 111
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 0
Processing with Thread[ForkJoinPool-2-worker-2,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Sum: 522
As you can see, the first two parallel streams use ForkJoinPool.commonPool, while the last two use the custom thread pools, ForkJoinPool-1 and ForkJoinPool-2.
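Things to keep in mind when using parallel streams
Stateful lambda expressions
Avoid stateful lambda expressions in stream operations. A stateful lambda expression is one whose result depends on state that may change while the stream pipeline executes.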
package org.arpit.java2blog;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ListOfIntegersStatefulLambda {

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(new Integer[] {40, 34, 21, 37, 20});
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        listOfIntegers.parallelStream()
                // Don't do this! It uses a stateful lambda expression.
                .map(e -> {
                    syncList.add(e);
                    return e;
                })
                .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println("");
        // syncList holds the elements in the order the worker threads added them.
        syncList.stream().forEachOrdered(e -> System.out.print(e + " "));
        System.out.println("");
    }
}
Output:
40 34 21 37 20
40 34 37 20 21
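forEachOrdered processes the elements in the order of the stream, so the first line always matches the source. The lambda in .map(e -> {syncList.add(e); return e;}) is stateful, and the order in which it adds elements to syncList can differ from run to run. That is why you should not use stateful lambda operations with parallel streams.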
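If the goal of the stateful lambda above was simply to end up with a list, one common alternative is to let the stream build the list itself with a collector. The sketch below assumes that goal; StatelessCollect and doubledInOrder are names invented for the illustration, and the doubling is just a stand-in for any stateless mapping.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StatelessCollect {

    // The lambda depends only on its argument, and the collector gathers the
    // results in encounter order, even when the stream runs in parallel.
    static List<Integer> doubledInOrder(List<Integer> source) {
        return source.parallelStream()
                .map(e -> e * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(40, 34, 21, 37, 20);
        System.out.println(doubledInOrder(listOfIntegers)); // [80, 68, 42, 74, 40]
    }
}
```

Because no shared list is mutated from inside the pipeline, the result is the same on every run.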
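Interference
Lambda expressions in stream operations should not modify the source of the stream.
The code below tries to add elements to the list of integers while streaming over it, and throws a ConcurrentModificationException.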
package org.arpit.java2blog;

import java.util.ArrayList;
import java.util.List;

public class ListOfIntegersStatefulLambda {

    public static void main(String[] args) {
        List<Integer> listOfIntegers = new ArrayList<>();
        Integer[] intArray = new Integer[] {40, 34, 21, 37, 20};
        for (Integer in : intArray) {
            listOfIntegers.add(in);
        }
        listOfIntegers.parallelStream()
                .peek(i -> listOfIntegers.add(7))
                .forEach(e -> System.out.print(e + " "));
        System.out.println("");
    }
}
Output:
34 21 40 20 37 Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.arpit.java2blog.ListOfIntegersStatefulLambda.main(ListOfIntegersStatefulLambda.java:19)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
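Note that all intermediate operations are lazy; execution of the stream begins only when forEach is called. The argument of peek tries to modify the stream source while the stream is executing, which causes Java to throw a ConcurrentModificationException.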
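One way to avoid the interference described above is to collect the new elements into a separate list and touch the source only after the stream has finished. The following sketch assumes the intent was to append one 7 per original element; NonInterferenceDemo and appendSevenPerElement are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class NonInterferenceDemo {

    // Returns a new list: the original elements followed by one 7 per element.
    // The source is only read while the stream runs.
    static List<Integer> appendSevenPerElement(List<Integer> source) {
        List<Integer> additions = source.parallelStream()
                .map(e -> 7)
                .collect(Collectors.toList());
        // The pipeline has completed here, so building the result is safe.
        List<Integer> result = new ArrayList<>(source);
        result.addAll(additions);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(40, 34, 21, 37, 20));
        // Prints [40, 34, 21, 37, 20, 7, 7, 7, 7, 7]
        System.out.println(appendSevenPerElement(listOfIntegers));
    }
}
```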
Conclusion
You have now seen, with examples, when to use parallel streams. Use them with care. In the right context, parallel streams are very powerful.
Unless otherwise noted, all articles on this site are original works of 老K的Java博客. When reposting, please credit the source: https://javakk.com/2272.html