Fork/Join Framework

Imagine you have a massive pile of 10,000 unsorted documents to organize. You could do it all yourself (sequential processing), or you could split the pile in half, give one half to a friend, and keep splitting until everyone has a manageable stack. Once everyone is done, you merge the sorted stacks back together. This is the essence of the Fork/Join framework.

In this chapter, we’ll explore how Java’s ForkJoinPool leverages multi-core processors to solve complex problems efficiently using the work-stealing algorithm.

1. The Fork/Join Framework

Introduced in Java 7, the ForkJoinPool is a specialized implementation of ExecutorService designed for tasks that can be broken down into smaller pieces recursively.

Core Components

  1. ForkJoinPool: The engine that manages worker threads and executes tasks.
  2. ForkJoinTask: The base class for tasks running within the pool.
    • RecursiveTask: A task that returns a result.
    • RecursiveAction: A task that does not return a result (side-effect only).
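
As a minimal sketch of how these pieces fit together, the example below sums an array with a RecursiveTask running on the common ForkJoinPool. The names ParallelSum and SumTask and the cutoff of 1,000 elements are assumptions made for illustration, not part of the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ParallelSum {

  // Hypothetical example task: sums array[from, to) and returns the total.
  static class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cutoff, not tuned
    private final long[] array;
    private final int from;
    private final int to;

    SumTask(long[] array, int from, int to) {
      this.array = array;
      this.from = from;
      this.to = to;
    }

    @Override
    protected Long compute() {
      if (to - from <= THRESHOLD) {
        // Small enough: sum sequentially
        long sum = 0;
        for (int i = from; i < to; i++) sum += array[i];
        return sum;
      }
      int mid = from + (to - from) / 2;
      SumTask left = new SumTask(array, from, mid);
      SumTask right = new SumTask(array, mid, to);
      left.fork();                     // schedule the left half asynchronously
      long rightSum = right.compute(); // compute the right half in this thread
      return left.join() + rightSum;   // wait for the left half, then combine
    }
  }

  static long sum(long[] array) {
    return ForkJoinPool.commonPool().invoke(new SumTask(array, 0, array.length));
  }

  public static void main(String[] args) {
    long[] data = new long[10_000];
    for (int i = 0; i < data.length; i++) data[i] = i + 1;
    System.out.println(sum(data)); // 1 + 2 + ... + 10000 = 50005000
  }
}
```

A RecursiveAction version would look the same, except that compute() returns void and the task communicates only through side effects.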

2. The Algorithm: Work-Stealing

The secret sauce of ForkJoinPool is Work-Stealing.

  • Standard Thread Pools: Typically use a single shared queue for tasks. Threads contend for the lock on this queue.
  • ForkJoinPool: Each worker thread has its own double-ended queue (deque).
  • Threads push new tasks to the head of their own deque.
  • Threads pop tasks from the head of their own deque (LIFO - Last In, First Out).
  • When a thread’s own deque is empty, it steals a task from the tail of another thread’s deque (FIFO - First In, First Out).

This keeps contention low: an owner and a thief work on opposite ends of a deque, so they rarely compete for the same task.
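
The two access patterns can be sketched with a plain java.util.ArrayDeque. This is a single-threaded model of which end each party touches, not how ForkJoinPool implements its queues internally; the class DequeEndsDemo and its push, pop, and steal helpers are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DequeEndsDemo {

  // Owner side: the newest task goes on the head.
  static void push(Deque<String> deque, String task) {
    deque.addFirst(task);
  }

  // Owner side: take the most recently pushed task (LIFO).
  static String pop(Deque<String> deque) {
    return deque.pollFirst();
  }

  // Thief side: take the oldest task from the opposite end (FIFO).
  static String steal(Deque<String> victim) {
    return victim.pollLast();
  }

  public static void main(String[] args) {
    Deque<String> workerA = new ArrayDeque<>();
    push(workerA, "sort[0..9999]"); // oldest, largest task
    push(workerA, "sort[0..4999]");
    push(workerA, "sort[0..2499]"); // newest, smallest task

    System.out.println("owner pops:   " + pop(workerA));   // sort[0..2499]
    System.out.println("thief steals: " + steal(workerA)); // sort[0..9999]
  }
}
```

In divide-and-conquer workloads the oldest task in a deque is usually the largest, so a thief that takes from the tail tends to get a big chunk of work per steal.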

3. Code Implementation

Let’s implement a Merge Sort algorithm using the Fork/Join framework. Merge Sort is a classic “divide and conquer” algorithm perfectly suited for recursive task decomposition.

Java

We extend RecursiveAction because sorting an array happens in-place (no return value needed). If we were summing numbers, we’d use RecursiveTask<Integer>.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.Arrays;

public class ParallelMergeSort {

  private static class MergeSortTask extends RecursiveAction {
    private final int[] array;
    private final int left;
    private final int right;
    private static final int THRESHOLD = 100; // Granularity control

    public MergeSortTask(int[] array, int left, int right) {
      this.array = array;
      this.left = left;
      this.right = right;
    }

    @Override
    protected void compute() {
      if (right - left < THRESHOLD) {
        // Base case: Sort small chunk sequentially
        Arrays.sort(array, left, right + 1);
      } else {
        // Recursive case: Split task
        int mid = left + (right - left) / 2; // avoids int overflow of (left + right)
        MergeSortTask leftTask = new MergeSortTask(array, left, mid);
        MergeSortTask rightTask = new MergeSortTask(array, mid + 1, right);

        // Fork and join: invokeAll runs both subtasks (forking as needed)
        // and returns only after both have completed
        invokeAll(leftTask, rightTask);

        // Both halves are now sorted; merge them
        merge(mid);
      }
    }

    private void merge(int mid) {
      // Standard merge logic combining sorted halves
      int[] temp = new int[right - left + 1];
      int i = left, j = mid + 1, k = 0;
      while (i <= mid && j <= right) {
        if (array[i] <= array[j]) temp[k++] = array[i++];
        else temp[k++] = array[j++];
      }
      while (i <= mid) temp[k++] = array[i++];
      while (j <= right) temp[k++] = array[j++];
      System.arraycopy(temp, 0, array, left, temp.length);
    }
  }

  public static void main(String[] args) {
    // Fill with random values (illustrative test data)
    int[] data = new java.util.Random().ints(10_000).toArray();

    // Use common pool
    ForkJoinPool pool = ForkJoinPool.commonPool();
    pool.invoke(new MergeSortTask(data, 0, data.length - 1));
  }
}

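[!TIP] Threshold Matters: Setting the threshold too low creates many tiny tasks, and the scheduling overhead outweighs the work each one does. Setting it too high leaves too few tasks to keep every core busy. The right value depends on the workload and the hardware, so measure and tune.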
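
To get a feel for this trade-off, the sketch below counts how many task objects the splitting rule in MergeSortTask would create for a given array size and threshold. The class ThresholdDemo and the helper countTasks are hypothetical, and the count says nothing about actual running time, which has to be measured.

```java
class ThresholdDemo {

  // Hypothetical helper: number of tasks (internal + leaf) created when an
  // array of `size` elements is halved until `right - left < threshold`,
  // mirroring the rule used in MergeSortTask. Assumes threshold >= 1.
  static long countTasks(int size, int threshold) {
    if (size - 1 < threshold) return 1; // leaf: sorted sequentially
    int leftSize = (size - 1) / 2 + 1;  // elements in [left, mid]
    int rightSize = size - leftSize;    // elements in [mid + 1, right]
    return 1 + countTasks(leftSize, threshold) + countTasks(rightSize, threshold);
  }

  public static void main(String[] args) {
    int n = 10_000;
    for (int threshold : new int[] {1, 10, 100, 1_000, 10_000}) {
      System.out.printf("threshold=%-6d tasks=%d%n", threshold, countTasks(n, threshold));
    }
  }
}
```

Smaller thresholds produce many more tasks for the same array, while a threshold as large as the array produces a single task and therefore no parallelism.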

Go

Go doesn’t have a direct ForkJoinPool equivalent because its goroutines are already lightweight and managed by a work-stealing runtime scheduler. However, we can express the same “divide and conquer” pattern using sync.WaitGroup.

package main

import (
  "fmt"
  "sort"
  "sync"
)

const threshold = 100

func parallelMergeSort(arr []int) {
  if len(arr) < threshold {
    sort.Ints(arr)
    return
  }

  mid := len(arr) / 2
  var wg sync.WaitGroup
  wg.Add(2)

  // Fork Left: Spawn a goroutine
  go func() {
    defer wg.Done()
    parallelMergeSort(arr[:mid])
  }()

  // Fork Right: Spawn a goroutine
  go func() {
    defer wg.Done()
    parallelMergeSort(arr[mid:])
  }()

  // Join: Wait for both halves
  wg.Wait()
  merge(arr, mid)
}

func merge(arr []int, mid int) {
  // Standard merge logic
  left := make([]int, mid)
  right := make([]int, len(arr)-mid)
  copy(left, arr[:mid])
  copy(right, arr[mid:])

  i, j, k := 0, 0, 0
  for i < len(left) && j < len(right) {
    if left[i] <= right[j] {
      arr[k] = left[i]
      i++
    } else {
      arr[k] = right[j]
      j++
    }
    k++
  }
  for i < len(left) {
    arr[k] = left[i]
    i++
    k++
  }
  for j < len(right) {
    arr[k] = right[j]
    j++
    k++
  }
}

func main() {
  data := []int{9, 3, 7, 1, 5, 2, 8, 4, 6, 0}
  parallelMergeSort(data)
  fmt.Println("Sorted:", data)
}

4. Key Differences

| Feature | Java ForkJoinPool | Go Runtime |
|---|---|---|
| Abstraction | Explicit API (RecursiveTask) | Implicit in language (go keyword) |
| Control | Fine-grained (custom pools) | Runtime managed (GOMAXPROCS) |
| State | Shared memory (careful with mutation) | Shared memory or channels |
| Philosophy | “Framework for parallelism” | “Concurrency is built-in” |
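
The “custom pools” entry in the comparison above refers to creating a dedicated ForkJoinPool with a chosen parallelism instead of using the common pool. The following is a minimal sketch; CustomPoolDemo, CountEvens, and the cutoff of 1,000 elements are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CustomPoolDemo {

  // Hypothetical example task: counts the even values in data[from, to).
  static class CountEvens extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1_000; // assumed cutoff, not tuned
    private final int[] data;
    private final int from;
    private final int to;

    CountEvens(int[] data, int from, int to) {
      this.data = data;
      this.from = from;
      this.to = to;
    }

    @Override
    protected Integer compute() {
      if (to - from <= THRESHOLD) {
        int count = 0;
        for (int i = from; i < to; i++) {
          if (data[i] % 2 == 0) count++;
        }
        return count;
      }
      int mid = from + (to - from) / 2;
      CountEvens left = new CountEvens(data, from, mid);
      CountEvens right = new CountEvens(data, mid, to);
      left.fork();                      // run the left half asynchronously
      int rightCount = right.compute(); // run the right half in this thread
      return left.join() + rightCount;
    }
  }

  // Runs the task on a dedicated pool with the requested number of workers.
  static int countEvens(int[] data, int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.invoke(new CountEvens(data, 0, data.length));
    } finally {
      pool.shutdown(); // a custom pool should be shut down when no longer needed
    }
  }

  public static void main(String[] args) {
    int[] data = new int[10_000];
    for (int i = 0; i < data.length; i++) data[i] = i;
    System.out.println(countEvens(data, 4)); // 0, 2, ..., 9998 -> 5000
  }
}
```

A dedicated pool isolates this workload from other code that shares the common pool, at the cost of managing the pool’s lifecycle yourself.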

[!NOTE] In Go, the runtime scheduler is also a work-stealing scheduler. When a logical processor (P) runs out of runnable goroutines in its local queue, it tries to steal goroutines from other Ps, much like an idle worker thread in Java’s ForkJoinPool.

5. When to Use Fork/Join?

  1. Recursive Problems: Matrix multiplication, sorting, tree traversal.
  2. Processor Intensive: Tasks that are CPU-bound, not IO-bound.
  3. Independent Subtasks: Tasks should not block waiting for each other (except at the join point).

6. Summary

The Fork/Join framework provides a structured way to parallelize recursive algorithms. By using work-stealing, it helps keep all CPU cores busy even when task sizes are uneven. While Java requires explicit setup, Go’s runtime applies similar scheduling to all goroutines automatically.