This blog introduces the acquisition mechanism of fair lock, and the content includes:
- Basic concept
- ReentrantLock data structure
- Acquire a fair lock (based on JDK 11.0.5)
- tryAcquire()
- addWaiter()
- acquireQueued()
- selfInterrupt()
The release mechanism of fair lock will be introduced in the following blog.
Basic concept
Before explain the acquisition mechanism of fair lock, we need to understand a few basic concepts.
AQS
Refers to the AbstractQueuedSynchronizer
class.
AQS
is an abstract class for managing locks in Java. Many public methods of locks are implemented in this class. AQS
is a common parent class for exclusive locks (eg, ReentrantLock
) and shared locks (eg, Semaphore
).
Types of AQS locks
Divided into exclusive locks and shared locks.
Exclusive lock - The lock can only be occupied by one thread at a time. A typical example of an exclusive lock is
ReentrantLock
. In addition,ReentrantReadWriteLock.WriteLock
is also an exclusive lock.According to the lock acquisition mechanism, exclusive lock is divided into fair lock and unfair lock.- Fair lock is to acquire the lock fairly according to the rule of first-come-first-served through the
CLH
waiting thread - Non-fair lock, when the thread wants to acquire the lock, it will directly acquire the lock regardless of the
CLH
waiting queue.
- Fair lock is to acquire the lock fairly according to the rule of first-come-first-served through the
Shared lock - A lock that can be owned by multiple threads at the same time and can be shared.
ReentrantReadWriteLock.ReadLock
,CyclicBarrier
,CountDownLatch
andSemaphore
in the JUC package are shared locks. The purpose and principle of these locks will be introduced in detail in later blog.
CLH queue
Craig, Landin, and Hagersten lock queue
The CLH
queue is the waiting for lock thread queue in AQS
. In multithreading, in order to protect competing resources from being operated by multiple threads at the same time, we often need to protect these resources through locks. In an exclusive lock, competing resources can only be accessed by one thread at a time, other threads need to wait. CLH
is to manage the queue of these “waiting for lock” threads.
CLH
is a non-blocking FIFO
queue. That is to say, when inserting or removing a node in it, it will not block under concurrent conditions, but through spin lock and CAS to ensure the atomicity of node insertion and removal.
CAS function
Compare And Swap
The CAS
function is a comparison and exchange function, which is an atomic operation function. That is, the data operated by CAS
are all performed in an atomic manner. For example, compareAndSetHead()
, compareAndSetTail()
, compareAndSetNext()
and other functions. Their common feature is that the operations performed by these functions are performed atomically.
ReentrantLock data structure
ReentrantLock
UML graph
ReentrantLock
implements the Lock
interface.
ReentrantLock
and Sync
are combined. ReentrantLock
contains the Sync
object. Moreover, Sync
is a subclass of AQS
. More importantly, Sync
has two subclasses FairSync
and NonFairSync
. ReentrantLock
is an exclusive lock. Whether it is a fair lock or an unfair lock depends on whether the sync object is an instance of FairSync
or an instance of NonFairSync
.
Acquire a fair lock (based on JDK 11.0.5)
From Java Multithreading 15: ReentrantLock example 1, we know that the lock is obtained through lock()
. Let’s look at how lock()
is implemented to acquire a fair lock.
lock()
lock()
is implemented in ReentrantLock
1 | public void lock() { |
The current thread acquires the lock through sync.acquire(1)
. sync
is a Sync
object. It could either be FairSync
or NonfairSync
Explain the meaning of 1
here, it is the parameter to set the lock state. For the exclusive lock, when the lock is in the obtainable state, its state value is 0
. When the lock is acquired by the thread for the first time, its state value becomes 1
.
Since ReentrantLock
(fair lock/unfair lock) is a reentrant lock, an exclusive lock can be acquired by a single thread multiple times, and the state of the lock is +1
every time it is acquired. That is to say, when acquiring the lock for the first time, set the state value of the lock to 1
through acquire(1)
. When acquiring the lock again, set the state value of the lock to 2
; and so on…
Reentrant means that the lock can be acquired multiple times by a single thread.
acquire()
acquire()
is implemented in AQS
1 | if (!tryAcquire(arg) && |
- The current thread first tries to acquire the lock through
tryAcquire()
. If the acquisition is successful, return directly. If the attempt fails, enter the waiting queue to wait (there may be other thread waiting for the lock in front). - When the current thread attempt fails, first add the current thread to the end of the
CLH
queue (non-blocking FIFO queue) throughaddWaiter(Node.EXCLUSIVE)
. TheCLH
queue is the thread waiting queue. - After executing
addWaiter(Node.EXCLUSIVE)
,acquireQueued()
will be called to acquire the lock. SinceReentrantLock
is a fair lock at this time, it will acquire the lock according to the principle of fairness. - When executing
acquireQueued()
, the current thread will enter theCLH
queue to sleep and wait until it acquires the lock before returning! If the current thread was interrupted during sleep waiting,acquireQueued()
will returntrue
. At this time, the current thread will callselfInterrupt()
to generate an interrupt for itself. As for why you should generate an interrupt for yourself, I will introduce it later.
The above is a general description of acquire()
. Below, we divide this function into 4 parts to analyze step by step.
tryAcquire()
1. tryAcquire()
The tryAcquire()
of the fair lock is implemented in the FairSync
class of ReentrantLock
1 | protected final boolean tryAcquire(int acquires) { |
The role of tryAcquire()
is to try to acquire the lock. Note, just try here!
If the attempt succeeds, it returns true
. If the attempt fails, it returns false
, and then the lock is acquired through other methods. Later we will explain how to acquire the lock step by step in the case of failed attempts.
2. hasQueuedPredecessors()
hasQueuedPredecessors()
is implemented in AQS
1 | public final boolean hasQueuedPredecessors() { |
hasQueuedPredecessors()
is to determine whether the current thread is at the head of the CLH
queue to return whether there is a thread waiting longer than the current thread in AQS
.
3. Node
Node
is the node of the CLH
queue. Node
is implemented in AQS
1 | private transient volatile Node head; |
Node
is a node of the CLH
queue, which stands for thread queue waiting for lock.
- Each
Node
will correspond to a thread. - Each
Node
will point to the previous node and the next node throughprev
and nex`t respectively, which represents the previous waiting thread and the next waiting thread, respectively. Node
saves the wait state of the thread throughwaitStatus
.Node
usesnextWaiter
to distinguish whether a thread is an exclusive lock thread or a shared lock thread. If it is an exclusive lock thread, the value ofnextWaiter
isEXCLUSIVE
; if it is ashared lock
thread, the value ofnextWaiter
isSHARED
.
4. compareAndSetState()
compareAndSetState()
is implemented in AQS
1 | protected final boolean compareAndSetState(int expect, int update) { |
We need to understand that compareAndSetState(expect, update)
is to operate the current thread atomically. If the current thread’s state is expect
, then set its state to update
.
5. setExclusiveOwnerThread()
setExclusiveOwnerThread()
is implemented in AbstractOwnableSynchronizer
1 | private transient Thread exclusiveOwnerThread; |
The role of setExclusiveOwnerThread()
is to set thread t
to the thread that currently owns the exclusive lock.
6. getState(), setState()
getState() and setState() is implemented in AQS
1 | private volatile int state; |
state
indicates the state of the lock. For exclusive locks, state = 0
indicates that the lock is available (that is, the lock is not held by any thread). Since the exclusive lock in java is reentrant, the value of state can be > 1
.
addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE)
is to create a current thread Node
, and the lock corresponding to the current thread recorded in the Node
is an exclusive lock type, and add the node to the end of the CLH
queue.
1. addWaiter()
addWaiter()
is implemented in AQS
1 | private Node addWaiter(Node mode) { |
For fair lock, addWaiter(Node.EXCLUSIVE)
will first create a Node
, the node type is exclusive lock (Node.EXCLUSIVE)
type. Then, add the node to the end of the CLH
queue.
2. compareAndSetTail()
compareAndSetTail()
is implemented in AQS
1 | private final boolean compareAndSetTail(Node expect, Node update) { |
compareAndSetTail()
also belongs to the CAS
function. compareAndSetTail(expect, update)
will operate atomically. Its role is to determine whether the tail of the CLH
queue is expect
. If so, set the tail to update
.
acquireQueued()
Earlier, we have added the current thread to the CLH
queue. The role of acquireQueued()
is to gradually execute the thread of the CLH
queue. If the current thread acquires the lock, it returns. Otherwise, the current thread sleeps and does not return until it wakes up and reacquires the lock. Below, we look at the specific flow of acquireQueued()
.
1. acquireQueued()
acquireQueued()
is implemented in AQS
1 | final boolean acquireQueued(final Node node, int arg) { |
The purpose of acquireQueued()
is to acquire a lock from the queue.
2. shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()
is implemented in AQS
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
- Please refer to the table below for
waitStatus
1 | CANCELLED[1] -- Current thread is cancelled |
shouldParkAfterFailedAcquire()
uses the following rules to determine whether the current thread needs to be blocked.
1 | Rule 1: If the status of the predecessor node is SIGNAL, it means that the current node needs to be unparked (wake up), then return true. |
If rule 1 occurs, that is, the “predecessor node is SIGNAL” state, it means that the current thread needs to be blocked. Next, parkAndCheckInterrupt()
will be called to block the current thread, and it will not return from parkAndCheckInterrupt()
until it is first awakened.
3. parkAndCheckInterrupt()
parkAndCheckInterrupt()
is implemented in AQS
1 | private final boolean parkAndCheckInterrupt() { |
parkAndCheckInterrupt()
is to block the current thread and return to the interrupted state after the thread is woken up.
It will first block the current thread via LockSupport.park()
, and then return to the thread’s interrupted status via Thread.interrupted()
.
Here is how to wake up after the thread is blocked. There are generally 2 situations:
- The first case:
unpark()
wake up. After using the lock, the thread corresponding to the previous node wakes up the current thread throughunpark()
. - The second case: interrupt wake up. Other threads interrupt the current thread through
interrupt()
.
Supplement: park()
and unpark()
in LockSupport()
are similar to those of wait()
and notify()
in Object
, which are blocking/waking up.
Their usage is different, park()
and unpark()
are lightweight, and wait()
and notify()
must first obtain synchronization lock through Synchronized
.
4. tryAcquire() again
After understanding the shouldParkAfterFailedAcquire()
and parkAndCheckInterrupt()
functions. We then analyze the for loop part of acquireQueued()
.
1 | final Node p = node.predecessor(); |
- Get the predecessor node through
node.predecessor()
. - p == head && tryAcquire(arg)
- First, determine whether the predecessor node is the
CHL
header. If so, try to acquire the lock throughtryAcquire()
. - In fact, the purpose of this is to let the current thread acquire the lock, but why do we need to judge
p==head
first? Understanding this is important to understanding the fair lock mechanism, because the reason for this is to ensure fairness!- Earlier, we use
shouldParkAfterFailedAcquire()
to determine whether the current thread needs to be blocked; - Then, if the current thread is blocked,
parkAndCheckInterrupt()
will be called to block the thread. When the thread is unblocked, we return to the interrupted state of the thread. The blocking of the thread is resolved, either because the thread was interrupted or because the other thread called theunpark()
function of the thread. - Return to
p==head
here. If the current thread was awakened because another thread called theunpark()
function, then the thread that awakened it should be the thread corresponding to its predecessor node (on this point, we will see later in the process of releasing the lock). OK, the previous node calledunpark()
to wake up the current thread!
- Earlier, we use
- At this point, it is very simple to understand
p==head
: the current successor node is the head node of theCLH
queue, and after it releases the lock, it is the turn of the current node to acquire the lock. Then, the current node acquires the lock throughtryAcquire()
. If the acquisition is successful, set the current node as the head node throughsetHead(node)
and return.
- First, determine whether the predecessor node is the
In short, if the predecessor node calls unpark()
to wake up the current thread and the predecessor node is the CLH
queue header, then p==head
is satisfied, which is in line with the principle of fairness. Otherwise, if the current thread is woken up because the thread was interrupted, then it is obviously not fair. This is why p==head
is to ensure fairness!
selfInterrupt()
selfInterrupt()
is implemented in AQS
1 | static void selfInterrupt() { |
The code for selfInterrupt()
is very simple, that is, the current thread generates an interrupt by itself. But why do you need to do this?
This must be analyzed in conjunction with acquireQueued()
. If the current thread is interrupted in acquireQueued()
, execute selfInterrupt()
. Otherwise, it will not execute.
In acquireQueued()
, even if the thread is interrupted and woken up in the blocked state to obtain cpu execution rights. However, if there are other threads waiting for the lock in front of the thread, according to the principle of fairness, the thread still cannot acquire the lock. It will block again! The thread blocks again until the thread is woken up by the thread lock in front of it.
In other words, before the thread acquires the lock successfully and actually executes, its interrupt will be ignored and the interrupt flag will be cleared! Because in parkAndCheckInterrupt()
, we call Thread.interrupted()
when the interrupted state of our thread. This function is different from Thread’s isInterrupted()
function. isInterrupted()
only returns the interrupted state, and interrupted()
will also clear the interrupted state after returning to the current interrupted state. Because the previous interrupt status was cleared, so here you need to call selfInterrupt()
to regenerate an interrupt!
Summary
Looking back at the acquire()
function, its ultimate purpose is to acquire a lock!
1 | public final void acquire(int arg) { |
First try to acquire the lock through
tryAcquire()
. If the acquisition is successful, return directly. If the attempt fails, then acquire the lock throughacquireQueued()
.If the attempt fails, the current thread will be added to the end of the
CLH
queue throughaddWaiter()
. Then theacquireQueued()
is called to wait for the lock to be sorted in theCLH
queue. During this process, the thread is dormant . Do not return until the lock is acquired. If it was interrupted during the sleep waiting process, callselfInterrupt()
to generate an interrupt by itself.