
The parallelStream pit: you won't know it until you step in it, and then it gives you a fright


May 31, 2021



This article comes from the WeChat official account: Little Sister Taste.

Many developers love lambda expressions: they let you write short, concise functions that show off your coding skills. Of course, the feature is a bit of a loss for companies that measure workload by lines of code.

For example, the following code snippet reads almost like poetry. But if it's not used well, it can be fatal.

List<Integer> sortedWeights =
        widgets.stream()
               .filter(b -> b.getColor() == RED)
               .sorted((x, y) -> x.getWeight() - y.getWeight())
               .map(Widget::getWeight)
               .collect(Collectors.toList());

The key call in this code is stream(). It turns an ordinary list into a stream, which you can then manipulate in a pipeline-like way. In short, everyone who has used it likes it.

Here comes the problem

What happens if we change stream to parallelStream?

Literally, the stream changes from serial to parallel.
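
The change really is a single method call. Here is a minimal, self-contained sketch (the class name ParallelDemo and the toy list are made up for illustration) showing the same pipeline run sequentially and in parallel:

import java.util.Arrays;
import java.util.List;

public class ParallelDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        // Sequential pipeline: a single thread walks the list.
        int serialSum = numbers.stream().mapToInt(Integer::intValue).sum();

        // Parallel pipeline: same code, one word changed; the work is now
        // split across the common ForkJoinPool behind the scenes.
        int parallelSum = numbers.parallelStream().mapToInt(Integer::intValue).sum();

        System.out.println(serialSum + " == " + parallelSum);
    }
}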

Since it's parallel, you hardly need to think to know there will be thread-safety issues lurking in it. But we're not going to talk about thread-safe collections here; that's too basic. At this point, knowing how to use thread-safe collections in a non-thread-safe environment is already a baseline skill.
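
As a quick illustration of that baseline skill (only a sketch; the class name UnsafeCollectDemo is made up), collecting a parallel stream into a plain ArrayList by hand is exactly the kind of thing that goes wrong, while letting a collector do the accumulation is safe:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class UnsafeCollectDemo {
    public static void main(String[] args) {
        // Anti-pattern: ArrayList is not thread-safe, so concurrent add() calls
        // from a parallel stream can lose elements or even throw.
        List<Integer> unsafe = new ArrayList<>();
        try {
            IntStream.range(0, 10_000).parallel().forEach(unsafe::add);
        } catch (RuntimeException e) {
            System.out.println("parallel add blew up: " + e);
        }
        System.out.println("unsafe size (often != 10000): " + unsafe.size());

        // Safe alternative: let the terminal operation do the accumulation.
        List<Integer> safe = IntStream.range(0, 10_000).parallel()
                .boxed()
                .collect(Collectors.toList());
        System.out.println("safe size: " + safe.size());
    }
}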

The pit this time is the performance of parallel streams.

Let's talk in code.

The following code starts eight threads, each of which uses a parallel stream for its computation. Inside the pipeline, we let each task sleep for 1 second, simulating the wait for some time-consuming I/O request.

With a plain stream, the program would return after about 30 seconds; but since this is a parallel stream, we expect it to live up to its name and return in a bit more than 1 second.

In the test, however, we waited a long time before the tasks completed.

static void parallelTest() {
    List<Integer> numbers = Arrays.asList(
            0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
            10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
            20, 21, 22, 23, 24, 25, 26, 27, 28, 29
    );
    final long begin = System.currentTimeMillis();
    // Each element sleeps for 1 second, simulating a blocking I/O call.
    numbers.parallelStream().map(k -> {
        try {
            Thread.sleep(1000);
            System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return k;
    }).collect(Collectors.toList());
}


public static void main(String[] args) {
//    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    // Eight independent callers, all competing for the same common ForkJoinPool.
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
    new Thread(() -> parallelTest()).start();
}

The pit

In fact, this code takes different amounts of time to execute on different machines.

Since it is parallel, there must be a degree of parallelism; if it is too low, the stream cannot show any real parallel power. I was dismayed to find that many senior developers, who can recite every thread-pool parameter by heart and tune them endlessly, still turn a blind eye and use parallelStream in I/O-intensive business code.

To understand this degree of parallelism, we need to look at how the pool is constructed. You can find this code in the ForkJoinPool class.

try {  // ignore exceptions in accessing/parsing properties
    String pp = System.getProperty
        ("java.util.concurrent.ForkJoinPool.common.parallelism");
    if (pp != null)
        parallelism = Integer.parseInt(pp);
    fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
        "java.util.concurrent.ForkJoinPool.common.threadFactory");
    handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
        "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
} catch (Exception ignore) {
}


if (fac == null) {
    if (System.getSecurityManager() == null)
        fac = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        fac = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
    parallelism = 1;
if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;

As you can see, the degree of parallelism is controlled by the system property shown below. If the property is not set, the default is the number of CPU cores minus 1.

This default is clearly designed for computation-intensive work. If you feed the pool a pile of blocking tasks, execution degrades from parallel to something close to serial.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

Even if you set an initial value with -Djava.util.concurrent.ForkJoinPool.common.parallelism=N, there is still a problem.

The parallelism field is final: once it is set, it cannot be changed. In other words, the property above is read exactly once.

Zhang San might use the following code to set the parallelism to 20:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Li Si might set the same property to 30 in exactly the same way. Which value the project actually ends up with depends on when the JVM loads the ForkJoinPool class.

This approach is not very reliable.
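
A small sketch (the class name CommonPoolCheck is made up) makes the one-shot behavior visible: the property only matters if it is set before the common pool is touched for the first time, so setting it after any parallel stream has run changes nothing:

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolCheck {
    public static void main(String[] args) {
        // Any parallel stream initializes the common ForkJoinPool,
        // which is when the parallelism property is read.
        IntStream.range(0, 4).parallel().sum();

        // Too late: the common pool already exists, so this value is ignored.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

        System.out.println("CPU cores:               " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}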

A solution

We can isolate different types of tasks by providing our own external ForkJoinPool, that is, by changing the way the work is submitted.

The code looks like this; task isolation is achieved by submitting the stream operation explicitly to the custom pool.

// A dedicated pool with a parallelism of 30, instead of the shared common pool.
ForkJoinPool pool = new ForkJoinPool(30);

// "numbers" is the same 30-element list used in parallelTest() above.
final long begin = System.currentTimeMillis();
try {
    pool.submit(() ->
            numbers.parallelStream().map(k -> {
                try {
                    Thread.sleep(1000);
                    System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return k;
            }).collect(Collectors.toList())).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

In this way, different scenarios can run with different degrees of parallelism. But as with CountDownLatch, the resources have to be managed manually.
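
If you go down this road, one way to keep that manual management in one place is a small helper that owns the pool and always shuts it down. This is only a sketch; the names IsolatedParallel and runInPool are made up:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public final class IsolatedParallel {

    // Runs a parallel-stream computation inside its own ForkJoinPool, so it
    // neither starves nor is starved by the shared common pool.
    public static <T> T runInPool(int parallelism, Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // we created this pool, so we must release it
        }
    }

    public static void main(String[] args) {
        List<Integer> doubled = runInPool(30, () ->
                IntStream.rangeClosed(1, 30)
                        .parallel()
                        .map(k -> k * 2)
                        .boxed()
                        .collect(Collectors.toList()));
        System.out.println(doubled);
    }
}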

The amount of code grows, and the result has little to do with elegance; it is not just inelegant, it is downright ugly. The white swan has turned into an ugly duckling. Will you still love it?

That wraps up W3Cschool编程狮's introduction to the parallelStream pit: you won't know it until you step in it, and then it gives you a fright. I hope it helps.