在上一章我们讲述到PG Primary获取到权威日志之后,会进入到GetMissing阶段。本文介绍GetMissing阶段的处理,包括日志操作以及Active操作。
1. Peering之GetMissing状态处理
GetMissing的处理过程为:首先,拉取各个从OSD上的有效日志;其次,用主OSD上的权威日志与各个从OSD的日志进行对比,从而计算出各个从OSD上不一致的对象并保存在对应的pg_missing_t结构中,作为后续数据修复的依据。
主OSD的不一致的对象信息,已经在调用函数PG::proc_master_log()合并权威日志的过程中计算出来,所以这里只计算从OSD上不一致的对象。
1.1 GetMissing状态
在GetMissing的构造函数里,通过对比主OSD上的权威pg_info信息,来获取从OSD上的日志信息。
在GetMissing构造函数中,遍历pg->actingbackfill的OSD列表,针对每一个pg_shard_t做如下处理:
1) 不需要获取PG日志的情况
a) 如果当前副本为PG Primary,因为其已经有了完整的权威日志,因此可以直接跳过;
b) 如果pi.is_empty()为空,没有任何信息,需要Backfill过程来修复,不需要获取日志;
c) 如果pi.last_update小于pg->pg_log.get_tail(),该OSD的pg_info记录中,last_update小于权威日志的尾部记录,该OSD的日志和权威日志不重叠,该OSD操作已经远远落后于权威OSD,已经无法根据日志来修复,需要Backfill过程来修复;
d)pi.last_backfill为hobject_t(),说明在past interval期间,该OSD标记需要执行full Backfill操作,实际并没开始Backfill的工作,需要继续Backfill过程
e) pi.last_upate等于pi.last_complete,说明该PG没有丢失的对象,已经完成Recovery操作阶段,并且pi.last_update等于pg->info.last_update,说明该OSD的日志和权威日志的最后更新一致,说明该PG副本数据完整,不需要恢复;
2)获取日志情况: 当pi.last_update大于等于pg->info.log_tail,该OSD的日志记录和权威日志记录重叠,可以通过日志来修复。变量since是从last_epoch_started开始的版本值:
a) 如果该PG的日志记录pi.log_tail小于等于版本值since,那就发送消息pg_query_t::LOG,从since开始获取日志记录;
b) 如果该PG的日志记录pi.log_tail大于版本值since,就发送消息pg_query_t::FULLLOG来获取该OSD的全部日志记录。
注:这里我们只需要发送last_epoch_started之后的日志就可以,因为last_epoch_started之前的日志一定是达成一致的。
3) 最后检查如果peer_missing_requested为空,说明所有获取日志的请求返回并处理完成。如果pg->need_up_thru为true,抛出NeedUpThru事件,会进入到WaitUpThru
状态;否则,直接调用post_event(Activate(pg->get_osdmap()->get_epoch()))抛出Activate
事件。
下面举例说明日志的两种情况:
当前last_epoch_started的只为10,since为last_epoch_started后的首个日志版本值,当前需要恢复的有效日志是经过since操作之后的日志,之前的日志已经没有用了。
对应osd0,其日志log_tail大于since,全部拷贝osd0上的日志;对应osd1,其日志log_tail小于since,只拷贝从since开始的日志记录。
1.2 获取副本日志流程
下面我们给出PG Primary获取副本日志的调用流程:
1.3 PG Primary收到从副本上的日志处理
当一个PG的主OSD接收到从OSD返回的获取日志ACK应答后,就把该消息封装成MLogRec事件。状态GetMissing接收到该事件后,在下列事件函数里处理该事件:
具体处理过程如下:
1) 调用PG::proc_replica_log()处理日志。通过日志的对比,获取该OSD上处于missing状态的对象列表;
2) 如果peer_missing_requested为空,即所有的获取日志请求返回并处理。如果pg->need_up_thru为true,那么抛出NeedUpThru()事件;否则,直接调用函数post_event(Activate(pg->get_osdmap()->get_epoch()))进入Activate
流程。
注:logevt.msg->missing是从OSD进行PG加载时自己发现所缺失的对象
PG::proc_replica_log()函数
下面我们来看PG::proc_replica_log()函数的具体实现:
从上面可以看到,其实际是调用PGLog::proc_replica_log()来进行处理:它通过比较该OSD的日志和本地权威日志,来计算该OSD上处于missing状态的对象列表。下面我们来看该函数的是实现:
PGLog::proc_replica_log()所做的事情基本上是:rewind从OSD日志、drop分歧的日志条目,从而与权威日志达成一致。之后更新该从OSD的peer_info.last_update以精确的反应该从OSD所丢失的对象。
之后在PG::activate()函数中,missing将再次向前推进,我们将向Peer发送足够的日志信息,从而使各个副本达成一致。
下面我们来看具体的处理流程:
1) 如果从OSD的日志olog.head小于当前权威日志log.tail,没有日志重叠,不能处理此种情况,直接返回;
2) 如果从OSD的日志olog.head等于当前权威日志log.head,说明没有分歧对象,直接返回;
3) 从后往前遍历当前的权威日志log,与从OSD的olog进行对比,从而找出第一条非分歧日志(first_none_divergent);
注:上面的lu为该从OSD在合并权威日志后,其应该对应的last_update值
4) 找出分歧日志,并将分歧日志加入到divergent
列表中
void PGLog::proc_replica_log(
ObjectStore::Transaction& t,
pg_info_t &oinfo, const pg_log_t &olog, pg_missing_t& omissing,
pg_shard_t from) const
{
…
list<pg_log_entry_t> divergent;
list<pg_log_entry_t>::const_iterator pp = olog.log.end();
while (true) {
if (pp == olog.log.begin())
break;
--pp;
const pg_log_entry_t& oe = *pp;
// don't continue past the tail of our log.
if (oe.version <= log.tail) {
++pp;
break;
}
if (oe.version <= lu) {
++pp;
break;
}
divergent.push_front(oe);
}
... }
4) 把接收到的从OSD的日志中的非分歧部分构造成IndexedLog,之后调用_merge_divergent_entries()来继续处理
void PGLog::proc_replica_log(
ObjectStore::Transaction& t,
pg_info_t &oinfo, const pg_log_t &olog, pg_missing_t& omissing,
pg_shard_t from) const
{
…
IndexedLog folog;
folog.log.insert(folog.log.begin(), olog.log.begin(), pp);
folog.index();
_merge_divergent_entries(
folog,
divergent,
oinfo,
olog.can_rollback_to,
omissing,
0,
0,
this);
if (lu < oinfo.last_update) {
dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
oinfo.last_update = lu;
}
.... } 关于_merge_divergent_entries()我们前面已经介绍过,其会构造出一个missing对象列表,存放于参数omissing中;之后可能还需要修正oinfo.last_update值;
5) 如果合并日志后,发现从OSD的omissing不为空,那么说明该从OSD有对象丢失,可能需要修正oinfo.last_complete的值;否则,说明该OSD没有对象丢失,直接将oinfo.last_complete更新为oinfo.last_update。
2. activate操作
由上可知,如果GetMissing()处理成功,就会抛出Activate()
事件,从而执行activate流程。进行到本阶段为止,可以说Peering主要工作
已经完成,剩下的后续流程就是激活各个副本。搜索Activate
,我们发现如下几个地方会处理该事件:
这里对于PG Primary来说,就是跳转到Active状态。我们来看Active状态构造函数的实现:
在状态Active的构造函数里,其处理过程如下:
1) 在构造函数里初始化了remote_shards_to_reserve_recovery和remote_shards_to_reserve_backfill,即需要Recovery操作和Backfill操作的OSD;
2)调用函数PG::start_flush()来完成相关数据的flush工作;
3)调用函数PG::activate()完成最后的激活工作;
2.1 MissingLoc
在讲解PG::activate()函数之前,我们先介绍类MissingLoc。MissingLoc用来记录missing状态对象的位置,也就是缺失对象的正确版本分布在哪些OSD上。恢复这些时,就去这些OSD上去拉取正确对象的数据:
下面介绍一下各字段的含义:
-
needs_recovery_map: 需要进行recovery操作的对象列表。key为缺失的对象,value为item(缺失版本,现在版本)
-
missing_loc: 缺失对象的正确版本在哪些OSD上
-
missing_loc_source: 所有缺失对象所在的OSD集合
下面介绍一些MissingLoc的处理函数:
2.1.1 MissingLoc::add_active_missing()
add_active_missing()函数的作用是把参数missing
中的缺失对象添加到MissingLoc的needs_recovery_map里。
2.1.2 MissingLoc::add_source_info()
add_source_info()函数用于计算每个缺失的对象是否在参数(fromosd)指定的OSD上。下面我们来看具体实现:
遍历MissingLoc::needs_recovery_map里的所有对象,对每个对象做如下处理:
1) 如果oinfo.last_update小于need(所需的缺失对象版本),直接跳过;
2) 如果该PG副本的last_backfill指针小于MAX值,说明还处于Backfill阶段,但是sort_bitwise不正确,跳过;
3) 如果该对象大于等于last_backfill,显然该对象不存在,跳过;
4) 如果该对象大于last_complete,说明该对象或者是上次Peering之后缺失的对象,还没来得及修复;或者是新创建的对象。检查如果在missing记录已经存在,就是上次缺失的对象;否则就是新创建的对象,存在该OSD中
5) 经过上述检查后,确认该对象在参数(fromosd)指定的OSD上,在MissingLoc::missing_loc中添加该对象的location为fromosd。
2.1.3 PG::search_for_missing()
PG::search_for_missing()其实就是调用MissingLoc::add_source_info()函数,检查副本from
是否有我们当前所缺失的对象。
2.2 PG::activate()操作
PG::activate()函数是Peering过程的最后一步,该函数完成以下功能:
我们来看具体的处理流程:
1) 如果PG在上一个Interval时的所有OSD副本在当前interval
(current interval)下均崩溃了,说明时崩溃恢复场景。此时,如果需要客户回答,就把PG添加到replay_queue队列里;
2)更新info.last_epoch_started变量。info.last_epoch_started指的是PG的当前OSD副本在完成目前Peering进程后的更新;而info.history.last_epoch_started是PG的所有OSD副本都确认完成Peering的更新;
3) 更新一些相关的字段
4) 注册C_PG_ActivateCommitted回调函数,该函数最终完成activate的工作
注:对于PG Primary来说,其也要激活本身,当transaction完成回调C_PG_ActivateCommitted,就表示PG Primary激活完成;对于PG Replicas而言,其同样会调用PG::activate(),然后注册C_PG_ActivateCommitted回调函数,在该副本激活完成,就会在该回调函数中向PG Primary报告本副本激活情况。
5) 初始化snap_trimq快照相关的变量;
6)设置info.last_complete指针:
-
如果missing.num_missing()等于0,表明处于clean状态。直接更新info.last_complete等于info.last_update,并调用pg_log.reset_recovery_pointers()调整log的complete_to指针;
-
否则,如果有需要恢复的对象,就调用函数pg_log.activate_not_complete(info),设置info.last_complete为缺失的第一个对象的前一版本;
7)以下都是主OSD的操作,给每个从OSD发送MOSDPGLog类型的消息,激活该PG的从OSD上的副本。分别对应三种不同处理:
-
如果pi.last_update等于info.last_update,且不需要执行backfill,这种情况下,该OSD本身就是clean的,不需要给该OSD发送其他信息。添加到activator_map只发送pg_info来激活从OSD。其最终的执行是在PeeringWQ的线程函数OSD::process_peering_events()执行完状态机的事件处理后,在函数OSD::dispatch_context()里调用OSD::do_infos()函数实现;
-
对于需要backfill操作的OSD,发送pg_info,以及osd_min_pg_log_entries数量的PG日志
-
对于需要Recovery操作的OSD,发送pg_info,以及从OSD所缺失的日志
之后调用OSD::send_message_osd_cluster()将MOSDPGLog消息发送给从OSD
8)设置MissingLoc,也就是统计缺失的对象,以及缺失对象所在的OSD(即恢复时可以从哪些OSD上找到),核心就是调用MissingLoc::add_source_info()函数,见上面的MissingLoc分析
9) 如果有任何一个副本需要恢复,把该PG加入到osd->queue_for_recovery(this)队列中
注:complete_shards就是不需要进行recovery操作的PG副本集合
这里计算每一个PG副本(包括PG Primary、PG Replicas)所缺失的对象可以从哪些OSD来恢复时,来用了一点小小的技巧:假如只有一个副本有object丢失,则直接调用MissingLoc::add_batch_sources_info()来批量添加,即可以直接从其他所有副本中恢复missing objects;否则,调用MissingLoc::add_source_info()将PG Primary本身以及其他PG Replicas都添加到MissingLoc中。
之后,调用PG::search_for_missing()来检查peer_missing列表中的OSD是否有我们想要的object。
之后,调用PG::build_might_have_unfound()来构建PG::might_have_unfound集合,即我们当前所缺失的对象有可能在哪些OSD上能够找到。
之后,再调用OSD::queue_for_recovery(this)将当前PG添加到恢复队列中。
最后,假如PG Primary上仍然还有一些对象找不到,调用PG::discovery_all_missing()来在might_have_unfound集合中查找。
10)如果PG当前actingset的size小于该PG在当前OSDMap中的副本数,也就是当前的OSD不够,就标记PG的状态为PG_STATE_DEGRADED
和PG_STATE_UNDERSIZED
,最后标记PG为PG_STATE_ACTIVATING
状态。
2.3 激活流程
在上面我们讲到PG Primary会向副本发送MOSDPGInfo
或MOSDPGLog
消息来激活:
这里我们给出这一激活流程图:
1)PG Primary发送MOSDPGInfo消息到PG Replica
2) PG Primary发送MOSDPGLog消息到PG Replica
2.4 收到从OSD的MOSDPGLog的应对
当收到从OSD发送的MOSDPGLog的ACK消息后(即上面激活流程图中的MOSDPGInfo消息),触发MInfoRec事件,下面这个函数处理该事件:
处理过程比较简单:检查该请求的源OSD在本PG的actingbackfill列表中,之后将其从PG::blocked_by中删除。最后检查,当收集到所有的从OSD发送的ACK,就调用函数PG::all_activated_and_committed()触发AllReplicasActivated
事件。
对应主OSD在事务回调函数C_PG_ActivateCommitted里实现,最终调用PG::_activate_committed()加入PG::peer_activated集合里。
2.5 AllReplicasActivated事件处理
如下函数处理AllReplicasActivated
事件:
当所有的replica处于activated状态时,进行如下处理:
1)取消PG_STATE_ACTIVATING
和PG_STATE_CREATING
状态,如果该pg->acting.size()大于等于pool的min_size,设置该PG为PG_STATE_ACTIVE
状态,否则设置为PG_STATE_PEERED
状态。
2) ReplicatedPG::check_local()检查本地的stray对象是否都被删除;
3) 如果有读写请求在等待Peering操作完成,则把该请求添加到处理队列pg->requeue_ops(pg->waiting_for_peered)
4) 调用函数ReplicatedPG::on_activate(),如果需要Recovery操作,触发DoRecovery()事件;如果需要Backfill操作,触发RequestBackfill()事件;否则触发AllReplicasRecovered()事件。
5)初始化Cache Tier需要的hit_set对象;
5)初始化Cache Tier需要的agent对象
2.6 补充
在上面的ReplicatedPG::on_activate()中,会调用PG::needs_recovery()和PG::needs_backfill()来判断是否要执行recovery动作和backfill动作。下面我们来分别看一下这两个函数:
2.6.1 PG::needs_recovery()
从上面我们看到,实现也比较简单:首先判断PG Primary自身是否有missing对象,如果有则直接返回true;接着遍历actingbackfill,看PG的Replicas是否有missing对象,如果有直接返回true。
2.6.2 PG::needs_backfill()
基本处理流程为:遍历PG::backfill_targets列表,发现有一个副本需要backfill,则直接返回true。
3. 副本端的状态转移
当创建PG后,根据不同的角色,如果是主OSD,PG对应的状态机就进入了Primary状态。如果不是主OSD,就进入Stray状态。
3.1 Stray状态
Stray状态有两种情况:
从PG接收到主PG发送的MInfoRec事件,也就是接收到主OSD发送的pg_info信息。其判断如果当前pg->info.last_update大于infoevt.info.last_update,说明当前的日志有divergent的日志,调用函数rewind_divergent_log()清理日志即可。最后抛出Activate(infoevt.info.last_epoch_started)事件,进入ReplicaActive状态。
注:上面rewind_divergent_log()只需要将相应的指针回退即可,并不需要擦除相关数据
当从PG接收到MLogRec事件,就对应着接收到主PG发送的MOSDPGLog消息,其通知从PG执行activate操作,具体处理过程如下:
1) 如果msg->info.last_backfill为hobject_t(),则说明本副本是需要执行Backfill的OSD
2)否则就是需要执行Recovery操作的OSD,调用PG::merge_log()把主OSD发送过来的日志合并
抛出Activate(logevt.msg->info.last_epoch_started)事件,使副本转移到ReplicaActive状态。
3.2 ReplicaActive状态
对于副本PG,在Stray状态下接收到MInfoRec或MLogRec事件后,会抛出Activate()事件,并跳转到ReplicaActive状态。下面我们来看处理过程:
当处于ReplicaActive状态,接收到Activate()事件,就调用函数PG::activate(),在函数PG::_activate_committed()给PG Primary发送应答信息,告诉PG Primary自己处于激活
(acitvate)状态,设置PG为Active或Peered状态。
注:参看PG::_activate_committed()实现,如果acting.size()大于等于pool.info.min_size,则将PG设置为Active状态,否则设置为Peered状态。
4. 状态机异常处理
在上面的流程介绍中,只介绍了正常状态机的转换流程。Ceph之所以用状态机来实现PG的状态转换,就是可以实现任何异常情况下的处理。下面介绍当OSD失效时导致相关的PG重新进行Peering的机制。
当一个OSD失效,Monitor会通过heartbeat检测到,导致OSDMap发生了变化,Monitor会把最新的osdmap推送给OSD,导致OSD上受到影响的PG重新进行Peering操作。
具体流程如下:
1) 在函数OSD::handle_osd_map()处理osdmap的变化,该函数调用OSD::consume_map(),对每一个PG调用pg->queue_null,把PG加入到peering_wq中;
2) peering_wq的处理函数OSD::process_peering_events()调用OSD::advance_pg()函数,在该函数里调用PG::handle_advance_map()给PG状态机RecoveryMachine发送AdvMap事件:
当处于Started状态,接收到AdvMap事件,调用函数pg->should_restart_peering()检查,如果时new_interval,就跳转到Reset状态,重新开始一次Peering过程。
5. 小结
本章介绍了Ceph的Peering流程,其核心过程就是通过各个OSD上保存的PG日志选择出一个权威日志的OSD。以该OSD上的日志为基础,对比其他OSD上的日志记录,计算出各个OSD上缺失的对象信息。这样,PG就使各个OSD的数据达成了一致。
[参看]
-
ceph博客
-
ceph官网
-
PEERING
-
分布式系统
-
分布式存储Ceph之PG状态详解