Paxos协议是分布式系统设计中的一个非常重要的协议,本文转载自微信后台团队公众号团队所发表一系列Paxos的文章,中间针对自己的理解略有修改或注释。在此处做一个备份,一方面为了加深对Paxos协议的理解,另一方面也方便自己的后续查找,防止文章丢失。
1. 基本概念
上一章我们讲解了PhxPaxos的状态机,各种场景下可以定制不同的状态机,每个状态机独立消费一个paxos log以驱动业务状态变更。
状态机消费完paxos log之后,paxos log是否可以删除呢?答案是不行。有如下几个原因:
那什么时候可以删除paxos log呢?答案是不知道。因为某个节点可能永远处于离线状态,这时候必须保留从最初到现在所有的paxos log。但另一方面,如果数据不删除将无限增长,这是无法忍受的。
PhxPaxos因此引入了Checkpoint机制,关于该机制的详细描述请参见《状态机Checkpoint详解》,这里简要说明如下:
1) 一个Checkpoint代表着一份某一时刻被固化下来的状态机数据,它通过sm.h
下的StateMachine::GetCheckpointInstanceID()函数反馈它的精确时刻;
2) 每次启动replay时,只需要从GetCheckpointInstanceID()所指向的paxos log位置开始,而不是从0开始;
3) Node::SetHoldPaxosLogCount()控制需要保留多少在StateMachine::GetCheckpointInstanceID()之前的paxos log
4) 保留一定数量的paxos log的目的在于,如果其他节点数据不对齐,可以通过保留的这部分paxos log完成对齐,而不需要checkpoint数据介入;
5) 如果对齐数据已被删除,这时需要Checkpoint数据传输;
2. 代码设计
Checkpoint机制相关类图如下:
参照上一节讲的功能及类图补充说明如下:
-
Replayer: replay线程。当业务状态机已经消费指定paxos log后,交由Checkpoint重新执行。由Checkpoint relay的数据允许被删除
-
Cleaner: paxos log清理线程,根据配置的保留条数等清理paxos log
-
CheckpointSender: 归档数据发送线程。将归档数据发往其他节点,用于归档数据同步。
-
CheckpointReceiver: 归档数据接收器。接收其他节点发送过来的归档数据。通常归档数据接收完成之后,就会调用Learner::OnSendCheckpoint_End(),之后要求我们LoadCheckpointState()
-
CheckpointMgr: Checkpoint机制管理器,统一管理整个归档机制。
3. 状态机(StateMachine)
再来看StateMachine接口,这次我们重点关注Checkpoint相关接口:
接口说明如下:
-
GetCheckpointInstanceID(): Checkpoint所指向的最大InstanceID,在此之前的paxos log数据状态机已经不需要了。但通常为了其他节点的数据对齐需要,我们仍然会保留其之前的SetHoldPaxosLogCount()个paxos log
-
LockCheckpointState(): 用于开发者锁定状态机Checkpoint数据,这个锁定的意思是指这份数据文件不能被修改、移动和删除。因为接下来PhxPaxos就要将这些文件发送给其他节点,而如果这个过程中出现了修改,则发送的数据可能乱掉。
-
GetCheckpointState(): PhxPaxos获取Checkpoint文件列表,从而将文件发送给其他节点
-
UnLockCheckpointState(): 当PhxPaxos发送Checkpoint数据到其他节点后,会调用解锁函数,解除对开发者的状态机Checkpoint数据的锁定
-
LoadCheckpointState(): 当一个节点获得来自其他节点的Checkpoint数据时,会调用这个函数,将这份数据交由开发者进行处理(开发者往往要做的事情就是将这份Checkpoint数据覆盖当前节点的数据),当调用此函数完成后,PhxPaxos将会进行进程自杀操作,通过重启来完成一个新Checkpoint数据的启动。
-
ExecuteForCheckpoint(): paxos log归档的replay接口
4. Replayer
Replayer是一个独立的线程,负责将选中的提案值Checkpoint操作。实现逻辑非常简单,读取本机的Checkpoint InstanceID,定时和Max Chosen InstanceID比较,如果Checkpoint落后于Max Chosen InstanceID,则通过调用状态机的ExecuteForCheckpoint()进行重演:
PlayOne()的实现逻辑如下:
5. Paxos Log清理(Cleaner)
在PhxPaxos中,每个Group启动一个Cleaner线程清理本Group的paxos log。在Group中,instanceID是不断递增的,每个instanceID对应一个paxos log,当加入Checkpoint之后,我们有三个关键的instanceID:
-
min Chosen InstanceID: 本节点,选定提案的最小instanceID,即最老的paxos log所对应的instanceID
-
Checkpoint instanceID: 本节点,Checkpoint确定的最大instanceID,即在这之前的paxos log数据Checkpoint都已经不需要了。
-
max Chosen InstanceID: 本节点,选定提案的最大instanceID,即最新的paxos log所对应的instanceID
注意: 这里强调的是本节点, 不同节点数据可能不同
除此之外,还有一个配置: Node::SetHoldPaxosLogCount()控制需要保留多少在StateMachine::GetCheckpointInstanceID()之前的PaxosLog。保留一定数量的PaxosLog的目的在于,如果其他节点数据不对齐,可以通过保留的这部分paxos log完成对齐,而不需要checkpoint数据介入。
正常情况下,三者关系如下:
清理动作由Cleaner线程的run()动作触发,触发频率、清理数据量等通过配置参数指定。为了减少清理动作对系统产生的副作用,PhxPaxos将清理动作拆解到毫秒级别分批次运行。假设我们设定每秒清理2000条记录(Cleaner_DELETE_QPS=2000),那么执行间隔为iSleepMs=1ms,每次删除数据量为iDeleteInterval=3条。代码逻辑如下:
根据此频率触发清理动作,下面我们来看run()函数的实现:
触发清理paxos log的条件包括checkpoint instanceID和min chosen instanceID之间的记录超过需要保留的条数,且max chosen instanceID和min chosen instanceID之间的记录超过了保留条数。后面一个判定只是一种保险的附加逻辑,因为实际上max chosen instanceID >= checkpoint instanceID永远成立。
删除数据包括两个动作: 删除paxos log;更新本节点的min chosen instanceID信息。但如果每删除一条都调用SetMinChoseInstance()来写入数据库的话,效率极低,因此这里会每隔DELETE_SAVE_INTERVAL才会真正写到数据库。
6. CheckpointMgr
CheckpointMgr真正管理的只有Cleaner、Replayer两个对象,负责管理这两个线程的初始化、启动、停止等典型动作。关于Cleaner和Replayer前面已经讲过了,此处不再赘述。来看CheckpointMgr另外一组功能:
后面两个函数只是设置和查看标志位(m_bInAskforCheckpointMode),来看PrepareForAskforCheckpoint()的实现:
何时触发进入Checkpoint模式呢? 当本节点向其他节点发起learn请求,其他节点保存的min chosen instanceID比本地的instanceID还要大。这说明本节点无法再向该节点通过learn完成追赶,这时调用PrepareForAskForCheckpoint()请求进入Checkpoint模式。而真正进入Checkpoint模式如下两个条件之一:
一旦决定进入Checkpoint模式,向对应告知的节点发起Checkpoint请求:
对应节点接收该消息后,启动CheckpointSender线程,发送Checkpoint数据。
7. CheckpointSender/CheckpointReceiver
CheckpointSender的定位很明确,一旦启动线程,立即执行Checkpoint发送数据。线程核心函数如下:
-
Replayer负责本地追赶Learner数据,当需要发起Checkpoint操作时,要保证Checkpoint文件不变,因此PhxPaxos机制需要首先暂停Replayer;
-
Checkpoint之前,调用所有状态机的LockCheckpoint()锁定Checkpoint文件;
-
Checkpoint之后,调用所有状态机的UnLockCheckpoint()解除锁定;
-
恢复Replayer,追赶落后的Learner数据
SendCheckpoint内部又分为三个阶段: Checkpoint开始、Checkpoint数据传输、Checkpoint结束。
7.1 Checkpoint开始
通过向对方发送带CheckpointSendFileFlag_BEGIN标识的CheckpointMsgType_SendFile消息告知对方马上要发送checkpoint文件了:
CheckpointReceiver接收到该消息后,回调如下函数:
上面会做如下几件事情:
值得注意的是,整个过程中没有提及本机的Cleaner和Replayer,难道不应该把这些一并停掉?再有,先删除本地数据库中的paxos log再重置min chosen instanceID,过程中可能又有新的数据写入了,并发问题怎么解决呢?
记得CheckpointMgr中有一个InAskforCheckpointMode()方法,判断当前是否处于Checkpoint模式,搜索其使用者,我们看到这样一段逻辑:
当本节点接收到来自其他节点的数据时,如果当前处于Checkpoint模式,则非Checkpoint相关的数据包全部丢弃。因此,即便Cleaner和Replayer未停止,即便删除数据和重置之间存在时间间隔也不会有问题。因为整个Checkpoint过程中,并没有新的消息被处理,一切处于停滞
状态。
注: 这里CheckpointSender实现上看起来存在一点问题,貌似会发送所有的Checkpoint文件? 因为看GetCheckpointState()函数参数并没有指定开始instanceID ?
7.2 Checkpoint数据传输
Checkpoint执行过程代码虽然很多,但是逻辑并不复杂:
Checkpoint数据传输、处理过程中数据包标识为CheckpointSendFileFlag_ING。其会遍历本节点的所有状态机对象,依次做每个状态机的Checkpoint对账。通过调用状态机的GetCheckpointState()获取本状态机的全部Checkpoint文件列表,并依次发送。单次发送大小不超过1M。
CheckpointReceiver的伪代码如下:
7.3 Checkpoint结束
一旦所有的Checkpoint数据发送完成,需要发送CheckpointSendFileFlag_END消息到Checkpoint的发起者,处理逻辑如下:
引用《状态机Checkpoint详解》中的文字:
StateMachine::LoadCheckpointState()当一个节点获得来自其他节点的Checkpoint数据时,会调用这个函数,将这份数据交由开发者进行处理(开发者往往要做的事情就是将这份Checkpoint数据覆盖当前节点的数据)。当调用此函数完成后,PhxPaxos将会进行进程自杀,通过重启来完成一个新Checkpoint数据的启动。
也就是说,整个Checkpoint的对账过程,业务侧并没有感知,PhxPaxos内部只是将这些文件放到了一个临时路径,只在最后一刻才通过LoadCheckpointState()来通知业务处理。
8. 总结
Checkpoint机制在每个节点默认启用两个线程: Cleaner、Replayer。Cleaner负责定期清理过时的Paxos log。Replayer则负责追赶状态机的Execute数据到ExecuteForCheckpoint。另外,如果某个节点落后太多,通过Learner无法完成追赶时,将触发启动Checkpoint的另外一个线程CheckpointSender。CheckpointSender负责发送本节点的Checkpoint文件到CheckpointReceiver。一旦数据发送完成,CheckpointReceiver将进行进程自杀操作,通过重启完成Checkpoint数据同步。
Checkpoint机制并不是PhxPaxos算法的一部分,而是真正工程化的产物。它尝试解决的工程问题包括:
-
某个节点落后太多的场景下,如何实现快速追赶;
-
系统长时间运行场景下,如何避免海量日志;
-
系统异常等故障场景下,如何保证系统的可靠性;
OK,至此我们已经了解了PhxPaxos的基础网络部件、paxos角色以及工程化的Checkpoint机制,终于可以浮到水面窥视PhxPaxos的全貌了。
参看:
-
PhxPaxos源码分析之关于PhxPaxos
-
PhxPaxos源码解析(1)之概述篇
-
PhxPaxos源码分析之状态机
-
如何进行成员变更
-
一致性协议