Skip to main content

Raft

1.Raft类

0.主要成员函数和方法

成员变量

std::mutex m_mtx;
std::vector<std::shared_ptr< RaftRpc >> m_peers; //需要与其他raft节点通信,这里保存与其他结点通信的rpc入口
std::shared_ptr<Persister> m_persister; //持久化层,负责raft数据的持久化
int m_me; //raft是以集群启动,这个用来标识自己的的编号
int m_currentTerm; //记录当前的term
int m_votedFor; //记录当前term给谁投票过
std::vector<mprrpc:: LogEntry> m_logs; //// 日志条目数组,包含了状态机要执行的指令集,以及收到领导时的任期号
// 这两个状态所有结点都在维护,易失
int m_commitIndex;
int m_lastApplied; // 已经汇报给状态机(上层应用)的log 的index

// 这两个状态是由leader来维护,易失 ,这两个部分在内容补充的部分也会再讲解
std::vector<int> m_nextIndex; // 这两个状态的下标1开始,因为通常commitIndex和lastApplied从0开始,应该是一个无效的index,因此下标从1开始
std::vector<int> m_matchIndex;
enum Status
{
Follower,
Candidate,
Leader
};
// 保存当前身份
Status m_status;

std::shared_ptr<LockQueue<ApplyMsg>> applyChan; // client从这里取日志,client与raft通信的接口
// ApplyMsgQueue chan ApplyMsg // raft内部使用的chan,applyChan是用于和服务层交互,最后好像没用上

// 选举超时
std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;
// 心跳超时,用于leader
std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;

// 用于传入快照点
// 储存了快照中的最后一个日志的Index和Term
int m_lastSnapshotIncludeIndex;
int m_lastSnapshotIncludeTerm;

成员方法

void AppendEntries1(const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply); //日志同步 + 心跳 rpc ,重点关注
void applierTicker(); //定期向状态机写入日志,非重点函数

bool CondInstallSnapshot(int lastIncludedTerm, int lastIncludedIndex, std::string snapshot); //快照相关,非重点
void doElection(); //发起选举
void doHeartBeat(); //leader定时发起心跳
// 每隔一段时间检查睡眠时间内有没有重置定时器,没有则说明超时了
// 如果有则设置合适睡眠时间:睡眠到重置时间+超时时间
void electionTimeOutTicker(); //监控是否该发起选举了
std::vector<ApplyMsg> getApplyLogs();
int getNewCommandIndex();
void getPrevLogInfo(int server, int *preIndex, int *preTerm);
void GetState(int *term, bool *isLeader); //看当前节点是否是leader
void InstallSnapshot( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply);
void leaderHearBeatTicker(); //检查是否需要发起心跳(leader)
void leaderSendSnapShot(int server);
void leaderUpdateCommitIndex(); //leader更新commitIndex
bool matchLog(int logIndex, int logTerm); //对应Index的日志是否匹配,只需要Index和Term就可以知道是否匹配
void persist(); //持久化
void RequestVote(const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply); //变成candidate之后需要让其他结点给自己投票
bool UpToDate(int index, int term); //判断当前节点是否含有最新的日志
int getLastLogIndex();
void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);
int getLogTermFromLogIndex(int logIndex);
int GetRaftStateSize();
int getSlicesIndexFromLogIndex(int logIndex); //设计快照之后logIndex不能与在日志中的数组下标相等了,根据logIndex找到其在日志数组中的位置


bool sendRequestVote(int server , std::shared_ptr<mprrpc::RequestVoteArgs> args , std::shared_ptr<mprrpc::RequestVoteReply> reply, std::shared_ptr<int> votedNum) ; // 请求其他结点的投票
bool sendAppendEntries(int server ,std::shared_ptr<mprrpc::AppendEntriesArgs> args , std::shared_ptr<mprrpc::AppendEntriesReply> reply , std::shared_ptr<int> appendNums ) ; //Leader发送心跳后,对心跳的回复进行对应的处理


//rf.applyChan <- msg //不拿锁执行 可以单独创建一个线程执行,但是为了同意使用std:thread ,避免使用pthread_create,因此专门写一个函数来执行
void pushMsgToKvServer(ApplyMsg msg); //给上层的kvserver层发送消息
void readPersist(std::string data);
std::string persistData();
void Start(Op command,int* newLogIndex,int* newLogTerm,bool* isLeader ) ; // 发布发来一个新日志
// 即kv-server主动发起,请求raft(持久层)保存snapshot里面的数据,index是用来表示snapshot快照执行到了哪条命令
void Snapshot(int index , std::string snapshot );

public:
void init(std::vector<std::shared_ptr< RaftRpc >> peers,int me,std::shared_ptr<Persister> persister,std::shared_ptr<LockQueue<ApplyMsg>> applyCh); //初始化

1.初始化init函数

1.初始化参数

2.如果崩溃了则读取文件

3.启动两个协程leaderHearBeatTicker用来判断超时选举,leaderHearBeatTicker用来维持心跳机制。启动一个线程applierTicker定期向状态机写入日志。

void Raft::init(std::vector<std::shared_ptr<RaftRpc>> peers, int me, std::shared_ptr<Persister> persister, std::shared_ptr<LockQueue<ApplyMsg>> applyCh) {
m_peers = peers; //与其他结点沟通的rpc类
m_persister = persister; //持久化类
m_me = me; //标记自己,毕竟不能给自己发送rpc吧

m_mtx.lock();

//applier
this->applyChan = applyCh; //与kv-server沟通
// rf.ApplyMsgQueue = make(chan ApplyMsg)
m_currentTerm = 0; //初始化term为0
m_status = Follower; //初始化身份为follower
m_commitIndex = 0;
m_lastApplied = 0;
m_logs.clear();
for (int i =0;i<m_peers.size();i++){
m_matchIndex.push_back(0);
m_nextIndex.push_back(0);
}
m_votedFor = -1; //当前term没有给其他人投过票就用-1表示

m_lastSnapshotIncludeIndex = 0;
m_lastSnapshotIncludeTerm = 0;
m_lastResetElectionTime = now();
m_lastResetHearBeatTime = now();

// 从持久化类读取操作对应的成员变量。
readPersist(m_persister->ReadRaftState());
if(m_lastSnapshotIncludeIndex > 0){
m_lastApplied = m_lastSnapshotIncludeIndex;
//rf.commitIndex = rf.lastSnapshotIncludeIndex 崩溃恢复不能读取commitIndex
}

m_mtx.unlock();
m_ioManager = std::make_unique<monsoon::IOManager>(FIBER_THREAD_NUM, FIBER_USE_CALLER_THREAD); //启动协程

// start ticker fiber to start elections
// 启动三个循环定时器
// todo:原来是启动了三个线程,现在是直接使用了协程,三个函数中leaderHearBeatTicker
// 、electionTimeOutTicker执行时间是恒定的,applierTicker时间受到数据库响应延迟和两次apply之间请求数量的影响,这个随着数据量增多可能不太合理,最好其还是启用一个线程。
m_ioManager->scheduler([this]() -> void { this->leaderHearBeatTicker(); });
m_ioManager->scheduler([this]() -> void { this->electionTimeOutTicker(); });

std::thread t3(&Raft::applierTicker, this); //定期向状态机写入日志
t3.detach();
}

2.领导选举

img

electionTimeOutTicker:负责查看是否该发起选举,如果该发起选举就执行doElection发起选举。

doElection:实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。

sendRequestVote:负责发送选举中的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

RequestVote:接收别人发来的选举请求,主要检验是否要给对方投票。

electionTimeOutTicker 查看是否该发起选举

while :

​ 1.随机初始化 选举时间 150 - 300 ms

​ 2.睡眠 选举时间

​ 3.判断m_lastResetElectionTime - wakeTime 有没有大于0,有的话说明这段时间有重置定时器,则继续睡眠

​ 4.超时的话发起选举 doElection();

void Raft::electionTimeOutTicker() {
// Check if a Leader election should be started.
while (true) {
/**
* 如果不睡眠,那么对于leader,这个函数会一直空转,浪费cpu。且加入协程之后,空转会导致其他协程无法运行,对于时间敏感的AE,会导致心跳无法正常发送导致异常
*/
//是Leader就睡眠 维持心跳的时间
while (m_status == Leader) {
usleep(
HeartBeatTimeout); //定时时间没有严谨设置,因为HeartBeatTimeout比选举超时一般小一个数量级,因此就设置为HeartBeatTimeout了
}
std::chrono::duration<signed long int, std::ratio<1, 1000000000>> suitableSleepTime{};
std::chrono::system_clock::time_point wakeTime{};
{
m_mtx.lock();
wakeTime = now();
suitableSleepTime = getRandomizedElectionTimeout() + m_lastResetElectionTime - wakeTime; //随机初始化suitableSleepTime
m_mtx.unlock();
}

if (std::chrono::duration<double, std::milli>(suitableSleepTime).count() > 1) {
// 获取当前时间点
auto start = std::chrono::steady_clock::now();

usleep(std::chrono::duration_cast<std::chrono::microseconds>(suitableSleepTime).count());
// std::this_thread::sleep_for(suitableSleepTime);

// 获取函数运行结束后的时间点
auto end = std::chrono::steady_clock::now();

// 计算时间差并输出结果(单位为毫秒)
std::chrono::duration<double, std::milli> duration = end - start;

// 使用ANSI控制序列将输出颜色修改为紫色
std::cout << "\033[1;35m electionTimeOutTicker();函数设置睡眠时间为: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(suitableSleepTime).count() << " 毫秒\033[0m"
<< std::endl;
std::cout << "\033[1;35m electionTimeOutTicker();函数实际睡眠时间为: " << duration.count() << " 毫秒\033[0m"
<< std::endl;
}

if (std::chrono::duration<double, std::milli>(m_lastResetElectionTime - wakeTime).count() > 0) {
//说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doElection();
}
}

doElection() 发起超时选举

发起选举,

对所有的m_peers发起RPC请求

void Raft::doElection() {
lock_guard<mutex> g(m_mtx); //c11新特性,使用raii避免死锁


if (m_status != Leader) {
DPrintf("[ ticker-func-rf(%d) ] 选举定时器到期且不是leader,开始选举 \n", m_me);
//当选举的时候定时器超时就必须重新选举,不然没有选票就会一直卡住
//重竞选超时,term也会增加的
m_status = Candidate;
///开始新一轮的选举
m_currentTerm += 1; //无论是刚开始竞选,还是超时重新竞选,term都要增加
m_votedFor = m_me; //即是自己给自己投票,也避免candidate给同辈的candidate投
persist();
std::shared_ptr<int> votedNum = std::make_shared<int>(1); // 使用 make_shared 函数初始化 !! 亮点
// 重新设置定时器
m_lastResetElectionTime = now();
// 发布RequestVote RPC
for (int i = 0; i < m_peers.size(); i++) {
if (i == m_me) {
continue;
}
int lastLogIndex = -1, lastLogTerm = -1;
getLastLogIndexAndTerm(&lastLogIndex, &lastLogTerm);//获取最后一个log的term和下标,以添加到RPC的发送 - 自己的

//初始化发送参数
std::shared_ptr<mprrpc::RequestVoteArgs> requestVoteArgs = std::make_shared<mprrpc::RequestVoteArgs>();
requestVoteArgs->set_term(m_currentTerm);
requestVoteArgs->set_candidateid(m_me);
requestVoteArgs->set_lastlogindex(lastLogIndex);
requestVoteArgs->set_lastlogterm(lastLogTerm);
std::shared_ptr<mprrpc::RequestVoteReply> requestVoteReply = std::make_shared<mprrpc::RequestVoteReply>();

//使用匿名函数执行避免其拿到锁

std::thread t(&Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply,
votedNum); // 创建新线程并执行函数,并传递参数
t.detach();
}
}
}

sendRequestVote 发起投票 后 处理

1.发送RPC请求,得到args参数,reply是要回复的消息

2.如果对端任期大于目前当前编号,三变:身份-Follower,term,和投票

3.如果投票了,票数 + 1,变成leader,初始化状态和nextIndex、matchIndex

4.m_nextIndex[i] = lastLogIndex + 1; //有效下标从1开始,因此要+1 ,有效下标改为 m_matchIndex[i] = 0; //每换一个领导都是从0开始,匹配的Index

5.成功了则发送心跳 doHeartBeat

bool Raft::sendRequestVote(int server, std::shared_ptr<raftRpcProctoc::RequestVoteArgs> args,
std::shared_ptr<raftRpcProctoc::RequestVoteReply> reply, std::shared_ptr<int> votedNum) {
//这个ok是网络是否正常通信的ok,而不是requestVote rpc是否投票的rpc
// ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
// todo
auto start = now();
DPrintf("[func-sendRequestVote rf{%d}] 向server{%d} 發送 RequestVote 開始", m_me, m_currentTerm, getLastLogIndex());
bool ok = m_peers[server]->RequestVote(args.get(), reply.get());
DPrintf("[func-sendRequestVote rf{%d}] 向server{%d} 發送 RequestVote 完畢,耗時:{%d} ms", m_me, m_currentTerm,
getLastLogIndex(), now() - start);

if (!ok) {
return ok; //不知道为什么不加这个的话如果服务器宕机会出现问题的,通不过2B todo
}
// for !ok {
//
// //ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
// //if ok {
// // break
// //}
// } //这里是发送出去了,但是不能保证他一定到达
//对回应进行处理,要记得无论什么时候收到回复就要检查term
std::lock_guard<std::mutex> lg(m_mtx);
if (reply->term() > m_currentTerm) {
m_status = Follower; //三变:身份,term,和投票
m_currentTerm = reply->term();
m_votedFor = -1;
persist();
return true;
} else if (reply->term() < m_currentTerm) {
return true;
}
myAssert(reply->term() == m_currentTerm, format("assert {reply.Term==rf.currentTerm} fail"));

// todo:这里没有按博客写
if (!reply->votegranted()) {
return true;
}

*votedNum = *votedNum + 1;
if (*votedNum >= m_peers.size() / 2 + 1) {
//变成leader
*votedNum = 0;
if (m_status == Leader) {
//如果已经是leader了,那么是就是了,不会进行下一步处理了k
myAssert(false,
format("[func-sendRequestVote-rf{%d}] term:{%d} 同一个term当两次领导,error", m_me, m_currentTerm));
}
// 第一次变成leader,初始化状态和nextIndex、matchIndex
m_status = Leader;

DPrintf("[func-sendRequestVote rf{%d}] elect success ,current term:{%d} ,lastLogIndex:{%d}\n", m_me, m_currentTerm,
getLastLogIndex());

int lastLogIndex = getLastLogIndex();
for (int i = 0; i < m_nextIndex.size(); i++) {
m_nextIndex[i] = lastLogIndex + 1; //有效下标从1开始,因此要+1
m_matchIndex[i] = 0; //每换一个领导都是从0开始,见fig2
}
std::thread t(&Raft::doHeartBeat, this); //马上向其他节点宣告自己就是leader
t.detach();

persist();
}
return true;
}

RequestVote 对投票结果进行处理

1.args->term() < m_currentTerm 出现网络分区,该竞选者已经OutOfDate(过时)

2.args->term() > m_currentTerm ,更新term,并变成follower

3.检查日志且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票

4.如果正常则同意投票

void Raft::RequestVote( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) {
lock_guard<mutex> lg(m_mtx);

Defer ec1([this]() -> void { //应该先持久化,再撤销lock,因此这个写在lock后面
this->persist();
});
//对args的term的三种情况分别进行处理,大于小于等于自己的term都是不同的处理
//reason: 出现网络分区,该竞选者已经OutOfDate(过时)
if (args->term() < m_currentTerm) {
reply->set_term(m_currentTerm);
reply->set_votestate(Expire);
reply->set_votegranted(false);
return;
}
//论文fig2:右下角,如果任何时候rpc请求或者响应的term大于自己的term,更新term,并变成follower
if (args->term() > m_currentTerm) {

m_status = Follower;
m_currentTerm = args->term();
m_votedFor = -1;

// 重置定时器:收到leader的ae,开始选举,透出票
//这时候更新了term之后,votedFor也要置为-1
}

// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了)
// 要检查log的term和index是不是匹配的了
int lastLogTerm = getLastLogIndex();
//只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票
if (!UpToDate(args->lastlogindex(), args->lastlogterm())) {

//日志太旧了
reply->set_term(m_currentTerm);
reply->set_votestate(Voted);
reply->set_votegranted(false);
return;
}

// 当因为网络质量不好导致的请求丢失重发就有可能!!!!
// 因此需要避免重复投票
if (m_votedFor != -1 && m_votedFor != args->candidateid()) {
reply->set_term(m_currentTerm);
reply->set_votestate(Voted);
reply->set_votegranted(false);
return;
} else {
//同意投票
m_votedFor = args->candidateid();
m_lastResetElectionTime = now();//认为必须要在投出票的时候才重置定时器,
reply->set_term(m_currentTerm);
reply->set_votestate(Normal);
reply->set_votegranted(true);
return;
}
}

2.日志复制|心跳

img

leaderHearBeatTicker:负责查看是否该发送心跳了,如果该发起就执行doHeartBeat。

doHeartBeat:实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。

sendAppendEntries:负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

leaderSendSnapShot:负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

AppendEntries:接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。

InstallSnapshot:接收leader发来的快照请求,同步快照到本机。

leaderHearBeatTicker 是否发送心跳

void Raft::leaderHearBeatTicker() {
while (true) {

auto nowTime = now();
m_mtx.lock();
auto suitableSleepTime = std::chrono::milliseconds(HeartBeatTimeout) + m_lastResetHearBeatTime - nowTime;
m_mtx.unlock();
if (suitableSleepTime.count() < 1) {
suitableSleepTime = std::chrono::milliseconds(1);
}
std::this_thread::sleep_for(suitableSleepTime);
if ((m_lastResetHearBeatTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doHeartBeat();
}
}

doHeartBeat 进行心跳发送机制

1.for循环各个对端

2.判断m_lastSnapshotIncludeIndex是否大于m_nextIndex,是的话发送快照

void Raft::doHeartBeat() {
std::lock_guard<std::mutex> g(m_mtx);

if (m_status == Leader) {
DPrintf("[func-Raft::doHeartBeat()-Leader: {%d}] Leader的心跳定时器触发了且拿到mutex,开始发送AE\n", m_me);
auto appendNums = std::make_shared<int>(1); //正确返回的节点的数量

//对Follower(除了自己外的所有节点发送AE)
// todo 这里肯定是要修改的,最好使用一个单独的goruntime来负责管理发送log,因为后面的log发送涉及优化之类的
//最少要单独写一个函数来管理,而不是在这一坨
for (int i = 0; i < m_peers.size(); i++) {
if (i == m_me) {
continue;
}
DPrintf("[func-Raft::doHeartBeat()-Leader: {%d}] Leader的心跳定时器触发了 index:{%d}\n", m_me, i);
myAssert(m_nextIndex[i] >= 1, format("rf.nextIndex[%d] = {%d}", i, m_nextIndex[i]));
//日志压缩加入后要判断是发送快照还是发送AE
if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) {
// DPrintf("[func-ticker()-rf{%v}]rf.nextIndex[%v] {%v} <=
// rf.lastSnapshotIncludeIndex{%v},so leaderSendSnapShot", rf.me, i, rf.nextIndex[i],
// rf.lastSnapshotIncludeIndex)
std::thread t(&Raft::leaderSendSnapShot, this, i); // 创建新线程并执行b函数,并传递参数
t.detach();
continue;
}
//构造发送值
int preLogIndex = -1;
int PrevLogTerm = -1;
getPrevLogInfo(i, &preLogIndex, &PrevLogTerm);//获取本次发送的一系列日志的上一条日志的信息,以判断是否匹配
std::shared_ptr<raftRpcProctoc::AppendEntriesArgs> appendEntriesArgs =
std::make_shared<raftRpcProctoc::AppendEntriesArgs>();
appendEntriesArgs->set_term(m_currentTerm);
appendEntriesArgs->set_leaderid(m_me);
appendEntriesArgs->set_prevlogindex(preLogIndex);
appendEntriesArgs->set_prevlogterm(PrevLogTerm);
appendEntriesArgs->clear_entries();
appendEntriesArgs->set_leadercommit(m_commitIndex);

if (preLogIndex != m_lastSnapshotIncludeIndex) {
for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j) {
raftRpcProctoc::LogEntry* sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = m_logs[j]; //=是可以点进去的,可以点进去看下protobuf如何重写这个的
}
} else {
for (const auto& item : m_logs) {
raftRpcProctoc::LogEntry* sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = item; //=是可以点进去的,可以点进去看下protobuf如何重写这个的
}
}
int lastLogIndex = getLastLogIndex();
// leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后
myAssert(appendEntriesArgs->prevlogindex() + appendEntriesArgs->entries_size() == lastLogIndex,
format("appendEntriesArgs.PrevLogIndex{%d}+len(appendEntriesArgs.Entries){%d} != lastLogIndex{%d}",
appendEntriesArgs->prevlogindex(), appendEntriesArgs->entries_size(), lastLogIndex));
//构造返回值
const std::shared_ptr<raftRpcProctoc::AppendEntriesReply> appendEntriesReply =
std::make_shared<raftRpcProctoc::AppendEntriesReply>();
appendEntriesReply->set_appstate(Disconnected);

std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,
appendNums); // 创建新线程并执行b函数,并传递参数
t.detach();
}
m_lastResetHearBeatTime = now(); // leader发送心跳,就不是随机时间了
}
}

sendAppendEntries 返回心跳后的处理

bool
Raft::sendAppendEntries(int server, std::shared_ptr<mprrpc::AppendEntriesArgs> args, std::shared_ptr<mprrpc::AppendEntriesReply> reply,
std::shared_ptr<int> appendNums) {

// todo: paper中5.3节第一段末尾提到,如果append失败应该不断的retries ,直到这个log成功的被store

bool ok = m_peers[server]->AppendEntries(args.get(), reply.get());

if (!ok) {
return ok;
}

lock_guard<mutex> lg1(m_mtx);

//对reply进行处理
// 对于rpc通信,无论什么时候都要检查term
if(reply->term() > m_currentTerm){
m_status = Follower;
m_currentTerm = reply->term();
m_votedFor = -1;
return ok;
} else if (reply->term() < m_currentTerm) {//正常不会发生
return ok;
}

if (m_status != Leader) { //如果不是leader,那么就不要对返回的情况进行处理了
return ok;
}
//term相等

if (!reply->success()){
//日志不匹配,正常来说就是index要往前-1,既然能到这里,第一个日志(idnex = 1)发送后肯定是匹配的,因此不用考虑变成负数
//因为真正的环境不会知道是服务器宕机还是发生网络分区了
if (reply->updatenextindex() != -100) { //-100只是一个特殊标记而已,没有太具体的含义
// 优化日志匹配,让follower决定到底应该下一次从哪一个开始尝试发送
m_nextIndex[server] = reply->updatenextindex();
}
// 如果感觉rf.nextIndex数组是冗余的,看下论文fig2,其实不是冗余的
} else {
*appendNums = *appendNums +1; //到这里代表同意接收了本次心跳或者日志

m_matchIndex[server] = std::max(m_matchIndex[server],args->prevlogindex()+args->entries_size() ); //同意了日志,就更新对应的m_matchIndex和m_nextIndex
m_nextIndex[server] = m_matchIndex[server]+1;
int lastLogIndex = getLastLogIndex();

if (*appendNums >= 1 + m_peers.size()/2) { //可以commit了
//两种方法保证幂等性,1.赋值为0 2.上面≥改为==

*appendNums = 0; //置0

//日志的安全性保证!!!!! leader只有在当前term有日志提交的时候才更新commitIndex,因为raft无法保证之前term的Index是否提交
//只有当前term有日志提交,之前term的log才可以被提交,只有这样才能保证“领导人完备性{当选领导人的节点拥有之前被提交的所有log,当然也可能有一些没有被提交的}”
//说白了就是只有当前term有日志提交才会提交
if(args->entries_size() >0 && args->entries(args->entries_size()-1).logterm() == m_currentTerm){

m_commitIndex = std::max(m_commitIndex,args->prevlogindex() + args->entries_size());
}

}
}
return ok;
}

AppendEntries 收到日志之后的操作

void Raft::AppendEntries1(const mprrpc:: AppendEntriesArgs *args,  mprrpc::AppendEntriesReply *reply) {
std::lock_guard<std::mutex> locker(m_mtx);

// 不同的人收到AppendEntries的反应是不同的,要注意无论什么时候收到rpc请求和响应都要检查term


if (args->term() < m_currentTerm) {
reply->set_success(false);
reply->set_term(m_currentTerm);
reply->set_updatenextindex(-100); // 论文中:让领导人可以及时更新自己
DPrintf("[func-AppendEntries-rf{%d}] 拒绝了 因为Leader{%d}的term{%v}< rf{%d}.term{%d}\n", m_me, args->leaderid(),args->term() , m_me, m_currentTerm) ;
return; // 注意从过期的领导人收到消息不要重设定时器
}
Defer ec1([this]() -> void { this->persist(); });//由于这个局部变量创建在锁之后,因此执行persist的时候应该也是拿到锁的. //本质上就是使用raii的思想让persist()函数执行完之后再执行
if (args->term() > m_currentTerm) {
// 三变 ,防止遗漏,无论什么时候都是三变

m_status = Follower;
m_currentTerm = args->term();
m_votedFor = -1; // 这里设置成-1有意义,如果突然宕机然后上线理论上是可以投票的
// 这里可不返回,应该改成让改节点尝试接收日志
// 如果是领导人和candidate突然转到Follower好像也不用其他操作
// 如果本来就是Follower,那么其term变化,相当于“不言自明”的换了追随的对象,因为原来的leader的term更小,是不会再接收其消息了
}

// 如果发生网络分区,那么candidate可能会收到同一个term的leader的消息,要转变为Follower,为了和上面,因此直接写
m_status = Follower; // 这里是有必要的,因为如果candidate收到同一个term的leader的AE,需要变成follower
// term相等
m_lastResetElectionTime = now(); //重置选举超时定时器

// 不能无脑的从prevlogIndex开始阶段日志,因为rpc可能会延迟,导致发过来的log是很久之前的

// 那么就比较日志,日志有3种情况
if (args->prevlogindex() > getLastLogIndex()) {
reply->set_success(false);
reply->set_term(m_currentTerm);
reply->set_updatenextindex(getLastLogIndex() + 1);
return;
} else if (args->prevlogindex() < m_lastSnapshotIncludeIndex) { // 如果prevlogIndex还没有更上快照
reply->set_success(false);
reply->set_term(m_currentTerm);
reply->set_updatenextindex(m_lastSnapshotIncludeIndex + 1);
}
// 本机日志有那么长,冲突(same index,different term),截断日志
// 注意:这里目前当args.PrevLogIndex == rf.lastSnapshotIncludeIndex与不等的时候要分开考虑,可以看看能不能优化这块
if (matchLog(args->prevlogindex(), args->prevlogterm())) {
//日志匹配,那么就复制日志
for (int i = 0; i < args->entries_size(); i++) {
auto log = args->entries(i);
if (log.logindex() > getLastLogIndex()) { //超过就直接添加日志
m_logs.push_back(log);
} else { //没超过就比较是否匹配,不匹配再更新,而不是直接截断

if (m_logs[getSlicesIndexFromLogIndex(log.logindex())].logterm() != log.logterm()) { //不匹配就更新
m_logs[getSlicesIndexFromLogIndex(log.logindex())] = log;
}
}
}

if (args->leadercommit() > m_commitIndex) {
m_commitIndex = std::min(args->leadercommit(), getLastLogIndex());// 这个地方不能无脑跟上getLastLogIndex(),因为可能存在args->leadercommit()落后于 getLastLogIndex()的情况
}


// 领导会一次发送完所有的日志
reply->set_success(true);
reply->set_term(m_currentTerm);


return;
} else {
// 不匹配,不匹配不是一个一个往前,而是有优化加速
// PrevLogIndex 长度合适,但是不匹配,因此往前寻找 矛盾的term的第一个元素
// 为什么该term的日志都是矛盾的呢?也不一定都是矛盾的,只是这么优化减少rpc而已
// ?什么时候term会矛盾呢?很多情况,比如leader接收了日志之后马上就崩溃等等
reply->set_updatenextindex(args->prevlogindex());

for (int index = args->prevlogindex(); index >= m_lastSnapshotIncludeIndex; --index) {
if (getLogTermFromLogIndex(index) != getLogTermFromLogIndex(args->prevlogindex())) {
reply->set_updatenextindex(index + 1);
break;
}
}
reply->set_success(false);
reply->set_term(m_currentTerm);

return;
}

}

2.Client

class Clerk {
private:
std::vector<std::shared_ptr<raftServerRpcUtil>>
m_servers; //保存所有raft节点的fd //todo:全部初始化为-1,表示没有连接上
std::string m_clientId;
int m_requestId; //标识客户端发送的id序列
int m_recentLeaderId; //只是有可能是领导

std::string Uuid() {
return std::to_string(rand()) + std::to_string(rand()) + std::to_string(rand()) + std::to_string(rand());
} //用于返回随机的clientId

// MakeClerk todo
void PutAppend(std::string key, std::string value, std::string op);

public:
//对外暴露的三个功能和初始化
void Init(std::string configFileName);
std::string Get(std::string key);

void Put(std::string key, std::string value);
void Append(std::string key, std::string value);

public:
Clerk();
};

实现

//
// Created by swx on 23-6-4.
//
#include "clerk.h"

#include "raftServerRpcUtil.h"

#include "util.h"

#include <string>
#include <vector>
std::string Clerk::Get(std::string key) {
m_requestId++;
auto requestId = m_requestId;
int server = m_recentLeaderId;
raftKVRpcProctoc::GetArgs args;
args.set_key(key);
args.set_clientid(m_clientId);
args.set_requestid(requestId);

while (true) {
raftKVRpcProctoc::GetReply reply;
bool ok = m_servers[server]->Get(&args, &reply);
if (!ok ||
reply.err() ==
ErrWrongLeader) { //会一直重试,因为requestId没有改变,因此可能会因为RPC的丢失或者其他情况导致重试,kvserver层来保证不重复执行(线性一致性)
server = (server + 1) % m_servers.size();
continue;
}
if (reply.err() == ErrNoKey) {
return "";
}
if (reply.err() == OK) {
m_recentLeaderId = server;
return reply.value();
}
}
return "";
}

void Clerk::PutAppend(std::string key, std::string value, std::string op) {
// You will have to modify this function.
m_requestId++;
auto requestId = m_requestId;
auto server = m_recentLeaderId;
while (true) {
raftKVRpcProctoc::PutAppendArgs args;
args.set_key(key);
args.set_value(value);
args.set_op(op);
args.set_clientid(m_clientId);
args.set_requestid(requestId);
raftKVRpcProctoc::PutAppendReply reply;
bool ok = m_servers[server]->PutAppend(&args, &reply);
if (!ok || reply.err() == ErrWrongLeader) {
DPrintf("【Clerk::PutAppend】原以为的leader:{%d}请求失败,向新leader{%d}重试 ,操作:{%s}", server, server + 1,
op.c_str());
if (!ok) {
DPrintf("重试原因 ,rpc失敗 ,");
}
if (reply.err() == ErrWrongLeader) {
DPrintf("重試原因:非leader");
}
server = (server + 1) % m_servers.size(); // try the next server
continue;
}
if (reply.err() == OK) { //什么时候reply errno为ok呢???
m_recentLeaderId = server;
return;
}
}
}

void Clerk::Put(std::string key, std::string value) { PutAppend(key, value, "Put"); }

void Clerk::Append(std::string key, std::string value) { PutAppend(key, value, "Append"); }
//初始化客户端
void Clerk::Init(std::string configFileName) {
//获取所有raft节点ip、port ,并进行连接
MprpcConfig config;
config.LoadConfigFile(configFileName.c_str());
std::vector<std::pair<std::string, short>> ipPortVt;
for (int i = 0; i < INT_MAX - 1; ++i) {
std::string node = "node" + std::to_string(i);

std::string nodeIp = config.Load(node + "ip");
std::string nodePortStr = config.Load(node + "port");
if (nodeIp.empty()) {
break;
}
ipPortVt.emplace_back(nodeIp, atoi(nodePortStr.c_str())); //沒有atos方法,可以考慮自己实现
}
//进行连接
for (const auto& item : ipPortVt) {
std::string ip = item.first;
short port = item.second;
// 2024-01-04 todo:bug fix
auto* rpc = new raftServerRpcUtil(ip, port);
m_servers.push_back(std::shared_ptr<raftServerRpcUtil>(rpc));
}
}

Clerk::Clerk() : m_clientId(Uuid()), m_requestId(0), m_recentLeaderId(0) {}

3.KV-Server

Persister持久化类

//
// Created by swx on 23-5-30.
//

#ifndef SKIP_LIST_ON_RAFT_PERSISTER_H
#define SKIP_LIST_ON_RAFT_PERSISTER_H
#include <fstream>
#include <mutex>
class Persister {
private:
std::mutex m_mtx;
std::string m_raftState;
std::string m_snapshot;
/**
* m_raftStateFileName: raftState文件名
*/
const std::string m_raftStateFileName;
/**
* m_snapshotFileName: snapshot文件名
*/
const std::string m_snapshotFileName;
/**
* 保存raftState的输出流
*/
std::ofstream m_raftStateOutStream;
/**
* 保存snapshot的输出流
*/
std::ofstream m_snapshotOutStream;
/**
* 保存raftStateSize的大小
* 避免每次都读取文件来获取具体的大小
*/
long long m_raftStateSize;

public:
void Save(std::string raftstate, std::string snapshot);
std::string ReadSnapshot();
void SaveRaftState(const std::string& data);
long long RaftStateSize();
std::string ReadRaftState();
explicit Persister(int me);
~Persister();

private:
void clearRaftState();
void clearSnapshot();
void clearRaftStateAndSnapshot();
};

#endif // SKIP_LIST_ON_RAFT_PERSISTER_H

//
// Created by swx on 23-5-30.
//
#include "Persister.h"
#include "util.h"

// todo:会涉及反复打开文件的操作,没有考虑如果文件出现问题会怎么办??
void Persister::Save(const std::string raftstate, const std::string snapshot) {
std::lock_guard<std::mutex> lg(m_mtx);
clearRaftStateAndSnapshot();
// 将raftstate和snapshot写入本地文件
m_raftStateOutStream << raftstate;
m_snapshotOutStream << snapshot;
}

std::string Persister::ReadSnapshot() {
std::lock_guard<std::mutex> lg(m_mtx);
if (m_snapshotOutStream.is_open()) {
m_snapshotOutStream.close();
}

DEFER {
m_snapshotOutStream.open(m_snapshotFileName); //默认是追加
};
std::fstream ifs(m_snapshotFileName, std::ios_base::in);
if (!ifs.good()) {
return "";
}
std::string snapshot;
ifs >> snapshot;
ifs.close();
return snapshot;
}

void Persister::SaveRaftState(const std::string &data) {
std::lock_guard<std::mutex> lg(m_mtx);
// 将raftstate和snapshot写入本地文件
clearRaftState();
m_raftStateOutStream << data;
m_raftStateSize += data.size();
}

long long Persister::RaftStateSize() {
std::lock_guard<std::mutex> lg(m_mtx);

return m_raftStateSize;
}

std::string Persister::ReadRaftState() {
std::lock_guard<std::mutex> lg(m_mtx);

std::fstream ifs(m_raftStateFileName, std::ios_base::in);
if (!ifs.good()) {
return "";
}
std::string snapshot;
ifs >> snapshot;
ifs.close();
return snapshot;
}

Persister::Persister(const int me)
: m_raftStateFileName("raftstatePersist" + std::to_string(me) + ".txt"),
m_snapshotFileName("snapshotPersist" + std::to_string(me) + ".txt"),
m_raftStateSize(0) {
/**
* 检查文件状态并清空文件
*/
bool fileOpenFlag = true;
std::fstream file(m_raftStateFileName, std::ios::out | std::ios::trunc);
if (file.is_open()) {
file.close();
} else {
fileOpenFlag = false;
}
file = std::fstream(m_snapshotFileName, std::ios::out | std::ios::trunc);
if (file.is_open()) {
file.close();
} else {
fileOpenFlag = false;
}
if (!fileOpenFlag) {
DPrintf("[func-Persister::Persister] file open error");
}
/**
* 绑定流
*/
m_raftStateOutStream.open(m_raftStateFileName);
m_snapshotOutStream.open(m_snapshotFileName);
}

Persister::~Persister() {
if (m_raftStateOutStream.is_open()) {
m_raftStateOutStream.close();
}
if (m_snapshotOutStream.is_open()) {
m_snapshotOutStream.close();
}
}

void Persister::clearRaftState() {
m_raftStateSize = 0;
// 关闭文件流
if (m_raftStateOutStream.is_open()) {
m_raftStateOutStream.close();
}
// 重新打开文件流并清空文件内容
m_raftStateOutStream.open(m_raftStateFileName, std::ios::out | std::ios::trunc);
}

void Persister::clearSnapshot() {
if (m_snapshotOutStream.is_open()) {
m_snapshotOutStream.close();
}
m_snapshotOutStream.open(m_snapshotFileName, std::ios::out | std::ios::trunc);
}

void Persister::clearRaftStateAndSnapshot() {
clearRaftState();
clearSnapshot();
}

kvserver

1.KvServer初始化:

  • 启动RPC监听,等待客户端连接

  • 连接其余的Servers - > 通过Servers ,与其他节点通信

  • init 自己的raft类

  • 通过一个线程启动ReadRaftApplyCommandLoop() ,循环接受applyChan的消息

    • message.CommandValid 如果是命令

      • 运行GetCommandFromRaft
      • 把日志运行加到KV Server中
      • 如果raft的log太大(大于指定的比例)就把制作快照
      • waitApplyCh[raftIndex] 发送 op,通知发送的raft收到数据
  • message.SnapshotValid 如果是日志

    • 运行 GetSnapShotFromRaft
    • 执行ReadSnapShotToInstall 运行安装日志

  1. PutAppend 客户端添加数据调用

    • m_raftNode调用Start函数,并添加到日志中

    • 等待raft执行完所有返回

  1. Get 客户端获取数据调用

class KvServer : raftKVRpcProctoc::kvServerRpc {
private:
std::mutex m_mtx;
int m_me;
std::shared_ptr<Raft> m_raftNode;
std::shared_ptr<LockQueue<ApplyMsg> > applyChan; // kvServer和raft节点的通信管道
int m_maxRaftState; // snapshot if log grows this big

// Your definitions here.
std::string m_serializedKVData; // todo : 序列化后的kv数据,理论上可以不用,但是目前没有找到特别好的替代方法
SkipList<std::string, std::string> m_skipList;
std::unordered_map<std::string, std::string> m_kvDB;

std::unordered_map<int, LockQueue<Op> *> waitApplyCh;
// index(raft) -> chan //???字段含义 waitApplyCh是一个map,键是int,值是Op类型的管道

std::unordered_map<std::string, int> m_lastRequestId; // clientid -> requestID //一个kV服务器可能连接多个client

// last SnapShot point , raftIndex
int m_lastSnapShotRaftLogIndex;

public:
KvServer() = delete;

KvServer(int me, int maxraftstate, std::string nodeInforFileName, short port);

void StartKVServer();

void DprintfKVDB();

void ExecuteAppendOpOnKVDB(Op op);

void ExecuteGetOpOnKVDB(Op op, std::string *value, bool *exist);

void ExecutePutOpOnKVDB(Op op);

void Get(const raftKVRpcProctoc::GetArgs *args,
raftKVRpcProctoc::GetReply
*reply); //将 GetArgs 改为rpc调用的,因为是远程客户端,即服务器宕机对客户端来说是无感的
/**
* 從raft節點中獲取消息 (不要誤以爲是執行【GET】命令)
* @param message
*/
void GetCommandFromRaft(ApplyMsg message);

bool ifRequestDuplicate(std::string ClientId, int RequestId);

// clerk 使用RPC远程调用
void PutAppend(const raftKVRpcProctoc::PutAppendArgs *args, raftKVRpcProctoc::PutAppendReply *reply);

////一直等待raft传来的applyCh
void ReadRaftApplyCommandLoop();

void ReadSnapShotToInstall(std::string snapshot);

bool SendMessageToWaitChan(const Op &op, int raftIndex);

// 检查是否需要制作快照,需要的话就向raft之下制作快照
void IfNeedToSendSnapShotCommand(int raftIndex, int proportion);

// Handler the SnapShot from kv.rf.applyCh
void GetSnapShotFromRaft(ApplyMsg message);

std::string MakeSnapShot();

public: // for rpc
void PutAppend(google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::PutAppendArgs *request,
::raftKVRpcProctoc::PutAppendReply *response, ::google::protobuf::Closure *done) override;

void Get(google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::GetArgs *request,
::raftKVRpcProctoc::GetReply *response, ::google::protobuf::Closure *done) override;

/////////////////serialiazation start ///////////////////////////////
// notice : func serialize
private:
friend class boost::serialization::access;

// When the class Archive corresponds to an output archive, the
// & operator is defined similar to <<. Likewise, when the class Archive
// is a type of input archive the & operator is defined similar to >>.
template <class Archive>
void serialize(Archive &ar, const unsigned int version) //这里面写需要序列话和反序列化的字段
{
ar &m_serializedKVData;

// ar & m_kvDB;
ar &m_lastRequestId;
}

std::string getSnapshotData() {
m_serializedKVData = m_skipList.dump_file();
std::stringstream ss;
boost::archive::text_oarchive oa(ss);
oa << *this;
m_serializedKVData.clear();
return ss.str();
}

void parseFromString(const std::string &str) {
std::stringstream ss(str);
boost::archive::text_iarchive ia(ss);
ia >> *this;
m_skipList.load_file(m_serializedKVData);
m_serializedKVData.clear();
}

/////////////////serialiazation end ///////////////////////////////
};
void KvServer::DprintfKVDB() {
if (!Debug) {
return;
}
std::lock_guard<std::mutex> lg(m_mtx);
DEFER {
// for (const auto &item: m_kvDB) {
// DPrintf("[DBInfo ----]Key : %s, Value : %s", &item.first, &item.second);
// }
m_skipList.display_list();
};
}

void KvServer::ExecuteAppendOpOnKVDB(Op op) {
// if op.IfDuplicate { //get请求是可重复执行的,因此可以不用判复
// return
// }
m_mtx.lock();

m_skipList.insert_set_element(op.Key, op.Value);

// if (m_kvDB.find(op.Key) != m_kvDB.end()) {
// m_kvDB[op.Key] = m_kvDB[op.Key] + op.Value;
// } else {
// m_kvDB.insert(std::make_pair(op.Key, op.Value));
// }
m_lastRequestId[op.ClientId] = op.RequestId;
m_mtx.unlock();

// DPrintf("[KVServerExeAPPEND-----]ClientId :%d ,RequestID :%d ,Key : %v, value : %v", op.ClientId, op.RequestId,
// op.Key, op.Value)
DprintfKVDB();
}

void KvServer::ExecuteGetOpOnKVDB(Op op, std::string *value, bool *exist) {
m_mtx.lock();
*value = "";
*exist = false;
if (m_skipList.search_element(op.Key, *value)) {
*exist = true;
// *value = m_skipList.se //value已经完成赋值了
}
// if (m_kvDB.find(op.Key) != m_kvDB.end()) {
// *exist = true;
// *value = m_kvDB[op.Key];
// }
m_lastRequestId[op.ClientId] = op.RequestId;
m_mtx.unlock();

if (*exist) {
// DPrintf("[KVServerExeGET----]ClientId :%d ,RequestID :%d ,Key : %v, value :%v", op.ClientId,
// op.RequestId, op.Key, value)
} else {
// DPrintf("[KVServerExeGET----]ClientId :%d ,RequestID :%d ,Key : %v, But No KEY!!!!", op.ClientId,
// op.RequestId, op.Key)
}
DprintfKVDB();
}

void KvServer::ExecutePutOpOnKVDB(Op op) {
m_mtx.lock();
m_skipList.insert_set_element(op.Key, op.Value);
// m_kvDB[op.Key] = op.Value;
m_lastRequestId[op.ClientId] = op.RequestId;
m_mtx.unlock();

// DPrintf("[KVServerExePUT----]ClientId :%d ,RequestID :%d ,Key : %v, value : %v", op.ClientId, op.RequestId,
// op.Key, op.Value)
DprintfKVDB();
}

// 处理来自clerk的Get RPC
void KvServer::Get(const raftKVRpcProctoc::GetArgs *args, raftKVRpcProctoc::GetReply *reply) {
Op op;
op.Operation = "Get";
op.Key = args->key();
op.Value = "";
op.ClientId = args->clientid();
op.RequestId = args->requestid();

int raftIndex = -1;
int _ = -1;
bool isLeader = false;
m_raftNode->Start(op, &raftIndex, &_,
&isLeader); // raftIndex:raft预计的logIndex
// ,虽然是预计,但是正确情况下是准确的,op的具体内容对raft来说 是隔离的

if (!isLeader) {
reply->set_err(ErrWrongLeader);
return;
}

// create waitForCh
m_mtx.lock();

if (waitApplyCh.find(raftIndex) == waitApplyCh.end()) {
waitApplyCh.insert(std::make_pair(raftIndex, new LockQueue<Op>()));
}
auto chForRaftIndex = waitApplyCh[raftIndex];

m_mtx.unlock(); //直接解锁,等待任务执行完成,不能一直拿锁等待

// timeout
Op raftCommitOp;

if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
// DPrintf("[GET TIMEOUT!!!]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId,
// args.RequestId, kv.me, op.Key, raftIndex)
// todo 2023年06月01日
int _ = -1;
bool isLeader = false;
m_raftNode->GetState(&_, &isLeader);

if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) {
//如果超时,代表raft集群不保证已经commitIndex该日志,但是如果是已经提交过的get请求,是可以再执行的。
// 不会违反线性一致性
std::string value;
bool exist = false;
ExecuteGetOpOnKVDB(op, &value, &exist);
if (exist) {
reply->set_err(OK);
reply->set_value(value);
} else {
reply->set_err(ErrNoKey);
reply->set_value("");
}
} else {
reply->set_err(ErrWrongLeader); //返回这个,其实就是让clerk换一个节点重试
}
} else {
// raft已经提交了该command(op),可以正式开始执行了
// DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId
// %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key,
// op.Value)
// todo 这里还要再次检验的原因:感觉不用检验,因为leader只要正确的提交了,那么这些肯定是符合的
if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
std::string value;
bool exist = false;
ExecuteGetOpOnKVDB(op, &value, &exist);
if (exist) {
reply->set_err(OK);
reply->set_value(value);
} else {
reply->set_err(ErrNoKey);
reply->set_value("");
}
} else {
reply->set_err(ErrWrongLeader);
// DPrintf("[GET ] 不满足:raftCommitOp.ClientId{%v} == op.ClientId{%v} && raftCommitOp.RequestId{%v}
// == op.RequestId{%v}", raftCommitOp.ClientId, op.ClientId, raftCommitOp.RequestId, op.RequestId)
}
}
m_mtx.lock(); // todo 這個可以先弄一個defer,因爲刪除優先級並不高,先把rpc發回去更加重要
auto tmp = waitApplyCh[raftIndex];
waitApplyCh.erase(raftIndex);
delete tmp;
m_mtx.unlock();
}

void KvServer::GetCommandFromRaft(ApplyMsg message) {
Op op;
op.parseFromString(message.Command);

DPrintf(
"[KvServer::GetCommandFromRaft-kvserver{%d}] , Got Command --> Index:{%d} , ClientId {%s}, RequestId {%d}, "
"Opreation {%s}, Key :{%s}, Value :{%s}",
m_me, message.CommandIndex, &op.ClientId, op.RequestId, &op.Operation, &op.Key, &op.Value);
if (message.CommandIndex <= m_lastSnapShotRaftLogIndex) {
return;
}

// State Machine (KVServer solute the duplicate problem)
// duplicate command will not be exed
if (!ifRequestDuplicate(op.ClientId, op.RequestId)) {
// execute command
if (op.Operation == "Put") {
ExecutePutOpOnKVDB(op);
}
if (op.Operation == "Append") {
ExecuteAppendOpOnKVDB(op);
}
// kv.lastRequestId[op.ClientId] = op.RequestId 在Executexxx函数里面更新的
}
//到这里kvDB已经制作了快照
if (m_maxRaftState != -1) {
IfNeedToSendSnapShotCommand(message.CommandIndex, 9);
//如果raft的log太大(大于指定的比例)就把制作快照
}

// Send message to the chan of op.ClientId
SendMessageToWaitChan(op, message.CommandIndex);
}

bool KvServer::ifRequestDuplicate(std::string ClientId, int RequestId) {
std::lock_guard<std::mutex> lg(m_mtx);
if (m_lastRequestId.find(ClientId) == m_lastRequestId.end()) {
return false;
// todo :不存在这个client就创建
}
return RequestId <= m_lastRequestId[ClientId];
}

// get和put//append執行的具體細節是不一樣的
// PutAppend在收到raft消息之後執行,具體函數裏面只判斷冪等性(是否重複)
// get函數收到raft消息之後在,因爲get無論是否重複都可以再執行
void KvServer::PutAppend(const raftKVRpcProctoc::PutAppendArgs *args, raftKVRpcProctoc::PutAppendReply *reply) {
Op op;
op.Operation = args->op();
op.Key = args->key();
op.Value = args->value();
op.ClientId = args->clientid();
op.RequestId = args->requestid();
int raftIndex = -1;
int _ = -1;
bool isleader = false;

m_raftNode->Start(op, &raftIndex, &_, &isleader);

if (!isleader) {
DPrintf(
"[func -KvServer::PutAppend -kvserver{%d}]From Client %s (Request %d) To Server %d, key %s, raftIndex %d , but "
"not leader",
m_me, &args->clientid(), args->requestid(), m_me, &op.Key, raftIndex);

reply->set_err(ErrWrongLeader);
return;
}
DPrintf(
"[func -KvServer::PutAppend -kvserver{%d}]From Client %s (Request %d) To Server %d, key %s, raftIndex %d , is "
"leader ",
m_me, &args->clientid(), args->requestid(), m_me, &op.Key, raftIndex);
m_mtx.lock();
if (waitApplyCh.find(raftIndex) == waitApplyCh.end()) {
waitApplyCh.insert(std::make_pair(raftIndex, new LockQueue<Op>()));
}
auto chForRaftIndex = waitApplyCh[raftIndex];

m_mtx.unlock(); //直接解锁,等待任务执行完成,不能一直拿锁等待

// timeout
Op raftCommitOp;

if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
DPrintf(
"[func -KvServer::PutAppend -kvserver{%d}]TIMEOUT PUTAPPEND !!!! Server %d , get Command <-- Index:%d , "
"ClientId %s, RequestId %s, Opreation %s Key :%s, Value :%s",
m_me, m_me, raftIndex, &op.ClientId, op.RequestId, &op.Operation, &op.Key, &op.Value);

if (ifRequestDuplicate(op.ClientId, op.RequestId)) {
reply->set_err(OK); // 超时了,但因为是重复的请求,返回ok,实际上就算没有超时,在真正执行的时候也要判断是否重复
} else {
reply->set_err(ErrWrongLeader); ///这里返回这个的目的让clerk重新尝试
}
} else {
DPrintf(
"[func -KvServer::PutAppend -kvserver{%d}]WaitChanGetRaftApplyMessage<--Server %d , get Command <-- Index:%d , "
"ClientId %s, RequestId %d, Opreation %s, Key :%s, Value :%s",
m_me, m_me, raftIndex, &op.ClientId, op.RequestId, &op.Operation, &op.Key, &op.Value);
if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
//可能发生leader的变更导致日志被覆盖,因此必须检查
reply->set_err(OK);
} else {
reply->set_err(ErrWrongLeader);
}
}

m_mtx.lock();

auto tmp = waitApplyCh[raftIndex];
waitApplyCh.erase(raftIndex);
delete tmp;
m_mtx.unlock();
}

void KvServer::ReadRaftApplyCommandLoop() {
while (true) {
//如果只操作applyChan不用拿锁,因为applyChan自己带锁
auto message = applyChan->Pop(); //阻塞弹出
DPrintf(
"---------------tmp-------------[func-KvServer::ReadRaftApplyCommandLoop()-kvserver{%d}] 收到了下raft的消息",
m_me);
// listen to every command applied by its raft ,delivery to relative RPC Handler

if (message.CommandValid) {
GetCommandFromRaft(message);
}
if (message.SnapshotValid) {
GetSnapShotFromRaft(message);
}
}
}

// raft会与persist层交互,kvserver层也会,因为kvserver层开始的时候需要恢复kvdb的状态
// 关于快照raft层与persist的交互:保存kvserver传来的snapshot;生成leaderInstallSnapshot RPC的时候也需要读取snapshot;
// 因此snapshot的具体格式是由kvserver层来定的,raft只负责传递这个东西
// snapShot里面包含kvserver需要维护的persist_lastRequestId 以及kvDB真正保存的数据persist_kvdb
void KvServer::ReadSnapShotToInstall(std::string snapshot) {
if (snapshot.empty()) {
// bootstrap without any state?
return;
}
parseFromString(snapshot);

// r := bytes.NewBuffer(snapshot)
// d := labgob.NewDecoder(r)
//
// var persist_kvdb map[string]string //理应快照
// var persist_lastRequestId map[int64]int //快照这个为了维护线性一致性
//
// if d.Decode(&persist_kvdb) != nil || d.Decode(&persist_lastRequestId) != nil {
// DPrintf("KVSERVER %d read persister got a problem!!!!!!!!!!",kv.me)
// } else {
// kv.kvDB = persist_kvdb
// kv.lastRequestId = persist_lastRequestId
// }
}

bool KvServer::SendMessageToWaitChan(const Op &op, int raftIndex) {
std::lock_guard<std::mutex> lg(m_mtx);
DPrintf(
"[RaftApplyMessageSendToWaitChan--> raftserver{%d}] , Send Command --> Index:{%d} , ClientId {%d}, RequestId "
"{%d}, Opreation {%v}, Key :{%v}, Value :{%v}",
m_me, raftIndex, &op.ClientId, op.RequestId, &op.Operation, &op.Key, &op.Value);

if (waitApplyCh.find(raftIndex) == waitApplyCh.end()) {
return false;
}
waitApplyCh[raftIndex]->Push(op);
DPrintf(
"[RaftApplyMessageSendToWaitChan--> raftserver{%d}] , Send Command --> Index:{%d} , ClientId {%d}, RequestId "
"{%d}, Opreation {%v}, Key :{%v}, Value :{%v}",
m_me, raftIndex, &op.ClientId, op.RequestId, &op.Operation, &op.Key, &op.Value);
return true;
}

void KvServer::IfNeedToSendSnapShotCommand(int raftIndex, int proportion) {
if (m_raftNode->GetRaftStateSize() > m_maxRaftState / 10.0) {
// Send SnapShot Command
auto snapshot = MakeSnapShot();
m_raftNode->Snapshot(raftIndex, snapshot);
}
}

void KvServer::GetSnapShotFromRaft(ApplyMsg message) {
std::lock_guard<std::mutex> lg(m_mtx);

if (m_raftNode->CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot)) {
ReadSnapShotToInstall(message.Snapshot);
m_lastSnapShotRaftLogIndex = message.SnapshotIndex;
}
}

std::string KvServer::MakeSnapShot() {
std::lock_guard<std::mutex> lg(m_mtx);
std::string snapshotData = getSnapshotData();
return snapshotData;
}

void KvServer::PutAppend(google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::PutAppendArgs *request,
::raftKVRpcProctoc::PutAppendReply *response, ::google::protobuf::Closure *done) {
KvServer::PutAppend(request, response);
done->Run();
}

void KvServer::Get(google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::GetArgs *request,
::raftKVRpcProctoc::GetReply *response, ::google::protobuf::Closure *done) {
KvServer::Get(request, response);
done->Run();
}

KvServer::KvServer(int me, int maxraftstate, std::string nodeInforFileName, short port) : m_skipList(6) {
std::shared_ptr<Persister> persister = std::make_shared<Persister>(me);

m_me = me;
m_maxRaftState = maxraftstate;

applyChan = std::make_shared<LockQueue<ApplyMsg> >();

m_raftNode = std::make_shared<Raft>();
////////////////clerk层面 kvserver开启rpc接受功能
// 同时raft与raft节点之间也要开启rpc功能,因此有两个注册
std::thread t([this, port]() -> void {
// provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上
RpcProvider provider;
provider.NotifyService(this);
provider.NotifyService(
this->m_raftNode.get()); // todo:这里获取了原始指针,后面检查一下有没有泄露的问题 或者 shareptr释放的问题
// 启动一个rpc服务发布节点 Run以后,进程进入阻塞状态,等待远程的rpc调用请求
provider.Run(m_me, port);
});
t.detach();

////开启rpc远程调用能力,需要注意必须要保证所有节点都开启rpc接受功能之后才能开启rpc远程调用能力
////这里使用睡眠来保证
std::cout << "raftServer node:" << m_me << " start to sleep to wait all ohter raftnode start!!!!" << std::endl;
sleep(6);
std::cout << "raftServer node:" << m_me << " wake up!!!! start to connect other raftnode" << std::endl;
//获取所有raft节点ip、port ,并进行连接 ,要排除自己
MprpcConfig config;
config.LoadConfigFile(nodeInforFileName.c_str());
std::vector<std::pair<std::string, short> > ipPortVt;
for (int i = 0; i < INT_MAX - 1; ++i) {
std::string node = "node" + std::to_string(i);

std::string nodeIp = config.Load(node + "ip");
std::string nodePortStr = config.Load(node + "port");
if (nodeIp.empty()) {
break;
}
ipPortVt.emplace_back(nodeIp, atoi(nodePortStr.c_str())); //沒有atos方法,可以考慮自己实现
}
std::vector<std::shared_ptr<RaftRpcUtil> > servers;
//进行连接
for (int i = 0; i < ipPortVt.size(); ++i) {
if (i == m_me) {
servers.push_back(nullptr);
continue;
}
std::string otherNodeIp = ipPortVt[i].first;
short otherNodePort = ipPortVt[i].second;
auto *rpc = new RaftRpcUtil(otherNodeIp, otherNodePort);
servers.push_back(std::shared_ptr<RaftRpcUtil>(rpc));

std::cout << "node" << m_me << " 连接node" << i << "success!" << std::endl;
}
sleep(ipPortVt.size() - me); //等待所有节点相互连接成功,再启动raft
m_raftNode->init(servers, m_me, persister, applyChan);
// kv的server直接与raft通信,但kv不直接与raft通信,所以需要把ApplyMsg的chan传递下去用于通信,两者的persist也是共用的

//////////////////////////////////

// You may need initialization code here.
// m_kvDB; //kvdb初始化
m_skipList;
waitApplyCh;
m_lastRequestId;
m_lastSnapShotRaftLogIndex = 0; // todo:感覺這個函數沒什麼用,不如直接調用raft節點中的snapshot值???
auto snapshot = persister->ReadSnapshot();
if (!snapshot.empty()) {
ReadSnapShotToInstall(snapshot);
}
std::thread t2(&KvServer::ReadRaftApplyCommandLoop, this); //马上向其他节点宣告自己就是leader
t2.join(); //由於ReadRaftApplyCommandLoop一直不會結束,达到一直卡在这的目的
}