1. Introduction
When we execute a task using a thread pool or a dedicated thread, it runs without being aware of other threads. However, there are cases where we want a group of threads to coordinate their actions and wait for each other. In this tutorial, we'll look at how to start multiple tasks at the same time using the synchronizers in the java.util.concurrent package.
2. Sample Application
We'll start with the sample application:
public class Worker implements Runnable {

    public void run() {
        System.out.println("Ready to start.");
        doWork();
    }

    void doWork() {
        System.out.println("Doing work.");
    }
}
The Worker task is a simple Runnable implementation. It doesn't use any synchronization tool to coordinate its actions with other threads. For example, if we submit ten tasks to a thread pool, each one starts as soon as a pool thread picks it up and runs to completion independently of the others. In a moment, we'll make these Worker tasks and the underlying threads wait for each other and start simultaneously.
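To make the uncoordinated behavior concrete, here is a minimal sketch of submitting such tasks to a pool. The class name UncoordinatedStart, the helper runTasks, and the completion counter are assumptions introduced for illustration; they aren't part of the sample application.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class UncoordinatedStart {

    // Hypothetical helper: runs taskCount independent tasks and
    // returns how many of them completed.
    static int runTasks(int taskCount) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; ++i) {
            threadPool.execute(() -> {
                // Nothing makes this task wait for the others.
                System.out.println("Ready to start.");
                System.out.println("Doing work.");
                completed.incrementAndGet();
            });
        }
        threadPool.shutdown();
        // Wait for the submitted tasks to finish before reading the counter.
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed: " + runTasks(10));
    }
}
```

Since nothing orders the tasks, the "Ready to start." and "Doing work." lines of different tasks can interleave in any way from run to run.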
3. Use CountDownLatch
First, we'll use a CountDownLatch to make all participating Worker tasks wait for each other:
import java.util.concurrent.CountDownLatch;

public class Worker implements Runnable {

    private final CountDownLatch readySignal;
    private final CountDownLatch startSignal;

    Worker(CountDownLatch readySignal, CountDownLatch startSignal) {
        this.readySignal = readySignal;
        this.startSignal = startSignal;
    }

    public void run() {
        try {
            System.out.println("Ready to start.");
            readySignal.countDown(); // Announce that this task is ready
            startSignal.await();     // Block until the start signal is given
            doWork();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            System.out.println("Interrupted.");
        }
    }

    void doWork() {
        System.out.println("Doing work.");
    }
}
Here, we're providing two latches to coordinate the start. First, each task counts down readySignal to announce that it's ready. Then it waits on startSignal before performing the actual work in doWork.
We'll now implement the task submission code:
public static void main(String[] args) throws InterruptedException {
    final WaitForStart waitForStart = new WaitForStart();
    waitForStart.coordinateStart();
}

public void coordinateStart() throws InterruptedException {
    final int taskCount = 3;
    final CountDownLatch readySignal = new CountDownLatch(taskCount);
    final CountDownLatch startSignal = new CountDownLatch(1);
    final ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);

    for (int i = 0; i < taskCount; ++i) {
        threadPool.execute(new Worker(readySignal, startSignal));
    }

    readySignal.await();     // Wait for all workers to get ready
    startSignal.countDown(); // Let all workers proceed

    threadPool.shutdown();
}
Here, we're creating two latches. Notice the counts passed to the constructors. We initialize readySignal with the task count, since every task must count down once to signal that it's ready. We initialize startSignal with one, since a single countDown call from the main thread is enough to release all tasks waiting on it.
After submitting the tasks to the thread pool, the main thread invokes readySignal.await and waits for all tasks to get ready. Once every task has invoked readySignal.countDown, the latch releases the waiting thread, which is the main thread in our case. Note that at this moment the tasks are blocked on startSignal. Finally, when the main thread invokes startSignal.countDown, the worker threads resume and the tasks start doing their work. Note also that the pool must have at least as many threads as there are tasks; otherwise, some tasks never get a thread to announce readiness on, and readySignal.await never returns.
A sample run shows the execution order:
Ready to start.
Ready to start.
Ready to start.
Doing work.
Doing work.
Doing work.
Keep in mind that CountDownLatch requires two different parties: one invoking await and another invoking countDown. In this example, the worker threads invoke startSignal.await, while the main thread invokes startSignal.countDown.
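The two-latch handshake can be checked with a small self-contained sketch that records events in a thread-safe list instead of printing them. The class name LatchStartDemo, the event strings, and the extra "go" marker added by the main thread are assumptions made for this illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchStartDemo {

    // Hypothetical helper: returns the events in the order they were recorded.
    static List<String> coordinateStart(int taskCount) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch readySignal = new CountDownLatch(taskCount);
        CountDownLatch startSignal = new CountDownLatch(1);
        // The pool needs at least taskCount threads, or readySignal never reaches zero.
        ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);

        for (int i = 0; i < taskCount; ++i) {
            threadPool.execute(() -> {
                try {
                    events.add("ready");
                    readySignal.countDown(); // worker side: announce readiness
                    startSignal.await();     // worker side: wait for the start
                    events.add("work");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        readySignal.await();     // main side: wait until every worker is ready
        events.add("go");
        startSignal.countDown(); // main side: release all workers at once

        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(coordinateStart(3));
    }
}
```

With three tasks, the list holds three "ready" entries, then "go", then three "work" entries. The latches guarantee this order: the main thread records "go" only after every worker has counted down, and no worker records "work" before the start signal is given.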
4. Use CyclicBarrier
Next, we'll use CyclicBarrier to make threads start working at the same time. Similar to the previous example, the Worker tasks first declare that they're ready. Then they wait for a signal to do their work:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Worker implements Runnable {

    private final CyclicBarrier barrier;

    Worker(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    public void run() {
        try {
            System.out.println("Ready to start.");
            barrier.await(); // Announce readiness and wait for the other parties
            doWork();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            System.out.println("Interrupted.");
        } catch (BrokenBarrierException ex) {
            System.out.println("Broken barrier.");
        }
    }

    public void doWork() {
        System.out.println("Doing work.");
    }
}
In this Worker implementation, we have a CyclicBarrier instance. We're invoking its await method in the run method before doing any actual work. This invocation serves two purposes. First, it announces that the current task is ready to commence. Second, it blocks the current thread until all participating threads have reached the barrier and are ready as well.
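The double role of await can be observed with a short sketch that records events instead of printing them. The class name BarrierAwaitDemo, the helper runWorkers, and the event strings are hypothetical names chosen for this illustration.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BarrierAwaitDemo {

    // Hypothetical helper: returns the events in the order they were recorded.
    static List<String> runWorkers(int taskCount) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CyclicBarrier barrier = new CyclicBarrier(taskCount);
        // The pool needs at least taskCount threads; otherwise the barrier never trips.
        ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);

        for (int i = 0; i < taskCount; ++i) {
            threadPool.execute(() -> {
                try {
                    events.add("ready");
                    barrier.await(); // signals arrival and blocks until all have arrived
                    events.add("work");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException ex) {
                    events.add("broken");
                }
            });
        }

        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(3));
    }
}
```

With three tasks, every "ready" entry precedes every "work" entry, because no thread returns from await until the last one has arrived.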
Let's see the task submission code:
public void coordinateStart() throws InterruptedException {
    final int taskCount = 3;
    final ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
    final CyclicBarrier barrier = new CyclicBarrier(taskCount,
      () -> System.out.println("All ready to continue!"));

    for (int i = 0; i < taskCount; ++i) {
        threadPool.execute(new Worker(barrier));
    }

    threadPool.shutdown();
}
In this method, we're creating a CyclicBarrier initialized with the task count and an optional barrier action, which runs once when the last thread arrives and before any waiting thread is released. Unlike the CountDownLatch example, the main thread doesn't participate in the thread communication. The worker threads resume their execution as soon as all of them reach the barrier. However, if we require the main thread to control this process, we must increase the barrier count:
public void coordinateStartUsingMain() throws Exception {
    final int taskCount = 3;
    final ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
    final CyclicBarrier barrier = new CyclicBarrier(taskCount + 1,
      () -> System.out.println("All ready to continue!"));

    for (int i = 0; i < taskCount; ++i) {
        threadPool.execute(new Worker(barrier));
    }

    barrier.await();

    threadPool.shutdown();
}
In this modified version, we're increasing the CyclicBarrier count by one and invoking await from the main thread after submitting the tasks. This way, each worker thread waits for the main thread as well as for the other workers.
A sample run prints:
Ready to start.
Ready to start.
Ready to start.
All ready to continue!
Doing work.
Doing work.
Doing work.
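To check that the workers really do wait for the main thread, the following sketch polls CyclicBarrier.getNumberWaiting until all workers are parked at the barrier and only then lets the main thread arrive. The class name BarrierWithMainDemo, the working counter, and the polling loop are assumptions added for this illustration. The polling is there only to make the check observable; it isn't needed for the coordination itself.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierWithMainDemo {

    // Hypothetical helper: returns {workers past the barrier before main arrives,
    // workers past the barrier after main arrives}.
    static int[] coordinateStartUsingMain(int taskCount) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
        // One extra party for the main thread.
        CyclicBarrier barrier = new CyclicBarrier(taskCount + 1);
        AtomicInteger working = new AtomicInteger();

        for (int i = 0; i < taskCount; ++i) {
            threadPool.execute(() -> {
                try {
                    barrier.await();
                    working.incrementAndGet(); // stands in for doWork()
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException ex) {
                    System.out.println("Broken barrier.");
                }
            });
        }

        // Poll until every worker is blocked at the barrier.
        while (barrier.getNumberWaiting() < taskCount) {
            Thread.sleep(10);
        }
        int before = working.get(); // no worker can have passed the barrier yet

        barrier.await(); // the main thread is the last party; this trips the barrier

        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {before, working.get()};
    }

    public static void main(String[] args) throws Exception {
        int[] counts = coordinateStartUsingMain(3);
        System.out.println("Before: " + counts[0] + ", after: " + counts[1]);
    }
}
```

With three tasks, the first count is 0 and the second is 3: no worker gets past the barrier until the main thread arrives as the fourth party.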
5. Use Phaser
Finally, we'll look at how the Phaser class enables threads to coordinate their execution:
import java.util.concurrent.Phaser;

public class Worker implements Runnable {

    private final Phaser phaser;

    Worker(Phaser phaser) {
        this.phaser = phaser;
    }

    public void run() {
        System.out.println("Ready to start.");
        phaser.arriveAndAwaitAdvance(); // Arrive and wait for the other parties
        doWork();
    }

    public void doWork() {
        System.out.println("Doing work.");
    }
}
In this implementation, we're declaring a Phaser instance variable. Note that we're invoking Phaser.arriveAndAwaitAdvance to make the current thread wait for the others.
Now we'll look at the task submission:
public void coordinateStart() {
    final int taskCount = 3;
    final ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
    final Phaser phaser = new Phaser(taskCount);

    for (int i = 0; i < taskCount; ++i) {
        threadPool.execute(new Worker(phaser));
    }

    threadPool.shutdown();
}
We're initializing the Phaser instance with the task count. As a result, once all tasks have invoked arriveAndAwaitAdvance, they continue with the next phase of computation. This also means that the main thread doesn't have a say in the release of the worker threads. Next, we'll make the main thread participate in this process:
public void coordinateStartUsingMain() {
    final int taskCount = 3;
    final ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
    final Phaser phaser = new Phaser(taskCount + 1);

    for (int i = 0; i < taskCount; ++i) {
        threadPool.execute(new Worker(phaser));
    }

    phaser.arriveAndAwaitAdvance();

    threadPool.shutdown();
}
Here, we're incrementing the party count by one with new Phaser(taskCount + 1). The additional party is the main thread, which runs the task submission code. Consequently, the worker threads can't proceed until the main thread invokes arriveAndAwaitAdvance.
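The effect of the extra party can be observed through the phaser's own state. The sketch below polls Phaser.getArrivedParties until all workers have arrived, reads the phase number, and only then lets the main thread arrive. The class name PhaserWithMainDemo, the working counter, and the polling loop are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserWithMainDemo {

    // Hypothetical helper: returns {phase before main arrives, workers released
    // before main arrives, phase after main arrives, workers released in the end}.
    static int[] coordinateStartUsingMain(int taskCount) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(taskCount);
        // One extra party for the main thread.
        Phaser phaser = new Phaser(taskCount + 1);
        AtomicInteger working = new AtomicInteger();

        for (int i = 0; i < taskCount; ++i) {
            threadPool.execute(() -> {
                phaser.arriveAndAwaitAdvance();
                working.incrementAndGet(); // stands in for doWork()
            });
        }

        // Poll until every worker has arrived at the current phase.
        while (phaser.getArrivedParties() < taskCount) {
            Thread.sleep(10);
        }
        int phaseBefore = phaser.getPhase();
        int workingBefore = working.get();

        phaser.arriveAndAwaitAdvance(); // the main thread is the last party to arrive
        int phaseAfter = phaser.getPhase();

        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {phaseBefore, workingBefore, phaseAfter, working.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = coordinateStartUsingMain(3);
        System.out.println("Phase " + r[0] + " -> " + r[2]
          + ", released " + r[1] + " -> " + r[3]);
    }
}
```

With three tasks, the phaser stays in phase 0 with no worker released until the main thread arrives. After that, the phase advances to 1 and all three workers proceed.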
6. Summary
In this tutorial, we've investigated how we can start multiple tasks simultaneously using the Java synchronizers. Throughout the examples, we examined different solutions using the CountDownLatch, CyclicBarrier, and Phaser classes.
Finally, check out the source code for all examples in this tutorial over on GitHub.