/ Java  

Java Multithreading 19: Condition

This blog will introduce the Condition that is often used with locks in JUC. The content includes:

  • Introduction to Condition
  • Condition function list
  • Condition example

Introduction to Condition

Condition‘s role is to control the lock more precisely. The await() method in Condition is equivalent to the wait() method of Object, the signal() method in Condition is equivalent to the notify() method of Object, and the signalAll() in Condition is equivalent to the notifyAll() method of Object. The difference is that the wait(), notify(), notifyAll() methods in Object are bundled with synchronized lock. Condition needs to be bundled with mutually exclusive lock/shared lock in use.

Condition function list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Causes the current thread to wait until it is signalled or interrupted
void await()

// Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
boolean await(long time, TimeUnit unit)

// Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
long awaitNanos(long nanosTimeout)

// Causes the current thread to wait until it is signalled.
void awaitUninterruptibly()

// Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.
boolean awaitUntil(Date deadline)

// Wakes up one waiting thread.
void signal()

// Wakes up all waiting threads.
void signalAll()

Condition example

Example 1: demonstrate sleep/wake of a thread through Object’s wait(), notify()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ObjectWaitTest {
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");

// acquire ta object's lock
synchronized(ta) {
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();

System.out.println(Thread.currentThread().getName()+" block");
ta.wait();

System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}

public void run() {
synchronized (this) {
System.out.println(Thread.currentThread().getName()+" wake up others");
// wake up thread
notify();
}
}
}
}

Result:

1
2
3
4
main start ta
main block
ta wake up others
main continue

Example 2: demonstrate sleep/wake of a thread through Condition await(), signal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ConditionWait {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();

public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");

// acquire lock
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();

System.out.println(Thread.currentThread().getName()+" block");
condition.await();

System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}

public void run() {
// acquire lock
lock.lock();

try {
System.out.println(Thread.currentThread().getName()+" wakup others");
// wake up other thread wait on condition
condition.signal();
} finally {
// release lock
lock.unlock();
}
}
}
}

Result:

1
2
3
4
main start ta
main block
ta wake up others
main continue

From Example 1 and Example 2, we know that the methods of Condition and Object have the following correspondence:

1
2
3
4
                      Object      Condition  
Sleep wait await
Wake up one thread notify signal
Wake up all thread notifyAll signalAll

In addition to supporting the above functions, Condition is more powerful: it can control the sleep and wake up of multi-thread more finely. For the same lock, we can create multiple Condition and use different Condition in different situations.

For example, if multiple threads read/write the same buffer: when writing data to the buffer, wake up the read thread. When reading data from the buffer, wake up the writing thread. And when the buffer is full the write thread needs to wait. When the buffer is empty, the read thread needs to wait. If you use wait(), notify(), notifyAll() in the Object to implement the buffer, when you need to wake up the read thread after writing data to the buffer, it is not possible to explicitly use notify() or notifyAll() to wake up the read thread, and only wake up all threads through notifyAll() (but notifyAll() can not distinguish whether the wake up thread is a read thread or a write thread). However, through Condition, you can clearly wake up the reader thread.

Example 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[5];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
// acquire lock
lock.lock();

try {
// if buffer is full, wait
// until buffer has space, add x to it
while (count == items.length) {
notFull.await();
}

// put x to buffer
items[putptr] = x;

// increase putptr
// if buffer is full, set to 0
if (++putptr == items.length) {
putptr = 0;
}

count++;

// wake up take thread, as there are items in the buffer
notEmpty.signal();

System.out.println(Thread.currentThread().getName() + " put " + x);
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
try {
// if buffer is empty, wait
// until buffer has items, take it
while (count == 0) {
notEmpty.await();
}

// take x from buffer
Object x = items[takeptr];

if (++takeptr == items.length) {
takeptr = 0;
}

--count;

// wake up put thread, as there are space in the buffer
notFull.signal();

System.out.println(Thread.currentThread().getName() + " take " + x);

return x;
} finally {
lock.unlock();
}
}
}

public class ConditionWait2 {
private static BoundedBuffer bb = new BoundedBuffer();

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new PutThread("p" + i, i).start();
new TakeThread("t" + i).start();
}
}

static class PutThread extends Thread {
private int num;

public PutThread(String name, int num) {
super(name);
this.num = num;
}

public void run() {
try {
Thread.sleep(100);
bb.put(num);
} catch (InterruptedException e) {
}
}
}

static class TakeThread extends Thread {
public TakeThread(String name) {
super(name);
}

public void run() {
try {
Thread.sleep(100);
Integer num = (Integer) bb.take();
} catch (InterruptedException e) {
}
}
}
}

Results:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
p1 put  1
p4 put 4
p5 put 5
p0 put 0
p2 put 2
t0 take 1
p3 put 3
t1 take 4
p6 put 6
t2 take 5
p7 put 7
t3 take 0
p8 put 8
t4 take 2
p9 put 9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9
  • BoundedBuffer is a buffer with a capacity of 5, the buffer stores Object objects, and supports multi-threaded read/write buffer. When multiple threads operate on a BoundedBuffer object, they perform mutually exclusive access to the buffer items through the mutex lock. All threads under the same BoundedBuffer object share the two conditions notFull and notEmpty.

notFull is used to control the write buffer, and notEmpty is used to control the read buffer. When the buffer is full, the thread that called put() will execute notFull.await() to wait. When the buffer is not full, it will add the object to the buffer and count+1 the capacity of the buffer. Finally, call notEmpty.signal() buffers the waiting thread on notEmpty() (the thread that calls notEmpty.await()). In short, notFull controls the buffer writing, and when the data is written to the buffer, the waiting thread on notEmpty will be woken up.

In the same way, notEmpty controls the buffer reading, and will wake up the waiting thread on notFull after reading the buffer data.

  • In the main function of ConditionTest2, start 10 write threads to continuously write data to the BoundedBuffer (write 0-9). At the same time, also start 10 read threads to continuously read from the BoundedBuffer data.

  • Brief analysis of the operation results:

1
2
3
4
5
6
7
8
9
10
11
12
13
1, p1 thread writes 1 to the buffer.     The buffer data  | 1 |   |   |   |   |
2, p4 thread writes 4 to the buffer. The buffer data | 1 | 4 | | | |
3, p5 thread writes 5 to the buffer. The buffer data | 1 | 4 | 5 | | |
4, p0 thread writes 0 to the buffer. The buffer data | 1 | 4 | 5 | 0 | |
5, p2 thread writes 2 to the buffer. The buffer data | 1 | 4 | 5 | 0 | 2 |

At this point, the buffer capacity is 5: the buffer is full! If at this time, there is a write thread who wants to write data to the buffer, it will call notFull.await() in put to wait, until buffer is not full

6, t0 thread takes 1 from the buffer. The buffer data | | 4 | 5 | 0 | 2 |
7, p3 thread writes 3 to the buffer. The buffer data | 3 | 4 | 5 | 0 | 2 |
8, t1 thread takes 4 from the buffer. The buffer data | 3 | | 5 | 0 | 2 |
9, p6 thread writes 6 to the buffer. The buffer data | 3 | 6 | 5 | 0 | 2 |
...