3年前 (2021-08-27)  Java系列 |   抢沙发  569 
文章评分 0 次,平均分 0.0

Java8引入了并行流parallel stream的概念来进行并行处理。由于廉价的硬件成本,我们现在拥有更多的cpu核,因此可以使用并行处理来更快地执行操作。

让我们通过简单的例子来理解

package org.arpit.java2blog.java8;
 
import java.util.Arrays;
import java.util.stream.IntStream;
 
public class Java8ParallelStreamMain {
 
    public static void main(String[] args) {
 
        System.out.println("=================================");
        System.out.println("Using Sequential Stream");
        System.out.println("=================================");
        int[] array= {1,2,3,4,5,6,7,8,9,10};
        IntStream intArrStream=Arrays.stream(array);
        intArrStream.forEach(s->
        {
            System.out.println(s+" "+Thread.currentThread().getName());
        }
                );
 
        System.out.println("=================================");
        System.out.println("Using Parallel Stream");
        System.out.println("=================================");
        IntStream intParallelStream=Arrays.stream(array).parallel();
        intParallelStream.forEach(s->
        {
            System.out.println(s+" "+Thread.currentThread().getName());
        }
                );
    }
}

当你们运行上面的程序时,你们将得到下面的输出

=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1

如果您注意到输出,则在顺序流的情况下,主线程正在执行所有工作。它等待当前迭代完成,然后进行下一个迭代。

在并行流的情况下,会同时生成4个线程,并在内部使用Fork和Join池创建和管理线程。并行流通过静态ForkJoinPool.commonPool()方法创建ForkJoinPool实例。

并行流利用所有可用CPU核的优点,并行处理任务。如果任务数超过核心数,则剩余任务将等待当前正在运行的任务完成。

Parallel Stream很酷,所以你应该一直使用它吗?

一个很大的拒绝!!

只需添加。parallel,就可以轻松地将顺序流转换为并行流,但这并不意味着您应该始终使用它。

在使用并行流时,有很多因素需要考虑,否则将受到并行流的负面影响。

并行流比顺序流有更高的开销,并且线程之间的协调需要很长的时间。

您需要考虑并行流,当且仅当:

您要处理的数据集很大。

正如您所知,Java使用ForkJoinPool来实现并行性,ForkJoinPool分叉源流并提交以供执行,所以您的源流应该是可拆分的。

例如:

ArrayList很容易拆分,因为我们可以通过索引找到中间元素并进行拆分,但是LinkedList很难拆分,并且在大多数情况下执行得不太好。

实际上,您正遭受性能问题的困扰。

您需要确保线程之间的所有共享资源都需要正确同步,否则可能会产生意外的结果。

测量并行性的最简单公式是Brian Goetz在其演示文稿中提供的“NQ”模型。

NQ型号:

N x Q > 10000

N=数据集中的项目数

Q=每个项目的工作量

这意味着,如果您有大量的数据集,并且每个项目的工作量较少(例如:Sum),并行性可能会帮助您更快地运行程序,反之亦然。因此,如果您的数据集数量较少,而每个项目的工作较多(做一些计算工作),那么并行性也可能帮助您更快地获得结果。

让我们看看另一个例子。

在本例中,我们将看到在并行流和顺序流的情况下执行长时间计算时CPU的行为。我们正在进行一些arbit计算以使CPU繁忙。

package org.arpit.java2blog.java8;
import java.util.ArrayList;
import java.util.List;
 
public class PerformanceComparisonMain {
 
    public static void main(String[] args) {
 
        long currentTime=System.currentTimeMillis();
        List<Integer> data=new ArrayList<Integer>();
        for (int i = 0; i < 100000; i++) {
            data.add(i);
        }
 
        long sum=data.stream()
                .map(i ->(int)Math.sqrt(i))
                .map(number->performComputation(number))
                .reduce(0,Integer::sum);
 
        System.out.println(sum);
        long endTime=System.currentTimeMillis();
        System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
 
    }
 
    public static int performComputation(int number)
    {
        int sum=0;
        for (int i = 1; i < 1000000; i++) {
            int div=(number/i);
            sum+=div;
 
        }
        return sum;
    }
}

当你们运行上面的程序时,你们将得到下面的输出。

117612733
Time taken to complete:6 minutes

但我们对这里的输出不感兴趣,而是对执行上述操作时CPU的行为感兴趣。

Java Parallel Stream

正如您所看到的,在顺序流的情况下,CPU并没有得到充分利用。

让我们在第16行进行更改,使流并行并再次运行程序。

long sum=data.stream()
                .parallel()
                .map(i ->(int)Math.sqrt(i))
                .map(number->performComputation(number))
                .reduce(0,Integer::sum);

当并行运行流时,您将得到低于输出的结果。

117612733
Time taken to complete:3 minutes

让我们在使用并行流运行程序时检查CPU历史记录。

Java Parallel Stream

正如您所看到的,并行流使用了所有4个CPU核来执行计算。

并行流中的自定义线程池

默认情况下,并行流使用ForkJoinPool.commonPool,其线程数比处理器数少一个。这意味着并行流使用所有可用的处理器,因为它也使用主线程。

如果您正在使用多个并行流,则它们将共享同一个ForkJoinPool.commonPool。这意味着您可能无法使用分配给每个并行流的所有处理器。

要解决此问题,您可以在处理流时创建自己的线程池

ForkJoinPool fjp = new ForkJoinPool(parallelism);

这将创建具有目标并行级别的ForkJoinPool。如果不传递并行性,默认情况下它将等于处理器的数量。

现在您可以将并行流提交到此自定义池。

ForkJoinPool fjp1 = new ForkJoinPool(5);
Callable<Integer> callable1 = () -> data.parallelStream()
                   .map(i -> (int) Math.sqrt(i))
                   .map(number -> performComputation(number))
                   .peek( (i) -> {
                      System.out.println("Processing with "+Thread.currentThread()+" "+ i);
 
                    })
                    .reduce(0, Integer::sum);
 
        try {
            sumFJ1 = fjp1.submit(callable1).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

让我们通过例子来理解。

package org.arpit.java2blog;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
 
public class PerformanceComparisonMain {
 
    public static void main(String[] args) {
 
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            data.add(i);
        }
 
        System.out.println("================");
        System.out.println("Parallel stream 1");
        System.out.println("================");
        long sum1 =data.parallelStream()
        .map(i -> (int) Math.sqrt(i))
        .map(number -> performComputation(number))
        .peek( (i) -> {
           System.out.println("Processing with "+Thread.currentThread()+" "+ i);
 
           })
        .reduce(0, Integer::sum);
 
        System.out.println("Sum: "+sum1);
 
        System.out.println("================");
        System.out.println("Parallel stream 2");
        System.out.println("================");
 
        long sum2 = data.parallelStream()
                .map(i -> ((int) Math.sqrt(i)*10))
                .map(number -> performComputation(number))
                .peek( (i) -> {
                   System.out.println("Processing with "+Thread.currentThread()+" "+ i);
 
                   })
                .reduce(0, Integer::sum);
 
        System.out.println("Sum: "+sum2);
 
        System.out.println("================");
        System.out.println("Parallel stream with custom thread pool 1");
        System.out.println("================");
 
        ForkJoinPool fjp1 = new ForkJoinPool(5);
        long sumFJ1 = 0;
 
        Callable<Integer> callable1 = () -> data.parallelStream()
                   .map(i -> (int) Math.sqrt(i))
                   .map(number -> performComputation(number))
                   .peek( (i) -> {
                      System.out.println("Processing with "+Thread.currentThread()+" "+ i);
 
                    })
                    .reduce(0, Integer::sum);
 
        try {
            sumFJ1 = fjp1.submit(callable1).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
 
        System.out.println("Sum: "+sumFJ1);
 
        System.out.println("================");
        System.out.println("Parallel stream with custom thread pool 2");
        System.out.println("================");
 
        Callable<Integer> callable2 = () -> data.parallelStream()
                .map(i -> (int) Math.sqrt(i)*10)
                .map(number -> performComputation(number))
                .peek( (i) -> {
                   System.out.println("Processing with "+Thread.currentThread()+" "+ i);
 
                   })
                .reduce(0, Integer::sum);
 
        long sumFJ2 = 0;
 
        ForkJoinPool fjp2 = new ForkJoinPool(4);
 
        try {
            sumFJ2 = fjp2.submit(callable2).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
 
        System.out.println("Sum: "+sumFJ2);
    }
 
    public static int performComputation(int number) {
        int sum = 0;
        for (int i = 1; i < 100000; i++) {
            int div = (number / i);
            sum += div;
 
        }
        return sum;
    }
}

输出:

================
Parallel stream 1
================
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 5
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[main,5,main] 3
Sum: 23
================
Parallel stream 2
================
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 111
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[main,5,main] 66
Sum: 522
================
Parallel stream with custom thread pool 1
================
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 5
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 0
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Sum: 23
================
Parallel stream with custom thread pool 2
================
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 111
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 0
Processing with Thread[ForkJoinPool-2-worker-2,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Sum: 522

如您所见,前两个并行流使用ForkJoinPool.commonPool,下两个并行流使用自定义线程池,即ForkJoinPool-1ForkJoinPool-2

使用并行流时应记住的事项

有状态lambda表达式

应避免在流操作中使用有状态lambda表达式。有状态lambda表达式的输出取决于流操作执行期间可能更改的任何状态。

package org.arpit.java2blog;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
 
public class ListOfIntegersStatefulLambda {
 
    public static void main(String[] args) {
 
        List<Integer> listOfIntegers = Arrays.asList(new Integer[] {40,34,21,37,20});
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        listOfIntegers.parallelStream()
 
                // You shou! It uses a stateful lambda expression.
                .map(e -> {
                    syncList.add(e);
                    return e;
                })
        .forEachOrdered(e -> System.out.print(e + " "));
 
        System.out.println("");
 
        syncList.stream().forEachOrdered(e -> System.out.print(e + " "));
        System.out.println("");
    }
}

输出:

40 34 21 37 20
40 34 37 20 21

forEachOrdered按流的顺序处理元素map(e -> {syncList.add(e); return e;})是有状态lambda和.map(e -> {syncList.add(e); return e;})将元素添加到

syncList可能会有所不同,所以在使用并行流时不应使用有状态lambda操作。

Interference

流操作中的Lambda表达式不应修改流的源。

下面的代码尝试将元素添加到整数列表中,并引发concurrentModification异常。

package org.arpit.java2blog;
 
import java.util.ArrayList;
import java.util.List;
 
public class ListOfIntegersStatefulLambda {
 
    public static void main(String[] args) {
 
        List<Integer> listOfIntegers = new ArrayList<>();
        Integer[] intArray =new Integer[] {40,34,21,37,20};
        for(Integer in:intArray)
        {
            listOfIntegers.add(in);
        }
        listOfIntegers.parallelStream()
 
        .peek( i -> listOfIntegers.add(7))      
        .forEach(e -> System.out.print(e + " "));
 
        System.out.println("");
 
    }
}

输出:

34 21 40 20 37 Exception in thread “main” java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.arpit.java2blog.ListOfIntegersStatefulLambda.main(ListOfIntegersStatefulLambda.java:19)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

请注意,所有中间操作都是惰性的,流的执行在调用foreach时开始。peek的参数试图在流执行期间修改流源,这导致Java抛出ConcurrentModificationException

结论

您已经通过示例了解了何时使用并行流。使用并行流时应小心。如果在正确的上下文中使用,并行流是非常强大的。

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2272.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册