本章来编写我们的第一个nginx模块。当前环境为:

# uname -a
Linux localhost.localdomain 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
# cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)

Read More

本章我们讲述一下如何为Nginx编译第三方动态加载模块。我们当前的操作系统环境为:

# uname -a
Linux localhost.localdomain 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
# cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)

Read More

分布式存储系统通常采用多副本的方式来保证系统的可靠性,而多副本之间如何保证数据的一致性就是系统的核心。Ceph号称统一存储,其核心RADOS既支持多副本,也支持纠删码。本文主要分析Ceph的多副本一致性协议。

Read More

ceph的PGLog是由PG来维护,记录了该PG的所有操作。其作用类似于数据库里的undo log。PGLog通常只保存近千条的操作记录(默认是3000条),但是当PG处于降级状态时,就会保存更多的日志(默认时10000条),这样就可以在故障的PG重新上线后用来恢复PG的数据。本文主要从PGLog的格式、存储方式、如何参与恢复来解析PGLLog。

Read More

ceph的PGLog是由PG来维护,记录了该PG的所有操作,其作用类似于数据库里的undo log。PGLog通常只保存近千条的操作记录(默认是3000条, 由osd_min_pg_log_entries指定),但是当PG处于降级状态时,就会保存更多的日志(默认是10000条),这样就可以在故障的PG重新上线后用来恢复PG的数据。本文主要从PG的格式、存储方式、如何参与恢复来解析PGLog。

Read More

分布式系统中经常需要考虑对象(或者记录、文件、数据块等)的读写顺序以及并发访问问题。通常来说,如果两个对象没有共享的资源,就可以进行并发的访问;如果有共享的部分,就需要对这部分资源进行加锁。而对于同一个对象的并发读写(尤其是并发写更新时),就需要注意顺序性以及并发访问的控制,以免数据错乱。本文主要针对ceph中对象读写的顺序及并发性保证机制进行介绍。

Read More

本章介绍Ceph的一致性检查工具Scrub机制。首先介绍数据校验的基本知识,其次介绍Scrub的基本概念,然后介绍Scrub的调度机制,最后介绍Scrub具体实现的源代码分析。

Read More

当PG完成了Peering过程后,处于Active状态的PG就可以对外提供服务了。如果该PG的各个副本上有不一致的对象,就需要进行修复。Ceph的修复过程有两种:Recovery和Backfill。

void ReplicatedPG::do_request(
  OpRequestRef& op,
  ThreadPool::TPHandle &handle)
{
	assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
	if (can_discard_request(op)) {
		return;
	}
	if (flushes_in_progress > 0) {
		dout(20) << flushes_in_progress << " flushes_in_progress pending " << "waiting for active on " << op << dendl;
		waiting_for_peered.push_back(op);
		op->mark_delayed("waiting for peered");
		return;
	}
	
	if (!is_peered()) {
		// Delay unless PGBackend says it's ok
		if (pgbackend->can_handle_while_inactive(op)) {
			bool handled = pgbackend->handle_message(op);
			assert(handled);
			return;
		} else {
			waiting_for_peered.push_back(op);
			op->mark_delayed("waiting for peered");
			return;
		}
	}

Read More

在上文我们讲到当接收到新的OSDMap,会向该OSD上的所有PG所有PG投递CephPeeringEvt事件。本章我们从该事件讲起,详细地讲述Peering地整个过程。

Read More

ceph的Peering过程是一个十分复杂的流程,其主要的目的是使一个PG内的OSD达成一个一致的状态。当主从副本达成一个一致的状态后,PG处于active状态,Peering过程的状态就结束了。但此时PG的三个OSD副本上的数据并非完全一致。

Read More

本节我们介绍一下PG Recovery过程中的一些重要数据结构。

1. RecoveryCtx数据结构

class PG : DoutPrefixProvider {
public:    
  struct BufferedRecoveryMessages {
	map<int, map<spg_t, pg_query_t> > query_map;
	map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > info_map;
	map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
  };
	
  struct RecoveryCtx {
	utime_t start_time;
	map<int, map<spg_t, pg_query_t> > *query_map;
	map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map;
	map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list;
	set<PGRef> created_pgs;
	C_Contexts *on_applied;
	C_Contexts *on_safe;
	ObjectStore::Transaction *transaction;
	ThreadPool::TPHandle* handle;
	
	RecoveryCtx(map<int, map<spg_t, pg_query_t> > *query_map,
		map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map,
		map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list,
		C_Contexts *on_applied,
		C_Contexts *on_safe,
		ObjectStore::Transaction *transaction)
	: query_map(query_map), info_map(info_map), 
	notify_list(notify_list),
	on_applied(on_applied),
	on_safe(on_safe),
	transaction(transaction),
	handle(NULL) {}

	RecoveryCtx(BufferedRecoveryMessages &buf, RecoveryCtx &rctx)
	: query_map(&(buf.query_map)),
	info_map(&(buf.info_map)),
	notify_list(&(buf.notify_list)),
	on_applied(rctx.on_applied),
	on_safe(rctx.on_safe),
	transaction(rctx.transaction),
	handle(rctx.handle) {}

	void accept_buffered_messages(BufferedRecoveryMessages &m) {
		assert(query_map);
		assert(info_map);
		assert(notify_list);
		
		for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();i != m.query_map.end();++i) {
			map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
			
			for (map<spg_t, pg_query_t>::iterator j = i->second.begin();j != i->second.end();++j) {
				omap[j->first] = j->second;
			}
		}
		
		for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i = m.info_map.begin();i != m.info_map.end();++i) {
			vector<pair<pg_notify_t, pg_interval_map_t> > &ovec = (*info_map)[i->first];
			ovec.reserve(ovec.size() + i->second.size());
			ovec.insert(ovec.end(), i->second.begin(), i->second.end());
		}
		
		for (map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator i = m.notify_list.begin();i != m.notify_list.end();++i) {
			vector<pair<pg_notify_t, pg_interval_map_t> > &ovec = (*notify_list)[i->first];
			ovec.reserve(ovec.size() + i->second.size());
			ovec.insert(ovec.end(), i->second.begin(), i->second.end());
		}
	}
	
  };
  
  
};

RecoveryCtx作为一次恢复操作的上下文,我们介绍一下其中几个比较重要的字段:

  • query_map: 用于缓存PG Query查询信息,后续会将这些缓存信息构造成MOSDPGQuery消息,然后发送到对应的OSD上。query_map的key部分为OSD的序号。

  • info_map: 用于缓存pg_notify_t信息,后续会将这些缓存信息构造成MOSDPGInfo查询的消息,然后发送到对应的OSD上。info_map的key部分为OSD的序号。

  • notify_list:用于缓存pg_notify_t信息,后续会将这些缓存信息构造成MOSDPGNotify消息,然后发送到对应的OSD上。notify_list的key部分为OSD的序号

  • transaction:本RecoveryCtx所关联的事务。在恢复过程中可能涉及到需要将相关信息持久化,就通过此transaction来完成

  • handle:ThreadPool::TPHandle的主要作用在于监视线程池中每一个线程的执行时常。每次线程函数执行时,都会设置一个grace超时时间,当线程执行超过该时间,就认为是unhealthy的状态。当执行时间超过suicide_grace时,OSD就会产生断言而导致自杀。这里向transaction对应的ObjectStore传入handle参数,主要是为了处理超时方面的问题

int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
				  TrackedOpRef osd_op,
				  ThreadPool::TPHandle *handle)
{
	...

	if (handle)
		handle->suspend_tp_timeout();
	
	op_queue_reserve_throttle(o);
	journal->reserve_throttle_and_backoff(tbl.length());
	
	if (handle)
		handle->reset_tp_timeout();

	...
}

1.1 RecoveryCtx的使用

在OSD中,通常使用如下函数来创建RecoveryCtx对象:

// ----------------------------------------
// peering and recovery

PG::RecoveryCtx OSD::create_context()
{
	ObjectStore::Transaction *t = new ObjectStore::Transaction;
	C_Contexts *on_applied = new C_Contexts(cct);
	C_Contexts *on_safe = new C_Contexts(cct);

	map<int, map<spg_t,pg_query_t> > *query_map = 
		new map<int, map<spg_t, pg_query_t> >;

	map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list =
		new map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >;

	map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map =
		new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;

	PG::RecoveryCtx rctx(query_map, info_map, notify_list,on_applied, on_safe, t);
	return rctx;
}

之后,调用如下函数将对应map里面的数据发送出去:

void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
                           ThreadPool::TPHandle *handle)
{
	if (service.get_osdmap()->is_up(whoami) &&is_active()) {
		do_notifies(*ctx.notify_list, curmap);
		do_queries(*ctx.query_map, curmap);
		do_infos(*ctx.info_map, curmap);
	}
	delete ctx.notify_list;
	delete ctx.query_map;
	delete ctx.info_map;

	if ((ctx.on_applied->empty() &&ctx.on_safe->empty() &&
	  ctx.transaction->empty() && ctx.created_pgs.empty()) || !pg) {
		delete ctx.transaction;
		delete ctx.on_applied;
		delete ctx.on_safe;
		assert(ctx.created_pgs.empty());
	} else {
		if (!ctx.created_pgs.empty()) {
			ctx.on_applied->add(new C_OpenPGs(ctx.created_pgs, store));
		}

		int tr = store->queue_transaction(
			pg->osr.get(),
			std::move(*ctx.transaction), ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(),
			handle);

		delete (ctx.transaction);
		assert(tr == 0);
	}
}

2. NamedState数据结构

class PG : DoutPrefixProvider {
public:    
  struct NamedState {
	const char *state_name;
	utime_t enter_time;
	
	const char *get_state_name() { return state_name; }
	
	NamedState(CephContext *cct_, const char *state_name_)
		: state_name(state_name_),
		enter_time(ceph_clock_now(cct_)) {}
		
	virtual ~NamedState() {}
  };
  
};

NamedState主要是用于对一种状态进行命名。

3. statechart状态机

Ceph在处理PG的状态转换时,使用了boost库提供的statechart状态机。因此,这里先简单介绍一下statechart状态机的基本概念和涉及的相关知识,以便更好地理解Peering过程PG的状态转换流程。

3.1 状态

在statechart里,状态的定义有两种方式:

  • 没有子状态情况下的状态定义
//boost
template< class MostDerived,
          class Context,
          class InnerInitial = mpl::list<>,
          history_mode historyMode = has_no_history >
class state : public simple_state<
  MostDerived, Context, InnerInitial, historyMode >
{
};

//ceph
struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
};

定义一个状态需要继承boost::statechart::simple_state或者boost::statechart::state类。上面Reset状态继承了boost::statechart::state类。该类的模板参数中,第一个参数为状态机自己的名字Reset,第二个参数为所属状态机的名字,表明Reset是状态机RecoveryMachine的一个状态。

  • 有子状态情况下的状态定义
struct Start;

struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
};

状态Started也是状态机RecoveryMachine的一个状态,模板参数中多了一个参数Start,它是状态Started的默认初始子状态,其定义如下:

struct Start : boost::statechart::state< Start, Started >, NamedState {
};

上面定义的状态Start是状态Started的子状态。第一个模板参数是自己的名字,第二个模板参数是该子状态所属父状态的名字。

综上所述,一个状态,要么属于一个状态机,要么属于一个状态,成为该状态的子状态。其定义的模板参数是自己,第二个模板参数是拥有者,第三个模板参数是它的起始子状态。

3.2 事件

状态能够接收并处理事件。事件可以改变状态,促使状态发生转移。在boost库的statechart状态机中定义事件的方式如下所示:

struct QueryState : boost::statechart::event< QueryState >{
}; 

QueryState为一个事件,需要继承boost::statechart::event类,模板参数为自己的名字。

3.3 状态响应事件

在一个状态内部,需要定义状态机处于当前状态时,可以接受的事件以及如何处理这些事件的方法:

#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
    T() : boost::statechart::event< T >() {}			   \
    void print(std::ostream *out) const {			   \
      *out << #T;						   \
    }								   \
  };
  TrivialEvent(Initialize)
  TrivialEvent(Load)
  TrivialEvent(NullEvt)

struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  explicit Initial(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::transition< Initialize, Reset >,
boost::statechart::custom_reaction< Load >,
boost::statechart::custom_reaction< NullEvt >,
boost::statechart::transition< boost::statechart::event_base, Crashed >
> reactions;

  boost::statechart::result react(const Load&);
  boost::statechart::result react(const MNotifyRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const boost::statechart::event_base&) {
return discard_event();
  }
};
3.3.1 可处理的事件列表及处理对应事件的方法

上述代码列出了状态RecoveryMachine/Initial可以处理的事件列表和处理对应事件的方法:

1) 通过boost::mpl::list定义该状态可以处理多个事件类型。在本例中可以处理InitializeLoadNullEvtevent_base事件。

2)简单事件处理

boost::statechart::transition< Initialize, Reset >

定义了状态Initial接收到事件Initialize后,无条件直接跳转到Reset状态;

3) 用户自定义事件处理: 当接收到事件后,需要根据一些条件来决定状态如何转移,这个逻辑需要用户自己定义实现

boost::statechart::custom_reaction< Load >

custom_reaction定义了一个用户自定义的事件处理方法,必须有一个react()的处理函数处理对应该事件。状态转移的逻辑需要用户自己在react函数里实现:

boost::statechart::result react(const Load&);

4) NullEvt事件用户自定义处理,但是没有实现react()函数来处理,最终事件匹配了boost::statechart::event_base事件,直接调用函数discard_event把事件丢弃掉。

3.3.2 存在的一些疑问

1) 对于NullEvt事件

boost::statechart::custom_reaction< NullEvt >

但是我们却并没有定义该事件的reaction函数。因为NullEvt继承自boost::statechart::event_base,因此其会调用如下函数来进行处理:

boost::statechart::result react(const boost::statechart::event_base&) {
	return discard_event();
}

关于这一点,我们可以通过后面的statechart示例来得到验证;

2) 对于MNotifyRecMInfoRec以及MLogRec事件

对于上面的3个事件,我们发现并没有将其添加到boost::mpl::list中,对于此类事件,会通过如下语句来进行处理:

boost::statechart::transition< boost::statechart::event_base, Crashed >

关于这一点,我们也可以通过后面的statechart示例来得到验证;

3) 对于boost::statechart::event_base事件,似乎有两种不同的处理方式

struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {

  typedef boost::mpl::list <
boost::statechart::transition< boost::statechart::event_base, Crashed >
> reactions;

  boost::statechart::result react(const boost::statechart::event_base&) {
	return discard_event();
  }
};

这里我们通过试验发现:

  • 对于没有添加到boost::mpl::list中的事件,其默认就会调用如下来进行处理
boost::statechart::transition< boost::statechart::event_base, Crashed >
  • 对于添加到了boost::mpl::list中的事件,如果直接找不到对应的react函数,且没有定义其父类型boost::statechart::event_base的react函数,那么直接会编译时报错

  • 对于添加到了boost::mpl::list中的事件,如果直接找不到对应的react函数的话,但是定义了其父类型boost::statechart::event_base的react函数,那么会调用其父类型的react函数来进行处理

boost::statechart::result react(const boost::statechart::event_base&) {
	return discard_event();
 }

3.4 状态机的定义

RecoveryMachine为定义的状态机,需要继承boost::statechart::state_machine类:

struct Initial;
class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
};

模板参数第一个参数为自己的名字,第二个参数为状态机默认的初始状态Initial。

状态机的基本操作有两个:

  • 状态机的初始化
machine.initiate();

initiate()是继承自boost::statechart::state_machine的成员函数。

  • 函数process_event()用来向状态机投递事件,从而触发状态机接收并处理该事件
machine.process_event(evt);

process_event()也是继承自boost::statechart::state_machine的成员函数。

3.5 context函数

context是状态机的一个比较有用的函数,它可以获取当前状态的所有祖先状态的指针。通过它可以获取父状态以及祖先状态的一些内部参数和状态值。context()函数是实现在boost::statechart::state_machine中的:

// Returns a reference to the context identified by the template
// parameter. This can either be _this_ object or one of its direct or
// indirect contexts.
template< class Context >
Context & context()
{
  // As we are in the outermost context here, only this object can be
  // returned.
  return *polymorphic_downcast< MostDerived * >( this );
}

template< class Context >
const Context & context() const
{
  // As we are in the outermost context here, only this object can be
  // returned.
  return *polymorphic_downcast< const MostDerived * >( this );
}

同时context()函数在boost::statechart::simple_state中也有实现:

template< class OtherContext >
    OtherContext & context()
    {
      typedef typename mpl::if_<
        is_base_of< OtherContext, MostDerived >,
        context_impl_this_context,
        context_impl_other_context
      >::type impl;
      return impl::template context_impl< OtherContext >( *this );
    }

template< class OtherContext >
    const OtherContext & context() const
    {
      typedef typename mpl::if_<
        is_base_of< OtherContext, MostDerived >,
        context_impl_this_context,
        context_impl_other_context
      >::type impl;
      return impl::template context_impl< OtherContext >( *this );
    }

从simple_state的实现来看,可以获取当前状态的祖先状态指针,也可以获取当前状态所属状态机的指针。

例如状态Started是RecoveryMachine的一个状态,状态Start是Started状态的一个子状态,那么如果当前状态是Start,就可以通过该函数获取它的父状态Started的指针:

Started * parent = context< Started >();

同时也可以获取其祖先状态RecoveryMachine的指针:

RecoveryMachine *machine = context< RecoveryMachine >();

综上所述,context()函数为获取当前状态的祖先状态上下文提供了一种方法。

3.6 事件的特殊处理

事件除了在状态转移列表中触发状态转移,或者进入用户自定义的状态处理函数,还可以有下列特殊的处理方式:

  • 在用户自定义的函数里,可以直接调用transit来直接跳转到目标状态。例如:
boost::statechart::result PG::RecoveryState::Initial::react(const MLogRec& i)
{
  PG *pg = context< RecoveryMachine >().pg;
  assert(!pg->is_primary());
  post_event(i);
  return transit< Stray >();
}

可以直接跳转到状态Stray

  • 在用户自定义的函数里,可以调用函数post_event()直接产生相应的事件,并投递给状态机
PG::RecoveryState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Start")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  if (pg->is_primary()) {
    dout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());
  } else { //is_stray
    dout(1) << "transitioning to Stray" << dendl; 
    post_event(MakeStray());
  }
}
  • 在用户的自定义函数里,调用函数discard_event()可以直接丢弃事件,不做任何处理
boost::statechart::result PG::RecoveryState::Primary::react(const ActMap&)
{
  dout(7) << "handle ActMap primary" << dendl;
  PG *pg = context< RecoveryMachine >().pg;
  pg->publish_stats_to_osd();
  pg->take_waiters();
  return discard_event();
}
  • 在用户的自定义函数里,调用函数forward_event()可以把当前事件继续投递给父状态机进行处理
boost::statechart::result PG::RecoveryState::WaitUpThru::react(const ActMap& am)
{
  PG *pg = context< RecoveryMachine >().pg;
  if (!pg->need_up_thru) {
    post_event(Activate(pg->get_osdmap()->get_epoch()));
  }
  return forward_event();
}

3.7 statechart示例

// Example program
#include <iostream>
#include <string>
#include <boost/statechart/custom_reaction.hpp>
#include <boost/statechart/event.hpp>
#include <boost/statechart/simple_state.hpp>
#include <boost/statechart/state.hpp>
#include <boost/statechart/state_machine.hpp>
#include <boost/statechart/transition.hpp>
#include <boost/statechart/event_base.hpp>



#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
    T() : boost::statechart::event< T >() {}			   \
    void print(std::ostream *out) const {			   \
      *out << #T;						   \
    }								   \
  };
TrivialEvent(Initialize)
TrivialEvent(Load)
TrivialEvent(NullEvt)
TrivialEvent(GoClean)

struct MInfoRec : boost::statechart::event< MInfoRec > {
    std::string name; 
    MInfoRec(std::string name): name(name){
    }
    
    void print(){
        std::cout<<"MInfoRec: "<<name<<"\n";
    }
};

struct MLogRec : boost::statechart::event< MLogRec > {
    std::string name; 
    MLogRec(std::string name): name(name){
    }
    
    void print(){
        std::cout<<"MLogRec: "<<name<<"\n";
    }
};

struct MNotifyRec : boost::statechart::event< MNotifyRec > {
    std::string name; 
    MNotifyRec(std::string name): name(name){
        
    }
    
    void print(){
        std::cout<<"MNotifyRec: "<<name<<"\n";
    }
};



struct Initial; 

struct RecoveryMachine : boost::statechart::state_machine< RecoveryMachine, Initial > {}; 

struct Reset;

struct Crashed : boost::statechart::state< Crashed, RecoveryMachine > {
    explicit Crashed(my_context ctx) : my_base(ctx)
    {
        std::cout << "Hello, Crashed!\n";
    }
};
    
struct Initial : boost::statechart::state< Initial, RecoveryMachine > {
    
    typedef boost::mpl::list < 
    boost::statechart::transition< Initialize, Reset >,
    boost::statechart::custom_reaction< Load >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;
    
    explicit Initial(my_context ctx) : my_base(ctx)
    {
        std::cout << "Hello, Initial!\n";
    } 
    
    boost::statechart::result react(const Load& l){
        return transit< Reset >();
    }
    
     boost::statechart::result react(const MNotifyRec& notify){
         std::cout<<"Initial::react::MLogRec!\n";
         
         return discard_event();
     }
	boost::statechart::result react(const MInfoRec& i){
		std::cout<<"Initial::react::MNotifiyRec!\n";

		return discard_event();
	}

	boost::statechart::result react(const MLogRec& log){
		std::cout<<"Initial::react::MLogRec!\n";
	
		return discard_event();
	}
    
    boost::statechart::result react(const boost::statechart::event_base&) {
        std::cout << "Initial event_base processed!\n";
	    return discard_event();
    }
    
    void exit() { 
        std::cout << "Bye, Initial!\n";
    } 
};

struct Reset : boost::statechart::state< Reset, RecoveryMachine > {
    explicit Reset(my_context ctx) : my_base(ctx)
    {
        std::cout << "Hello, Reset!\n";
    } 
    
    void exit() { 
        std::cout << "Bye, Reset!\n";
    }
};

int main(int argc, char *argv[])
{
  RecoveryMachine machine;
  
  machine.initiate();
  
  //machine.process_event(NullEvt());                        //语句1
  
  //machine.process_event(GoClean());                        //语句2
  
  //machine.process_event(MNotifyRec("notify record"));      //语句3
  
  return 0x0;
}

上面的示例与PG中对于Initial状态的处理类似,下面我们来看一下分别执行上述语句时的打印情况:

  • 单独执行语句1
Hello, Initial!
Initial event_base processed!
  • 单独执行语句2
Hello, Initial!
Bye, Initial!
Hello, Crashed!
  • 单独执行语句3
Hello, Initial!
Bye, Initial!
Hello, Crashed!

4. PG状态机

4.1 PG状态机中的所有事件

下面我们列出Recovery过程中的所有事件:

  • QueryState

  • MInfoRec

  • MLogRec

  • MNotifyRec

  • MQuery

  • AdvMap

  • ActMap

  • Activate

  • RequestBackfillPrio

  • TrivialEvent事件

class PG : DoutPrefixProvider {
public:    
#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
    T() : boost::statechart::event< T >() {}			   \
    void print(std::ostream *out) const {			   \
      *out << #T;						   \
    }								   \
  };
  TrivialEvent(Initialize)
  TrivialEvent(Load)
  TrivialEvent(GotInfo)
  TrivialEvent(NeedUpThru)
  TrivialEvent(CheckRepops)
  TrivialEvent(NullEvt)
  TrivialEvent(FlushedEvt)
  TrivialEvent(Backfilled)
  TrivialEvent(LocalBackfillReserved)
  TrivialEvent(RemoteBackfillReserved)
  TrivialEvent(RemoteReservationRejected)
  TrivialEvent(RequestBackfill)
  TrivialEvent(RequestRecovery)
  TrivialEvent(RecoveryDone)
  TrivialEvent(BackfillTooFull)

  TrivialEvent(AllReplicasRecovered)
  TrivialEvent(DoRecovery)
  TrivialEvent(LocalRecoveryReserved)
  TrivialEvent(RemoteRecoveryReserved)
  TrivialEvent(AllRemotesReserved)
  TrivialEvent(AllBackfillsReserved)
  TrivialEvent(Recovering)
  TrivialEvent(GoClean)

  TrivialEvent(AllReplicasActivated)

  TrivialEvent(IntervalFlush)
  
};
  • MakePrimary

  • MakeStray

  • NeedActingChange

  • IsIncomplete

  • GotLog

  • SnapTrim

  • Reset

  • SnapTrimReserved

  • SnapTrimTimerReady

4.2 PG状态机中的所有状态及其对应的事件

  • Crashed状态
struct Crashed : boost::statechart::state< Crashed, RecoveryMachine >, NamedState {
      explicit Crashed(my_context ctx);
};
  • Initial状态
struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  explicit Initial(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::transition< Initialize, Reset >,
boost::statechart::custom_reaction< Load >,
boost::statechart::custom_reaction< NullEvt >,
boost::statechart::transition< boost::statechart::event_base, Crashed >
> reactions;

  boost::statechart::result react(const Load&);
  boost::statechart::result react(const MNotifyRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const boost::statechart::event_base&) {
return discard_event();
  }
};
  • Reset状态
struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
  explicit Reset(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< NullEvt >,
boost::statechart::custom_reaction< FlushedEvt >,
boost::statechart::custom_reaction< IntervalFlush >,
boost::statechart::transition< boost::statechart::event_base, Crashed >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const FlushedEvt&);
  boost::statechart::result react(const IntervalFlush&);
  boost::statechart::result react(const boost::statechart::event_base&) {
return discard_event();
  }
};
  • Started状态
struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
  explicit Started(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< NullEvt >,
boost::statechart::custom_reaction< FlushedEvt >,
boost::statechart::custom_reaction< IntervalFlush >,
boost::statechart::transition< boost::statechart::event_base, Crashed >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const FlushedEvt&);
  boost::statechart::result react(const IntervalFlush&);
  boost::statechart::result react(const boost::statechart::event_base&) {
return discard_event();
  }
};
  • Start状态
struct Start : boost::statechart::state< Start, Started >, NamedState {
  explicit Start(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::transition< MakePrimary, Primary >,
boost::statechart::transition< MakeStray, Stray >
> reactions;
};
  • Primary状态
struct Primary : boost::statechart::state< Primary, Started, Peering >, NamedState {
  explicit Primary(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< MNotifyRec >,
boost::statechart::transition< NeedActingChange, WaitActingChange >
> reactions;
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const MNotifyRec&);
};
  • WaitActingChange状态
struct WaitActingChange : boost::statechart::state< WaitActingChange, Primary>,
		      NamedState {
  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< MInfoRec >,
boost::statechart::custom_reaction< MNotifyRec >
> reactions;
  explicit WaitActingChange(my_context ctx);
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MNotifyRec&);
  void exit();
};
  • Peering状态
struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
  std::unique_ptr< PriorSet > prior_set;
  bool history_les_bound;  //< need osd_find_best_info_ignore_history_les

  explicit Peering(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::transition< Activate, Active >,
boost::statechart::custom_reaction< AdvMap >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap &advmap);
};
  • Active状态
struct Active : boost::statechart::state< Active, Primary, Activating >, NamedState {
  explicit Active(my_context ctx);
  void exit();

  const set<pg_shard_t> remote_shards_to_reserve_recovery;
  const set<pg_shard_t> remote_shards_to_reserve_backfill;
  bool all_replicas_activated;

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< MInfoRec >,
boost::statechart::custom_reaction< MNotifyRec >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< Backfilled >,
boost::statechart::custom_reaction< AllReplicasActivated >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const MInfoRec& infoevt);
  boost::statechart::result react(const MNotifyRec& notevt);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const Backfilled&) {
return discard_event();
  }
  boost::statechart::result react(const AllReplicasActivated&);
};
  • Clean状态
struct Clean : boost::statechart::state< Clean, Active >, NamedState {
  typedef boost::mpl::list<
boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >
  > reactions;
  explicit Clean(my_context ctx);
  void exit();
};
  • Recovered状态
struct Recovered : boost::statechart::state< Recovered, Active >, NamedState {
  typedef boost::mpl::list<
boost::statechart::transition< GoClean, Clean >,
boost::statechart::custom_reaction< AllReplicasActivated >
  > reactions;
  explicit Recovered(my_context ctx);
  void exit();
  boost::statechart::result react(const AllReplicasActivated&) {
post_event(GoClean());
return forward_event();
  }
};
  • Backfilling状态
struct Backfilling : boost::statechart::state< Backfilling, Active >, NamedState {
  typedef boost::mpl::list<
boost::statechart::transition< Backfilled, Recovered >,
boost::statechart::custom_reaction< RemoteReservationRejected >
> reactions;
  explicit Backfilling(my_context ctx);
  boost::statechart::result react(const RemoteReservationRejected& evt);
  void exit();
};
  • WaitRemoteBackfillReserved状态
struct WaitRemoteBackfillReserved : boost::statechart::state< WaitRemoteBackfillReserved, Active >, NamedState {
  typedef boost::mpl::list<
boost::statechart::custom_reaction< RemoteBackfillReserved >,
boost::statechart::custom_reaction< RemoteReservationRejected >,
boost::statechart::transition< AllBackfillsReserved, Backfilling >
> reactions;
  set<pg_shard_t>::const_iterator backfill_osd_it;
  explicit WaitRemoteBackfillReserved(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteBackfillReserved& evt);
  boost::statechart::result react(const RemoteReservationRejected& evt);
};
  • WaitLocalBackfillReserved状态
struct WaitLocalBackfillReserved : boost::statechart::state< WaitLocalBackfillReserved, Active >, NamedState {
  typedef boost::mpl::list<
boost::statechart::transition< LocalBackfillReserved, WaitRemoteBackfillReserved >
> reactions;
  explicit WaitLocalBackfillReserved(my_context ctx);
  void exit();
};
  • NotBackfilling状态
struct NotBackfilling : boost::statechart::state< NotBackfilling, Active>, NamedState {
  typedef boost::mpl::list<
boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved>,
boost::statechart::custom_reaction< RemoteBackfillReserved >,
boost::statechart::custom_reaction< RemoteReservationRejected >
> reactions;
  explicit NotBackfilling(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteBackfillReserved& evt);
  boost::statechart::result react(const RemoteReservationRejected& evt);
};
  • ReplicaActive状态
struct ReplicaActive : boost::statechart::state< ReplicaActive, Started, RepNotRecovering >, NamedState {
  explicit ReplicaActive(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< MQuery >,
boost::statechart::custom_reaction< MInfoRec >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< Activate >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const MInfoRec& infoevt);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const MQuery&);
  boost::statechart::result react(const Activate&);
};
  • RepRecovering状态
struct RepRecovering : boost::statechart::state< RepRecovering, ReplicaActive >, NamedState {
  typedef boost::mpl::list<
boost::statechart::transition< RecoveryDone, RepNotRecovering >,
boost::statechart::transition< RemoteReservationRejected, RepNotRecovering >,
boost::statechart::custom_reaction< BackfillTooFull >
> reactions;
  explicit RepRecovering(my_context ctx);
  boost::statechart::result react(const BackfillTooFull &evt);
  void exit();
};
  • RepWaitBackfillReserved状态
struct RepWaitBackfillReserved : boost::statechart::state< RepWaitBackfillReserved, ReplicaActive >, NamedState {
  typedef boost::mpl::list<
boost::statechart::custom_reaction< RemoteBackfillReserved >,
boost::statechart::custom_reaction< RemoteReservationRejected >
> reactions;
  explicit RepWaitBackfillReserved(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteBackfillReserved &evt);
  boost::statechart::result react(const RemoteReservationRejected &evt);
};
  • RepWaitRecoveryReserved状态
struct RepWaitRecoveryReserved : boost::statechart::state< RepWaitRecoveryReserved, ReplicaActive >, NamedState {
  typedef boost::mpl::list<
boost::statechart::custom_reaction< RemoteRecoveryReserved >
> reactions;
  explicit RepWaitRecoveryReserved(my_context ctx);
  void exit();
  boost::statechart::result react(const RemoteRecoveryReserved &evt);
};
  • RepNotRecovering状态
struct RepNotRecovering : boost::statechart::state< RepNotRecovering, ReplicaActive>, NamedState {
  typedef boost::mpl::list<
boost::statechart::custom_reaction< RequestBackfillPrio >,
    boost::statechart::transition< RequestRecovery, RepWaitRecoveryReserved >,
boost::statechart::transition< RecoveryDone, RepNotRecovering >  // for compat with pre-reservation peers
> reactions;
  explicit RepNotRecovering(my_context ctx);
  boost::statechart::result react(const RequestBackfillPrio &evt);
  void exit();
};
  • Recovering状态
struct Recovering : boost::statechart::state< Recovering, Active >, NamedState {
  typedef boost::mpl::list <
boost::statechart::custom_reaction< AllReplicasRecovered >,
boost::statechart::custom_reaction< RequestBackfill >
> reactions;
  explicit Recovering(my_context ctx);
  void exit();
  void release_reservations();
  boost::statechart::result react(const AllReplicasRecovered &evt);
  boost::statechart::result react(const RequestBackfill &evt);
};
  • WaitRemoteRecoveryReserved状态
struct WaitRemoteRecoveryReserved : boost::statechart::state< WaitRemoteRecoveryReserved, Active >, NamedState {
  typedef boost::mpl::list <
boost::statechart::custom_reaction< RemoteRecoveryReserved >,
boost::statechart::transition< AllRemotesReserved, Recovering >
> reactions;
  set<pg_shard_t>::const_iterator remote_recovery_reservation_it;
  explicit WaitRemoteRecoveryReserved(my_context ctx);
  boost::statechart::result react(const RemoteRecoveryReserved &evt);
  void exit();
};
  • WaitLocalRecoveryReserved状态
struct WaitLocalRecoveryReserved : boost::statechart::state< WaitLocalRecoveryReserved, Active >, NamedState {
  typedef boost::mpl::list <
boost::statechart::transition< LocalRecoveryReserved, WaitRemoteRecoveryReserved >
> reactions;
  explicit WaitLocalRecoveryReserved(my_context ctx);
  void exit();
};
  • Activating状态
struct Activating : boost::statechart::state< Activating, Active >, NamedState {
  typedef boost::mpl::list <
boost::statechart::transition< AllReplicasRecovered, Recovered >,
boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved >
> reactions;
  explicit Activating(my_context ctx);
  void exit();
};
  • Stray状态
struct Stray : boost::statechart::state< Stray, Started >, NamedState {
  map<int, pair<pg_query_t, epoch_t> > pending_queries;

  explicit Stray(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< MQuery >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< MInfoRec >,
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< RecoveryDone >
> reactions;
  boost::statechart::result react(const MQuery& query);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const MInfoRec& infoevt);
  boost::statechart::result react(const ActMap&);
  boost::statechart::result react(const RecoveryDone&) {
return discard_event();
  }
};
  • GetInfo状态
struct GetInfo : boost::statechart::state< GetInfo, Peering >, NamedState {
  set<pg_shard_t> peer_info_requested;

  explicit GetInfo(my_context ctx);
  void exit();
  void get_infos();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::transition< GotInfo, GetLog >,
boost::statechart::custom_reaction< MNotifyRec >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const MNotifyRec& infoevt);
};
  • GetLog状态
struct GetLog : boost::statechart::state< GetLog, Peering >, NamedState {
  pg_shard_t auth_log_shard;
  boost::intrusive_ptr<MOSDPGLog> msg;

  explicit GetLog(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< GotLog >,
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::transition< IsIncomplete, Incomplete >
> reactions;
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const MLogRec& logevt);
  boost::statechart::result react(const GotLog&);
};
  • GetMissing状态
struct GetMissing : boost::statechart::state< GetMissing, Peering >, NamedState {
  set<pg_shard_t> peer_missing_requested;

  explicit GetMissing(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::transition< NeedUpThru, WaitUpThru >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const MLogRec& logevt);
};
  • WaitUpThru状态
struct WaitUpThru : boost::statechart::state< WaitUpThru, Peering >, NamedState {
  explicit WaitUpThru(my_context ctx);
  void exit();

  typedef boost::mpl::list <
boost::statechart::custom_reaction< QueryState >,
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< MLogRec >
> reactions;
  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const ActMap& am);
  boost::statechart::result react(const MLogRec& logrec);
};
  • Incomplete状态
struct Incomplete : boost::statechart::state< Incomplete, Peering>, NamedState {
  typedef boost::mpl::list <
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< MNotifyRec >
> reactions;
  explicit Incomplete(my_context ctx);
  boost::statechart::result react(const AdvMap &advmap);
  boost::statechart::result react(const MNotifyRec& infoevt);
  void exit();
};

4.3 PG状态机

在类PG的内部定义了类RecoveryState,该类RecoveryState的内部定义了PG的状态机RecoveryMachine和它的各种状态。

class PG{
	class RecoveryState{
	
		class RecoveryMachine{
			RecoveryState *state;
			
		};
		
		RecoveryMachine machine;
		PG *pg;

		/// context passed in by state machine caller
		RecoveryCtx *orig_ctx;

		/// populated if we are buffering messages pending a flush
		boost::optional<BufferedRecoveryMessages> messages_pending_flush;

		/**
		* populated between start_handle() and end_handle(), points into
		* the message lists for messages_pending_flush while blocking messages
		* or into orig_ctx otherwise
		*/
		boost::optional<RecoveryCtx> rctx;
	
	}recovery_state;
};

在每个PG对象创建时,在构造函数里创建一个新的RecoveryState类的对象,并创建相应的RecoveryMachine类的对象,也就是创建了一个新的状态机。每个PG类对应一个独立的状态机来控制该PG的状态转换。

class RecoveryState{
public:
	explicit RecoveryState(PG *pg)
	  : machine(this, pg), pg(pg), orig_ctx(0) {
		machine.initiate();
    }
};

PG::PG(OSDService *o, OSDMapRef curmap,
       const PGPool &_pool, spg_t p) :
	recovery_state(this){
}

上面machine.initiate()调用的是boost::statechart::state_machine中的initiate()方法。



[参看]

  1. ceph osd heartbeat 分析

  2. boost官网

  3. 在线编译器

  4. boost在线编译器


Read More

本节我们从基础出发,来研究ceph peering这一复杂的过程,期望对其工作原理有更深入的理解。

1. OSD中网络消息的处理

在前面的ceph网络通信章节中,我们介绍了SimpleMessenger网络通信框架。这里OSD实现了Dispatcher接口:

class OSD : public Dispatcher,
	    public md_config_obs_t {
};

下面我们来看其对Dispatcher接口各函数的具体实现。

1) ms_can_fast_dispatch()的实现

bool ms_can_fast_dispatch(Message *m) const {
	switch (m->get_type()) {
		case CEPH_MSG_OSD_OP:
		case MSG_OSD_SUBOP:
		case MSG_OSD_REPOP:
		case MSG_OSD_SUBOPREPLY:
		case MSG_OSD_REPOPREPLY:
		case MSG_OSD_PG_PUSH:
		case MSG_OSD_PG_PULL:
		case MSG_OSD_PG_PUSH_REPLY:
		case MSG_OSD_PG_SCAN:
		case MSG_OSD_PG_BACKFILL:
		case MSG_OSD_EC_WRITE:
		case MSG_OSD_EC_WRITE_REPLY:
		case MSG_OSD_EC_READ:
		case MSG_OSD_EC_READ_REPLY:
		case MSG_OSD_REP_SCRUB:
		case MSG_OSD_PG_UPDATE_LOG_MISSING:
		case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
			return true;
		default:
			return false;
	}
}

从上面我们看到,对于case中所列举的消息,是可以进行fast_dispatch()进行处理的。

2) ms_can_fast_dispatch_any()的实现

bool ms_can_fast_dispatch_any() const { return true; }

上面说明,会将OSD这个Dispatcher加入到Messenger的fast_dispatchers列表中的。

3) ms_fast_dispatch()的实现

void OSD::ms_fast_dispatch(Message *m)
{
	if (service.is_stopping()) {
		m->put();
		return;
	}

	OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
	{
		#ifdef WITH_LTTNG
		osd_reqid_t reqid = op->get_reqid();
		#endif
		tracepoint(osd, ms_fast_dispatch, reqid.name._type,
		reqid.name._num, reqid.tid, reqid.inc);
	}

	OSDMapRef nextmap = service.get_nextmap_reserved();
	Session *session = static_cast<Session*>(m->get_connection()->get_priv());
	if (session) {
		{
			Mutex::Locker l(session->session_dispatch_lock);
			update_waiting_for_pg(session, nextmap);
			session->waiting_on_map.push_back(op);
			dispatch_session_waiting(session, nextmap);
		}
		session->put();
	}

	service.release_map(nextmap);
}

从上面可以看出,其会调用dispatch_session_waiting()来进行处理:

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
	assert(session->session_dispatch_lock.is_locked());
	assert(session->osdmap == osdmap);
	for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
	    i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
	    session->waiting_on_map.erase(i++));
	
	if (session->waiting_on_map.empty()) {
		clear_session_waiting_on_map(session);
	} else {
		register_session_waiting_on_map(session);
	}
	session->maybe_reset_osdmap();
}

bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
{
	...
}

在dispatch_op_fast()函数中就会对ms_can_fast_dispatch()所指定的消息进行处理。

4) ms_fast_preprocess()的实现

void OSD::ms_fast_preprocess(Message *m)
{
	if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
		if (m->get_type() == CEPH_MSG_OSD_MAP) {
			MOSDMap *mm = static_cast<MOSDMap*>(m);
			Session *s = static_cast<Session*>(m->get_connection()->get_priv());
			if (s) {
				s->received_map_lock.lock();
				s->received_map_epoch = mm->get_last();
				s->received_map_lock.unlock();
				s->put();
			}
		}
	}
}

从上面的代码我们看到,对于来自于其他OSD发送过来的CEPH_MSG_OSD_MAP消息,则将session.received_map_epoch设置为收到的OSDMap中的最新版本。

5) ms_dispatch()的实现

bool OSD::ms_dispatch(Message *m)
{
	if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
		service.got_stop_ack();
		m->put();
		return true;
	}
	
	// lock!
	
	osd_lock.Lock();
	if (is_stopping()) {
		osd_lock.Unlock();
		m->put();
		return true;
	}
	
	while (dispatch_running) {
		dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
		dispatch_cond.Wait(osd_lock);
	}
	dispatch_running = true;
	
	do_waiters();
	_dispatch(m);
	do_waiters();
	
	dispatch_running = false;
	dispatch_cond.Signal();
	
	osd_lock.Unlock();
	
	return true;
}

ms_dispatch()函数实现对普通消息的分发处理。现在我们来看一下_dispatch()函数:

void OSD::_dispatch(Message *m)
{
	assert(osd_lock.is_locked());
	dout(20) << "_dispatch " << m << " " << *m << dendl;
	
	logger->set(l_osd_buf, buffer::get_total_alloc());
	logger->set(l_osd_history_alloc_bytes, SHIFT_ROUND_UP(buffer::get_history_alloc_bytes(), 20));
	logger->set(l_osd_history_alloc_num, buffer::get_history_alloc_num());
	logger->set(l_osd_cached_crc, buffer::get_cached_crc());
	logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
	
	switch (m->get_type()) {
	
		// -- don't need lock --
		case CEPH_MSG_PING:
			dout(10) << "ping from " << m->get_source() << dendl;
			m->put();
			break;
		
		// -- don't need OSDMap --
		
		// map and replication
		case CEPH_MSG_OSD_MAP:
			handle_osd_map(static_cast<MOSDMap*>(m));
			break;
		
		// osd
		case MSG_PGSTATSACK:
			handle_pg_stats_ack(static_cast<MPGStatsAck*>(m));
			break;
		
		case MSG_MON_COMMAND:
			handle_command(static_cast<MMonCommand*>(m));
			break;
		case MSG_COMMAND:
			handle_command(static_cast<MCommand*>(m));
			break;
		
		case MSG_OSD_SCRUB:
			handle_scrub(static_cast<MOSDScrub*>(m));
			break;
		
		// -- need OSDMap --
		
		default:
		{
			OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
			// no map?  starting up?
			if (!osdmap) {
				dout(7) << "no OSDMap, not booted" << dendl;
				logger->inc(l_osd_waiting_for_map);
				waiting_for_osdmap.push_back(op);
				op->mark_delayed("no osdmap");
				break;
			}
		
			// need OSDMap
			dispatch_op(op);
		}
	}
	
	logger->set(l_osd_buf, buffer::get_total_alloc());
	logger->set(l_osd_history_alloc_bytes, SHIFT_ROUND_UP(buffer::get_history_alloc_bytes(), 20));
	logger->set(l_osd_history_alloc_num, buffer::get_history_alloc_num());

}

void OSD::dispatch_op(OpRequestRef op)
{
	switch (op->get_req()->get_type()) {
	
		case MSG_OSD_PG_CREATE:
			handle_pg_create(op);
			break;
		case MSG_OSD_PG_NOTIFY:
			handle_pg_notify(op);
			break;
		case MSG_OSD_PG_QUERY:
			handle_pg_query(op);
			break;
		case MSG_OSD_PG_LOG:
			handle_pg_log(op);
			break;
		case MSG_OSD_PG_REMOVE:
			handle_pg_remove(op);
			break;
		case MSG_OSD_PG_INFO:
			handle_pg_info(op);
			break;
		case MSG_OSD_PG_TRIM:
			handle_pg_trim(op);
			break;
		case MSG_OSD_PG_MISSING:
			assert(0 =="received MOSDPGMissing; this message is supposed to be unused!?!");
			break;
		
		case MSG_OSD_BACKFILL_RESERVE:
			handle_pg_backfill_reserve(op);
			break;
		case MSG_OSD_RECOVERY_RESERVE:
			handle_pg_recovery_reserve(op);
			break;
	}
}

上面我们看到了最重要的对CEPH_MSG_OSD_MAP消息的处理。

6) ms_handle_connect()的实现

void OSD::ms_handle_connect(Connection *con)
{
	if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
		Mutex::Locker l(osd_lock);
		if (is_stopping())
			return;

		dout(10) << "ms_handle_connect on mon" << dendl;
	
		if (is_preboot()) {
			start_boot();
		} else if (is_booting()) {
			_send_boot();       // resend boot message
		} else {
			map_lock.get_read();
			Mutex::Locker l2(mon_report_lock);
	
			utime_t now = ceph_clock_now(NULL);
			last_mon_report = now;
	
			// resend everything, it's a new session
			send_alive();
			service.requeue_pg_temp();
			service.send_pg_temp();
			requeue_failures();
			send_failures();
			send_pg_stats(now);
	
			map_lock.put_read();
		}
	
		// full map requests may happen while active or pre-boot
		if (requested_full_first) {
			rerequest_full_maps();
		}
	}
}

OSD只会主动向Monitor以及其他OSD发起连接。这里处理向其他Monitor发起连接的请求回调。

7) ms_handle_fast_connect()的实现

void OSD::ms_handle_fast_connect(Connection *con)
{
	if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
		Session *s = static_cast<Session*>(con->get_priv());
		if (!s) {
			s = new Session(cct);
			con->set_priv(s->get());
			s->con = con;
			dout(10) << " new session (outgoing) " << s << " con=" << s->con
				<< " addr=" << s->con->get_peer_addr() << dendl;
			// we don't connect to clients
			assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
			s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
		}
		s->put();
	}
}

OSD只会主动向Monitor以及其他OSD发起连接。这里处理向其他OSD主动发起连接的回调请求。

8) ms_handle_accept()的实现

在OSD中没有对ms_handle_accept()函数进行重新实现。

9)ms_handle_fast_accept()的实现

bool ms_can_fast_dispatch_any() const { return true; }

void OSD::ms_handle_fast_accept(Connection *con)
{
	if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
		Session *s = static_cast<Session*>(con->get_priv());
		if (!s) {
			s = new Session(cct);
			con->set_priv(s->get());
			s->con = con;
			dout(10) << "new session (incoming)" << s << " con=" << con
			  << " addr=" << con->get_peer_addr()
			  << " must have raced with connect" << dendl;
			assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
			s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
		}
		s->put();
	}
}

由于ms_can_fast_dispatch_any()永远返回true,因此OSD会接受Monitor,RadosGw以及其他OSD的连接。通过上面的代码,我们看到当RadosGW和其他OSD向OSD发起新的连接时,会构造生成一个新的Session,将其作为connection的private值保存起来。

10) ms_handle_reset()的实现

bool OSD::ms_handle_reset(Connection *con)
{
	OSD::Session *session = (OSD::Session *)con->get_priv();
	dout(1) << "ms_handle_reset con " << con << " session " << session << dendl;
	if (!session)
		return false;
	session->wstate.reset(con);
	session->con.reset(NULL);  // break con <-> session ref cycle
	session_handle_reset(session);
	session->put();
	return true;
}

当连接被reset时,回调此函数清空该连接对应的session信息

11)ms_handle_remote_reset()的实现

void ms_handle_remote_reset(Connection *con) {}

上面对ms_handle_remote_reset()的实现为空。

12) ms_get_authorizer()的实现

bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
	dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
	
	if (dest_type == CEPH_ENTITY_TYPE_MON)
		return true;
	
	if (force_new) {
		/* the MonClient checks keys every tick(), so we should just wait for that cycle to get through */
		if (monc->wait_auth_rotating(10) < 0)
			return false;
	}
	
	*authorizer = monc->auth->build_authorizer(dest_type);
	return *authorizer != NULL;
}

上面可以看到其调用build_authorizer()来构建AuthAuthorizer。

13)ms_verify_authorizer()的实现

bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
			       int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
			       bool& isvalid, CryptoKey& session_key)
{
	...
}

实现对incomming连接的校验。

2. OSD模块消息

下面介绍一下OSD模块所使用到的一些消息:

  • CEPH_MSG_OSD_OP:客户端进行读写请求时会构造此消息,primary OSD会收到

  • MSG_OSD_REPOP:在进行数据修改操作时,Replicated OSD会收到此类消息,由Primary OSD发送

  • MSG_OSD_REPOPREPLY: 针对MSG_OSD_REPOP的响应

  • MSG_OSD_SUBOP:Primary与Replicas之间针对objects的一些内部操作,主要用于object recovery的时候。

  • MSG_OSD_SUBOPREPLY:针对MSG_OSD_SUBOP的响应

  • CEPH_OSD_OP_WRITE: 写部分对象

  • CEPH_OSD_OP_WRITEFULL: 写一个完整对象

2.1 ms_fast分发的消息类型

  • CEPH_MSG_OSD_OP

  • MSG_OSD_SUBOP

  • MSG_OSD_REPOP

  • MSG_OSD_SUBOPREPLY

  • MSG_OSD_REPOPREPLY

  • MSG_OSD_PG_PUSH

  • MSG_OSD_PG_PULL

  • MSG_OSD_PG_PUSH_REPLY

  • MSG_OSD_PG_SCAN

  • MSG_OSD_PG_BACKFILL

  • MSG_OSD_EC_WRITE

  • MSG_OSD_EC_WRITE_REPLY

  • MSG_OSD_EC_READ

  • MSG_OSD_EC_READ_REPLY

  • MSG_OSD_REP_SCRUB

  • MSG_OSD_PG_UPDATE_LOG_MISSING

  • MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY

2.2 普通分发的消息类型

  • MSG_OSD_MARK_ME_DOWN

  • CEPH_MSG_PING

  • CEPH_MSG_OSD_MAP

  • MSG_PGSTATSACK

  • MSG_MON_COMMAND

  • MSG_COMMAND

  • MSG_OSD_SCRUB

  • MSG_OSD_PG_CREATE

  • MSG_OSD_PG_NOTIFY

  • MSG_OSD_PG_QUERY

  • MSG_OSD_PG_LOG

  • MSG_OSD_PG_REMOVE

  • MSG_OSD_PG_INFO

  • MSG_OSD_PG_TRIM

  • MSG_OSD_PG_MISSING:

  • MSG_OSD_BACKFILL_RESERVE

  • MSG_OSD_RECOVERY_RESERVE

3. ms_fast_dispatch()代码分析

通过上文分析,我们知道对于有以下消息,会调用ms_fast_dispatch()来进行分发,下面我们来看一下该函数的实现:

void OSD::ms_fast_dispatch(Message *m)
{
	if (service.is_stopping()) {
		m->put();
		return;
	}

	OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
	{
		#ifdef WITH_LTTNG
		osd_reqid_t reqid = op->get_reqid();
		#endif
		tracepoint(osd, ms_fast_dispatch, reqid.name._type,
		reqid.name._num, reqid.tid, reqid.inc);
	}

	OSDMapRef nextmap = service.get_nextmap_reserved();
	Session *session = static_cast<Session*>(m->get_connection()->get_priv());
	if (session) {
		{
			Mutex::Locker l(session->session_dispatch_lock);
			update_waiting_for_pg(session, nextmap);
			session->waiting_on_map.push_back(op);
			dispatch_session_waiting(session, nextmap);
		}

		session->put();
	}
	service.release_map(nextmap);
}

首先我们这里调用service.get_nextmap_reserved()来获得nextmap,我们来看其实现:

class OSDService{
public:
	OSDMapRef osdmap;

	/*
	* osdmap - current published map
	* next_osdmap - pre_published map that is about to be published.
	*
	* We use the next_osdmap to send messages and initiate connections,
	* but only if the target is the same instance as the one in the map
	* epoch the current user is working from (i.e., the result is
	* equivalent to what is in next_osdmap).
	*
	* This allows the helpers to start ignoring osds that are about to
	* go down, and let OSD::handle_osd_map()/note_down_osd() mark them
	* down, without worrying about reopening connections from threads
	* working from old maps.
	*/
	OSDMapRef next_osdmap;

	/// gets ref to next_osdmap and registers the epoch as reserved
	OSDMapRef get_nextmap_reserved() {
		Mutex::Locker l(pre_publish_lock);
		if (!next_osdmap)
			return OSDMapRef();
	
		epoch_t e = next_osdmap->get_epoch();
		map<epoch_t, unsigned>::iterator i =map_reservations.insert(make_pair(e, 0)).first;
		i->second++;
	
		return next_osdmap;
	}

};

从上面的注释中我们看到,service.osdmap是指当前已经发布的最新的OSDMap,而service.next_osdmap是将要发布的OSDMap。我们会使用next_osdmap来发送消息和初始化连接,但前提是目标target与当前用户工作在相同的OSDMap epoch。

下面我们从代码中来看一下service.osdmap与service.next_osdmap的区别:

1) OSDService::publish_map()

查找publish_map()函数,发现只有两个地方调用:

  • OSD::init()时调用publish_map()
int OSD::init()
{
	...
	service.publish_map(osdmap);

	...

	consume_map();

	...
}

在OSD初始化时,首先加载superblock中所指定的OSDMap版本作为当前的初始化osdmap,然后再调用consume_map()来消费该osdmap,这可能触发启动时PG的第一次peering操作。

  • OSD::consume_map()

除了上面介绍的在OSD init阶段会触发调用consume_map(),还会在保存完一个新的OSDMap之后触发调用consume_map(),下面我们来看:

void OSD::consume_map()
{
	...

	service.pre_publish_map(osdmap);
	service.await_reserved_maps();
	service.publish_map(osdmap);

	...
}

从上面代码我们看出,要等到reserved_maps都消费完成之后,osdmap才会真正发布。如下图所示:

ceph-chapter10

2) OSDService::pre_publish_map()

查找pre_publish_map()函数发现也只有两个地方调用:

  • _committed_osd_maps()中调用pre_publish_map()
void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
{
	...

	// advance through the new maps
	for (epoch_t cur = first; cur <= last; cur++) {

		dout(10) << " advance to epoch " << cur<< " (<= last " << last<< " <= newest_map " << superblock.newest_map<< ")" << dendl;
	
		OSDMapRef newmap = get_map(cur);
		assert(newmap);  // we just cached it above!
		
		// start blacklisting messages sent to peers that go down.
		service.pre_publish_map(newmap);
	
		...
	}

	...
}

当接收到MOSDMap消息,如果有符合条件的新OSDMaps,则会将其打包到一个Transaction中,之后再将该Transaction持久化到硬盘上。当持久化成功,会回调_committed_osd_map()函数。如上代码所示,当前OSD会遍历MOSDMap消息中的所有新OSDMap,然后调用service.pre_publish_map()将去标记为预发布状态。

  • OSD::consume_map()中调用pre_publish_map

在_committed_osd_map()函数中还会调用OSD::consume_map():

void OSD::consume_map()
{
	...

	service.pre_publish_map(osdmap);
	service.await_reserved_maps();
	service.publish_map(osdmap);

	...
}

在consume_map()中会触发发布新接收到的OSDMap,之后再触发相应PG的peering操作。

3.1 几个相关变量

在进一步分析ms_fast_dispatch()之前,我们先来分析一下如下几个重要的变量:

struct Session : public RefCountedObject {
	list<OpRequestRef> waiting_on_map;

	map<spg_t, list<OpRequestRef> > waiting_for_pg;
};
class OSD : public Dispatcher,public md_config_obs_t {
public:
	set<Session*> session_waiting_for_map;
  	map<spg_t, set<Session*> > session_waiting_for_pg;

	list<OpRequestRef>  waiting_for_osdmap;
};

如下图所示:

ceph-chapter10

  • Session::waiting_on_map: 等待在OSDMap上的请求。通常来说,在peering过程中当发现要创建PG,或者Peering过程中发现该PG有分裂出子PG,则可能会把相关的一些请求放入到session对应的waiting_on_map中;
//处理peering event
void OSD::handle_pg_peering_evt(
  spg_t pgid,
  const pg_history_t& orig_history,
  pg_interval_map_t& pi,
  epoch_t epoch,
  PG::CephPeeringEvtRef evt)
{
	//
	wake_pg_waiters(pgid);
}


//PG分裂
void OSD::process_peering_events(
  const list<PG*> &pgs,
  ThreadPool::TPHandle &handle
  )
{
	...
	if (!split_pgs.empty()) {
		rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
		split_pgs.clear();
	}
}
  • Session::waiting_for_pg: 等待在指定PG上的请求。通常来说,当获取不到指定的PG时(比如当前并没有获得到最新的OSDMap,从而导致PG找不到),就会将请求放入到waiting_for_pg中;

  • OSD::session_waiting_for_map: 保存等待在OSDMap上的Session;

  • OSD::session_waiting_for_pg:保存等待在指定PG上的session。

  • OSD::waiting_for_osdmap: 保存等待在OSDMap上的请求

注:一般具有session状态、需要等待响应的消息会加入到session_waiting_for_map中;而无状态的需要等待osdmap的请求加入到waiting_for_osdmap中

3.1 函数update_waiting_for_pg()

void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
{
	assert(session->session_dispatch_lock.is_locked());
	if (!session->osdmap) {
		session->osdmap = newmap;
		return;
	}
	
	if (newmap->get_epoch() == session->osdmap->get_epoch())
		return;
	
	assert(newmap->get_epoch() > session->osdmap->get_epoch());
	
	map<spg_t, list<OpRequestRef> > from;
	from.swap(session->waiting_for_pg);
	
	for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();i != from.end();from.erase(i++)) {
		set<spg_t> children;

		if (!newmap->have_pg_pool(i->first.pool())) {
			// drop this wait list on the ground
			i->second.clear();
		} else {
			assert(session->osdmap->have_pg_pool(i->first.pool()));
			if (i->first.is_split(session->osdmap->get_pg_num(i->first.pool()), newmap->get_pg_num(i->first.pool()), &children)) {
				for (set<spg_t>::iterator child = children.begin(); child != children.end(); ++child) {

					unsigned split_bits = child->get_split_bits(newmap->get_pg_num(child->pool()));
					list<OpRequestRef> child_ops;

					OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);

					if (!child_ops.empty()) {
						session->waiting_for_pg[*child].swap(child_ops);
						register_session_waiting_on_pg(session, *child);
					}
				}
			}
		}

		if (i->second.empty()) {
			clear_session_waiting_on_pg(session, i->first);
		} else {
			session->waiting_for_pg[i->first].swap(i->second);
		}
	}
	
	session->osdmap = newmap;
}

主要处理有PG分裂情况下,更新session的waiting_for_pg。

注:PG的分裂会造成Monitor更新OSDMap

3.2 dispatch_session_waiting()函数

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
	assert(session->session_dispatch_lock.is_locked());
	assert(session->osdmap == osdmap);
	for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
	   i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap); session->waiting_on_map.erase(i++));
	
	if (session->waiting_on_map.empty()) {
		clear_session_waiting_on_map(session);
	} else {
		register_session_waiting_on_map(session);
	}
	session->maybe_reset_osdmap();
}

dispatch_session_waiting()就是将session上waiting_on_map里面的请求,调用dispatch_op_fast()转发出去。

4. ms_dispatch()分析

bool OSD::ms_dispatch(Message *m)
{
	if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
	service.got_stop_ack();
		m->put();
		return true;
	}
	
	// lock!
	
	osd_lock.Lock();
	if (is_stopping()) {
		osd_lock.Unlock();
		m->put();
		return true;
	}
	
	while (dispatch_running) {
		dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
		dispatch_cond.Wait(osd_lock);
	}
	dispatch_running = true;
	
	do_waiters();
	_dispatch(m);
	do_waiters();
	
	dispatch_running = false;
	dispatch_cond.Signal();
	
	osd_lock.Unlock();
	
	return true;
}

此函数用于分发非fast消息。我们来看,这里有一个osd_lock,这是一把十分大的锁。我们知道一个DispatchQueue会有dispatch_thread以及local_delivery_thread这两个线程来进行分发,这就存在竞争关系。OSD作为一个Dispatcher,使用osd_lock来保证同一时刻,只能有一个线程调用到此函数。

4.1 do_waiters()函数

do_waiters()主要用于处理当前阻塞在waiting_for_osdmap上的请求。比如有些请求需要new osdMap,那么就会先将这些请求放入waiting_for_osdmap上。然后在新的OSDMap准备好后,就会调用take_waiters()将其加入到finished列表中:

void OSD::activate_map(){
	...

	// process waiters
	take_waiters(waiting_for_osdmap);
}

void take_waiters(list<OpRequestRef>& ls) {
	finished_lock.Lock();
	finished.splice(finished.end(), ls);
	finished_lock.Unlock();
}

有如下两种情况会将请求加入到waiting_for_osdmap上:

  • 分发消息时没有OSDMap
void OSD::_dispatch(Message *m)
{
	switch (m->get_type()) {

	default:
		OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
		// no map?  starting up?
		if (!osdmap) {
			dout(7) << "no OSDMap, not booted" << dendl;
			logger->inc(l_osd_waiting_for_map);
			waiting_for_osdmap.push_back(op);
			op->mark_delayed("no osdmap");
			break;
		}
	}
}
  • 需要更新的OSDMap的请求
bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
				    bool is_fast_dispatch)
{
	Message *m = op->get_req();
	dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
	
	assert(osd_lock.is_locked());
	
	// do they have a newer map?
	if (epoch > osdmap->get_epoch()) {
		dout(7) << "waiting for newer map epoch " << epoch<< " > my " << osdmap->get_epoch() << " with " << m << dendl;
		wait_for_new_map(op);
		return false;
	}
	
	if (!require_self_aliveness(op->get_req(), epoch)) {
		return false;
	}
	
	// ok, our map is same or newer.. do they still exist?
	if (m->get_connection()->get_messenger() == cluster_messenger && !require_same_peer_instance(op->get_req(), osdmap, is_fast_dispatch)) {
		return false;
	}
	
	return true;
}

void OSD::wait_for_new_map(OpRequestRef op)
{
	// ask?
	if (waiting_for_osdmap.empty()) {
		osdmap_subscribe(osdmap->get_epoch() + 1, false);
	}
	
	logger->inc(l_osd_waiting_for_map);
	waiting_for_osdmap.push_back(op);
	op->mark_delayed("wait for new map");
}
4.2 _dispatch()分发消息

_dispatch()会分发如下消息,其中有一些不需要依赖OSDMap,另外一些则需要:

1) 无需OSDMap的消息

  • CEPH_MSG_PING

  • CEPH_MSG_OSD_MAP

  • MSG_MON_COMMAND

  • MSG_COMMAND

  • MSG_OSD_SCRUB

2) 需要OSDMap的消息

  • MSG_OSD_PG_CREATE

  • MSG_OSD_PG_NOTIFY:由PG stray发送到PG primary的通知消息(其中包含pginfo信息)。

注: Peering过程中,RecoveryCtx::send_notify()发送的就是此消息

  • MSG_OSD_PG_QUERY:replica(stray)上用于处理来自于primary的PG查询。

注:Peering过程中GetInfo就是通过发送此消息来查询的

  • MSG_OSD_PG_LOG:PG一个副本向另一个副本发送的PGLog信息。通常发生于Peering过程

  • MSG_OSD_PG_REMOVE

  • MSG_OSD_PG_INFO:PG的一个副本向另一个副本发送的PGInfo信息。

注:Peering过程中,发现有missing object时,就通过PG::search_for_missing()来发送此查询消息

  • MSG_OSD_PG_TRIM

  • MSG_OSD_PG_MISSING

  • MSG_OSD_BACKFILL_RESERVE

  • MSG_OSD_RECOVERY_RESERVE

5. OSD运行状态

下面我们简要介绍一下OSD运行中的几个状态,以便更好的理解peering:

ceph-chapter10



[参看]

  1. Ceph OSD

  2. Ceph pg分裂流程及可行性分析




Read More

PGInfo存在于PG的整个生命周期中,其在对象数据的写入、数据恢复、PG Peering过程中均发挥重要的作用。本章试图研究pg info在整个PG生命周期中的变化过程,从而对PG及PGInfo有一个更深入的理解。

class PG : DoutPrefixProvider {
public:
	// pg state
	pg_info_t        info;
};

Read More

PG的Peering过程是十分复杂的,相关的代码实现也相当冗长。这里我们从侧面出发,介绍一下PG源代码实现中的一些重要字段,弄清其含义及用途之后也有利于我们理解PG的Peering操作:

Read More