找回密码
 立即注册
首页 业界区 业界 玩转 ZooKeeper之分布式锁

玩转 ZooKeeper之分布式锁

红弘丽 2026-1-23 10:55:00
上一篇已经给出了选举leader执行任务的案例,接下来将领导者选举例子改成分布式锁(Distributed Lock)的实现方式。 模拟一个高并发扣减库存的场景:多个节点同时抢购同一商品(库存=100),使用 ZooKeeper 分布式锁确保同一时刻只有一个节点能扣库存,避免超卖。
核心区别回顾:

  • 领导者选举:全局只有一个 Leader 长期持有,一直执行任务。
  • 分布式锁:谁需要操作资源就去抢锁用完立即释放,其他节点可以继续抢。
  1. distributed-lock-service/
  2. ├── pom.xml
  3. ├── src/
  4. │   └── main/
  5. │       ├── java/
  6. │       │   └── com/example/
  7. │       │       ├── DistributedLock.java  // 分布式锁核心
  8. │       │       ├── InventoryService.java // 库存扣减服务
  9. │       │       └── App.java             // 主入口
  10. │       └── resources/
  11. │           └── application.properties
复制代码
pom.xml(依赖同前,添加 Curator 简化实现,生产中推荐 Curator 而非原生 ZooKeeper API)
  1. 1 <dependencies>
  2. 2     
  3. 3     <dependency>
  4. 4         <groupId>org.apache.zookeeper</groupId>
  5. 5         zookeeper</artifactId>
  6. 6         <version>3.8.0</version>
  7. 7     </dependency>
  8. 8     
  9. 9     <dependency>
  10. 10         <groupId>org.apache.curator</groupId>
  11. 11         curator-recipes</artifactId>
  12. 12         <version>5.7.0</version>
  13. 13     </dependency>
  14. 14     <dependency>
  15. 15         <groupId>org.apache.curator</groupId>
  16. 16         curator-framework</artifactId>
  17. 17         <version>5.7.0</version>
  18. 18     </dependency>
  19. 19     
  20. 20     <dependency>
  21. 21         <groupId>org.slf4j</groupId>
  22. 22         slf4j-simple</artifactId>
  23. 23         <version>1.7.36</version>
  24. 24     </dependency>
  25. 25 </dependencies>
复制代码
1. 配置(application.properties)
  1. 1 zk.connectString=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
  2. 2 zk.sessionTimeout=5000
  3. 3 zk.connectionTimeout=3000
  4. 4
  5. 5 # 分布式锁根路径
  6. 6 lock.rootPath=/locks
  7. 7 # 具体锁路径(这里模拟商品库存锁)
  8. 8 lock.path=/locks/inventory/product-12345
复制代码
2. 核心代码
DistributedLock.java(基于 Curator 的可重入公平分布式锁)
  1. 1 package com.example;
  2. 2
  3. 3 import org.apache.curator.framework.CuratorFramework;
  4. 4 import org.apache.curator.framework.CuratorFrameworkFactory;
  5. 5 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  6. 6 import org.apache.curator.retry.ExponentialBackoffRetry;
  7. 7 import org.slf4j.Logger;
  8. 8 import org.slf4j.LoggerFactory;
  9. 9
  10. 10 import java.util.concurrent.TimeUnit;
  11. 11
  12. 12 public class DistributedLock {
  13. 13     private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
  14. 14
  15. 15     private final CuratorFramework client;
  16. 16     private final InterProcessMutex lock;
  17. 17     private final String lockPath;
  18. 18
  19. 19     public DistributedLock(String zkConnectString, int sessionTimeout, String lockPath) {
  20. 20         this.lockPath = lockPath;
  21. 21
  22. 22         // Curator 客户端(带重试机制)
  23. 23         ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  24. 24         this.client = CuratorFrameworkFactory.newClient(zkConnectString, sessionTimeout, 3000, retryPolicy);
  25. 25         client.start();
  26. 26
  27. 27         // 可重入公平分布式锁
  28. 28         this.lock = new InterProcessMutex(client, lockPath);
  29. 29     }
  30. 30
  31. 31     /**
  32. 32      * 尝试获取锁,超时则返回 false
  33. 33      */
  34. 34     public boolean acquire(long timeout, TimeUnit unit) {
  35. 35         try {
  36. 36             boolean acquired = lock.acquire(timeout, unit);
  37. 37             if (acquired) {
  38. 38                 logger.info("Lock acquired: {}", lockPath);
  39. 39             } else {
  40. 40                 logger.warn("Failed to acquire lock within timeout: {}", lockPath);
  41. 41             }
  42. 42             return acquired;
  43. 43         } catch (Exception e) {
  44. 44             logger.error("Error acquiring lock", e);
  45. 45             return false;
  46. 46         }
  47. 47     }
  48. 48
  49. 49     /**
  50. 50      * 释放锁
  51. 51      */
  52. 52     public void release() {
  53. 53         try {
  54. 54             if (lock.isAcquiredInThisProcess()) {
  55. 55                 lock.release();
  56. 56                 logger.info("Lock released: {}", lockPath);
  57. 57             }
  58. 58         } catch (Exception e) {
  59. 59             logger.error("Error releasing lock", e);
  60. 60         }
  61. 61     }
  62. 62
  63. 63     public void close() {
  64. 64         client.close();
  65. 65     }
  66. 66 }
复制代码
InventoryService.java(模拟库存扣减)
  1. 1 package com.example;
  2. 2
  3. 3 import org.slf4j.Logger;
  4. 4 import org.slf4j.LoggerFactory;
  5. 5
  6. 6 import java.util.concurrent.atomic.AtomicInteger;
  7. 7
  8. 8 public class InventoryService {
  9. 9     private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);
  10. 10
  11. 11     // 模拟库存(实际应从数据库读取)
  12. 12     private final AtomicInteger stock = new AtomicInteger(100);
  13. 13
  14. 14     /**
  15. 15      * 扣减库存(模拟业务逻辑)
  16. 16      */
  17. 17     public void deductStock(int quantity) {
  18. 18         if (stock.get() < quantity) {
  19. 19             logger.warn("库存不足!当前库存: {}", stock.get());
  20. 20             return;
  21. 21         }
  22. 22
  23. 23         // 模拟数据库操作耗时
  24. 24         try {
  25. 25             Thread.sleep(50); // 模拟网络延迟
  26. 26         } catch (InterruptedException e) {
  27. 27             Thread.currentThread().interrupt();
  28. 28         }
  29. 29
  30. 30         int newStock = stock.addAndGet(-quantity);
  31. 31         logger.info("扣减成功!扣减数量: {},剩余库存: {}", quantity, newStock);
  32. 32     }
  33. 33
  34. 34     public int getStock() {
  35. 35         return stock.get();
  36. 36     }
  37. 37 }
复制代码
App.java(主入口:模拟 10 个并发请求抢锁)
  1. 1 package com.example;
  2. 2
  3. 3 import java.io.IOException;
  4. 4 import java.util.Properties;
  5. 5 import java.util.concurrent.ExecutorService;
  6. 6 import java.util.concurrent.Executors;
  7. 7 import java.util.concurrent.TimeUnit;
  8. 8
  9. 9 public class App {
  10. 10     public static void main(String[] args) throws IOException, InterruptedException {
  11. 11         Properties props = new Properties();
  12. 12         props.load(App.class.getClassLoader().getResourceAsStream("application.properties"));
  13. 13
  14. 14         String zkConnect = props.getProperty("zk.connectString");
  15. 15         int sessionTimeout = Integer.parseInt(props.getProperty("zk.sessionTimeout"));
  16. 16         String lockPath = props.getProperty("lock.path");
  17. 17
  18. 18         InventoryService inventory = new InventoryService();
  19. 19         DistributedLock lock = new DistributedLock(zkConnect, sessionTimeout, lockPath);
  20. 20
  21. 21         // 模拟 10 个并发请求(实际生产中来自不同服务实例或线程)
  22. 22         ExecutorService executor = Executors.newFixedThreadPool(10);
  23. 23         for (int i = 0; i < 10; i++) {
  24. 24             final int orderId = i + 1;
  25. 25             executor.submit(() -> {
  26. 26                 System.out.println("订单 " + orderId + " 开始尝试扣库存...");
  27. 27
  28. 28                 // 尝试获取锁(超时 3 秒)
  29. 29                 if (lock.acquire(3, TimeUnit.SECONDS)) {
  30. 30                     try {
  31. 31                         // 临界区:扣库存
  32. 32                         inventory.deductStock(1);
  33. 33                     } finally {
  34. 34                         // 必须释放锁!
  35. 35                         lock.release();
  36. 36                     }
  37. 37                 } else {
  38. 38                     System.out.println("订单 " + orderId + " 获取锁超时,放弃本次扣减");
  39. 39                 }
  40. 40             });
  41. 41         }
  42. 42
  43. 43         executor.shutdown();
  44. 44         executor.awaitTermination(30, TimeUnit.SECONDS);
  45. 45
  46. 46         System.out.println("最终剩余库存: " + inventory.getStock());
  47. 47         lock.close();
  48. 48     }
  49. 49 }
复制代码
部署方式(与领导者选举完全相同)

  • 3 台服务器(node1、node2、node3)
  • 安装 ZooKeeper 集群(同前)
  • 打包 JAR:mvn clean package
  • 每台服务器上传 JAR + application.properties
  • 启动脚本(start.sh)同前,但 node.id 不需要了(分布式锁不需要唯一 ID)
    1. nohup java -jar distributed-lock-service-1.0-SNAPSHOT.jar > service.log 2>&1 &
    复制代码

    • 运行方式:在任意一台或多台服务器上启动多个实例(或在同一台机器启动多个进程),模拟并发。
    • 验证:启动后观察日志,只有一个线程/进程能成功扣库存,其他线程要么等待要么超时。

日志示例(运行后可能的输出)
  1. 订单 1 开始尝试扣库存...
  2. 订单 2 开始尝试扣库存...
  3. 订单 3 开始尝试扣库存...
  4. ...
  5. [INFO] Lock acquired: /locks/inventory/product-12345   // 订单1 抢到锁
  6. [INFO] 扣减成功!扣减数量: 1,剩余库存: 99
  7. [INFO] Lock released: /locks/inventory/product-12345
  8. 订单 4 开始尝试扣库存...
  9. [INFO] Lock acquired: /locks/inventory/product-12345   // 订单4 抢到锁
  10. [INFO] 扣减成功!扣减数量: 1,剩余库存: 98
  11. [INFO] Lock released: /locks/inventory/product-12345
  12. ...
  13. 最终剩余库存: 90   // 扣了 10 次,库存从 100 -> 90,无超卖
复制代码
与领导者选举的对比总结
方面领导者选举(前例)分布式锁(本例)日志出现频率选举只在启动或 Leader 宕机时触发一次每次业务请求都可能触发抢锁/释放日志(高频)持有锁时间长期(直到宕机)极短(扣库存 50ms + 网络延迟)日志关键词"I am the Leader"、"I am Follower, watching""Lock acquired"、"Lock released"、"Failed to acquire"并发场景所有节点只选一个干活多个节点/线程并发抢同一把锁释放时机通常不释放(宕机自动释放)必须在 finally 中释放,否则死锁典型日志量少(启动 + 宕机时)多(每个订单都有一条 acquire + release) 
生产环境建议

  • 用 Curator:原生 ZooKeeper 实现分布式锁容易出错(比如忘记释放、顺序节点管理复杂),Curator 的 InterProcessMutex 可解决。
  • 可重入性:Curator 锁默认支持可重入(同一线程可多次 acquire)。
  • 公平锁:默认公平(FIFO),避免饥饿。
  • 锁超时:业务设置合理超时,避免长时间阻塞。
  • 监控:监控 ZooKeeper 节点数、Watch 数量、锁竞争频率。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2026-1-26 10:30:34

举报

您需要登录后才可以回帖 登录 | 立即注册