Java CountDownLatch

The java.util.concurrent package in the JDK provides several utility classes for coordinating the flow of concurrent code. This article shows how to use one of them, CountDownLatch, in a practical scenario, and then looks at how it is implemented internally.

What is CountDownLatch

CountDownLatch was introduced in JDK 1.5. It lets one or more threads wait until a set of operations running in other threads has completed.

CountDownLatch internally maintains a counter whose initial value is passed to the constructor, typically the number of threads (or tasks) to wait for. The main thread calls the await method; while the counter is greater than 0, it blocks. Each time a thread finishes its task it calls countDown, which decrements the counter. When the counter reaches 0, all tasks have completed and the waiting main thread is woken up to continue. The counter cannot be reset, so a latch can be used only once.
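
The counter lifecycle described above can be sketched in a few lines. This is a minimal illustration rather than part of the original example: the class name LatchBasics, the method runWorkers, and the 5-second timeout are assumptions chosen for the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchBasics {

    /** Starts the given number of threads, waits for all of them, and returns the remaining count. */
    static long runWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers); // counter starts at `workers`

        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start(); // each thread decrements the counter once
        }

        try {
            // Blocks while the counter is greater than 0. The timeout is only a
            // safety net so the caller cannot wait forever.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return latch.getCount(); // 0 once every worker has counted down
    }

    public static void main(String[] args) {
        System.out.println("remaining count: " + runWorkers(3));
    }
}
```

The timed await overload returns false if the timeout elapses first, which is often safer in production code than the untimed await.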



Example

In this example, the application's main thread must continue only after all worker threads have finished.

The main class runs each worker on a thread pool. It initializes the CountDownLatch and then waits until all workers have completed.

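import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchApplication {
    public static void main(String[] args) throws InterruptedException {
        // One count per worker
        CountDownLatch countDownLatch = new CountDownLatch(3);

        List<Worker> workers = new ArrayList<>();
        workers.add(new Worker(countDownLatch, "worker1"));
        workers.add(new Worker(countDownLatch, "worker2"));
        workers.add(new Worker(countDownLatch, "worker3"));

        ExecutorService executor = Executors.newFixedThreadPool(workers.size());

        for (Worker worker : workers) {
            executor.execute(worker);
        }

        // Blocks until every worker has called countDown()
        countDownLatch.await();

        System.out.println("all workers finished");

        // Without shutdown() the pool's non-daemon threads keep the JVM alive
        executor.shutdown();
    }
}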

The Worker class uses TimeUnit.SECONDS.sleep to simulate a long-running task. It calls countDown in a finally block so the latch is released even if the task fails.

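import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Worker implements Runnable {
    private final CountDownLatch countDownLatch;
    private final String name;

    public Worker(CountDownLatch countDownLatch, String name) {
        this.countDownLatch = countDownLatch;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            System.out.println(name + "started!");

            // Simulate a long-running task
            TimeUnit.SECONDS.sleep(2);

            System.out.println(name + "finished!");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can observe it
            Thread.currentThread().interrupt();
        } finally {
            // Always count down, even on failure, so the waiter is not blocked forever
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}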

Output (the relative order of the worker lines can vary between runs):

worker2started!
worker3started!
worker1started!
worker1finished!
worker3finished!
worker2finished!
all workers finished
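
The example above has one waiting thread and several threads counting down. The arrangement can also be reversed: several threads wait on a latch with a count of 1, and a single countDown call releases all of them. The following is a minimal sketch of that pattern; StartGateDemo, releaseAll, and the second latch named done are assumptions made for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {

    /** Returns how many waiting threads ran after the gate was opened. */
    static int releaseAll(int waiters) {
        CountDownLatch startGate = new CountDownLatch(1);   // opened by one countDown()
        CountDownLatch done = new CountDownLatch(waiters);  // tracks finished waiters
        AtomicInteger ran = new AtomicInteger();

        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    startGate.await();       // every waiter blocks on the same latch
                    ran.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        startGate.countDown(); // a single call releases every waiting thread

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("waiters did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + releaseAll(4));
    }
}
```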

Principle of the implementation

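CountDownLatch is built on AbstractQueuedSynchronizer (AQS), the framework behind most of the synchronizers in java.util.concurrent.

It holds an inner AQS subclass, Sync, which uses the AQS state field as the counter and overrides the shared-mode methods tryAcquireShared and tryReleaseShared.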

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
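
To see why tryReleaseShared retries in a loop and returns true only on the transition to zero, the same logic can be reproduced outside AQS. The sketch below is an illustration, not JDK code: CasCountdown, release, and zeroTransitions are hypothetical names, and AtomicInteger stands in for the AQS state field.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCountdown {
    private final AtomicInteger state;

    CasCountdown(int count) {
        state = new AtomicInteger(count);
    }

    /** Same shape as tryReleaseShared: true only for the call that moves the count to 0. */
    boolean release() {
        for (;;) {
            int c = state.get();
            if (c == 0) {
                return false;                 // already at zero: nothing to do
            }
            int next = c - 1;
            if (state.compareAndSet(c, next)) {
                return next == 0;             // true only on the 1 -> 0 transition
            }
            // CAS failed because another thread changed the value: re-read and retry
        }
    }

    /** Calls release() from `threads` threads and counts how many calls returned true. */
    static int zeroTransitions(int count, int threads) {
        CasCountdown counter = new CasCountdown(count);
        AtomicInteger signals = new AtomicInteger();
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                if (counter.release()) {
                    signals.incrementAndGet();
                }
            });
            pool[i].start();
        }
        try {
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return signals.get();
    }

    public static void main(String[] args) {
        // 8 threads race to decrement a count of 5
        System.out.println("zero transitions: " + zeroTransitions(5, 8));
    }
}
```

However many threads race, exactly one call observes the transition to zero, so waiting threads are signalled once.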

Implementation of await

The main thread calls await, which delegates to sync.acquireSharedInterruptibly(1). If the state is not 0, tryAcquireShared returns -1, so the thread is added to the AQS wait queue in doAcquireSharedInterruptibly and suspended through LockSupport.park(this), which is called inside parkAndCheckInterrupt.

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
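
The suspend and resume steps rely on LockSupport. The sketch below shows that primitive in isolation, under the assumption of a hypothetical ParkDemo class with a released flag; like the AQS loop above, it re-checks its condition after every return from park, because park may return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {

    /** Parks a thread until a flag is set, then reports whether it resumed. */
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() can return spuriously, so always re-check the condition
            while (!released.get()) {
                LockSupport.park(released); // the argument is only a diagnostic "blocker"
            }
            resumed.set(true);
        });
        waiter.start();

        released.set(true);          // change the condition first...
        LockSupport.unpark(waiter);  // ...then wake the thread (or pre-grant its permit)

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + parkUntilReleased());
    }
}
```

The order of park and unpark does not matter here: if unpark runs first, it grants a permit and the next park returns immediately.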

Implementation of countDown

The countDown method delegates to sync.releaseShared(1), which calls tryReleaseShared to decrement the state by one. The new value is written with compareAndSetState, which is implemented with the unsafe.compareAndSwapInt method.

public void countDown() {
    sync.releaseShared(1);
}
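
One consequence of this delegation is that calling countDown more often than the initial count is harmless: once the state is 0, tryReleaseShared returns false and nothing else happens. A small sketch, with ExtraCountDown as an assumed class name:

```java
import java.util.concurrent.CountDownLatch;

class ExtraCountDown {

    /** Counts down three times on a latch of two and returns the final count. */
    static long countAfterExtraCalls() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown(); // 2 -> 1
        latch.countDown(); // 1 -> 0, any waiting threads are released here
        latch.countDown(); // already 0: no effect, the count never goes negative
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countAfterExtraCalls()); // prints 0
    }
}
```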

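If the decrement brings the state to 0, tryReleaseShared returns true and releaseShared calls doReleaseShared, which wakes the thread suspended in await through LockSupport.unpark (called from unparkSuccessor).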

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
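
The way await and countDown map onto AQS can be seen end to end in a small custom synchronizer. The sketch below is a hypothetical OneShotGate class, modeled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc; it is not how CountDownLatch itself is written, but it uses the same two overrides and the same public AQS entry points.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotGate {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1; // pass only once the gate is open
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the gate
            return true;  // tells AQS to wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    /** Blocks until the gate is open, like CountDownLatch.await(). */
    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Timed variant: returns false if the gate is still closed when the timeout expires. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** Opens the gate, like the final CountDownLatch.countDown(). */
    void open() {
        sync.releaseShared(1);
    }

    /** Returns true if waiting fails while closed and succeeds after open(). */
    static boolean demo() {
        OneShotGate gate = new OneShotGate();
        try {
            boolean passedWhileClosed = gate.await(50, TimeUnit.MILLISECONDS);
            new Thread(gate::open).start();
            boolean passedAfterOpen = gate.await(5, TimeUnit.SECONDS);
            return !passedWhileClosed && passedAfterOpen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("gate behaved as expected: " + demo());
    }
}
```

All queueing, parking, and waking is inherited from AQS; the subclass only decides when an acquire may succeed and when a release should signal.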

Difference from CyclicBarrier

  • CyclicBarrier lets a group of threads wait for each other at a common point. As the name suggests, this point acts like a fence: a thread that arrives early is blocked in front of it and can pass only after all threads have arrived.
  • CyclicBarrier has a parties value, which is the number of threads that must arrive. Each arriving thread calls the barrier.await method to wait. Once the number of waiting threads reaches parties, the fence opens and all threads can pass.
  • The CyclicBarrier constructor optionally accepts a Runnable parameter, barrierAction, which is the action to run when the barrier trips. Null means no action is taken. The action runs once per trip, in the last thread to arrive, after all threads have arrived and before any waiting thread is released.
  • CyclicBarrier is reusable. When the last thread arrives, the fence opens. After all threads pass, the fence closes again and the next generation begins.
  • The CyclicBarrier.reset method resets the fence manually; any threads currently waiting at it receive a BrokenBarrierException.
  • CountDownLatch, by contrast, is one-shot: its count cannot be reset, and the threads that call countDown are usually not the threads that call await.
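
The reuse and barrier-action behavior in the list above can be sketched as follows. BarrierReuseDemo and its run method are assumed names for this illustration; the barrier action simply counts how many times the barrier has tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {

    /** Sends `parties` threads through one barrier `rounds` times; returns how often the action ran. */
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive,
        // before any of the waiting threads is released.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party was interrupted or the barrier was reset
                }
            });
            threads[i].start();
        }

        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier trips: " + run(3, 2));
    }
}
```

A CountDownLatch could not be used this way, because a second round would find the count already at zero.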

Conclusion

This article covered a typical use case for CountDownLatch and showed how it is implemented on top of AQS.