Nginx buffer optimization
In this chapter we introduce some of the buffers Nginx uses. Setting buffer sizes sensibly can improve the performance of the whole program.
In this chapter we write our first Nginx module. The current environment is:
# uname -a
Linux localhost.localdomain 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
In this chapter we describe how to build third-party dynamically loadable modules for Nginx. Our current operating system environment is:
# uname -a
Linux localhost.localdomain 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
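Distributed storage systems usually rely on multiple replicas for reliability, and keeping those replicas consistent is the core of the system. Ceph bills itself as unified storage; its core, RADOS, supports both replication and erasure coding. This article analyzes Ceph's replica consistency protocol.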
Ceph's PGLog is maintained by the PG and records every operation on that PG; its role is similar to the undo log in a database. A PGLog normally keeps only a limited number of entries (3000 by default, set by osd_min_pg_log_entries), but while the PG is degraded it keeps more (10000 by default), so that when a failed PG comes back online the log can be used to recover its data. This article examines PGLog from three angles: its format, how it is stored, and how it takes part in recovery.
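To make the retention rule concrete, here is a minimal C++ sketch of a bounded per-PG log. It is not Ceph's PGLog class: BoundedPGLog, LogEntry and can_replay_from() are hypothetical names, the limits simply reuse the 3000/10000 defaults quoted above, and trimming happens eagerly on every append, which is a simplification. can_replay_from() assumes contiguous versions and illustrates why keeping more entries while degraded helps: a peer can only be caught up from the log if the entries it is missing have not been trimmed yet.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical, simplified model of a per-PG operation log (not Ceph's PGLog).
struct LogEntry {
  std::uint64_t version;  // monotonically increasing op version
};

class BoundedPGLog {
 public:
  explicit BoundedPGLog(std::size_t min_entries = 3000,
                        std::size_t degraded_entries = 10000)
      : min_entries_(min_entries), degraded_entries_(degraded_entries) {}

  // While degraded, the log is allowed to grow up to the larger limit.
  void set_degraded(bool degraded) {
    degraded_ = degraded;
    trim();
  }

  void append(std::uint64_t version) {
    assert(entries_.empty() || version > entries_.back().version);
    entries_.push_back(LogEntry{version});
    trim();
  }

  std::size_t size() const { return entries_.size(); }

  // Oldest version still present in the log (the log "tail").
  std::uint64_t tail() const {
    return entries_.empty() ? 0 : entries_.front().version;
  }

  // Assumption: versions are contiguous, so a peer that last applied
  // `peer_last_update` needs every entry from peer_last_update + 1 onward.
  bool can_replay_from(std::uint64_t peer_last_update) const {
    return !entries_.empty() && peer_last_update + 1 >= tail();
  }

 private:
  void trim() {
    const std::size_t limit = degraded_ ? degraded_entries_ : min_entries_;
    while (entries_.size() > limit) entries_.pop_front();  // drop oldest
  }

  std::size_t min_entries_;
  std::size_t degraded_entries_;
  bool degraded_ = false;
  std::deque<LogEntry> entries_;
};
```

With 5000 appends on a clean PG only versions 2001..5000 survive, so a peer stuck at version 1000 could no longer be caught up from the log alone.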
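Distributed systems often have to deal with the read/write ordering of objects (or records, files, data blocks, and so on) and with concurrent access to them. In general, two objects that share no resources can be accessed concurrently; if they do share something, that shared part must be locked. Concurrent reads and writes to the same object (especially concurrent updates) require particular care over ordering and concurrency control, or the data can become corrupted. This article describes the mechanisms Ceph uses to guarantee ordering and safe concurrency for object reads and writes.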
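This chapter covers Scrub, Ceph's consistency-checking mechanism. It first introduces the basics of data verification, then the basic concepts of Scrub, then the Scrub scheduling mechanism, and finally analyzes the source code that implements Scrub.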
This article continues the previous one, "Ceph recovery study (1)", and goes on to explain Ceph's data repair process.
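Once a PG has completed Peering, a PG in the Active state can serve requests. If the replicas of the PG hold inconsistent objects, those objects must be repaired. Ceph repairs them in one of two ways: Recovery and Backfill.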
void ReplicatedPG::do_request(
  OpRequestRef& op,
  ThreadPool::TPHandle &handle)
{
  assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
  if (can_discard_request(op)) {
    return;
  }
  if (flushes_in_progress > 0) {
    dout(20) << flushes_in_progress
             << " flushes_in_progress pending "
             << "waiting for active on " << op << dendl;
    waiting_for_peered.push_back(op);
    op->mark_delayed("waiting for peered");
    return;
  }
  if (!is_peered()) {
    // Delay unless PGBackend says it's ok
    if (pgbackend->can_handle_while_inactive(op)) {
      bool handled = pgbackend->handle_message(op);
      assert(handled);
      return;
    } else {
      waiting_for_peered.push_back(op);
      op->mark_delayed("waiting for peered");
      return;
    }
  }
  ...
}
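In the previous chapter we saw that once the PG Primary has obtained the authoritative log, it enters the GetMissing phase. This article covers the processing in GetMissing, including the log operations and the Active operations.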
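In the previous chapter we saw that when the GotInfo event is raised in the GetInfo state, the PG jumps directly to the GetLog phase. In this chapter we start from GetLog and continue describing Ceph's Peering process.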
In the earlier chapters we started from the OSD receiving a new OSDMap, and saw that the OSD then posts two events to the state machine:
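In the previous article we saw that when a new OSDMap is received, a CephPeeringEvt event is posted to every PG on that OSD. In this chapter we start from that event and describe the whole Peering process in detail.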
In this chapter we first describe the PriorSet and pg_info_t data structures, and then begin the actual walkthrough of Ceph's Peering flow.
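Ceph's Peering process is a very complex flow whose main purpose is to bring the OSDs of a PG to an agreed state. Once the primary and the replicas agree, the PG enters the active state and Peering is over. At that point, however, the data on the PG's three OSD replicas is not necessarily fully consistent yet.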
In this section we introduce some important data structures used during PG Recovery.
class PG : DoutPrefixProvider {
public:
  struct BufferedRecoveryMessages {
    map<int, map<spg_t, pg_query_t> > query_map;
    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;
    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
  };
  struct RecoveryCtx {
    utime_t start_time;
    map<int, map<spg_t, pg_query_t> > *query_map;
    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map;
    map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list;
    set<PGRef> created_pgs;
    C_Contexts *on_applied;
    C_Contexts *on_safe;
    ObjectStore::Transaction *transaction;
    ThreadPool::TPHandle* handle;
    RecoveryCtx(map<int, map<spg_t, pg_query_t> > *query_map,
                map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map,
                map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list,
                C_Contexts *on_applied,
                C_Contexts *on_safe,
                ObjectStore::Transaction *transaction)
      : query_map(query_map), info_map(info_map),
        notify_list(notify_list),
        on_applied(on_applied),
        on_safe(on_safe),
        transaction(transaction),
        handle(NULL) {}
    RecoveryCtx(BufferedRecoveryMessages &buf, RecoveryCtx &rctx)
      : query_map(&(buf.query_map)),
        info_map(&(buf.info_map)),
        notify_list(&(buf.notify_list)),
        on_applied(rctx.on_applied),
        on_safe(rctx.on_safe),
        transaction(rctx.transaction),
        handle(rctx.handle) {}
    void accept_buffered_messages(BufferedRecoveryMessages &m) {
      assert(query_map);
      assert(info_map);
      assert(notify_list);
      for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin(); i != m.query_map.end(); ++i) {
        map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
        for (map<spg_t, pg_query_t>::iterator j = i->second.begin(); j != i->second.end(); ++j) {
          omap[j->first] = j->second;
        }
      }
      for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i = m.info_map.begin(); i != m.info_map.end(); ++i) {
        vector<pair<pg_notify_t, pg_interval_map_t> > &ovec = (*info_map)[i->first];
        ovec.reserve(ovec.size() + i->second.size());
        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
      }
      for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i = m.notify_list.begin(); i != m.notify_list.end(); ++i) {
        vector<pair<pg_notify_t, pg_interval_map_t> > &ovec = (*notify_list)[i->first];
        ovec.reserve(ovec.size() + i->second.size());
        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
      }
    }
  };
};
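The second RecoveryCtx constructor points a context at a BufferedRecoveryMessages, and accept_buffered_messages() later folds that buffer into another context. The following is a simplified, standalone C++17 sketch of those merge rules only. Query, Notify, Buffered and Ctx are hypothetical stand-ins (OSD ids as int, PG ids as std::string), not Ceph types. Queries are keyed by (OSD, PG), so a buffered query overwrites an existing one for the same PG, while info and notify entries are appended to the per-OSD vectors.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: int = OSD id, std::string = spg_t,
// Query = pg_query_t, Notify = pg_notify_t (interval map omitted).
struct Query { int epoch; };
struct Notify { std::string what; };

using QueryMap  = std::map<int, std::map<std::string, Query>>;
using NotifyMap = std::map<int, std::vector<Notify>>;

struct Buffered {
  QueryMap query_map;
  NotifyMap info_map;
  NotifyMap notify_list;
};

struct Ctx {
  QueryMap* query_map;
  NotifyMap* info_map;
  NotifyMap* notify_list;

  // Same merge rules as accept_buffered_messages() above.
  void accept_buffered_messages(Buffered& m) {
    assert(query_map && info_map && notify_list);
    for (const auto& [osd, queries] : m.query_map) {
      auto& omap = (*query_map)[osd];
      for (const auto& [pgid, q] : queries) omap[pgid] = q;  // overwrite per PG
    }
    append_all(m.info_map, *info_map);
    append_all(m.notify_list, *notify_list);
  }

 private:
  // Append every per-OSD vector of `src` to the matching vector in `dst`.
  static void append_all(const NotifyMap& src, NotifyMap& dst) {
    for (const auto& [osd, vec] : src) {
      auto& ovec = dst[osd];
      ovec.reserve(ovec.size() + vec.size());
      ovec.insert(ovec.end(), vec.begin(), vec.end());
    }
  }
};
```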
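RecoveryCtx is the context of one recovery operation. Its most important fields are:
query_map: caches PG Query requests; these are later built into MOSDPGQuery messages and sent to the corresponding OSDs. The key of query_map is the OSD id.
info_map: caches pg_notify_t information; it is later built into MOSDPGInfo messages and sent to the corresponding OSDs. The key of info_map is the OSD id.
notify_list: caches pg_notify_t information; it is later built into MOSDPGNotify messages and sent to the corresponding OSDs. The key of notify_list is the OSD id.
transaction: the transaction associated with this RecoveryCtx. Any information that has to be persisted during recovery is written through this transaction.
handle: the main job of ThreadPool::TPHandle is to monitor how long each thread in the thread pool runs. Each time a thread function runs, a grace timeout is set; a thread that runs longer than that is considered unhealthy, and once it runs longer than suicide_grace, the OSD hits an assertion and kills itself. The handle is passed to the ObjectStore behind the transaction mainly to deal with these timeouts, for example in FileStore::queue_transactions():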
int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
                                  TrackedOpRef osd_op,
                                  ThreadPool::TPHandle *handle)
{
  ...
  if (handle)
    handle->suspend_tp_timeout();
  op_queue_reserve_throttle(o);
  journal->reserve_throttle_and_backoff(tbl.length());
  if (handle)
    handle->reset_tp_timeout();
  ...
}
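A minimal sketch of the grace / suicide_grace idea, assuming a simple elapsed-time check. WorkerHeartbeat is a hypothetical class; only the method names reset_tp_timeout() and suspend_tp_timeout() are borrowed from the TPHandle calls in queue_transactions(), and Ceph's real heartbeat logic differs. It shows why the timeout is suspended around the throttle calls: time spent legitimately blocked should not count against the worker thread.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical watchdog modelled on the TPHandle description above.
class WorkerHeartbeat {
 public:
  using Clock = std::chrono::steady_clock;
  enum class Health { Healthy, Unhealthy, Suicide };

  WorkerHeartbeat(std::chrono::milliseconds grace,
                  std::chrono::milliseconds suicide_grace)
      : grace_(grace), suicide_grace_(suicide_grace) {
    assert(grace < suicide_grace);
  }

  // Start (or restart) timing the current unit of work.
  void reset_tp_timeout(Clock::time_point now = Clock::now()) {
    start_ = now;
    suspended_ = false;
  }

  // Stop timing, e.g. before a call that may block on throttling.
  void suspend_tp_timeout() { suspended_ = true; }

  Health check(Clock::time_point now = Clock::now()) const {
    if (suspended_) return Health::Healthy;
    const auto elapsed = now - start_;
    if (elapsed > suicide_grace_) return Health::Suicide;  // OSD would assert
    if (elapsed > grace_) return Health::Unhealthy;
    return Health::Healthy;
  }

 private:
  std::chrono::milliseconds grace_;
  std::chrono::milliseconds suicide_grace_;
  Clock::time_point start_ = Clock::now();
  bool suspended_ = false;
};
```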
In the OSD, a RecoveryCtx object is usually created with the following function:
// ----------------------------------------
// peering and recovery
PG::RecoveryCtx OSD::create_context()
{
  ObjectStore::Transaction *t = new ObjectStore::Transaction;
  C_Contexts *on_applied = new C_Contexts(cct);
  C_Contexts *on_safe = new C_Contexts(cct);
  map<int, map<spg_t,pg_query_t> > *query_map =
    new map<int, map<spg_t, pg_query_t> >;
  map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list =
    new map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >;
  map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map =
    new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;
  PG::RecoveryCtx rctx(query_map, info_map, notify_list, on_applied, on_safe, t);
  return rctx;
}
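Afterwards, the following function sends out the data buffered in these maps, frees them, and queues the transaction if there is anything to persist: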
void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
                           ThreadPool::TPHandle *handle)
{
  if (service.get_osdmap()->is_up(whoami) && is_active()) {
    do_notifies(*ctx.notify_list, curmap);
    do_queries(*ctx.query_map, curmap);
    do_infos(*ctx.info_map, curmap);
  }
  delete ctx.notify_list;
  delete ctx.query_map;
  delete ctx.info_map;
  if ((ctx.on_applied->empty() && ctx.on_safe->empty() &&
       ctx.transaction->empty() && ctx.created_pgs.empty()) || !pg) {
    delete ctx.transaction;
    delete ctx.on_applied;
    delete ctx.on_safe;
    assert(ctx.created_pgs.empty());
  } else {
    if (!ctx.created_pgs.empty()) {
      ctx.on_applied->add(new C_OpenPGs(ctx.created_pgs, store));
    }
    int tr = store->queue_transaction(
      pg->osr.get(),
      std::move(*ctx.transaction), ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(),
      handle);
    delete (ctx.transaction);
    assert(tr == 0);
  }
}
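The ownership pattern of create_context() and dispatch_context() (heap-allocated maps created once, freed after dispatch, transaction queued only when there is something to persist) can be sketched in standard C++17 with std::unique_ptr. Everything here is hypothetical and simplified: strings replace pg_query_t/pg_notify_t, a vector of strings replaces ObjectStore::Transaction, created_pgs and the on_applied/on_safe callbacks are left out, and "sending" just returns one batched message per target OSD.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <vector>

using PerOsd = std::map<int, std::vector<std::string>>;  // key: OSD id

// Hypothetical analogue of PG::RecoveryCtx with owned buffers.
struct RecoveryContextSketch {
  std::unique_ptr<PerOsd> query_map = std::make_unique<PerOsd>();
  std::unique_ptr<PerOsd> info_map = std::make_unique<PerOsd>();
  std::unique_ptr<PerOsd> notify_list = std::make_unique<PerOsd>();
  std::vector<std::string> transaction;  // pending writes to persist
};

// One outgoing message per target OSD, batching that OSD's entries.
struct OutMessage {
  std::string kind;  // "notify", "query" or "info"
  int to_osd;
  std::size_t entries;
};

RecoveryContextSketch create_context() { return RecoveryContextSketch{}; }

static void collect(const PerOsd& m, const std::string& kind,
                    std::vector<OutMessage>& out) {
  for (const auto& [osd, items] : m) out.push_back({kind, osd, items.size()});
}

// Sends (here: returns) buffered messages only if this OSD is up and active,
// always frees the buffers, and reports via `queued` whether the transaction
// would be handed to the object store.
std::vector<OutMessage> dispatch_context(RecoveryContextSketch& ctx,
                                         bool up_and_active, bool& queued) {
  assert(ctx.query_map && ctx.info_map && ctx.notify_list);
  std::vector<OutMessage> sent;
  if (up_and_active) {
    collect(*ctx.notify_list, "notify", sent);  // same order as the original
    collect(*ctx.query_map, "query", sent);
    collect(*ctx.info_map, "info", sent);
  }
  ctx.notify_list.reset();
  ctx.query_map.reset();
  ctx.info_map.reset();
  queued = !ctx.transaction.empty();
  ctx.transaction.clear();
  return sent;
}
```

Using std::unique_ptr here makes the "buffers are always freed, even when nothing is sent" rule explicit, which the original expresses with unconditional delete calls.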
class PG : DoutPrefixProvider {
public:
  struct NamedState {
    const char *state_name;
    utime_t enter_time;
    const char *get_state_name() { return state_name; }
    NamedState(CephContext *cct_, const char *state_name_)
      : state_name(state_name_),
        enter_time(ceph_clock_now(cct_)) {}
    virtual ~NamedState() {}
  };
};
NamedState is mainly used to give a state a name; it also records the time at which the state was entered (enter_time).
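A standard-C++ analogue of NamedState, as a sketch: NamedStateSketch, ResetSketch and time_in_state() are hypothetical names, and std::chrono::steady_clock replaces utime_t and ceph_clock_now(). Recording the entry time in the base-class constructor means every state that mixes it in automatically knows when it was entered, which is convenient for reporting how long a PG spent in a state.

```cpp
#include <cassert>
#include <chrono>
#include <string>

// Hypothetical analogue of PG::NamedState.
struct NamedStateSketch {
  using Clock = std::chrono::steady_clock;
  const char* state_name;
  Clock::time_point enter_time;

  explicit NamedStateSketch(const char* name, Clock::time_point now = Clock::now())
      : state_name(name), enter_time(now) {
    assert(name != nullptr);
  }
  virtual ~NamedStateSketch() = default;

  const char* get_state_name() const { return state_name; }

  // How long the machine has been in this state so far.
  Clock::duration time_in_state(Clock::time_point now = Clock::now()) const {
    return now - enter_time;
  }
};

// A concrete state mixes the name in, the way Ceph states inherit NamedState.
struct ResetSketch : NamedStateSketch {
  ResetSketch() : NamedStateSketch("Reset") {}
};
```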
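Ceph uses the statechart state machine from the Boost library to handle PG state transitions. We therefore first briefly introduce the basic concepts of boost statechart, which makes the PG state transitions during Peering easier to follow.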
In statechart, there are two ways to define a state:
//boost
template< class MostDerived,
          class Context,
          class InnerInitial = mpl::list<>,
          history_mode historyMode = has_no_history >
class state : public simple_state<
  MostDerived, Context, InnerInitial, historyMode >
{
  // ... (members omitted)
};
//ceph
struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
};
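Defining a state requires deriving from either boost::statechart::simple_state or boost::statechart::state. The Reset state above derives from boost::statechart::state. Among its template arguments, the first is the state's own name, Reset, and the second is the state machine it belongs to, which says that Reset is a state of the RecoveryMachine state machine.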
struct Start;
struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
};
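State Started is also a state of the RecoveryMachine state machine. Its template arguments include one more parameter, Start, which is the default initial substate of Started. Start is defined as follows: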
struct Start : boost::statechart::state< Start, Started >, NamedState {
};
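The state Start defined above is a substate of Started. The first template parameter is the state's own name, and the second is the name of the parent state it belongs to.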
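In summary, a state belongs either to a state machine or to another state, in which case it is a substate of that state. In its definition, the first template parameter is the state itself, the second is its owner, and the third is its initial substate.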
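The owner / initial-substate rules can be modelled in a few lines of plain C++. This is only a model: boost::statechart resolves all of this at compile time from the template arguments. The table is taken from the state declarations in this article (for example, Primary's initial substate is Peering and Peering's is GetInfo); full_path() and states_entered() are hypothetical helpers that show how a state's full name follows its chain of owners, and how entering a state also enters its default initial substates.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

struct StateDecl {
  std::string owner;             // 2nd template argument: machine or outer state
  std::string initial_substate;  // 3rd template argument, empty if none
};

// Subset of the declarations from this article.
const std::map<std::string, StateDecl>& decls() {
  static const std::map<std::string, StateDecl> d = {
      {"Initial", {"RecoveryMachine", ""}},
      {"Reset", {"RecoveryMachine", ""}},
      {"Started", {"RecoveryMachine", "Start"}},
      {"Start", {"Started", ""}},
      {"Primary", {"Started", "Peering"}},
      {"Peering", {"Primary", "GetInfo"}},
      {"GetInfo", {"Peering", ""}},
      {"Stray", {"Started", ""}},
  };
  return d;
}

// Full name, outermost first, e.g. "RecoveryMachine/Started/Start".
std::string full_path(const std::string& state) {
  auto it = decls().find(state);
  if (it == decls().end()) return state;  // the state machine itself
  return full_path(it->second.owner) + "/" + state;
}

// Entering a state also enters its initial substate, recursively.
std::vector<std::string> states_entered(const std::string& state) {
  std::vector<std::string> chain{state};
  auto it = decls().find(state);
  while (it != decls().end() && !it->second.initial_substate.empty()) {
    chain.push_back(it->second.initial_substate);
    it = decls().find(chain.back());
  }
  return chain;
}
```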
States can receive and handle events, and events can drive a state transition. In boost statechart, an event is defined as follows:
struct QueryState : boost::statechart::event< QueryState >{
};
QueryState is an event. It must derive from boost::statechart::event, with its own name as the template argument.
Inside a state, we must define which events the state accepts while the machine is in it, and how each of those events is handled:
#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
    T() : boost::statechart::event< T >() {} \
    void print(std::ostream *out) const { \
      *out << #T; \
    } \
  };
TrivialEvent(Initialize)
TrivialEvent(Load)
TrivialEvent(NullEvt)
struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  explicit Initial(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::transition< Initialize, Reset >,
    boost::statechart::custom_reaction< Load >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
  boost::statechart::result react(const Load&);
  boost::statechart::result react(const MNotifyRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
The code above lists the events that state RecoveryMachine/Initial can handle, and how each of them is handled:
1) boost::mpl::list declares the multiple event types the state can handle. In this example they are Initialize, Load, NullEvt and event_base.
2) Simple event handling:
boost::statechart::transition< Initialize, Reset >
defines that when state Initial receives event Initialize, it transitions unconditionally to state Reset;
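3) User-defined event handling: when the event arrives, how the state changes depends on some conditions, and this logic has to be implemented by the user:
boost::statechart::custom_reaction< Load >
custom_reaction declares a user-defined handler for the event. There must be a react() function that handles it, and the transition logic is implemented by the user inside that react() function:
boost::statechart::result react(const Load&);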
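4) NullEvt is also declared as a custom reaction, but no react() overload for it is implemented. The event therefore ends up matching the react() overload for boost::statechart::event_base, which simply calls discard_event() to drop it.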
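1) For the NullEvt event, the reactions list contains
boost::statechart::custom_reaction< NullEvt >
but no react() function for NullEvt is defined. Because NullEvt derives from boost::statechart::event_base, it is handled by the following function:
boost::statechart::result react(const boost::statechart::event_base&) {
  return discard_event();
}
This can be verified with the statechart example later in this article;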
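2) For the MNotifyRec, MInfoRec and MLogRec events
None of these three events is added to the boost::mpl::list, so the react() overloads declared for them are never called. Such events are handled by this entry instead, which moves the machine to Crashed:
boost::statechart::transition< boost::statechart::event_base, Crashed >
This, too, can be verified with the statechart example later in this article;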
3) boost::statechart::event_base seems to be handled in two different ways:
struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
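Our experiments show the following:
boost::statechart::transition< boost::statechart::event_base, Crashed >
catches every event that no earlier entry in the reactions list matches, and moves the machine to Crashed;
For an event listed in the boost::mpl::list as a custom_reaction, if there is no react() overload for that event and none for its base type boost::statechart::event_base either, compilation fails;
For an event listed in the boost::mpl::list as a custom_reaction, if there is no react() overload for that event but there is one for its base type boost::statechart::event_base, that base-type overload is called:
boost::statechart::result react(const boost::statechart::event_base&) {
  return discard_event();
}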
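These matching rules can be summarized in a small plain-C++ model. It is a sketch, not boost's implementation: event types are strings, ReactionEntry, StateModel and dispatch() are hypothetical names, and the "compile error" result only stands for the case that boost rejects at compile time. The test data mirrors the Initial state above.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// "event_base" stands for boost::statechart::event_base, the base of every event.
struct ReactionEntry {
  std::string event;   // template argument of transition<> / custom_reaction<>
  bool is_transition;  // true: transition<>, false: custom_reaction<>
  std::string target;  // destination state of a transition<>
};

struct StateModel {
  std::vector<ReactionEntry> reactions;   // in boost::mpl::list order
  std::set<std::string> react_overloads;  // react(const X&) overloads defined
};

// Describes what happens to `event` in state `s`.
std::string dispatch(const StateModel& s, const std::string& event) {
  for (const ReactionEntry& r : s.reactions) {
    // An entry matches its own event type; an event_base entry matches all.
    if (r.event != event && r.event != "event_base") continue;
    if (r.is_transition) return "transit:" + r.target;
    // custom_reaction<E> calls react() with an E argument, so overload
    // resolution picks react(const E&) if present, else react(const event_base&).
    if (s.react_overloads.count(r.event)) return "react(" + r.event + ")";
    if (s.react_overloads.count("event_base")) return "react(event_base)";
    return "compile error";  // boost rejects this at compile time
  }
  return "forwarded to outer state";  // no entry matched
}
```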
RecoveryMachine is the state machine being defined; it must derive from boost::statechart::state_machine:
struct Initial;
class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
};
The first template argument is the machine's own name, and the second is the machine's default initial state, Initial.
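A state machine has two basic operations:
machine.initiate();
initiate() is a member function inherited from boost::statechart::state_machine; it starts the machine by entering its initial state.
machine.process_event(evt);
process_event() is also a member function inherited from boost::statechart::state_machine; it dispatches the event evt to the current state.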
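context() is a rather useful function: it returns a reference to any ancestor context of the current state, through which the internal parameters and state values of the parent and other ancestor states can be accessed. context() is implemented in boost::statechart::state_machine: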
// Returns a reference to the context identified by the template
// parameter. This can either be _this_ object or one of its direct or
// indirect contexts.
template< class Context >
Context & context()
{
  // As we are in the outermost context here, only this object can be
  // returned.
  return *polymorphic_downcast< MostDerived * >( this );
}
template< class Context >
const Context & context() const
{
  // As we are in the outermost context here, only this object can be
  // returned.
  return *polymorphic_downcast< const MostDerived * >( this );
}
context() is also implemented in boost::statechart::simple_state:
template< class OtherContext >
OtherContext & context()
{
  typedef typename mpl::if_<
    is_base_of< OtherContext, MostDerived >,
    context_impl_this_context,
    context_impl_other_context
  >::type impl;
  return impl::template context_impl< OtherContext >( *this );
}
template< class OtherContext >
const OtherContext & context() const
{
  typedef typename mpl::if_<
    is_base_of< OtherContext, MostDerived >,
    context_impl_this_context,
    context_impl_other_context
  >::type impl;
  return impl::template context_impl< OtherContext >( *this );
}
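The simple_state implementation shows that context() can return the current state's ancestor states as well as the state machine the current state belongs to.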
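For example, Started is a state of RecoveryMachine and Start is a substate of Started. If the current state is Start, context() returns its parent state Started (note that context() returns a reference, not a pointer):
Started &parent = context< Started >();
It can likewise return the outermost context, the RecoveryMachine state machine:
RecoveryMachine &machine = context< RecoveryMachine >();
In summary, context() provides a way to access the contexts of the current state's ancestors.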
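A runtime analogue of context<T>(), as a sketch: boost::statechart resolves the target context at compile time from the template arguments, whereas this hypothetical Node class walks the owner chain and uses dynamic_cast. The state types and their fields (Machine, StartedState, StartState, pg_id, started_epoch) are illustrative only.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

struct Node {
  Node* owner = nullptr;  // enclosing state, or the state machine
  virtual ~Node() = default;

  // Starting at this state, walk outward until an object of type T is found.
  template <class T>
  T& context() {
    for (Node* n = this; n != nullptr; n = n->owner) {
      if (T* t = dynamic_cast<T*>(n)) return *t;
    }
    throw std::logic_error("no enclosing context of the requested type");
  }
};

// Hypothetical types mirroring RecoveryMachine > Started > Start.
struct Machine : Node { std::string pg_id = "1.0"; };
struct StartedState : Node { int started_epoch = 42; };
struct StartState : Node {};
```

The failure case throws here only because the lookup happens at run time; with boost::statechart, asking for a context that is not an ancestor does not compile.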
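Besides triggering a transition through the transition list or entering a user-defined handler, an event can also be dealt with in the following special ways:
1) transit<>() jumps directly to the target state. For example:
boost::statechart::result PG::RecoveryState::Initial::react(const MLogRec& i)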
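{
  PG *pg = context< RecoveryMachine >().pg;
  assert(!pg->is_primary());
  post_event(i);
  return transit< Stray >();
}
This jumps directly to state Stray. The preceding post_event(i) re-posts the received MLogRec so that it is handled again once the machine is in Stray.
2) post_event() posts an event to the state machine: the event is queued and processed after the current reaction (or state constructor) completes. For example, the constructor of Start posts MakePrimary or MakeStray depending on whether this OSD is the PG's primary: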
PG::RecoveryState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Start")
{
  context< RecoveryMachine >().log_enter(state_name);
  PG *pg = context< RecoveryMachine >().pg;
  if (pg->is_primary()) {
    dout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());
  } else { //is_stray
    dout(1) << "transitioning to Stray" << dendl;
    post_event(MakeStray());
  }
}
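3) discard_event() discards the event, ending its processing. For example, Primary handles ActMap by publishing its stats and then discarding the event:
boost::statechart::result PG::RecoveryState::Primary::react(const ActMap&)
{
  dout(7) << "handle ActMap primary" << dendl;
  PG *pg = context< RecoveryMachine >().pg;
  pg->publish_stats_to_osd();
  pg->take_waiters();
  return discard_event();
}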
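4) forward_event() forwards the event to the outer state for further handling. For example, WaitUpThru posts Activate when no up_thru is needed, and then forwards ActMap:
boost::statechart::result PG::RecoveryState::WaitUpThru::react(const ActMap& am)
{
  PG *pg = context< RecoveryMachine >().pg;
  if (!pg->need_up_thru) {
    post_event(Activate(pg->get_osdmap()->get_epoch()));
  }
  return forward_event();
}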
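The post_event() and discard_event() behaviour can be sketched with a small hypothetical event-queue machine in plain C++ (MiniMachine and its methods are illustrative, not boost API; state hierarchy and forward_event() are not modelled). It reproduces the Start pattern above: the entry action posts MakePrimary, which is dispatched only after entry has finished, and an event without a handler in the current state is simply dropped.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

class MiniMachine {
 public:
  using Handler = std::function<void(MiniMachine&)>;

  // Reaction for `event` while in `state`.
  void on(const std::string& state, const std::string& event, Handler h) {
    handlers_[{state, event}] = std::move(h);
  }
  // Runs when `state` is entered, like a state constructor.
  void on_entry(const std::string& state, Handler h) {
    entry_[state] = std::move(h);
  }
  void initiate(const std::string& initial) {
    transit(initial);
    drain();
  }
  void process_event(const std::string& event) {
    dispatch(event);
    drain();
  }
  void transit(const std::string& target) {
    current_ = target;
    log_.push_back("enter " + target);
    auto it = entry_.find(target);
    if (it != entry_.end()) it->second(*this);
  }
  // Queued, not dispatched immediately.
  void post_event(const std::string& event) { queue_.push_back(event); }

  const std::string& current() const { return current_; }
  const std::vector<std::string>& log() const { return log_; }

 private:
  void dispatch(const std::string& event) {
    log_.push_back("event " + event);
    auto it = handlers_.find({current_, event});
    if (it != handlers_.end()) it->second(*this);  // else: like discard_event()
  }
  // Process events posted during the previous step, in FIFO order.
  void drain() {
    while (!queue_.empty()) {
      std::string next = queue_.front();
      queue_.pop_front();
      dispatch(next);
    }
  }

  std::map<std::pair<std::string, std::string>, Handler> handlers_;
  std::map<std::string, Handler> entry_;
  std::deque<std::string> queue_;
  std::string current_;
  std::vector<std::string> log_;
};
```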
The following statechart example verifies the behaviour described above:
// Example program
#include <iostream>
#include <string>
#include <boost/mpl/list.hpp>
#include <boost/statechart/custom_reaction.hpp>
#include <boost/statechart/event.hpp>
#include <boost/statechart/event_base.hpp>
#include <boost/statechart/simple_state.hpp>
#include <boost/statechart/state.hpp>
#include <boost/statechart/state_machine.hpp>
#include <boost/statechart/transition.hpp>
#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
    T() : boost::statechart::event< T >() {} \
    void print(std::ostream *out) const { \
      *out << #T; \
    } \
  };
TrivialEvent(Initialize)
TrivialEvent(Load)
TrivialEvent(NullEvt)
TrivialEvent(GoClean)
struct MInfoRec : boost::statechart::event< MInfoRec > {
  std::string name;
  MInfoRec(std::string name): name(name) {}
  void print() {
    std::cout << "MInfoRec: " << name << "\n";
  }
};
struct MLogRec : boost::statechart::event< MLogRec > {
  std::string name;
  MLogRec(std::string name): name(name) {}
  void print() {
    std::cout << "MLogRec: " << name << "\n";
  }
};
struct MNotifyRec : boost::statechart::event< MNotifyRec > {
  std::string name;
  MNotifyRec(std::string name): name(name) {}
  void print() {
    std::cout << "MNotifyRec: " << name << "\n";
  }
};
struct Initial;
struct RecoveryMachine : boost::statechart::state_machine< RecoveryMachine, Initial > {};
struct Reset;
struct Crashed : boost::statechart::state< Crashed, RecoveryMachine > {
  explicit Crashed(my_context ctx) : my_base(ctx)
  {
    std::cout << "Hello, Crashed!\n";
  }
};
struct Initial : boost::statechart::state< Initial, RecoveryMachine > {
  typedef boost::mpl::list <
    boost::statechart::transition< Initialize, Reset >,
    boost::statechart::custom_reaction< Load >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
  explicit Initial(my_context ctx) : my_base(ctx)
  {
    std::cout << "Hello, Initial!\n";
  }
  boost::statechart::result react(const Load& l) {
    return transit< Reset >();
  }
  // Not in the reactions list, so never called (see the Crashed transition).
  boost::statechart::result react(const MNotifyRec& notify) {
    std::cout << "Initial::react::MNotifyRec!\n";
    return discard_event();
  }
  boost::statechart::result react(const MInfoRec& i) {
    std::cout << "Initial::react::MInfoRec!\n";
    return discard_event();
  }
  boost::statechart::result react(const MLogRec& log) {
    std::cout << "Initial::react::MLogRec!\n";
    return discard_event();
  }
  // Fallback for custom_reaction events without their own react(), e.g. NullEvt.
  boost::statechart::result react(const boost::statechart::event_base&) {
    std::cout << "Initial event_base processed!\n";
    return discard_event();
  }
  void exit() {
    std::cout << "Bye, Initial!\n";
  }
};
struct Reset : boost::statechart::state< Reset, RecoveryMachine > {
  explicit Reset(my_context ctx) : my_base(ctx)
  {
    std::cout << "Hello, Reset!\n";
  }
  void exit() {
    std::cout << "Bye, Reset!\n";
  }
};
int main(int argc, char *argv[])
{
  RecoveryMachine machine;
  machine.initiate();
  //machine.process_event(NullEvt());                     // statement 1
  //machine.process_event(GoClean());                     // statement 2
  //machine.process_event(MNotifyRec("notify record"));   // statement 3
  return 0x0;
}
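The example above mirrors how PG handles the Initial state. With each of the statements above enabled on its own, the output is:
Statement 1:
Hello, Initial!
Initial event_base processed!
Statement 2:
Hello, Initial!
Bye, Initial!
Hello, Crashed!
Statement 3:
Hello, Initial!
Bye, Initial!
Hello, Crashed!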
Below we list all the events used in the Recovery process:
QueryState
MInfoRec
MLogRec
MNotifyRec
MQuery
AdvMap
ActMap
Activate
RequestBackfillPrio
TrivialEvent events:
class PG : DoutPrefixProvider {
public:
#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
    T() : boost::statechart::event< T >() {} \
    void print(std::ostream *out) const { \
      *out << #T; \
    } \
  };
  TrivialEvent(Initialize)
  TrivialEvent(Load)
  TrivialEvent(GotInfo)
  TrivialEvent(NeedUpThru)
  TrivialEvent(CheckRepops)
  TrivialEvent(NullEvt)
  TrivialEvent(FlushedEvt)
  TrivialEvent(Backfilled)
  TrivialEvent(LocalBackfillReserved)
  TrivialEvent(RemoteBackfillReserved)
  TrivialEvent(RemoteReservationRejected)
  TrivialEvent(RequestBackfill)
  TrivialEvent(RequestRecovery)
  TrivialEvent(RecoveryDone)
  TrivialEvent(BackfillTooFull)
  TrivialEvent(AllReplicasRecovered)
  TrivialEvent(DoRecovery)
  TrivialEvent(LocalRecoveryReserved)
  TrivialEvent(RemoteRecoveryReserved)
  TrivialEvent(AllRemotesReserved)
  TrivialEvent(AllBackfillsReserved)
  TrivialEvent(Recovering)
  TrivialEvent(GoClean)
  TrivialEvent(AllReplicasActivated)
  TrivialEvent(IntervalFlush)
};
MakePrimary
MakeStray
NeedActingChange
IsIncomplete
GotLog
SnapTrim
Reset
SnapTrimReserved
SnapTrimTimerReady
struct Crashed : boost::statechart::state< Crashed, RecoveryMachine >, NamedState {
  explicit Crashed(my_context ctx);
};
struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  explicit Initial(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::transition< Initialize, Reset >,
    boost::statechart::custom_reaction< Load >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
  boost::statechart::result react(const Load&);
  boost::statechart::result react(const MNotifyRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
  explicit Reset(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< AdvMap >,
    boost::statechart::custom_reaction< ActMap >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::custom_reaction< FlushedEvt >,
    boost::statechart::custom_reaction< IntervalFlush >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const FlushedEvt&);
  boost::statechart::result react(const IntervalFlush&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
  explicit Started(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< AdvMap >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::custom_reaction< FlushedEvt >,
    boost::statechart::custom_reaction< IntervalFlush >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const FlushedEvt&);
  boost::statechart::result react(const IntervalFlush&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
struct Start : boost::statechart::state< Start, Started >, NamedState {
  explicit Start(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::transition< MakePrimary, Primary >,
    boost::statechart::transition< MakeStray, Stray >
    > reactions;
};
struct Primary : boost::statechart::state< Primary, Started, Peering >, NamedState {
  explicit Primary(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< ActMap >,
    boost::statechart::custom_reaction< MNotifyRec >,
    boost::statechart::transition< NeedActingChange, WaitActingChange >
    > reactions;
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const MNotifyRec&);
};
struct WaitActingChange : boost::statechart::state< WaitActingChange, Primary>,
                          NamedState {
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< AdvMap >,
    boost::statechart::custom_reaction< MLogRec >,
    boost::statechart::custom_reaction< MInfoRec >,
    boost::statechart::custom_reaction< MNotifyRec >
    > reactions;
  explicit WaitActingChange(my_context ctx);
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MNotifyRec&);
  void exit();
};
struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
  std::unique_ptr< PriorSet > prior_set;
  bool history_les_bound; //< need osd_find_best_info_ignore_history_les
  explicit Peering(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::transition< Activate, Active >,
    boost::statechart::custom_reaction< AdvMap >
    > reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap &advmap);
};
struct Active : boost::statechart::state< Active, Primary, Activating >, NamedState {
  explicit Active(my_context ctx);
  void exit();
  const set<pg_shard_t> remote_shards_to_reserve_recovery;
  const set<pg_shard_t> remote_shards_to_reserve_backfill;
  bool all_replicas_activated;
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< ActMap >,
    boost::statechart::custom_reaction< AdvMap >,
    boost::statechart::custom_reaction< MInfoRec >,
    boost::statechart::custom_reaction< MNotifyRec >,
    boost::statechart::custom_reaction< MLogRec >,
    boost::statechart::custom_reaction< Backfilled >,
    boost::statechart::custom_reaction< AllReplicasActivated >
    > reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const MInfoRec& infoevt);
  boost::statechart::result react(const MNotifyRec& notevt);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const Backfilled&) {
    return discard_event();
  }
  boost::statechart::result react(const AllReplicasActivated&);
};
struct Clean : boost::statechart::state< Clean, Active >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >
    > reactions;
  explicit Clean(my_context ctx);
  void exit();
};
struct Recovered : boost::statechart::state< Recovered, Active >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::transition< GoClean, Clean >,
    boost::statechart::custom_reaction< AllReplicasActivated >
    > reactions;
  explicit Recovered(my_context ctx);
  void exit();
  boost::statechart::result react(const AllReplicasActivated&) {
    post_event(GoClean());
    return forward_event();
  }
};
struct Backfilling : boost::statechart::state< Backfilling, Active >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::transition< Backfilled, Recovered >,
    boost::statechart::custom_reaction< RemoteReservationRejected >
    > reactions;
  explicit Backfilling(my_context ctx);
  boost::statechart::result react(const RemoteReservationRejected& evt);
  void exit();
};
struct WaitRemoteBackfillReserved : boost::statechart::state< WaitRemoteBackfillReserved, Active >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::custom_reaction< RemoteBackfillReserved >,
    boost::statechart::custom_reaction< RemoteReservationRejected >,
    boost::statechart::transition< AllBackfillsReserved, Backfilling >
    > reactions;
  set<pg_shard_t>::const_iterator backfill_osd_it;
  explicit WaitRemoteBackfillReserved(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteBackfillReserved& evt);
  boost::statechart::result react(const RemoteReservationRejected& evt);
};
struct WaitLocalBackfillReserved : boost::statechart::state< WaitLocalBackfillReserved, Active >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::transition< LocalBackfillReserved, WaitRemoteBackfillReserved >
    > reactions;
  explicit WaitLocalBackfillReserved(my_context ctx);
  void exit();
};
struct NotBackfilling : boost::statechart::state< NotBackfilling, Active>, NamedState {
  typedef boost::mpl::list<
    boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved>,
    boost::statechart::custom_reaction< RemoteBackfillReserved >,
    boost::statechart::custom_reaction< RemoteReservationRejected >
    > reactions;
  explicit NotBackfilling(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteBackfillReserved& evt);
  boost::statechart::result react(const RemoteReservationRejected& evt);
};
struct ReplicaActive : boost::statechart::state< ReplicaActive, Started, RepNotRecovering >, NamedState {
  explicit ReplicaActive(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< ActMap >,
    boost::statechart::custom_reaction< MQuery >,
    boost::statechart::custom_reaction< MInfoRec >,
    boost::statechart::custom_reaction< MLogRec >,
    boost::statechart::custom_reaction< Activate >
    > reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const MInfoRec& infoevt);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const MQuery&);
  boost::statechart::result react(const Activate&);
};
struct RepRecovering : boost::statechart::state< RepRecovering, ReplicaActive >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::transition< RecoveryDone, RepNotRecovering >,
    boost::statechart::transition< RemoteReservationRejected, RepNotRecovering >,
    boost::statechart::custom_reaction< BackfillTooFull >
    > reactions;
  explicit RepRecovering(my_context ctx);
  boost::statechart::result react(const BackfillTooFull &evt);
  void exit();
};
struct RepWaitBackfillReserved : boost::statechart::state< RepWaitBackfillReserved, ReplicaActive >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::custom_reaction< RemoteBackfillReserved >,
    boost::statechart::custom_reaction< RemoteReservationRejected >
    > reactions;
  explicit RepWaitBackfillReserved(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteBackfillReserved &evt);
  boost::statechart::result react(const RemoteReservationRejected &evt);
};
struct RepWaitRecoveryReserved : boost::statechart::state< RepWaitRecoveryReserved, ReplicaActive >, NamedState {
  typedef boost::mpl::list<
    boost::statechart::custom_reaction< RemoteRecoveryReserved >
    > reactions;
  explicit RepWaitRecoveryReserved(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteRecoveryReserved &evt);
};
struct RepNotRecovering : boost::statechart::state< RepNotRecovering, ReplicaActive>, NamedState {
  typedef boost::mpl::list<
    boost::statechart::custom_reaction< RequestBackfillPrio >,
    boost::statechart::transition< RequestRecovery, RepWaitRecoveryReserved >,
    boost::statechart::transition< RecoveryDone, RepNotRecovering > // for compat with pre-reservation peers
    > reactions;
  explicit RepNotRecovering(my_context ctx);
  boost::statechart::result react(const RequestBackfillPrio &evt);
  void exit();
};
struct Recovering : boost::statechart::state< Recovering, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< AllReplicasRecovered >,
    boost::statechart::custom_reaction< RequestBackfill >
    > reactions;
  explicit Recovering(my_context ctx);
  void exit();
  void release_reservations();
  boost::statechart::result react(const AllReplicasRecovered &evt);
  boost::statechart::result react(const RequestBackfill &evt);
};
struct WaitRemoteRecoveryReserved : boost::statechart::state< WaitRemoteRecoveryReserved, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< RemoteRecoveryReserved >,
    boost::statechart::transition< AllRemotesReserved, Recovering >
    > reactions;
  set<pg_shard_t>::const_iterator remote_recovery_reservation_it;
  explicit WaitRemoteRecoveryReserved(my_context ctx);
  boost::statechart::result react(const RemoteRecoveryReserved &evt);
  void exit();
};
struct WaitLocalRecoveryReserved : boost::statechart::state< WaitLocalRecoveryReserved, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< LocalRecoveryReserved, WaitRemoteRecoveryReserved >
    > reactions;
  explicit WaitLocalRecoveryReserved(my_context ctx);
  void exit();
};
struct Activating : boost::statechart::state< Activating, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< AllReplicasRecovered, Recovered >,
    boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
    boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved >
    > reactions;
  explicit Activating(my_context ctx);
  void exit();
};
struct Stray : boost::statechart::state< Stray, Started >, NamedState {
  map<int, pair<pg_query_t, epoch_t> > pending_queries;
  explicit Stray(my_context ctx);
  void exit();
  typedef boost::mpl::list <
    boost::statechart::custom_reaction< MQuery >,
    boost::statechart::custom_reaction< MLogRec >,
    boost::statechart::custom_reaction< MInfoRec >,
    boost::statechart::custom_reaction< ActMap >,
    boost::statechart::custom_reaction< RecoveryDone >
    > reactions;
  boost::statechart::result react(const MQuery& query);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const MInfoRec& infoevt);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const RecoveryDone&) {
    return discard_event();
  }
};
struct GetInfo : boost::statechart::state< GetInfo, Peering >, NamedState {
set<pg_shard_t> peer_info_requested;
explicit GetInfo(my_context ctx);
void exit();
void get_infos();
typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::transition< GotInfo, GetLog >,
boost::statechart::custom_reaction< MNotifyRec >
> reactions;
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const MNotifyRec& infoevt);
};
struct GetLog : boost::statechart::state< GetLog, Peering >, NamedState {
pg_shard_t auth_log_shard;
boost::intrusive_ptr<MOSDPGLog> msg;
explicit GetLog(my_context ctx);
void exit();
typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< GotLog >,
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::transition< IsIncomplete, Incomplete >
> reactions;
boost::statechart::result react(const AdvMap&);
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const MLogRec& logevt);
boost::statechart::result react(const GotLog&);
};
struct GetMissing : boost::statechart::state< GetMissing, Peering >, NamedState {
set<pg_shard_t> peer_missing_requested;
explicit GetMissing(my_context ctx);
void exit();
typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::transition< NeedUpThru, WaitUpThru >
> reactions;
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const MLogRec& logevt);
};
struct WaitUpThru : boost::statechart::state< WaitUpThru, Peering >, NamedState {
explicit WaitUpThru(my_context ctx);
void exit();
typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< MLogRec >
> reactions;
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const ActMap& am);
boost::statechart::result react(const MLogRec& logrec);
};
struct Incomplete : boost::statechart::state< Incomplete, Peering>, NamedState {
typedef boost::mpl::list <
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< MNotifyRec >
> reactions;
explicit Incomplete(my_context ctx);
boost::statechart::result react(const AdvMap &advmap);
boost::statechart::result react(const MNotifyRec& infoevt);
void exit();
};
Class PG defines a nested class RecoveryState, and RecoveryState in turn defines the PG state machine RecoveryMachine together with all of its states.
class PG{
class RecoveryState{
class RecoveryMachine{
RecoveryState *state;
};
RecoveryMachine machine;
PG *pg;
/// context passed in by state machine caller
RecoveryCtx *orig_ctx;
/// populated if we are buffering messages pending a flush
boost::optional<BufferedRecoveryMessages> messages_pending_flush;
/**
* populated between start_handle() and end_handle(), points into
* the message lists for messages_pending_flush while blocking messages
* or into orig_ctx otherwise
*/
boost::optional<RecoveryCtx> rctx;
}recovery_state;
};
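When a PG object is created, its constructor creates a new RecoveryState object, which in turn constructs a RecoveryMachine object, i.e. a new state machine. Each PG therefore has its own independent state machine that controls its state transitions.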
class RecoveryState{
public:
explicit RecoveryState(PG *pg)
: machine(this, pg), pg(pg), orig_ctx(0) {
machine.initiate();
}
};
PG::PG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool, spg_t p) :
recovery_state(this){
}
The machine.initiate() call above is the initiate() method of boost::statechart::state_machine, which enters the machine's initial state.
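The reaction lists declared above are what drive each PG's transitions. As an illustration only, the following standard-library sketch models a few of the `transition<Event, Target>` entries of Activating, WaitLocalRecoveryReserved and WaitRemoteRecoveryReserved as a lookup table. It is not Boost.Statechart and not Ceph code: `MiniRecoveryMachine`, `State` and `Event` are hypothetical names, the real machine starts in an Initial state and nests these states inside Started/Primary/Active, and Boost.Statechart would offer an unhandled event to the enclosing states before discarding it, which this flat table does not model.

```cpp
#include <cassert>
#include <map>
#include <utility>

// Hypothetical subset of RecoveryMachine states (names taken from the text).
enum class State {
  Activating,
  WaitLocalRecoveryReserved,
  WaitRemoteRecoveryReserved,
  WaitLocalBackfillReserved,
  Recovering,
  Recovered
};

// Hypothetical subset of the events that appear in the reaction lists.
enum class Event {
  AllReplicasRecovered,
  DoRecovery,
  RequestBackfill,
  LocalRecoveryReserved,
  AllRemotesReserved
};

class MiniRecoveryMachine {
 public:
  // Rough analogue of machine.initiate(): enter the initial state.
  // (The real machine starts in Initial; Activating is used here for brevity.)
  void initiate() {
    state_ = State::Activating;
    started_ = true;
  }

  // Returns true if the event triggered a transition, false if it was ignored.
  bool process_event(Event e) {
    assert(started_ && "initiate() must be called first");
    const auto& t = table();
    auto it = t.find({state_, e});
    if (it == t.end()) return false;  // no reaction in this state
    state_ = it->second;
    return true;
  }

  State state() const { return state_; }

 private:
  // Mirrors some transition<Event, Target> entries from the declarations above.
  static const std::map<std::pair<State, Event>, State>& table() {
    static const std::map<std::pair<State, Event>, State> t = {
        {{State::Activating, Event::AllReplicasRecovered}, State::Recovered},
        {{State::Activating, Event::DoRecovery}, State::WaitLocalRecoveryReserved},
        {{State::Activating, Event::RequestBackfill}, State::WaitLocalBackfillReserved},
        {{State::WaitLocalRecoveryReserved, Event::LocalRecoveryReserved},
         State::WaitRemoteRecoveryReserved},
        {{State::WaitRemoteRecoveryReserved, Event::AllRemotesReserved}, State::Recovering},
    };
    return t;
  }

  State state_ = State::Activating;
  bool started_ = false;
};
```

Usage: call `initiate()` once (as the PG constructor does through RecoveryState), then feed events with `process_event()`; an event with no entry for the current state is simply ignored.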
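In this section we start from the basics to study the complex Ceph peering process, hoping to gain a deeper understanding of how it works.
In the earlier chapter on Ceph network communication, we introduced the SimpleMessenger communication framework. Here the OSD implements the Dispatcher interface: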
class OSD : public Dispatcher,
public md_config_obs_t {
};
Below we look at how the OSD implements each function of the Dispatcher interface.
1) Implementation of ms_can_fast_dispatch()
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
case CEPH_MSG_OSD_OP:
case MSG_OSD_SUBOP:
case MSG_OSD_REPOP:
case MSG_OSD_SUBOPREPLY:
case MSG_OSD_REPOPREPLY:
case MSG_OSD_PG_PUSH:
case MSG_OSD_PG_PULL:
case MSG_OSD_PG_PUSH_REPLY:
case MSG_OSD_PG_SCAN:
case MSG_OSD_PG_BACKFILL:
case MSG_OSD_EC_WRITE:
case MSG_OSD_EC_WRITE_REPLY:
case MSG_OSD_EC_READ:
case MSG_OSD_EC_READ_REPLY:
case MSG_OSD_REP_SCRUB:
case MSG_OSD_PG_UPDATE_LOG_MISSING:
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
return true;
default:
return false;
}
}
So the message types listed in the case labels above can be handled through fast dispatch.
2) Implementation of ms_can_fast_dispatch_any()
bool ms_can_fast_dispatch_any() const { return true; }
Because this returns true, the Messenger adds the OSD Dispatcher to its fast_dispatchers list.
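To show why returning true from ms_can_fast_dispatch_any() matters, here is a minimal sketch of a messenger that keeps a separate fast-dispatcher list and consults it first. All names (`MiniMessenger`, `MiniDispatcher`, `MiniOsdDispatcher`, `Msg`, the type code 42) are hypothetical simplifications, not Ceph's Messenger API; a message that no fast dispatcher accepts is only queued here, standing in for the normal DispatchQueue path that eventually calls ms_dispatch().

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>

// Hypothetical message: only a numeric type code matters here.
struct Msg {
  int type;
};

// Hypothetical, simplified dispatcher interface.
struct MiniDispatcher {
  virtual ~MiniDispatcher() = default;
  virtual bool can_fast_dispatch_any() const { return false; }
  virtual bool can_fast_dispatch(const Msg&) const { return false; }
  virtual void fast_dispatch(const Msg&) {}
};

// OSD-like dispatcher: opts into fast dispatch for a single message type.
struct MiniOsdDispatcher : MiniDispatcher {
  static constexpr int kOsdOp = 42;  // arbitrary stand-in for CEPH_MSG_OSD_OP
  int fast_handled = 0;
  bool can_fast_dispatch_any() const override { return true; }
  bool can_fast_dispatch(const Msg& m) const override { return m.type == kOsdOp; }
  void fast_dispatch(const Msg&) override { ++fast_handled; }
};

class MiniMessenger {
 public:
  void add_dispatcher_head(MiniDispatcher* d) {
    dispatchers_.push_front(d);
    // Only dispatchers that report fast-dispatch support join the fast list.
    if (d->can_fast_dispatch_any()) fast_dispatchers_.push_front(d);
  }

  // Returns "fast" if a fast dispatcher handled m inline, otherwise "queued".
  std::string deliver(const Msg& m) {
    for (MiniDispatcher* d : fast_dispatchers_) {
      if (d->can_fast_dispatch(m)) {
        d->fast_dispatch(m);
        return "fast";
      }
    }
    queued_.push_back(m);  // stand-in for the normal, queued dispatch path
    return "queued";
  }

  std::size_t fast_count() const { return fast_dispatchers_.size(); }
  std::size_t queued_count() const { return queued_.size(); }

 private:
  std::list<MiniDispatcher*> dispatchers_;
  std::list<MiniDispatcher*> fast_dispatchers_;
  std::list<Msg> queued_;
};
```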
3) Implementation of ms_fast_dispatch()
void OSD::ms_fast_dispatch(Message *m)
{
if (service.is_stopping()) {
m->put();
return;
}
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
#endif
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
OSDMapRef nextmap = service.get_nextmap_reserved();
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
if (session) {
{
Mutex::Locker l(session->session_dispatch_lock);
update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
dispatch_session_waiting(session, nextmap);
}
session->put();
}
service.release_map(nextmap);
}
As shown above, the actual processing is done by dispatch_session_waiting():
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
session->waiting_on_map.erase(i++));
if (session->waiting_on_map.empty()) {
clear_session_waiting_on_map(session);
} else {
register_session_waiting_on_map(session);
}
session->maybe_reset_osdmap();
}
bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
{
...
}
dispatch_op_fast() is where the message types accepted by ms_can_fast_dispatch() are actually processed.
4) Implementation of ms_fast_preprocess()
void OSD::ms_fast_preprocess(Message *m)
{
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
if (m->get_type() == CEPH_MSG_OSD_MAP) {
MOSDMap *mm = static_cast<MOSDMap*>(m);
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (s) {
s->received_map_lock.lock();
s->received_map_epoch = mm->get_last();
s->received_map_lock.unlock();
s->put();
}
}
}
}
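As the code shows, for a CEPH_MSG_OSD_MAP message sent by another OSD, the session's received_map_epoch is set, under received_map_lock, to the newest epoch carried in the message (mm->get_last()).
5) Implementation of ms_dispatch()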
bool OSD::ms_dispatch(Message *m)
{
if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
service.got_stop_ack();
m->put();
return true;
}
// lock!
osd_lock.Lock();
if (is_stopping()) {
osd_lock.Unlock();
m->put();
return true;
}
while (dispatch_running) {
dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
dispatch_cond.Wait(osd_lock);
}
dispatch_running = true;
do_waiters();
_dispatch(m);
do_waiters();
dispatch_running = false;
dispatch_cond.Signal();
osd_lock.Unlock();
return true;
}
ms_dispatch() handles the dispatch of ordinary (non-fast) messages. Now let us look at _dispatch():
void OSD::_dispatch(Message *m)
{
assert(osd_lock.is_locked());
dout(20) << "_dispatch " << m << " " << *m << dendl;
logger->set(l_osd_buf, buffer::get_total_alloc());
logger->set(l_osd_history_alloc_bytes, SHIFT_ROUND_UP(buffer::get_history_alloc_bytes(), 20));
logger->set(l_osd_history_alloc_num, buffer::get_history_alloc_num());
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
switch (m->get_type()) {
// -- don't need lock --
case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source() << dendl;
m->put();
break;
// -- don't need OSDMap --
// map and replication
case CEPH_MSG_OSD_MAP:
handle_osd_map(static_cast<MOSDMap*>(m));
break;
// osd
case MSG_PGSTATSACK:
handle_pg_stats_ack(static_cast<MPGStatsAck*>(m));
break;
case MSG_MON_COMMAND:
handle_command(static_cast<MMonCommand*>(m));
break;
case MSG_COMMAND:
handle_command(static_cast<MCommand*>(m));
break;
case MSG_OSD_SCRUB:
handle_scrub(static_cast<MOSDScrub*>(m));
break;
// -- need OSDMap --
default:
{
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
// no map? starting up?
if (!osdmap) {
dout(7) << "no OSDMap, not booted" << dendl;
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("no osdmap");
break;
}
// need OSDMap
dispatch_op(op);
}
}
logger->set(l_osd_buf, buffer::get_total_alloc());
logger->set(l_osd_history_alloc_bytes, SHIFT_ROUND_UP(buffer::get_history_alloc_bytes(), 20));
logger->set(l_osd_history_alloc_num, buffer::get_history_alloc_num());
}
void OSD::dispatch_op(OpRequestRef op)
{
switch (op->get_req()->get_type()) {
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
break;
case MSG_OSD_PG_NOTIFY:
handle_pg_notify(op);
break;
case MSG_OSD_PG_QUERY:
handle_pg_query(op);
break;
case MSG_OSD_PG_LOG:
handle_pg_log(op);
break;
case MSG_OSD_PG_REMOVE:
handle_pg_remove(op);
break;
case MSG_OSD_PG_INFO:
handle_pg_info(op);
break;
case MSG_OSD_PG_TRIM:
handle_pg_trim(op);
break;
case MSG_OSD_PG_MISSING:
assert(0 =="received MOSDPGMissing; this message is supposed to be unused!?!");
break;
case MSG_OSD_BACKFILL_RESERVE:
handle_pg_backfill_reserve(op);
break;
case MSG_OSD_RECOVERY_RESERVE:
handle_pg_recovery_reserve(op);
break;
}
}
Of the cases above, the most important is the handling of CEPH_MSG_OSD_MAP messages (handle_osd_map()).
6) Implementation of ms_handle_connect()
void OSD::ms_handle_connect(Connection *con)
{
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
Mutex::Locker l(osd_lock);
if (is_stopping())
return;
dout(10) << "ms_handle_connect on mon" << dendl;
if (is_preboot()) {
start_boot();
} else if (is_booting()) {
_send_boot(); // resend boot message
} else {
map_lock.get_read();
Mutex::Locker l2(mon_report_lock);
utime_t now = ceph_clock_now(NULL);
last_mon_report = now;
// resend everything, it's a new session
send_alive();
service.requeue_pg_temp();
service.send_pg_temp();
requeue_failures();
send_failures();
send_pg_stats(now);
map_lock.put_read();
}
// full map requests may happen while active or pre-boot
if (requested_full_first) {
rerequest_full_maps();
}
}
}
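The OSD only actively initiates connections to Monitors and to other OSDs. This callback handles a newly established connection to a Monitor: depending on the OSD's state it starts booting, resends the boot message, or, because this is a new session, resends everything (alive, pg_temp, failures and PG stats).
7) Implementation of ms_handle_fast_connect()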
void OSD::ms_handle_fast_connect(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << " new session (outgoing) " << s << " con=" << s->con
<< " addr=" << s->con->get_peer_addr() << dendl;
// we don't connect to clients
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
}
s->put();
}
}
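The OSD only actively initiates connections to Monitors and to other OSDs. This callback handles outgoing connections the OSD has opened to other OSDs; since the OSD never connects to clients, the code asserts that the peer is an OSD.
8) Implementation of ms_handle_accept()
The OSD does not override ms_handle_accept().
9) Implementation of ms_handle_fast_accept()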
bool ms_can_fast_dispatch_any() const { return true; }
void OSD::ms_handle_fast_accept(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << "new session (incoming)" << s << " con=" << con
<< " addr=" << con->get_peer_addr()
<< " must have raced with connect" << dendl;
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
}
s->put();
}
}
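Because ms_can_fast_dispatch_any() always returns true, the OSD is registered as a fast dispatcher, so ms_handle_fast_accept() is called for incoming connections from Monitors, clients such as RadosGW, and other OSDs. For a non-Monitor peer whose connection does not yet carry a Session, a new Session is created and stored as the connection's private data (con->set_priv()). As the log message ("must have raced with connect") and the assert show, this path is expected only for OSD peers.
10) Implementation of ms_handle_reset()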
bool OSD::ms_handle_reset(Connection *con)
{
OSD::Session *session = (OSD::Session *)con->get_priv();
dout(1) << "ms_handle_reset con " << con << " session " << session << dendl;
if (!session)
return false;
session->wstate.reset(con);
session->con.reset(NULL); // break con <-> session ref cycle
session_handle_reset(session);
session->put();
return true;
}
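When a connection is reset, this callback clears the session state associated with that connection and breaks the connection <-> session reference cycle.
11) Implementation of ms_handle_remote_reset()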
void ms_handle_remote_reset(Connection *con) {}
The implementation of ms_handle_remote_reset() is empty.
12) Implementation of ms_get_authorizer()
bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
if (force_new) {
/* the MonClient checks keys every tick(), so we should just wait for that cycle to get through */
if (monc->wait_auth_rotating(10) < 0)
return false;
}
*authorizer = monc->auth->build_authorizer(dest_type);
return *authorizer != NULL;
}
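As shown above, connections to Monitors simply return true here; for other peer types it calls build_authorizer() to construct the AuthAuthorizer, first waiting for rotating keys when force_new is set.
13) Implementation of ms_verify_authorizer()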
bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key)
{
...
}
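Verifies the authorizer presented on incoming connections.
Below are some of the messages used by the OSD module:
CEPH_MSG_OSD_OP: built by a client for a read or write request; received by the primary OSD
MSG_OSD_REPOP: sent by the primary OSD to the replica OSDs when data is modified
MSG_OSD_REPOPREPLY: the reply to MSG_OSD_REPOP
MSG_OSD_SUBOP: internal per-object operations between the primary and the replicas, used mainly during object recovery
MSG_OSD_SUBOPREPLY: the reply to MSG_OSD_SUBOP
CEPH_OSD_OP_WRITE: writes part of an object (an op code carried inside CEPH_MSG_OSD_OP, not a message type)
CEPH_OSD_OP_WRITEFULL: writes a whole object (likewise an op code)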
Message types handled by the OSD Dispatcher callbacks shown above:
CEPH_MSG_OSD_OP
MSG_OSD_SUBOP
MSG_OSD_REPOP
MSG_OSD_SUBOPREPLY
MSG_OSD_REPOPREPLY
MSG_OSD_PG_PUSH
MSG_OSD_PG_PULL
MSG_OSD_PG_PUSH_REPLY
MSG_OSD_PG_SCAN
MSG_OSD_PG_BACKFILL
MSG_OSD_EC_WRITE
MSG_OSD_EC_WRITE_REPLY
MSG_OSD_EC_READ
MSG_OSD_EC_READ_REPLY
MSG_OSD_REP_SCRUB
MSG_OSD_PG_UPDATE_LOG_MISSING
MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
MSG_OSD_MARK_ME_DOWN
CEPH_MSG_PING
CEPH_MSG_OSD_MAP
MSG_PGSTATSACK
MSG_MON_COMMAND
MSG_COMMAND
MSG_OSD_SCRUB
MSG_OSD_PG_CREATE
MSG_OSD_PG_NOTIFY
MSG_OSD_PG_QUERY
MSG_OSD_PG_LOG
MSG_OSD_PG_REMOVE
MSG_OSD_PG_INFO
MSG_OSD_PG_TRIM
MSG_OSD_PG_MISSING
MSG_OSD_BACKFILL_RESERVE
MSG_OSD_RECOVERY_RESERVE
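From the analysis above, the message types accepted by ms_can_fast_dispatch() are dispatched through ms_fast_dispatch(). Let us look at that function's implementation: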
void OSD::ms_fast_dispatch(Message *m)
{
if (service.is_stopping()) {
m->put();
return;
}
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
#endif
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
OSDMapRef nextmap = service.get_nextmap_reserved();
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
if (session) {
{
Mutex::Locker l(session->session_dispatch_lock);
update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
dispatch_session_waiting(session, nextmap);
}
session->put();
}
service.release_map(nextmap);
}
First, service.get_nextmap_reserved() is called to obtain nextmap. Its implementation is:
class OSDService{
public:
OSDMapRef osdmap;
/*
* osdmap - current published map
* next_osdmap - pre_published map that is about to be published.
*
* We use the next_osdmap to send messages and initiate connections,
* but only if the target is the same instance as the one in the map
* epoch the current user is working from (i.e., the result is
* equivalent to what is in next_osdmap).
*
* This allows the helpers to start ignoring osds that are about to
* go down, and let OSD::handle_osd_map()/note_down_osd() mark them
* down, without worrying about reopening connections from threads
* working from old maps.
*/
OSDMapRef next_osdmap;
/// gets ref to next_osdmap and registers the epoch as reserved
OSDMapRef get_nextmap_reserved() {
Mutex::Locker l(pre_publish_lock);
if (!next_osdmap)
return OSDMapRef();
epoch_t e = next_osdmap->get_epoch();
map<epoch_t, unsigned>::iterator i =map_reservations.insert(make_pair(e, 0)).first;
i->second++;
return next_osdmap;
}
};
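The comment tells us that service.osdmap is the latest OSDMap that has been published, while service.next_osdmap is the OSDMap about to be published. next_osdmap is used to send messages and initiate connections, but only when the target is the same instance as in the map epoch the caller is working from. get_nextmap_reserved() also records a reservation for next_osdmap's epoch in map_reservations, which the caller later drops with service.release_map().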
Let us look at the code to see how service.osdmap and service.next_osdmap differ:
1) OSDService::publish_map()
Searching for publish_map() shows only two call sites. The first is in OSD::init():
int OSD::init()
{
...
service.publish_map(osdmap);
...
consume_map();
...
}
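During OSD initialization, the OSDMap epoch recorded in the superblock is loaded as the initial osdmap, and consume_map() is then called to consume it; this may trigger the PGs' first peering after startup.
Besides the init phase, consume_map() is also called after a new OSDMap has been persisted. This is the second call site: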
void OSD::consume_map()
{
...
service.pre_publish_map(osdmap);
service.await_reserved_maps();
service.publish_map(osdmap);
...
}
As the code shows, osdmap is only actually published (publish_map()) after all reserved maps have been released (await_reserved_maps()).
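The reservation protocol behind get_nextmap_reserved(), release_map() and the await step in consume_map() can be sketched as a counted map of epochs guarded by a mutex and a condition variable. This is a hypothetical model (`MiniMapPublisher` and its methods are illustrative names, and plain epochs stand in for OSDMapRef), assuming that publishing waits until no reservation remains for an epoch older than the pre-published one.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>

using epoch_t = std::uint32_t;

// Hypothetical model of OSDService's next_osdmap reservations.
class MiniMapPublisher {
 public:
  // Like pre_publish_map(): make e the map that new readers will reserve.
  void pre_publish(epoch_t e) {
    std::lock_guard<std::mutex> l(lock_);
    next_epoch_ = e;
  }

  // Like get_nextmap_reserved(): count one more user of the next map.
  epoch_t reserve() {
    std::lock_guard<std::mutex> l(lock_);
    ++reservations_[next_epoch_];
    return next_epoch_;
  }

  // Like release_map(): drop one reservation and wake any waiting publisher.
  void release(epoch_t e) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = reservations_.find(e);
    assert(it != reservations_.end() && it->second > 0);
    if (--it->second == 0) reservations_.erase(it);
    cond_.notify_all();
  }

  // Non-blocking check of the condition await_and_publish() waits for.
  bool can_publish() const {
    std::lock_guard<std::mutex> l(lock_);
    return ready_locked();
  }

  // Like await_reserved_maps() followed by publish_map().
  void await_and_publish() {
    std::unique_lock<std::mutex> l(lock_);
    cond_.wait(l, [this] { return ready_locked(); });
    published_epoch_ = next_epoch_;
  }

  epoch_t published() const {
    std::lock_guard<std::mutex> l(lock_);
    return published_epoch_;
  }

 private:
  // True when no reader still holds a map older than the next one.
  bool ready_locked() const {
    return reservations_.empty() || reservations_.begin()->first >= next_epoch_;
  }

  mutable std::mutex lock_;
  std::condition_variable cond_;
  std::map<epoch_t, unsigned> reservations_;
  epoch_t next_epoch_ = 0;
  epoch_t published_epoch_ = 0;
};
```

In ms_fast_dispatch() a reservation is held only between get_nextmap_reserved() and release_map(), so in this model the publisher would wait at most for the in-flight fast-dispatch calls to finish.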
2) OSDService::pre_publish_map()
Searching for pre_publish_map() likewise shows only two call sites. The first is in _committed_osd_maps():
void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
{
...
// advance through the new maps
for (epoch_t cur = first; cur <= last; cur++) {
dout(10) << " advance to epoch " << cur<< " (<= last " << last<< " <= newest_map " << superblock.newest_map<< ")" << dendl;
OSDMapRef newmap = get_map(cur);
assert(newmap); // we just cached it above!
// start blacklisting messages sent to peers that go down.
service.pre_publish_map(newmap);
...
}
...
}
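When an MOSDMap message arrives and contains new OSDMaps that should be applied, they are packed into a Transaction, which is then persisted to disk. Once the write has been committed, the _committed_osd_maps() callback is invoked. As the code above shows, the OSD walks through every new OSDMap from the message and calls service.pre_publish_map() to mark it as pre-published.
_committed_osd_maps() also calls OSD::consume_map(), which is the second call site: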
void OSD::consume_map()
{
...
service.pre_publish_map(osdmap);
service.await_reserved_maps();
service.publish_map(osdmap);
...
}
consume_map() publishes the newly received OSDMap and then triggers peering for the affected PGs.
Before analyzing ms_fast_dispatch() further, let us look at several important variables:
struct Session : public RefCountedObject {
list<OpRequestRef> waiting_on_map;
map<spg_t, list<OpRequestRef> > waiting_for_pg;
};
class OSD : public Dispatcher,public md_config_obs_t {
public:
set<Session*> session_waiting_for_map;
map<spg_t, set<Session*> > session_waiting_for_pg;
list<OpRequestRef> waiting_for_osdmap;
};
// handle a peering event
void OSD::handle_pg_peering_evt(
spg_t pgid,
const pg_history_t& orig_history,
pg_interval_map_t& pi,
epoch_t epoch,
PG::CephPeeringEvtRef evt)
{
//
wake_pg_waiters(pgid);
}
// PG split
void OSD::process_peering_events(
const list<PG*> &pgs,
ThreadPool::TPHandle &handle
)
{
...
if (!split_pgs.empty()) {
rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
split_pgs.clear();
}
}
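Session::waiting_for_pg: requests waiting on a specific PG. Typically, when the target PG cannot be found (for example because the latest OSDMap has not been received yet, so the PG does not exist locally), the request is put into waiting_for_pg;
OSD::session_waiting_for_map: the Sessions waiting on an OSDMap;
OSD::session_waiting_for_pg: the Sessions waiting on a specific PG;
OSD::waiting_for_osdmap: the requests waiting on an OSDMap.
Note: messages that carry session state and need a reply are generally tracked through session_waiting_for_map, whereas stateless requests waiting for an osdmap go into waiting_for_osdmap.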
void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
{
assert(session->session_dispatch_lock.is_locked());
if (!session->osdmap) {
session->osdmap = newmap;
return;
}
if (newmap->get_epoch() == session->osdmap->get_epoch())
return;
assert(newmap->get_epoch() > session->osdmap->get_epoch());
map<spg_t, list<OpRequestRef> > from;
from.swap(session->waiting_for_pg);
for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();i != from.end();from.erase(i++)) {
set<spg_t> children;
if (!newmap->have_pg_pool(i->first.pool())) {
// drop this wait list on the ground
i->second.clear();
} else {
assert(session->osdmap->have_pg_pool(i->first.pool()));
if (i->first.is_split(session->osdmap->get_pg_num(i->first.pool()), newmap->get_pg_num(i->first.pool()), &children)) {
for (set<spg_t>::iterator child = children.begin(); child != children.end(); ++child) {
unsigned split_bits = child->get_split_bits(newmap->get_pg_num(child->pool()));
list<OpRequestRef> child_ops;
OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
if (!child_ops.empty()) {
session->waiting_for_pg[*child].swap(child_ops);
register_session_waiting_on_pg(session, *child);
}
}
}
}
if (i->second.empty()) {
clear_session_waiting_on_pg(session, i->first);
} else {
session->waiting_for_pg[i->first].swap(i->second);
}
}
session->osdmap = newmap;
}
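update_waiting_for_pg() mainly updates the session's waiting_for_pg when PGs have split: ops waiting on a parent PG that now belong to a child PG are moved to that child's wait list, and wait lists for pools that no longer exist in the new map are dropped.
Note: a PG split is driven by a larger pg_num in a new OSDMap published by the Monitor, which is why it is detected here by comparing the pool's pg_num in session->osdmap with that in newmap.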
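The split handling can be illustrated with Ceph's stable-mod placement rule, under which a placement hash x lands in PG `(x & mask) < pg_num ? (x & mask) : (x & (mask >> 1))`. The sketch below is hypothetical: it assumes each waiting op carries its object's placement hash, and `split_waiters()` and `PendingOp` are illustrative names rather than OSD::split_list(); only the bucketing formula follows ceph_stable_mod(). When pg_num grows from 4 to 8, ops on PG 1 whose hash ends in binary 101 move to the new child PG 5, while the others stay.

```cpp
#include <cassert>
#include <cstdint>
#include <iterator>
#include <list>
#include <map>

// Same bucketing rule as ceph_stable_mod(): place hash x into one of b
// buckets, where bmask is the smallest (2^k - 1) with bmask >= b - 1.
inline std::uint32_t stable_mod(std::uint32_t x, std::uint32_t b, std::uint32_t bmask) {
  return ((x & bmask) < b) ? (x & bmask) : (x & (bmask >> 1));
}

// Smallest (2^k - 1) that is >= pg_num - 1 (a pool's pg_num mask).
inline std::uint32_t calc_mask(std::uint32_t pg_num) {
  std::uint32_t m = 1;
  while (m < pg_num) m <<= 1;
  return m - 1;
}

// Hypothetical waiting op: it only remembers its object's placement hash.
struct PendingOp {
  std::uint32_t hash;
  int id;
};

// Hypothetical analogue of the split step in update_waiting_for_pg(): move
// every op queued on PG `parent` whose hash maps elsewhere under new_pg_num
// onto the wait list of the child PG it now belongs to.
std::map<std::uint32_t, std::list<PendingOp>> split_waiters(
    std::uint32_t parent, std::list<PendingOp>& parent_ops, std::uint32_t new_pg_num) {
  std::map<std::uint32_t, std::list<PendingOp>> children;
  const std::uint32_t mask = calc_mask(new_pg_num);
  for (auto it = parent_ops.begin(); it != parent_ops.end();) {
    const std::uint32_t ps = stable_mod(it->hash, new_pg_num, mask);
    auto next = std::next(it);
    if (ps != parent) {
      std::list<PendingOp>& dst = children[ps];
      dst.splice(dst.end(), parent_ops, it);  // O(1), keeps relative order
    }
    it = next;
  }
  return children;
}
```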
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap); session->waiting_on_map.erase(i++));
if (session->waiting_on_map.empty()) {
clear_session_waiting_on_map(session);
} else {
register_session_waiting_on_map(session);
}
session->maybe_reset_osdmap();
}
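dispatch_session_waiting() forwards the requests on the session's waiting_on_map through dispatch_op_fast(), in order. The loop stops at the first request that dispatch_op_fast() cannot handle yet; if any requests remain, the session stays registered via register_session_waiting_on_map().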
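The loop in dispatch_session_waiting() has an empty body: all the work happens in its condition, which dispatches the front op and lets the increment expression erase it only on success. The generic sketch below spells out that drain-until-blocked pattern; `drain_in_order()` is a hypothetical helper and `try_dispatch` stands in for dispatch_op_fast(). Stopping at the first op that cannot be dispatched keeps the per-session order intact.

```cpp
#include <cassert>
#include <list>

// Hypothetical helper modelling the loop in dispatch_session_waiting():
// dispatch queued ops front to back, erase each one that succeeds, and stop
// at the first op that cannot be dispatched yet.
// Returns true if the list was fully drained.
template <typename Op, typename TryDispatch>
bool drain_in_order(std::list<Op>& waiting, TryDispatch try_dispatch) {
  auto i = waiting.begin();
  while (i != waiting.end() && try_dispatch(*i)) {
    i = waiting.erase(i);
  }
  return waiting.empty();
}
```

A true result corresponds to the clear_session_waiting_on_map() branch; false corresponds to register_session_waiting_on_map(), leaving the remaining ops for a later map.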
bool OSD::ms_dispatch(Message *m)
{
if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
service.got_stop_ack();
m->put();
return true;
}
// lock!
osd_lock.Lock();
if (is_stopping()) {
osd_lock.Unlock();
m->put();
return true;
}
while (dispatch_running) {
dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
dispatch_cond.Wait(osd_lock);
}
dispatch_running = true;
do_waiters();
_dispatch(m);
do_waiters();
dispatch_running = false;
dispatch_cond.Signal();
osd_lock.Unlock();
return true;
}
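This function dispatches non-fast messages. Note the osd_lock here, which is a very coarse-grained lock. A DispatchQueue dispatches messages from two threads, dispatch_thread and local_delivery_thread, so the two can race. As a Dispatcher, the OSD uses osd_lock, together with the dispatch_running flag and dispatch_cond, to ensure that only one thread at a time executes the dispatch logic of this function.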
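The combination of osd_lock, dispatch_running and dispatch_cond in ms_dispatch() can be reduced to the following sketch using std::mutex and std::condition_variable. `SerializedDispatcher` is a hypothetical name; the body receives the lock so that it can drop and re-take it, which is exactly the situation in which the flag, rather than the mutex alone, keeps a second thread out.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical reduction of OSD::ms_dispatch(): a big lock plus a
// dispatch_running flag, so only one thread runs a dispatch body at a time,
// even if that body temporarily releases the lock.
class SerializedDispatcher {
 public:
  template <typename Body>
  void dispatch(Body body) {
    std::unique_lock<std::mutex> l(big_lock_);  // like osd_lock.Lock()
    dispatch_cond_.wait(l, [this] { return !dispatch_running_; });
    dispatch_running_ = true;
    body(l);  // the body may unlock and re-lock l
    assert(l.owns_lock() && "body must return with the lock held");
    dispatch_running_ = false;
    dispatch_cond_.notify_one();  // like dispatch_cond.Signal()
  }

 private:
  std::mutex big_lock_;
  std::condition_variable dispatch_cond_;
  bool dispatch_running_ = false;
};
```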
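do_waiters() mainly handles requests that were blocked on waiting_for_osdmap. For example, requests that need a newer OSDMap are first put on waiting_for_osdmap; once the new OSDMap is ready, take_waiters() moves them onto the finished list, which do_waiters() then processes: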
void OSD::activate_map(){
...
// process waiters
take_waiters(waiting_for_osdmap);
}
void take_waiters(list<OpRequestRef>& ls) {
finished_lock.Lock();
finished.splice(finished.end(), ls);
finished_lock.Unlock();
}
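The hand-off from waiting_for_osdmap to the finished list is a constant-time std::list::splice. The sketch below is a hypothetical model of both sides (`WaiterQueue` and the `handle` callback, which stands in for dispatch_op(), are illustrative names). That do_waiters() drains the finished list one op at a time without holding finished_lock while an op is handled is an assumption of this sketch, not code shown in this article.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical model of the waiting_for_osdmap -> finished hand-off.
template <typename Op>
class WaiterQueue {
 public:
  // Like take_waiters(): move every parked op to the end of finished_ in O(1).
  void take_waiters(std::list<Op>& parked) {
    std::lock_guard<std::mutex> l(finished_lock_);
    finished_.splice(finished_.end(), parked);
  }

  // Assumed do_waiters()-style drain: pop one op at a time and handle it
  // without holding finished_lock_. Returns how many ops were handled.
  template <typename Handler>
  std::size_t do_waiters(Handler handle) {
    std::size_t n = 0;
    std::unique_lock<std::mutex> l(finished_lock_);
    while (!finished_.empty()) {
      Op next = std::move(finished_.front());
      finished_.pop_front();
      l.unlock();
      handle(next);
      ++n;
      l.lock();
    }
    return n;
  }

 private:
  std::mutex finished_lock_;
  std::list<Op> finished_;
};
```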
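Requests are put on waiting_for_osdmap in the following two cases: when _dispatch() runs before the OSD has any OSDMap, and when require_same_or_newer_map() sees a request from a newer epoch (through wait_for_new_map()):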
void OSD::_dispatch(Message *m)
{
switch (m->get_type()) {
default:
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
// no map? starting up?
if (!osdmap) {
dout(7) << "no OSDMap, not booted" << dendl;
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("no osdmap");
break;
}
}
}
bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
bool is_fast_dispatch)
{
Message *m = op->get_req();
dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
assert(osd_lock.is_locked());
// do they have a newer map?
if (epoch > osdmap->get_epoch()) {
dout(7) << "waiting for newer map epoch " << epoch<< " > my " << osdmap->get_epoch() << " with " << m << dendl;
wait_for_new_map(op);
return false;
}
if (!require_self_aliveness(op->get_req(), epoch)) {
return false;
}
// ok, our map is same or newer.. do they still exist?
if (m->get_connection()->get_messenger() == cluster_messenger && !require_same_peer_instance(op->get_req(), osdmap, is_fast_dispatch)) {
return false;
}
return true;
}
void OSD::wait_for_new_map(OpRequestRef op)
{
// ask?
if (waiting_for_osdmap.empty()) {
osdmap_subscribe(osdmap->get_epoch() + 1, false);
}
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("wait for new map");
}
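The two functions above combine an epoch comparison with a subscribe-once rule: only the first request parked on an empty waiting_for_osdmap triggers osdmap_subscribe(epoch + 1), and later ones simply wait for the same map. The following hypothetical sketch (`MiniOsdMapWaiters`, with integer request ids and a list that records subscriptions in place of a real monitor call) models just that part and omits the aliveness and peer-instance checks.

```cpp
#include <cassert>
#include <cstdint>
#include <list>

using epoch_t = std::uint32_t;

// Hypothetical model of require_same_or_newer_map() + wait_for_new_map().
struct MiniOsdMapWaiters {
  epoch_t my_epoch = 0;               // epoch of the OSD's current osdmap
  std::list<int> waiting_for_osdmap;  // parked request ids
  std::list<epoch_t> subscriptions;   // stand-in for osdmap_subscribe() calls

  // Returns true if the request may proceed with the current map.
  bool require_same_or_newer_map(int req, epoch_t req_epoch) {
    if (req_epoch > my_epoch) {  // the sender has a newer map than we do
      wait_for_new_map(req);
      return false;
    }
    // The real function also checks self-aliveness and the peer instance.
    return true;
  }

  void wait_for_new_map(int req) {
    // Only the first parked request asks for the next epoch.
    if (waiting_for_osdmap.empty()) subscriptions.push_back(my_epoch + 1);
    waiting_for_osdmap.push_back(req);
  }
};
```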
_dispatch() dispatches the following messages; some of them do not depend on the OSDMap, while others do:
1) Messages that do not need an OSDMap
CEPH_MSG_PING
CEPH_MSG_OSD_MAP
MSG_PGSTATSACK
MSG_MON_COMMAND
MSG_COMMAND
MSG_OSD_SCRUB
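2) Messages that need an OSDMap
MSG_OSD_PG_CREATE
MSG_OSD_PG_NOTIFY: a notification sent by a stray PG replica to the PG primary (carrying pg_info).
Note: during Peering, RecoveryCtx::send_notify() sends this message.
MSG_OSD_PG_QUERY
Note: during Peering, the GetInfo state queries the other replicas by sending this message.
MSG_OSD_PG_LOG: PGLog information sent from one PG replica to another, typically during Peering.
MSG_OSD_PG_REMOVE
MSG_OSD_PG_INFO: PGInfo sent from one PG replica to another.
Note: during Peering, when missing objects are found, PG::search_for_missing() is used to send this query message.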
MSG_OSD_PG_TRIM
MSG_OSD_PG_MISSING
MSG_OSD_BACKFILL_RESERVE
MSG_OSD_RECOVERY_RESERVE
Below we briefly introduce several OSD runtime states, to better understand peering:
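PGInfo exists throughout the entire life cycle of a PG and plays an important role in object writes, data recovery and PG peering. This chapter examines how pg info changes over the PG life cycle, to gain a deeper understanding of PGs and PGInfo.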
class PG : DoutPrefixProvider {
public:
// pg state
pg_info_t info;
};
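The PG peering process is very complex, and its implementation is quite lengthy. Here we approach it indirectly by introducing some important fields in the PG source code; understanding their meaning and use will also help us understand PG peering: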