Executors, Futures & Starvation

Blocking Work Inside parallelStream / ForkJoinPool


  • Work-stealing pitfalls
  • ForkJoinPool
  • parallelStream
  • Java
  • Intermediate

Production symptoms

  • Throughput collapse
  • Parallel work underperforms
  • Latency spikes

Failure scenario

Code

Java example
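// Assumes: import static java.util.stream.Collectors.toList;
List<Response> responses = ids.parallelStream()
        .map(id -> callRemoteLikeThing(id)) // blocking call occupies a common-pool worker (or the caller)
        .collect(toList());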

Prod Symptoms

A request path uses parallelStream for many HTTP, database, filesystem, or rate-limited calls. Under load, the work finishes in slow waves instead of smooth parallel progress.

Key signal: parallelStream is a poor fit for unbounded blocking work on the shared common pool.

  • Latency arrives in batches tied to common-pool parallelism
  • ForkJoin common-pool workers spend time waiting on external work
  • Unrelated code using parallel streams slows down during the same window
  • CPU utilization stays low even though every common-pool worker is occupied
  • Increasing input size produces tail-latency spikes rather than proportional speedup

Run Locally

  • Elapsed time tends to arrive in batches tied to common-pool parallelism
  • Workers are not doing CPU work; they are sleeping as a stand-in for I/O
  • Reducing common-pool parallelism makes the batching easier to see

Inspect hints

  • Thread dumps during the sleep show ForkJoinPool.commonPool-worker threads in TIMED_WAITING
  • If production has real I/O, inspect network/client waits rather than CPU hotspots
Run
javac BlockingParallelStreamDemo.java
java BlockingParallelStreamDemo
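Inspect during a longer run (raise the sleep in callRemoteLikeThing if the demo exits too quickly)
jps
jcmd <pid> Thread.print
Rerun with reduced parallelism
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 BlockingParallelStreamDemo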
BlockingParallelStreamDemo.java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BlockingParallelStreamDemo {
    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        int tasks = Math.max(16, parallelism * 4);

        List<Integer> ids = IntStream.range(0, tasks)
                .boxed()
                .collect(Collectors.toList());

        long start = System.nanoTime();
        List<String> results = ids.parallelStream()
                .map(BlockingParallelStreamDemo::callRemoteLikeThing)
                .collect(Collectors.toList());
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("common pool parallelism = " + parallelism);
        System.out.println("tasks = " + results.size());
        System.out.println("elapsed ms = " + elapsedMillis);
    }

    // Stand-in for a blocking external call (HTTP, database, filesystem).
    private static String callRemoteLikeThing(int id) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok-" + id;
    }
}

Note: Thread.sleep stands in for blocking external I/O. Exact timings depend on common-pool parallelism; the calling thread also runs part of the stream, so up to parallelism + 1 tasks can block at once.
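
As a rough way to reason about the batching, elapsed time can be estimated from the task count, the per-call latency, and the number of threads able to run tasks at once. The sketch below assumes every task blocks for the same time and that the calling thread works alongside the common-pool workers. estimateMillis is a hypothetical helper, and real stream splitting may not form perfectly even waves.

```java
import java.util.concurrent.ForkJoinPool;

class WaveEstimate {
    // Hypothetical helper: estimated elapsed time if tasks run in even waves.
    static long estimateMillis(int tasks, int workers, long latencyMillis) {
        int runners = workers + 1;                   // assumption: the caller also runs tasks
        int waves = (tasks + runners - 1) / runners; // ceiling division
        return waves * latencyMillis;
    }

    public static void main(String[] args) {
        int workers = ForkJoinPool.getCommonPoolParallelism();
        int tasks = Math.max(16, workers * 4);       // same task count as the demo
        System.out.println("estimated ms = " + estimateMillis(tasks, workers, 500));
    }
}
```

Comparing this estimate with the measured elapsed ms from the demo is a quick check that the run is limited by occupied workers rather than by CPU.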

Diagnosis and fix

Explanation

ForkJoinPool works best when tasks split CPU-bound work into small pieces and finish quickly, so workers stay free to steal pending pieces from each other.

Key signal: Use explicit concurrency control for blocking work instead of borrowing the shared common pool.

  • Blocking calls occupy workers without using CPU
  • The common pool has limited parallelism, by default one less than the number of available processors
  • Other parallel streams share that same common pool by default
  • Blocking inside the pool can reduce effective parallelism for unrelated work
  • The issue is workload mismatch and shared-capacity exhaustion, not incorrect results
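
The sharing described in the list above can be observed by recording which threads run a parallel stream's body. This is a minimal sketch with illustrative class and method names. It uses no custom pool, so the recorded names are expected to be the calling thread plus ForkJoinPool.commonPool-worker threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CommonPoolThreadNames {
    // Returns the names of the threads that executed the stream body.
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Two separate call sites, both served by the same common pool.
        System.out.println("first stream:  " + threadsUsed(1_000));
        System.out.println("second stream: " + threadsUsed(1_000));
    }
}
```

Because both streams draw on the same workers, a blocking mapper in one call site takes capacity away from the other.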

How to Diagnose

Measure elapsed time, common-pool occupancy, and downstream wait time while the work is in progress.

  • Look for ForkJoinPool.commonPool-worker threads blocked, waiting, or sleeping
  • Check whether the mapped function performs I/O, sleeps, waits on futures, or takes locks
  • Compare elapsed time with expected external-call latency and common-pool parallelism
  • Look for unrelated parallel streams slowing down during the same window
  • Correlate with downstream client metrics such as connection-pool waits, timeouts, and rate limits
  • Use load tests because a single local run may not reveal shared common-pool interference
Commands
jps
jcmd <pid> Thread.print
jstack <pid>
Expected dump shape
"ForkJoinPool.commonPool-worker-1" #... TIMED_WAITING (sleeping)
  at java.lang.Thread.sleep(Native Method)
  at BlockingParallelStreamDemo.callRemoteLikeThing(BlockingParallelStreamDemo.java:...)

Note: Sleeping in the demo represents real waits such as HTTP, database, filesystem, or rate-limited APIs. Exact frames and source locations vary by JDK version.
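
A similar check can be sketched from inside the JVM with Thread.getAllStackTraces. Idle common-pool workers are also parked in a waiting state, so the hypothetical helper below counts a thread only when it is not runnable and has a frame from the given application class. The class name, the helper names, and the 300 ms sleep are assumptions for illustration.

```java
import java.util.Map;
import java.util.stream.IntStream;

class WaitingWorkerCounter {
    // Counts threads with the given name prefix that are blocked or waiting
    // while a frame from the given class (or package) prefix is on their stack.
    static long countWaitingIn(String threadPrefix, String frameClassPrefix) {
        long count = 0;
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            Thread.State s = t.getState();
            boolean notRunning = s == Thread.State.BLOCKED
                    || s == Thread.State.WAITING
                    || s == Thread.State.TIMED_WAITING;
            if (!notRunning || !t.getName().startsWith(threadPrefix)) {
                continue;
            }
            for (StackTraceElement frame : e.getValue()) {
                if (frame.getClassName().startsWith(frameClassPrefix)) {
                    count++;
                    break;
                }
            }
        }
        return count;
    }

    // Stand-in for a blocking external call.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread background = new Thread(() ->
                IntStream.range(0, 64).parallel().forEach(i -> sleepQuietly(300)));
        background.start();
        Thread.sleep(100); // give the stream time to occupy the workers
        long waiting = countWaitingIn("ForkJoinPool.commonPool-worker",
                WaitingWorkerCounter.class.getName());
        System.out.println("common-pool workers waiting in application code = " + waiting);
        background.join();
    }
}
```

Sampling this count periodically under load gives a coarse occupancy signal. A thread dump remains the more detailed view.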

How to Fix

  • Use a dedicated ExecutorService, async client, or explicit bulkhead for blocking work
  • Size concurrency from expected blocking latency and downstream capacity, not CPU count alone
  • Keep parallelStream for bounded CPU work with small, non-blocking operations
  • Apply timeouts, cancellation, and backpressure around real external calls
  • Do not globally tune the java.util.concurrent.ForkJoinPool.common.parallelism system property as a local fix for one blocking path; it changes capacity for every user of the common pool
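
One way to approach the sizing point above is Little's law: the number of calls in flight is roughly the target call rate multiplied by the per-call latency, capped by what the downstream system can accept. The helper and the numbers below are hypothetical and only illustrate the arithmetic.

```java
class BlockingPoolSizing {
    // Hypothetical helper: threads needed to sustain a call rate at a given
    // blocking latency, never exceeding the downstream concurrency limit.
    static int poolSize(double callsPerSecond, double latencyMillis, int downstreamLimit) {
        int needed = (int) Math.ceil(callsPerSecond * latencyMillis / 1000.0);
        return Math.max(1, Math.min(needed, downstreamLimit));
    }

    public static void main(String[] args) {
        // Made-up inputs: 40 calls/s at 500 ms each needs about 20 calls in flight,
        // but the downstream service is assumed to accept only 16 at once.
        System.out.println("pool size = " + poolSize(40, 500, 16));
    }
}
```

Note that CPU count does not appear in the calculation, because blocked threads consume little CPU.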
BlockingWorkDedicatedExecutorFixed.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BlockingWorkDedicatedExecutorFixed {
    public static void main(String[] args) {
        List<Integer> ids = IntStream.range(0, 32)
                .boxed()
                .collect(Collectors.toList());

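        // 16 is an illustrative size; derive the real value from blocking latency and downstream capacity.
        ExecutorService blockingPool = Executors.newFixedThreadPool(16);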
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();

            for (Integer id : ids) {
                futures.add(CompletableFuture.supplyAsync(
                        () -> callRemoteLikeThing(id),
                        blockingPool));
            }

            List<String> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());

            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            System.out.println("tasks = " + results.size());
            System.out.println("elapsed ms = " + elapsedMillis);
        } finally {
            blockingPool.shutdown();
        }
    }

    private static String callRemoteLikeThing(int id) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok-" + id;
    }
}

Note: A dedicated pool makes blocking capacity explicit. In production, pair it with queue limits and downstream protection.
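
The queue limits and timeouts mentioned above might look like the following sketch. It assumes Java 9 or later for CompletableFuture.orTimeout. The pool size, queue capacity, timeout values, and the "failed-" fallback string are illustrative, not recommendations. Note that orTimeout only completes the future exceptionally and does not interrupt the underlying call.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BoundedBlockingPoolSketch {
    // Fixed number of threads with a bounded queue. When the queue is full,
    // CallerRunsPolicy makes the submitting thread run the task itself,
    // which slows submission down and acts as simple backpressure.
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits one call per id and waits for all of them. A call that takes
    // longer than timeoutMillis yields a fallback value instead of blocking.
    static List<String> fetchAll(List<Integer> ids, ExecutorService pool, long timeoutMillis) {
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture
                        .supplyAsync(() -> callRemoteLikeThing(id), pool)
                        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> "failed-" + id))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Stand-in for a blocking external call.
    static String callRemoteLikeThing(int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok-" + id;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(4, 8);
        try {
            List<Integer> ids = IntStream.range(0, 16).boxed().collect(Collectors.toList());
            System.out.println(fetchAll(ids, pool, 2_000));
        } finally {
            pool.shutdown();
        }
    }
}
```

CallerRunsPolicy is only one possible rejection strategy. A path that must never block the submitter would need a different policy and explicit handling of rejected work.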