Java - Execute tasks concurrently and wait for completion

In Java, it’s a common task to execute multiple tasks concurrently and wait for all of them to complete. We’ll discuss how to implement this feature using CountDownLatch, CompletableFuture and Project Reactor.

CountDownLatch

When using CountDownLatch, we create a CountDownLatch with the number of tasks to execute. After a task is completed, we use countDown to signal that. The main thread uses await to wait for all tasks to finish.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SimpleExecutor {
public static void main(String[] args) {
int numOfTasks = 10;
CountDownLatch latch = new CountDownLatch(numOfTasks);
IntStream.range(0, numOfTasks)
.mapToObj(count -> new Thread(new Task(latch, count)))
.forEach(Thread::start);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Done!");
}
private static class Task implements Runnable {
private final CountDownLatch latch;
private final int count;
private Task(CountDownLatch latch, int count) {
this.latch = latch;
this.count = count;
}
@Override
public void run() {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
System.out.println(String.format("[%s] Run %d", Thread.currentThread().getName(), count));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
}

Here we manually create the threads, you can also use ExecutorService.

CompletableFuture

When using CompletableFuture, we use supplyAsync to create new CompletableFutures, then use allOf to create a new CompletableFuture which is completed when all the provided CompletableFutures are completed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.allOf(
IntStream.range(0, 10)
.mapToObj(count -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
System.out.println(String.format("[%s] Run %d", Thread.currentThread().getName(), count));
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}))
.collect(Collectors.toList())
.toArray(new CompletableFuture[0]))
.get();
System.out.println("Done!");
}

Project Reactor

We can use Flux and Mono from Project Reactor to run tasks. We create Monos from Callable objects. publishOn(Schedulers.elastic()) makes sure that emitting items from Monos are executed in different threads. Flux.blockLast waits for the Flux to emit all items.

1
2
3
4
5
6
7
8
9
10
11
12
public class ReactorExecutor {
public static void main(String[] args) {
int numOfTasks = 10;
Flux.range(0, numOfTasks).flatMap(index -> Mono.fromCallable(() -> {
Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
System.out.println(String.format("[%s] Run %d", Thread.currentThread().getName(), index));
return null;
}).publishOn(Schedulers.elastic())).blockLast();
}
}

See GitHub for the complete source code.

Comments