基于AQS实现的ReentrantLock
这里的源码用的Java8版本
lock方法
当ReentrantLock类的实例对象尝试获取锁的时候,调用lock方法,
会进入sync的lock方法,其中Sync是ReentrantLock的一个内部类,ReentrantLock构造方法会默认使用非公平锁NonfairSync,这个类是继承于Sync的- final void lock() {
- if (!initialTryLock())
- acquire(1);
- }
- // 其中Sync的initialTryLock是抽象方法,需要看非公平锁实现方法
复制代码[!TIP]
在这里是第一次尝试获取锁
由于ReentrantLock是个可重入锁,判断里有重入的判断- final boolean initialTryLock() {
- Thread current = Thread.currentThread();
- // 获取当前线程的对象
- if (compareAndSetState(0, 1)) { // first attempt is unguarded
- // 用CAS比较state状态是否为0(无人持有锁),如果是,就转为1(获取到锁)
- setExclusiveOwnerThread(current);
- // 将当前进程设置为拥有锁的线程
- return true;
- } else if (getExclusiveOwnerThread() == current) {
- // 当前线程为拥有锁的线程(重复获取),重入
- int c = getState() + 1;
- if (c < 0) // overflow
- // 负数,state是个int类型数据,超出可能导致溢出变为负数
- throw new Error("Maximum lock count exceeded");
- setState(c);
- // 设置新的state
- return true;
- } else
- // 已有线程占锁,返回为false
- return false;
- }
复制代码 然后开始调用acquire方法,传入1- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复制代码 调用tryAcquire()方法,其中tryAcquire()方法是一个只有抛出异常的方法,需要重写,我们看非公平锁的写法
[!TIP]
这是第二次获取锁
- protected final boolean tryAcquire(int acquires) {
- if (getState() == 0 && !hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
复制代码 这里,如果state是0,即没有线程占用锁的情况下getState() == 0这个为真!hasQueuedPredecessors()执行这个方法,这个方法会检查是否已经出现了等待队列- public final boolean hasQueuedPredecessors() {
- Thread first = null; Node h, s;
- if ((h = head) != null && ((s = h.next) == null ||
- (first = s.waiter) == null ||
- s.prev == null))
- first = getFirstQueuedThread(); // retry via getFirstQueuedThread
- return first != null && first != Thread.currentThread();
- }
复制代码 当未出现 同步队列/阻塞队列 ,或者当前线程是队列的第一个时,执行compareAndSetState(0, acquires),第二次尝试获取锁,如果成功,返回真
否则返回假,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- // 尝试加入队尾
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
复制代码 Node是双向队列:阻塞队列一个节点,是为了保证原子化所以包装起来的
如果tail尾指针指向的节点不为空,则设置新生成的为尾指针指向的
否则(阻塞队列为空),调用enq函数- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- // 使用CAS,防止多线程同时创建头节点,所以本质上还是需要抢入队顺序
- tail = head;
- // 初始化头节点,并将尾指针指向头节点
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- // 判断t是否为尾节点,如果有线程更快的改掉尾节点,那么修改失败,
- // 重新进入for循环
- t.next = node;
- return t;
- // 修改成功
- }
- }
- }
- }
复制代码[!TIP]
这是第三次尝试获取锁
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 获取node的前一个节点,如果前一个节点是头节点(当前节点是第一个)
- // 执行tryAcquire(arg),执行第三次尝试获取锁
- if (p == head && tryAcquire(arg)) {
- // 获取锁成功,出队
- setHead(node);// 将node设为头节点
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
复制代码 如果第三次尝试获取锁失败了,会调用shouldParkAfterFailedAcquire()方法,将node的前一个节点传入(node一直都是加入的节点)- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- // 确认前面的节点处于SIGNAL状态,即确认前面的节点会叫醒自己
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws > 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- // Node里面仅有一个大于零的状态,即1取消状态,也就是说当前任务被取消了
- // 持续循环值找到不再取消的节点
- pred.next = node;
- } else {
- // 将前一个节点用CAS转为Node.SIGNAL状态-1,返回为false
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
复制代码这里插一嘴,Node节点有一些状态,来体现其的任务状态,如前面传入的就是独占队列,addWaiter(Node.EXCLUSIVE)- static final class Node {
- /** Marker to indicate a node is waiting in shared mode */
- static final Node SHARED = new Node();
- // 共享队列
- /** Marker to indicate a node is waiting in exclusive mode */
- static final Node EXCLUSIVE = null;
- // 独占队列
- /** waitStatus value to indicate thread has cancelled */// 取消
- static final int CANCELLED = 1;
- // 已被取消
- /** waitStatus value to indicate successor's thread needs unparking */
- static final int SIGNAL = -1;
- // 表示next节点已经park,需要被唤醒
- /** waitStatus value to indicate thread is waiting on condition */
- static final int CONDITION = -2;
- /**
- * waitStatus value to indicate the next acquireShared should
- * unconditionally propagate
- */
- // 共享状态
- static final int PROPAGATE = -3;
复制代码 - if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
复制代码 如果前一个节点的waitState是0,会被CAS转为-1,然后返回false,进而不会执行parkAndCheckInterrupt(),继续for的无限循环,这里有可能出现第四次尝试
如果前一个节点的waitState是-1,该函数返回一个true,也就可以继续执行parkAndCheckInterrupt()- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
复制代码 当前线程进入park状态
至此我们完成了这个的lock过程
unlock方法
unlock()也是公平锁以及非公平锁都有的方法,同样继承了Sync- public void unlock() {
- sync.release(1);
- }
复制代码 Sync的release方法- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
复制代码 首先尝试tryRelease方法- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
复制代码 如果成功醒过来,该线程依然处于一种park的位置上,即parkAndCheckInterrupt这个方法上,这个方法返回是否被中断ReentrantLock这个锁仅获取中断信息,而不会做出任何操作- final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
复制代码 苏醒过来之后,继续for循环,尝试获取锁,失败之后会接着park,成功就会获取锁,并返回中断状态,在acquire中决定自我中断- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
复制代码 并将setExclusiveOwnerThread传入当前线程,返回为真,因此在TryRelease方法里的Thread.currentThread() != getExclusiveOwnerThread()一定为假,不会抛出异常,并设置free为false,当c也就是资源的state如果是0- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
复制代码 c如果是0,即没有线程占用资源,setExclusiveOwnerThread将锁的线程设置为空,如果不为0,也就是重入锁仅仅解锁一次,c依然存在多个,设置c为新的state值,然会free值(资源锁的使用情况)- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
复制代码 [code] private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;、 // 如果下一个节点的状态为取消或者为空,从后向前找最后一个满足条件的,赋值为s if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus |