分布式锁—7.Curator的分布式锁
大纲1.Curator的可重入锁的源码
2.Curator的非可重入锁的源码
3.Curator的可重入读写锁的源码
4.Curator的MultiLock源码
5.Curator的Semaphore源码
1.Curator的可重入锁的源码
(1)InterProcessMutex获取分布式锁
(2)InterProcessMutex的初始化
(3)InterProcessMutex.acquire()尝试获取锁
(4)LockInternals.attemptLock()尝试获取锁
(5)不同客户端线程获取锁时的互斥实现
(6)同一客户端线程可重入加锁的实现
(7)客户端线程释放锁的实现
(8)客户端线程释放锁后其他线程获取锁的实现
(9)InterProcessMutex就是一个公平锁
(1)InterProcessMutex获取分布式锁
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",
5000,
3000,
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端");
//获取分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");
lock.acquire();
Thread.sleep(1000);
lock.release();
}
}(2)InterProcessMutex的初始化
设置锁的节点路径basePath + 初始化一个LockInternals对象实例。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
...
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME, 1, driver);
}
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//1.设置锁的节点路径
basePath = PathUtils.validatePath(path);
//2.初始化一个LockInternals对象实例
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
}
public class LockInternals {
private final LockInternalsDriver driver;
private final String lockName;
private volatile int maxLeases;
private final WatcherRemoveCuratorFramework client;
private final String basePath;
private final String path;
...
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
this.client = client.newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
...
}(3)InterProcessMutex.acquire()尝试获取锁
LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象,用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount,用于锁的重入次数计数。
在执行InterProcessMutex的acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的LockData数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间time = -1。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
//一个线程对应一个LockData数据对象
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//设置锁的路径
basePath = PathUtils.validatePath(path);
//初始化LockInternals
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
@Override
public void acquire() throws Exception {
//获取分布式锁,会一直阻塞等待直到获取成功
//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData数据
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
//可重入计算
lockData.lockCount.incrementAndGet();
return true;
}
//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
LockData newLockData = new LockData(currentThread, lockPath);
//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
//LockData是InterProcessMutex的一个静态内部类
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
protected byte[] getLockNodeBytes() {
return null;
}
...
}(4)LockInternals.attemptLock()尝试获取锁
先创建临时节点,再判断是否满足获取锁的条件。
步骤一:首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。
步骤二:然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的PredicateResults对象。
具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁,默认是1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。
获取锁成功,则会中断LockInternals的internalLockLoop()方法的while循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象,然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。
获取锁失败,则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息,并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后,便会调用wait()方法将当前线程挂起。
所以前一个节点发生变化时,便会通知添加的Watcher监听。然后便会唤醒阻塞的线程,继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。
public class LockInternals {
private final LockInternalsDriver driver;
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
...
}
...
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//获取当前时间
final long startMillis = System.currentTimeMillis();
//默认情况下millisToWait=null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//默认情况下localLockNodeBytes也是null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;//是否已经获取到锁
boolean isDone = false;//是否正在获取锁
while (!isDone) {
isDone = true;
//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//2.检查是否获取到了锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
List<String> getSortedChildren() throws Exception {
//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
return getSortedChildren(client, basePath, lockName, driver);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = client.getChildren().forPath(basePath);
//对节点名称进行排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort(
sortedList,
new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//级联创建一个临时顺序节点
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
//默认情况下传入的lockNodeBytes=null
if (lockNodeBytes != null) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
//创建临时顺序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}(8)客户端线程释放锁后其他线程获取锁的实现
由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法,也就是唤醒该线程,让其继续执行while循环获取锁。
public class LockInternals { ... private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { //唤醒LockInternals中被wait()阻塞的线程 client.postSafeNotify(LockInternals.this); } }; //检查是否获取到了锁 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; ... while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) { //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表 List children = getSortedChildren(); //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if (predicateResults.getsTheLock()) {//获取锁成功 //返回true haveTheLock = true; } else {//获取锁失败 //获取前一个节点路径名称 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak //通过getData()获取前一个节点路径在zk的信息,并添加watch监听 client.getData().usingWatcher(watcher).forPath(previousSequencePath); //默认情况下,millisToWait = null if (millisToWait != null) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait0, "qty cannot be 0"); ImmutableList.Builder builder = ImmutableList.builder(); boolean success = false; try { while (qty-- > 0) { int retryCount = 0; long startMillis = System.currentTimeMillis(); boolean isDone = false; while (!isDone) { switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) { case CONTINUE: { isDone = true; break; } case RETURN_NULL: { return null; } case RETRY_DUE_TO_MISSING_NODE: { if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { throw new KeeperException.NoNodeException("Sequential path not found - possible session loss"); } //try again break; } } } } success = true; } finally { if (!success) { returnAll(builder.build()); } } return builder.build(); } private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder builder, long startMs, boolean hasWait, long waitMs) throws Exception { if (client.getState() != CuratorFrameworkState.STARTED) { return InternalAcquireResult.RETURN_NULL; } if (hasWait) { long thisWaitMs = getThisWaitMs(startMs, waitMs); if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) { return InternalAcquireResult.RETURN_NULL; } } else { //1.首先获取一个分布式锁 lock.acquire(); } Lease lease = null; boolean success = false; try { //2.尝试获取Semaphore的Lease:创建一个临时顺序节点 PathAndBytesable createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL); String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME)); String nodeName = ZKPaths.getNodeFromPath(path); lease = makeLease(path); ... try { synchronized(this) { for(;;) { List children; //3.获取./lease目录下的所有临时顺序节点,并添加watcher监听 children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); ... //4.判断临时顺序节点的数量是否大于maxLeases //maxLeases表示最多允许多少个客户端线程获取Semaphore的Lease if (children.size()
页:
[1]