找回密码
 立即注册
首页 业界区 业界 基于AQS实现的ReentrantLock

基于AQS实现的ReentrantLock

凤清昶 2026-2-5 13:45:01
基于AQS实现的ReentrantLock

这里的源码用的Java8版本
lock方法

当ReentrantLock类的实例对象尝试获取锁的时候,调用lock方法,
1.png

会进入sync的lock方法,其中Sync是ReentrantLock的一个内部类,ReentrantLock构造方法会默认使用非公平锁NonfairSync,这个类是继承于Sync的
  1.         final void lock() {
  2.             if (!initialTryLock())
  3.                 acquire(1);
  4.         }
  5. // 其中Sync的initialTryLock是抽象方法,需要看非公平锁实现方法
复制代码
[!TIP]
在这里是第一次尝试获取锁
由于ReentrantLock是个可重入锁,判断里有重入的判断
  1. final boolean initialTryLock() {
  2.             Thread current = Thread.currentThread();
  3.                         // 获取当前线程的对象
  4.             if (compareAndSetState(0, 1)) { // first attempt is unguarded
  5.                         // 用CAS比较state状态是否为0(无人持有锁),如果是,就转为1(获取到锁)
  6.                 setExclusiveOwnerThread(current);
  7.                         // 将当前进程设置为拥有锁的线程
  8.                 return true;
  9.             } else if (getExclusiveOwnerThread() == current) {
  10.                         // 当前线程为拥有锁的线程(重复获取),重入
  11.                 int c = getState() + 1;
  12.                 if (c < 0) // overflow
  13.                         // 负数,state是个int类型数据,超出可能导致溢出变为负数
  14.                     throw new Error("Maximum lock count exceeded");
  15.                 setState(c);
  16.                         // 设置新的state
  17.                 return true;
  18.             } else
  19.                         // 已有线程占锁,返回为false
  20.                 return false;
  21.         }
复制代码
然后开始调用acquire方法,传入1
  1.     public final void acquire(int arg) {
  2.         if (!tryAcquire(arg) &&
  3.             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.             selfInterrupt();
  5.     }
复制代码
调用tryAcquire()方法,其中tryAcquire()方法是一个只有抛出异常的方法,需要重写,我们看非公平锁的写法

[!TIP]
这是第二次获取锁
  1.         protected final boolean tryAcquire(int acquires) {
  2.             if (getState() == 0 && !hasQueuedPredecessors() &&
  3.                 compareAndSetState(0, acquires)) {
  4.                 setExclusiveOwnerThread(Thread.currentThread());
  5.                 return true;
  6.             }
  7.             return false;
  8.         }
复制代码
这里,如果state是0,即没有线程占用锁的情况下getState() == 0​这个为真!hasQueuedPredecessors()执行这个方法,这个方法会检查是否已经出现了等待队列
  1.     public final boolean hasQueuedPredecessors() {
  2.         Thread first = null; Node h, s;
  3.         if ((h = head) != null && ((s = h.next) == null ||
  4.                                    (first = s.waiter) == null ||
  5.                                    s.prev == null))
  6.             first = getFirstQueuedThread(); // retry via getFirstQueuedThread
  7.         return first != null && first != Thread.currentThread();
  8.     }
复制代码
当未出现 同步队列/阻塞队列 ,或者当前线程是队列的第一个时,执行compareAndSetState(0, acquires),第二次尝试获取锁,如果成功,返回真
否则返回假,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  1.     private Node addWaiter(Node mode) {
  2.         Node node = new Node(Thread.currentThread(), mode);
  3.         // Try the fast path of enq; backup to full enq on failure
  4.         Node pred = tail;
  5.         if (pred != null) {
  6.             node.prev = pred;
  7.             if (compareAndSetTail(pred, node)) {
  8.                         // 尝试加入队尾
  9.                 pred.next = node;
  10.                 return node;
  11.             }
  12.         }
  13.         enq(node);
  14.         return node;
  15.     }
复制代码
Node是双向队列:阻塞队列一个节点,是为了保证原子化所以包装起来的
如果tail尾指针指向的节点不为空,则设置新生成的为尾指针指向的
否则(阻塞队列为空),调用enq函数
  1.     private Node enq(final Node node) {
  2.         for (;;) {
  3.             Node t = tail;
  4.             if (t == null) { // Must initialize
  5.                 if (compareAndSetHead(new Node()))
  6.                         // 使用CAS,防止多线程同时创建头节点,所以本质上还是需要抢入队顺序
  7.                     tail = head;
  8.                         // 初始化头节点,并将尾指针指向头节点
  9.             } else {
  10.                 node.prev = t;
  11.                 if (compareAndSetTail(t, node)) {
  12.                         // 判断t是否为尾节点,如果有线程更快的改掉尾节点,那么修改失败,
  13.                         // 重新进入for循环
  14.                     t.next = node;
  15.                     return t;
  16.                         // 修改成功
  17.                 }
  18.             }
  19.         }
  20.     }
复制代码
[!TIP]
这是第三次尝试获取锁
  1.     final boolean acquireQueued(final Node node, int arg) {
  2.         boolean failed = true;
  3.         try {
  4.             boolean interrupted = false;
  5.             for (;;) {
  6.                 final Node p = node.predecessor();
  7.                         // 获取node的前一个节点,如果前一个节点是头节点(当前节点是第一个)
  8.                         // 执行tryAcquire(arg),执行第三次尝试获取锁
  9.                 if (p == head && tryAcquire(arg)) {
  10.                         // 获取锁成功,出队
  11.                     setHead(node);// 将node设为头节点
  12.                     p.next = null; // help GC
  13.                     failed = false;
  14.                     return interrupted;
  15.                 }
  16.                 if (shouldParkAfterFailedAcquire(p, node) &&
  17.                     parkAndCheckInterrupt())
  18.                     interrupted = true;
  19.             }
  20.         } finally {
  21.             if (failed)
  22.                 cancelAcquire(node);
  23.         }
  24.     }
复制代码
如果第三次尝试获取锁失败了,会调用shouldParkAfterFailedAcquire()方法,将node的前一个节点传入(node一直都是加入的节点)
  1.     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2.         int ws = pred.waitStatus;
  3.         if (ws == Node.SIGNAL)
  4.                 // 确认前面的节点处于SIGNAL状态,即确认前面的节点会叫醒自己
  5.             /*
  6.              * This node has already set status asking a release
  7.              * to signal it, so it can safely park.
  8.              */
  9.             return true;
  10.         if (ws > 0) {
  11.             /*
  12.              * Predecessor was cancelled. Skip over predecessors and
  13.              * indicate retry.
  14.              */
  15.             do {
  16.                 node.prev = pred = pred.prev;
  17.             } while (pred.waitStatus > 0);
  18.                         // Node里面仅有一个大于零的状态,即1取消状态,也就是说当前任务被取消了
  19.                         // 持续循环值找到不再取消的节点
  20.             pred.next = node;
  21.         } else {
  22.                 // 将前一个节点用CAS转为Node.SIGNAL状态-1,返回为false
  23.             /*
  24.              * waitStatus must be 0 or PROPAGATE.  Indicate that we
  25.              * need a signal, but don't park yet.  Caller will need to
  26.              * retry to make sure it cannot acquire before parking.
  27.              */
  28.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  29.         }
  30.         return false;
  31.     }
复制代码
这里插一嘴,Node节点有一些状态,来体现其的任务状态,如前面传入的就是独占队列,addWaiter(Node.EXCLUSIVE)
  1.     static final class Node {
  2.         /** Marker to indicate a node is waiting in shared mode */
  3.         static final Node SHARED = new Node();
  4.                 // 共享队列
  5.         /** Marker to indicate a node is waiting in exclusive mode */
  6.         static final Node EXCLUSIVE = null;
  7.                 // 独占队列
  8.         /** waitStatus value to indicate thread has cancelled */// 取消
  9.         static final int CANCELLED =  1;
  10.                 // 已被取消
  11.         /** waitStatus value to indicate successor's thread needs unparking */
  12.         static final int SIGNAL    = -1;
  13.                 // 表示next节点已经park,需要被唤醒
  14.         /** waitStatus value to indicate thread is waiting on condition */
  15.         static final int CONDITION = -2;
  16.         /**
  17.          * waitStatus value to indicate the next acquireShared should
  18.          * unconditionally propagate
  19.          */
  20.                 // 共享状态
  21.         static final int PROPAGATE = -3;
复制代码
  1. if (shouldParkAfterFailedAcquire(p, node) &&
  2.                     parkAndCheckInterrupt())
  3.                     interrupted = true;
复制代码
如果前一个节点的waitState是0,会被CAS转为-1,然后返回false,进而不会执行parkAndCheckInterrupt(),继续for的无限循环,这里有可能出现第四次尝试
如果前一个节点的waitState是-1,该函数返回一个true,也就可以继续执行parkAndCheckInterrupt()
  1.     private final boolean parkAndCheckInterrupt() {
  2.         LockSupport.park(this);
  3.         return Thread.interrupted();
  4.     }
复制代码
当前线程进入park状态

至此我们完成了这个的lock过程

unlock方法

unlock()也是公平锁以及非公平锁都有的方法,同样继承了Sync
  1.     public void unlock() {
  2.         sync.release(1);
  3.     }
复制代码
Sync的release方法
  1.     public final boolean release(int arg) {
  2.         if (tryRelease(arg)) {
  3.             Node h = head;
  4.             if (h != null && h.waitStatus != 0)
  5.                 unparkSuccessor(h);
  6.             return true;
  7.         }
  8.         return false;
  9.     }
复制代码
首先尝试tryRelease方法
  1.         protected final boolean tryRelease(int releases) {
  2.             int c = getState() - releases;
  3.             if (Thread.currentThread() != getExclusiveOwnerThread())
  4.                 throw new IllegalMonitorStateException();
  5.             boolean free = false;
  6.             if (c == 0) {
  7.                 free = true;
  8.                 setExclusiveOwnerThread(null);
  9.             }
  10.             setState(c);
  11.             return free;
  12.         }
复制代码
如果成功醒过来,该线程依然处于一种park的位置上,即parkAndCheckInterrupt这个方法上,这个方法返回是否被中断ReentrantLock这个锁仅获取中断信息,而不会做出任何操作
  1. final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node) &&
  2.                     parkAndCheckInterrupt())
  3.                     interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }
复制代码
苏醒过来之后,继续for循环,尝试获取锁,失败之后会接着park,成功就会获取锁,并返回中断状态,在acquire中决定自我中断
  1.         final boolean nonfairTryAcquire(int acquires) {
  2.             final Thread current = Thread.currentThread();
  3.             int c = getState();
  4.             if (c == 0) {
  5.                 if (compareAndSetState(0, acquires)) {
  6.                     setExclusiveOwnerThread(current);
  7.                     return true;
  8.                 }
  9.             }
  10.             else if (current == getExclusiveOwnerThread()) {
  11.                 int nextc = c + acquires;
  12.                 if (nextc < 0) // overflow
  13.                     throw new Error("Maximum lock count exceeded");
  14.                 setState(nextc);
  15.                 return true;
  16.             }
  17.             return false;
  18.         }
复制代码
并将setExclusiveOwnerThread传入当前线程,返回为真,因此在TryRelease方法里的Thread.currentThread() != getExclusiveOwnerThread()一定为假,不会抛出异常,并设置free为false,当c也就是资源的state如果是0
  1.                         if (c == 0) {
  2.                 free = true;
  3.                 setExclusiveOwnerThread(null);
  4.             }
  5.             setState(c);
  6.             return free;
复制代码
c如果是0,即没有线程占用资源,setExclusiveOwnerThread将锁的线程设置为空,如果不为0,也就是重入锁仅仅解锁一次,c依然存在多个,设置c为新的state值,然会free值(资源锁的使用情况)
  1.     public final boolean release(int arg) {
  2.         if (tryRelease(arg)) {
  3.             Node h = head;
  4.             if (h != null && h.waitStatus != 0)
  5.                 unparkSuccessor(h);
  6.             return true;
  7.         }
  8.         return false;
  9.     }
复制代码
[code]    private void unparkSuccessor(Node node) {        /*         * If status is negative (i.e., possibly needing signal) try         * to clear in anticipation of signalling.  It is OK if this         * fails or if status is changed by waiting thread.         */        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        /*         * Thread to unpark is held in successor, which is normally         * just the next node.  But if cancelled or apparently null,         * traverse backwards from tail to find the actual         * non-cancelled successor.         */        Node s = node.next;、                // 如果下一个节点的状态为取消或者为空,从后向前找最后一个满足条件的,赋值为s        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus

相关推荐

2026-2-8 09:39:07

举报

2026-2-9 21:01:01

举报

懂技术并乐意极积无私分享的人越来越少。珍惜
2026-2-10 22:20:55

举报

7 天前

举报

您需要登录后才可以回帖 登录 | 立即注册