Revisiting the Ceph peering mechanism (1)
In this section we study Ceph's peering process from the ground up, aiming for a deeper understanding of how it works.
1. Network message handling in the OSD
In the earlier chapter on Ceph network communication we introduced the SimpleMessenger framework. The OSD class implements the Dispatcher interface:
class OSD : public Dispatcher,
public md_config_obs_t {
};
Let's walk through the OSD's implementation of each Dispatcher callback.
1) Implementation of ms_can_fast_dispatch()
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
case CEPH_MSG_OSD_OP:
case MSG_OSD_SUBOP:
case MSG_OSD_REPOP:
case MSG_OSD_SUBOPREPLY:
case MSG_OSD_REPOPREPLY:
case MSG_OSD_PG_PUSH:
case MSG_OSD_PG_PULL:
case MSG_OSD_PG_PUSH_REPLY:
case MSG_OSD_PG_SCAN:
case MSG_OSD_PG_BACKFILL:
case MSG_OSD_EC_WRITE:
case MSG_OSD_EC_WRITE_REPLY:
case MSG_OSD_EC_READ:
case MSG_OSD_EC_READ_REPLY:
case MSG_OSD_REP_SCRUB:
case MSG_OSD_PG_UPDATE_LOG_MISSING:
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
return true;
default:
return false;
}
}
As we can see, the message types listed in the case labels are eligible for handling via fast_dispatch().
2) Implementation of ms_can_fast_dispatch_any()
bool ms_can_fast_dispatch_any() const { return true; }
Because this always returns true, the OSD Dispatcher is added to the Messenger's fast_dispatchers list.
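To make the routing concrete, here is a minimal standalone sketch of how a messenger can use these two callbacks to choose between the fast path and the ordinary dispatch queue. ToyMessenger and its types are illustrative stand-ins, not the actual SimpleMessenger code:
#include <vector>

// Illustrative stand-ins for the real Message/Dispatcher types.
struct Message { int type; };

struct Dispatcher {
    virtual ~Dispatcher() {}
    virtual bool ms_can_fast_dispatch_any() const { return false; }
    virtual bool ms_can_fast_dispatch(const Message *) const { return false; }
    virtual void ms_fast_dispatch(Message *) {}
    virtual bool ms_dispatch(Message *m) = 0;
};

struct ToyMessenger {
    std::vector<Dispatcher*> dispatchers;
    std::vector<Dispatcher*> fast_dispatchers;

    void add_dispatcher(Dispatcher *d) {
        dispatchers.push_back(d);
        // Only dispatchers that answer true here are ever consulted on the
        // fast path -- this is why OSD ends up in fast_dispatchers.
        if (d->ms_can_fast_dispatch_any())
            fast_dispatchers.push_back(d);
    }

    void deliver(Message *m) {
        for (size_t i = 0; i < fast_dispatchers.size(); ++i) {
            if (fast_dispatchers[i]->ms_can_fast_dispatch(m)) {
                // fast path: handled directly, bypassing the DispatchQueue
                fast_dispatchers[i]->ms_fast_dispatch(m);
                return;
            }
        }
        for (size_t i = 0; i < dispatchers.size(); ++i)
            if (dispatchers[i]->ms_dispatch(m))  // ordinary queued dispatch
                return;
    }
};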
3) Implementation of ms_fast_dispatch()
void OSD::ms_fast_dispatch(Message *m)
{
if (service.is_stopping()) {
m->put();
return;
}
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
#endif
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
OSDMapRef nextmap = service.get_nextmap_reserved();
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
if (session) {
{
Mutex::Locker l(session->session_dispatch_lock);
update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
dispatch_session_waiting(session, nextmap);
}
session->put();
}
service.release_map(nextmap);
}
As shown above, it ends up calling dispatch_session_waiting() to do the actual work:
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
session->waiting_on_map.erase(i++));
if (session->waiting_on_map.empty()) {
clear_session_waiting_on_map(session);
} else {
register_session_waiting_on_map(session);
}
session->maybe_reset_osdmap();
}
bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
{
...
}
dispatch_op_fast() is where the message types whitelisted by ms_can_fast_dispatch() are actually handled.
4) Implementation of ms_fast_preprocess()
void OSD::ms_fast_preprocess(Message *m)
{
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
if (m->get_type() == CEPH_MSG_OSD_MAP) {
MOSDMap *mm = static_cast<MOSDMap*>(m);
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (s) {
s->received_map_lock.lock();
s->received_map_epoch = mm->get_last();
s->received_map_lock.unlock();
s->put();
}
}
}
}
As the code shows, for a CEPH_MSG_OSD_MAP message arriving from another OSD, session->received_map_epoch is set to the newest epoch carried in the message.
5) Implementation of ms_dispatch()
bool OSD::ms_dispatch(Message *m)
{
if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
service.got_stop_ack();
m->put();
return true;
}
// lock!
osd_lock.Lock();
if (is_stopping()) {
osd_lock.Unlock();
m->put();
return true;
}
while (dispatch_running) {
dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
dispatch_cond.Wait(osd_lock);
}
dispatch_running = true;
do_waiters();
_dispatch(m);
do_waiters();
dispatch_running = false;
dispatch_cond.Signal();
osd_lock.Unlock();
return true;
}
ms_dispatch() implements the dispatch of ordinary (non-fast) messages. Now let's look at _dispatch():
void OSD::_dispatch(Message *m)
{
assert(osd_lock.is_locked());
dout(20) << "_dispatch " << m << " " << *m << dendl;
logger->set(l_osd_buf, buffer::get_total_alloc());
logger->set(l_osd_history_alloc_bytes, SHIFT_ROUND_UP(buffer::get_history_alloc_bytes(), 20));
logger->set(l_osd_history_alloc_num, buffer::get_history_alloc_num());
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
switch (m->get_type()) {
// -- don't need lock --
case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source() << dendl;
m->put();
break;
// -- don't need OSDMap --
// map and replication
case CEPH_MSG_OSD_MAP:
handle_osd_map(static_cast<MOSDMap*>(m));
break;
// osd
case MSG_PGSTATSACK:
handle_pg_stats_ack(static_cast<MPGStatsAck*>(m));
break;
case MSG_MON_COMMAND:
handle_command(static_cast<MMonCommand*>(m));
break;
case MSG_COMMAND:
handle_command(static_cast<MCommand*>(m));
break;
case MSG_OSD_SCRUB:
handle_scrub(static_cast<MOSDScrub*>(m));
break;
// -- need OSDMap --
default:
{
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
// no map? starting up?
if (!osdmap) {
dout(7) << "no OSDMap, not booted" << dendl;
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("no osdmap");
break;
}
// need OSDMap
dispatch_op(op);
}
}
logger->set(l_osd_buf, buffer::get_total_alloc());
logger->set(l_osd_history_alloc_bytes, SHIFT_ROUND_UP(buffer::get_history_alloc_bytes(), 20));
logger->set(l_osd_history_alloc_num, buffer::get_history_alloc_num());
}
void OSD::dispatch_op(OpRequestRef op)
{
switch (op->get_req()->get_type()) {
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
break;
case MSG_OSD_PG_NOTIFY:
handle_pg_notify(op);
break;
case MSG_OSD_PG_QUERY:
handle_pg_query(op);
break;
case MSG_OSD_PG_LOG:
handle_pg_log(op);
break;
case MSG_OSD_PG_REMOVE:
handle_pg_remove(op);
break;
case MSG_OSD_PG_INFO:
handle_pg_info(op);
break;
case MSG_OSD_PG_TRIM:
handle_pg_trim(op);
break;
case MSG_OSD_PG_MISSING:
assert(0 =="received MOSDPGMissing; this message is supposed to be unused!?!");
break;
case MSG_OSD_BACKFILL_RESERVE:
handle_pg_backfill_reserve(op);
break;
case MSG_OSD_RECOVERY_RESERVE:
handle_pg_recovery_reserve(op);
break;
}
}
The most important case above is the handling of the CEPH_MSG_OSD_MAP message.
6) Implementation of ms_handle_connect()
void OSD::ms_handle_connect(Connection *con)
{
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
Mutex::Locker l(osd_lock);
if (is_stopping())
return;
dout(10) << "ms_handle_connect on mon" << dendl;
if (is_preboot()) {
start_boot();
} else if (is_booting()) {
_send_boot(); // resend boot message
} else {
map_lock.get_read();
Mutex::Locker l2(mon_report_lock);
utime_t now = ceph_clock_now(NULL);
last_mon_report = now;
// resend everything, it's a new session
send_alive();
service.requeue_pg_temp();
service.send_pg_temp();
requeue_failures();
send_failures();
send_pg_stats(now);
map_lock.put_read();
}
// full map requests may happen while active or pre-boot
if (requested_full_first) {
rerequest_full_maps();
}
}
}
An OSD only actively initiates connections to Monitors and to other OSDs. This callback handles the case where an outgoing connection to a Monitor has been established.
7) Implementation of ms_handle_fast_connect()
void OSD::ms_handle_fast_connect(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << " new session (outgoing) " << s << " con=" << s->con
<< " addr=" << s->con->get_peer_addr() << dendl;
// we don't connect to clients
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
}
s->put();
}
}
Again, an OSD only initiates connections to Monitors and other OSDs; this callback handles outgoing connections to other OSDs.
8) Implementation of ms_handle_accept()
The OSD does not override ms_handle_accept().
9) Implementation of ms_handle_fast_accept()
void OSD::ms_handle_fast_accept(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(cct);
con->set_priv(s->get());
s->con = con;
dout(10) << "new session (incoming)" << s << " con=" << con
<< " addr=" << con->get_peer_addr()
<< " must have raced with connect" << dendl;
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
}
s->put();
}
}
Since ms_can_fast_dispatch_any() always returns true, the OSD accepts connections from Monitors, RadosGW and other OSDs. From the code above we see that when RadosGW or another OSD initiates a new connection to this OSD and no Session exists yet, a new Session is constructed and stored as the connection's private data.
10) Implementation of ms_handle_reset()
bool OSD::ms_handle_reset(Connection *con)
{
OSD::Session *session = (OSD::Session *)con->get_priv();
dout(1) << "ms_handle_reset con " << con << " session " << session << dendl;
if (!session)
return false;
session->wstate.reset(con);
session->con.reset(NULL); // break con <-> session ref cycle
session_handle_reset(session);
session->put();
return true;
}
When a connection is reset, this callback is invoked to clear the session state associated with that connection.
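The "break con <-> session ref cycle" comment deserves a second look: the Session holds a reference to the Connection while the Connection holds the Session as its private data, so neither refcount can reach zero until one edge is cut. Here is a minimal standalone illustration of the same problem using std::shared_ptr (toy types, not the Ceph ones):
#include <iostream>
#include <memory>

struct Session;

struct Connection {
    std::shared_ptr<Session> priv;    // Connection -> Session reference
};

struct Session {
    std::shared_ptr<Connection> con;  // Session -> Connection reference
};

int main() {
    std::shared_ptr<Connection> c = std::make_shared<Connection>();
    std::shared_ptr<Session> s = std::make_shared<Session>();
    c->priv = s;
    s->con = c;   // reference cycle: neither object can ever be freed

    // The equivalent of session->con.reset(NULL) in ms_handle_reset():
    // dropping one edge lets both objects die once outside refs go away.
    s->con.reset();
    std::cout << "connection use_count = " << c.use_count() << std::endl;  // 1
    return 0;
}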
11) Implementation of ms_handle_remote_reset()
void ms_handle_remote_reset(Connection *con) {}
As shown above, ms_handle_remote_reset() is a no-op.
12) Implementation of ms_get_authorizer()
bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
if (force_new) {
/* the MonClient checks keys every tick(), so we should just wait for that cycle to get through */
if (monc->wait_auth_rotating(10) < 0)
return false;
}
*authorizer = monc->auth->build_authorizer(dest_type);
return *authorizer != NULL;
}
As shown, it calls build_authorizer() to construct the AuthAuthorizer.
13) Implementation of ms_verify_authorizer()
bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key)
{
...
}
This performs verification of incoming connections.
2. OSD module messages
Here is a brief overview of some of the messages used by the OSD module:
- CEPH_MSG_OSD_OP: constructed by clients for read/write requests; received by the primary OSD
- MSG_OSD_REPOP: sent by the primary OSD to replica OSDs during data-modifying operations
- MSG_OSD_REPOPREPLY: the reply to MSG_OSD_REPOP
- MSG_OSD_SUBOP: internal object operations between the primary and the replicas, used mainly during object recovery
- MSG_OSD_SUBOPREPLY: the reply to MSG_OSD_SUBOP
- CEPH_OSD_OP_WRITE: write part of an object
- CEPH_OSD_OP_WRITEFULL: write a complete object
2.1 Message types routed through fast dispatch
- CEPH_MSG_OSD_OP
- MSG_OSD_SUBOP
- MSG_OSD_REPOP
- MSG_OSD_SUBOPREPLY
- MSG_OSD_REPOPREPLY
- MSG_OSD_PG_PUSH
- MSG_OSD_PG_PULL
- MSG_OSD_PG_PUSH_REPLY
- MSG_OSD_PG_SCAN
- MSG_OSD_PG_BACKFILL
- MSG_OSD_EC_WRITE
- MSG_OSD_EC_WRITE_REPLY
- MSG_OSD_EC_READ
- MSG_OSD_EC_READ_REPLY
- MSG_OSD_REP_SCRUB
- MSG_OSD_PG_UPDATE_LOG_MISSING
- MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
2.2 Message types routed through ordinary dispatch
- MSG_OSD_MARK_ME_DOWN
- CEPH_MSG_PING
- CEPH_MSG_OSD_MAP
- MSG_PGSTATSACK
- MSG_MON_COMMAND
- MSG_COMMAND
- MSG_OSD_SCRUB
- MSG_OSD_PG_CREATE
- MSG_OSD_PG_NOTIFY
- MSG_OSD_PG_QUERY
- MSG_OSD_PG_LOG
- MSG_OSD_PG_REMOVE
- MSG_OSD_PG_INFO
- MSG_OSD_PG_TRIM
- MSG_OSD_PG_MISSING
- MSG_OSD_BACKFILL_RESERVE
- MSG_OSD_RECOVERY_RESERVE
3. Analysis of ms_fast_dispatch()
From the analysis above we know that the message types listed in section 2.1 are routed through ms_fast_dispatch(). Let's look at its implementation:
void OSD::ms_fast_dispatch(Message *m)
{
if (service.is_stopping()) {
m->put();
return;
}
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
#endif
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
OSDMapRef nextmap = service.get_nextmap_reserved();
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
if (session) {
{
Mutex::Locker l(session->session_dispatch_lock);
update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
dispatch_session_waiting(session, nextmap);
}
session->put();
}
service.release_map(nextmap);
}
It first calls service.get_nextmap_reserved() to obtain nextmap; here is that implementation:
class OSDService{
public:
OSDMapRef osdmap;
/*
* osdmap - current published map
* next_osdmap - pre_published map that is about to be published.
*
* We use the next_osdmap to send messages and initiate connections,
* but only if the target is the same instance as the one in the map
* epoch the current user is working from (i.e., the result is
* equivalent to what is in next_osdmap).
*
* This allows the helpers to start ignoring osds that are about to
* go down, and let OSD::handle_osd_map()/note_down_osd() mark them
* down, without worrying about reopening connections from threads
* working from old maps.
*/
OSDMapRef next_osdmap;
/// gets ref to next_osdmap and registers the epoch as reserved
OSDMapRef get_nextmap_reserved() {
Mutex::Locker l(pre_publish_lock);
if (!next_osdmap)
return OSDMapRef();
epoch_t e = next_osdmap->get_epoch();
map<epoch_t, unsigned>::iterator i = map_reservations.insert(make_pair(e, 0)).first;
i->second++;
return next_osdmap;
}
};
As the comment explains, service.osdmap is the latest published OSDMap, while service.next_osdmap is the pre-published map that is about to be published. next_osdmap is used for sending messages and initiating connections, but only when the target is the same OSD instance as in the map epoch the caller is working from.
Now let's look at the difference between service.osdmap and service.next_osdmap in the code:
1) OSDService::publish_map()
Searching for callers of publish_map() turns up only two:
- publish_map() called from OSD::init()
int OSD::init()
{
...
service.publish_map(osdmap);
...
consume_map();
...
}
During OSD initialization, the OSDMap epoch recorded in the superblock is loaded as the initial osdmap, and consume_map() is then called to consume it; this may trigger the first peering of PGs at startup.
- publish_map() called from OSD::consume_map()
Besides the call during OSD init described above, consume_map() is also invoked after a new OSDMap has been persisted:
void OSD::consume_map()
{
...
service.pre_publish_map(osdmap);
service.await_reserved_maps();
service.publish_map(osdmap);
...
}
As the code shows, the osdmap is only truly published once all reserved maps have been released.
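The coordination between get_nextmap_reserved(), release_map() and await_reserved_maps() is, at its core, a per-epoch reference count protected by a lock. Below is a standalone sketch of that pattern; it is my own simplified model, using std::mutex/std::condition_variable instead of Ceph's Mutex/Cond, with illustrative names:
#include <condition_variable>
#include <map>
#include <mutex>

// Toy model of OSDService's map reservation bookkeeping.
class MapReservations {
    std::mutex lock;                       // plays the role of pre_publish_lock
    std::condition_variable cond;
    std::map<unsigned, int> reservations;  // epoch -> outstanding users

public:
    // get_nextmap_reserved(): a dispatch thread starts working from epoch e.
    void reserve(unsigned epoch) {
        std::lock_guard<std::mutex> l(lock);
        reservations[epoch]++;
    }

    // release_map(): that thread is done with its reserved epoch.
    void release(unsigned epoch) {
        std::lock_guard<std::mutex> l(lock);
        if (--reservations[epoch] == 0) {
            reservations.erase(epoch);
            cond.notify_all();
        }
    }

    // await_reserved_maps(): consume_map() blocks here until every in-flight
    // fast dispatch has released its epoch, and only then publishes the map.
    void await_all_released() {
        std::unique_lock<std::mutex> l(lock);
        while (!reservations.empty())
            cond.wait(l);
    }
};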
2) OSDService::pre_publish_map()
pre_publish_map() likewise has only two call sites:
- pre_publish_map() called from _committed_osd_maps()
void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
{
...
// advance through the new maps
for (epoch_t cur = first; cur <= last; cur++) {
dout(10) << " advance to epoch " << cur<< " (<= last " << last<< " <= newest_map " << superblock.newest_map<< ")" << dendl;
OSDMapRef newmap = get_map(cur);
assert(newmap); // we just cached it above!
// start blacklisting messages sent to peers that go down.
service.pre_publish_map(newmap);
...
}
...
}
When an MOSDMap message is received and it carries eligible new OSDMaps, they are packed into a Transaction, which is then persisted to disk. Once the write commits, _committed_osd_maps() is called back. As the code above shows, the OSD iterates over all the new OSDMaps from the MOSDMap message and calls service.pre_publish_map() on each, marking it as pre-published.
- pre_publish_map() called from OSD::consume_map()
_committed_osd_maps() also goes on to call OSD::consume_map():
void OSD::consume_map()
{
...
service.pre_publish_map(osdmap);
service.await_reserved_maps();
service.publish_map(osdmap);
...
}
consume_map() publishes the newly received OSDMap and then triggers peering on the affected PGs.
3.1 Several related variables
Before analyzing ms_fast_dispatch() further, let's look at a few important variables:
struct Session : public RefCountedObject {
list<OpRequestRef> waiting_on_map;
map<spg_t, list<OpRequestRef> > waiting_for_pg;
};
class OSD : public Dispatcher,public md_config_obs_t {
public:
set<Session*> session_waiting_for_map;
map<spg_t, set<Session*> > session_waiting_for_pg;
list<OpRequestRef> waiting_for_osdmap;
};
- Session::waiting_on_map: requests waiting on an OSDMap. Typically, when peering discovers that a PG must be created, or that a PG has split into child PGs, related requests may be queued on the session's waiting_on_map, as the following two code paths show:
// handling a peering event
void OSD::handle_pg_peering_evt(
spg_t pgid,
const pg_history_t& orig_history,
pg_interval_map_t& pi,
epoch_t epoch,
PG::CephPeeringEvtRef evt)
{
//...
// wake ops that were parked waiting for this pg
wake_pg_waiters(pgid);
}
// PG split
void OSD::process_peering_events(
const list<PG*> &pgs,
ThreadPool::TPHandle &handle
)
{
...
if (!split_pgs.empty()) {
rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
split_pgs.clear();
}
}
- Session::waiting_for_pg: requests waiting on a specific PG. Typically, when the target PG cannot be obtained (for example, the newest OSDMap has not arrived yet, so the PG cannot be found), the request is queued on waiting_for_pg;
- OSD::session_waiting_for_map: the Sessions currently waiting on an OSDMap;
- OSD::session_waiting_for_pg: the Sessions currently waiting on a specific PG;
- OSD::waiting_for_osdmap: requests waiting on an OSDMap.
Note: messages that carry session state and expect a response generally go through session_waiting_for_map, while stateless requests that merely need an osdmap go into waiting_for_osdmap.
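To visualize how the per-session queue and the OSD-wide set fit together, here is a small standalone sketch of the bookkeeping. The types are toys, and park()/wake_all() are illustrative names mirroring register_session_waiting_on_map() and the wake-up that happens when a new map arrives:
#include <list>
#include <set>

// Toy stand-ins; not the real Ceph types.
struct Op {};

struct Session {
    std::list<Op*> waiting_on_map;   // per-session, preserves arrival order
};

struct ToyOSD {
    std::set<Session*> session_waiting_for_map;  // sessions to wake on a new map
    std::list<Op*> waiting_for_osdmap;           // stateless map waiters

    // Park a session-bound op: queue it on the session and remember the
    // session so the next osdmap can wake it.
    void park(Session *s, Op *op) {
        s->waiting_on_map.push_back(op);
        session_waiting_for_map.insert(s);
    }

    // When a new osdmap is published, each parked session is re-dispatched
    // (which is what dispatch_session_waiting() does per session).
    template <typename F>
    void wake_all(F redispatch) {
        std::set<Session*> to_wake;
        to_wake.swap(session_waiting_for_map);
        for (std::set<Session*>::iterator i = to_wake.begin(); i != to_wake.end(); ++i)
            redispatch(*i);
    }
};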
3.2 update_waiting_for_pg()
void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
{
assert(session->session_dispatch_lock.is_locked());
if (!session->osdmap) {
session->osdmap = newmap;
return;
}
if (newmap->get_epoch() == session->osdmap->get_epoch())
return;
assert(newmap->get_epoch() > session->osdmap->get_epoch());
map<spg_t, list<OpRequestRef> > from;
from.swap(session->waiting_for_pg);
for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();i != from.end();from.erase(i++)) {
set<spg_t> children;
if (!newmap->have_pg_pool(i->first.pool())) {
// drop this wait list on the ground
i->second.clear();
} else {
assert(session->osdmap->have_pg_pool(i->first.pool()));
if (i->first.is_split(session->osdmap->get_pg_num(i->first.pool()), newmap->get_pg_num(i->first.pool()), &children)) {
for (set<spg_t>::iterator child = children.begin(); child != children.end(); ++child) {
unsigned split_bits = child->get_split_bits(newmap->get_pg_num(child->pool()));
list<OpRequestRef> child_ops;
OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
if (!child_ops.empty()) {
session->waiting_for_pg[*child].swap(child_ops);
register_session_waiting_on_pg(session, *child);
}
}
}
}
if (i->second.empty()) {
clear_session_waiting_on_pg(session, i->first);
} else {
session->waiting_for_pg[i->first].swap(i->second);
}
}
session->osdmap = newmap;
}
Its main job is to update the session's waiting_for_pg when PGs have split.
Note: PG splits go hand in hand with the Monitor publishing an updated OSDMap (the pg_num change is carried by the new map).
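The heart of this function is deciding which queued ops move to which child, which OSD::split_list() does by matching bits of each op's placement seed against the child's ps value. The sketch below models that partitioning on a plain list; the exact masking scheme is my reading of the code and should be treated as an assumption:
#include <cstdint>
#include <iostream>
#include <list>

struct ToyOp {
    uint32_t seed;   // placement seed of the op's target PG (illustrative)
};

// Move every op whose low `split_bits` of the seed equal the child's ps
// from `parent` to `child_ops` -- the same shape as OSD::split_list().
void split_list(std::list<ToyOp>* parent, std::list<ToyOp>* child_ops,
                uint32_t child_ps, unsigned split_bits)
{
    uint32_t mask = (1u << split_bits) - 1;
    for (std::list<ToyOp>::iterator i = parent->begin(); i != parent->end(); ) {
        if ((i->seed & mask) == (child_ps & mask)) {
            child_ops->push_back(*i);
            i = parent->erase(i);
        } else {
            ++i;
        }
    }
}

int main() {
    std::list<ToyOp> parent{{0x0}, {0x4}, {0x6}, {0xc}};
    std::list<ToyOp> child;
    // pool grew from pg_num=4 to pg_num=8 => 3 split bits; child ps = 4
    split_list(&parent, &child, /*child_ps=*/4, /*split_bits=*/3);
    std::cout << "moved " << child.size() << " ops" << std::endl; // seeds 0x4, 0xc
    return 0;
}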
3.3 dispatch_session_waiting()
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap); session->waiting_on_map.erase(i++));
if (session->waiting_on_map.empty()) {
clear_session_waiting_on_map(session);
} else {
register_session_waiting_on_map(session);
}
session->maybe_reset_osdmap();
}
dispatch_session_waiting() simply takes the requests queued on the session's waiting_on_map and forwards each one via dispatch_op_fast().
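One detail worth calling out: the loop dispatches ops strictly in arrival order and stops at the first op that cannot be dispatched yet, leaving it and everything behind it queued, which preserves per-session ordering. A standalone sketch of the same idiom (illustrative names):
#include <iostream>
#include <list>

// Drain a queue in order, stopping at the first element the predicate
// rejects -- the same shape as the dispatch_session_waiting() loop.
template <typename T, typename Pred>
void drain_in_order(std::list<T>& q, Pred try_dispatch)
{
    for (typename std::list<T>::iterator i = q.begin();
         i != q.end() && try_dispatch(*i);
         q.erase(i++))
        ;
}

int main() {
    std::list<int> q{1, 2, 3, 4, 5};
    // Pretend op 4 needs a newer map: everything from 4 on stays queued.
    drain_in_order(q, [](int op) { return op != 4; });
    std::cout << "still waiting: " << q.size() << " ops" << std::endl; // 4 and 5
    return 0;
}
Running it leaves ops 4 and 5 queued, mirroring how an op that fails dispatch_op_fast() blocks everything behind it on that session.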
4. Analysis of ms_dispatch()
bool OSD::ms_dispatch(Message *m)
{
if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
service.got_stop_ack();
m->put();
return true;
}
// lock!
osd_lock.Lock();
if (is_stopping()) {
osd_lock.Unlock();
m->put();
return true;
}
while (dispatch_running) {
dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
dispatch_cond.Wait(osd_lock);
}
dispatch_running = true;
do_waiters();
_dispatch(m);
do_waiters();
dispatch_running = false;
dispatch_cond.Signal();
osd_lock.Unlock();
return true;
}
This function dispatches non-fast messages. Note the osd_lock here: it is a very coarse-grained lock. Recall that a DispatchQueue delivers messages from two threads, dispatch_thread and local_delivery_thread, so they can race with each other. The OSD, as a Dispatcher, uses osd_lock (together with dispatch_running and dispatch_cond) to guarantee that only one thread executes this path at a time.
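The combination of osd_lock, dispatch_running and dispatch_cond forms a hand-rolled gate: whichever thread arrives second waits on the condition variable until the first is done. A standalone sketch of that gate, with std::mutex/std::condition_variable standing in for Ceph's Mutex/Cond:
#include <condition_variable>
#include <mutex>

class DispatchGate {
    std::mutex osd_lock;             // stands in for OSD::osd_lock
    std::condition_variable cond;    // stands in for dispatch_cond
    bool dispatch_running;

public:
    DispatchGate() : dispatch_running(false) {}

    template <typename F>
    void dispatch(F do_dispatch) {
        std::unique_lock<std::mutex> l(osd_lock);
        // "waiting for other dispatch thread to complete": wait() releases
        // osd_lock while blocked, just like Cond::Wait(osd_lock) does.
        while (dispatch_running)
            cond.wait(l);
        dispatch_running = true;

        do_dispatch();               // _dispatch(m) runs under osd_lock

        dispatch_running = false;
        cond.notify_one();           // dispatch_cond.Signal()
    }
};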
4.1 do_waiters()
do_waiters() processes the requests currently parked on waiting_for_osdmap. For example, requests that need a newer osdmap are first pushed onto waiting_for_osdmap; once the new OSDMap is ready, take_waiters() splices them onto the finished list:
void OSD::activate_map(){
...
// process waiters
take_waiters(waiting_for_osdmap);
}
void take_waiters(list<OpRequestRef>& ls) {
finished_lock.Lock();
finished.splice(finished.end(), ls);
finished_lock.Unlock();
}
Requests are added to waiting_for_osdmap in the following two cases:
- No OSDMap is available at dispatch time
void OSD::_dispatch(Message *m)
{
switch (m->get_type()) {
default:
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
// no map? starting up?
if (!osdmap) {
dout(7) << "no OSDMap, not booted" << dendl;
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("no osdmap");
break;
}
}
}
- The request requires a newer OSDMap
bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
bool is_fast_dispatch)
{
Message *m = op->get_req();
dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
assert(osd_lock.is_locked());
// do they have a newer map?
if (epoch > osdmap->get_epoch()) {
dout(7) << "waiting for newer map epoch " << epoch<< " > my " << osdmap->get_epoch() << " with " << m << dendl;
wait_for_new_map(op);
return false;
}
if (!require_self_aliveness(op->get_req(), epoch)) {
return false;
}
// ok, our map is same or newer.. do they still exist?
if (m->get_connection()->get_messenger() == cluster_messenger && !require_same_peer_instance(op->get_req(), osdmap, is_fast_dispatch)) {
return false;
}
return true;
}
void OSD::wait_for_new_map(OpRequestRef op)
{
// ask?
if (waiting_for_osdmap.empty()) {
osdmap_subscribe(osdmap->get_epoch() + 1, false);
}
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("wait for new map");
}
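Note the small idiom in wait_for_new_map(): osdmap_subscribe() is only called when waiting_for_osdmap transitions from empty to non-empty, so any number of parked ops costs a single subscription per waiting episode. A standalone sketch of that pattern (toy types; the subscribe callback is illustrative):
#include <functional>
#include <iostream>
#include <list>

struct ToyOp { int id; };

class MapWaiter {
    std::list<ToyOp> waiting_for_osdmap;
    unsigned my_epoch;
    std::function<void(unsigned)> subscribe;  // stands in for osdmap_subscribe()

public:
    MapWaiter(std::function<void(unsigned)> sub) : my_epoch(10), subscribe(sub) {}

    // require_same_or_newer_map(), reduced to just the epoch comparison.
    bool admit(ToyOp op, unsigned op_epoch) {
        if (op_epoch > my_epoch) {
            // subscribe only on the empty -> non-empty transition
            if (waiting_for_osdmap.empty())
                subscribe(my_epoch + 1);
            waiting_for_osdmap.push_back(op);
            return false;  // op stays parked until the new map arrives
        }
        return true;       // safe to dispatch now
    }
};

int main() {
    int subs = 0;
    MapWaiter w([&subs](unsigned e) {
        ++subs;
        std::cout << "subscribe starting from epoch " << e << std::endl;
    });
    ToyOp a = {1}, b = {2};
    w.admit(a, 12);
    w.admit(b, 12);  // queues without sending a second subscription
    std::cout << "subscriptions sent: " << subs << std::endl;  // prints 1
    return 0;
}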
4.2 Message dispatch in _dispatch()
_dispatch() routes the following messages; some of them do not depend on the OSDMap, while others do:
1) Messages that do not need the OSDMap
- CEPH_MSG_PING
- CEPH_MSG_OSD_MAP
- MSG_MON_COMMAND
- MSG_COMMAND
- MSG_OSD_SCRUB
2) Messages that need the OSDMap
- MSG_OSD_PG_CREATE
- MSG_OSD_PG_NOTIFY: a notification sent from a PG stray to the PG primary (carrying its pg_info). Note: during peering, this is the message sent by RecoveryCtx::send_notify().
- MSG_OSD_PG_QUERY: handled on a replica (stray) for PG queries coming from the primary. Note: during peering, the GetInfo phase issues its queries via this message.
- MSG_OSD_PG_LOG: PG log information sent from one PG replica to another, usually during peering.
- MSG_OSD_PG_REMOVE
- MSG_OSD_PG_INFO: pg_info sent from one PG replica to another. Note: during peering, when missing objects are discovered, PG::search_for_missing() sends this message.
- MSG_OSD_PG_TRIM
- MSG_OSD_PG_MISSING
- MSG_OSD_BACKFILL_RESERVE
- MSG_OSD_RECOVERY_RESERVE
5. OSD runtime states
Finally, we briefly describe the states an OSD passes through at runtime, to help make better sense of peering: