Chapter preview

Chapter 10

Essential Knowledge: Parallel Programming

At the heart of any big data system is a plethora of processes and algorithms that run in parallel to crunch data and produce results that would have taken ages to compute sequentially. Parallel computing is what enables companies like Google to index the Internet and provide big data systems like email, video streaming, etc. Once workloads can be distributed effectively over multiple processes, scaling the processing horizontally becomes an easier task. In this chapter, we will explore how to parallelize work among concurrent processing units; the same concepts apply for the most part whether said processing units are concurrent threads in the same process, or multiple processes running on the same machine or on multiple machines. If you haven’t already read about process management and scheduling in Sect. 3.4 of Chap. 3, now would be a good time to do so.

Note: The examples below are abridged; the book contains more details.

  1. Thread Safety - Part I
  2. Thread Safety - Part II
  3. Volatility - Part I
  4. Volatility - Part II
  5. Synchronization
  6. Ineffectual Synchronization - Part I
  7. Ineffectual Synchronization - Part II
  8. Ineffectual Synchronization - Part III
  9. Synchronization vs. Volatility
  10. Starvation
  11. Deadlocks - Part I
  12. Deadlocks - Part II
  13. Deadlocks - Part III
  14. The Producer-Consumer Problem - Part I
  15. The Producer-Consumer Problem - Part II
  16. The Producer-Consumer Problem - Part III
  17. The Producer-Consumer Problem - Part IV
  18. The Producer-Consumer Problem - Part V
  19. The Producer-Consumer Problem - Part VI
  20. Reader-Writer Locks - Part I
  21. Reader-Writer Locks - Part II
  22. Reader-Writer Locks - Part III
  23. Reader-Writer Locks - Part IV
  24. Reentrant Locks - Part I
  25. Reentrant Locks - Part II
  26. Reentrant Locks - Part III
  27. Reentrant Locks - Part IV
  28. Thread Pools - Part I
  29. Thread Pools - Part II
  30. Thread Pools - Part III
  31. Scheduled Executor Services
  32. Futures
  33. Streams
  34. ParSeq - Part I
  35. ParSeq - Part II
  36. ParSeq - Part III
  37. ParSeq - Part IV
  38. File Locks
  39. Distributed Locks
  40. Linearizable Counters

Thread Safety - Part I

In this chapter, we will mainly use Java to explore parallel computing concepts that can be readily translated into other programming languages that support them.

Note: Code examples in this chapter may sacrifice best OOP practices, like encapsulation, for demonstration purposes and brevity.

# Example 1 - single-threaded logic
def do_work():
  print('Hello, world!')

do_work()

# Example 2 - multi-threaded logic
from threading import Thread

for t in (Thread(target=do_work) for _ in range(10)):
  t.start()

Thread Safety - Part II

class Task implements Runnable {
  static boolean isDone = false;
  
  public void run() {
    try {
      System.out.println("Doing work...");
      Thread.sleep(10); // do some work
      isDone = true;
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

class Poller implements Runnable {
  public void run() {
    int iterations = 0;
    System.out.println("Waiting for work to be done...");
    while (!Task.isDone) { // bad way to wait for a signal
      ++iterations;
    }
    System.out.println("Polled " + iterations + " times");
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new Poller()).start();
    new Thread(new Task()).start();
    Thread.sleep(1000); // ad-hoc wait till threads finish
  }
}

Volatility - Part I

Here’s the new code snippet with a single keyword addition, volatile, to fix the infinite-loop issue:

class Task implements Runnable {
  volatile static boolean isDone = false;
  
  public void run() {
    try {
      System.out.println("Doing work...");
      Thread.sleep(10); // do some work
      isDone = true;
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

class Poller implements Runnable {
  public void run() {
    int iterations = 0;
    System.out.println("Waiting for work to be done...");
    while (!Task.isDone) { // bad way to wait for a signal
      ++iterations;
    }
    System.out.println("Polled " + iterations + " times");
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new Poller()).start();
    new Thread(new Task()).start();
    Thread.sleep(1000); // ad-hoc wait till threads finish
  }
}

Volatility - Part II

More importantly, declaring a shared resource as volatile can be misleading, as it may give a false sense of thread safety; it’s crucial to know that merely sprinkling the volatile keyword on fields doesn’t guarantee thread safety: volatile ensures that writes are visible across threads, but it doesn’t make compound operations like incrementing atomic. Let’s take the following simple task as an example:

class Task implements Runnable {
  volatile static int sum = 0;
  public void run() { ++sum; }
}

public class Main {
  public static void main(String args[]) throws Exception {
    Task task = new Task();
    for (int i = 0; i < 10000; i++) {
      new Thread(task).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("sum: " + Task.sum);
  }
}

Synchronization

Here’s the new code snippet with a single keyword addition, synchronized, to fix the race condition:

class Task implements Runnable {
  static int sum = 0;
  synchronized public void run() { ++sum; }
}

public class Main {
  public static void main(String args[]) throws Exception {
    Task task = new Task();
    for (int i = 0; i < 10000; i++) {
      new Thread(task).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("sum: " + Task.sum);
  }
}

Ineffectual Synchronization - Part I

To illustrate how subtle concurrency issues are and how easy they are to miss, we will make a very small change that may look extremely inconspicuous:

class Task implements Runnable {
  static int sum = 0;
  synchronized public void run() { ++sum; }
}

public class Main {
  public static void main(String args[]) throws Exception {
    for (int i = 0; i < 10000; i++) {
      new Thread(new Task()).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("sum: " + Task.sum);
  }
}
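Because each thread above locks a different Task instance, the synchronized instance method provides no mutual exclusion. As a minimal sketch of one possible fix, all instances can share a single monitor such as Task.class; joining the threads instead of the ad-hoc sleep is an additional assumption made here for a deterministic result:

```java
class Task implements Runnable {
  static int sum = 0;

  public void run() {
    synchronized (Task.class) { // one monitor shared by all instances
      ++sum;
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    Thread[] threads = new Thread[10000];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(new Task()); // distinct instances, shared lock
      threads[i].start();
    }
    for (Thread thread : threads) {
      thread.join(); // deterministic wait instead of an ad-hoc sleep
    }
    System.out.println("sum: " + Task.sum); // always 10000
  }
}
```

A static synchronized method would lock the same Task.class monitor and is equivalent here.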

Ineffectual Synchronization - Part II

class Task implements Runnable {
  static int sum = 0;

  public void run() {
    synchronized(this) {
      ++sum;
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    Task task = new Task();
    for (int i = 0; i < 10000; i++) {
      new Thread(task).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("sum: " + Task.sum);
  }
}

Ineffectual Synchronization - Part III

Another example of ineffectual synchronization, that’s not applicable in Java but can happen in C#, is demonstrated below:

using System;
using System.Threading;

class Program
{
  static int sum = 0;
  
  static void DoWork()
  {
    lock ((object) 0) // boxing allocates a new object every time; no mutual exclusion
    {
      for (int i = 0; i < 10000; i++) 
      {
        ++sum;
      }
    }
  }
  
  static void Main()
  {
    for (int i = 0; i < 4; i++)
    {
      new Thread(DoWork).Start();
    }
    
    Thread.Sleep(1000); // ad-hoc wait till threads finish
    Console.WriteLine("C# sum: " + sum);
  }
}

Synchronization vs. Volatility

class DeepThoughtTask implements Runnable {
  static int finalResult = 0;
  
  private int computeResult() {
    return 1; // assume this is a long-running task
  }
  
  public void run() {
    long partialResult = computeResult(); // lock-free
    synchronized (DeepThoughtTask.class) {
      finalResult += partialResult;
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    DeepThoughtTask task = new DeepThoughtTask();
    for (int i = 0; i < 42; i++) {
      new Thread(task).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("answer: " + DeepThoughtTask.finalResult);
  }
}

Starvation

class DeepThoughtTask implements Runnable {
  static int finalResult = 0;
  
  private int computeResult() {
    return 1; // assume this is a long-running task
  }
  
  public void run() {
    long partialResult = computeResult(); // lock-free
    synchronized (DeepThoughtTask.class) {
      finalResult += partialResult;
    }
  }
}

class ProcessorHog implements Runnable {
  public void run() {
    synchronized (DeepThoughtTask.class) {
      try {
        Thread.sleep(3000); // hold the shared lock for a long time
      } catch(InterruptedException ex) {
        System.out.println(ex);
      }
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new ProcessorHog()).start();
    Thread.sleep(1000); // simulates other work being done
    DeepThoughtTask task = new DeepThoughtTask();
    for (int i = 0; i < 42; i++) {
      new Thread(task).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("answer: " + DeepThoughtTask.finalResult);
  }
}
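Intrinsic locks make no fairness guarantees, which aggravates starvation when threads compete for a busy monitor. As a hedged sketch, java.util.concurrent’s ReentrantLock can be constructed with a fairness policy so that the longest-waiting thread acquires the lock next (FairCounter is a hypothetical name used for illustration):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairCounter {
  // true requests a fair ordering policy: the longest-waiting thread
  // acquires the lock next, mitigating starvation at some throughput cost.
  private final ReentrantLock lock = new ReentrantLock(true);
  private int count = 0;

  void increment() {
    lock.lock();
    try {
      ++count;
    } finally {
      lock.unlock();
    }
  }

  int get() {
    lock.lock();
    try {
      return count;
    } finally {
      lock.unlock();
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    FairCounter counter = new FairCounter();
    Thread[] threads = new Thread[42];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(counter::increment);
      threads[i].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
    System.out.println("count: " + counter.get()); // always 42
  }
}
```

Note that fairness only prevents barging; it cannot shorten the time any single thread, such as a ProcessorHog, holds the lock.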

Deadlocks - Part I

Here’s another example of issues that can occur when lock objects are visible outside the enclosing class where they belong:

class A implements Runnable {
  public void run() {
    synchronized (A.class) {
      System.out.println("A has A.class and now wants B.class");
      try {
        Thread.sleep(500); // simulates some work being done
      } catch (InterruptedException ex) {
        System.out.println(ex);
      }
      synchronized (B.class) {
        System.out.println("A got both locks and made progress");
      }
    }
  }
}

class B implements Runnable {
  public void run() {
    synchronized (B.class) {
      System.out.println("B has B.class and now wants A.class");
      synchronized (A.class) {
        System.out.println("B got both locks and made progress");
      }
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new A()).start();
    new Thread(new B()).start();
    Thread.sleep(1000); // ad-hoc wait till threads finish
  }
}
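The classic remedy, sketched below, is to impose a global lock ordering: if every thread acquires A.class before B.class, the circular wait at the heart of the deadlock can never form (the sleep is kept to mirror the interleaving above):

```java
class A implements Runnable {
  public void run() {
    synchronized (A.class) { // first lock in the global order
      System.out.println("A has A.class and now wants B.class");
      try {
        Thread.sleep(500); // simulates some work being done
      } catch (InterruptedException ex) {
        System.out.println(ex);
      }
      synchronized (B.class) { // second lock in the global order
        System.out.println("A got both locks and made progress");
      }
    }
  }
}

class B implements Runnable {
  public void run() {
    synchronized (A.class) { // same order as A: A.class before B.class
      synchronized (B.class) {
        System.out.println("B got both locks and made progress");
      }
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    Thread a = new Thread(new A());
    Thread b = new Thread(new B());
    a.start();
    b.start();
    a.join(); // both joins return; no circular wait is possible
    b.join();
  }
}
```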

Deadlocks - Part II

Let’s revisit one of the previous examples and make use of the AtomicInteger class:

import java.util.concurrent.atomic.AtomicInteger;

class DeepThoughtTask implements Runnable {
  static AtomicInteger finalResult = new AtomicInteger(0);
  
  private int computeResult() {
    return 1; // assume this is a long-running task
  }
  
  public void run() {
    // This is lock-free on modern CPUs
    finalResult.addAndGet(computeResult());
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    DeepThoughtTask task = new DeepThoughtTask();
    for (int i = 0; i < 42; i++) {
      new Thread(task).start();
    }
    Thread.sleep(1000); // ad-hoc wait till threads finish
    System.out.println("answer: " + DeepThoughtTask.finalResult);
  }
}

Deadlocks - Part III

import java.util.concurrent.atomic.AtomicInteger;

class DeepThoughtTask implements Runnable {
  static AtomicInteger finalResult = new AtomicInteger(0);
  
  private int computeResult() {
    return 1; // assume this is a long-running task
  }
  
  public void run() {
    // This is lock-free on modern CPUs
    finalResult.addAndGet(computeResult());
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    DeepThoughtTask task = new DeepThoughtTask();
    Thread[] threads = new Thread[42];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(task);
      threads[i].start(); // fork; non-blocking
    }
    for (int i = 0; i < threads.length; i++) {
      threads[i].join(); // wait; blocking
    }
    System.out.println("answer: " + DeepThoughtTask.finalResult);
  }
}

The Producer-Consumer Problem - Part I

Let’s take a look at an example of a bad implementation; first, let’s examine our shared resource, in this case a bounded queue that’s guarded by busy waiting:

import java.util.ArrayDeque;
import java.util.Queue;

class Shared {
  static final int MAX_BUFFER_SIZE = 3;
  static Queue<String> buffer = new ArrayDeque<>();
  private static volatile boolean shouldWait = true;
  
  static void waitUntilNotified() {
    while (shouldWait);
    shouldWait = true;
  }
  
  static void notifyWaitingThread() {
    shouldWait = false;
  }
}

The Producer-Consumer Problem - Part II

The consumer class in our simple example consumes a message from the queue and prints it if there’s one; otherwise it waits:

class Consumer implements Runnable {
  public void run() {
    while (true) {
      if (Shared.buffer.size() == 0) {
        Shared.waitUntilNotified();
      }
      
      consume();
      
      if (shouldNotifyProducers()) {
        Shared.notifyWaitingThread();
      }
    }
  }
  
  private void consume() {
    System.out.println("Consumed: " + Shared.buffer.remove());
  }
  
  private boolean shouldNotifyProducers() {
    return Shared.buffer.size() == Shared.MAX_BUFFER_SIZE - 1;
  }
}

The Producer-Consumer Problem - Part III

class Producer implements Runnable {
  private static int i = 0;
  
  public void run() {
    while (true) {
      if (Shared.buffer.size() == Shared.MAX_BUFFER_SIZE) {
        Shared.waitUntilNotified();
      }
      
      Shared.buffer.add(produce());
      
      if (shouldNotifyConsumers()) {
        Shared.notifyWaitingThread();
      }
    }
  }
  
  private String produce() {
    return String.valueOf(i++);
  }
  
  private boolean shouldNotifyConsumers() {
    return Shared.buffer.size() == 1;
  }
}

The Producer-Consumer Problem - Part IV

Running the following program will result in a race condition:

// { autofold
import java.util.ArrayDeque;
import java.util.Queue;

class Shared {
  static final int MAX_BUFFER_SIZE = 3;
  static Queue<String> buffer = new ArrayDeque<>();
  private static volatile boolean shouldWait = true;
  
  static void waitUntilNotified() {
    while (shouldWait);
    shouldWait = true;
  }
  
  static void notifyWaitingThread() {
    shouldWait = false;
  }
}

class Consumer implements Runnable {
  public void run() {
    while (true) {
      if (Shared.buffer.size() == 0) {
        Shared.waitUntilNotified();
      }
      
      consume();
      
      if (shouldNotifyProducers()) {
        Shared.notifyWaitingThread();
      }
    }
  }
  
  private void consume() {
    System.out.println("Consumed: " + Shared.buffer.remove());
  }
  
  private boolean shouldNotifyProducers() {
    return Shared.buffer.size() == Shared.MAX_BUFFER_SIZE - 1;
  }
}

class Producer implements Runnable {
  private static int i = 0;
  
  public void run() {
    while (true) {
      if (Shared.buffer.size() == Shared.MAX_BUFFER_SIZE) {
        Shared.waitUntilNotified();
      }
      
      Shared.buffer.add(produce());
      
      if (shouldNotifyConsumers()) {
        Shared.notifyWaitingThread();
      }
    }
  }
  
  private String produce() {
    return String.valueOf(i++);
  }
  
  private boolean shouldNotifyConsumers() {
    return Shared.buffer.size() == 1;
  }
}
// }

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new Producer(), "produce").start();
    new Thread(new Consumer(), "consume").start();
  }
}

The Producer-Consumer Problem - Part V

In Java, it’s idiomatic to use wait and notify, which require acquiring a lock on the object for which those methods are called:

import java.util.ArrayDeque;
import java.util.Queue;

class Shared {
  static final int MAX_BUFFER_SIZE = 3;
  static Queue<String> buffer = new ArrayDeque<>();
  private static final Object lock = new Object();
  
  static void waitUntilNotified() {
    try {
      synchronized (lock) {
        lock.wait();
      }
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
  
  static void notifyWaitingThread() {
    synchronized (lock) {
      lock.notify();
    }
  }
}

// { autofold
class Consumer implements Runnable {
  public void run() {
    while (true) {
      if (Shared.buffer.size() == 0) {
        Shared.waitUntilNotified();
      }
      
      consume();
      
      if (shouldNotifyProducers()) {
        Shared.notifyWaitingThread();
      }
    }
  }
  
  private void consume() {
    System.out.println("Consumed: " + Shared.buffer.remove());
  }
  
  private boolean shouldNotifyProducers() {
    return Shared.buffer.size() == Shared.MAX_BUFFER_SIZE - 1;
  }
}

class Producer implements Runnable {
  private static int i = 0;
  
  public void run() {
    while (true) {
      if (Shared.buffer.size() == Shared.MAX_BUFFER_SIZE) {
        Shared.waitUntilNotified();
      }
      
      Shared.buffer.add(produce());
      
      if (shouldNotifyConsumers()) {
        Shared.notifyWaitingThread();
      }
    }
  }
  
  private String produce() {
    return String.valueOf(i++);
  }
  
  private boolean shouldNotifyConsumers() {
    return Shared.buffer.size() == 1;
  }
}
// }

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new Producer(), "produce").start();
    new Thread(new Consumer(), "consume").start();
  }
}

The Producer-Consumer Problem - Part VI

The following program fixes the deadlock issue and works correctly:

import java.util.ArrayDeque;
import java.util.Queue;

class Shared {
  private static final int MAX_BUFFER_SIZE = 3;
  private static Queue<String> buffer = new ArrayDeque<>();
  private static final Object lock = new Object();
  
  static String remove() throws InterruptedException {
    final String message;
    
    synchronized (lock) {
      while (buffer.size() == 0) {
        lock.wait();
      }
      message = buffer.remove();
      lock.notify();
    }
    return message;
  }
  
  static void add(String message) throws InterruptedException {
    synchronized (lock) {
      while (buffer.size() == MAX_BUFFER_SIZE) {
        lock.wait();
      }
      buffer.add(message);
      lock.notify();
    }
  }
}

class Consumer implements Runnable {
  public void run() {
    while (true) {
      try {
        System.out.println("Consumed: " + Shared.remove());
      } catch (Exception ex) {
        System.out.println(ex);
      }
    }
  }
}

class Producer implements Runnable {
  private static int i = 0;
  
  public void run() {
    while (true) {
      try {
        Shared.add(String.valueOf(i++));
      } catch (Exception ex) {
        System.out.println(ex);
      }
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new Thread(new Producer(), "Producer").start();
    new Thread(new Consumer(), "Consumer").start();
  }
}
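For completeness, the standard library already packages this pattern: an ArrayBlockingQueue is a bounded buffer whose put blocks while the queue is full and whose take blocks while it is empty. Here is a minimal sketch (the capacity of 3 matches the examples above, while the 10 messages are an arbitrary choice to let the program terminate):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {
  // Bounded buffer with all locking and signaling handled internally.
  private static final BlockingQueue<String> buffer =
      new ArrayBlockingQueue<>(3);

  public static void main(String args[]) throws Exception {
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          buffer.put(String.valueOf(i)); // blocks while the buffer is full
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }, "Producer");

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          System.out.println("Consumed: " + buffer.take()); // blocks while empty
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }, "Consumer");

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
  }
}
```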

Reader-Writer Locks - Part I

Here’s an example where we produce a dummy value to write to each element of the cached array so that its sum is either 0 (initial value) or its length (after any complete write operation):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class Simulator {
  static void simulateWork() {
    try {
      Thread.sleep(3); // simulates work being done
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

class Shared {
  static final Map<String, int[]> cache = new HashMap<>();
}

class Producer {
  protected int produce() {
    Simulator.simulateWork();
    return 1;
  }
}

class Writer extends Producer implements Runnable {
  public void run() {
    int[] referenceToValue = Shared.cache.get("key");
    
    for (int i = 0; i < referenceToValue.length; i++) {
      // Update value in-place; sum will add up to length
      referenceToValue[i] = produce();
    }
  }
}

class Consumer {
  protected void consume(int[] value) {
    int sum = Arrays.stream(value).sum();
    
    if (sum != 0 && sum != value.length) {
      throw new IllegalStateException("Partial sum: " + sum);
    }
    
    Simulator.simulateWork();
  }
}

class Reader extends Consumer implements Runnable {
  public void run() {
    consume(Shared.cache.get("key"));
  }
}

public class Main {
  private static long runTrial() throws Exception {
    long startTime = System.nanoTime();
    final Thread[] threads = new Thread[1000];
    
    Shared.cache.put("key", new int[20]); // sum = 0 at this time
    threads[0] = new Thread(new Writer(), "Writer");
    threads[0].start();
    
    for (int i = 1; i < threads.length; i++) {
      threads[i] = new Thread(new Reader(), "Reader #" + i);
      threads[i].start();
    }
    
    for (int i = 1; i < threads.length; i++) {
      threads[i].join();
    }
    
    return System.nanoTime() - startTime;
  }
  
  public static void main(String args[]) throws Exception {
    long[] trials = new long[11];
    
    for (int i = 0; i < trials.length; i++) {
      trials[i] = runTrial();
    }
    
    Arrays.sort(trials);
    
    long median = trials[trials.length / 2];
    double average = Arrays.stream(trials).
      average().
      getAsDouble();
    
    System.out.printf(
      "Median time in nanoseconds: %1$,d\n",
      median);
    System.out.printf(
      "Average time in nanoseconds: %1$,.2f\n",
      average);
  }
}

Reader-Writer Locks - Part II

In order to return the correct value, we may want to make the cache a mutually exclusive resource by making the following changes to the program:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class Writer extends Producer implements Runnable {
  public void run() {
    synchronized (Shared.cache) {
      int[] referenceToValue = Shared.cache.get("key");
      
      for (int i = 0; i < referenceToValue.length; i++) {
        // Update value in-place; sum will add up to length
        referenceToValue[i] = produce();
      }
    }
  }
}

class Reader extends Consumer implements Runnable {
  public void run() {
    synchronized (Shared.cache) {
      consume(Shared.cache.get("key"));
    }
  }
}

// { autofold
class Simulator {
  static void simulateWork() {
    try {
      Thread.sleep(3); // simulates work being done
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

class Shared {
  static final Map<String, int[]> cache = new HashMap<>();
}

class Producer {
  protected int produce() {
    Simulator.simulateWork();
    return 1;
  }
}

class Consumer {
  protected void consume(int[] value) {
    int sum = Arrays.stream(value).sum();
    
    if (sum != 0 && sum != value.length) {
      throw new IllegalStateException("Partial sum: " + sum);
    }
    
    Simulator.simulateWork();
  }
}

public class Main {
  private static long runTrial() throws Exception {
    long startTime = System.nanoTime();
    final Thread[] threads = new Thread[1000];
    
    Shared.cache.put("key", new int[20]); // sum = 0 at this time
    threads[0] = new Thread(new Writer(), "Writer");
    threads[0].start();
    
    for (int i = 1; i < threads.length; i++) {
      threads[i] = new Thread(new Reader(), "Reader #" + i);
      threads[i].start();
    }
    
    for (int i = 1; i < threads.length; i++) {
      threads[i].join();
    }
    
    return System.nanoTime() - startTime;
  }
  
  public static void main(String args[]) throws Exception {
    long[] trials = new long[11];
    
    for (int i = 0; i < trials.length; i++) {
      trials[i] = runTrial();
    }
    
    Arrays.sort(trials);
    
    long median = trials[trials.length / 2];
    double average = Arrays.stream(trials).
      average().
      getAsDouble();
    
    System.out.printf(
      "Median time in nanoseconds: %1$,d\n",
      median);
    System.out.printf(
      "Average time in nanoseconds: %1$,.2f\n",
      average);
  }
}
// }

Reader-Writer Locks - Part III

Of course there are many improvements we can make here to reduce contention; for example, following the best practice of reducing the time a thread spends inside a critical section:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class Reader extends Consumer implements Runnable {
  public void run() {
    int[] deepCopy;
    
    synchronized (Shared.cache) {
      int[] value = Shared.cache.get("key");
      
      deepCopy = Arrays.copyOf(value, value.length);
    }
    consume(deepCopy);
  }
}

// { autofold
class Writer extends Producer implements Runnable {
  public void run() {
    synchronized (Shared.cache) {
      int[] referenceToValue = Shared.cache.get("key");
      
      for (int i = 0; i < referenceToValue.length; i++) {
        // Update value in-place; sum will add up to length
        referenceToValue[i] = produce();
      }
    }
  }
}

class Simulator {
  static void simulateWork() {
    try {
      Thread.sleep(3); // simulates work being done
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

class Shared {
  static final Map<String, int[]> cache = new HashMap<>();
}

class Producer {
  protected int produce() {
    Simulator.simulateWork();
    return 1;
  }
}

class Consumer {
  protected void consume(int[] value) {
    int sum = Arrays.stream(value).sum();
    
    if (sum != 0 && sum != value.length) {
      throw new IllegalStateException("Partial sum: " + sum);
    }
    
    Simulator.simulateWork();
  }
}

public class Main {
  private static long runTrial() throws Exception {
    long startTime = System.nanoTime();
    final Thread[] threads = new Thread[1000];
    
    Shared.cache.put("key", new int[20]); // sum = 0 at this time
    threads[0] = new Thread(new Writer(), "Writer");
    threads[0].start();
    
    for (int i = 1; i < threads.length; i++) {
      threads[i] = new Thread(new Reader(), "Reader #" + i);
      threads[i].start();
    }
    
    for (int i = 1; i < threads.length; i++) {
      threads[i].join();
    }
    
    return System.nanoTime() - startTime;
  }
  
  public static void main(String args[]) throws Exception {
    long[] trials = new long[11];
    
    for (int i = 0; i < trials.length; i++) {
      trials[i] = runTrial();
    }
    
    Arrays.sort(trials);
    
    long median = trials[trials.length / 2];
    double average = Arrays.stream(trials).
      average().
      getAsDouble();
    
    System.out.printf(
      "Median time in nanoseconds: %1$,d\n",
      median);
    System.out.printf(
      "Average time in nanoseconds: %1$,.2f\n",
      average);
  }
}
// }

Reader-Writer Locks - Part IV

Can we make it run faster? The answer, in this case, is a joyful yes; we modify the program to use a reader-writer lock:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Shared {
  static final Map<String, int[]> cache = new HashMap<>();
  private static final ReadWriteLock lock = new ReentrantReadWriteLock();
  static final Lock readLock = lock.readLock();
  static final Lock writeLock = lock.writeLock();
}

class Writer extends Producer implements Runnable {
  public void run() {
    Shared.writeLock.lock();
    try {
      int[] referenceToValue = Shared.cache.get("key");
      
      for (int i = 0; i < referenceToValue.length; i++) {
        // Update value in-place; sum will add up to length
        referenceToValue[i] = produce();
      }
    } finally {
      Shared.writeLock.unlock();
    }
  }
}

class Reader extends Consumer implements Runnable {
  public void run() {
    Shared.readLock.lock();
    try {
      consume(Shared.cache.get("key"));
    } finally {
      Shared.readLock.unlock();
    }
  }
}

// { autofold
class Simulator {
  static void simulateWork() {
    try {
      Thread.sleep(3); // simulates work being done
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

class Producer {
  protected int produce() {
    Simulator.simulateWork();
    return 1;
  }
}

class Consumer {
  protected void consume(int[] value) {
    int sum = Arrays.stream(value).sum();
    
    if (sum != 0 && sum != value.length) {
      throw new IllegalStateException("Partial sum: " + sum);
    }
    
    Simulator.simulateWork();
  }
}

public class Main {
  private static long runTrial() throws Exception {
    long startTime = System.nanoTime();
    final Thread[] threads = new Thread[1000];
    
    Shared.cache.put("key", new int[20]); // sum = 0 at this time
    threads[0] = new Thread(new Writer(), "Writer");
    threads[0].start();
    
    for (int i = 1; i < threads.length; i++) {
      threads[i] = new Thread(new Reader(), "Reader #" + i);
      threads[i].start();
    }
    
    for (int i = 1; i < threads.length; i++) {
      threads[i].join();
    }
    
    return System.nanoTime() - startTime;
  }
  
  public static void main(String args[]) throws Exception {
    long[] trials = new long[11];
    
    for (int i = 0; i < trials.length; i++) {
      trials[i] = runTrial();
    }
    
    Arrays.sort(trials);
    
    long median = trials[trials.length / 2];
    double average = Arrays.stream(trials).
      average().
      getAsDouble();
    
    System.out.printf(
      "Median time in nanoseconds: %1$,d\n",
      median);
    System.out.printf(
      "Average time in nanoseconds: %1$,.2f\n",
      average);
  }
}
// }
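As a further, hedged refinement available since Java 8, StampedLock supports optimistic reads: a reader copies the data without locking at all and only falls back to a real read lock if a writer intervened in the meantime. OptimisticCache and its fixed array size are illustrative assumptions, not part of the original program:

```java
import java.util.Arrays;
import java.util.concurrent.locks.StampedLock;

class OptimisticCache {
  private final StampedLock lock = new StampedLock();
  private final int[] value = new int[20];

  int[] read() {
    long stamp = lock.tryOptimisticRead(); // no blocking, no contention
    int[] copy = Arrays.copyOf(value, value.length);
    if (!lock.validate(stamp)) { // a writer slipped in; retry pessimistically
      stamp = lock.readLock();
      try {
        copy = Arrays.copyOf(value, value.length);
      } finally {
        lock.unlockRead(stamp);
      }
    }
    return copy;
  }

  void write(int element) {
    long stamp = lock.writeLock();
    try {
      Arrays.fill(value, element); // sum becomes 0 or length, never partial
    } finally {
      lock.unlockWrite(stamp);
    }
  }
}

public class Main {
  public static void main(String args[]) {
    OptimisticCache cache = new OptimisticCache();
    cache.write(1);
    System.out.println("sum: " + Arrays.stream(cache.read()).sum()); // 20
  }
}
```

Unlike ReentrantReadWriteLock, a StampedLock is not reentrant, so it must not be mixed with the reentrant idioms discussed next.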

Reentrant Locks - Part I

This kind of locking is useful when one method acquires the lock and calls another that also acquires said lock; for example:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockExample {
  private final Lock lock = new ReentrantLock();
  
  public void foo() {
    lock.lock();
    try {
      // ...
      bar();
    } finally {
      lock.unlock();
    }
  }
  
  public void bar() {
    lock.lock();
    try {
      // ...
    } finally {
      lock.unlock();
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new ReentrantLockExample().foo();
  }
}

Reentrant Locks - Part II

We may use a binary semaphore to implement a lock for mutual exclusion, as in the following code snippet; note, however, that semaphore permits aren’t tied to an owning thread, so this lock is not reentrant and the nested acquisition in foo deadlocks:

import java.util.concurrent.Semaphore;

class ReentrantLockExample {
  private final Semaphore lock = new Semaphore(1);
  
  public void foo() throws InterruptedException {
    lock.acquire();
    try {
      // ...
      bar();
    } finally {
      lock.release();
    }
  }
  
  public void bar() throws InterruptedException {
    lock.acquire();
    try {
      // ...
    } finally {
      lock.release();
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new ReentrantLockExample().foo();
  }
}
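One way to layer reentrancy on top of a binary semaphore is to track the owning thread and a hold count, touching the semaphore only on the outermost acquire and release. This is a sketch under those assumptions; ReentrantBinaryLock is a hypothetical name:

```java
import java.util.concurrent.Semaphore;

class ReentrantBinaryLock {
  private final Semaphore semaphore = new Semaphore(1);
  private volatile Thread owner = null; // written only while holding the permit
  private int holdCount = 0;            // touched only by the owner

  public void lock() throws InterruptedException {
    if (owner == Thread.currentThread()) {
      ++holdCount; // nested acquire by the owner; no blocking
      return;
    }
    semaphore.acquire();
    owner = Thread.currentThread();
    holdCount = 1;
  }

  public void unlock() {
    if (owner != Thread.currentThread()) {
      throw new IllegalMonitorStateException("not the lock owner");
    }
    if (--holdCount == 0) {
      owner = null;
      semaphore.release(); // release the permit on the outermost unlock only
    }
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    ReentrantBinaryLock lock = new ReentrantBinaryLock();
    lock.lock();
    lock.lock(); // reentrant acquire; does not deadlock
    lock.unlock();
    lock.unlock();
    System.out.println("released");
  }
}
```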

Reentrant Locks - Part III

That doesn’t mean that reentrant locks are safer than non-reentrant ones:

import java.util.concurrent.locks.ReentrantLock;

class NonReentrantLockExample {
  private final ReentrantLock lock = new ReentrantLock();
  
  public void foo() throws InterruptedException {
    assert !lock.isHeldByCurrentThread() :
      "lock is held by current thread";
    lock.lock();
    try {
      // ...
      threadUnsafeBar();
    } finally {
      lock.unlock();
    }
  }
  
  public void bar() throws InterruptedException {
    assert !lock.isHeldByCurrentThread() :
      "lock is held by current thread";
    lock.lock();
    try {
      threadUnsafeBar();
    } finally {
      lock.unlock();
    }
  }
  
  private void threadUnsafeBar() {
    assert lock.isHeldByCurrentThread() :
      "caller must hold the lock first";
    // ...
  }
}

public class Main {
  public static void main(String args[]) throws Exception {
    new NonReentrantLockExample().foo();
  }
}

Reentrant Locks - Part IV

The locks created using the synchronized keyword in Java are reentrant; they are also known as intrinsic or monitor locks. Here’s how reentrant synchronization works:

class ReentrantIntrinsicLockExample {
  private final Object intrinsicLock = new Object();
  
  public void foo() {
    synchronized (intrinsicLock) {
      // ...
      bar();
    }
  }
  
  public void bar() {
    synchronized (intrinsicLock) {
      // ...
    }
  }
}
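We can observe the reentrancy of intrinsic locks directly with Thread.holdsLock, which reports whether the calling thread owns a given monitor. A minimal sketch (the class name is illustrative):

```java
class ReentrantIntrinsicLockCheck {
  private final Object intrinsicLock = new Object();

  public void foo() {
    synchronized (intrinsicLock) {
      bar(); // re-entering the same monitor does not block
    }
  }

  public void bar() {
    synchronized (intrinsicLock) {
      // The current thread still owns the monitor it acquired in foo().
      System.out.println("holds lock: " + Thread.holdsLock(intrinsicLock));
    }
  }
}

public class Main {
  public static void main(String[] args) {
    new ReentrantIntrinsicLockCheck().foo(); // prints "holds lock: true"
  }
}
```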

Thread Pools - Part I

A fixed thread pool maintains the same number of threads even when no tasks are running. Here’s an example of a fixed thread pool in Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
  public static void main(String args[]) throws Exception {
    final ExecutorService threadPool = Executors.newFixedThreadPool(3);
    final long startTime = System.nanoTime();
    
    for (int i = 0; i < 10; i++) {
      threadPool.submit(() -> doWork(startTime));
    }
    
    threadPool.shutdown();
    
    if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
      System.out.println("Thread pool terminated gracefully.");
    } else {
      System.err.println("Thread pool timed out!");
    }
  }
  
  private static void doWork(final long startTime) {
    System.out.println("Timestamp: " + (System.nanoTime() - startTime));
    try {
      Thread.sleep(100);
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

Thread Pools - Part II

Thread pools can also be elastic: they grow with increased demand to run parallel tasks and shrink when said demand diminishes. One example of such a thread pool is the cached thread pool; to demonstrate the difference, we replace the statement that creates the thread pool in the previous example with this one:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
  public static void main(String args[]) throws Exception {
    final ExecutorService threadPool = Executors.newCachedThreadPool();
    final long startTime = System.nanoTime();
    
    for (int i = 0; i < 10; i++) {
      threadPool.submit(() -> doWork(startTime));
    }
    
    threadPool.shutdown();
    
    if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
      System.out.println("Thread pool terminated gracefully.");
    } else {
      System.err.println("Thread pool timed out!");
    }
  }
  
  private static void doWork(final long startTime) {
    System.out.println("Timestamp: " + (System.nanoTime() - startTime));
    try {
      Thread.sleep(100);
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

Thread Pools - Part III

Another way to create an elastic thread pool is to use a work-stealing pool, which allows worker threads with nothing left to do to steal queued work from those that are still busy. To demonstrate the difference:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
  public static void main(String args[]) throws Exception {
    final ExecutorService threadPool = Executors.newWorkStealingPool(3);
    final long startTime = System.nanoTime();
    
    for (int i = 0; i < 10; i++) {
      threadPool.submit(() -> doWork(startTime));
    }
    
    threadPool.shutdown();
    
    if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
      System.out.println("Thread pool terminated gracefully.");
    } else {
      System.err.println("Thread pool timed out!");
    }
  }
  
  private static void doWork(final long startTime) {
    System.out.println("Timestamp: " + (System.nanoTime() - startTime));
    try {
      Thread.sleep(100);
    } catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}

Scheduled Executor Services

Here’s an example where we schedule three periodic tasks on a thread pool that always keeps 3 threads in the pool (even if they’re idle):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Main {
  public static void main(String args[]) throws Exception {
    final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
    final long startTime = System.nanoTime();
    
    for (int i = 0; i < 3; i++) {
      final int id = i;
      
      threadPool.scheduleAtFixedRate(() ->
        doWork(id, startTime), 0, 100, TimeUnit.MILLISECONDS);
    }
    threadPool.schedule(threadPool::shutdown, 300, TimeUnit.MILLISECONDS);
    
    if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
      System.out.println("Thread pool terminated gracefully.");
    } else {
      System.err.println("Thread pool timed out!");
    }
  }
  
  private static void doWork(final int id, final long start) {
    System.out.println("ID: " + id + "\tTimestamp: " + (System.nanoTime() - start));
  }
}
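Above, the shutdown is itself scheduled after a fixed delay. When the periodic work should instead decide when scheduling stops, a common idiom (sketched here with a CountDownLatch as the stopping criterion; the three-run threshold and 50 ms period are illustrative) is to cancel the returned ScheduledFuture once enough runs have completed:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
  public static void main(String[] args) throws Exception {
    final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
    final CountDownLatch latch = new CountDownLatch(3);
    final AtomicInteger runs = new AtomicInteger();

    final ScheduledFuture<?> future = threadPool.scheduleAtFixedRate(() -> {
      runs.incrementAndGet();
      latch.countDown(); // signals the main thread after each run
    }, 0, 50, TimeUnit.MILLISECONDS);

    latch.await();        // blocks until the third run has completed
    future.cancel(false); // stops further periodic executions
    threadPool.shutdown();

    System.out.println("Runs: " + runs.get()); // at least 3
  }
}
```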

Futures

Here’s an example that shows how to return the results of tasks run in parallel:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class Main {
  public static void main(String args[]) throws Exception {
    final ExecutorService threadPool = Executors.newCachedThreadPool();
    final List<Future<Integer>> futures = new ArrayList<>();
    final int[] data = IntStream.range(0, 10000).toArray();
    int sum = 0;
    
    for (int i = 0, step = 1000; i < data.length; i += step) {
      final IntStream chunk = Arrays.stream(data, i, i + step);
      
      futures.add(threadPool.submit(chunk::sum));
    }
    
    for (final Future<Integer> f : futures) {
      sum += f.get();
    }
    
    System.out.println("Parallel Sum: " + sum);
    threadPool.shutdown();
  }
}
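The submit-then-collect loop above can also be written with ExecutorService.invokeAll, which takes a collection of Callables and blocks until every task has completed. A sketch on the same data:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class Main {
  public static void main(String[] args) throws Exception {
    final ExecutorService threadPool = Executors.newCachedThreadPool();
    final int[] data = IntStream.range(0, 10000).toArray();
    final List<Callable<Integer>> tasks = new ArrayList<>();
    int sum = 0;

    for (int i = 0, step = 1000; i < data.length; i += step) {
      final int from = i, to = i + step; // effectively final bounds for the lambda
      tasks.add(() -> Arrays.stream(data, from, to).sum());
    }

    // invokeAll returns only after all tasks are done.
    for (final Future<Integer> f : threadPool.invokeAll(tasks)) {
      sum += f.get();
    }

    System.out.println("Parallel Sum: " + sum); // 49995000
    threadPool.shutdown();
  }
}
```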

Streams

Using parallel streams, the previous example can be made much more succinct:

import java.util.stream.IntStream;

public class Main {
  public static void main(String args[]) throws Exception {
    System.out.println("Parallel Sum: " +
      IntStream.range(0, 10000).parallel().sum());
  }
}
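The same applies to an existing array: Arrays.stream supports parallel() too, so the manual chunking from the futures example becomes unnecessary. A quick sketch:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class Main {
  public static void main(String[] args) {
    final int[] data = IntStream.range(0, 10000).toArray();

    // Addition is associative, so the parallel reduction is deterministic.
    System.out.println("Parallel Sum: " + Arrays.stream(data).parallel().sum());
  }
}
```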

ParSeq - Part I

Before delving into the details of those features, let’s take a look at a familiar example, rewritten using ParSeq this time:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.trace.TraceUtil;

import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;


public class Main {
  public static void main(String args[]) throws Exception {
    final int availableProcessors =
      Runtime.getRuntime().availableProcessors();
    final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(availableProcessors + 1);
    final Engine engine = new EngineBuilder().
      setTaskExecutor(scheduler).
      setTimerScheduler(scheduler).
      build();
    final int[] data = IntStream.range(0, 10000).toArray();
    final int chunkSize = data.length / 4;
    final Task<Integer> sum = Task.par(
      Task.callable(
        "a",
        Arrays.stream(data, 0 * chunkSize, 1 * chunkSize)::sum),
      Task.callable(
        "b",
        Arrays.stream(data, 1 * chunkSize, 2 * chunkSize)::sum),
      Task.callable(
        "c",
        Arrays.stream(data, 2 * chunkSize, 3 * chunkSize)::sum),
      Task.callable(
        "d",
        Arrays.stream(data, 3 * chunkSize, 4 * chunkSize)::sum)
    ).map("sum", (a, b, c, d) -> a + b + c + d);
    
    engine.run(sum);
    sum.await();
    System.out.println("Parallel Sum: " + sum.get());
    engine.shutdown();
    scheduler.shutdown();
  }
}

ParSeq - Part II

To print a task’s trace as JSON:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.trace.TraceUtil;

import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;


public class Main {
  public static void main(String args[]) throws Exception {
    final int availableProcessors =
      Runtime.getRuntime().availableProcessors();
    final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(availableProcessors + 1);
    final Engine engine = new EngineBuilder().
      setTaskExecutor(scheduler).
      setTimerScheduler(scheduler).
      build();
    final int[] data = IntStream.range(0, 10000).toArray();
    final int chunkSize = data.length / 4;
    final Task<Integer> sum = Task.par(
      Task.callable(
        "a",
        Arrays.stream(data, 0 * chunkSize, 1 * chunkSize)::sum),
      Task.callable(
        "b",
        Arrays.stream(data, 1 * chunkSize, 2 * chunkSize)::sum),
      Task.callable(
        "c",
        Arrays.stream(data, 2 * chunkSize, 3 * chunkSize)::sum),
      Task.callable(
        "d",
        Arrays.stream(data, 3 * chunkSize, 4 * chunkSize)::sum)
    ).map("sum", (a, b, c, d) -> a + b + c + d);
    
    engine.run(sum);
    sum.await();
    System.out.println("Parallel Sum: " + sum.get());
    engine.shutdown();
    scheduler.shutdown();
    System.out.println(TraceUtil.getJsonTrace(sum));
  }
}

ParSeq - Part III

In the example below, we reuse the same scheduler thread pool to run the blocking tasks themselves, via Task.blocking:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.trace.TraceUtil;

import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;


public class Main {
  public static void main(String args[]) throws Exception {
    final int availableProcessors =
      Runtime.getRuntime().availableProcessors();
    final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(availableProcessors + 1);
    final Engine engine = new EngineBuilder().
      setTaskExecutor(scheduler).
      setTimerScheduler(scheduler).
      build();
    final int[] data = IntStream.range(0, 10000).toArray();
    final int chunkSize = data.length / 4;
    final Task<Integer> sum = Task.par(
      Task.blocking(
        "a",
        Arrays.stream(data, 0 * chunkSize, 1 * chunkSize)::sum,
        scheduler),
      Task.blocking(
        "b",
        Arrays.stream(data, 1 * chunkSize, 2 * chunkSize)::sum,
        scheduler),
      Task.blocking(
        "c",
        Arrays.stream(data, 2 * chunkSize, 3 * chunkSize)::sum,
        scheduler),
      Task.blocking(
        "d",
        Arrays.stream(data, 3 * chunkSize, 4 * chunkSize)::sum,
        scheduler)
    ).map("sum", (a, b, c, d) -> a + b + c + d);
    
    engine.run(sum);
    sum.await();
    System.out.println("Parallel Sum: " + sum.get());
    engine.shutdown();
    scheduler.shutdown();
    System.out.println(TraceUtil.getJsonTrace(sum));
  }
}

ParSeq - Part IV

ParSeq really shines when used to perform asynchronous operations (e.g., I/O) in parallel, especially in the context of a large-scale system that requires special consideration for fault tolerance and other design requirements. Let’s take the following code snippet as an example:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.httpclient.HttpClient;
import com.linkedin.parseq.trace.TraceUtil;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class Main {
  public static void main(String args[]) throws Exception {
    final int availableProcessors =
      Runtime.getRuntime().availableProcessors();
    final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(availableProcessors + 1);
    final Engine engine = new EngineBuilder().
      setTaskExecutor(scheduler).
      setTimerScheduler(scheduler).
      build();
    final Task<Task<Void>> get = Task.par(
      getContentType("a://a"), // causes an exception
      getContentType("http://www.bing.com"),
      getContentType("http://www.facebook.com"),
      getContentType("http://www.google.com"),
      getContentType("http://www.yahoo.com"))
      .map((a, b, f, g, y) -> print(a, b, f, g, y));
    
    engine.run(Task.flatten(get));
    get.await();
    engine.shutdown();
    scheduler.shutdown();
    System.out.println(TraceUtil.getJsonTrace(get));
  }
  
  private static Task<String> getContentType(String url) {
    return HttpClient.get(url).task()
      .withTimeout(1, TimeUnit.SECONDS)
      .toTry()
      .map(t -> t.isFailed() ?
        "Failed to get " + url : t.get().getContentType());
  }
  
  private static Task<Void> print(String... strings) {
    return Task.action(() -> {
      for (final String s : strings) System.out.println(s);
    });
  }
}

File Locks

In Java, we can use the java.nio.channels.FileLock class as a mutex that protects a critical section; this time around, only a single process at a time can be inside said critical section:

file-locks.java

import java.lang.management.ManagementFactory;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.FileSystems;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.*;


public class Main {
  public static void main(String args[]) throws Exception {
    final Path lockPath =
      FileSystems.getDefault().getPath(".lock");
    final String processID =
      ManagementFactory.getRuntimeMXBean().getName();
    
    try (final FileLock lock =
      FileChannel.open(lockPath, WRITE, CREATE).lock()) {
      System.out.println(processID + " acquired the lock");
      Thread.sleep(1000); // simulates other work being done
      System.out.println(processID + " is releasing the lock");
    }
  }
}
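FileChannel also offers a non-blocking tryLock, which returns null when another program holds the lock (and throws OverlappingFileLockException if the same JVM already holds it). A sketch, assuming the same .lock file as above:

```java
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.FileSystems;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.*;

public class Main {
  public static void main(String[] args) throws Exception {
    final Path lockPath = FileSystems.getDefault().getPath(".lock");

    try (final FileChannel channel = FileChannel.open(lockPath, WRITE, CREATE)) {
      final FileLock lock = channel.tryLock(); // returns immediately, never blocks
      if (lock == null) {
        System.out.println("lock is held by another process");
      } else {
        try {
          System.out.println("acquired the lock without blocking");
        } finally {
          lock.release();
        }
      }
    }
  }
}
```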

run-file-locks-example.sh

java Main& java Main& java Main&

Distributed Locks

Using ZooKeeper v3.4.12 and Curator v4.0.1, we demonstrate below how to use ZooKeeper to rewrite the previous example:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.lang.management.ManagementFactory;

public class Main {
  public static void main(String args[]) throws Exception {
    final RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
    final InterProcessMutex lock = new InterProcessMutex(client, "/path/to/lock");
    final String processID = ManagementFactory.getRuntimeMXBean().getName();
    
    try {
      client.start();
      lock.acquire();
      System.out.println(processID + " acquired the lock");
      Thread.sleep(1000); // simulates other work being done
      System.out.println(processID + " is releasing the lock");
    } finally {
      if (lock.isAcquiredInThisProcess()) {
        lock.release();
      }
      client.close();
    }
  }
}

Linearizable Counters

We can create a znode for our counter and use Curator to commit a dummy setData transaction; the znode’s version, which ZooKeeper increments on every write, serves as the counter value:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Main {
  private static final String HOST = "127.0.0.1:2181";
  
  public static void main(String args[]) throws Exception {
    final RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    try (final CuratorFramework client = CuratorFrameworkFactory.newClient(HOST, retryPolicy)) {
      client.start();
      client.createContainers("/path/to/counter");

      final int version = client.inTransaction().
        setData().forPath("/path/to/counter").and().
        commit().iterator().next().
        getResultStat().getVersion();
      
      System.out.println("Version: " + version);
    }
  }
}