Paxos协议是分布式系统设计中的一个非常重要的协议,本文转载自微信后台团队公众号团队所发表一系列Paxos的文章,中间针对自己的理解略有修改或注释。在此处做一个备份,一方面为了加深对Paxos协议的理解,另一方面也方便自己的后续查找,防止文章丢失。
1. proposal的发起与接受
本文我们会通过分析phxpaxos中自带的phxecho示例程序,以进一步理解Paxos确定一个值
的过程。在分析具体的程序之前,我们再简要重申一下Paxos算法所包含3个角色:
-
Proposer: 提案发起者。负责发起提案,提案本身由编号(number)、值(value)两部分组成
-
Acceptor: 提案接收者。负责接收由Proposer发起的提案,通过一定的规则首先确定提案编号,最终确定提案值。
-
Learner: 被选中提案的学习者。Learner不参与提案的确定过程,只负责学习已确定好的提案值。
简单来说,提案由每个节点的Proposer、Acceptor参与决策,如果某个节点由于网络故障等原因未参与决策,由Learner负责学习已经选中的提案值。
Paxos算法分为提案选定(Prepare)、确定提案值(accept)两个阶段。每个阶段都有可能需要执行多次,每次都花费一个RTT网络耗时。为了达到产品级应用的目的,PhxPaxos采用选主并在主上执行Paxos算法的方式来避免提案冲突,随后为所有的提案执行noop操作,有条件跳过Prepare阶段将网络花费降至理论最小值(一个RTT)。
注: 示例程序phxecho的运行并未执行选主,但仍是以Multi-Paxos方式来进行工作的。Multi-Paxos的工作并不依赖于Master,只是在有Master的情况下可以避免提案冲突,达到最高的工作效率。
本章我们先来看Proposer、Acceptor两个角色,Learner我们会在后续的章节进行讲解。
2. Proposer
我们先来看PhxEchoServer::Echo()
函数,其通过调用如下函数来发起一个提案:
如上我们看到,是通过进一步调用对应Group的Committer来发起提案的。Committer::NewValueGetID()函数本身较为简单,我们不进行说明,我们来看其所调用的下一级函数:
对于同一个Committer,可能会有多个线程同时调用其来发起Proposal
,因此首先需要通过获取m_oWaitLock
来使其串行化(这里假设RTT最少为200ms,因此如果剩余时间iLeftTimeoutMs低于该值,不再进行后续的提交操作)。
接着执行如下步骤:
1) 将StateMachine ID打包进MSG
通过调用PackPaxosValue()函数来将StateMachine ID打包进所要提交的消息中(此处我们传递的状态机ID为1
):
2) 初始化该Committer对应的上下文
调用CommitCtx::NewCommit()重新对该Committer所关联的CommitCtx进行初始化,准备进行提交:
3) 通知IOLoop有新的Commit
调用IOLoop::AddNotify()唤醒可能处于挂起状态的IOLoop,让其执行我们的提交操作:
注意,这里我们并没有将真正需要发送的消息投递到队列,这仅仅只是起一个通知作用。事实上,m_oMessageQueue
是作为接收消息队列使用,而不是发送消息队列使用。这里发送一个nullptr
仅仅起唤醒作用。
题外话,IOLoop唤醒之后,其实是IOLoop::OneLoop()循环中,通过调用m_poInstance()->CheckNewValue()来处理该新的提议的。
4) 等待提交结果
调用CommitCtx::GetResult()不断等待提交结果:
这里会不断的进行等待,直到提交超时或者受到其他Acceptor返回过来的结果。
2.1 对新的Commit进行检查
在IOLoop::OneLoop()中会调用本函数检查是否有新的提交:
下面我们简要分析一下本函数的执行:
1) 通过CommitCtx检查是否是一个新的提交;
2) 如果当前Instance所关联的Learner检测到还有一些老的提议没有学习到,则这里会禁止提交。假如不禁止进行强行提交的话,也会因为Proposer ID过低而被其他Acceptor所拒绝,因此后续我们得不断的提高Proposer ID,这样不但浪费时间而且占用带宽。因此在我们明确知道Learner还有一些旧的提议没有学习完之前,我们拒绝提交。
3) 若本Instance只是作为一个Follower的话,不参与提交
4) 若当前节点并不是集群中的一员的话,禁止提交
注: 在我们的示例程序中,我们通过如下方式运行
./phxecho 127.0.0.1:11111 127.0.0.1:11111,127.0.0.1:11112,127.0.0.1:11113
这样当前节点就已经是集群中一员了。在Config::Init()函数中会将后面3个节点信息添加到SystemVSM中
5) 启动提交
关于m_oProposer
是如何初始化并产生新的proposerID的,我们会在后面通过相关章节进行讲解。
6) 根据提交时设置的超时时间,来判断我们是否要启动定时器(目前我们将超时时间设置为-1,因此并不需要启动超时定时器)
7) proposer发起新的提交
默认情况下,我们并不使用membership,m_oOptions.bOpenChangeValueBeforePropose的值也为false,因此我们会直接进入proposer的提交。
2.2 Proposer::NewValue()
接着上面,我们先来看Proposer::NewValue()函数:
1) 设置我们要提交的值设置到m_oProposerState
中,通常情况下一轮新的Paxos提交,初始m_oProposerState.GetValue()的值为空。这里我们设置的值为:
|---------------------------------
| StateMachineID | message |
| (4Bytes) | |
----------------------------------
2) 设置prepare的超时时间与accept的超时时间
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS; //2s
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS; //1s
3) 执行Prepare操作。
满足如下场景,允许跳过prepare阶段:
本节点之前执行国Prepare阶段,并且Prepare阶段的直接结果为Accept
当不满足上述场景时,需要执行完整的Paxos两阶段流程。这里我们虽然使用的是Multi-Paxos,但通常第一次提交我们还是要执行Prepare,因此下面我们来详细的分析一下Prepare()函数。
2.3 Prepare阶段
本函数正式进入我们在Paxos理论
中介绍的Prepare阶段,我们来看:
发起Prepare主要做4件事情:
1) Proposer状态重置,表明当前开始进入Prepare阶段
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
2) 按需使用编号
这里按需
的意思是指,当有其他节点明确拒绝了该提案,按Paxos协议必须使用新的提案编号重写发起提案;而如果并无其他节点拒绝,即由于超时等原因导致的重新发起提案,可沿用原来的编号。
3) 构造PaxosMsg
通过如下方式构造PaxosMsg:
这里我们查看paxos_msg.proto
文件,其消息格式如下:
4) 清除消息计数器,以开启新的一轮消息记录
m_oMsgCounter.StartNewRound();
5) 设置Prepare超时定时器。
Prepare超时原因有很多,比如网络丢包。当Prepare超时时,处理方式也很简单,重新执行Prepare()。注: 这里用m_llTimeoutInstanceID
来标识一个Prepare超时定时器,因为可能有很多定时器同时在运行。
6) 发送Prepare消息
这里在消息发送之前依然会再进行一次检查,看是否有必要进行发送。这里可能主要涉及到两个时间段,这两个时间段内整个集群的状态都可能会发生改变:
接着我们再来看对Paxos消息的打包:
上面先调用protobuf相关序列化函数来进行序列化,之后再调用PackBaseMsg进行进一步的打包。打包后的消息类似如下:
|------------------------------------------------------------------------------------------------------------------------
| | Header | | |
| GroupID |-----------------------------------------| PaxosMsg | checksum |
| | header_len | Header_content | | |
|------------------------------------------------------------------------------------------------------------------------
| group_id | len | gid | rid | cmd | version| MsgType | InstanceID | NodeID | ProposalID | crc32_sum |
| 4byte | (2Byte) |8byte |8byte|4byte| 4byte | 4byte | 8byte | 8byte | 8byte | 4byte |
-------------------------------------------------------------------------------------------------------------------------
消息打完包之后,接着调用如下函数发往相应的节点:
注意上面并不会发送给节点自身。后续收到其他节点的返回消息时,我们默认节点自身已经promise了。
2.4 MsgType_PaxosPrepareReply阶段
当收到发来的网络消息时,首先会回调Instance::OnReceive()函数,假如我们收到的是Paxos消息,则会回调到Instance::OnReceivePaxosMsg()函数,现在我们来看一下当收到MsgType_PaxosPrepareReply消息时的回调函数Instance::ReceiveMsgForProposer():
在这里我们首先会处理前一个paxos instance所返回来的过期(Expired)消息,因为假如集群中某个节点一直落后于整个集群的话,那么当该节点返回reject响应时我们可能仍需要忽略。若我们不处理这些reject响应,则使得该节点后续可能都会一直返回reject响应。
接着调用Proposer::OnPrepareReply()来处理prepare响应:
此函数按如下步骤对响应消息进行处理:
1) 首先当前Proposer检查自身是否处于preparing阶段
2) 检查proposalID是否相同
3) MsgCounter用于记录当前收到来自哪些节点的响应,以及收到的响应结果(promised/reject)
4) 如果本轮prepare通过,那么进入Accept();否则开启定时器以进行下一轮的投票。
2.5 发起Accept请求
当Prepare阶段成功之后,proposer就会发起Accept请求:
1) 生成PaxosMsg
|-------------------------------------------------------------------------------------------
| MsgType | InstanceID | NodeID | ProposalID | Value | LastChecksum |
| 4Byte | 8Byte | 8Byte | 8Byte | 变长 | 4Byte |
----------------------------------------------------------------------------------------------
2) 添加Accept超时定时器,比如网络丢包。当Accept超时时,处理方式也很简单,重新从Prepare开始
3) 向集群各个节点发起Accept请求
2.6 对Accept响应的处理
当收到对应消息时,首先会回调Instance::OnReceive()函数,对于Paxos消息会回调到Instance::OnReceivePaxosMsg(),对于MsgType_PaxosAcceptReply消息,会回调Instance::ReceiveMsgForProposer(),接着再调用到Proposer::OnAcceptReply(),现在我们来看一看该函数:
1) 若当前并不accepting阶段,直接返回;
2) 若proposalID不匹配,直接返回
3) MsgCounter记录相关的返回消息
4) 如果本轮Accept被采纳,则向Learnner发出通知消息;否则重新发起Prepare请求
3. Acceptor
Acceptor作为提案的被动参与者,也分为OnPrepare和OnAccept阶段。
3.1 OnPrepare阶段
当收到其他节点发送过来的Prepare请求时,首先会调用到Instance::OnReceive(),之后再回调到Instance::OnReceivePaxosMsg(),之后经过相关调用回调到Acceptor :: OnPrepare():
OnPrepare函数看似并未做任何有效性校验,但这部分校验是必不可少的,并未省去,而是出现在了调用OnPrepare的Instance类的上层函数中。这里的校验主要是保证参数中的instance ID与acceptor一致。
若发过来的提案编号(BallotNumber)大于等于
当前Acceptor所承诺(promised)的提案编号,则对对新发送过来的提案编号进行promise,并返回其上一次所承诺的提案信息给proposer; 否则,则返回拒绝消息给proposer。
我们在进行promise时,会把当前所promise的信息写入到LevelDB与LogStore当中。写入到LogStore中的消息格式如下:
|------------------------------------------------------------------
| Length | InstanceID | AcceptorStateData |
| 4Byte | 8Byte | |
-------------------------------------------------------------------
1) Length字段用于保存InstanceID与AcceptorStateData所占用的空间总和
2) AcceptorStateData消息格式如下
message AcceptorStateData
{
required uint64 InstanceID = 1;
required uint64 PromiseID = 2;
required uint64 PromiseNodeID = 3;
required uint64 AcceptedID = 4;
required uint64 AcceptedNodeID = 5;
required bytes AcceptedValue = 6;
required uint32 Checksum = 7;
}
写入到LevelDB的消息格式如下:
-------------------------------------------------------------
| Key | Value |
-------------------------------------------------------------
1) key为instanceID
2) Value为本记录在LogStore中的位置,包括logstore文件名、偏移、校验值。logstore文件名还记得以前我们讲述过的vfile目录下的0.f、1.f这样的文件吗?
3.2 OnAccept阶段
当收到其他节点发送过来的Accept请求时,会回调执行到Acceptor::OnAccess()函数:
如果收到的Accept请求的提案编号(BallotNumber)大于等于
当前Acceptor所承诺(promised)的,那么会对发送过来的提案进行promise,并持久化到LevelDB与PaxosLog中,持久化格式与上面OnPrepare中的一模一样; 否则返回拒绝消息给proposer。
4. 总结
本章简要介绍了Paxos算法的原理,了解到Paxos算法的三大角色:Proposer、Acceptor、Learner。讲解了Proposer、Acceptor两个角色的主要代码实现,以及二者如何参与到Prepare、Accept两个阶段中。
至于最后一个角色Learner,原本的理解认为应该是参与度最低的,逻辑最少的角色。但PhxPaxos中,Learner是三者中实现最复杂的,这部分内容将在下一章单独讲解。
参看:
-
PhxPaxos源码分析之关于PhxPaxos
-
PhxPaxos源码解析(1)之概述篇
-
PhxPaxos源码分析之Proposer、Acceptor