本文共 19306 字,大约阅读时间需要 64 分钟。
AbstractQueuedSynchronizer简称AQS,是一个用于构建锁和相关同步器的框架,它依赖于FIFO的等待队列实现。见AbstractQueuedSynchronizer的描述:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。
AQS中维护了一个FIFO队列,用于记录等待的线程,上锁和释放锁的过程可以认为是线程入队列和出队列的过程;获取不到锁,入队列等待;出队列时被唤醒。
AQS中有一个表示状态的字段state,ReentrantLock用它表示线程重入锁的次数,Semaphore用它表示剩余的许可数量,FutureTask用它表示任务的状态。对state变量值的更新都采用CAS操作保证更新操作的原子性。
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个类只有一个变量:exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的get,set方法。AQS使用此字段记录当前占有锁的线程。
AQS使用一个FIFO的等待队列表示排队等待锁的线程,队列结构如图:
队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联,其他的节点与一个等待线程关联。
每次一个线程释放锁以后,从Head开始向后寻找,找到一个waitStatus是SIGNAL的节点,然后通过LockSupport.unpark唤醒被挂起的线程,被挂起的线程继续执行尝试上锁逻辑。新的线程尝试获取锁时,如果获取不到锁,将会创建一个Node并加入到队尾,然后将自己挂起,等待挂起时间到或者等待被prev对应的节点唤醒。
说明:代码的中是从tail向head遍历查找waitStatus=SIGNAL的节点,但结果与这里说的是一样的,即最终也是找Head后的第一个waitStatus=SIGNAL的节点。见unparkSuccessor方法
类型 | 属性名 | 描述 |
int | waitStatus | 等待状态 |
Node | prev | 指向队列中前一个节点 |
Node | next | 指向队列中后一个节点 |
Thread | thread | Node的持有线程 |
Node | nextWaiter | 下一个等待condition的Node |
状态 | 状态值 | 描述 |
CANCELLED | 1 | 取消状态,例如因为等待锁超时而取消的线程;处于这种状态的Node会被踢出队列,被GC回收 |
SIGNAL | -1 | 表示这个Node的继任Node被阻塞了,到时需要通知它 |
CONDITION | -2 | 表示这个Node在条件队列中,因为等待某个条件而被阻塞 |
PROPAGATE | -3 | 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播 |
其他 | 0 | 初始状态 |
接下来我们以一个简单的示例描述ReentrantLock的等待锁、释放锁的原理。为了方便描述,以下示例中节点和线程一一对应,例如Node1与Thread1对应,依次类推。
假设初始状态,如下图所示:
Thread3申请锁时,因为无法获取锁,所以创建一个Node,然后加入到等待队列中;如下图所示:
假设3s中到了以后,Thread1被唤醒(挂起的时候指定了时间,所以等待时间到后会被唤醒),此时会将自己的状态改成CANCELLED,表示等待被从队列中移除;如下图所示:
如果此时有新线程申请锁,那么在入队列过程中会顺便将处于CANCLE状态的节点移除。如下图所示:
Thread0执行完毕后,释放线程,将唤醒Head后面第一个状态为等待SIGNAL的节点对应的线程。
注意:新Node入队列时会检查并删除被CANCELLED的节点;其实Node1在等待时间到后被唤醒后,在将自己状态改为CANCELLED时,如果发现自己是最后一个节点也会将Node1删除。
这一部分简要的介绍加锁、释放锁的流程,让大家对ReentrantLock有一个整体的概念,后面将通过源码详细分析实现细节。
非公平锁上锁主要流程如下所示:
这里仅给出流程图,后面会通过代码详细讲述这个流程。
非公平锁上锁主要流程如下所示:
如流程图中所示,主要流程基本和非公平锁一致,有两个差别:
公平锁和非公平锁的释放锁的过程是相同的,释放锁的流程图如下。
因为公平锁、非公平锁的大部分逻辑相同,所以这里主要以非公平锁的源码来讨论。
以下是ReentrantLock的两个构造器,如下所示:
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
我们可以看到,ReentrantLock默认构造器是非公平的。
如下代码示例中,可以看到公平锁和非公平锁的第一个差别:非公平锁lock的第一步就是尝试通过compareAndSetState(0, 1)获取锁,因为他不要求公平,所以他上来就争抢锁。
public void lock() { sync.lock(); }
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
exclusiveOwnerThread代表持有所得线程信息,所以一旦上锁成功,需要将exclusiveOwnerThread设置为当前线程。
final void lock() { acquire(1); }
compareAndSetState(0, 1)方法是基于CAS操作,尝试将state从0改为1。这是基于硬件指令集的原子操作,当且仅当state=0时将其修改为1。
在AQS部分我们提到ReentrantLock中state记录的锁次数(当然也包括重入次数)。state=0代表当前没有线程持有锁。state>0代表加锁次数,第一次lock成功state=1,重入一次state++。
Java中的CAS操作基本上都是通过unsafe实现的,代码如下:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
acquire属于AbstractQueuedSynchronizer中的方法;此方法主要逻辑如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
此方法是尝试获取锁。此时如果state=0,说明所已经被释放了,所以可以重新尝试上锁操作,即进行compareAndSetState(0, 1)操作;如果state!=0,并且当前线程持有锁,那么重入,更新state,即state++。
当且仅当以下两中情况,此方法返回true。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state = 0表示没有线程占用锁, if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; }} // state != 0表示有线程占用锁,如果是当前线程占用锁,那么相当于是重入操作,所以state++。 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果head的next节点对应的线程不是当前线程,那么当前线程不能尝试获取锁,这样才能保证按照获取所得顺序公平的获取锁。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
addWaiter是属于AbstractQueuedSynchronizer中的方法。此方法主要逻辑是:创建一个Node,与当前线程关联,然后尝试加入到FIFO队列的尾部。加入到FIFO队列时,如果发现队列没有初始化过,即Head为空,那么先创建一个空的不与任何线程相关联的Node最为Head;然后将Node加入到队列尾部。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // tail为空,可以认为队列尚未初始化过;不为空,那么尝试将Node加入到队列尾部 if (pred != null) { node.prev = pred; // 通过CAS操作将Node插入队尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
如果FIFO队列尚未初始化,那么先初始化;如果已经初始化了,那么尝试将node加入到队尾,失败则再试一次。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter是属于AbstractQueuedSynchronizer中的方法。此方法主要逻辑为:
注意,如果线程获取锁失败,那么会在parkAndCheckInterrupt()方法中被挂起。一个线程彻底释放锁资源以后,会唤醒head后的一个节点对应的线程,当某一个线程被唤醒以后,继续执行parkAndCheckInterrupt()方法中LockSupport#park以后的代码。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 如果prev节点是head,那么说明自己是第二个node,此时尝试获取锁;如果获取成功,那么将prev的thread、prev清除掉,然后作为head节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判断是否应该挂起当前线程,如果应该挂起,那么通过LockSupport挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
此方法是用来判断是否应该挂起当前节点对应的线程。允许挂起的条件是:prev节点的waitStatus=SIGNAL。这个状态的意思是:prev节点对应的线程在释放锁以后应该唤醒(unpack)node节点对应的线程。
如果prev节点waitStatus>0,即为CANCELLED状态时,需要将prev从队列中移除。重试此操作直到找到一个prve节点的状态不为CANCELLED。
如果prev节点waitStatus<=0(当然不包括SIGNAL状态),那么通过CAS操作设置waitStatus= SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
以下三个方法时获取超时放弃锁的源码,其中主要逻辑在doAcquireNanos中,和前面讨论的“获取锁”部分的(见acquireQueued方法)主要区别在于:在循环中会检测等待时间是否已经超过指定时间,如果超过了,那么将Node的waitStatus改为CANCELLED状态。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }
public void unlock() { sync.release(1); }
释放锁,主要逻辑是:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
此方法主要逻辑是:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
此方法主要逻辑是:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
Condition提供了这样一种能力:线程获取锁以后,如果条件不满足,挂起线程释放资源;如果条件满足,则唤醒线程继续处理。
ReentrantLock可以支持多条件等待,其实现原理如下:每次调用newCondition()方法,都会创建一个ConditionObject对象,每个ConditionObject对象都可以挂一个等待队列;如果希望同时等待多个条件,只需要简单的多次调用newCondition创建多个条件对象就好了。
说明:先区分两个概念,本节中同步等待队列指的是指的是AQS中的FIFO的队列。条件等待队列指的是ConditionObject对象上的等待队列。
创建ConditionObject对象,一个ConditionObject对象就是一个普通的对象,没有什么特别的地方,只是提供了await、signal等方法,这一部分将在后面详细分析。
public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { return new ConditionObject(); }
释放锁,挂起当前线程,等待其他线程发起唤醒信号(signal);被唤醒以后重新获取锁。此方法可以认为和Object的wait等价。
此方法主要逻辑如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
此方法的主要逻辑如下:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
此方法的主要逻辑如下:
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); }
简单来说是:唤醒线程。
具体来说是:遍历等待队列,找出第一个等待被唤醒的节点Node,然后将它插入到同步等待队列尾部,然后就是等待被唤醒(具体的过程与一个线程释放锁以后其他线程获取所得过程相同)。
此方法的主要逻辑如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { // 如果能够获取锁,那么直接往后执行,否则等待直到获取锁 lock.lock(); try { //资源是否已经都准备好了? boolean resourceReady = false; if( !resourceReady ){ //await会让当前线程释放其持有的锁并挂起 condition.await(); } // TODO 执行业务逻辑 } finally { lock.unlock(); } } public void conditionSignal() throws InterruptedException { // 如果能够获取锁,那么直接往后执行,否则等待直到获取锁 lock.lock(); try { //资源是否已经都准备好了? boolean resourceReady = true; if( resourceReady ){ condition.signal(); } } finally { lock.unlock(); } }
Synchronized是通过同步互斥来实现线程安全的;即同一时间只能有一个线程访问synchronized修饰的代码块或方法。其特性与功能如下:
ReentrantLock是一种非常常见的临界区处理手段,通过在执行代码前上锁保证同一时间只有一个线程能执行指定的代码块。ReentrantLock的特性与功能如下:
ReentrantLock本质上是通过一个队列来完成同步的。因为每个Node与一个线程关联,只需要做好对队列节点的同步处理,既可以完成多线程的同步处理。
相比于synchronized关键字,ReentrantLock等多的类似于我们使用zookeeper实现的等待队列。Zookeeper的队列示例可以参考博客《Zookeeper使用案例》地址为:https://yq.aliyun.com/articles/272103
关于线程、synchronized的更多内容,可以参考《线程-基础知识》,地址为:https://yq.aliyun.com/articles/414908
关于synchronized的原理,可以参考《互斥同步-锁》,地址为:https://yq.aliyun.com/articles/414939
转载地址:http://ordzl.baihongyu.com/