找回密码
 立即注册
首页 业界区 业界 分布式锁—2.Redisson的可重入锁

分布式锁—2.Redisson的可重入锁

歇凛尾 2025-6-4 22:23:19
大纲
1.Redisson可重入锁RedissonLock概述
2.可重入锁源码之创建RedissonClient实例
3.可重入锁源码之lua脚本加锁逻辑
4.可重入锁源码之WatchDog维持加锁逻辑
5.可重入锁源码之可重入加锁逻辑
6.可重入锁源码之锁的互斥阻塞逻辑
7.可重入锁源码之释放锁逻辑
8.可重入锁源码之获取锁超时与锁超时自动释放逻辑
9.可重入锁源码总结
 
1.Redisson可重入锁RedissonLock概述
(1)在pom.xml里引入依赖
(2)构建RedissonClient并使用Redisson
(3)Redisson可重入锁RedissonLock简单使用
 
(1)在pom.xml里引入依赖
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.redisson</groupId>
  4.         redisson</artifactId>
  5.         <version>3.16.8</version>
  6.      </dependency>
  7. </dependencies>
复制代码
(2)构建RedissonClient并使用Redisson
参考官网中文文档,连接上3主3从的Redis Cluster。
  1. //https://github.com/redisson/redisson/wiki/目录
  2. public class Application {
  3.     public static void main(String[] args) throws Exception {
  4.         //连接3主3从的Redis CLuster
  5.         Config config = new Config();
  6.         config.useClusterServers()
  7.             .addNodeAddress("redis://192.168.1.110:7001")
  8.             .addNodeAddress("redis://192.168.1.110:7002")
  9.             .addNodeAddress("redis://192.168.1.110:7003")
  10.             .addNodeAddress("redis://192.168.1.111:7001")
  11.             .addNodeAddress("redis://192.168.1.111:7002")
  12.             .addNodeAddress("redis://192.168.1.111:7003");
  13.         //创建RedissonClient实例
  14.         RedissonClient redisson = Redisson.create(config);
  15.         //获取可重入锁
  16.         RLock lock = redisson.getLock("myLock");
  17.         lock.lock();
  18.         lock.unlock();
  19.       
  20.         RMap<String, Object> map = redisson.getMap("myMap");
  21.         map.put("foo", "bar");  
  22.       
  23.         map = redisson.getMap("myMap");
  24.         System.out.println(map.get("foo"));   
  25.     }
  26. }
复制代码
(3)Redisson可重入锁RedissonLock简单使用
Redisson可重入锁RLock实现了java.util.concurrent.locks.Lock接口,同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。
  1. RLock lock = redisson.getLock("myLock");
  2. //最常见的使用方法
  3. lock.lock();
复制代码
如果设置锁的超时时间不合理,导致超时时间已到时锁还没能主动释放,但实际上锁却被Redis节点通过过期时间释放了,这会有问题。
 
为了避免这种情况,Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前,不断地延长锁的有效期。
 
WatchDog检查锁的默认超时时间是30秒,可通过Config.lockWatchdogTimeout来指定。
 
RLock的tryLock方法提供了leaseTime参数来指定加锁的超时时间,超过这个时间后锁便自动被释放。
  1. //如果没有主动释放锁的话,10秒后将会自动释放锁
  2. lock.lock(10, TimeUnit.SECONDS);
  3. //加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放
  4. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
  5. if (res) {
  6.     try {
  7.         ...
  8.     } finally {
  9.         lock.unlock();
  10.     }
  11. }
复制代码
RLock完全符合Java的Lock规范,即只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。如果需要其他进程也能解锁,那么可以使用分布式信号量Semaphore。
 
2.可重入锁源码之创建RedissonClient实例
(1)初始化与Redis的连接管理器ConnectionManager
(2)初始化Redis的命令执行器CommandExecutor
 
使用Redisson.create()方法可以根据配置创建一个RedissonClient实例,因为Redisson类会实现RedissonClient接口,而创建RedissonClient实例的主要工作其实就是:
一.初始化与Redis的连接管理器ConnectionManager
二.初始化Redis的命令执行器CommandExecutor
 
(1)初始化与Redis的连接管理器ConnectionManager
Redis的配置类Config会被封装在连接管理器ConnectionManager中,后续可以通过连接管理器ConnectionManager获取Redis的配置类Config。
  1. public class Application {
  2.     public static void main(String[] args) throws Exception {
  3.         Config config = new Config();
  4.         config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
  5.         //创建RedissonClient实例
  6.         RedissonClient redisson = Redisson.create(config);
  7.         ...
  8.     }
  9. }
  10. //创建RedissonClient实例的源码
  11. public class Redisson implements RedissonClient {
  12.     protected final Config config;//Redis配置类
  13.     protected final ConnectionManager connectionManager;//Redis的连接管理器
  14.     protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器
  15.   
  16.     ...
  17.     public static RedissonClient create(Config config) {
  18.         return new Redisson(config);
  19.     }
  20.    
  21.     protected Redisson(Config config) {
  22.         this.config = config;
  23.         Config configCopy = new Config(config);
  24.         //根据Redis配置类Config实例创建和Redis的连接管理器
  25.         connectionManager = ConfigSupport.createConnectionManager(configCopy);
  26.         RedissonObjectBuilder objectBuilder = null;
  27.         if (config.isReferenceEnabled()) {
  28.             objectBuilder = new RedissonObjectBuilder(this);
  29.         }
  30.         //创建Redis的命令执行器
  31.         commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
  32.         evictionScheduler = new EvictionScheduler(commandExecutor);
  33.         writeBehindService = new WriteBehindService(commandExecutor);
  34.     }
  35.     ...
  36. }
  37. public class ConfigSupport {
  38.     ...
  39.     //创建Redis的连接管理器
  40.     public static ConnectionManager createConnectionManager(Config configCopy) {
  41.         //生成UUID
  42.         UUID id = UUID.randomUUID();
  43.         ...
  44.         if (configCopy.getClusterServersConfig() != null) {
  45.             validate(configCopy.getClusterServersConfig());
  46.             //返回ClusterConnectionManager实例
  47.             return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
  48.         }
  49.         ...
  50.     }
  51.     ...
  52. }
  53. public class ClusterConnectionManager extends MasterSlaveConnectionManager {
  54.     public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
  55.         super(config, id);
  56.         ...
  57.         this.natMapper = cfg.getNatMapper();
  58.         //将Redis的配置类Config封装在ConnectionManager中
  59.         this.config = create(cfg);
  60.         initTimer(this.config);
  61.    
  62.         Throwable lastException = null;
  63.         List<String> failedMasters = new ArrayList<String>();
  64.         for (String address : cfg.getNodeAddresses()) {
  65.             RedisURI addr = new RedisURI(address);
  66.             //异步连接Redis节点
  67.             CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
  68.             ...
  69.             //通过connectionFuture阻塞获取建立好的连接
  70.             RedisConnection connection = connectionFuture.toCompletableFuture().join();
  71.             ...
  72.             List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);
  73.             ...
  74.             CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
  75.             Collection<ClusterPartition> partitions = partitionsFuture.join();
  76.             List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
  77.             for (ClusterPartition partition : partitions) {
  78.                 if (partition.isMasterFail()) {
  79.                     failedMasters.add(partition.getMasterAddress().toString());
  80.                     continue;
  81.                 }
  82.                 if (partition.getMasterAddress() == null) {
  83.                     throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
  84.                 }
  85.                 CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
  86.                 masterFutures.add(masterFuture);
  87.             }
  88.             CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));
  89.             masterFuture.join();
  90.             ...
  91.         }
  92.         ...
  93.     }
  94.     ...
  95. }
  96. public class MasterSlaveConnectionManager implements ConnectionManager {
  97.     protected final String id;//初始化时为UUID
  98.     private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
  99.     ...
  100.     protected MasterSlaveConnectionManager(Config cfg, UUID id) {
  101.         this.id = id.toString();//传入的是UUID
  102.         this.cfg = cfg;
  103.         ...
  104.     }
  105.    
  106.     protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
  107.         RedisConnection conn = nodeConnections.get(addr);
  108.         if (conn != null) {
  109.             if (!conn.isActive()) {
  110.                 closeNodeConnection(conn);
  111.             } else {
  112.                 return CompletableFuture.completedFuture(conn);
  113.             }
  114.         }
  115.         //创建Redis客户端连接实例
  116.         RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
  117.         //向Redis服务端发起异步连接请求,这个future会层层往外返回
  118.         CompletionStage<RedisConnection> future = client.connectAsync();
  119.         return future.thenCompose(connection -> {
  120.             if (connection.isActive()) {
  121.                 if (!addr.isIP()) {
  122.                     RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort());
  123.                     nodeConnections.put(address, connection);
  124.                 }
  125.                 nodeConnections.put(addr, connection);
  126.                 return CompletableFuture.completedFuture(connection);
  127.             } else {
  128.                 connection.closeAsync();
  129.                 CompletableFuture<RedisConnection> f = new CompletableFuture<>();
  130.                 f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
  131.                 return f;
  132.             }
  133.         });
  134.     }
  135.    
  136.     //创建Redis客户端连接实例
  137.     @Override
  138.     public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
  139.         RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
  140.         return RedisClient.create(redisConfig);
  141.     }
  142.     ...
  143. }
  144. //Redisson主要使用Netty去和Redis服务端建立连接
  145. public final class RedisClient {
  146.     private final Bootstrap bootstrap;
  147.     private final Bootstrap pubSubBootstrap;
  148.     ...
  149.     public static RedisClient create(RedisClientConfig config) {
  150.         return new RedisClient(config);
  151.     }
  152.    
  153.     private RedisClient(RedisClientConfig config) {
  154.         ...
  155.         bootstrap = createBootstrap(copy, Type.PLAIN);
  156.         pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
  157.         this.commandTimeout = copy.getCommandTimeout();
  158.     }
  159.    
  160.     private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
  161.         Bootstrap bootstrap = new Bootstrap()
  162.             .resolver(config.getResolverGroup())
  163.             .channel(config.getSocketChannelClass())
  164.             .group(config.getGroup());
  165.         bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
  166.         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
  167.         bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
  168.         bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
  169.         config.getNettyHook().afterBoostrapInitialization(bootstrap);
  170.         return bootstrap;
  171.     }
  172.    
  173.     //向Redis服务端发起异步连接请求
  174.     public RFuture<RedisConnection> connectAsync() {
  175.         CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
  176.         CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
  177.             CompletableFuture<RedisConnection> r = new CompletableFuture<>();
  178.             //Netty的Bootstrap发起连接
  179.             ChannelFuture channelFuture = bootstrap.connect(res);
  180.             channelFuture.addListener(new ChannelFutureListener() {
  181.                 @Override
  182.                 public void operationComplete(final ChannelFuture future) throws Exception {
  183.                     if (bootstrap.config().group().isShuttingDown()) {
  184.                         IllegalStateException cause = new IllegalStateException("RedisClient is shutdown");
  185.                         r.completeExceptionally(cause);
  186.                         return;
  187.                     }
  188.                     if (future.isSuccess()) {
  189.                         RedisConnection c = RedisConnection.getFrom(future.channel());
  190.                         c.getConnectionPromise().whenComplete((res, e) -> {
  191.                             bootstrap.config().group().execute(new Runnable() {
  192.                                 @Override
  193.                                 public void run() {
  194.                                     if (e == null) {
  195.                                         if (!r.complete(c)) {
  196.                                             c.closeAsync();
  197.                                         }
  198.                                     } else {
  199.                                         r.completeExceptionally(e);
  200.                                         c.closeAsync();
  201.                                     }
  202.                                 }
  203.                             });
  204.                         });
  205.                     } else {
  206.                         bootstrap.config().group().execute(new Runnable() {
  207.                             public void run() {
  208.                                 r.completeExceptionally(future.cause());
  209.                             }
  210.                         });
  211.                     }
  212.                 }
  213.             });
  214.             return r;
  215.         });
  216.         return new CompletableFutureWrapper<>(f);
  217.     }
  218.     ...
  219. }
复制代码
(2)初始化Redis的命令执行器CommandExecutor
首先,CommandSyncService继承自CommandAsyncService类。
 
而CommandAsyncService类实现了CommandExecutor接口。
 
然后,ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。
 
所以,通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager。
  1. //Redis命令的同步执行器CommandSyncService
  2. public class CommandSyncService extends CommandAsyncService implements CommandExecutor {
  3.     //初始化CommandExecutor
  4.     public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
  5.         super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
  6.     }
  7.    
  8.     public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
  9.         return read(key, connectionManager.getCodec(), command, params);
  10.     }
  11.    
  12.     public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
  13.         RFuture<R> res = readAsync(key, codec, command, params);
  14.         return get(res);
  15.     }
  16.    
  17.     public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
  18.         return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
  19.     }
  20.    
  21.     public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
  22.         RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
  23.         return get(res);
  24.     }
  25.    
  26.     public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
  27.         return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
  28.     }
  29.    
  30.     public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
  31.         RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
  32.         return get(res);
  33.     }
  34. }
  35. //Redis命令的异步执行器CommandAsyncService
  36. public class CommandAsyncService implements CommandAsyncExecutor {
  37.     //Redis连接管理器
  38.     final ConnectionManager connectionManager;
  39.     final RedissonObjectBuilder objectBuilder;
  40.     final RedissonObjectBuilder.ReferenceType referenceType;
  41.     public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
  42.         this.connectionManager = connectionManager;
  43.         this.objectBuilder = objectBuilder;
  44.         this.referenceType = referenceType;
  45.     }
  46.    
  47.     @Override
  48.     public <V> V getNow(CompletableFuture<V> future) {
  49.         try {
  50.             return future.getNow(null);
  51.         } catch (Exception e) {
  52.             return null;
  53.         }
  54.     }
  55.    
  56.     @Override
  57.     public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
  58.         RFuture<R> res = readAsync(key, codec, command, params);
  59.         return get(res);
  60.     }
  61.    
  62.     @Override
  63.     public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
  64.         NodeSource source = getNodeSource(key);
  65.         return async(true, source, codec, command, params, false, false);
  66.     }
  67.    
  68.     private NodeSource getNodeSource(String key) {
  69.         int slot = connectionManager.calcSlot(key);
  70.         return new NodeSource(slot);
  71.     }
  72.    
  73.     public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
  74.         CompletableFuture<R> mainPromise = createPromise();
  75.         RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
  76.         executor.execute();
  77.         return new CompletableFutureWrapper<>(mainPromise);
  78.     }
  79.    
  80.     @Override
  81.     public <V> V get(RFuture<V> future) {
  82.         if (Thread.currentThread().getName().startsWith("redisson-netty")) {
  83.             throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
  84.         }
  85.         try {
  86.             return future.toCompletableFuture().get();
  87.         } catch (InterruptedException e) {
  88.             future.cancel(true);
  89.             Thread.currentThread().interrupt();
  90.             throw new RedisException(e);
  91.         } catch (ExecutionException e) {
  92.             throw convertException(e);
  93.         }
  94.     }
  95.     ...
  96. }
复制代码
 
3.可重入锁源码之lua脚本加锁逻辑
(1)通过Redisson.getLock()方法获取一个RedissonLock实例
(2)加锁时的执行流程
(3)加锁时执行的lua脚本
(4)执行加锁lua脚本的命令执行器逻辑
(5)如何根据slot值获取对应的节点
 
(1)通过Redisson.getLock()方法获取一个RedissonLock实例
在Redisson.getLock()方法中,会传入命令执行器CommandExecutor来创建一个RedissonLock实例,而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的,所以命令执行器CommandExecutor会被封装在RedissonLock实例中。
 
因此,通过RedissonLock实例可以获取一个命令执行器CommandExecutor,通过命令执行器CommandExecutor可获取连接管理器ConnectionManager,通过连接管理器ConnectionManager可获取Redis的配置信息类Config,通过Redis的配置信息类Config可以获取各种配置信息。
 
RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面,有个internalLockLeaseTime变量,这个internalLockLeaseTime变量与WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒,即30秒;
  1. public class Application {
  2.     public static void main(String[] args) throws Exception {
  3.         Config config = new Config();
  4.         config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
  5.         //创建RedissonClient实例
  6.         RedissonClient redisson = Redisson.create(config);
  7.         //获取可重入锁
  8.         RLock lock = redisson.getLock("myLock");
  9.         lock.lock();
  10.         ...
  11.     }
  12. }
  13. //创建Redisson实例
  14. public class Redisson implements RedissonClient {
  15.     protected final Config config;//Redis配置类
  16.     protected final ConnectionManager connectionManager;//Redis的连接管理器
  17.     protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器
  18.     ...
  19.    
  20.     public static RedissonClient create(Config config) {
  21.         return new Redisson(config);
  22.     }
  23.    
  24.     protected Redisson(Config config) {
  25.         ...
  26.         //根据Redis配置类Config实例创建和Redis的连接管理器
  27.         connectionManager = ConfigSupport.createConnectionManager(configCopy);
  28.         //创建Redis的命令执行器
  29.         commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
  30.         ...
  31.     }
  32.     ...
  33.     @Override
  34.     public RLock getLock(String name) {
  35.         return new RedissonLock(commandExecutor, name);
  36.     }
  37.     ...
  38. }
  39. //创建RedissonLock实例
  40. //通过RedissonLock实例可以获取一个命令执行器CommandExecutor;
  41. public class RedissonLock extends RedissonBaseLock {
  42.     protected long internalLockLeaseTime;
  43.     protected final LockPubSub pubSub;
  44.     final CommandAsyncExecutor commandExecutor;
  45.    
  46.     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
  47.         super(commandExecutor, name);
  48.         this.commandExecutor = commandExecutor;
  49.         //与WatchDog有关的internalLockLeaseTime
  50.         //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
  51.         //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
  52.         //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
  53.         this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  54.         this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
  55.     }
  56.     ...
  57. }
  58. //创建Redis的命令执行器
  59. //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
  60. public class CommandAsyncService implements CommandAsyncExecutor {
  61.     final ConnectionManager connectionManager;
  62.     ...
  63.     public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
  64.         this.connectionManager = connectionManager;
  65.         this.objectBuilder = objectBuilder;
  66.         this.referenceType = referenceType;
  67.     }
  68.    
  69.     @Override
  70.     public ConnectionManager getConnectionManager() {
  71.         return connectionManager;
  72.     }
  73.     ...
  74. }
  75. //创建Redis的连接管理器
  76. //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
  77. public class ClusterConnectionManager extends MasterSlaveConnectionManager {
  78.     ...
  79.     public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
  80.         super(config, id);
  81.         ...
  82.     }
  83.     ...
  84. }
  85. //创建Redis的连接管理器
  86. //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
  87. public class MasterSlaveConnectionManager implements ConnectionManager {
  88.     private final Config cfg;
  89.     protected final String id;//初始化时为UUID
  90.     ...
  91.     protected MasterSlaveConnectionManager(Config cfg, UUID id) {
  92.         this.id = id.toString();//传入的是UUID
  93.         this.cfg = cfg;
  94.         ...
  95.     }
  96.    
  97.     @Override
  98.     public Config getCfg() {
  99.         return cfg;
  100.     }
  101.     ...
  102. }
  103. //配置信息类Config中的lockWatchdogTimeout变量初始化为30秒,该变量与WatchDog有关
  104. public class Config {
  105.     private long lockWatchdogTimeout = 30 * 1000;
  106.     ...
  107.     //This parameter is only used if lock has been acquired without leaseTimeout parameter definition.
  108.     //Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval.
  109.     //This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way.
  110.     //Default is 30000 milliseconds
  111.     public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
  112.         this.lockWatchdogTimeout = lockWatchdogTimeout;
  113.         return this;
  114.     }
  115.    
  116.     public long getLockWatchdogTimeout() {
  117.         return lockWatchdogTimeout;
  118.     }
  119. }
复制代码
默认情况下,调用RedissonLock.lock()方法加锁时,传入的leaseTime为-1。此时锁的超时时间会设为lockWatchdogTimeout默认的30秒,从而避免出现死锁的情况。
  1. public class RedissonLock extends RedissonBaseLock {
  2.     ...
  3.     //加锁
  4.     @Override
  5.     public void lock() {
  6.         try {
  7.             lock(-1, null, false);
  8.         } catch (InterruptedException e) {
  9.             throw new IllegalStateException();
  10.         }
  11.     }
  12.    
  13.     private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  14.         long threadId = Thread.currentThread().getId();
  15.         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  16.         ...
  17.     }
  18.    
  19.     //解锁
  20.     @Override
  21.     public void unlock() {
  22.         try {
  23.             get(unlockAsync(Thread.currentThread().getId()));
  24.         } catch (RedisException e) {
  25.             if (e.getCause() instanceof IllegalMonitorStateException) {
  26.                 throw (IllegalMonitorStateException) e.getCause();
  27.             } else {
  28.                 throw e;
  29.             }
  30.         }
  31.     }
  32.     ...
  33. }
复制代码
(2)加锁时的执行流程
首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关,然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理,接着调用RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。
  1. public class RedissonLock extends RedissonBaseLock {
  2.     protected long internalLockLeaseTime;
  3.     protected final LockPubSub pubSub;
  4.     final CommandAsyncExecutor commandExecutor;
  5.    
  6.     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
  7.         super(commandExecutor, name);
  8.         this.commandExecutor = commandExecutor;
  9.         //与WatchDog有关的internalLockLeaseTime
  10.         //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
  11.         //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
  12.         //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
  13.         this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  14.         this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
  15.     }
  16.     ...
  17.     //加锁
  18.     @Override
  19.     public void lock() {
  20.         try {
  21.             lock(-1, null, false);
  22.         } catch (InterruptedException e) {
  23.             throw new IllegalStateException();
  24.         }
  25.     }
  26.    
  27.     private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  28.         //线程ID,用来生成设置Hash的值
  29.         long threadId = Thread.currentThread().getId();
  30.         //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1
  31.         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  32.         //ttl为null说明加锁成功
  33.         if (ttl == null) {
  34.             return;
  35.         }
  36.         //加锁失败时的处理
  37.         CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
  38.         if (interruptibly) {
  39.             commandExecutor.syncSubscriptionInterrupted(future);
  40.         } else {
  41.             commandExecutor.syncSubscription(future);
  42.         }
  43.         try {
  44.             while (true) {
  45.                 ttl = tryAcquire(-1, leaseTime, unit, threadId);
  46.                 // lock acquired
  47.                 if (ttl == null) {
  48.                     break;
  49.                 }
  50.                 // waiting for message
  51.                 if (ttl >= 0) {
  52.                     try {
  53.                         commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  54.                     } catch (InterruptedException e) {
  55.                         if (interruptibly) {
  56.                             throw e;
  57.                         }
  58.                         commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  59.                     }
  60.                 } else {
  61.                     if (interruptibly) {
  62.                         commandExecutor.getNow(future).getLatch().acquire();
  63.                     } else {
  64.                         commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
  65.                     }
  66.                 }
  67.             }
  68.         } finally {
  69.             unsubscribe(commandExecutor.getNow(future), threadId);
  70.         }
  71.     }
  72.     ...
  73.     private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  74.         //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法
  75.         //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步
  76.         return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  77.     }
  78.    
  79.     private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  80.         RFuture<Long> ttlRemainingFuture;
  81.         if (leaseTime != -1) {
  82.             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  83.         } else {
  84.             //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime
  85.             //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒
  86.             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  87.         }
  88.         CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
  89.             //加锁返回的ttlRemaining为null表示加锁成功
  90.             if (ttlRemaining == null) {
  91.                 if (leaseTime != -1) {
  92.                     internalLockLeaseTime = unit.toMillis(leaseTime);
  93.                 } else {
  94.                     scheduleExpirationRenewal(threadId);
  95.                 }
  96.             }
  97.             return ttlRemaining;
  98.         });
  99.         return new CompletableFutureWrapper<>(f);
  100.     }
  101.    
  102.     //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
  103.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  104.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  105.             "if (redis.call('exists', KEYS[1]) == 0) then " +
  106.                 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  107.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  108.                 "return nil; " +
  109.             "end; " +
  110.             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  111.                 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  112.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  113.                 "return nil; " +
  114.             "end; " +
  115.             "return redis.call('pttl', KEYS[1]);",
  116.             Collections.singletonList(getRawName()),//锁的名字:KEYS[1]
  117.             unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
  118.             getLockName(threadId)//ARGV[2],值为UUID + 线程ID
  119.         );
  120.     }
  121.     ...
  122. }
  123. public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
  124.     final String id;
  125.     final String entryName;
  126.     final CommandAsyncExecutor commandExecutor;
  127.    
  128.     public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
  129.         super(commandExecutor, name);
  130.         this.commandExecutor = commandExecutor;
  131.         this.id = commandExecutor.getConnectionManager().getId();
  132.         this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  133.         this.entryName = id + ":" + name;
  134.     }
  135.    
  136.     protected String getLockName(long threadId) {
  137.         return id + ":" + threadId;
  138.     }
  139.     ...
  140. }
  141. abstract class RedissonExpirable extends RedissonObject implements RExpirable {
  142.     ...
  143.     RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
  144.         super(connectionManager, name);
  145.     }
  146.     ...
  147. }
  148. public abstract class RedissonObject implements RObject {
  149.     protected final CommandAsyncExecutor commandExecutor;
  150.     protected String name;
  151.     protected final Codec codec;
  152.    
  153.     public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
  154.         this.codec = codec;
  155.         this.commandExecutor = commandExecutor;
  156.         if (name == null) {
  157.             throw new NullPointerException("name can't be null");
  158.         }
  159.         setName(name);
  160.     }
  161.     ...
  162.     protected final <V> V get(RFuture<V> future) {
  163.         //下面会调用CommandAsyncService.get()方法
  164.         return commandExecutor.get(future);
  165.     }
  166.     ...
  167. }
  168. public class CommandAsyncService implements CommandAsyncExecutor {
  169.     ...
  170.     @Override
  171.     public <V> V get(RFuture<V> future) {
  172.         if (Thread.currentThread().getName().startsWith("redisson-netty")) {
  173.             throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
  174.         }
  175.         try {
  176.             return future.toCompletableFuture().get();
  177.         } catch (InterruptedException e) {
  178.             future.cancel(true);
  179.             Thread.currentThread().interrupt();
  180.             throw new RedisException(e);
  181.         } catch (ExecutionException e) {
  182.             throw convertException(e);
  183.         }
  184.     }
  185.     ...
  186. }
复制代码
(3)加锁时执行的lua脚本
  1. public class RedissonLock extends RedissonBaseLock {
  2.     //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
  3.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  4.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  5.             "if (redis.call('exists', KEYS[1]) == 0) then " +
  6.                 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  7.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  8.                 "return nil; " +
  9.             "end; " +
  10.             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  11.                 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  12.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  13.                 "return nil; " +
  14.             "end; " +
  15.             "return redis.call('pttl', KEYS[1]);",
  16.             Collections.singletonList(getRawName()),//锁的名字:KEYS[1],比如"myLock"
  17.             unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
  18.             getLockName(threadId)//ARGV[2],值为UUID + 线程ID
  19.         );
  20.     }
  21.     ...
  22. }
复制代码
首先执行Redis的exists命令,判断key为锁名的Hash值是否不存在,也就是判断key为锁名myLock的Hash值是否存在。
 
一.如果key为锁名的Hash值不存在,那么就进行如下加锁处理
首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名,value是一个映射。也就是在value值中会有一个field为UUID + 线程ID,value为1的映射。比如:hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。
 
然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,也就是设置key为锁名的Hash值的过期时间为30秒。比如:pexpire myLock 30000。所以默认情况下,myLock这个锁在30秒后就会自动过期。
 
二.如果key为锁名的Hash值存在,那么就执行如下判断处理
首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。
 
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行:pexpire myLock 30000,再次将myLock的有效期设置为30秒。
 
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间,因为不同线程的ARGV[2]是不一样的,ARGV[2] = UUID + 线程ID。
 
(4)执行加锁lua脚本的命令执行器逻辑
在RedissonLock的tryLockInnerAsync()方法中,会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本,即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。
 
在CommandAsyncService的evalWriteAsync()方法中,首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法来执行lua脚本。
 
在CommandAsyncService的getNodeSource()方法中,会根据key进行CRC16运算,然后再对16384取模,计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。
 
在CommandAsyncService的evalAsync()方法中,会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor,实现将脚本请求发送给对应的Redis节点处理。
[code]public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {    //从外部传入的:在创建实现了RedissonClient的Redisson实例时,初始化的命令执行器CommandExecutor    final CommandAsyncExecutor commandExecutor;    public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {        super(commandExecutor, name);        this.commandExecutor = commandExecutor;        this.id = commandExecutor.getConnectionManager().getId();        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();        this.entryName = id + ":" + name;    }    ...    protected  RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) {        //获取可用的节点,并继续封装一个命令执行器CommandBatchService        MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());        int availableSlaves = entry.getAvailableSlaves();        CommandBatchService executorService = createCommandBatchService(availableSlaves);        //通过CommandAsyncService.evalWriteAsync方法执行lua脚本        RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);        if (commandExecutor instanceof CommandBatchService) {            return result;        }        //异步执行然后获取结果        RFuture

相关推荐

您需要登录后才可以回帖 登录 | 立即注册