Java Multithreading 16: Fair lock - Acquire

This blog introduces the acquisition mechanism of fair lock, and the content includes:

  1. Basic concept
  2. ReentrantLock data structure
  3. Acquire a fair lock (based on JDK 11.0.5)
  • tryAcquire()
  • addWaiter()
  • acquireQueued()
  • selfInterrupt()

The release mechanism of fair lock will be introduced in the following blog.

Basic concept

Before explaining the acquisition mechanism of a fair lock, we need to understand a few basic concepts.

AQS

Refers to the AbstractQueuedSynchronizer class.

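AQS is an abstract class for building locks and other synchronizers in Java. Much of the shared machinery of locks, such as queuing the waiting threads, is implemented in this class. Both exclusive locks (e.g., ReentrantLock) and shared synchronizers (e.g., Semaphore) are built on it: each holds an internal Sync object whose class extends AQS.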

Types of AQS locks

Divided into exclusive locks and shared locks.

  • Exclusive lock - The lock can only be held by one thread at a time. A typical example of an exclusive lock is ReentrantLock. In addition, ReentrantReadWriteLock.WriteLock is also an exclusive lock. According to the lock acquisition mechanism, exclusive locks are divided into fair locks and non-fair locks.

    • Fair lock - Threads acquire the lock in first-come-first-served order, following the order of the CLH wait queue.
    • Non-fair lock - When a thread wants to acquire the lock, it first tries to take it directly, regardless of the threads waiting in the CLH queue. It joins the queue only if that attempt fails.
  • Shared lock - A lock that can be held by multiple threads at the same time. ReentrantReadWriteLock.ReadLock, CountDownLatch and Semaphore in the JUC package are built on the shared mode. (CyclicBarrier is often listed with them, but it is implemented on top of a ReentrantLock and a Condition.) The purpose and principle of these locks will be introduced in detail in later blogs.
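As a small illustration of the fair and non-fair variants described above: for ReentrantLock, the policy is chosen through the boolean constructor argument and can be read back with isFair(). The class name FairnessDemo and its helper methods are only for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // Creates a ReentrantLock with the requested fairness policy and reports the policy back.
    static boolean createdFair(boolean fair) {
        ReentrantLock lock = new ReentrantLock(fair);
        return lock.isFair();
    }

    // The no-argument constructor creates a non-fair lock.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    public static void main(String[] args) {
        System.out.println("new ReentrantLock(true).isFair()  = " + createdFair(true));
        System.out.println("new ReentrantLock(false).isFair() = " + createdFair(false));
        System.out.println("new ReentrantLock().isFair()      = " + defaultIsFair());
    }
}
```

The rest of this blog only follows the path taken when the lock was created with new ReentrantLock(true).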

CLH queue

Craig, Landin, and Hagersten lock queue

The CLH queue is the queue of threads waiting for the lock in AQS. In multithreaded code, we often use locks to protect contended resources from being operated on by multiple threads at the same time. With an exclusive lock, a contended resource can only be accessed by one thread at a time, and other threads need to wait. The CLH queue manages these “waiting for lock” threads.

The CLH queue is a non-blocking FIFO queue. That is to say, inserting or removing a node does not block under concurrent conditions; instead, spinning (retry loops) and CAS ensure the atomicity of node insertion and removal.

CAS function

Compare And Swap

A CAS function is a compare-and-swap function, which is an atomic operation: it updates a value only if the value currently equals an expected value, and the comparison and the update happen as one indivisible step. Examples in AQS are compareAndSetState(), compareAndSetTail() and compareAndSetNext(). Their common feature is that the operations they perform are atomic.
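To make the compare-and-swap idea concrete, here is a minimal sketch that uses AtomicInteger.compareAndSet() from the JDK rather than the AQS internals. The class CasDemo and its methods are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    // Increments the counter with an explicit CAS retry loop (a "spin").
    static int casIncrement(AtomicInteger counter) {
        for (;;) {
            int current = counter.get();
            if (counter.compareAndSet(current, current + 1)) {
                return current + 1;
            }
            // Another thread changed the value in between: read again and retry.
        }
    }

    // A CAS with a stale expected value fails and leaves the value untouched.
    static boolean staleCasSucceeds() {
        AtomicInteger value = new AtomicInteger(1);
        return value.compareAndSet(0, 5); // expects 0, but the value is 1
    }

    // Runs several threads that all increment the same counter; returns the final count.
    static int concurrentCount(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    casIncrement(counter);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("stale CAS succeeded: " + staleCasSucceeds());
        System.out.println("final count: " + concurrentCount(4, 1000));
    }
}
```

No increment is lost even though no lock is taken, because a failed CAS simply retries with a fresh value. AQS uses the same pattern for its state and its queue pointers.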

ReentrantLock data structure

[Figure: ReentrantLock UML diagram]

ReentrantLock implements the Lock interface.

ReentrantLock and Sync are related by composition: ReentrantLock contains a Sync object. Sync is a subclass of AQS, and it has two subclasses, FairSync and NonfairSync. ReentrantLock is an exclusive lock. Whether it is a fair lock or a non-fair lock depends on whether the sync object is an instance of FairSync or an instance of NonfairSync.

Acquire a fair lock (based on JDK 11.0.5)

From Java Multithreading 15: ReentrantLock example 1, we know that the lock is obtained through lock(). Let’s look at how lock() is implemented to acquire a fair lock.

lock()

lock() is implemented in ReentrantLock

public void lock() {
    sync.acquire(1);
}

The current thread acquires the lock through sync.acquire(1). sync is a Sync object, which can be either a FairSync or a NonfairSync.

The argument 1 is the amount added to the lock state. For an exclusive lock, the state value is 0 when the lock is available. When a thread acquires the lock for the first time, the state value becomes 1.

Since ReentrantLock (fair or non-fair) is a reentrant lock, it can be acquired by the same thread multiple times, and the state of the lock is incremented by 1 every time it is acquired. That is to say, the first acquire(1) sets the state value to 1, acquiring the lock again sets it to 2, and so on.

Reentrant means that the lock can be acquired multiple times by a single thread.
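The state counter itself is not public, but ReentrantLock exposes the number of holds of the current thread through getHoldCount(), which makes the +1 per acquisition visible. A minimal sketch (the class ReentrancyDemo and its methods are invented for this example):

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Acquires a fair lock `times` times on the current thread and returns the hold count seen.
    static int holdCountAfter(int times) {
        ReentrantLock lock = new ReentrantLock(true);
        for (int i = 0; i < times; i++) {
            lock.lock();
        }
        int held = lock.getHoldCount();
        for (int i = 0; i < times; i++) {
            lock.unlock(); // every lock() needs a matching unlock()
        }
        return held;
    }

    // After as many unlock() calls as lock() calls, the lock is free again.
    static boolean freeAfterBalancedUnlocks(int times) {
        ReentrantLock lock = new ReentrantLock(true);
        for (int i = 0; i < times; i++) {
            lock.lock();
        }
        for (int i = 0; i < times; i++) {
            lock.unlock();
        }
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("hold count after 3 lock() calls: " + holdCountAfter(3));
        System.out.println("free after balanced unlocks: " + freeAfterBalancedUnlocks(3));
    }
}
```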

acquire()

acquire() is implemented in AQS

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  1. The current thread first tries to acquire the lock through tryAcquire(). If the acquisition is successful, acquire() returns directly. If the attempt fails, the thread enters the waiting queue (there may be other threads waiting for the lock in front of it).
  2. When the attempt fails, the current thread is first added to the end of the CLH queue (a non-blocking FIFO queue) through addWaiter(Node.EXCLUSIVE). The CLH queue is the queue of waiting threads.
  3. After addWaiter(Node.EXCLUSIVE) has executed, acquireQueued() is called to acquire the lock. Since the ReentrantLock is a fair lock here, it acquires the lock according to the principle of fairness.
  4. While executing acquireQueued(), the current thread sleeps in the CLH queue and does not return until it has acquired the lock! If the current thread was interrupted while sleeping, acquireQueued() returns true. In that case, the current thread calls selfInterrupt() to interrupt itself. Why it needs to interrupt itself is explained later.

The above is a general description of acquire(). Below, we divide this function into 4 parts to analyze step by step.

tryAcquire()

1. tryAcquire()
The tryAcquire() of the fair lock is implemented in the FairSync class of ReentrantLock

protected final boolean tryAcquire(int acquires) {
    // get the current thread
    final Thread current = Thread.currentThread();
    // get the state of the exclusive lock
    int c = getState();
    // c == 0 means the lock is not held by any thread
    if (c == 0) {
        // If the lock is not held by any thread,
        // check that no other thread is waiting ahead of the current thread in the CLH queue.
        // If none is, acquire the lock: set the lock state and set the lock owner to the current thread
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // If the exclusive lock is already held by the current thread,
        // just update the state (reentrant acquisition)
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

The role of tryAcquire() is to try to acquire the lock. Note, just try here!

If the attempt succeeds, it returns true. If the attempt fails, it returns false, and then the lock is acquired through other methods. Later we will explain how to acquire the lock step by step in the case of failed attempts.
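The same try-only logic can be reproduced in a small synchronizer of our own. The sketch below is not JDK code: SimpleFairMutex is a hypothetical class that extends AbstractQueuedSynchronizer and copies the structure of the fair tryAcquire() above. It also needs a minimal tryRelease() so that the example can unlock; the release path is assumed here only to make the demo complete and is the subject of the next blog.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified fair reentrant mutex built on AQS (illustration only).
class SimpleFairMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Lock is free: take it only if nobody has been waiting longer than we have.
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // Reentrant acquisition: only the owner gets here, so a plain write is enough.
                setState(c + acquires);
                return true;
            }
            return false; // just a try: the caller (acquire) handles queuing
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        int holds() {
            return getState();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    void unlock() { sync.release(1); }
    int holdCount() { return sync.holds(); }

    // Locks twice and unlocks twice on one thread; returns the state after each phase.
    static int[] demoHoldCounts() {
        SimpleFairMutex m = new SimpleFairMutex();
        m.lock();
        int first = m.holdCount();
        m.lock();
        int second = m.holdCount();
        m.unlock();
        m.unlock();
        return new int[] { first, second, m.holdCount() };
    }

    public static void main(String[] args) {
        int[] r = demoHoldCounts();
        System.out.println("state after 1st lock: " + r[0]);
        System.out.println("state after 2nd lock: " + r[1]);
        System.out.println("state after unlocks:  " + r[2]);
    }
}
```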

2. hasQueuedPredecessors()
hasQueuedPredecessors() is implemented in AQS

public final boolean hasQueuedPredecessors() {
    Node h, s;
    if ((h = head) != null) {
        if ((s = h.next) == null || s.waitStatus > 0) {
            s = null; // traverse in case of concurrent cancellation
            for (Node p = tail; p != h && p != null; p = p.prev) {
                if (p.waitStatus <= 0)
                    s = p;
            }
        }
        if (s != null && s.thread != Thread.currentThread())
            return true;
    }
    return false;
}

hasQueuedPredecessors() reports whether any thread has been waiting in AQS longer than the current thread. It returns false when the queue is empty or the current thread is the first waiter in the CLH queue, and true otherwise.

3. Node
Node is the node of the CLH queue. Node is implemented in AQS

private transient volatile Node head;
private transient volatile Node tail;

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled. */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking. */
    // Normally: when successor's thread is blocked, and current thread was released or cancelled
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition. */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate.
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes. It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node
     */
    volatile Node prev;

    /**
     * Link to the successor node
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node
     */
    final Node predecessor() {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    /** Establishes initial head or SHARED marker. */
    Node() {}

    /** Constructor used by addWaiter. */
    Node(Node nextWaiter) {
        this.nextWaiter = nextWaiter;
        THREAD.set(this, Thread.currentThread());
    }

    /** Constructor used by addConditionWaiter. */
    Node(int waitStatus) {
        WAITSTATUS.set(this, waitStatus);
        THREAD.set(this, Thread.currentThread());
    }

    /** CASes waitStatus field. */
    final boolean compareAndSetWaitStatus(int expect, int update) {
        return WAITSTATUS.compareAndSet(this, expect, update);
    }

    /** CASes next field. */
    final boolean compareAndSetNext(Node expect, Node update) {
        return NEXT.compareAndSet(this, expect, update);
    }

    final void setPrevRelaxed(Node p) {
        PREV.set(this, p);
    }

    // VarHandle mechanics
    private static final VarHandle NEXT;
    private static final VarHandle PREV;
    private static final VarHandle THREAD;
    private static final VarHandle WAITSTATUS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
            PREV = l.findVarHandle(Node.class, "prev", Node.class);
            THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
            WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

Node is a node of the CLH queue, the queue of threads waiting for the lock.

  • Each Node corresponds to a thread.
  • Each Node points to the previous node and the next node through prev and next respectively, which represent the previous waiting thread and the next waiting thread.
  • Node saves the wait state of the thread in waitStatus.
  • Node uses nextWaiter to distinguish whether a thread is waiting for an exclusive lock or a shared lock. For an exclusive lock, the value of nextWaiter is EXCLUSIVE (which is null); for a shared lock, the value of nextWaiter is SHARED.

4. compareAndSetState()
compareAndSetState() is implemented in AQS

protected final boolean compareAndSetState(int expect, int update) {
    return STATE.compareAndSet(this, expect, update);
}

compareAndSetState(expect, update) atomically updates the state of the synchronizer (here, the lock): if the current state equals expect, it sets the state to update and returns true; otherwise it leaves the state unchanged and returns false.

5. setExclusiveOwnerThread()
setExclusiveOwnerThread() is implemented in AbstractOwnableSynchronizer

private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

The role of setExclusiveOwnerThread() is to record the given thread as the thread that currently owns the exclusive lock.

6. getState(), setState()
getState() and setState() are implemented in AQS

private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

state indicates the state of the lock. For exclusive locks, state = 0 indicates that the lock is available (that is, the lock is not held by any thread). Since ReentrantLock is reentrant, the value of state can be greater than 1.

addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE) creates a Node for the current thread, marks it as waiting in exclusive mode, and adds it to the end of the CLH queue.

1. addWaiter()
addWaiter() is implemented in AQS

private Node addWaiter(Node mode) {
    // create a Node that corresponds to the current thread; its lock mode is `mode`
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        // if the CLH queue is not empty, add the node to the tail of the queue
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            // the queue is empty: create the dummy head first, then retry
            initializeSyncQueue();
        }
    }
}

For a fair lock, addWaiter(Node.EXCLUSIVE) first creates a Node of the exclusive type (Node.EXCLUSIVE) and then adds it to the end of the CLH queue. If the queue is still empty, initializeSyncQueue() first installs a dummy head node, and the loop retries.
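The effect of addWaiter() can be observed from the outside through the monitoring methods of ReentrantLock. In the sketch below (the class QueueLengthDemo is invented for this example), the main thread holds a fair lock, so a second thread calling lock() has to be enqueued. The code waits until hasQueuedThread() reports the waiter and then reads getQueueLength(). Note that the JDK documents getQueueLength() as an estimate; with a single waiter and the lock still held, the value is expected to be 1.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueLengthDemo {
    // Holds a fair lock, lets one other thread block on it, and returns the observed queue length.
    static int queueLengthWithOneWaiter() {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock(); // the calling thread owns the lock, so the waiter must queue
        Thread waiter = new Thread(() -> {
            lock.lock(); // fails tryAcquire(), so the thread is added to the wait queue
            lock.unlock();
        });
        waiter.start();
        try {
            // Wait until the waiter's node has actually been appended to the queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            return lock.getQueueLength();
        } finally {
            lock.unlock(); // lets the waiter acquire the lock and finish
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("queue length with one waiter: " + queueLengthWithOneWaiter());
    }
}
```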

2. compareAndSetTail()
compareAndSetTail() is implemented in AQS

private final boolean compareAndSetTail(Node expect, Node update) {
    return TAIL.compareAndSet(this, expect, update);
}

compareAndSetTail() is also a CAS function. compareAndSetTail(expect, update) operates atomically: it checks whether the tail of the CLH queue is expect and, if so, sets the tail to update.
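The enqueue loop of addWaiter() can be imitated with AtomicReference to see why a CAS on the tail is enough to keep concurrent insertions safe. This is a simplified sketch, not the AQS implementation: the class CasTailQueue, its Node type and its methods are invented for illustration, and it uses AtomicReference where AQS uses VarHandles.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified imitation of the AQS enqueue loop (illustration only).
class CasTailQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a new node at the tail without blocking.
    Node enqueue() {
        Node node = new Node();
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail != null) {
                node.prev = oldTail; // link backwards first
                if (tail.compareAndSet(oldTail, node)) {
                    oldTail.next = node; // the forward link is set after winning the CAS
                    return node;
                }
                // CAS lost: another thread appended first, so retry with the new tail.
            } else {
                // Empty queue: install a dummy head, then retry.
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            }
        }
    }

    // Counts the nodes by walking prev links from the tail (the dummy head is not counted).
    int size() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }

    // Enqueues from several threads at once and returns the resulting queue size.
    static int enqueueConcurrently(int threads, int perThread) {
        CasTailQueue queue = new CasTailQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("nodes enqueued: " + enqueueConcurrently(4, 1000));
    }
}
```

Because prev is set before the CAS and next only afterwards, a backward walk from the tail always sees every node that has been enqueued. This is the same reason hasQueuedPredecessors() falls back to traversing from tail when next looks incomplete.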

acquireQueued()

Earlier, we added the current thread to the CLH queue. The role of acquireQueued() is to let the queued thread acquire the lock when its turn comes. If the current thread acquires the lock, acquireQueued() returns. Otherwise, the current thread sleeps and does not return until it is woken up and acquires the lock. Below, we look at the specific flow of acquireQueued().

1. acquireQueued()
acquireQueued() is implemented in AQS

final boolean acquireQueued(final Node node, int arg) {
    // interrupted records whether the current thread was interrupted while waiting in the CLH queue
    boolean interrupted = false;
    try {
        for (;;) {
            // Get the predecessor node.
            // node corresponds to the current thread, so p is the node directly ahead of it in the queue
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

The purpose of acquireQueued() is to acquire a lock from the queue.

2. shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire() is implemented in AQS

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // status of pred node
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;

    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

  • Please refer to the table below for waitStatus

CANCELLED [1]   -- The thread of this node has been cancelled.
SIGNAL [-1]     -- The successor's thread needs to be unparked when this node releases or cancels.
CONDITION [-2]  -- The thread is waiting on a Condition and needs to be woken up by it.
PROPAGATE [-3]  -- (Shared lock) The next acquireShared should propagate unconditionally.
[0]             -- None of the above.

  • shouldParkAfterFailedAcquire() uses the following rules to determine whether the current thread needs to be blocked.

Rule 1: If the status of the predecessor node is SIGNAL, the predecessor will unpark (wake up) the current node when it releases the lock, so the current thread can safely block. Return true.
Rule 2: If the status of the predecessor node is CANCELLED (ws>0), the predecessor node has been cancelled. Walk backwards to find a valid (non-CANCELLED) node, link the current node behind it, and return false.
Rule 3: If the status of the predecessor node is neither SIGNAL nor CANCELLED, set the status of the predecessor to SIGNAL and return false. The caller then retries the acquisition before blocking.

If rule 1 applies, that is, the predecessor node is in the SIGNAL state, the current thread needs to be blocked. Next, parkAndCheckInterrupt() is called to block the current thread, and it does not return from parkAndCheckInterrupt() until the thread is woken up.

3. parkAndCheckInterrupt()
parkAndCheckInterrupt() is implemented in AQS

private final boolean parkAndCheckInterrupt() {
    // block the current thread through LockSupport.park()
    LockSupport.park(this);
    // return the interrupted status (and clear it)
    return Thread.interrupted();
}

parkAndCheckInterrupt() blocks the current thread and, after the thread is woken up, returns its interrupted status.

It first blocks the current thread via LockSupport.park(), and then returns the thread’s interrupted status via Thread.interrupted().

A blocked thread is generally woken up in one of 2 ways:

  1. unpark() wake-up. After it is done with the lock, the thread corresponding to the predecessor node wakes up the current thread through unpark().
  2. Interrupt wake-up. Another thread interrupts the current thread through interrupt().

Supplement: park() and unpark() in LockSupport are similar to wait() and notify() in Object: both pairs block and wake up threads.

Their usage is different: park() and unpark() are lightweight and can be called without holding any lock, while wait() and notify() can only be called by a thread that holds the object’s monitor through synchronized.
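A minimal sketch of the park()/unpark() pair on its own, outside of AQS. The class ParkUnparkDemo and the flag name are invented for this example. Because park() is allowed to return without a matching unpark(), the waiting thread re-checks a condition in a loop, which is the same pattern as the for (;;) loop in acquireQueued().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    // Parks a thread until another thread sets a flag and unparks it; returns true if it resumed.
    static boolean parkThenUnpark() {
        AtomicBoolean mayProceed = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so always re-check the condition.
            while (!mayProceed.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        // No synchronized block is needed around either call.
        mayProceed.set(true);
        LockSupport.unpark(waiter); // if the waiter has not parked yet, its next park() returns at once

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + parkThenUnpark());
    }
}
```

Unlike notify(), an unpark() that arrives before the park() is not lost: it leaves a permit behind that makes the next park() return immediately.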

4. tryAcquire() again
After understanding the shouldParkAfterFailedAcquire() and parkAndCheckInterrupt() functions, we can analyze the for loop part of acquireQueued().

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    return interrupted;
}

  1. Get the predecessor node through node.predecessor().
  2. p == head && tryAcquire(arg)
    • First, determine whether the predecessor node is the head of the CLH queue. If so, try to acquire the lock through tryAcquire().
    • The purpose of this is to let the current thread acquire the lock, but why do we need to check p==head first? Understanding this is important to understanding the fair lock mechanism, because this check is what ensures fairness!
      • Earlier, we used shouldParkAfterFailedAcquire() to determine whether the current thread needs to be blocked;
      • Then, if the current thread needs to be blocked, parkAndCheckInterrupt() is called to block it. When the thread is unblocked, parkAndCheckInterrupt() returns the interrupted status of the thread. The thread is unblocked either because it was interrupted or because another thread called unpark() on it.
      • Return to p==head here. If the current thread was awakened because another thread called unpark(), then the thread that awakened it should be the thread corresponding to its predecessor node (we will see this later in the process of releasing the lock).
    • At this point, p==head is easy to understand: the predecessor of the current node is the head node of the CLH queue, and after it releases the lock, it is the turn of the current node to acquire the lock. Then, the current node acquires the lock through tryAcquire(). If the acquisition is successful, it sets itself as the head node through setHead(node) and returns.

In short, if the predecessor node is the head of the CLH queue and it calls unpark() to wake up the current thread, then p==head is satisfied, and letting the current thread acquire the lock is in line with the principle of fairness. Otherwise, if the current thread was woken up by an interrupt while other threads are still waiting ahead of it, letting it acquire the lock would not be fair. This is why the p==head check ensures fairness!

selfInterrupt()

selfInterrupt() is implemented in AQS

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

The code of selfInterrupt() is very simple: the current thread interrupts itself. But why is this needed?

This must be analyzed in conjunction with acquireQueued(). If the current thread was interrupted in acquireQueued(), selfInterrupt() is executed. Otherwise, it is not executed.

In acquireQueued(), a thread that is interrupted while blocked wakes up and gets CPU time. However, if other threads are waiting for the lock in front of it, then according to the principle of fairness it still cannot acquire the lock. It blocks again until it is woken up by the thread in front of it releasing the lock.

In other words, until the thread successfully acquires the lock, its interrupt is not acted upon, and the interrupt flag is cleared! This is because parkAndCheckInterrupt() reads the interrupted status of the thread by calling Thread.interrupted(). This function is different from Thread’s isInterrupted() function: isInterrupted() only returns the interrupted status, while interrupted() also clears it after returning it. Because the interrupt status was cleared, selfInterrupt() needs to be called here to set it again!
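The difference between the two methods can be checked on a single thread. The sketch below (the class InterruptFlagDemo is invented for this example) interrupts the current thread, reads the flag with both methods, and then sets it again in the same way selfInterrupt() does.

```java
class InterruptFlagDemo {
    // Returns what each step observed: {isInterrupted, interrupted, isInterrupted again, after re-interrupt}.
    static boolean[] observe() {
        Thread self = Thread.currentThread();

        self.interrupt();                            // set the interrupt flag
        boolean seenByIsInterrupted = self.isInterrupted(); // reads the flag without clearing it
        boolean seenByInterrupted = Thread.interrupted();   // reads the flag and clears it
        boolean afterClear = self.isInterrupted();          // the flag is gone now

        self.interrupt();                            // what selfInterrupt() does: set the flag again
        boolean afterSelfInterrupt = self.isInterrupted();

        Thread.interrupted();                        // clean up so the caller is not left interrupted
        return new boolean[] { seenByIsInterrupted, seenByInterrupted, afterClear, afterSelfInterrupt };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("isInterrupted() after interrupt():   " + r[0]);
        System.out.println("Thread.interrupted():                " + r[1]);
        System.out.println("isInterrupted() after interrupted(): " + r[2]);
        System.out.println("isInterrupted() after re-interrupt:  " + r[3]);
    }
}
```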

Summary

Looking back at the acquire() function, its ultimate purpose is to acquire a lock!

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  • First, try to acquire the lock through tryAcquire(). If the acquisition is successful, return directly. If the attempt fails, acquire the lock through acquireQueued().

  • If the attempt fails, the current thread is added to the end of the CLH queue through addWaiter(). Then acquireQueued() is called to wait in the CLH queue for its turn to acquire the lock. During this process, the thread is asleep and does not return until the lock is acquired. If it was interrupted while waiting, it calls selfInterrupt() to set its own interrupt status again.
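The whole flow can be observed end to end with a small experiment: a thread that is interrupted while it waits in lock() neither gives up nor throws, and once it holds the lock its interrupt flag is set again. The sketch below uses only public ReentrantLock methods; the class LockKeepsInterruptDemo and its method are invented for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockKeepsInterruptDemo {
    // Interrupts a thread while it waits in lock(); returns the flag it sees after acquiring.
    static boolean interruptSurvivesLock() {
        ReentrantLock lock = new ReentrantLock(true);
        AtomicBoolean flagAfterLock = new AtomicBoolean(false);

        lock.lock(); // the calling thread holds the lock, so the waiter has to queue
        Thread waiter = new Thread(() -> {
            lock.lock(); // blocks in the wait queue; lock() does not throw on interrupt
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Wait until the waiter is really queued, then interrupt it while it waits.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        waiter.interrupt();

        lock.unlock(); // now the waiter can acquire the lock
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag visible after lock(): " + interruptSurvivesLock());
    }
}
```

Code that must react to the interrupt immediately, instead of after the lock has been acquired, can use lockInterruptibly() in place of lock().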