At the heart of any big data system is a plethora of processes and algorithms that run in parallel to crunch data and produce results that would take ages to compute sequentially. Parallel computing is what enables companies like Google to index the Internet and provide large-scale services like email and video streaming. Once workloads can be distributed effectively over multiple processes, scaling the processing horizontally becomes an easier task. In this chapter, we will explore how to parallelize work among concurrent processing units; the concepts apply, for the most part, whether said processing units are concurrent threads in the same process, or multiple processes running on the same machine or on multiple machines. If you haven’t already read about process management and scheduling in Sect. 3.4 of Chap. 3, now would be a good time to do so.
Note: The examples below are abridged; the book contains more details.
In this chapter, we will mainly use Java to explore parallel computing concepts that can be easily translated into other programming languages, provided those languages support them.
Note: Code examples in this chapter may sacrifice best OOP practices, like encapsulation, for demonstration purposes and brevity.
# Example 1 - single-threaded logic
def do_work():
    print('Hello, world!')

do_work()

# Example 2 - multi-threaded logic
from threading import Thread

for t in (Thread(target=do_work) for _ in range(10)):
    t.start()
class Task implements Runnable {
static boolean isDone = false;
public void run() {
try {
System.out.println("Doing work...");
Thread.sleep(10); // do some work
isDone = true;
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
class Poller implements Runnable {
public void run() {
int iterations = 0;
System.out.println("Waiting for work to be done...");
while (!Task.isDone) { // bad way to wait for a signal
++iterations;
}
System.out.println("Polled " + iterations + " times");
}
}
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new Poller()).start();
new Thread(new Task()).start();
Thread.sleep(1000); // ad-hoc wait till threads finish
}
}
Here’s the new code snippet with a single keyword addition, volatile, to fix the infinite-loop issue:
class Task implements Runnable {
volatile static boolean isDone = false;
public void run() {
try {
System.out.println("Doing work...");
Thread.sleep(10); // do some work
isDone = true;
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
class Poller implements Runnable {
public void run() {
int iterations = 0;
System.out.println("Waiting for work to be done...");
while (!Task.isDone) { // bad way to wait for a signal
++iterations;
}
System.out.println("Polled " + iterations + " times");
}
}
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new Poller()).start();
new Thread(new Task()).start();
Thread.sleep(1000); // ad-hoc wait till threads finish
}
}
More importantly, declaring a shared resource as volatile can be misleading, as it may give a false sense of thread safety; it’s crucial to know that the mere act of sprinkling the keyword volatile on fields doesn’t guarantee thread safety and definitely doesn’t protect against concurrency issues. Let’s take the following simple task as an example:
class Task implements Runnable {
volatile static int sum = 0;
public void run() { ++sum; }
}
public class Main {
public static void main(String args[]) throws Exception {
Task task = new Task();
for (int i = 0; i < 10000; i++) {
new Thread(task).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("sum: " + Task.sum);
}
}
Here’s the new code snippet with a single keyword addition, synchronized, to fix the race condition:
class Task implements Runnable {
static int sum = 0;
synchronized public void run() { ++sum; }
}
public class Main {
public static void main(String args[]) throws Exception {
Task task = new Task();
for (int i = 0; i < 10000; i++) {
new Thread(task).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("sum: " + Task.sum);
}
}
To illustrate how subtle concurrency issues are and how easy they are to miss, we will make a very small change that may look extremely inconspicuous:
class Task implements Runnable {
static int sum = 0;
synchronized public void run() { ++sum; }
}
public class Main {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 10000; i++) {
new Thread(new Task()).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("sum: " + Task.sum);
}
}
class Task implements Runnable {
static int sum = 0;
public void run() {
synchronized(this) {
++sum;
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
Task task = new Task();
for (int i = 0; i < 10000; i++) {
new Thread(task).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("sum: " + Task.sum);
}
}
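In the variant that constructs a new Task per thread, the synchronized instance method locks a different monitor for each instance, so the increments still race. One hedged fix, sketched below, is to synchronize on a monitor that every thread shares, such as the Class object, so it no longer matters how many Task instances exist (the join-based wait instead of an ad-hoc sleep is also an assumption of this sketch):

```java
class Task implements Runnable {
    static int sum = 0;
    public void run() {
        // Lock on the shared Class object, not on `this`:
        // every thread contends for the same monitor.
        synchronized (Task.class) {
            ++sum;
        }
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[10000];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Task()); // distinct instances are fine now
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join(); // deterministic wait instead of an ad-hoc sleep
        }
        System.out.println("sum: " + Task.sum); // always 10000
    }
}
```

A static synchronized method would achieve the same effect, since it implicitly locks the Class object.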
Another example of ineffectual synchronization, one that’s not possible in Java but can happen in C#, is demonstrated below:
using System;
using System.Threading;
class Program
{
static int sum = 0;
static void DoWork()
{
lock ((object) 0)
{
for (int i = 0; i < 10000; i++)
{
++sum;
}
}
}
static void Main()
{
for (int i = 0; i < 4; i++)
{
new Thread(DoWork).Start();
}
Thread.Sleep(1000); // ad-hoc wait till threads finish
Console.WriteLine("C# sum: " + sum);
}
}
class DeepThoughtTask implements Runnable {
static int finalResult = 0;
private int computeResult() {
return 1; // assume this is a long-running task
}
public void run() {
long partialResult = computeResult(); // lock-free
synchronized (DeepThoughtTask.class) {
finalResult += partialResult;
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
DeepThoughtTask task = new DeepThoughtTask();
for (int i = 0; i < 42; i++) {
new Thread(task).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("answer: " + DeepThoughtTask.finalResult);
}
}
class DeepThoughtTask implements Runnable {
static int finalResult = 0;
private int computeResult() {
return 1; // assume this is a long-running task
}
public void run() {
long partialResult = computeResult(); // lock-free
synchronized (DeepThoughtTask.class) {
finalResult += partialResult;
}
}
}
class ProcessorHog implements Runnable {
public void run() {
synchronized (DeepThoughtTask.class) {
try {
Thread.sleep(3000); // keep the CPU busy
} catch(InterruptedException ex) {
System.out.println(ex);
}
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new ProcessorHog()).start();
Thread.sleep(1000); // simulates other work being done
DeepThoughtTask task = new DeepThoughtTask();
for (int i = 0; i < 42; i++) {
new Thread(task).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("answer: " + DeepThoughtTask.finalResult);
}
}
Here’s another example for issues that can occur due to lock objects being visible outside the enclosing class where they belong:
class A implements Runnable {
public void run() {
synchronized (A.class) {
System.out.println("A has A.class and now wants B.class");
try {
Thread.sleep(500); // simulates some work being done
} catch (InterruptedException ex) {
System.out.println(ex);
}
synchronized (B.class) {
System.out.println("A got both locks and made progress");
}
}
}
}
class B implements Runnable {
public void run() {
synchronized (B.class) {
System.out.println("B has B.class and now wants A.class");
synchronized (A.class) {
System.out.println("B got both locks and made progress");
}
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new A()).start();
new Thread(new B()).start();
Thread.sleep(1000); // ad-hoc wait till threads finish
}
}
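The standard remedy for this deadlock is to impose a global lock ordering: if every thread acquires A.class before B.class, the circular wait can never form. Here is a hedged sketch along those lines (the messages are simplified relative to the example above):

```java
class A implements Runnable {
    public void run() {
        synchronized (A.class) {       // always acquire A.class first...
            synchronized (B.class) {   // ...then B.class
                System.out.println("A got both locks and made progress");
            }
        }
    }
}

class B implements Runnable {
    public void run() {
        synchronized (A.class) {       // same order as A: no circular wait
            synchronized (B.class) {
                System.out.println("B got both locks and made progress");
            }
        }
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        Thread a = new Thread(new A());
        Thread b = new Thread(new B());
        a.start();
        b.start();
        a.join(); // both threads now terminate instead of deadlocking
        b.join();
    }
}
```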
Let’s revisit one of the previous examples and make use of the AtomicInteger class:
import java.util.concurrent.atomic.AtomicInteger;
class DeepThoughtTask implements Runnable {
static AtomicInteger finalResult = new AtomicInteger(0);
private int computeResult() {
return 1; // assume this is a long-running task
}
public void run() {
// This is lock-free on modern CPUs
finalResult.addAndGet(computeResult());
}
}
public class Main {
public static void main(String args[]) throws Exception {
DeepThoughtTask task = new DeepThoughtTask();
for (int i = 0; i < 42; i++) {
new Thread(task).start();
}
Thread.sleep(1000); // ad-hoc wait till threads finish
System.out.println("answer: " + DeepThoughtTask.finalResult);
}
}
import java.util.concurrent.atomic.AtomicInteger;
class DeepThoughtTask implements Runnable {
static AtomicInteger finalResult = new AtomicInteger(0);
private int computeResult() {
return 1; // assume this is a long-running task
}
public void run() {
// This is lock-free on modern CPUs
finalResult.addAndGet(computeResult());
}
}
public class Main {
public static void main(String args[]) throws Exception {
DeepThoughtTask task = new DeepThoughtTask();
Thread[] threads = new Thread[42];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(task);
threads[i].start(); // fork; non-blocking
}
for (int i = 0; i < threads.length; i++) {
threads[i].join(); // wait; blocking
}
System.out.println("answer: " + DeepThoughtTask.finalResult);
}
}
Let’s take a look at an example of a bad implementation; first, let’s examine our shared resource, which in this case is a bounded queue protected with busy waiting:
class Shared {
static final int MAX_BUFFER_SIZE = 3;
static Queue<String> buffer = new ArrayDeque<>();
private static volatile boolean shouldWait = true;
static void waitUntilNotified() {
while (shouldWait);
shouldWait = true;
}
static void notifyWaitingThread() {
shouldWait = false;
}
}
The consumer class in our simple example consumes a message from the queue and prints it if there’s one; otherwise it waits:
class Consumer implements Runnable {
public void run() {
while (true) {
if (Shared.buffer.size() == 0) {
Shared.waitUntilNotified();
}
consume();
if (shouldNotifyProducers()) {
Shared.notifyWaitingThread();
}
}
}
private void consume() {
System.out.println("Consumed: " + Shared.buffer.remove());
}
private boolean shouldNotifyProducers() {
return Shared.buffer.size() == Shared.MAX_BUFFER_SIZE - 1;
}
}
class Producer implements Runnable {
private static int i = 0;
public void run() {
while (true) {
if (Shared.buffer.size() == Shared.MAX_BUFFER_SIZE) {
Shared.waitUntilNotified();
}
Shared.buffer.add(produce());
if (shouldNotifyConsumers()) {
Shared.notifyWaitingThread();
}
}
}
private String produce() {
return String.valueOf(i++);
}
private boolean shouldNotifyConsumers() {
return Shared.buffer.size() == 1;
}
}
Running the following program will result in a race condition:
// { autofold
class Shared {
static final int MAX_BUFFER_SIZE = 3;
static Queue<String> buffer = new ArrayDeque<>();
private static volatile boolean shouldWait = true;
static void waitUntilNotified() {
while (shouldWait);
shouldWait = true;
}
static void notifyWaitingThread() {
shouldWait = false;
}
}
class Consumer implements Runnable {
public void run() {
while (true) {
if (Shared.buffer.size() == 0) {
Shared.waitUntilNotified();
}
consume();
if (shouldNotifyProducers()) {
Shared.notifyWaitingThread();
}
}
}
private void consume() {
System.out.println("Consumed: " + Shared.buffer.remove());
}
private boolean shouldNotifyProducers() {
return Shared.buffer.size() == Shared.MAX_BUFFER_SIZE - 1;
}
}
class Producer implements Runnable {
private static int i = 0;
public void run() {
while (true) {
if (Shared.buffer.size() == Shared.MAX_BUFFER_SIZE) {
Shared.waitUntilNotified();
}
Shared.buffer.add(produce());
if (shouldNotifyConsumers()) {
Shared.notifyWaitingThread();
}
}
}
private String produce() {
return String.valueOf(i++);
}
private boolean shouldNotifyConsumers() {
return Shared.buffer.size() == 1;
}
}
// }
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new Producer(), "produce").start();
new Thread(new Consumer(), "consume").start();
}
}
In Java, it’s idiomatic to use wait and notify, which require acquiring a lock on the object on which those methods are called:
class Shared {
static final int MAX_BUFFER_SIZE = 3;
static Queue<String> buffer = new ArrayDeque<>();
private static final Object lock = new Object();
static void waitUntilNotified() {
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
static void notifyWaitingThread() {
synchronized (lock) {
lock.notify();
}
}
}
// { autofold
class Consumer implements Runnable {
public void run() {
while (true) {
if (Shared.buffer.size() == 0) {
Shared.waitUntilNotified();
}
consume();
if (shouldNotifyProducers()) {
Shared.notifyWaitingThread();
}
}
}
private void consume() {
System.out.println("Consumed: " + Shared.buffer.remove());
}
private boolean shouldNotifyProducers() {
return Shared.buffer.size() == Shared.MAX_BUFFER_SIZE - 1;
}
}
class Producer implements Runnable {
private static int i = 0;
public void run() {
while (true) {
if (Shared.buffer.size() == Shared.MAX_BUFFER_SIZE) {
Shared.waitUntilNotified();
}
Shared.buffer.add(produce());
if (shouldNotifyConsumers()) {
Shared.notifyWaitingThread();
}
}
}
private String produce() {
return String.valueOf(i++);
}
private boolean shouldNotifyConsumers() {
return Shared.buffer.size() == 1;
}
}
// }
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new Producer(), "produce").start();
new Thread(new Consumer(), "consume").start();
}
}
The following program fixes the deadlock issue and works correctly:
import java.util.ArrayDeque;
import java.util.Queue;
class Shared {
private static final int MAX_BUFFER_SIZE = 3;
private static Queue<String> buffer = new ArrayDeque<>();
private static final Object lock = new Object();
static String remove() throws InterruptedException {
final String message;
synchronized (lock) {
while (buffer.size() == 0) {
lock.wait();
}
message = buffer.remove();
lock.notify();
}
return message;
}
static void add(String message) throws InterruptedException {
synchronized (lock) {
while (buffer.size() == MAX_BUFFER_SIZE) {
lock.wait();
}
buffer.add(message);
lock.notify();
}
}
}
class Consumer implements Runnable {
public void run() {
while (true) {
try {
System.out.println("Consumed: " + Shared.remove());
} catch (Exception ex) {
System.out.println(ex);
}
}
}
}
class Producer implements Runnable {
private static int i = 0;
public void run() {
while (true) {
try {
Shared.add(String.valueOf(i++));
} catch (Exception ex) {
System.out.println(ex);
}
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
new Thread(new Producer(), "Producer").start();
new Thread(new Consumer(), "Consumer").start();
}
}
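In real code, java.util.concurrent already packages this wait/notify machinery: ArrayBlockingQueue is a bounded queue whose put and take block exactly as our hand-rolled Shared.add and Shared.remove do. Here is a hedged sketch of the same producer-consumer, bounded at ten messages so that it terminates:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {
    static final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(3);

    public static void main(String[] args) throws Exception {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(String.valueOf(i)); // blocks while the queue is full
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("Consumed: " + buffer.take()); // blocks while empty
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```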
Here’s an example where we produce a dummy value to write to each element of the cached array so that its sum is either 0 (initial value) or its length (after any complete write operation):
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
class Simulator {
static void simulateWork() {
try {
Thread.sleep(3); // simulates work being done
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
class Shared {
static final Map<String, int[]> cache = new HashMap<>();
}
class Producer {
protected int produce() {
Simulator.simulateWork();
return 1;
}
}
class Writer extends Producer implements Runnable {
public void run() {
int[] referenceToValue = Shared.cache.get("key");
for (int i = 0; i < referenceToValue.length; i++) {
// Update value in-place; sum will add up to length
referenceToValue[i] = produce();
}
}
}
class Consumer {
protected void consume(int[] value) {
int sum = Arrays.stream(value).sum();
if (sum != 0 && sum != value.length) {
throw new IllegalStateException("Partial sum: " + sum);
}
Simulator.simulateWork();
}
}
class Reader extends Consumer implements Runnable {
public void run() {
consume(Shared.cache.get("key"));
}
}
public class Main {
private static long runTrial() throws Exception {
long startTime = System.nanoTime();
final Thread[] threads = new Thread[1000];
Shared.cache.put("key", new int[20]); // sum = 0 at this time
threads[0] = new Thread(new Writer(), "Writer");
threads[0].start();
for (int i = 1; i < threads.length; i++) {
threads[i] = new Thread(new Reader(), "Reader #" + i);
threads[i].start();
}
for (int i = 1; i < threads.length; i++) {
threads[i].join();
}
return System.nanoTime() - startTime;
}
public static void main(String args[]) throws Exception {
long[] trials = new long[11];
for (int i = 0; i < trials.length; i++) {
trials[i] = runTrial();
}
Arrays.sort(trials);
long median = trials[trials.length / 2];
double average = Arrays.stream(trials).
average().
getAsDouble();
System.out.printf(
"Median time in nanoseconds: %1$,d\n",
median);
System.out.printf(
"Average time in nanoseconds: %1$,.2f\n",
average);
}
}
In order to return the correct value, we may want to make the cache a mutually exclusive resource by making the following changes to the program:
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
class Writer extends Producer implements Runnable {
public void run() {
synchronized (Shared.cache) {
int[] referenceToValue = Shared.cache.get("key");
for (int i = 0; i < referenceToValue.length; i++) {
// Update value in-place; sum will add up to length
referenceToValue[i] = produce();
}
}
}
}
class Reader extends Consumer implements Runnable {
public void run() {
synchronized (Shared.cache) {
consume(Shared.cache.get("key"));
}
}
}
// { autofold
class Simulator {
static void simulateWork() {
try {
Thread.sleep(3); // simulates work being done
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
class Shared {
static final Map<String, int[]> cache = new HashMap<>();
}
class Producer {
protected int produce() {
Simulator.simulateWork();
return 1;
}
}
class Consumer {
protected void consume(int[] value) {
int sum = Arrays.stream(value).sum();
if (sum != 0 && sum != value.length) {
throw new IllegalStateException("Partial sum: " + sum);
}
Simulator.simulateWork();
}
}
public class Main {
private static long runTrial() throws Exception {
long startTime = System.nanoTime();
final Thread[] threads = new Thread[1000];
Shared.cache.put("key", new int[20]); // sum = 0 at this time
threads[0] = new Thread(new Writer(), "Writer");
threads[0].start();
for (int i = 1; i < threads.length; i++) {
threads[i] = new Thread(new Reader(), "Reader #" + i);
threads[i].start();
}
for (int i = 1; i < threads.length; i++) {
threads[i].join();
}
return System.nanoTime() - startTime;
}
public static void main(String args[]) throws Exception {
long[] trials = new long[11];
for (int i = 0; i < trials.length; i++) {
trials[i] = runTrial();
}
Arrays.sort(trials);
long median = trials[trials.length / 2];
double average = Arrays.stream(trials).
average().
getAsDouble();
System.out.printf(
"Median time in nanoseconds: %1$,d\n",
median);
System.out.printf(
"Average time in nanoseconds: %1$,.2f\n",
average);
}
}
// }
Of course there are many improvements we can make here to reduce contention; for example, following the best practice of reducing the time a thread spends inside a critical section:
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
class Reader extends Consumer implements Runnable {
public void run() {
int[] deepCopy;
synchronized (Shared.cache) {
int[] value = Shared.cache.get("key");
deepCopy = Arrays.copyOf(value, value.length);
}
consume(deepCopy);
}
}
// { autofold
class Writer extends Producer implements Runnable {
public void run() {
synchronized (Shared.cache) {
int[] referenceToValue = Shared.cache.get("key");
for (int i = 0; i < referenceToValue.length; i++) {
// Update value in-place; sum will add up to length
referenceToValue[i] = produce();
}
}
}
}
class Simulator {
static void simulateWork() {
try {
Thread.sleep(3); // simulates work being done
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
class Shared {
static final Map<String, int[]> cache = new HashMap<>();
}
class Producer {
protected int produce() {
Simulator.simulateWork();
return 1;
}
}
class Consumer {
protected void consume(int[] value) {
int sum = Arrays.stream(value).sum();
if (sum != 0 && sum != value.length) {
throw new IllegalStateException("Partial sum: " + sum);
}
Simulator.simulateWork();
}
}
public class Main {
private static long runTrial() throws Exception {
long startTime = System.nanoTime();
final Thread[] threads = new Thread[1000];
Shared.cache.put("key", new int[20]); // sum = 0 at this time
threads[0] = new Thread(new Writer(), "Writer");
threads[0].start();
for (int i = 1; i < threads.length; i++) {
threads[i] = new Thread(new Reader(), "Reader #" + i);
threads[i].start();
}
for (int i = 1; i < threads.length; i++) {
threads[i].join();
}
return System.nanoTime() - startTime;
}
public static void main(String args[]) throws Exception {
long[] trials = new long[11];
for (int i = 0; i < trials.length; i++) {
trials[i] = runTrial();
}
Arrays.sort(trials);
long median = trials[trials.length / 2];
double average = Arrays.stream(trials).
average().
getAsDouble();
System.out.printf(
"Median time in nanoseconds: %1$,d\n",
median);
System.out.printf(
"Average time in nanoseconds: %1$,.2f\n",
average);
}
}
// }
Can we make it run faster? The answer — in this case — is a joyful yes; we modify the program to use a reader-writer lock:
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Shared {
static final Map<String, int[]> cache = new HashMap<>();
private static final ReadWriteLock lock = new ReentrantReadWriteLock();
static final Lock readLock = lock.readLock();
static final Lock writeLock = lock.writeLock();
}
class Writer extends Producer implements Runnable {
public void run() {
Shared.writeLock.lock();
try {
int[] referenceToValue = Shared.cache.get("key");
for (int i = 0; i < referenceToValue.length; i++) {
// Update value in-place; sum will add up to length
referenceToValue[i] = produce();
}
} finally {
Shared.writeLock.unlock();
}
}
}
class Reader extends Consumer implements Runnable {
public void run() {
Shared.readLock.lock();
try {
consume(Shared.cache.get("key"));
} finally {
Shared.readLock.unlock();
}
}
}
// { autofold
class Simulator {
static void simulateWork() {
try {
Thread.sleep(3); // simulates work being done
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
class Producer {
protected int produce() {
Simulator.simulateWork();
return 1;
}
}
class Consumer {
protected void consume(int[] value) {
int sum = Arrays.stream(value).sum();
if (sum != 0 && sum != value.length) {
throw new IllegalStateException("Partial sum: " + sum);
}
Simulator.simulateWork();
}
}
public class Main {
private static long runTrial() throws Exception {
long startTime = System.nanoTime();
final Thread[] threads = new Thread[1000];
Shared.cache.put("key", new int[20]); // sum = 0 at this time
threads[0] = new Thread(new Writer(), "Writer");
threads[0].start();
for (int i = 1; i < threads.length; i++) {
threads[i] = new Thread(new Reader(), "Reader #" + i);
threads[i].start();
}
for (int i = 1; i < threads.length; i++) {
threads[i].join();
}
return System.nanoTime() - startTime;
}
public static void main(String args[]) throws Exception {
long[] trials = new long[11];
for (int i = 0; i < trials.length; i++) {
trials[i] = runTrial();
}
Arrays.sort(trials);
long median = trials[trials.length / 2];
double average = Arrays.stream(trials).
average().
getAsDouble();
System.out.printf(
"Median time in nanoseconds: %1$,d\n",
median);
System.out.printf(
"Average time in nanoseconds: %1$,.2f\n",
average);
}
}
// }
This kind of locking is useful when one method acquires the lock and calls another that also acquires said lock; for example:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ReentrantLockExample {
private final Lock lock = new ReentrantLock();
public void foo() {
lock.lock();
try {
// ...
bar();
} finally {
lock.unlock();
}
}
public void bar() {
lock.lock();
try {
// ...
} finally {
lock.unlock();
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
new ReentrantLockExample().foo();
}
}
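Unlike intrinsic locks, ReentrantLock can also be acquired conditionally: tryLock lets a thread back off instead of blocking forever, which is one way to sidestep deadlocks like the earlier A/B example. A hedged sketch follows (the method name doWorkIfAvailable is illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    static final ReentrantLock lock = new ReentrantLock();

    static boolean doWorkIfAvailable() throws InterruptedException {
        // Wait at most 100 ms for the lock instead of blocking indefinitely.
        if (!lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            return false; // couldn't get the lock; caller may retry or give up
        }
        try {
            // ... critical section ...
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        // Uncontended here, so the lock is acquired immediately.
        System.out.println("acquired: " + doWorkIfAvailable());
    }
}
```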
We may use a binary semaphore to implement a lock for mutual exclusion, as in the following code snippet; note, however, that a Semaphore is not reentrant: the nested acquire() in bar(), called while foo() still holds the single permit, will block forever:
import java.util.concurrent.Semaphore;
class ReentrantLockExample {
private final Semaphore lock = new Semaphore(1);
public void foo() throws InterruptedException {
lock.acquire();
try {
// ...
bar(); // blocks forever: the semaphore's single permit is already held
} finally {
lock.release();
}
}
public void bar() throws InterruptedException {
lock.acquire();
try {
// ...
} finally {
lock.release();
}
}
}
public class Main {
public static void main(String args[]) throws Exception {
new ReentrantLockExample().foo();
}
}
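A semaphore’s real strength over a lock is that it can hold more than one permit, bounding concurrency rather than forbidding it. Here is a hedged sketch that caps a resource at three concurrent users; the atomic counters exist only so we can observe the cap:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    static final Semaphore permits = new Semaphore(3); // at most 3 threads inside
    static final AtomicInteger inside = new AtomicInteger(0);
    static final AtomicInteger maxObserved = new AtomicInteger(0);

    static void useResource() throws InterruptedException {
        permits.acquire();
        try {
            int now = inside.incrementAndGet();
            maxObserved.accumulateAndGet(now, Math::max); // track peak concurrency
            Thread.sleep(10); // simulates using the shared resource
            inside.decrementAndGet();
        } finally {
            permits.release();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    useResource();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("Peak concurrency: " + maxObserved.get()); // never exceeds 3
    }
}
```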
That doesn’t mean that reentrant locks are safer than non-reentrant ones:
import java.util.concurrent.locks.ReentrantLock;
class NonReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
public void foo() throws InterruptedException {
assert !lock.isHeldByCurrentThread() :
"lock is held by current thread";
lock.lock();
try {
// ...
threadUnsafeBar();
} finally {
lock.unlock();
}
}
public void bar() throws InterruptedException {
assert !lock.isHeldByCurrentThread() :
"lock is held by current thread";
lock.lock();
try {
threadUnsafeBar();
} finally {
lock.unlock();
}
}
private void threadUnsafeBar() {
assert lock.isHeldByCurrentThread() :
"caller must hold the lock first";
// ...
}
}
public class Main {
public static void main(String args[]) throws Exception {
new NonReentrantLockExample().foo();
}
}
The locks created using the synchronized keyword in Java are reentrant; they are also known as intrinsic or monitor locks. Here’s how reentrant synchronization works:
class ReentrantIntrinsicLockExample {
private final Object intrinsicLock = new Object();
public void foo() {
synchronized (intrinsicLock) {
// ...
bar();
}
}
public void bar() {
synchronized (intrinsicLock) {
// ...
}
}
}
A fixed thread pool maintains the same number of threads even if no task is running. Here’s an example of a fixed thread pool in Java:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String args[]) throws Exception {
final ExecutorService threadPool = Executors.newFixedThreadPool(3);
final long startTime = System.nanoTime();
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> doWork(startTime));
}
threadPool.shutdown();
if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("Thread pool terminated gracefully.");
} else {
System.err.println("Thread pool timed out!");
}
}
private static void doWork(final long startTime) {
System.out.println("Timestamp: " + (System.nanoTime() - startTime));
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
Thread pools can also be elastic: they scale out with increased demand to run parallel tasks and shrink when said demand diminishes. An example of such a thread pool is the cached thread pool; to demonstrate the difference, we replace the statement that creates the thread pool in the previous example with this one:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String args[]) throws Exception {
final ExecutorService threadPool = Executors.newCachedThreadPool();
final long startTime = System.nanoTime();
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> doWork(startTime));
}
threadPool.shutdown();
if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("Thread pool terminated gracefully.");
} else {
System.err.println("Thread pool timed out!");
}
}
private static void doWork(final long startTime) {
System.out.println("Timestamp: " + (System.nanoTime() - startTime));
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
Another way to create an elastic thread pool is to use a work-stealing pool, which allows worker threads with nothing left to do to steal work from those that are still busy. To demonstrate the difference:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String args[]) throws Exception {
final ExecutorService threadPool = Executors.newWorkStealingPool(3);
final long startTime = System.nanoTime();
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> doWork(startTime));
}
threadPool.shutdown();
if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("Thread pool terminated gracefully.");
} else {
System.err.println("Thread pool timed out!");
}
}
private static void doWork(final long startTime) {
System.out.println("Timestamp: " + (System.nanoTime() - startTime));
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
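Work stealing pays off most with fork/join-style tasks: newWorkStealingPool is backed by a ForkJoinPool, which we can also drive directly. Below is a hedged sketch that sums an array by recursively splitting it; SumTask and its threshold are illustrative names, not part of the earlier examples:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000; // below this, sum sequentially
    private final int[] data;
    private final int lo, hi;

    SumTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    protected Long compute() {
        if (hi - lo <= THRESHOLD) { // small enough: no further splitting
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (lo + hi) / 2;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork(); // run the left half asynchronously; idle workers may steal it
        return right.compute() + left.join(); // compute the right half here, then join
    }
}

public class Main {
    public static void main(String[] args) {
        int[] data = IntStream.range(0, 10000).toArray();
        long sum = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println("Sum: " + sum); // 49995000
    }
}
```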
Here’s an example where we create three schedulers and run them using a thread pool that always keeps 3 threads in the pool (even if they’re idle):
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String args[]) throws Exception {
final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
final long startTime = System.nanoTime();
for (int i = 0; i < 3; i++) {
final int id = i;
threadPool.scheduleAtFixedRate(() ->
doWork(id, startTime), 0, 100, TimeUnit.MILLISECONDS);
}
threadPool.schedule(threadPool::shutdown, 300, TimeUnit.MILLISECONDS);
if (threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("Thread pool terminated gracefully.");
} else {
System.err.println("Thread pool timed out!");
}
}
private static void doWork(final int id, final long start) {
System.out.println("ID: " + id + "\tTimestamp: " + (System.nanoTime() - start));
}
}
Here’s an example that shows how to return the results of tasks run in parallel:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
public class Main {
public static void main(String args[]) throws Exception {
final ExecutorService threadPool = Executors.newCachedThreadPool();
final List<Future<Integer>> futures = new ArrayList<>();
final int[] data = IntStream.range(0, 10000).toArray();
int sum = 0;
for (int i = 0, step = 1000; i < data.length; i += step) {
final IntStream chunk = Arrays.stream(data, i, i + step);
futures.add(threadPool.submit(chunk::sum));
}
for (final Future<Integer> f : futures) {
sum += f.get();
}
System.out.println("Parallel Sum: " + sum);
threadPool.shutdown();
}
}
The previous example can be simplified using parallel streams to be much more succinct:
import java.util.stream.IntStream;
public class Main {
public static void main(String args[]) throws Exception {
System.out.println("Parallel Sum: " +
IntStream.range(0, 10000).parallel().sum());
}
}
Before delving into the details of those features, let’s take a look at a familiar example, rewritten using ParSeq this time:
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.trace.TraceUtil;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
public class Main {
public static void main(String args[]) throws Exception {
final int availableProcessors =
Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(availableProcessors + 1);
final Engine engine = new EngineBuilder().
setTaskExecutor(scheduler).
setTimerScheduler(scheduler).
build();
final int[] data = IntStream.range(0, 10000).toArray();
final int chunkSize = data.length / 4;
final Task<Integer> sum = Task.par(
Task.callable(
"a",
Arrays.stream(data, 0 * chunkSize, 1 * chunkSize)::sum),
Task.callable(
"b",
Arrays.stream(data, 1 * chunkSize, 2 * chunkSize)::sum),
Task.callable(
"c",
Arrays.stream(data, 2 * chunkSize, 3 * chunkSize)::sum),
Task.callable(
"d",
Arrays.stream(data, 3 * chunkSize, 4 * chunkSize)::sum)
).map("sum", (a, b, c, d) -> a + b + c + d);
engine.run(sum);
sum.await();
System.out.println("Parallel Sum: " + sum.get());
engine.shutdown();
scheduler.shutdown();
}
}
To print a task’s trace as JSON:
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.trace.TraceUtil;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
public class Main {
public static void main(String args[]) throws Exception {
final int availableProcessors =
Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(availableProcessors + 1);
final Engine engine = new EngineBuilder().
setTaskExecutor(scheduler).
setTimerScheduler(scheduler).
build();
final int[] data = IntStream.range(0, 10000).toArray();
final int chunkSize = data.length / 4;
final Task<Integer> sum = Task.par(
Task.callable(
"a",
Arrays.stream(data, 0 * chunkSize, 1 * chunkSize)::sum),
Task.callable(
"b",
Arrays.stream(data, 1 * chunkSize, 2 * chunkSize)::sum),
Task.callable(
"c",
Arrays.stream(data, 2 * chunkSize, 3 * chunkSize)::sum),
Task.callable(
"d",
Arrays.stream(data, 3 * chunkSize, 4 * chunkSize)::sum)
).map("sum", (a, b, c, d) -> a + b + c + d);
engine.run(sum);
sum.await();
System.out.println("Parallel Sum: " + sum.get());
engine.shutdown();
scheduler.shutdown();
System.out.println(TraceUtil.getJsonTrace(sum));
}
}
In the example below, we choose to run the blocking tasks on the same scheduler thread pool from which ParSeq gets its dedicated threads:
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.trace.TraceUtil;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
public class Main {
public static void main(String args[]) throws Exception {
final int availableProcessors =
Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(availableProcessors + 1);
final Engine engine = new EngineBuilder().
setTaskExecutor(scheduler).
setTimerScheduler(scheduler).
build();
final int[] data = IntStream.range(0, 10000).toArray();
final int chunkSize = data.length / 4;
final Task<Integer> sum = Task.par(
Task.blocking(
"a",
Arrays.stream(data, 0 * chunkSize, 1 * chunkSize)::sum,
scheduler),
Task.blocking(
"b",
Arrays.stream(data, 1 * chunkSize, 2 * chunkSize)::sum,
scheduler),
Task.blocking(
"c",
Arrays.stream(data, 2 * chunkSize, 3 * chunkSize)::sum,
scheduler),
Task.blocking(
"d",
Arrays.stream(data, 3 * chunkSize, 4 * chunkSize)::sum,
scheduler)
).map("sum", (a, b, c, d) -> a + b + c + d);
engine.run(sum);
sum.await();
System.out.println("Parallel Sum: " + sum.get());
engine.shutdown();
scheduler.shutdown();
System.out.println(TraceUtil.getJsonTrace(sum));
}
}
ParSeq really shines when used to perform asynchronous operations (e.g., I/O) in parallel, especially in the context of a large-scale system that requires special consideration for fault tolerance and other design requirements. Let’s take the following code snippet as an example:
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.httpclient.HttpClient;
import com.linkedin.parseq.trace.TraceUtil;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String args[]) throws Exception {
final int availableProcessors =
Runtime.getRuntime().availableProcessors();
final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(availableProcessors + 1);
final Engine engine = new EngineBuilder().
setTaskExecutor(scheduler).
setTimerScheduler(scheduler).
build();
final Task<Task<Void>> get = Task.par(
getContentType("a://a"), // causes an exception
getContentType("http://www.bing.com"),
getContentType("http://www.facebook.com"),
getContentType("http://www.google.com"),
getContentType("http://www.yahoo.com"))
.map((a, b, f, g, y) -> print(a, b, f, g, y));
final Task<Void> flattened = Task.flatten(get);
engine.run(flattened);
flattened.await(); // also waits for the inner print task
engine.shutdown();
scheduler.shutdown();
System.out.println(TraceUtil.getJsonTrace(get));
}
private static Task<String> getContentType(String url) {
return HttpClient.get(url).task()
.withTimeout(1, TimeUnit.SECONDS)
.toTry()
.map(t -> t.isFailed() ?
"Failed to get " + url : t.get().getContentType());
}
private static Task<Void> print(String... strings) {
return Task.action(() -> {
for (final String s : strings) System.out.println(s);
});
}
}
In Java, we will use the java.nio.channels.FileLock class to demonstrate how to use it as a mutex that protects a critical section; this time around, only a single process can be inside said critical section:
import java.lang.management.ManagementFactory;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import static java.nio.file.StandardOpenOption.*;
public class Main {
public static void main(String args[]) throws Exception {
final Path lockPath =
FileSystems.getDefault().getPath(".lock");
final String processID =
ManagementFactory.getRuntimeMXBean().getName();
try (final FileChannel channel =
FileChannel.open(lockPath, WRITE, CREATE);
final FileLock lock = channel.lock()) {
System.out.println(processID + " acquired the lock");
Thread.sleep(1000); // simulates other work being done
System.out.println(processID + " is releasing the lock");
}
}
}
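A non-blocking variant (our sketch, not one of the book's listings) uses FileChannel.tryLock(), which returns null immediately when another process already holds the lock instead of blocking:

```java
import java.lang.management.ManagementFactory;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import static java.nio.file.StandardOpenOption.*;

public class Main {
    public static void main(String args[]) throws Exception {
        final Path lockPath =
            FileSystems.getDefault().getPath(".lock");
        final String processID =
            ManagementFactory.getRuntimeMXBean().getName();
        try (final FileChannel channel =
                 FileChannel.open(lockPath, WRITE, CREATE)) {
            // tryLock() does not block: it returns null if the lock
            // is currently held by another process
            final FileLock lock = channel.tryLock();
            if (lock == null) {
                System.out.println(processID + " is backing off");
            } else {
                System.out.println(processID + " acquired the lock");
                lock.release();
            }
        }
    }
}
```

This pattern suits processes that can do other useful work, or retry later, instead of waiting on the lock.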
java Main& java Main& java Main&
Using ZooKeeper v3.4.12 and Curator v4.0.1, we demonstrate below how to use ZooKeeper to rewrite the previous example:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.lang.management.ManagementFactory;
public class Main {
public static void main(String args[]) throws Exception {
final RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
final InterProcessMutex lock = new InterProcessMutex(client, "/path/to/lock");
final String processID = ManagementFactory.getRuntimeMXBean().getName();
try {
client.start();
lock.acquire();
System.out.println(processID + " acquired the lock");
Thread.sleep(1000); // simulates other work being done
System.out.println(processID + " is releasing the lock");
} finally {
if (lock.isAcquiredInThisProcess()) {
lock.release();
}
client.close();
}
}
}
We can create a znode for our counter and use Curator to commit a dummy transaction:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Main {
private static final String HOST = "127.0.0.1:2181";
public static void main(String args[]) throws Exception {
final RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
try (final CuratorFramework client = CuratorFrameworkFactory.newClient(HOST, retryPolicy)) {
client.start();
client.createContainers("/path/to/counter");
final int version = client.inTransaction().
setData().forPath("/path/to/counter").and().
commit().iterator().next().
getResultStat().getVersion();
System.out.println("Version: " + version);
}
}
}