大纲
1.Redisson可重入锁RedissonLock概述
2.可重入锁源码之创建RedissonClient实例
3.可重入锁源码之lua脚本加锁逻辑
4.可重入锁源码之WatchDog维持加锁逻辑
5.可重入锁源码之可重入加锁逻辑
6.可重入锁源码之锁的互斥阻塞逻辑
7.可重入锁源码之释放锁逻辑
8.可重入锁源码之获取锁超时与锁超时自动释放逻辑
9.可重入锁源码总结
1.Redisson可重入锁RedissonLock概述
(1)在pom.xml里引入依赖
(2)构建RedissonClient并使用Redisson
(3)Redisson可重入锁RedissonLock简单使用
(1)在pom.xml里引入依赖- <dependencies>
- <dependency>
- <groupId>org.redisson</groupId>
- redisson</artifactId>
- <version>3.16.8</version>
- </dependency>
- </dependencies>
复制代码 (2)构建RedissonClient并使用Redisson
参考官网中文文档,连接上3主3从的Redis Cluster。- //https://github.com/redisson/redisson/wiki/目录
- public class Application {
- public static void main(String[] args) throws Exception {
- //连接3主3从的Redis CLuster
- Config config = new Config();
- config.useClusterServers()
- .addNodeAddress("redis://192.168.1.110:7001")
- .addNodeAddress("redis://192.168.1.110:7002")
- .addNodeAddress("redis://192.168.1.110:7003")
- .addNodeAddress("redis://192.168.1.111:7001")
- .addNodeAddress("redis://192.168.1.111:7002")
- .addNodeAddress("redis://192.168.1.111:7003");
- //创建RedissonClient实例
- RedissonClient redisson = Redisson.create(config);
- //获取可重入锁
- RLock lock = redisson.getLock("myLock");
- lock.lock();
- lock.unlock();
-
- RMap<String, Object> map = redisson.getMap("myMap");
- map.put("foo", "bar");
-
- map = redisson.getMap("myMap");
- System.out.println(map.get("foo"));
- }
- }
复制代码 (3)Redisson可重入锁RedissonLock简单使用
Redisson可重入锁RLock实现了java.util.concurrent.locks.Lock接口,同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。- RLock lock = redisson.getLock("myLock");
- //最常见的使用方法
- lock.lock();
复制代码 如果设置锁的超时时间不合理,导致超时时间已到时锁还没能主动释放,但实际上锁却被Redis节点通过过期时间释放了,这会有问题。
为了避免这种情况,Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前,不断地延长锁的有效期。
WatchDog检查锁的默认超时时间是30秒,可通过Config.lockWatchdogTimeout来指定。
RLock的tryLock方法提供了leaseTime参数来指定加锁的超时时间,超过这个时间后锁便自动被释放。- //如果没有主动释放锁的话,10秒后将会自动释放锁
- lock.lock(10, TimeUnit.SECONDS);
- //加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放
- boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
- if (res) {
- try {
- ...
- } finally {
- lock.unlock();
- }
- }
复制代码 RLock完全符合Java的Lock规范,即只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。如果需要其他进程也能解锁,那么可以使用分布式信号量Semaphore。
2.可重入锁源码之创建RedissonClient实例
(1)初始化与Redis的连接管理器ConnectionManager
(2)初始化Redis的命令执行器CommandExecutor
使用Redisson.create()方法可以根据配置创建一个RedissonClient实例,因为Redisson类会实现RedissonClient接口,而创建RedissonClient实例的主要工作其实就是:
一.初始化与Redis的连接管理器ConnectionManager
二.初始化Redis的命令执行器CommandExecutor
(1)初始化与Redis的连接管理器ConnectionManager
Redis的配置类Config会被封装在连接管理器ConnectionManager中,后续可以通过连接管理器ConnectionManager获取Redis的配置类Config。- public class Application {
- public static void main(String[] args) throws Exception {
- Config config = new Config();
- config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
- //创建RedissonClient实例
- RedissonClient redisson = Redisson.create(config);
- ...
- }
- }
- //创建RedissonClient实例的源码
- public class Redisson implements RedissonClient {
- protected final Config config;//Redis配置类
- protected final ConnectionManager connectionManager;//Redis的连接管理器
- protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器
-
- ...
- public static RedissonClient create(Config config) {
- return new Redisson(config);
- }
-
- protected Redisson(Config config) {
- this.config = config;
- Config configCopy = new Config(config);
- //根据Redis配置类Config实例创建和Redis的连接管理器
- connectionManager = ConfigSupport.createConnectionManager(configCopy);
- RedissonObjectBuilder objectBuilder = null;
- if (config.isReferenceEnabled()) {
- objectBuilder = new RedissonObjectBuilder(this);
- }
- //创建Redis的命令执行器
- commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
- evictionScheduler = new EvictionScheduler(commandExecutor);
- writeBehindService = new WriteBehindService(commandExecutor);
- }
- ...
- }
- public class ConfigSupport {
- ...
- //创建Redis的连接管理器
- public static ConnectionManager createConnectionManager(Config configCopy) {
- //生成UUID
- UUID id = UUID.randomUUID();
- ...
- if (configCopy.getClusterServersConfig() != null) {
- validate(configCopy.getClusterServersConfig());
- //返回ClusterConnectionManager实例
- return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
- }
- ...
- }
- ...
- }
- public class ClusterConnectionManager extends MasterSlaveConnectionManager {
- public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
- super(config, id);
- ...
- this.natMapper = cfg.getNatMapper();
- //将Redis的配置类Config封装在ConnectionManager中
- this.config = create(cfg);
- initTimer(this.config);
-
- Throwable lastException = null;
- List<String> failedMasters = new ArrayList<String>();
- for (String address : cfg.getNodeAddresses()) {
- RedisURI addr = new RedisURI(address);
- //异步连接Redis节点
- CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
- ...
- //通过connectionFuture阻塞获取建立好的连接
- RedisConnection connection = connectionFuture.toCompletableFuture().join();
- ...
- List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);
- ...
- CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
- Collection<ClusterPartition> partitions = partitionsFuture.join();
- List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
- for (ClusterPartition partition : partitions) {
- if (partition.isMasterFail()) {
- failedMasters.add(partition.getMasterAddress().toString());
- continue;
- }
- if (partition.getMasterAddress() == null) {
- throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
- }
- CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
- masterFutures.add(masterFuture);
- }
- CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));
- masterFuture.join();
- ...
- }
- ...
- }
- ...
- }
- public class MasterSlaveConnectionManager implements ConnectionManager {
- protected final String id;//初始化时为UUID
- private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
- ...
- protected MasterSlaveConnectionManager(Config cfg, UUID id) {
- this.id = id.toString();//传入的是UUID
- this.cfg = cfg;
- ...
- }
-
- protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
- RedisConnection conn = nodeConnections.get(addr);
- if (conn != null) {
- if (!conn.isActive()) {
- closeNodeConnection(conn);
- } else {
- return CompletableFuture.completedFuture(conn);
- }
- }
- //创建Redis客户端连接实例
- RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
- //向Redis服务端发起异步连接请求,这个future会层层往外返回
- CompletionStage<RedisConnection> future = client.connectAsync();
- return future.thenCompose(connection -> {
- if (connection.isActive()) {
- if (!addr.isIP()) {
- RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort());
- nodeConnections.put(address, connection);
- }
- nodeConnections.put(addr, connection);
- return CompletableFuture.completedFuture(connection);
- } else {
- connection.closeAsync();
- CompletableFuture<RedisConnection> f = new CompletableFuture<>();
- f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
- return f;
- }
- });
- }
-
- //创建Redis客户端连接实例
- @Override
- public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
- RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
- return RedisClient.create(redisConfig);
- }
- ...
- }
- //Redisson主要使用Netty去和Redis服务端建立连接
- public final class RedisClient {
- private final Bootstrap bootstrap;
- private final Bootstrap pubSubBootstrap;
- ...
- public static RedisClient create(RedisClientConfig config) {
- return new RedisClient(config);
- }
-
- private RedisClient(RedisClientConfig config) {
- ...
- bootstrap = createBootstrap(copy, Type.PLAIN);
- pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
- this.commandTimeout = copy.getCommandTimeout();
- }
-
- private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
- Bootstrap bootstrap = new Bootstrap()
- .resolver(config.getResolverGroup())
- .channel(config.getSocketChannelClass())
- .group(config.getGroup());
- bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
- bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
- bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
- config.getNettyHook().afterBoostrapInitialization(bootstrap);
- return bootstrap;
- }
-
- //向Redis服务端发起异步连接请求
- public RFuture<RedisConnection> connectAsync() {
- CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
- CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
- CompletableFuture<RedisConnection> r = new CompletableFuture<>();
- //Netty的Bootstrap发起连接
- ChannelFuture channelFuture = bootstrap.connect(res);
- channelFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture future) throws Exception {
- if (bootstrap.config().group().isShuttingDown()) {
- IllegalStateException cause = new IllegalStateException("RedisClient is shutdown");
- r.completeExceptionally(cause);
- return;
- }
- if (future.isSuccess()) {
- RedisConnection c = RedisConnection.getFrom(future.channel());
- c.getConnectionPromise().whenComplete((res, e) -> {
- bootstrap.config().group().execute(new Runnable() {
- @Override
- public void run() {
- if (e == null) {
- if (!r.complete(c)) {
- c.closeAsync();
- }
- } else {
- r.completeExceptionally(e);
- c.closeAsync();
- }
- }
- });
- });
- } else {
- bootstrap.config().group().execute(new Runnable() {
- public void run() {
- r.completeExceptionally(future.cause());
- }
- });
- }
- }
- });
- return r;
- });
- return new CompletableFutureWrapper<>(f);
- }
- ...
- }
复制代码 (2)初始化Redis的命令执行器CommandExecutor
首先,CommandSyncService继承自CommandAsyncService类。
而CommandAsyncService类实现了CommandExecutor接口。
然后,ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。
所以,通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager。- //Redis命令的同步执行器CommandSyncService
- public class CommandSyncService extends CommandAsyncService implements CommandExecutor {
- //初始化CommandExecutor
- public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
- super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
- }
-
- public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
- return read(key, connectionManager.getCodec(), command, params);
- }
-
- public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
- RFuture<R> res = readAsync(key, codec, command, params);
- return get(res);
- }
-
- public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
- return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
- }
-
- public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
- RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
- return get(res);
- }
-
- public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
- return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
- }
-
- public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
- RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
- return get(res);
- }
- }
- //Redis命令的异步执行器CommandAsyncService
- public class CommandAsyncService implements CommandAsyncExecutor {
- //Redis连接管理器
- final ConnectionManager connectionManager;
- final RedissonObjectBuilder objectBuilder;
- final RedissonObjectBuilder.ReferenceType referenceType;
- public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
- this.connectionManager = connectionManager;
- this.objectBuilder = objectBuilder;
- this.referenceType = referenceType;
- }
-
- @Override
- public <V> V getNow(CompletableFuture<V> future) {
- try {
- return future.getNow(null);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Override
- public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
- RFuture<R> res = readAsync(key, codec, command, params);
- return get(res);
- }
-
- @Override
- public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
- NodeSource source = getNodeSource(key);
- return async(true, source, codec, command, params, false, false);
- }
-
- private NodeSource getNodeSource(String key) {
- int slot = connectionManager.calcSlot(key);
- return new NodeSource(slot);
- }
-
- public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
- CompletableFuture<R> mainPromise = createPromise();
- RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
- executor.execute();
- return new CompletableFutureWrapper<>(mainPromise);
- }
-
- @Override
- public <V> V get(RFuture<V> future) {
- if (Thread.currentThread().getName().startsWith("redisson-netty")) {
- throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
- }
- try {
- return future.toCompletableFuture().get();
- } catch (InterruptedException e) {
- future.cancel(true);
- Thread.currentThread().interrupt();
- throw new RedisException(e);
- } catch (ExecutionException e) {
- throw convertException(e);
- }
- }
- ...
- }
复制代码
3.可重入锁源码之lua脚本加锁逻辑
(1)通过Redisson.getLock()方法获取一个RedissonLock实例
(2)加锁时的执行流程
(3)加锁时执行的lua脚本
(4)执行加锁lua脚本的命令执行器逻辑
(5)如何根据slot值获取对应的节点
(1)通过Redisson.getLock()方法获取一个RedissonLock实例
在Redisson.getLock()方法中,会传入命令执行器CommandExecutor来创建一个RedissonLock实例,而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的,所以命令执行器CommandExecutor会被封装在RedissonLock实例中。
因此,通过RedissonLock实例可以获取一个命令执行器CommandExecutor,通过命令执行器CommandExecutor可获取连接管理器ConnectionManager,通过连接管理器ConnectionManager可获取Redis的配置信息类Config,通过Redis的配置信息类Config可以获取各种配置信息。
RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面,有个internalLockLeaseTime变量,这个internalLockLeaseTime变量与WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒,即30秒;- public class Application {
- public static void main(String[] args) throws Exception {
- Config config = new Config();
- config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
- //创建RedissonClient实例
- RedissonClient redisson = Redisson.create(config);
- //获取可重入锁
- RLock lock = redisson.getLock("myLock");
- lock.lock();
- ...
- }
- }
- //创建Redisson实例
- public class Redisson implements RedissonClient {
- protected final Config config;//Redis配置类
- protected final ConnectionManager connectionManager;//Redis的连接管理器
- protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器
- ...
-
- public static RedissonClient create(Config config) {
- return new Redisson(config);
- }
-
- protected Redisson(Config config) {
- ...
- //根据Redis配置类Config实例创建和Redis的连接管理器
- connectionManager = ConfigSupport.createConnectionManager(configCopy);
- //创建Redis的命令执行器
- commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
- ...
- }
- ...
- @Override
- public RLock getLock(String name) {
- return new RedissonLock(commandExecutor, name);
- }
- ...
- }
- //创建RedissonLock实例
- //通过RedissonLock实例可以获取一个命令执行器CommandExecutor;
- public class RedissonLock extends RedissonBaseLock {
- protected long internalLockLeaseTime;
- protected final LockPubSub pubSub;
- final CommandAsyncExecutor commandExecutor;
-
- public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- //与WatchDog有关的internalLockLeaseTime
- //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
- //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
- //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
- }
- ...
- }
- //创建Redis的命令执行器
- //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
- public class CommandAsyncService implements CommandAsyncExecutor {
- final ConnectionManager connectionManager;
- ...
- public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
- this.connectionManager = connectionManager;
- this.objectBuilder = objectBuilder;
- this.referenceType = referenceType;
- }
-
- @Override
- public ConnectionManager getConnectionManager() {
- return connectionManager;
- }
- ...
- }
- //创建Redis的连接管理器
- //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
- public class ClusterConnectionManager extends MasterSlaveConnectionManager {
- ...
- public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
- super(config, id);
- ...
- }
- ...
- }
- //创建Redis的连接管理器
- //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
- public class MasterSlaveConnectionManager implements ConnectionManager {
- private final Config cfg;
- protected final String id;//初始化时为UUID
- ...
- protected MasterSlaveConnectionManager(Config cfg, UUID id) {
- this.id = id.toString();//传入的是UUID
- this.cfg = cfg;
- ...
- }
-
- @Override
- public Config getCfg() {
- return cfg;
- }
- ...
- }
- //配置信息类Config中的lockWatchdogTimeout变量初始化为30秒,该变量与WatchDog有关
- public class Config {
- private long lockWatchdogTimeout = 30 * 1000;
- ...
- //This parameter is only used if lock has been acquired without leaseTimeout parameter definition.
- //Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval.
- //This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way.
- //Default is 30000 milliseconds
- public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
- this.lockWatchdogTimeout = lockWatchdogTimeout;
- return this;
- }
-
- public long getLockWatchdogTimeout() {
- return lockWatchdogTimeout;
- }
- }
复制代码 默认情况下,调用RedissonLock.lock()方法加锁时,传入的leaseTime为-1。此时锁的超时时间会设为lockWatchdogTimeout默认的30秒,从而避免出现死锁的情况。- public class RedissonLock extends RedissonBaseLock {
- ...
- //加锁
- @Override
- public void lock() {
- try {
- lock(-1, null, false);
- } catch (InterruptedException e) {
- throw new IllegalStateException();
- }
- }
-
- private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
- long threadId = Thread.currentThread().getId();
- Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
- ...
- }
-
- //解锁
- @Override
- public void unlock() {
- try {
- get(unlockAsync(Thread.currentThread().getId()));
- } catch (RedisException e) {
- if (e.getCause() instanceof IllegalMonitorStateException) {
- throw (IllegalMonitorStateException) e.getCause();
- } else {
- throw e;
- }
- }
- }
- ...
- }
复制代码 (2)加锁时的执行流程
首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关,然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理,接着调用RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。- public class RedissonLock extends RedissonBaseLock {
- protected long internalLockLeaseTime;
- protected final LockPubSub pubSub;
- final CommandAsyncExecutor commandExecutor;
-
- public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- //与WatchDog有关的internalLockLeaseTime
- //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
- //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
- //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
- }
- ...
- //加锁
- @Override
- public void lock() {
- try {
- lock(-1, null, false);
- } catch (InterruptedException e) {
- throw new IllegalStateException();
- }
- }
-
- private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
- //线程ID,用来生成设置Hash的值
- long threadId = Thread.currentThread().getId();
- //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1
- Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
- //ttl为null说明加锁成功
- if (ttl == null) {
- return;
- }
- //加锁失败时的处理
- CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
- if (interruptibly) {
- commandExecutor.syncSubscriptionInterrupted(future);
- } else {
- commandExecutor.syncSubscription(future);
- }
- try {
- while (true) {
- ttl = tryAcquire(-1, leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- break;
- }
- // waiting for message
- if (ttl >= 0) {
- try {
- commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- if (interruptibly) {
- throw e;
- }
- commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- }
- } else {
- if (interruptibly) {
- commandExecutor.getNow(future).getLatch().acquire();
- } else {
- commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
- }
- }
- }
- } finally {
- unsubscribe(commandExecutor.getNow(future), threadId);
- }
- }
- ...
- private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
- //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法
- //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步
- return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
- }
-
- private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
- RFuture<Long> ttlRemainingFuture;
- if (leaseTime != -1) {
- ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- } else {
- //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime
- //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒
- ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
- }
- CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
- //加锁返回的ttlRemaining为null表示加锁成功
- if (ttlRemaining == null) {
- if (leaseTime != -1) {
- internalLockLeaseTime = unit.toMillis(leaseTime);
- } else {
- scheduleExpirationRenewal(threadId);
- }
- }
- return ttlRemaining;
- });
- return new CompletableFutureWrapper<>(f);
- }
-
- //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
- <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getRawName()),//锁的名字:KEYS[1]
- unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
- getLockName(threadId)//ARGV[2],值为UUID + 线程ID
- );
- }
- ...
- }
- public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
- final String id;
- final String entryName;
- final CommandAsyncExecutor commandExecutor;
-
- public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- this.id = commandExecutor.getConnectionManager().getId();
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.entryName = id + ":" + name;
- }
-
- protected String getLockName(long threadId) {
- return id + ":" + threadId;
- }
- ...
- }
- abstract class RedissonExpirable extends RedissonObject implements RExpirable {
- ...
- RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
- super(connectionManager, name);
- }
- ...
- }
- public abstract class RedissonObject implements RObject {
- protected final CommandAsyncExecutor commandExecutor;
- protected String name;
- protected final Codec codec;
-
- public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
- this.codec = codec;
- this.commandExecutor = commandExecutor;
- if (name == null) {
- throw new NullPointerException("name can't be null");
- }
- setName(name);
- }
- ...
- protected final <V> V get(RFuture<V> future) {
- //下面会调用CommandAsyncService.get()方法
- return commandExecutor.get(future);
- }
- ...
- }
- public class CommandAsyncService implements CommandAsyncExecutor {
- ...
- @Override
- public <V> V get(RFuture<V> future) {
- if (Thread.currentThread().getName().startsWith("redisson-netty")) {
- throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
- }
- try {
- return future.toCompletableFuture().get();
- } catch (InterruptedException e) {
- future.cancel(true);
- Thread.currentThread().interrupt();
- throw new RedisException(e);
- } catch (ExecutionException e) {
- throw convertException(e);
- }
- }
- ...
- }
复制代码 (3)加锁时执行的lua脚本- public class RedissonLock extends RedissonBaseLock {
- //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
- <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getRawName()),//锁的名字:KEYS[1],比如"myLock"
- unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
- getLockName(threadId)//ARGV[2],值为UUID + 线程ID
- );
- }
- ...
- }
复制代码 首先执行Redis的exists命令,判断key为锁名的Hash值是否不存在,也就是判断key为锁名myLock的Hash值是否存在。
一.如果key为锁名的Hash值不存在,那么就进行如下加锁处理
首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名,value是一个映射。也就是在value值中会有一个field为UUID + 线程ID,value为1的映射。比如:hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。
然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,也就是设置key为锁名的Hash值的过期时间为30秒。比如:pexpire myLock 30000。所以默认情况下,myLock这个锁在30秒后就会自动过期。
二.如果key为锁名的Hash值存在,那么就执行如下判断处理
首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行:pexpire myLock 30000,再次将myLock的有效期设置为30秒。
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间,因为不同线程的ARGV[2]是不一样的,ARGV[2] = UUID + 线程ID。
(4)执行加锁lua脚本的命令执行器逻辑
在RedissonLock的tryLockInnerAsync()方法中,会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本,即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。
在CommandAsyncService的evalWriteAsync()方法中,首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法来执行lua脚本。
在CommandAsyncService的getNodeSource()方法中,会根据key进行CRC16运算,然后再对16384取模,计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。
在CommandAsyncService的evalAsync()方法中,会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor,实现将脚本请求发送给对应的Redis节点处理。
[code]public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { //从外部传入的:在创建实现了RedissonClient的Redisson实例时,初始化的命令执行器CommandExecutor final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } ... protected RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { //获取可用的节点,并继续封装一个命令执行器CommandBatchService MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName()); int availableSlaves = entry.getAvailableSlaves(); CommandBatchService executorService = createCommandBatchService(availableSlaves); //通过CommandAsyncService.evalWriteAsync方法执行lua脚本 RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params); if (commandExecutor instanceof CommandBatchService) { return result; } //异步执行然后获取结果 RFuture |