phxpaxos source code analysis: Proposer and Acceptor
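Paxos is a very important protocol in distributed system design. This article is reposted from a series of Paxos articles published by the WeChat backend team on their official account. I have lightly modified or annotated it where my own understanding called for it. I keep a copy here for two reasons: to deepen my understanding of Paxos, and to make it easy to look up later in case the original is lost.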
1. Issuing and accepting a proposal
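In this article we analyze the phxecho example program that ships with phxpaxos to better understand how Paxos decides a value. Before looking at the program itself, let us briefly restate the three roles in the Paxos algorithm: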
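- Proposer: the proposal initiator. It issues proposals. A proposal consists of a number and a value.
- Acceptor: the proposal receiver. It receives the proposals issued by Proposers and, following certain rules, first settles the proposal number and finally the proposal value.
- Learner: the learner of the chosen proposal. The Learner does not take part in deciding the proposal. It only learns the value that has already been chosen.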
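In short, the Proposer and Acceptor on each node jointly decide a proposal. If a node misses the decision for some reason, such as a network failure, its Learner learns the chosen value afterwards.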
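The Paxos algorithm has two phases: Prepare (settling on a proposal) and Accept (fixing the proposal's value). Each phase may have to run several times, and each run costs one network RTT. To reach production quality, PhxPaxos takes two steps:
- It elects a master and runs Paxos on that master, which avoids proposal conflicts.
- It then performs a noop operation for all proposals and, under certain conditions, skips the Prepare phase. This cuts the network cost to the theoretical minimum of one RTT.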
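Note: the phxecho example does not run master election, but it still works in Multi-Paxos mode. Multi-Paxos does not depend on a Master. Having a Master merely avoids proposal conflicts, which lets the system reach its highest efficiency.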
This chapter covers the Proposer and Acceptor roles. The Learner is covered in a later chapter.
2. Proposer
We start with PhxEchoServer::Echo(), which issues a proposal by calling the following function:
int PNode :: Propose(const int iGroupIdx, const std::string & sValue, uint64_t & llInstanceID, SMCtx * poSMCtx)
{
if (!CheckGroupID(iGroupIdx))
{
return Paxos_GroupIdxWrong;
}
return m_vecGroupList[iGroupIdx]->GetCommitter()->NewValueGetID(sValue, llInstanceID, poSMCtx);
}
As shown above, the proposal is issued through the Committer of the corresponding Group. Committer::NewValueGetID() itself is simple, so we skip it and look at the next-level function it calls:
int Committer :: NewValueGetIDNoRetry(const std::string & sValue, uint64_t & llInstanceID, SMCtx * poSMCtx)
{
LogStatus();
int iLockUseTimeMs = 0;
bool bHasLock = m_oWaitLock.Lock(m_iTimeoutMs, iLockUseTimeMs);
if (!bHasLock)
{
if (iLockUseTimeMs > 0)
{
BP->GetCommiterBP()->NewValueGetLockTimeout();
PLGErr("Try get lock, but timeout, lockusetime %dms", iLockUseTimeMs);
return PaxosTryCommitRet_Timeout;
}
else
{
BP->GetCommiterBP()->NewValueGetLockReject();
PLGErr("Try get lock, but too many thread waiting, reject");
return PaxosTryCommitRet_TooManyThreadWaiting_Reject;
}
}
int iLeftTimeoutMs = -1;
if (m_iTimeoutMs > 0)
{
iLeftTimeoutMs = m_iTimeoutMs > iLockUseTimeMs ? m_iTimeoutMs - iLockUseTimeMs : 0;
if (iLeftTimeoutMs < 200)
{
PLGErr("Get lock ok, but lockusetime %dms too long, lefttimeout %dms", iLockUseTimeMs, iLeftTimeoutMs);
BP->GetCommiterBP()->NewValueGetLockTimeout();
m_oWaitLock.UnLock();
return PaxosTryCommitRet_Timeout;
}
}
PLGImp("GetLock ok, use time %dms", iLockUseTimeMs);
BP->GetCommiterBP()->NewValueGetLockOK(iLockUseTimeMs);
//pack smid to value
int iSMID = poSMCtx != nullptr ? poSMCtx->m_iSMID : 0;
string sPackSMIDValue = sValue;
m_poSMFac->PackPaxosValue(sPackSMIDValue, iSMID);
m_poCommitCtx->NewCommit(&sPackSMIDValue, poSMCtx, iLeftTimeoutMs);
m_poIOLoop->AddNotify();
int ret = m_poCommitCtx->GetResult(llInstanceID);
m_oWaitLock.UnLock();
return ret;
}
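Several threads may call the same Committer at the same time to issue proposals. The calls are therefore serialized first, by acquiring m_oWaitLock.

The code assumes that an RTT takes at least 200ms. When a timeout is configured and the remaining time iLeftTimeoutMs is below that value, the commit is abandoned with PaxosTryCommitRet_Timeout.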
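The time-budget check after the lock is acquired can be isolated as a pure function. This is a minimal sketch. The names ComputeLeftTimeout and kMinLeftTimeoutMs are hypothetical and are not part of phxpaxos. The 200 ms threshold and the -1 "no timeout" convention come from the code above.

```cpp
#include <cassert>

// Hypothetical constant mirroring the hard-coded 200 ms threshold above.
constexpr int kMinLeftTimeoutMs = 200;

// Sketch of the budget check in Committer::NewValueGetIDNoRetry().
// Returns false when the commit should be abandoned (PaxosTryCommitRet_Timeout);
// leftMs receives the remaining budget, or -1 when no timeout is configured.
bool ComputeLeftTimeout(int timeoutMs, int lockUseTimeMs, int& leftMs) {
    leftMs = -1;
    if (timeoutMs <= 0) {
        return true;  // no timeout configured: wait as long as needed
    }
    leftMs = timeoutMs > lockUseTimeMs ? timeoutMs - lockUseTimeMs : 0;
    return leftMs >= kMinLeftTimeoutMs;
}
```

For example, a 1000 ms timeout with 850 ms spent waiting for the lock leaves only 150 ms, so the commit is abandoned.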
It then performs the following steps:
1) Pack the StateMachine ID into the message
PackPaxosValue() packs the StateMachine ID into the message to be committed. The state machine ID passed here is 1.
void SMFac :: PackPaxosValue(std::string & sPaxosValue, const int iSMID)
{
char sSMID[sizeof(int)] = {0};
if (iSMID != 0)
{
memcpy(sSMID, &iSMID, sizeof(sSMID));
}
sPaxosValue = string(sSMID, sizeof(sSMID)) + sPaxosValue;
}
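The following sketch pairs the prefixing scheme with an inverse. PackValue reproduces the logic of SMFac::PackPaxosValue() shown above. UnpackValue is a hypothetical inverse written for illustration and is not copied from phxpaxos.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Same scheme as SMFac::PackPaxosValue(): the state machine ID is copied in
// host byte order into a sizeof(int)-byte prefix (all zeros when smid == 0).
void PackValue(std::string& value, int smid) {
    char prefix[sizeof(int)] = {0};
    if (smid != 0) {
        std::memcpy(prefix, &smid, sizeof(prefix));
    }
    value = std::string(prefix, sizeof(prefix)) + value;
}

// Hypothetical inverse: strips the prefix and returns the state machine ID.
// Returns false if the buffer is too short to hold the prefix.
bool UnpackValue(const std::string& packed, int& smid, std::string& body) {
    if (packed.size() < sizeof(int)) {
        return false;
    }
    std::memcpy(&smid, packed.data(), sizeof(int));
    body = packed.substr(sizeof(int));
    return true;
}
```

Because memcpy copies the host representation of the int, the packed prefix is only interpreted correctly by nodes that share the same byte order.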
2) Initialize the Committer's context
CommitCtx::NewCommit() re-initializes the CommitCtx associated with this Committer to prepare for the commit:
void CommitCtx :: NewCommit(std::string * psValue, SMCtx * poSMCtx, const int iTimeoutMs)
{
m_oSerialLock.Lock();
m_llInstanceID = (uint64_t)-1;
m_iCommitRet = -1;
m_bIsCommitEnd = false;
m_iTimeoutMs = iTimeoutMs;
m_psValue = psValue;
m_poSMCtx = poSMCtx;
if (psValue != nullptr)
{
PLGHead("OK, valuesize %zu", psValue->size());
}
m_oSerialLock.UnLock();
}
3) Notify the IOLoop of the new commit
IOLoop::AddNotify() wakes up the IOLoop, which may be blocked, so that it performs our commit:
void IOLoop :: AddNotify()
{
m_oMessageQueue.lock();
m_oMessageQueue.add(nullptr);
m_oMessageQueue.unlock();
}
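Note that the message to be sent is not actually placed on the queue here. This step is only a notification. In fact, m_oMessageQueue is a receive queue, not a send queue, and the nullptr posted here serves only to wake the loop up.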
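The "nullptr as a wake-up" idea can be sketched with standard C++ primitives. WakeableQueue is a hypothetical class. phxpaxos uses its own queue type, and this sketch only models the idea that a null entry carries no message.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical stand-in for IOLoop's receive queue: a nullptr entry carries
// no message and exists only to wake the consuming loop thread.
class WakeableQueue {
public:
    void Add(std::string* msg) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            q_.push_back(msg);
        }
        cv_.notify_one();
    }

    // Same idea as IOLoop::AddNotify(): enqueue an empty "wake up" entry.
    void AddNotify() { Add(nullptr); }

    // Waits up to timeoutMs for an entry; returns false on timeout.
    bool Take(std::string*& out, int timeoutMs) {
        std::unique_lock<std::mutex> lock(mu_);
        if (!cv_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
                          [this] { return !q_.empty(); })) {
            return false;
        }
        out = q_.front();
        q_.pop_front();
        return true;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::string*> q_;
};
```

In a loop like OneLoop(), the two kinds of entry would be handled differently:
- A non-null entry would be dispatched as a received message.
- A nullptr entry would simply fall through to the new-commit check.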
As an aside, once the IOLoop wakes up, the new proposal is actually handled inside the IOLoop::OneLoop() loop, which calls m_poInstance->CheckNewValue().
4) Wait for the commit result
CommitCtx::GetResult() waits repeatedly for the commit result:
int CommitCtx :: GetResult(uint64_t & llSuccInstanceID)
{
m_oSerialLock.Lock();
while (!m_bIsCommitEnd)
{
m_oSerialLock.WaitTime(1000);
}
if (m_iCommitRet == 0)
{
llSuccInstanceID = m_llInstanceID;
PLGImp("commit success, instanceid %lu", llSuccInstanceID);
}
else
{
PLGErr("commit fail, ret %d", m_iCommitRet);
}
m_oSerialLock.UnLock();
return m_iCommitRet;
}
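GetResult() wakes at least once per second to re-check m_bIsCommitEnd. It keeps waiting until the commit times out or a result has been produced from the replies of the other Acceptors.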
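The handshake between the waiting caller and the IOLoop thread can be sketched with std::condition_variable. MiniCommitCtx is hypothetical. It keeps only the fields needed to show the wait/complete pattern, and SetResult() stands in for the code that finishes the commit on the IOLoop side.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Simplified, hypothetical CommitCtx: the proposing thread blocks in
// GetResult() until another thread (the IOLoop in phxpaxos) calls SetResult().
class MiniCommitCtx {
public:
    void NewCommit() {
        std::lock_guard<std::mutex> lock(mu_);
        instanceId_ = UINT64_MAX;  // (uint64_t)-1, as in CommitCtx::NewCommit()
        ret_ = -1;
        done_ = false;
    }

    void SetResult(int ret, uint64_t instanceId) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            ret_ = ret;
            instanceId_ = instanceId;
            done_ = true;
        }
        cv_.notify_all();
    }

    int GetResult(uint64_t& succInstanceId) {
        std::unique_lock<std::mutex> lock(mu_);
        // Like the WaitTime(1000) loop: wake at least once per second and
        // re-check the flag, which also tolerates spurious wake-ups.
        while (!done_) {
            cv_.wait_for(lock, std::chrono::milliseconds(1000));
        }
        if (ret_ == 0) {
            succInstanceId = instanceId_;
        }
        return ret_;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    uint64_t instanceId_ = UINT64_MAX;
    int ret_ = -1;
    bool done_ = false;
};
```

In real use, SetResult() would run on the IOLoop thread while GetResult() blocks the caller. If SetResult() runs first, GetResult() returns immediately.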
2.1 Checking for a new commit
IOLoop::OneLoop() calls this function to check whether there is a new commit:
void Instance :: CheckNewValue()
{
if (!m_oCommitCtx.IsNewCommit())
{
return;
}
if (!m_oLearner.IsIMLatest())
{
return;
}
if (m_poConfig->IsIMFollower())
{
PLGErr("I'm follower, skip this new value");
m_oCommitCtx.SetResultOnlyRet(PaxosTryCommitRet_Follower_Cannot_Commit);
return;
}
if (!m_poConfig->CheckConfig())
{
PLGErr("I'm not in membership, skip this new value");
m_oCommitCtx.SetResultOnlyRet(PaxosTryCommitRet_Im_Not_In_Membership);
return;
}
if ((int)m_oCommitCtx.GetCommitValue().size() > MAX_VALUE_SIZE)
{
PLGErr("value size %zu to large, skip this new value",
m_oCommitCtx.GetCommitValue().size());
m_oCommitCtx.SetResultOnlyRet(PaxosTryCommitRet_Value_Size_TooLarge);
return;
}
m_oCommitCtx.StartCommit(m_oProposer.GetInstanceID());
if (m_oCommitCtx.GetTimeoutMs() != -1)
{
m_oIOLoop.AddTimer(m_oCommitCtx.GetTimeoutMs(), Timer_Instance_Commit_Timeout, m_iCommitTimerID);
}
m_oTimeStat.Point();
if (m_poConfig->GetIsUseMembership()
&& (m_oProposer.GetInstanceID() == 0 || m_poConfig->GetGid() == 0))
{
//Init system variables.
PLGHead("Need to init system variables, Now.InstanceID %lu Now.Gid %lu",
m_oProposer.GetInstanceID(), m_poConfig->GetGid());
uint64_t llGid = OtherUtils::GenGid(m_poConfig->GetMyNodeID());
string sInitSVOpValue;
int ret = m_poConfig->GetSystemVSM()->CreateGid_OPValue(llGid, sInitSVOpValue);
assert(ret == 0);
m_oSMFac.PackPaxosValue(sInitSVOpValue, m_poConfig->GetSystemVSM()->SMID());
m_oProposer.NewValue(sInitSVOpValue);
}
else
{
if (m_oOptions.bOpenChangeValueBeforePropose) {
m_oSMFac.BeforePropose(m_poConfig->GetMyGroupIdx(), m_oCommitCtx.GetCommitValue());
}
m_oProposer.NewValue(m_oCommitCtx.GetCommitValue());
}
}
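Here is a brief walk through what this function does:
1) Ask CommitCtx whether there is a new commit.
2) If the Learner of the current Instance finds older proposals it has not learned yet, the commit is blocked here. A forced commit would be rejected by the other Acceptors because its proposal ID is too low. We would then have to keep raising the proposal ID, which wastes both time and bandwidth. So as long as we know the Learner still has old proposals to learn, we refuse to commit.
3) If this Instance is only a Follower, it does not take part in commits.
4) If the current node is not a member of the cluster, the commit is refused. A value larger than MAX_VALUE_SIZE is also refused, with PaxosTryCommitRet_Value_Size_TooLarge.
Note: in our example we run ./phxecho 127.0.0.1:11111 127.0.0.1:11111,127.0.0.1:11112,127.0.0.1:11113, so the current node is already a member of the cluster. Config::Init() adds the three nodes listed at the end to the SystemVSM.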
5) Start the commit
m_oCommitCtx.StartCommit(m_oProposer.GetInstanceID());
How m_oProposer is initialized and how it generates new proposal IDs will be covered in a later section.
6) Decide from the timeout set at commit time whether to start a timer. The timeout is currently set to -1, so no commit-timeout timer is needed.
if (m_oCommitCtx.GetTimeoutMs() != -1)
{
m_oIOLoop.AddTimer(m_oCommitCtx.GetTimeoutMs(), Timer_Instance_Commit_Timeout, m_iCommitTimerID);
}
7) The proposer issues the new commit
By default, membership is not used and m_oOptions.bOpenChangeValueBeforePropose is false, so we go straight to the proposer's commit:
m_oProposer.NewValue(m_oCommitCtx.GetCommitValue());
2.2 Proposer::NewValue()
Continuing from the above, let us look at Proposer::NewValue():
int Proposer :: NewValue(const std::string & sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
BP->GetProposerBP()->NewProposalSkipPrepare();
PLGHead("skip prepare, directly start accept");
Accept();
}
else
{
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}
1) Store the value to be committed in m_oProposerState, but only if it is still empty. At the start of a new Paxos round, m_oProposerState.GetValue() is normally empty. The value set here is laid out as follows:
| StateMachineID | message         |
| 4 bytes        | variable length |
2) Set the prepare timeout and the accept timeout
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS; //2s
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS; //1s
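3) Run the Prepare step.
The Prepare phase can be skipped only when both of these hold:
- This node has previously completed a Prepare phase that passed and went straight on to Accept (m_bCanSkipPrepare is true).
- No node has rejected it since (m_bWasRejectBySomeone is false).
Otherwise the full two-phase Paxos flow is required. Although we are using Multi-Paxos, the first commit normally still has to go through Prepare, so the next section looks at Prepare() in detail.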
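The branch at the end of Proposer::NewValue() reduces to a small decision table. The enum FirstPhase and the function ChooseFirstPhase are hypothetical names. The logic follows the two flags used in the code above, and the argument passed to Prepare() is m_bWasRejectBySomeone.

```cpp
#include <cassert>

// Hypothetical labels for what NewValue() does first.
enum class FirstPhase { kPrepareSameBallot, kPrepareNewBallot, kAccept };

// Mirrors Proposer::NewValue(): Accept is sent directly only if an earlier
// Prepare passed and nobody has rejected us since. Otherwise Prepare runs,
// and it picks a new proposal ID only after a rejection.
FirstPhase ChooseFirstPhase(bool canSkipPrepare, bool wasRejectBySomeone) {
    if (canSkipPrepare && !wasRejectBySomeone) {
        return FirstPhase::kAccept;
    }
    return wasRejectBySomeone ? FirstPhase::kPrepareNewBallot
                              : FirstPhase::kPrepareSameBallot;
}
```

This is how Multi-Paxos saves a round trip in the steady state. After one successful Prepare, later values on this proposer go straight to Accept until a rejection forces a new Prepare.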
2.3 The Prepare phase
This function formally enters the Prepare phase described in Paxos theory. Let us look at it:
void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
m_oProposerState.ResetHighestOtherPreAcceptBallot();
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
m_oMsgCounter.StartNewRound();
AddPrepareTimer();
PLGHead("END OK");
BroadcastMessage(oPaxosMsg);
}
Issuing a Prepare involves the following steps:
1) Reset the Proposer state to mark the start of the Prepare phase
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
2) Use a new proposal number only when needed
m_oProposerState.ResetHighestOtherPreAcceptBallot();
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
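"Only when needed" means the following:
- If another node has explicitly rejected the proposal, Paxos requires re-issuing it with a new proposal number.
- If no node has rejected it, the proposal is being re-issued because of a timeout or something similar, and the old number can be reused.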
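The following sketch shows how a rejection can feed into the next proposal ID. The bookkeeping in MiniProposerState is an assumption paraphrasing ProposerState, not a copy of it:
- A reject's RejectByPromiseID is remembered as the highest ID seen from others.
- NewPrepare() jumps past both that ID and our own.
- The initial ID of 1 is also assumed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Assumed ProposerState bookkeeping (paraphrased, hypothetical names).
struct MiniProposerState {
    uint64_t proposalId = 1;              // assumed starting value
    uint64_t highestOtherProposalId = 0;  // highest ID reported in rejects

    // Called with RejectByPromiseID from a reject reply.
    void SetOtherProposalID(uint64_t other) {
        highestOtherProposalId = std::max(highestOtherProposalId, other);
    }

    // New ballot: strictly larger than anything we or the rejecters used.
    void NewPrepare() {
        proposalId = std::max(proposalId, highestOtherProposalId) + 1;
    }
};

// Prepare(bNeedNewBallot): keep the ID after a plain timeout, bump it after a reject.
void PreparePhaseId(MiniProposerState& s, bool needNewBallot) {
    if (needNewBallot) {
        s.NewPrepare();
    }
}
```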
3) Construct the PaxosMsg
The PaxosMsg is constructed as follows:
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
The message format, as defined in paxos_msg.proto, is:
message PaxosMsg
{
required int32 MsgType = 1;
optional uint64 InstanceID = 2;
optional uint64 NodeID = 3;
optional uint64 ProposalID = 4;
optional uint64 ProposalNodeID = 5;
optional bytes Value = 6;
optional uint64 PreAcceptID = 7;
optional uint64 PreAcceptNodeID = 8;
optional uint64 RejectByPromiseID = 9;
optional uint64 NowInstanceID = 10;
optional uint64 MinChosenInstanceID = 11;
optional uint32 LastChecksum = 12;
optional uint32 Flag = 13;
optional bytes SystemVariables = 14;
optional bytes MasterVariables = 15;
};
4) Clear the message counter to start recording a new round
m_oMsgCounter.StartNewRound();
5) Set the Prepare timeout timer. When the timer fires, the following handler runs:
void Proposer :: OnPrepareTimeout()
{
PLGHead("OK");
if (GetInstanceID() != m_llTimeoutInstanceID)
{
PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
m_llTimeoutInstanceID, GetInstanceID());
return;
}
BP->GetProposerBP()->PrepareTimeout();
Prepare(m_bWasRejectBySomeone);
}
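A Prepare can time out for many reasons, such as packet loss. The handling is simple: run Prepare() again.

Note: m_llTimeoutInstanceID records the instance ID for which the Prepare timer was armed. Several timers may be pending at the same time, so a timeout whose instance ID no longer matches the current one is skipped.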
6) Send the Prepare message
int Base :: BroadcastMessage(const PaxosMsg & oPaxosMsg, const int iRunType, const int iSendType)
{
if (m_bIsTestMode)
{
return 0;
}
BP->GetInstanceBP()->BroadcastMessage();
if (iRunType == BroadcastMessage_Type_RunSelf_First)
{
if (m_poInstance->OnReceivePaxosMsg(oPaxosMsg) != 0)
{
return -1;
}
}
string sBuffer;
int ret = PackMsg(oPaxosMsg, sBuffer);
if (ret != 0)
{
return ret;
}
ret = m_poMsgTransport->BroadcastMessage(m_poConfig->GetMyGroupIdx(), sBuffer, iSendType);
if (iRunType == BroadcastMessage_Type_RunSelf_Final)
{
m_poInstance->OnReceivePaxosMsg(oPaxosMsg);
}
return ret;
}
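Before the message actually goes out, Base::BroadcastMessage() checks once more whether it should proceed. With BroadcastMessage_Type_RunSelf_First, the local Instance first handles the message through OnReceivePaxosMsg(), and the broadcast is abandoned if that fails. The cluster state may have changed during two time windows in particular:
- the time spent preparing the Prepare request;
- the Prepare timer's timeout window, because a second call to Prepare() also ends up in BroadcastMessage().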
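The ordering logic in Base::BroadcastMessage() can be sketched with callbacks standing in for the local Instance and the transport. The RunType enum and the Broadcast function are hypothetical names. The control flow matches the code above: a local failure aborts a "run self first" broadcast, and the "run self final" local result is ignored. Accept() above uses BroadcastMessage_Type_RunSelf_Final.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical counterparts of the BroadcastMessage_Type_* constants.
enum RunType { kRunSelfNone, kRunSelfFirst, kRunSelfFinal };

// Local delivery happens before (First) or after (Final) the send to peers.
int Broadcast(const std::string& msg, RunType runType,
              const std::function<int(const std::string&)>& handleLocally,
              const std::function<void(const std::string&)>& sendToPeers) {
    if (runType == kRunSelfFirst && handleLocally(msg) != 0) {
        return -1;  // local handling failed: do not broadcast
    }
    sendToPeers(msg);
    if (runType == kRunSelfFinal) {
        handleLocally(msg);  // return value ignored, as in the original
    }
    return 0;
}
```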
Next, let us look at how the Paxos message is packed:
int Base :: PackMsg(const PaxosMsg & oPaxosMsg, std::string & sBuffer)
{
std::string sBodyBuffer;
bool bSucc = oPaxosMsg.SerializeToString(&sBodyBuffer);
if (!bSucc)
{
PLGErr("PaxosMsg.SerializeToString fail, skip this msg");
return -1;
}
int iCmd = MsgCmd_PaxosMsg;
PackBaseMsg(sBodyBuffer, iCmd, sBuffer);
return 0;
}
The message is first serialized with protobuf and then wrapped further by PackBaseMsg. The packed message looks roughly like this (sizes as given in the original diagram; the PaxosMsg part is protobuf-encoded, so its field widths are nominal):
| Part                   | Field      | Size    |
|------------------------|------------|---------|
| GroupID                | group_id   | 4 bytes |
| Header: header_len     | len        | 2 bytes |
| Header: header_content | gid        | 8 bytes |
| Header: header_content | rid        | 8 bytes |
| Header: header_content | cmd        | 4 bytes |
| Header: header_content | version    | 4 bytes |
| PaxosMsg               | MsgType    | 4 bytes |
| PaxosMsg               | InstanceID | 8 bytes |
| PaxosMsg               | NodeID     | 8 bytes |
| PaxosMsg               | ProposalID | 8 bytes |
| checksum               | crc32_sum  | 4 bytes |
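The checksum at the end of the frame lets the receiver detect corruption. This is a simplified sketch under stated assumptions:
- The frame is only [group_id][body][crc32].
- The checksum covers everything before it.
- The length-prefixed header is omitted.

PackFrame and UnpackFrame are hypothetical helpers, not phxpaxos functions. Crc32 is a standard bitwise implementation of reflected CRC-32 with polynomial 0xEDB88320.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Standard reflected CRC-32 (polynomial 0xEDB88320), bitwise for brevity.
uint32_t Crc32(const std::string& data) {
    uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int i = 0; i < 8; ++i) {
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return ~crc;
}

// Simplified frame: [group_id 4B][body][crc32 4B], checksum over group_id+body.
std::string PackFrame(int32_t groupId, const std::string& body) {
    std::string frame(reinterpret_cast<const char*>(&groupId), sizeof(groupId));
    frame += body;
    uint32_t crc = Crc32(frame);
    frame.append(reinterpret_cast<const char*>(&crc), sizeof(crc));
    return frame;
}

// Returns false if the frame is truncated or its checksum does not match.
bool UnpackFrame(const std::string& frame, int32_t& groupId, std::string& body) {
    if (frame.size() < sizeof(int32_t) + sizeof(uint32_t)) {
        return false;
    }
    const std::size_t payloadLen = frame.size() - sizeof(uint32_t);
    uint32_t crc = 0;
    std::memcpy(&crc, frame.data() + payloadLen, sizeof(crc));
    if (crc != Crc32(frame.substr(0, payloadLen))) {
        return false;  // corrupted in transit
    }
    std::memcpy(&groupId, frame.data(), sizeof(groupId));
    body = frame.substr(sizeof(int32_t), payloadLen - sizeof(int32_t));
    return true;
}
```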
Once packed, the message is sent to the relevant nodes by the following function:
int Communicate :: BroadcastMessage(const int iGroupIdx, const std::string & sMessage, const int iSendType)
{
const std::set<nodeid_t> & setNodeInfo = m_poConfig->GetSystemVSM()->GetMembershipMap();
for (auto & it : setNodeInfo)
{
if (it != m_iMyNodeID)
{
Send(iGroupIdx, it, NodeInfo(it), sMessage, iSendType);
}
}
return 0;
}
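Note that Communicate::BroadcastMessage() does not send the message to the node itself. The local copy is delivered through the OnReceivePaxosMsg() call in Base::BroadcastMessage() shown earlier. That call happens before the broadcast for BroadcastMessage_Type_RunSelf_First and after it for BroadcastMessage_Type_RunSelf_Final.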
2.4 Handling MsgType_PaxosPrepareReply
When a network message arrives, Instance::OnReceive() is called first. For a Paxos message, it then calls Instance::OnReceivePaxosMsg(). When the message is a MsgType_PaxosPrepareReply, the call ends up in Instance::ReceiveMsgForProposer():
int Instance :: ReceiveMsgForProposer(const PaxosMsg & oPaxosMsg)
{
if (m_poConfig->IsIMFollower())
{
PLGErr("I'm follower, skip this message");
return 0;
}
///////////////////////////////////////////////////////////////
if (oPaxosMsg.instanceid() != m_oProposer.GetInstanceID())
{
if (oPaxosMsg.instanceid() + 1 == m_oProposer.GetInstanceID())
{
//Exipred reply msg on last instance.
//If the response of a node is always slower than the majority node,
//then the message of the node is always ignored even if it is a reject reply.
//In this case, if we do not deal with these reject reply, the node that
//gave reject reply will always give reject reply.
//This causes the node to remain in catch-up state.
//
//To avoid this problem, we need to deal with the expired reply.
if (oPaxosMsg.msgtype() == MsgType_PaxosPrepareReply)
{
m_oProposer.OnExpiredPrepareReply(oPaxosMsg);
}
else if (oPaxosMsg.msgtype() == MsgType_PaxosAcceptReply)
{
m_oProposer.OnExpiredAcceptReply(oPaxosMsg);
}
}
BP->GetInstanceBP()->OnReceivePaxosProposerMsgInotsame();
//PLGErr("InstanceID not same, skip msg");
return 0;
}
if (oPaxosMsg.msgtype() == MsgType_PaxosPrepareReply)
{
m_oProposer.OnPrepareReply(oPaxosMsg);
}
else if (oPaxosMsg.msgtype() == MsgType_PaxosAcceptReply)
{
m_oProposer.OnAcceptReply(oPaxosMsg);
}
return 0;
}
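Here we first deal with expired replies that belong to the previous paxos instance, whose instance ID is one lower than the Proposer's current one. The comment in the code explains why. If a node always answers more slowly than the majority, its replies would otherwise always be ignored, including its reject replies. That node would then keep rejecting and stay stuck in the catch-up state. Handling the expired replies avoids this.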
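The instance-ID filter at the top of Instance::ReceiveMsgForProposer() can be written as a three-way classification. ReplyAction and ClassifyReply are hypothetical names. The comparisons are the same ones the code above performs.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical labels for the three outcomes of the instance-ID check.
enum class ReplyAction { kHandle, kHandleExpired, kDrop };

// Replies for the current instance are processed normally. Replies for the
// immediately preceding instance go to the OnExpired*Reply() handlers.
// Anything else is dropped.
ReplyAction ClassifyReply(uint64_t msgInstanceId, uint64_t proposerInstanceId) {
    if (msgInstanceId == proposerInstanceId) {
        return ReplyAction::kHandle;
    }
    if (msgInstanceId + 1 == proposerInstanceId) {
        return ReplyAction::kHandleExpired;
    }
    return ReplyAction::kDrop;
}
```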
Next, Proposer::OnPrepareReply() processes the prepare reply:
void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnPrepareReply();
if (!m_bIsPreparing)
{
BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
//PLGErr("Not preparing, skip this msg");
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
//PLGErr("ProposalID not same, skip this msg");
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
else
{
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
m_bCanSkipPrepare = true;
Accept();
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
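This function processes a reply as follows:
1) The Proposer first checks that it is still in the preparing phase.
2) It checks that the proposalID matches.
3) MsgCounter records which nodes have replied and what each reply was (promise or reject). On a promise, the ballot and value previously accepted by that acceptor are passed to AddPreAcceptValue(). On a reject, RejectByPromiseID is remembered through SetOtherProposalID().
4) If this prepare round passes, Accept() is called. If the round is rejected by a majority, or all replies arrive without a pass, a short random timer (10 to 39 ms) is started for the next round.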
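The round bookkeeping and the Paxos value rule can be sketched together. The details below are assumptions, not the library's code:
- MiniMsgCounter is hypothetical, and it assumes a majority of nodeCount / 2 + 1.
- PreAccept shows the standard Paxos rule behind AddPreAcceptValue(): among the promises, the proposer adopts the value carried with the highest (proposalID, nodeID) pre-accepted ballot, ignoring acceptors that have accepted nothing. That phxpaxos overwrites the proposer's value exactly this way is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <tuple>

// Hypothetical counter answering the same three questions as MsgCounter.
class MiniMsgCounter {
public:
    explicit MiniMsgCounter(int nodeCount) : nodeCount_(nodeCount) {}
    void StartNewRound() { received_.clear(); promised_.clear(); rejected_.clear(); }
    void AddPromiseOrAccept(uint64_t node) { received_.insert(node); promised_.insert(node); }
    void AddReject(uint64_t node) { received_.insert(node); rejected_.insert(node); }
    bool IsPassedOnThisRound() const { return static_cast<int>(promised_.size()) >= Majority(); }
    bool IsRejectedOnThisRound() const { return static_cast<int>(rejected_.size()) >= Majority(); }
    bool IsAllReceiveOnThisRound() const { return static_cast<int>(received_.size()) == nodeCount_; }

private:
    int Majority() const { return nodeCount_ / 2 + 1; }  // assumed definition
    int nodeCount_;
    std::set<uint64_t> received_, promised_, rejected_;
};

// Keeps the value of the highest pre-accepted ballot reported in promises.
struct PreAccept {
    uint64_t proposalId = 0, nodeId = 0;
    std::string value;
    void Add(uint64_t pid, uint64_t nid, const std::string& v) {
        if (pid == 0) {
            return;  // this acceptor has not accepted anything yet
        }
        if (std::tie(pid, nid) > std::tie(proposalId, nodeId)) {
            proposalId = pid;
            nodeId = nid;
            value = v;
        }
    }
};
```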
2.5 Issuing the Accept request
Once the Prepare phase succeeds, the proposer issues the Accept request:
void Proposer :: Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
ExitPrepare();
m_bIsAccepting = true;
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
AddAcceptTimer();
PLGHead("END");
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}
1) Build the PaxosMsg, which carries the following fields:
| MsgType | InstanceID | NodeID  | ProposalID | Value    | LastChecksum |
| 4 bytes | 8 bytes    | 8 bytes | 8 bytes    | variable | 4 bytes      |
2) Add the Accept timeout timer. An Accept can time out for reasons such as packet loss. The handling is just as simple: start over from Prepare.
3) Send the Accept request to every node in the cluster. Because BroadcastMessage_Type_RunSelf_Final is used, the local node handles the message after it has been sent to the others.
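2.6 Handling the Accept reply
The call path for an Accept reply is:
1) Instance::OnReceive() is called first.
2) For a Paxos message, it calls Instance::OnReceivePaxosMsg().
3) For a MsgType_PaxosAcceptReply message, that calls Instance::ReceiveMsgForProposer().
4) That in turn calls Proposer::OnAcceptReply(), shown below: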
void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnAcceptReply();
if (!m_bIsAccepting)
{
//PLGErr("Not proposing, skip this msg");
BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//PLGErr("ProposalID not same, skip this msg");
BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
PLGDebug("[Accept]");
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
}
else
{
PLGDebug("[Reject]");
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
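1) If the Proposer is not in the accepting phase, return immediately.
2) If the proposalID does not match, return immediately.
3) Record the reply in MsgCounter.
4) If this Accept round is accepted by a majority, notify the Learner through m_poLearner->ProposerSendSuccess(). If it is rejected by a majority, or all replies arrive without a pass, start a short random timer (10 to 39 ms), after which Prepare is issued again.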
3. Acceptor
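The Acceptor is the passive participant in a proposal, and its work also splits into two phases: OnPrepare and OnAccept.
3.1 The OnPrepare phase
When a Prepare request arrives from another node, Instance::OnReceive() is called first, then Instance::OnReceivePaxosMsg(). From there, a few more calls lead to Acceptor::OnPrepare():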
int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
m_oAcceptorState.SetPromiseBallot(oBallot);
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
else
{
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}
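OnPrepare does not appear to perform any validity checks. Those checks are essential and have not been dropped: they happen in the higher-level Instance function that calls OnPrepare. Their main job is to make sure that the instance ID in the message matches the acceptor's.

The outcome depends on how the incoming proposal number (BallotNumber) compares with the number the Acceptor has currently promised:
- If it is greater than or equal, the Acceptor promises the new number. It returns to the proposer the proposal it last accepted: PreAcceptID, PreAcceptNodeID and, if there is one, the value.
- Otherwise, it returns a reject message carrying its promised proposal ID (RejectByPromiseID).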
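The promise rule can be sketched with a ballot ordered lexicographically by (proposalID, nodeID), the same pair BallotNumber is built from in the code above. Ballot, PrepareReply and MiniAcceptor are hypothetical types for illustration, and persistence is left out. The real code calls m_oAcceptorState.Persist() before replying and returns -1 without replying if that fails.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <tuple>

// Ballot ordered by (proposalID, nodeID).
struct Ballot {
    uint64_t proposalId = 0;
    uint64_t nodeId = 0;
    bool operator>=(const Ballot& o) const {
        return std::tie(proposalId, nodeId) >= std::tie(o.proposalId, o.nodeId);
    }
};

struct PrepareReply {
    bool promised = false;
    uint64_t rejectByPromiseId = 0;  // set only on reject
    Ballot preAccepted;              // last accepted ballot (may be null)
    std::string value;               // last accepted value, if any
};

// Sketch of the OnPrepare rule: promise if the ballot is >= the current
// promise, and report the last *accepted* proposal back to the proposer.
struct MiniAcceptor {
    Ballot promise, accepted;
    std::string acceptedValue;

    PrepareReply OnPrepare(const Ballot& b) {
        PrepareReply r;
        if (b >= promise) {
            r.promised = true;
            r.preAccepted = accepted;
            if (accepted.proposalId > 0) {
                r.value = acceptedValue;
            }
            promise = b;  // persisted before replying in the real code
        } else {
            r.rejectByPromiseId = promise.proposalId;
        }
        return r;
    }
};
```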
When promising, the acceptor writes the current promise information to LevelDB and the LogStore. A record in the LogStore has the following format:
| Length  | InstanceID | AcceptorStateData |
| 4 bytes | 8 bytes    | variable          |
1) The Length field holds the combined size of InstanceID and AcceptorStateData.
2) AcceptorStateData has the following format:
message AcceptorStateData
{
required uint64 InstanceID = 1;
required uint64 PromiseID = 2;
required uint64 PromiseNodeID = 3;
required uint64 AcceptedID = 4;
required uint64 AcceptedNodeID = 5;
required bytes AcceptedValue = 6;
required uint32 Checksum = 7;
}
A record in LevelDB has the following format:
| Key | Value |
1) The key is the instanceID.
2) The value is the position of the record in the LogStore: the logstore file name, the offset and a checksum. The logstore file names are the 0.f, 1.f, ... files under the vfile directory described in an earlier article.
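The two storage layers can be modelled in memory to show how they fit together. MiniLogStore is hypothetical:
- A std::string stands in for a vfile such as 0.f.
- A std::map stands in for LevelDB.
- The index stores only the offset; the file name and checksum from the real value are omitted.

The record layout [Length][InstanceID][AcceptorStateData], with Length covering the last two parts, follows the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <string>

// Hypothetical in-memory model: an append-only log plus an index from
// instanceID to the offset of that instance's latest record.
class MiniLogStore {
public:
    void Append(uint64_t instanceId, const std::string& stateData) {
        // Length covers InstanceID plus AcceptorStateData.
        int32_t len = static_cast<int32_t>(sizeof(instanceId) + stateData.size());
        std::size_t offset = file_.size();
        file_.append(reinterpret_cast<const char*>(&len), sizeof(len));
        file_.append(reinterpret_cast<const char*>(&instanceId), sizeof(instanceId));
        file_ += stateData;
        index_[instanceId] = offset;  // plays the role of the LevelDB key/value
    }

    bool Read(uint64_t instanceId, std::string& stateData) const {
        auto it = index_.find(instanceId);
        if (it == index_.end()) {
            return false;
        }
        std::size_t off = it->second;
        int32_t len = 0;
        uint64_t storedId = 0;
        std::memcpy(&len, file_.data() + off, sizeof(len));
        std::memcpy(&storedId, file_.data() + off + sizeof(len), sizeof(storedId));
        if (storedId != instanceId) {
            return false;  // index and log disagree
        }
        stateData = file_.substr(off + sizeof(len) + sizeof(storedId),
                                 static_cast<std::size_t>(len) - sizeof(storedId));
        return true;
    }

private:
    std::string file_;                          // stands in for a vfile
    std::map<uint64_t, std::size_t> index_;     // instanceID -> offset
};
```

An instance that is persisted twice (once in OnPrepare, again in OnAccept) leaves two records in the log. The index points at the newer one.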
3.2 The OnAccept phase
When an Accept request arrives from another node, the call eventually reaches Acceptor::OnAccept():
void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
else
{
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}
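If the proposal number (BallotNumber) in the Accept request is greater than or equal to the one the Acceptor has currently promised, the Acceptor promises and accepts the proposal. Both the promised ballot and the accepted ballot are set to it, and the value is stored. The state is then persisted to LevelDB and the PaxosLog in exactly the same format as in OnPrepare above. Otherwise, a reject message is returned to the proposer.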
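The accept rule differs from the prepare rule only in what gets updated on success. This is a minimal sketch with hypothetical types (Ballot and MiniAcceptor), and persistence is omitted. A reject reports the promised proposal ID, just as the code above fills RejectByPromiseID.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <tuple>

// Ballot ordered by (proposalID, nodeID).
struct Ballot {
    uint64_t proposalId = 0;
    uint64_t nodeId = 0;
    bool operator>=(const Ballot& o) const {
        return std::tie(proposalId, nodeId) >= std::tie(o.proposalId, o.nodeId);
    }
};

// Sketch of the OnAccept rule. Returns true if the proposal is accepted;
// on reject, rejectByPromiseId receives the currently promised proposal ID.
struct MiniAcceptor {
    Ballot promise, accepted;
    std::string acceptedValue;

    bool OnAccept(const Ballot& b, const std::string& value, uint64_t& rejectByPromiseId) {
        if (!(b >= promise)) {
            rejectByPromiseId = promise.proposalId;
            return false;
        }
        promise = b;           // SetPromiseBallot
        accepted = b;          // SetAcceptedBallot
        acceptedValue = value; // SetAcceptedValue
        return true;
    }
};
```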
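4. Summary
This chapter briefly introduced how the Paxos algorithm works and its three roles: Proposer, Acceptor and Learner. It walked through the main code of the Proposer and Acceptor roles and showed how the two take part in the Prepare and Accept phases.
As for the last role, one might expect the Learner to be the least involved and to contain the least logic. In PhxPaxos, however, the Learner has the most complex implementation of the three. It is covered separately in the next chapter.
See also: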