找回密码
 立即注册
首页 业界区 业界 收入写RAFT算法(一)Leader选举

收入写RAFT算法(一)Leader选举

扈季雅 2026-2-12 18:00:11
Raft Leader 选举实现文档

目录


  • 1. 概述
  • 2. 核心概念
  • 3. 涉及的类及其职责
  • 4. 实现细节

    • 4.1 节点状态与转换
    • 4.2 选举超时机制
    • 4.3 投票请求处理
    • 4.4 选举发起流程
    • 4.5 投票响应处理
    • 4.6 心跳机制
    • 4.7 安全性保证

  • 5. 测试指南
  • 6. 使用示例
  • 7. 常见问题
1. 概述

1.1 目的

本文档详细说明了 LingRaft-Lite 模块中 Raft Leader 选举功能的实现,包括涉及的类、实现细节、测试方法等,便于开发者理解和复现。
1.2 功能范围


  • 节点状态管理(Follower、Candidate、Leader)
  • 选举超时检测
  • 投票请求与响应处理
  • 多数派选举机制
  • 心跳维护 Leader 地位
  • 网络分区处理
1.3 Raft 算法参考

本实现基于 Raft 论文(Diego Ongaro 和 John Ousterhout, 2014)的 Leader 选举部分,具体参考:

  • Section 5.1: Leader Election
  • Section 5.2: Leader Election - RequestVote RPC
  • Section 5.4.1: Election Safety Property
2. 核心概念

2.1 节点状态

Raft 节点有三种状态:
状态说明职责FOLLOWER从节点响应 Leader 的 RPC 请求(AppendEntries、RequestVote)CANDIDATE候选节点发起选举,向其他节点请求投票LEADER主节点处理客户端请求,向 Follower 复制日志,发送心跳2.2 任期 (Term)

定义

  • 时间被分成多个任期,每个任期以选举开始
  • 任期号是单调递增的整数
  • 每次选举都进入新任期
用途

  • 识别过时的信息(旧任期的投票、心跳等)
  • 防止脑裂(分裂投票)
实现
  1. private volatile long currentTerm = 0;  // 当前任期号
复制代码
2.3 选举超时 (Election Timeout)

定义

  • Follower 在收到有效心跳或投票请求之前等待的时间
  • 超时后转为 Candidate 并发起选举
随机化

  • 为了避免多个节点同时超时导致平票选举,超时时间随机化
  • 通常在 150ms ~ 300ms 之间
实现
  1. // 配置随机范围
  2. config.setElectionTimeoutRandomRange(Range.of(150, 300));
  3. // 计算随机超时时间
  4. int randomTimeout = raftConfig.getElectionTimeoutMs();
复制代码
2.4 多数派 (Majority)

定义

  • 超过半数的节点数:N/2 + 1
  • 3 节点集群需要 2 票
  • 5 节点集群需要 3 票
重要性

  • 保证选举结果的唯一性
  • 两个多数派必然有交集,确保只有一个 Leader
实现
  1. public VoteCounter(long term, int totalNodes) {
  2.     this.majorityCount = totalNodes / 2 + 1;
  3. }
复制代码
2.5 投票规则

节点投票给候选人的条件

  • 候选人的任期 >= 当前任期
  • 如果任期相同,candidate 的日志至少和当前节点一样新
日志比较规则

  • 如果 candidateLastLogTerm > lastLogTerm,投票
  • 如果 candidateLastLogTerm == lastLogTerm 且 candidateLastLogIndex >= lastLogIndex,投票
  • 否则,拒绝投票
3. 涉及的类及其职责

3.1 核心类

类名路径职责RaftNodeImplcom.ling.raft.core.RaftNodeImpl节点状态管理、选举发起、投票处理、心跳发送ConsensusModuleImplcom.ling.raft.core.ConsensusModuleImpl投票请求和响应的具体实现逻辑VoteCountercom.ling.raft.core.VoteCounter投票计数器,统计和判断多数派ElectionTaskcom.ling.raft.core.task.ElectionTask选举超时检测任务HeartbeatTaskcom.ling.raft.core.task.HeartbeatTaskLeader 心跳发送任务ServerStatusEnumcom.ling.raft.enums.ServerStatusEnum节点状态枚举VoteRequestcom.ling.raft.model.dto.VoteRequest投票请求 RPCVoteResponsecom.ling.raft.model.dto.VoteResponse投票响应 RPCThreeNodeElectionTestcom.ling.raft.example.leader.ThreeNodeElectionTest完整测试程序3.2 类关系图
  1. ┌─────────────────────┐
  2. │   RaftNodeImpl      │
  3. │   (节点主类)         │
  4. └──────────┬──────────┘
  5.            │ 持有引用
  6.            ├─────────────────┐
  7.            ▼                 ▼
  8. ┌─────────────────────┐  ┌─────────────────────┐
  9. │ ConsensusModuleImpl │  │   VoteCounter       │
  10. │ (投票逻辑)          │  │   (投票计数)         │
  11. └─────────────────────┘  └─────────────────────┘
  12.            │                     │
  13.            ├─────────────────────┤
  14.            ▼                     ▼
  15. ┌─────────────────────┐  ┌─────────────────────┐
  16. │  ElectionTask       │  │  HeartbeatTask      │
  17. │  (选举超时检测)      │  │  (心跳任务)         │
  18. └─────────────────────┘  └─────────────────────┘
复制代码
3.3 关键字段说明

RaftNodeImpl
  1. // 节点状态
  2. private volatile ServerStatusEnum nodeStatus = ServerStatusEnum.FOLLOWER;
  3. // 持久化状态
  4. private volatile long currentTerm = 0;           // 当前任期
  5. private volatile String votedFor = null;         // 本轮任期投票给的候选人
  6. // 选举相关
  7. private ScheduledExecutorService electionExecutor;
  8. private ScheduledFuture<?> electionFuture;
  9. private VoteCounter currentVoteCounter;
  10. private final Random random = new Random();
  11. // 心跳相关
  12. private ScheduledExecutorService heartbeatExecutor;
  13. private ScheduledFuture<?> heartbeatFuture;
  14. // 时间记录
  15. private volatile long prevElectionTime = 0;      // 上次选举时间
  16. private volatile long preHeartBeatTime = 0;      // 上次收到心跳时间
复制代码
ConsensusModuleImpl
  1. public final RaftNodeImpl node;  // 持有 RaftNodeImpl 的引用
  2. public final ReentrantLock voteLock = new ReentrantLock();  // 投票锁
  3. public final ReentrantLock appendEntriesLock = new ReentrantLock();  // 追加条目锁
复制代码
VoteCounter
  1. private final long term;                      // 当前选举任期
  2. private final Set<String> votesReceived;      // 已投票的节点ID集合
  3. private final int majorityCount;              // 需要获得的多数派票数
  4. private volatile boolean votedForSelf;        // 是否已投票给自己
复制代码
4. 实现细节

4.1 节点状态与转换

4.1.1 状态枚举

类名:ServerStatusEnum
定义
  1. public enum ServerStatusEnum {
  2.     LEADER("LEADER", "主节点"),
  3.     CANDIDATE("CANDIDATE", "候选节点"),
  4.     FOLLOWER("FOLLOWER", "从节点");
  5. }
复制代码
4.1.2 状态转换图
  1.          +-------------------------+
  2.          |         初始化          |
  3.          +-------------------------+
  4.                     |
  5.                     ▼
  6.          +-------------------------+
  7.          |      FOLLOWER          | <------------+
  8.          |  (等待心跳或投票)        |              |
  9.          +-------------------------+              |
  10.                     |                             |
  11.                     | 选举超时                     | 收到更高任期的
  12.                     |                             | AppendEntries 或
  13.                     ▼                             | RequestVote
  14.          +-------------------------+              |
  15.          |     CANDIDATE          |              |
  16.          |  (发起选举)             |              |
  17.          +-------------------------+              |
  18.                     |                             |
  19.                     | 获得多数派                  |
  20.                     |                             |
  21.                     ▼                             |
  22.          +-------------------------+              |
  23.          |      LEADER            | --------------+
  24.          |  (处理客户端请求)        |  发现更高任期
  25.          +-------------------------+
复制代码
投票规则详解

  • 任期检查

    • candidate 的任期 < 当前任期 → 拒绝

  • 任期更新

    • candidate 的任期 > 当前任期 → 更新任期,转为 Follower

  • 唯一投票

    • 本轮任期已投票给其他人 → 拒绝
    • 已投票给该 candidate → 接受(幂等性)

  • 日志完整性

    • candidate 的日志 >= 自己的日志 → 接受
    • 否则 → 拒绝

4.3.3 日志比较逻辑

方法:isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
实现位置:ConsensusModuleImpl.java:337-350
  1. public void becomeFollower(long newTerm) {
  2.     // 检查任期
  3.     if (newTerm < currentTerm) {
  4.         log.warn("Cannot become Follower with smaller term: {} < {}",
  5.                  newTerm, currentTerm);
  6.         return;
  7.     }
  8.     ServerStatusEnum oldStatus = nodeStatus;
  9.     // 更新状态
  10.     nodeStatus = ServerStatusEnum.FOLLOWER;
  11.     currentTerm = newTerm;
  12.     votedFor = null;  // 重置投票记录
  13.     currentVoteCounter = null;  // 清空投票计数器
  14.     // 停止心跳(如果之前是 Leader)
  15.     cancelHeartbeatTimer();
  16.     // 重置选举定时器
  17.     resetElectionTimer();
  18.     log.info("State changed: {} -> FOLLOWER, term: {}", oldStatus, currentTerm);
  19. }
复制代码
示例
  1. public void becomeCandidate() {
  2.     ServerStatusEnum oldStatus = nodeStatus;
  3.     // 增加任期号(重要!)
  4.     currentTerm++;
  5.     nodeStatus = ServerStatusEnum.CANDIDATE;
  6.     votedFor = currentNodeConfig.getServerId();  // 投票给自己
  7.     log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);
  8.     // 重置选举定时器
  9.     resetElectionTimer();
  10.     // 发起投票请求
  11.     startElection();
  12. }
复制代码
4.3.4 投票锁

目的:防止并发投票请求导致状态不一致
实现
  1. public void becomeLeader() {
  2.     // 只有 Candidate 才能成为 Leader
  3.     if (nodeStatus != ServerStatusEnum.CANDIDATE) {
  4.         log.warn("Only CANDIDATE can become LEADER, current: {}", nodeStatus);
  5.         return;
  6.     }
  7.     ServerStatusEnum oldStatus = nodeStatus;
  8.     nodeStatus = ServerStatusEnum.LEADER;
  9.     // 初始化 Leader 状态(nextIndex、matchIndex)
  10.     initializeLeaderState();
  11.     // 取消选举定时器(Leader 不需要选举)
  12.     cancelElectionTimer();
  13.     log.info("========================================");
  14.     log.info("State changed: {} -> LEADER, term: {}", oldStatus, currentTerm);
  15.     log.info("========================================");
  16.     // 立即发送心跳并开始心跳定时器
  17.     sendHeartbeats();
  18.     startHeartbeatTimer();
  19. }
复制代码
保护的资源

  • currentTerm
  • votedFor
  • nodeStatus
4.4 选举发起流程

4.4.1 开始选举

方法:startElection()
实现位置:RaftNodeImpl.java:266-289
  1. @Override
  2. public void run() {
  3.     try {
  4.         // Leader 不需要选举
  5.         if (node.getNodeStatus() == ServerStatusEnum.LEADER) {
  6.             log.debug("Current node is LEADER, skip election");
  7.             return;
  8.         }
  9.         // 检查是否超时
  10.         long currentTime = System.currentTimeMillis();
  11.         int electionTimeoutMs = node.getRaftConfig().getElectionTimeoutMs();
  12.         long timeElapsed = currentTime - node.getPrevElectionTime();
  13.         if (timeElapsed < electionTimeoutMs) {
  14.             // 未超时,重新设置定时器
  15.             node.resetElectionTimer();
  16.             return;
  17.         }
  18.         // 选举超时,开始新一轮选举
  19.         log.info("========================================");
  20.         log.info("ELECTION TIMEOUT DETECTED!");
  21.         log.info("Time elapsed: {}ms, Timeout: {}ms", timeElapsed, electionTimeoutMs);
  22.         log.info("Current term: {}, Status: {}", node.getCurrentTerm(), node.getNodeStatus());
  23.         log.info("Converting to CANDIDATE and starting new election...");
  24.         log.info("========================================");
  25.         node.becomeCandidate();
  26.     } catch (Exception e) {
  27.         log.error("Error in election task", e);
  28.         if (node.getIsRunning().get()) {
  29.             node.resetElectionTimer();
  30.         }
  31.     }
  32. }
复制代码
流程

  • 创建投票计数器
  • 投票给自己
  • 单机模式直接成为 Leader
  • 多机模式并发发送投票请求
  • 检查选举结果
4.4.2 发送投票请求

方法:sendVoteRequest(targetNode)
实现位置:RaftNodeImpl.java:294-316
  1. public void resetElectionTimer() {
  2.     if (!isRunning.get()) {
  3.         return;
  4.     }
  5.     // 取消旧的定时任务
  6.     cancelElectionTimer();
  7.     // 计算随机超时时间
  8.     int randomTimeout = raftConfig.getElectionTimeoutMs();
  9.     // 更新超时时间戳
  10.     prevElectionTime = System.currentTimeMillis();
  11.     // 设置新的定时任务
  12.     electionFuture = electionExecutor.schedule(
  13.         new ElectionTask(this),
  14.         randomTimeout,
  15.         TimeUnit.MILLISECONDS
  16.     );
  17.     log.debug("Election timer reset, timeout: {}ms", randomTimeout);
  18. }
复制代码
特点

  • 并发发送到所有其他节点
  • 使用线程池异步发送
  • 超时设置为 3000ms
  • 失败不重试(等待下一次选举)
4.4.3 投票计数器

类名:VoteCounter
实现位置:com.ling.raft.core.VoteCounter.java
核心方法
  1. RaftConfig config = new RaftConfig(currentNode, allNodes);
  2. config.setElectionTimeout(2);  // 基础倍数
  3. config.setElectionTimeoutRandomRange(Range.of(150, 300));  // 随机范围
复制代码
数据结构

  • 使用 ConcurrentHashMap.newKeySet() 存储投票节点 ID
  • 保证线程安全
  • 自动去重(不会重复计票)
4.5 投票响应处理

4.5.1 处理投票响应

方法:handleVoteResponse(response, voterId)
实现位置:RaftNodeImpl.java:322-361
  1. // RaftConfig 内部实现
  2. public int getElectionTimeoutMs() {
  3.     if (electionTimeoutRandomRange == null) {
  4.         return electionTimeout * 1000;
  5.     }
  6.     // 在随机范围内选择一个值
  7.     int min = electionTimeoutRandomRange.getMin();
  8.     int max = electionTimeoutRandomRange.getMax();
  9.     Random random = new Random();
  10.     return min + random.nextInt(max - min + 1);
  11. }
复制代码
处理逻辑

  • 状态检查

    • 不再是 Candidate → 忽略

  • 任期检查

    • 响应任期 > 当前任期 → 发现更高任期,转为 Follower
    • 响应任期 < 当前任期 → 忽略旧响应

  • 投票统计

    • 投票成功 → 记录投票,检查是否获得多数派
    • 投票失败 → 记录日志

4.5.2 检查选举结果

方法:checkElectionResult()
实现位置:RaftNodeImpl.java:367-373
  1. public class VoteRequest {
  2.     private long term;          // candidate 的任期号
  3.     private String candidateId; // candidate 的节点 ID
  4.     private long lastLogIndex;  // candidate 最后一条日志的索引
  5.     private long lastLogTerm;   // candidate 最后一条日志的任期号
  6. }
复制代码
调用时机

  • 投票给自己后(单机模式)
  • 收到每个投票响应后
  • 所有投票请求发送后(初始检查)
4.6 心跳机制

4.6.1 心跳任务

类名:HeartbeatTask
实现位置:com.ling.raft.core.task.HeartbeatTask.java
  1. public class VoteResponse {
  2.     private long term;          // 当前任期(用于更新 candidate 的任期)
  3.     private boolean voteGranted; // 是否投票
  4. }
复制代码
4.6.2 发送心跳

方法:sendHeartbeats()
实现位置:RaftNodeImpl.java:407-413
  1. @Override
  2. public VoteResponse requestVote(VoteRequest voteRequest) {
  3.     voteLock.lock();
  4.     try {
  5.         long currentTerm = node.getCurrentTerm();
  6.         String votedFor = node.getVotedFor();
  7.         String candidateId = voteRequest.getCandidateId();
  8.         log.info("Received vote request from candidate: {}, Term: {}, CurrentTerm: {}, VotedFor: {}",
  9.                 candidateId, voteRequest.getTerm(), currentTerm, votedFor);
  10.         // 1. 任期检查
  11.         if (voteRequest.getTerm() < currentTerm) {
  12.             log.info("Rejected: candidate term {} < current term {}",
  13.                     voteRequest.getTerm(), currentTerm);
  14.             return new VoteResponse(currentTerm, false);
  15.         }
  16.         // 2. 任期更大,更新并转为 Follower
  17.         if (voteRequest.getTerm() > currentTerm) {
  18.             log.info("Higher term received: {} -> {}, becoming FOLLOWER",
  19.                     currentTerm, voteRequest.getTerm());
  20.             node.becomeFollower(voteRequest.getTerm());
  21.             currentTerm = node.getCurrentTerm();
  22.             votedFor = node.getVotedFor();
  23.         }
  24.         // 3. 检查是否已投票给其他人
  25.         if (votedFor != null && !votedFor.equals(candidateId)) {
  26.             log.info("Already voted for {}, rejecting {}", votedFor, candidateId);
  27.             return new VoteResponse(currentTerm, false);
  28.         }
  29.         // 4. 检查日志是否至少一样新
  30.         if (isLogUpToDate(voteRequest.getLastLogIndex(), voteRequest.getLastLogTerm())) {
  31.             log.info("Voting for candidate: {}", candidateId);
  32.             node.setVotedFor(candidateId);
  33.             node.setPrevElectionTime(System.currentTimeMillis());  // 重置超时
  34.             return new VoteResponse(currentTerm, true);
  35.         } else {
  36.             log.info("Candidate log not up to date");
  37.             return new VoteResponse(currentTerm, false);
  38.         }
  39.     } finally {
  40.         voteLock.unlock();
  41.     }
  42. }
复制代码
4.6.3 单次心跳发送

方法:sendHeartbeat(targetNode)
实现位置:RaftNodeImpl.java:418-436
  1. private boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm) {
  2.     long lastLogTerm = getLastLogTerm();
  3.     long lastLogIndex = getLastLogIndex();
  4.     // 优先比较任期:candidate 的任期更大 → 更新
  5.     if (candidateLastLogTerm > lastLogTerm) {
  6.         return true;
  7.     }
  8.     // 任期相同,比较索引:candidate 的索引 >= 自己的索引 → 更新
  9.     if (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex) {
  10.         return true;
  11.     }
  12.     // 其他情况 → 不更新
  13.     return false;
  14. }
复制代码
心跳特点

  • entries 为空列表
  • 只包含 term、leaderId 等元数据
  • 用于维护 Leader 地位,防止 Follower 发起新选举
4.6.4 心跳定时器

方法:startHeartbeatTimer()
实现位置:RaftNodeImpl.java:380-391
  1. 情况 1: candidate 任期更大
  2. candidate: term=3, index=5
  3. current:  term=2, index=5
  4. → 投票 (任期更大)
  5. 情况 2: 任期相同,索引更大或相等
  6. candidate: term=2, index=5
  7. current:  term=2, index=4
  8. → 投票 (索引更大)
  9. 情况 3: 任期相同,索引更小
  10. candidate: term=2, index=4
  11. current:  term=2, index=5
  12. → 不投票 (日志落后)
  13. 情况 4: 任期更小
  14. candidate: term=1, index=10
  15. current:  term=2, index=5
  16. → 不投票 (任期更小)
复制代码
配置示例
  1. public final ReentrantLock voteLock = new ReentrantLock();
  2. @Override
  3. public VoteResponse requestVote(VoteRequest voteRequest) {
  4.     voteLock.lock();
  5.     try {
  6.         // 投票逻辑
  7.         ...
  8.     } finally {
  9.         voteLock.unlock();
  10.     }
  11. }
复制代码
4.6.5 心跳响应处理

方法:handleHeartbeatResponse(response, nodeId)
实现位置:RaftNodeImpl.java:441-448
  1. private void startElection() {
  2.     int totalNodes = raftConfig.getRaftNodeConfigList().size();
  3.     currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
  4.     // 投票给自己
  5.     currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());
  6.     log.info("Starting election for term: {}, voted for self, votes: {}/{}",
  7.             currentTerm, currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
  8.     // 单机模式直接成为 Leader
  9.     if (totalNodes == 1) {
  10.         log.info("Single node mode, becoming leader immediately");
  11.         becomeLeader();
  12.         return;
  13.     }
  14.     // 发送投票请求给所有其他节点
  15.     List<RaftNodeConfig> otherNodes = getOtherNodes();
  16.     for (RaftNodeConfig nodeConfig : otherNodes) {
  17.         electionExecutor.execute(() -> sendVoteRequest(nodeConfig));
  18.     }
  19.     // 检查是否已获得多数派(可能只有自己一票的情况)
  20.     checkElectionResult();
  21. }
复制代码
处理逻辑

  • 检查响应中的任期
  • 发现更高任期 → 立即转为 Follower
  • 避免网络分区导致的脑裂
4.7 安全性保证

4.7.1 选举安全性

目标:任期内最多一个 Leader
实现

  • 任期单调递增
    1. private void sendVoteRequest(RaftNodeConfig targetNode) {
    2.     try {
    3.         // 构建 VoteRequest
    4.         VoteRequest request = VoteRequest.builder()
    5.                 .term(currentTerm)
    6.                 .candidateId(currentNodeConfig.getServerId())
    7.                 .lastLogIndex(getLastLogIndex())
    8.                 .lastLogTerm(getLastLogTerm())
    9.                 .build();
    10.         request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
    11.         request.setCmd(Request.REQUEST_VOTE);
    12.         log.debug("Sending VoteRequest to {} for term {}", targetNode.getServerId(), currentTerm);
    13.         // 发送 RPC 请求
    14.         VoteResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
    15.         // 处理响应
    16.         if (response != null) {
    17.             handleVoteResponse(response, targetNode.getServerId());
    18.         }
    19.     } catch (Exception e) {
    20.         log.debug("Failed to send vote request to {}: {}", targetNode.getServerId(), e.getMessage());
    21.     }
    22. }
    复制代码
  • 只投一次票
    1. // 记录投票
    2. public synchronized boolean recordVote(String nodeId) {
    3.     return votesReceived.add(nodeId);
    4. }
    5. // 投票给自己
    6. public synchronized void voteForSelf(String selfId) {
    7.     if (!votedForSelf) {
    8.         votesReceived.add(selfId);
    9.         votedForSelf = true;
    10.     }
    11. }
    12. // 检查是否获得多数派
    13. public boolean hasMajority() {
    14.     return votesReceived.size() >= majorityCount;
    15. }
    16. // 获取当前票数
    17. public int getVoteCount() {
    18.     return votesReceived.size();
    19. }
    复制代码
  • 多数派约束
    1. private void handleVoteResponse(VoteResponse response, String voterId) {
    2.     // 使用同步块确保原子性
    3.     synchronized (this) {
    4.         // 如果不是 Candidate,忽略
    5.         if (nodeStatus != ServerStatusEnum.CANDIDATE) {
    6.             log.debug("Not a candidate anymore (status: {}), ignoring vote from {}",
    7.                     nodeStatus, voterId);
    8.             return;
    9.         }
    10.         // 如果收到更高任期,转为 Follower
    11.         if (response.getTerm() > currentTerm) {
    12.             log.info("Received higher term {} from {}, stepping down",
    13.                     response.getTerm(), voterId);
    14.             becomeFollower(response.getTerm());
    15.             return;
    16.         }
    17.         // 忽略旧任期的响应
    18.         if (response.getTerm() < currentTerm) {
    19.             log.debug("Received stale vote response from {} for old term {}",
    20.                     voterId, response.getTerm());
    21.             return;
    22.         }
    23.         // 统计投票
    24.         if (response.isVoteGranted()) {
    25.             boolean isNewVote = currentVoteCounter.recordVote(voterId);
    26.             if (isNewVote) {
    27.                 log.info("Received vote from {} for term {}, total votes: {}/{}",
    28.                         voterId, currentTerm, currentVoteCounter.getVoteCount(),
    29.                         currentVoteCounter.getMajorityCount());
    30.                 // 检查选举结果
    31.                 checkElectionResult();
    32.             }
    33.         } else {
    34.             log.debug("Vote denied by {} for term {}", voterId, currentTerm);
    35.         }
    36.     }
    37. }
    复制代码
4.7.2 任期更新规则

规则:发现更高任期 → 更新任期,转为 Follower
实现位置

  • ConsensusModuleImpl.requestVote() 第 63-68 行
  • ConsensusModuleImpl.appendEntries() 第 128-134 行
  • RaftNodeImpl.handleVoteResponse() 第 333-337 行
  • RaftNodeImpl.handleHeartbeatResponse() 第 443-447 行
示例
  1. private void checkElectionResult() {
  2.     if (currentVoteCounter != null && currentVoteCounter.hasMajority()) {
  3.         log.info("Majority votes received ({}/{}), becoming LEADER",
  4.                 currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
  5.         becomeLeader();
  6.     }
  7. }
复制代码
4.7.3 日志完整性检查

目的:只投票给日志至少和自己一样新的候选人
实现:isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
规则

  • candidate 任期 > 自己任期 → 投票
  • 任期相同,candidate 索引 >= 自己索引 → 投票
  • 否则 → 拒绝
重要性

  • 保证新 Leader 包含所有已提交的日志
  • 防止日志丢失或覆盖
4.7.4 脑裂预防

场景:网络分区,两个 Leader 同时存在
预防机制

  • 多数派约束

    • Leader 需要多数派支持
    • 分区后的少数派无法获得足够票数

  • 心跳超时

    • 少数派 Follower 收不到心跳
    • 选举超时后发起选举
    • 多数派选出新 Leader

  • 任期递增

    • 新 Leader 使用更高任期
    • 旧 Leader 的心跳被拒绝

示例
  1. @Override
  2. public void run() {
  3.     try {
  4.         // 只有 Leader 才发送心跳
  5.         if (node.getNodeStatus() != ServerStatusEnum.LEADER) {
  6.             log.debug("Current node is not LEADER, skip heartbeat");
  7.             return;
  8.         }
  9.         log.debug("Sending heartbeats to all nodes, term: {}", node.getCurrentTerm());
  10.         // 发送心跳给所有节点
  11.         node.sendHeartbeats();
  12.     } catch (Exception e) {
  13.         log.error("Error in heartbeat task", e);
  14.     }
  15. }
复制代码
5. 测试指南

5.1 测试程序

文件位置
  1. public void sendHeartbeats() {
  2.     List<RaftNodeConfig> otherNodes = getOtherNodes();
  3.     for (RaftNodeConfig nodeConfig : otherNodes) {
  4.         heartbeatExecutor.execute(() -> sendHeartbeat(nodeConfig));
  5.     }
  6. }
复制代码
运行方式
  1. private void sendHeartbeat(RaftNodeConfig targetNode) {
  2.     try {
  3.         // 构建心跳请求(entries 为空)
  4.         AppendEntriesRequest request = AppendEntriesRequest.builder()
  5.                 .term(currentTerm)
  6.                 .leaderId(currentNodeConfig.getServerId())
  7.                 .entries(new ArrayList<>())  // 空列表表示心跳
  8.                 .build();
  9.         request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
  10.         request.setCmd(Request.APPEND_ENTRIES);
  11.         // 发送请求
  12.         AppendEntriesResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
  13.         // 处理响应
  14.         if (response != null) {
  15.             handleHeartbeatResponse(response, targetNode.getServerId());
  16.         }
  17.     } catch (Exception e) {
  18.         log.debug("Failed to send heartbeat to {}: {}", targetNode.getServerId(), e.getMessage());
  19.     }
  20. }
复制代码
脚本运行
  1. private void startHeartbeatTimer() {
  2.     int heartbeatInterval = raftConfig.getHeartbeatIntervalMs();
  3.     heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(
  4.             new HeartbeatTask(this),
  5.             0,  // 立即开始
  6.             heartbeatInterval,  // 间隔
  7.             TimeUnit.MILLISECONDS
  8.     );
  9.     log.debug("Heartbeat timer started, interval: {}ms", heartbeatInterval);
  10. }
复制代码
5.2 测试功能

5.2.1 基本测试场景

场景 1:正常选举
  1. config.setHeartbeatInterval(1);  // 每 1 秒发送一次心跳
复制代码
场景 2:Leader 故障
  1. private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
  2.     // 如果响应的任期更大,转为 Follower
  3.     if (response.getTerm() > currentTerm) {
  4.         log.info("Received higher term {} from {} in heartbeat response, stepping down",
  5.                 response.getTerm(), nodeId);
  6.         becomeFollower(response.getTerm());
  7.     }
  8. }
复制代码
场景 3:节点恢复
  1. public void becomeCandidate() {
  2.     currentTerm++;  // 每次选举增加任期
  3. }
复制代码
5.2.2 交互式命令

命令说明示例status查看所有节点状态statusleader显示当前 Leader 信息leaderkill 模拟节点故障kill node1revive 恢复节点revive node1log 控制日志级别log debugstop停止所有节点并退出stop5.2.3 日志级别控制

控制方式
  1. // ConsensusModuleImpl.requestVote()
  2. if (votedFor != null && !votedFor.equals(candidateId)) {
  3.     return new VoteResponse(currentTerm, false);
  4. }
复制代码
日志级别说明

  • silent/error - 仅错误信息
  • warn - 警告及以上
  • info - 信息及以上(默认)
  • debug - 调试信息(全部日志)
  • election - 仅选举相关日志
  • heartbeat - 仅心跳相关日志
5.3 预期输出

正常选举
  1. // VoteCounter
  2. public boolean hasMajority() {
  3.     return votesReceived.size() >= majorityCount;  // N/2 + 1
  4. }
复制代码
Leader 故障恢复
  1. // 在 requestVote 中
  2. if (voteRequest.getTerm() > currentTerm) {
  3.     node.becomeFollower(voteRequest.getTerm());
  4.     currentTerm = node.getCurrentTerm();
  5. }
复制代码
5.4 完整测试流程

步骤 1:启动并验证选举
  1. 初始状态:5 节点(node1-5),Leader=node1
  2. 网络分区:
  3. - 分区 A: node1, node2 (2 节点)
  4. - 分区 B: node3, node4, node5 (3 节点)
  5. 分区 A:
  6. - node1 仍是 Leader
  7. - node2 收不到心跳,超时后转为 Candidate
  8. - 只有 1 票(自己),无法获得多数派(需要 3 票)
  9. - 无法选出新 Leader
  10. 分区 B:
  11. - node3 超时后发起选举
  12. - 获得自己 + node4 + node5 的票(3 票)
  13. - 成为新 Leader(term=2)
  14. 网络恢复后:
  15. - node1 发送心跳(term=1)
  16. - 其他节点拒绝(term=2 > term=1)
  17. - node1 收到更高任期,转为 Follower
复制代码
步骤 2:验证心跳
  1. LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader/ThreeNodeElectionTest.java
复制代码
步骤 3:模拟 Leader 故障
  1. # 直接运行 main 方法
  2. java -cp <classpath> com.ling.raft.example.leader.ThreeNodeElectionTest
复制代码
步骤 4:恢复旧 Leader
  1. cd LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader
  2. start-cluster.bat
复制代码
步骤 5:多次故障测试
  1. [STEP 1] Starting 3 nodes...
  2.   ✓ node1 started on port 8081
  3.   ✓ node2 started on port 8082
  4.   ✓ node3 started on port 8083
  5.   ✓ All nodes started!
  6. [STEP 2] Waiting for leader election...
  7. [Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
  8. [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
  9. ----------------------------------------
  10.   ✓ Leader elected!
复制代码
6. 使用示例

6.1 基本使用
  1. raft> kill node1
  2. ✓ node1 stopped
  3. ! Leader killed, waiting for new election...
  4. [Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
  5. [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码
6.2 监控选举状态
  1. raft> revive node1
  2. ✓ node1 revived
  3. ✓ Status: FOLLOWER
  4. ✓ Election timer: active
  5. [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码
6.3 手动触发选举
  1. raft> log silent
  2. ✓ Log level set to ERROR (silent mode)
  3. raft> log info
  4. ✓ Log level set to INFO
  5. raft> log debug
  6. ✓ Log level set to DEBUG (verbose mode)
  7. raft> log election
  8. ✓ Showing election logs only
  9. raft> log heartbeat
  10. ✓ Showing heartbeat logs only
复制代码
6.4 查询投票信息
  1. ╔════════════════════════════════════════════════════════════╗
  2. ║          Raft Leader Election Test - 3 Nodes               ║
  3. ╚════════════════════════════════════════════════════════════╝
  4. [STEP 1] Starting 3 nodes...
  5.   ✓ node1 started on port 8081
  6.   ✓ node2 started on port 8082
  7.   ✓ node3 started on port 8083
  8.   ✓ All nodes started!
  9. [STEP 2] Waiting for leader election...
  10. [Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
  11. [Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
  12. [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
  13. ----------------------------------------
  14.   ✓ Leader elected!
  15. ┌────────────────────────────────────────────────────────────┐
  16. │                     Cluster Status                         │
  17. ├────────────┬──────────────┬─────────┬─────────┬────────────┤
  18. │ Node       │ Status       │ Term    │ Log     │ Voted For  │
  19. ├────────────┼──────────────┼─────────┼─────────┼────────────┤
  20. │ node1      │ LEADER       │ 1       │ 0       │ -          │
  21. │ node2      │ FOLLOWER     │ 1       │ 0       │ node1      │
  22. │ node3      │ FOLLOWER     │ 1       │ 0       │ node1      │
  23. └────────────┴──────────────┴─────────┴─────────┴────────────┘
复制代码
7. 常见问题

7.1 为什么选举超时需要随机化?

原因

  • 如果所有节点使用固定的超时时间,可能同时超时
  • 同时超时的节点会同时发起选举
  • 导致平票(split vote),需要重新选举
  • 随机化可以避免多个节点同时超时
示例
  1. raft> kill node1
  2. Killing node1...
  3. ✓ node1 stopped
  4. ! Leader killed, waiting for new election...
  5. [Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
  6. [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
  7. raft> leader
  8. ┌────────────────────────────────────────────────────────────┐
  9. │                      Leader Info                           │
  10. ├────────────────────────────────────────────────────────────┤
  11. │  Node ID:   node2                                         │
  12. │  Address:   127.0.0.1:8082                                 │
  13. │  Term:      2                                              │
  14. └────────────────────────────────────────────────────────────┘
  15. raft> revive node1
  16. Reviving node1...
  17. ✓ node1 revived
  18. ✓ Status: FOLLOWER
  19. ✓ Election timer: active
  20. [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码
代码实现
  1. # 运行测试程序
  2. java com.ling.raft.example.leader.ThreeNodeElectionTest
  3. # 观察选举过程
  4. [Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
  5. [Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
  6. [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
  7. # 查看当前状态
  8. raft> status
复制代码
7.2 为什么收到投票请求后要重置超时?

原因

  • 收到投票请求表示至少有一个其他节点是活跃的
  • 重置超时可以减少不必要的选举
  • 避免频繁切换状态
代码实现
  1. # 等待几秒,观察心跳
  2. [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
  3. [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
  4. [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
  5. # 启用心跳日志观察
  6. raft> log heartbeat
  7. ✓ Showing heartbeat logs only
复制代码
7.3 为什么 Candidate 要增加任期?

原因

  • 避免使用旧任期发起新的选举
  • 区分不同轮的选举
  • 保证任期单调递增
代码实现
  1. # 杀死 Leader
  2. raft> kill node1
  3. ✓ node1 stopped
  4. ! Leader killed, waiting for new election...
  5. # 观察新选举
  6. [Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
  7. [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码
7.4 如何处理网络分区?

Raft 的保证

  • 旧 Leader 无法获得多数派,无法提交新日志
  • 新 Leader 会在多数派分区选举产生
  • 网络恢复后,旧 Leader 会转为 Follower
代码体现
  1. # 恢复节点
  2. raft> revive node1
  3. ✓ node1 revived
  4. # 观察恢复过程
  5. [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码
7.5 为什么心跳间隔通常远小于选举超时?

原因

  • 心跳间隔短(如 100ms),选举超时长(如 200-300ms)
  • 确保 Follower 在超时前收到心跳
  • 避免不必要的选举
配置示例
  1. # 持续故障恢复
  2. raft> kill node2
  3. raft> kill node3
  4. raft> revive node2
  5. raft> revive node3
复制代码
建议配置
  1. // 1. 创建节点配置
  2. RaftNodeConfig node1 = new RaftNodeConfig("node1", "127.0.0.1", 8081);
  3. RaftNodeConfig node2 = new RaftNodeConfig("node2", "127.0.0.1", 8082);
  4. RaftNodeConfig node3 = new RaftNodeConfig("node3", "127.0.0.1", 8083);
  5. List<RaftNodeConfig> allNodes = Arrays.asList(node1, node2, node3);
  6. // 2. 创建 Raft 配置
  7. RaftConfig config1 = new RaftConfig(node1, allNodes);
  8. config1.setElectionTimeout(2);  // 基础超时倍数
  9. config1.setElectionTimeoutRandomRange(Range.of(150, 300));  // 随机范围
  10. config1.setHeartbeatInterval(1);  // 心跳间隔 1 秒
  11. // 3. 创建 RPC 组件
  12. DefaultRpcServer rpcServer1 = new DefaultRpcServer(node1.getPort(), null);
  13. DefaultRpcClient rpcClient1 = new DefaultRpcClient();
  14. // 4. 创建并初始化 Raft 节点
  15. RaftNodeImpl raftNode1 = new RaftNodeImpl(config1, rpcServer1, rpcClient1);
  16. rpcServer1.setRaftNode(raftNode1);
  17. raftNode1.init();
  18. // 5. 等待选举
  19. Thread.sleep(2000);
  20. // 6. 检查节点状态
  21. if (raftNode1.getNodeStatus() == ServerStatusEnum.LEADER) {
  22.     System.out.println("Node1 is Leader, term: " + raftNode1.getCurrentTerm());
  23. }
复制代码
7.6 如何避免平票(split vote)?

平票场景
  1. // 创建监控线程
  2. Thread monitor = new Thread(() -> {
  3.     while (true) {
  4.         System.out.printf("Node1: %s(t%d) ",
  5.             raftNode1.getNodeStatus(),
  6.             raftNode1.getCurrentTerm());
  7.         System.out.printf("Node2: %s(t%d) ",
  8.             raftNode2.getNodeStatus(),
  9.             raftNode2.getCurrentTerm());
  10.         System.out.printf("Node3: %s(t%d)\n",
  11.             raftNode3.getNodeStatus(),
  12.             raftNode3.getCurrentTerm());
  13.         Thread.sleep(3000);
  14.     }
  15. });
  16. monitor.setDaemon(true);
  17. monitor.start();
复制代码
避免方法

  • 随机化超时(已实现)

    • 减少多个节点同时超时的概率

  • 预投票(Pre-vote)(未实现)

    • 先询问其他节点是否愿意投票
    • 如果多数派同意,再真正发起选举

  • 快速重试(未实现)

    • 平票后快速重新选举
    • 立即开始,不等超时

当前实现

  • 仅依赖超时随机化
  • 平票后等待超时重试
7.7 为什么单机模式直接成为 Leader?

原因

  • 单机集群不需要选举
  • 只有一个节点,自己就是多数派
  • 提高启动速度
代码实现
  1. // 停止 Leader 的心跳定时器
  2. raftNode1.cancelHeartbeatTimer();
  3. // 模拟 Follower 超时
  4. raftNode2.resetElectionTimer();  // 重置超时
  5. // 等待超时后,node2 会自动发起选举
复制代码
7.8 如何调优选举参数?

参数建议
参数推荐值说明心跳间隔50ms - 100ms越短越快,但网络开销大选举超时最小150ms - 200ms应该 > 心跳间隔选举超时最大300ms - 400ms应该是心跳间隔的 3-5 倍RPC 超时2000ms - 3000ms应该 > 选举超时调优示例
  1. // 获取当前投票信息
  2. String votedFor = raftNode1.getVotedFor();
  3. long currentTerm = raftNode1.getCurrentTerm();
  4. ServerStatusEnum status = raftNode1.getNodeStatus();
  5. System.out.println("Node1 - Status: " + status + ", Term: " + currentTerm + ", VotedFor: " + votedFor);
  6. // 如果是 Candidate,查看投票计数器
  7. if (status == ServerStatusEnum.CANDIDATE) {
  8.     VoteCounter counter = raftNode1.getCurrentVoteCounter();
  9.     System.out.println("Votes: " + counter.getVoteCount() + "/" + counter.getMajorityCount());
  10. }
复制代码
附录

A. 术语表

术语说明Term任期号,单调递增,用于识别 LeaderElection Timeout选举超时时间,随机化避免平票Heartbeat心跳,Leader 定期发送维持地位Majority多数派,超过半数的节点(N/2 + 1)Split Vote平票选举,没有节点获得多数派Candidate候选节点,发起选举的节点Leader主节点,处理客户端请求Follower从节点,响应 Leader 的请求B. 参考资料


  • Raft 论文:Diego Ongaro, John Ousterhout. "In Search of an Understandable Consensus Algorithm." 2014
  • Raft GitHub:https://github.com/ongardie/raft.github.io
  • 可视化 Raft:http://thesecretlivesofdata.com/raft/
  • Raft Scope:https://raft.github.io/raftscope/index.html
C. 相关文件

文件路径RaftNodeImplcom.ling.raft.core.RaftNodeImplConsensusModuleImplcom.ling.raft.core.ConsensusModuleImplVoteCountercom.ling.raft.core.VoteCounterElectionTaskcom.ling.raft.core.task.ElectionTaskHeartbeatTaskcom.ling.raft.core.task.HeartbeatTaskServerStatusEnumcom.ling.raft.enums.ServerStatusEnumVoteRequestcom.ling.raft.model.dto.VoteRequestVoteResponsecom.ling.raft.model.dto.VoteResponseThreeNodeElectionTestcom.ling.raft.example.leader.ThreeNodeElectionTest
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册