找回密码
 立即注册
首页 业界区 业界 简易的分布式kv设计

简易的分布式kv设计

静轾 2026-2-11 22:35:01
简易的分布式kv设计--(一)

这篇文章目前只设计到集群启动,然后自动选主的功能。
地址:https://github.com/Jack-txf/easy-kv
TIPS:此文章对应的分支版本是version0211
raft基础知识:https://mp.weixin.qq.com/s/DHO2CK87kI-clf4O96t5Cw
1. 前言

在 Raft KV 系统中,每个节点(Node)都是对等的。一个典型的请求流向是: Client -> Leader Node -> Raft 日志同步 -> 大多数节点确认 -> 应用到状态机 (KV Store) -> 返回 Client。
2. 设计步骤

Raft 核心组件包括:一致性结点模块,RPC 通信,日志模块。
2.1 日志
  1. 写日志 → 复制日志 → commit → apply 【leader应用顺序】细分一下的话就如下:Client   ↓Leader   ↓ append log (本地)   ↓发送 AppendEntries   ↓Followers append log   ↓多数成功   ↓Leader commit   ↓Leader apply   ↓返回客户端成功   ↓Leader 下次心跳带 commitIndex   ↓Followers apply
复制代码
首先看一下,客户端发送一个请求,涉及到的大致东西有哪些:
从后往前看,我们需要设计的就是如何写入日志文件,以及日志文件的格式该如何设计呢?此处我们就弄简单一点儿就好了
totalLength (int)term(long)index(long)commandLength(int)command(byte[])整条log entry 长度Raft term日志 index命令长度真正命令,这个肯定是变长的totalLength = 8 (term) + 8 (index) + 4 (commandLength) + commandLength
  1. public class LogEntry {    private final long term;    private final long index;    private final String command;    ....}
复制代码
日志存储与管理
  1. /** * @Description: 日志存储与管理 * @Author: txf * @Date: 2026/2/9 */public class LogManager {    // 日志文件路径    private static final String LOG_FILE_PATH = "easy_kv_log.dat";    // 内存映射的分段大小(128MB,可根据内存调整),这里先调整为两兆    private static final int MAPPED_SIZE = 2 * 1024 * 1024;    // 文件打开模式:rw = 读写    private static final String FILE_MODE = "rw";    private final File logFile;    private RandomAccessFile raf;    private FileChannel fileChannel;    // 当前映射的内存缓冲区    private MappedByteBuffer currentMappedBuffer;    // 当前映射段的起始偏移(文件偏移)    private long currentMappedOffset = 0;    // 当前写入的位置(相对于文件的总偏移)    private long writePosition = 0;    public LogManager() {        this.logFile = new File(LOG_FILE_PATH);        initFileChannel();        initMappedBuffer();        // 初始化时定位到文件末尾(继续追加写)        try {            this.writePosition = fileChannel.size();        } catch (IOException e) {            throw new RuntimeException("获取文件大小失败", e);        }    }    /**     * 追加写入单条日志(核心高性能写入)     * @param term Raft任期     * @param index 日志索引     * @param command KV操作命令(如"PUT key value")     */    public void appendLogEntry(long term, long index, String command) {        // 1. 准备命令字节数组        byte[] commandBytes = command.getBytes(StandardCharsets.UTF_8);        int commandLength = commandBytes.length;        // 2. 计算总长度        int totalLength = 4 + 8 + 8 + 4 + commandLength;        // 3. 准备直接缓冲区(堆外内存,避免拷贝)        ByteBuffer directBuffer = ByteBuffer.allocateDirect(totalLength);        // 按格式写入缓冲区 -- 这里是写入二进制的,文件内容我们人类就读不懂了        directBuffer.putInt(totalLength);        directBuffer.putLong(term);        directBuffer.putLong(index);        directBuffer.putInt(commandLength);        directBuffer.put(commandBytes);        // 这里是写入字符串的        // directBuffer.put((term + index + command).getBytes(StandardCharsets.UTF_8));        // 翻转缓冲区(从写模式转为读模式)        directBuffer.flip();        // 4. 写入到内存映射缓冲区(核心:零拷贝)        writeToMappedBuffer(directBuffer);        // 5. 更新全局写入位置        writePosition += totalLength;    }    /**     * 将缓冲区数据写入内存映射区(自动扩容映射段)     */    private void writeToMappedBuffer(ByteBuffer buffer) {        while (buffer.hasRemaining()) {            // 检查当前映射缓冲区是否有足够剩余空间            if (currentMappedBuffer.remaining() < buffer.remaining()) {                // 先写入当前映射区的剩余空间                int remaining = currentMappedBuffer.remaining();                byte[] temp = new byte[remaining];                buffer.get(temp);                currentMappedBuffer.put(temp);                // 强制刷盘(将映射内存的数据同步到磁盘,可选:批量刷盘可提升性能)                currentMappedBuffer.force();                // 扩容映射段                initMappedBuffer();            } else {                // 直接写入映射缓冲区                currentMappedBuffer.put(buffer);            }        }    }    /**     * 读取指定索引的日志条目(高性能读取)     */    public LogEntry readLogEntry(long index) {        try {            // 使用FileChannel + 直接缓冲区读取            ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024 * 1024); // 1MB直接缓冲区            long fileOffset = 0;            long fileSize = fileChannel.size();            while (fileOffset < fileSize) {                // 重置缓冲区                directBuffer.clear();                // 从文件指定偏移读取数据到缓冲区                int readBytes = fileChannel.read(directBuffer, fileOffset);                if (readBytes == -1) break;                directBuffer.flip();                // 解析缓冲区中的日志条目                while (directBuffer.hasRemaining()) {                    // 检查剩余字节是否足够读取固定头部(4+8+8+4=24字节)                    if (directBuffer.remaining() < 24) break;                    // 读取固定字段                    int totalLength = directBuffer.getInt();                    long term = directBuffer.getLong();                    long currentIndex = directBuffer.getLong();                    int commandLength = directBuffer.getInt();                    // 检查剩余字节是否足够读取command                    if (directBuffer.remaining() < commandLength) {                        // 回退缓冲区position,下次继续解析                        directBuffer.position(directBuffer.position() - 24);                        break;                    }                    // 读取command                    byte[] commandBytes = new byte[commandLength];                    directBuffer.get(commandBytes);                    String command = new String(commandBytes, StandardCharsets.UTF_8);                    // 校验总长度                    int actualLength = 4 + 8 + 8 + 4 + commandLength;                    if (totalLength != actualLength) {                        throw new RuntimeException("日志文件损坏:总长度不匹配");                    }                    // 找到目标索引则返回                    if (currentIndex == index) {                        return new LogEntry(term, index, command);                    }                    // 更新文件偏移                    fileOffset += totalLength;                }            }        } catch (IOException e) {            throw new RuntimeException("读取日志条目失败 [index=" + index + "]", e);        }        return null;    }    /**     * 加载所有日志条目(节点启动时恢复)     */    public List loadAllLogEntries() {        List logEntries = new ArrayList();        try {            ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024 * 1024);            long fileOffset = 0;            long fileSize = fileChannel.size();            while (fileOffset < fileSize) {                directBuffer.clear();                int readBytes = fileChannel.read(directBuffer, fileOffset);                if (readBytes == -1) break;                directBuffer.flip();                while (directBuffer.hasRemaining()) {                    if (directBuffer.remaining() < 24) break;                    int totalLength = directBuffer.getInt();                    long term = directBuffer.getLong();                    long index = directBuffer.getLong();                    int commandLength = directBuffer.getInt();                    if (directBuffer.remaining() < commandLength) {                        directBuffer.position(directBuffer.position() - 24);                        break;                    }                    byte[] commandBytes = new byte[commandLength];                    directBuffer.get(commandBytes);                    String command = new String(commandBytes, StandardCharsets.UTF_8);                    int actualLength = 4 + 8 + 8 + 4 + commandLength;                    if (totalLength != actualLength) {                        throw new RuntimeException("日志文件损坏:总长度不匹配");                    }                    logEntries.add(new LogEntry(term, index, command));                    fileOffset += totalLength;                }            }        } catch (IOException e) {            throw new RuntimeException("加载所有日志条目失败", e);        }        return logEntries;    }    /**     * 强制刷盘(将映射内存的数据同步到磁盘)     */    public void forceFlush() {        if (currentMappedBuffer != null) {            currentMappedBuffer.force(); // 同步映射内存到磁盘        }        try {            fileChannel.force(true); // 强制刷盘(包含元数据)        } catch (IOException e) {            throw new RuntimeException("刷盘失败", e);        }    }    /**     * 关闭资源(必须调用,否则会导致文件句柄泄漏)     */    public void close() {        try {            forceFlush();            if (fileChannel != null) {                fileChannel.close();            }            if (raf != null) {                raf.close();            }        } catch (IOException e) {            throw new RuntimeException("关闭资源失败", e);        }    }    /**     * 初始化/扩容内存映射缓冲区     */    private void initMappedBuffer() {        try {            // 计算需要映射的起始位置和大小            long fileSize = fileChannel.size();            // 如果当前映射段已写满,或首次初始化,创建新的映射            if (currentMappedBuffer == null || writePosition >= currentMappedOffset + MAPPED_SIZE) {                currentMappedOffset = (writePosition / MAPPED_SIZE) * MAPPED_SIZE;                // 映射文件的指定区间到内存(FileChannel.MapMode.READ_WRITE:读写模式)                currentMappedBuffer = fileChannel.map(                        FileChannel.MapMode.READ_WRITE,                        currentMappedOffset,                        MAPPED_SIZE                );                // 将缓冲区的position定位到当前写入位置相对于映射段的偏移                currentMappedBuffer.position((int) (writePosition - currentMappedOffset));            }        } catch (IOException e) {            throw new RuntimeException("初始化内存映射缓冲区失败", e);        }    }    /**     * 初始化FileChannel(核心高性能通道)     */    private void initFileChannel() {        try {            // 不存在则创建文件            if (!logFile.exists()) {                boolean newFile = logFile.createNewFile();                if (!newFile) {                    throw new RuntimeException("创建文件失败");                }            }            this.raf = new RandomAccessFile(logFile, FILE_MODE);            this.fileChannel = raf.getChannel();        } catch (IOException e) {            throw new RuntimeException("初始化FileChannel失败", e);        }    }}
复制代码
2.2 服务端设计

2.2.1 消息设计

日志设计好了之后,接下来看服务端如何设计。我们使用的是netty框架,要求有netty基础。然后序列化协议采用的是protobuf,读者可以参考这篇文章:https://mp.weixin.qq.com/s/kg_-AMHRn_DzFbfBnkK4VQ 这篇文章大致讲解了一下该序列化协议,并且也是采用netty整合的。根据proto文件生成类的命令如下,也可以用idea的插件自动生成。
  1. protoc  --proto_path=xxxx目录  --java_out=xxx目录  具体的proto文件
复制代码
消息模板如下:
  1. syntax = "proto3";option java_outer_classname = "KvRaftProto"; // 生成的外层类名option java_multiple_files = false; // 生成多个独立的Java类(而非内部类)// ===================== 1. 通用消息封装体(核心) =====================// Netty传输时只传这个消息,通过type识别具体消息类型message RaftKvMessage {  // 消息类型枚举(覆盖所有交互场景)  enum MessageType {    UNKNOWN = 0; // 未知类型(兜底)    // 客户端 ↔ 节点:KV操作    KV_REQUEST = 1;    // 客户端发起KV请求(PUT/GET/DELETE)    KV_RESPONSE = 2;   // 节点响应客户端KV请求    // 节点 ↔ 节点:Raft共识    VOTE_REQUEST = 3;  // 选举请求(Candidate→Follower)    VOTE_RESPONSE = 4; // 选举响应(Follower→Candidate)    APPEND_ENTRIES_REQUEST = 5;  // 日志追加/心跳(Leader→Follower)    APPEND_ENTRIES_RESPONSE = 6; // 日志追加响应(Follower→Leader)  }  MessageType type = 1; // 消息类型(必传)  string node_id = 2;   // 发送方节点ID(用于识别节点)  // 具体消息体(根据type选择其中一个)  KvRequest kv_request = 3;  KvResponse kv_response = 4;  VoteRequest vote_request = 5;  VoteResponse vote_response = 6;  AppendEntriesRequest append_entries_request = 7;  AppendEntriesResponse append_entries_response = 8;}// ===================== 2. 客户端KV操作相关 =====================// 客户端发起的KV请求(PUT/GET/DELETE)message KvRequest {  enum OpType {    PUT = 0;    // 写入/更新    GET = 1;    // 读取    DELETE = 2; // 删除  }  OpType op_type = 1; // 操作类型(必传)  string key = 2;     // KV的key(必传)  string value = 3;   // KV的value(仅PUT时传)  // 可选:请求ID,用于幂等性(防止重复请求)  string request_id = 4;}// 节点响应客户端的KV结果message KvResponse {  bool success = 1;    // 操作是否成功  string message = 2;  // 错误信息/提示(失败时必传)  string value = 3;    // 返回的value(仅GET成功时传)  string request_id = 4; // 对应请求的ID(幂等性)}// ===================== 3. Raft选举相关 =====================// Candidate向Follower发起的投票请求message VoteRequest {  int64 term = 1;                // Candidate的当前任期(必传)  string candidate_id = 2;       // Candidate的节点ID(必传)  int64 last_log_index = 3;      // Candidate最后一条日志的索引(用于日志一致性检查)  int64 last_log_term = 4;       // Candidate最后一条日志的任期(用于日志一致性检查)}// Follower响应Candidate的投票结果message VoteResponse {  int64 term = 1;                // Follower的当前任期(必传,用于更新Candidate的任期)  bool vote_granted = 2;         // 是否投赞成票(必传)}// ===================== 4. Raft日志追加/心跳相关 =====================// 日志条目(与你设计的日志格式对齐,序列化后可直接写入日志文件)message LogEntry {  int64 term = 1;        // Raft任期(对应你日志格式的term)  int64 index = 2;       // 日志索引(对应你日志格式的index)  string command = 3;    // KV操作命令(如"PUT key1 value1",对应你日志格式的command)}// Leader向Follower发送的日志追加/心跳请求message AppendEntriesRequest {  int64 term = 1;                // Leader的当前任期(必传)  string leader_id = 2;          // Leader的节点ID(必传)  int64 prev_log_index = 3;      // 前一条日志的索引(用于日志一致性检查)  int64 prev_log_term = 4;       // 前一条日志的任期(用于日志一致性检查)  repeated LogEntry entries = 5; // 待追加的日志条目(心跳时为空)  int64 leader_commit = 6;       // Leader已提交的日志索引(Follower据此更新自己的提交索引)}// Follower响应Leader的日志追加结果message AppendEntriesResponse {  int64 term = 1;                // Follower的当前任期(必传,用于更新Leader的任期)  bool success = 2;              // 日志追加是否成功(必传)  int64 match_index = 3;         // Follower已匹配的日志索引(Leader据此更新nextIndex)}
复制代码
上面是消息的大致格式。
接下来看服务端的节点设计,我们从netty服务启动开始往下看:
在kv-core的app包里面
  1. public static void main(String[] args) {    int port = getPort(args);    RaftNettyServer raftNettyServer = new RaftNettyServer(port);    try {        raftNettyServer.start();    } catch (InterruptedException e) {        throw new RuntimeException(e);    }}private static int getPort(String[] args) {    for (String arg : args) {        if (arg.startsWith("node.port=")) {            return Integer.parseInt(arg.substring(10));        }    }    return 7777;}
复制代码
从java程序启动的命令行读取结点的端口参数,我们可以用一台电脑开启多个应用,在idea中这样配置就可以了:如下图所示
从上图中可以看到,配置了三个节点,然后本项目的jdk是采用的21这个版本。配置好了之后,在idea的services里面可以把这些配置一起加进去,然后就可以同时启动多个节点了。
2.2.2 连接设计
  1. RaftNettyServer raftNettyServer = new RaftNettyServer(port);...raftNettyServer.start();
复制代码
从上一节的启动类看出来主要就是new了一个server对象,然后调用start方法。我们顺着这两个看就可以了。
首先是构造方法:
  1. public class RaftNettyServer {          ....    private final int port;    private final RaftNode node;    public RaftNettyServer(int port) {        this.port = port;         // 这里创建了一个raft结点对象        this.node = new RaftNode(port);    }}// 这个是RaftNode类,public RaftNode(int port) {    this.port = port;    // 从配置文件中找到自己    this.nodesConfig = new NodesConfig();    this.nodeId = nodesConfig.findSelf(port);    // 需要把自身结点    this.rpcPeers = nodesConfig.getNodeList().stream()        // node的格式是'ip:port'        .map(node -> new RpcPeer(node, node.split(":")[0],                                 Integer.parseInt(node.split(":")[1]), this))        .toList();    this.logManager = new LogManager();    this.storage = new MemoryStorage();    // 把两个时间先初始化咯    this.electionTimeout = 8000 + ThreadLocalRandom.current().nextInt(4000);    this.lastHeartbeatTime = System.currentTimeMillis();    log.info("初始化选举超时:{}", electionTimeout);    // 定时器    scheduler = Executors.newScheduledThreadPool(2);    scheduler.scheduleAtFixedRate(this::tick, 2, 2000, TimeUnit.MILLISECONDS);}
复制代码
Raft类是最核心的一个类。上面的构造方法其实很简单。读者自行理解。需要说明一下的就是nodeId的格式:【ip:端口】
  1. 127.0.0.1:8888 // 就是这种字符串的格式
复制代码
还有要说明的就是rpcPeers这个List的构建,可以看出来先是从配置文件读取到了集群节点列表,然后遍历这个列表创建了对象,这个具体是什么意思呢?
首先看一下Netty的客户端发送请求到服务端,服务端处理后在返回给客户端,客户端根据响应结果进行逻辑处理。这样一个示意图:
再看一下下面的连接示意图:
NettyClient不仅仅是给客户用的,集群结点内部互相通信也要用到这个,rpcPeers就是集群节点内部通信使用的。如上图所示,每个节点都有一个NettyServer启动并监听着端口,同时Leader结点需要给所有的Follower结点发送心跳请求,此时这个Leader相当于其他两个Follower结点就相当于Client了,所以在结点一里面有client的部分,在项目归到了rpc的包下,也就是上面那个rpcPeers的由来了。
在集群启动的时候,都是Follower结点,只有等到超时了才会开始选主,在此之前,每个节点都有成为Candidate的可能,也就是说每个节点都有向其他结点发送投票请求的可能(VoteRequest),那么每个节点里面都要有一套Netty Client及处理流程。就如下图所示:
这样就有一个问题了,三个节点,我就要六个tcp连接了,四个节点就要12个连接,十个节点呢,就要90个连接,这也太多了吧,那也确实是的。有一个思路是搞一个中间层叫做routeCenter,所有结点连向它,然后消息都经过这个路由中心来转发,这样连接数就会少很多了。
还有一个思路,反正NettyClient + 一个NettyServer构建出一个Channel,理论上我只需要三个tcp连接就可以了啊。如下图所示:
但是这样的话,节点间通信需要转发了,代码逻辑就太复杂了,况且还有一个问题,那就是如何确定这个tcp的连接顺序呢?这个可以按照nodeId的字典顺序来嘛。反正就是麻烦就完事儿了。。。
综上所述,还是最开始的方案最简单直接了,反正这就是一个简易的案例,不要考虑太多了。如果有一些其他合适的思路,欢迎读者给出。
节点之间的连接设计就是上面的样子了。
接下来看RaftNode的设计。
2.2.3 RaftNode设计

那就从构造器开始看吧:
  1. public RaftNode(int port) {    this.port = port;    // 从配置文件中找到自己    this.nodesConfig = new NodesConfig();    this.nodeId = nodesConfig.findSelf(port);    // 需要把自身结点    this.rpcPeers = nodesConfig.getNodeList().stream()        // node的格式是'ip:port'        .map(node -> new RpcPeer(node, node.split(":")[0],                                 Integer.parseInt(node.split(":")[1]), this))        .toList();    this.logManager = new LogManager();    this.storage = new MemoryStorage();    // 把两个时间先初始化咯    this.electionTimeout = 8000 + ThreadLocalRandom.current().nextInt(4000);    this.lastHeartbeatTime = System.currentTimeMillis();    log.info("初始化选举超时:{}", electionTimeout);    // 定时器    scheduler = Executors.newScheduledThreadPool(2);    scheduler.scheduleAtFixedRate(this::tick, 2, 2000, TimeUnit.MILLISECONDS);}
复制代码
可以看到都是做一些初始化的工作,然后下面是开启了一个定时任务tick
  1. private void tick() {    log.info("检查是否超时:{} 状态: {}", nodeId, state);    try {        if (state != NodeState.LEADER && isTimeout()) {            becomeCandidate();        } else if (state == NodeState.LEADER) {            // sendHeartbeats();        }    } catch ( Exception e ) {        log.error("{} 节点tick定时任务异常", nodeId, e);    }}private void becomeCandidate() {    log.info("{} 选举超时,转为 Candidate,开始任期: {}", nodeId, currentTerm.get() + 1);    state = NodeState.CANDIDATE;    currentTerm.getAndIncrement(); // 任期+1    votedFor = nodeId; // 给自己投一票    resetElectionTimeout();    // 集群发送投票请求    requestVotes();} private boolean isTimeout() {     return System.currentTimeMillis() - lastHeartbeatTime > electionTimeout; }private void resetElectionTimeout() {    // 8000ms ~ 12000ms 随机超时,避免平票    this.electionTimeout = 8000 + ThreadLocalRandom.current().nextInt(4000);    log.info("重置 {} 节点选举时间,随机超时:{} ms", nodeId, electionTimeout);    this.lastHeartbeatTime = System.currentTimeMillis();}
复制代码
主要就是看becomeCandidate这个方法最后的向集群发送投票请求。
  1. private void requestVotes() {    // 1. 初始化票数:自己的一票    AtomicInteger grantedVotes = new AtomicInteger(1);    long count = rpcPeers.stream().filter(peer -> !peer.isSelf()).count(); // 不包含自己的结点数    int majority = (int) ((count + 1) / 2 + 1); // 总节点数(包含自己)的半数以上    // 2.构造投票消息    KvRaftProto.VoteRequest voteRequest = KvRaftProto.VoteRequest.newBuilder()        .setTerm(currentTerm.get())        .setCandidateId(nodeId)        .setLastLogIndex(logManager.getLastLogIndex())        .setLastLogTerm(logManager.getLastLogTerm())        .build();    // 3. 发送投票请求    // 构建一个对象,表示当前投票请求的状态    // String voteId = UUID.randomUUID().toString().replaceAll("-", "");    // 这里为什么可以用term?因为 Raft 规定,一个节点在一个 term 内只能投一张票。    // 所以,只要 term 匹配,这个响应就一定是针对你当前发起的这一轮选举的。    GlobalVoteManager.setVoteState(currentTerm.get(), new VoteState(nodeId, currentTerm.get(), majority));    int countSend = 0;    for (RpcPeer peer : rpcPeers) {        if (!peer.isSelf()) { // 不是自身结点,就发送投票请求            // send方法就很简单了,请读者自行查看            boolean send = peer.send(KvRaftProto.RaftKvMessage.newBuilder()                                     .setType(KvRaftProto.RaftKvMessage.MessageType.VOTE_REQUEST)                                     .setVoteRequest(voteRequest)                                     .build());            if ( send ) countSend++;        }    }    log.info("发送投票请求:{},已发送给了 {} 个结点..", voteRequest, countSend);}
复制代码
这样投票请求就发送出去了,此时结点是作为客户端发送给其他节点的,接下来的逻辑就是其他节点接收到voteRequest请求然后做逻辑处理,所以就要在server包下面去查看具体逻辑。
  1. // 在kv-core的server包下面的KvBusinessHandler.java// 1.如果是投票请求if ( raftKvMessage.getType() == KvRaftProto.RaftKvMessage.MessageType.VOTE_REQUEST) {    log.info("receive vote request.........");    KvRaftProto.VoteRequest voteRequest = raftKvMessage.getVoteRequest();    // 可以看到交给了node去处理    KvRaftProto.VoteResponse voteResponse = node.tackleVoteRequest(voteRequest);    ctx.writeAndFlush(KvRaftProto.RaftKvMessage.newBuilder()                      .setType(KvRaftProto.RaftKvMessage.MessageType.VOTE_RESPONSE)                      .setVoteResponse(voteResponse)                      .build());}// 又回到了RaftNode类了public KvRaftProto.VoteResponse tackleVoteRequest(KvRaftProto.VoteRequest voteRequest) {    // 比较任期    if (voteRequest.getTerm() < currentTerm.get()) {        log.info("{} 投票请求任期太小,拒绝投票", nodeId);        return buildVoteResponse(false, currentTerm.get());    }    if ( votedFor != null && !voteRequest.getCandidateId().equals(votedFor) ) {        log.info("{}已投给其他人,拒绝该投票请求", nodeId);        return buildVoteResponse(false, currentTerm.get());    }    // 再比较日志情况    if ( voteRequest.getLastLogIndex() >= logManager.getLastLogIndex() &&        voteRequest.getLastLogTerm() >= logManager.getLastLogTerm() ) {        log.info("{} 投票请求ok,赞成投票", nodeId);        currentTerm.set(voteRequest.getTerm()); // 更新自己的任期        votedFor = voteRequest.getCandidateId(); // 投票给该节点        return buildVoteResponse(true, voteRequest.getTerm());    }    log.info("{} 投票请求日志太旧,拒绝该投票请求", nodeId);    return buildVoteResponse(false, currentTerm.get());}
复制代码
其他节点收到了拉票请求,会返回response给candidate结点,candidate结点是作为Client发送的拉票请求,收到的响应肯定是在客户端的处理器handler,接下来的逻辑就要在rpc包下面的RpcClientHandler去查看了:
  1. @Overrideprotected void channelRead0(ChannelHandlerContext ctx, KvRaftProto.RaftKvMessage msg) {    // 2.VOTE_RESPONSE 投票请求回来的响应【投票请求是结点作为客户端发出的,应该在客户端的handler处理响应】    if (msg.getType() == KvRaftProto.RaftKvMessage.MessageType.VOTE_RESPONSE) {        log.info("receive vote response.........");        KvRaftProto.VoteResponse voteResponse = msg.getVoteResponse();        raftNode.tackleVoteResponse(voteResponse); // 又回到了RaftNode    }}// RaftNode.java// 投票结果处理,【投票请求是结点作为客户端发出的,要在客户端的handler处理响应】public synchronized void tackleVoteResponse(KvRaftProto.VoteResponse voteResponse) {    long term = voteResponse.getTerm();    // 2. 发现更高任期,立即降级并更新    // 1. 任期检查:对方比我大,我立即认输    if (term > currentTerm.get()) {        stepDown(term);        return;    }    // 2. 状态检查:如果我已经不是 Candidate 了(比如已经超时重选或收到心跳),忽略    if (state != NodeState.CANDIDATE) return;    // 3. 任期匹配检查:确保这是对“当前这一轮”选举的回复    // 如果收到的响应任期比当前小,说明是之前过期的选举回复,直接丢弃    if (term < currentTerm.get()) {        return;    }    // 4. 从全局管理器获取当前选举的投票状态    VoteState voteState = GlobalVoteManager.getVoteState(term);    if (voteState == null) {        log.error("未找到任期 {} 的投票记录状态", term);        return;    }    // 5. 如果对方投了赞成票    if (voteResponse.getVoteGranted()) {        // 增加票数(这里 AtomicInteger 在 VoteState 内部保证了线程安全,        // 但由于本方法加了 synchronized,其实双重保险)        int currentVotes = voteState.addVote();        int majority = voteState.getMajority();        log.info("赞成票,当前票数: {}/{}", currentVotes, nodesConfig.getNodeList().size());        // 6. 检查是否达到多数派        if (currentVotes >= majority) {            log.info("节点 {} 获得过半选票 ({}),准备晋升为 Leader", nodeId, currentVotes);            becomeLeader();        }    } else {        log.info("拒绝了我的投票请求");    }}private synchronized void becomeLeader() {    if (state != NodeState.CANDIDATE) return;    if (state == NodeState.LEADER) return;    this.state = NodeState.LEADER;    log.info("Node {} 赢得选举,即将成为 Leader, Term: {}", nodeId, currentTerm.get());    // 1. 清理上一任期的残留状态    this.votedFor = null;    // 2. 立即发送第一波心跳,宣示主权 (防止其他节点又超时)    sendHeartbeats();    // 3. 启动定时心跳任务 (比如每 2 秒一次)    if (heartbeatTask != null) heartbeatTask.cancel(true);    heartbeatTask = scheduler.scheduleAtFixedRate(this::sendHeartbeats,                                                  0, 1000, TimeUnit.MILLISECONDS);    log.info(">", nodeId, currentTerm.get());}
复制代码
3.启动测试选主

把配置好的三个节点启动一下看看结果



从上图可以看出,端口9999成为了Leader结点,然后向其他节点发送心跳数据了。
接下来就是完善一下日志分发那些逻辑了。
end. 参考


  • http://thinkinjava.cn/2019/01/12/2019/2019-01-12-lu-raft-kv/#什么是-Java-版-Raft-分布式-KV-存储
  • https://github.com/stateIs0/lu-raft-kv

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册