本章介绍ceph的服务端OSD(书中简称OSD模块或者OSD)的实现。其对应的源代码在src/osd目录下。OSD模块是Ceph服务进程的核心实现,它实现了服务端的核心功能。本章先介绍OSD模块静态类图相关数据结构,再着重介绍服务端数据的写入和读取流程。

1. 读写操作的序列图

写操作的序列图如下图6-2所示:

ceph-chapter6-2

写操作分为三个阶段:

  • 阶段1:从函数ms_fast_dispatch()到函数op_wq.queue函数为止,其处理过程都在网络模块的回调函数中处理,主要检查当前OSD的状态,以及epoch是否一致;

  • 阶段2: 这个阶段在工作队列op_wq中的线程池里处理,在类ReplicatedPG里,其完成对PG的状态、对象的状态的检查,并把请求封装成事务。

  • 阶段3: 本阶段也是在工作队列op_wq中的线程池里处理,主要功能都在类ReplicatedBackend中实现。核心工作就是把封装好的事务通过网络分发到从副本上,最后调用本地FileStore的函数完成本地对象的数据写入。

通过上面,其实我们可以这样理解OSD、ReplicatedPG以及ReplicatedBackend三者之间的关系: OSD是PG操作的主入口,而ReplicatedPG对应于一个特定的PG,其调用ReplicatedBackend来完成Object读写消息的发送及事件的处理。

注: OSD进程的主入口为src/ceph_osd.cc。这里从RadosClient发送过来的对象读写消息都为CEPH_MSG_OSD_OP

从上面我们可以看到,PG写操作的核心逻辑为:

void ReplicatedPG::do_op()
{
	execute_ctx();
}

void ReplicatedPG::execute_ctx()
{
	prepare_transaction();
	issue_repop();

	submit_transaction();
	eval_repop();
}

2. 读写流程代码分析

在介绍了上述的数据结构和基本的流程之后,下面将从服务端接收到消息开始,分三个阶段具体分析读写的过程。

2.1 阶段1: 接收请求

读写请求都是从OSD::ms_fast_dispatch()开始,它是接收读写消息message的入口。下面从这里开始读写操作的分析。本阶段所有的函数是被网络模块的接收线程调用,所以理论上应该尽可能的简单,处理完成后交给后面的OSD模块的op_wq工作队列来处理。

1. ms_fast_dispatch
void OSD::ms_fast_dispatch(Message *m)
{
	if (service.is_stopping()) {
		m->put();
		return;
	}

	OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
	{
		#ifdef WITH_LTTNG
			osd_reqid_t reqid = op->get_reqid();
		#endif
		tracepoint(osd, ms_fast_dispatch, reqid.name._type,
		reqid.name._num, reqid.tid, reqid.inc);
	}

	OSDMapRef nextmap = service.get_nextmap_reserved();
	Session *session = static_cast<Session*>(m->get_connection()->get_priv());
	if (session) {
		{
			Mutex::Locker l(session->session_dispatch_lock);
			update_waiting_for_pg(session, nextmap);
			session->waiting_on_map.push_back(op);
			dispatch_session_waiting(session, nextmap);
		}
		session->put();
	}

	service.release_map(nextmap);
}

; 函数ms_fast_dispatch()为OSD注册了网络模块的回调函数,其被网络的接收线程调用,具体实现过程如下:

1) 首先检查service,如果已经停止了,就直接返回;

2) 调用函数op_tracker.create_request()把Message消息转换成OpRequest类型,数据结构OpRequest包装了Message,并添加了一些其他信息。

3) 获取nextmap(也就是最新的osdmap)和session,类Session保存了一个Connection的相关信息。

4) 调用函数update_waiting_for_pg来更新session里保存的OSDMap信息;

5) 把请求加入waiting_on_map的列表里;

6) 调用函数dispatch_session_waiting()处理,它循环调用函数dispatch_op_fast处理请求;

7) 如果session->waiting_on_map不为空,说明该session里还有等待osdmap的请求,把该session加入到session_waiting_for_map队列里(该队列表示正在等待最新OSDMap的session,当成功获取到最新的OSDMap,那么OSD的相关线程会将该session里的消息马上发送出去)。

2. dispatch_op_fast
bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap);

该函数检查OSD目前的epoch是否最新:

1) 检查变量is_stopping,如果为true,说明当前OSD正处于停止过程中,返回true,这样就会直接删除掉该请求。

2) 调用函数op_required_epoch(op),从OpRequest中获取msg带的epoch,进行比较,如果该值大于OSD最新的epoch,则调用函数osdmap_subscribe()订阅新的OSDMap更新,返回false值,表明不应该把此请求从session->waiting_on_map里移除。

3) 否则,根据消息类型,调用相应的消息处理函数。本章只关注处理函数handle_op相关的流程。

3. handle_op
void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap);

该函数处理OSD相关的操作,其处理流程如下:

1) 首先调用op_is_discardable,检查该op是否可以丢弃;

2) 构建share_map结构体,获取client_session,从client_session获取last_sent_epoch,调用函数service.should_share_map()来设置share_map.should_send标志,该函数用于检查是否需要通知对方更新epoch值。这里和dispatch_op_fast的处理区别是:上次是更新自己,这里是通知对方更新。需要注意的是,client和OSD的epoch不一致,并不影响读写,只要epoch的变化不影响本次读写PG的OSD list变化。

3) 从消息里获取_pgid,再从_pgid里获取pool

// calc actual pgid
pg_t _pgid = m->get_pg();
int64_t pool = _pgid.pool();

4) 调用函数osdmap->raw_pg_to_pg,最终调用pg_pool_t::raw_pg_to_pg()函数,对PG做了调整

//参看: src/osd/OSDMap.h
pg_t OSDMap::raw_pg_to_pg(pg_t pg) const {
	map<int64_t,pg_pool_t>::const_iterator p = pools.find(pg.pool());
	assert(p != pools.end());
	return p->second.raw_pg_to_pg(pg);
}

/*
 * map a raw pg (with full precision ps) into an actual pg, for storage
 * 参看: src/osd/Osd_types.cc
 */
pg_t pg_pool_t::raw_pg_to_pg(pg_t pg) const
{
  pg.set_ps(ceph_stable_mod(pg.ps(), pg_num, pg_num_mask));
  return pg;
}

至于为什么要调用raw_pg_to_pg(),上面代码的注释也比较详细。

5) 调用osdmap->get_primary_shard()函数,获取该PG的主OSD

6) 调用函数get_pg_or_queue_for_pg(),通过pgid获取PG类的指针。如果获取成功,就调用函数enqueue_op处理请求;

7) 如果PG类的指针没有获取成功,做一些错误检查:

  a) send_map为空,client需要重试;

  b) 客户端的osdmap里没有当前的pool;

  c) 当前OSD的osdmap没有该pool,或者当前OSD不是该PG的主OSD

总结,这个函数主要检查了消息的源端epoch是否需要share,最主要的是获取读写请求相关的PG类后,下面就进入PG类的处理。

4. queue_op
void PG::queue_op(OpRequestRef& op)
{
	Mutex::Locker l(map_lock);

	if (!waiting_for_map.empty()) {
		// preserve ordering
		waiting_for_map.push_back(op);
		op->mark_delayed("waiting_for_map not empty");
		return;
	}

	if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), op)) {
		waiting_for_map.push_back(op);
		op->mark_delayed("op must wait for map");
		return;
	}

	op->mark_queued_for_pg();
	osd->op_wq.queue(make_pair(PGRef(this), op));
	{
		// after queue() to include any locking costs
		#ifdef WITH_LTTNG
			osd_reqid_t reqid = op->get_reqid();
		#endif
		tracepoint(pg, queue_op, reqid.name._type,
		reqid.name._num, reqid.tid, reqid.inc, op->rmw_flags);
	}
}

该函数实现如下:

1) 加map_lock锁,该锁保护waiting_for_map列表,判断waiting_for_map列表不为空,就把当前Op加入该列表,直接返回。waiting_for_map列表不为空,说明有操作在等待osdmap的更新,说明当前osdmap不信任,不能继续当前的处理。

2) 函数op_must_wait_for_map()判断当前的epoch是否大于Op的epoch,如果是,则必须加入waiting_for_map等待,等待更新PG当前的epoch值。

提示: 这里的osdmap的epoch的判断,是一个PG层的epoch的判断。和前面的判断不在一个层次,这里是需要等待的。

3) 最终,把请求加入OSD的op_wq处理队列里

总结,这个函数做在PG类里,做PG层面的相关检查,如果ok,就加入OSD的op_wq工作队列里继续处理。

2.2 阶段2: OSD的op_wq处理

op_wq是一个ShardedWQ类型的工作队列。以下操作都是在op_wq对应的线程池里调用做相应的处理。这里着重分析读写流程。

1. dequeue_op
/*
 * NOTE: dequeue called in worker thread, with pg lock
 */
void OSD::dequeue_op(
  PGRef pg, OpRequestRef op,
  ThreadPool::TPHandle &handle);

1) 检查如果op->send_map_update为true,也就是如果需要更新osdmap,就调用函数service.share_map()更新源端的osdmap信息。在函数OSD::handle_op()里,只在op->send_map_update里设置了是否需要share_map标记,这里才真正去发消息实现share操作。

2) 检查如果pg正在删除,就把本请求丢弃,直接返回;

3) 调用函数pg->do_request(op, handle)处理请求。

总之,本函数主要实现了使请求源端更新osdmap的操作,接下来在PG里调用do_request()来处理。

2. do_request()

本函数进入ReplicatedPG类来处理:

void ReplicatedPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle);

处理过程如下:

1) 调用函数can_discard_request()检查op是否可以直接丢弃掉;

2) 检查变量flushes_in_progress,如果还有flush操作,把op加入waiting_for_peered队列里,直接返回;

3) 如果PG还没有peered,调用函数can_handle_while_inactive()检查pgbackend能否处理该请求,如果可以,就调用pgbackend->handle_message()处理;否则加入waiting_for_peered队列,等待PG完成peering后再处理;

注意: PG处于inactive状态,pgbackend只能处理MSG_OSD_PG_PULL类型的消息。这种情况可能是: 本OSD可能已经不在该PG的acting osd列表中,但是可能在上一阶段该PG的OSD列表中,所以PG可能含有有效的对象数据,这些对象数据可以被该PG当前的主OSD拉取以修复当前PG的数据。

4) 此时PG处于peered并且flushes_in_progress未0的状态下,检查pgbackend能否处理该请求。pgbackend可以处理数据恢复过程中的PULL和PUSH请求,以及主副本发的从副本的更新相关SUBOP类型的请求。

5) 如果是CEPH_MSG_OSD_OP,检查该PG的状态,如果处于非active状态或者处于replay状态,则把请求添加到waiting_for_active等待队列;

6) 检查如果该pool是cache pool,而该操作没有带CEPH_FEATURE_OSD_CACHEPOOL标志,返回EOPNOTSUPP错误码;

7) 根据消息的类型,调用相应的处理函数来处理;

本函数开始进入ReplicatedPG层面来处理,主要检查当前PG的状态是否正常,是否可以处理请求。

3. do_op()

函数do_op()比较复杂,处理读写请求的状态检查和一些上下文的准备工作。其中大量的关于快照的处理,本章在遇到快照处理时只简单介绍一下。

void ReplicatedPG::do_op(OpRequestRef& op);

具体处理过程如下:

1) 调用函数m->finish_decode(),把消息带的数据从bufferlist中解析出相关的字段;

2) 调用osd->osd->init_op_flags()初始化op->rmw_flags,函数init_op_flags()根据flag来设置rmw_flags标志;

3) 如果是读操作:

  a) 如果消息里带有CEPH_OSD_FLAG_BALANCE_READS(平衡读)或者CEPH_OSD_FLAG_LOCALIZE_READS(本地读)标志,表明主副本都允许读。检查本OSD必须是该PG的primary或者replica之一;

  b) 如果没有上述标志,读操作只能读取主副本,本OSD必须是该PG的主OSD

4) 调用函数op_has_sufficient_caps()检查是否有相关的操作权限

5) 如果里面含有includes_pg_op操作,调用pg_op_must_wait()检查该操作是否需要等待。如果需要等待,加入waiting_for_all_missing队列;如果不需要等待,调用do_pg_op()处理PG相关的操作。这里的PG操作,都是CEPH_OSD_OP_PGLS等类似的PG相关的操作,需要确保该PG上没有需要修复的对象,否则ls列出的对象就不准确。

6) 构建要访问对象的head对象(head对象和快照对象的概念可查看后面介绍快照的章节)

hobject_t head(m->get_oid(), m->get_object_locator().key,
		 CEPH_NOSNAP, m->get_pg().ps(),
		 info.pgid.pool(), m->get_object_locator().nspace);

7) 检查对象的名字是否超长

// object name too long?
if (m->get_oid().name.size() > g_conf->osd_max_object_name_len) {
	dout(4) << "do_op name is longer than "
		<< g_conf->osd_max_object_name_len
		<< " bytes" << dendl;

	osd->reply_op_error(op, -ENAMETOOLONG);
	return;
}
if (m->get_object_locator().key.size() > g_conf->osd_max_object_name_len) {
	dout(4) << "do_op locator is longer than "
		<< g_conf->osd_max_object_name_len
		<< " bytes" << dendl;
	osd->reply_op_error(op, -ENAMETOOLONG);
	return;
}
if (m->get_object_locator().nspace.size() > g_conf->osd_max_object_namespace_len) {
	dout(4) << "do_op namespace is longer than "
		<< g_conf->osd_max_object_namespace_len
		<< " bytes" << dendl;
	osd->reply_op_error(op, -ENAMETOOLONG);
	return;
}

if (int r = osd->store->validate_hobject_key(head)) {
	dout(4) << "do_op object " << head << " invalid for backing store: "
		<< r << dendl;
	osd->reply_op_error(op, r);
	return;
}

8) 检查操作的客户端是否在黑名单(blacklist)中

9) 检查磁盘空间是否满

op->may_write();

10) 检查如果是写操作,并且是snap,返回EINVAL,快照不允许写操作。如果写操作带的数据大于osd_max_write_size(如果设置了),直接返回OSD_WRITETOOBIG错误。

// invalid?
if (m->get_snapid() != CEPH_NOSNAP) {
	osd->reply_op_error(op, -EINVAL);
	return;
}

// too big?
if (cct->_conf->osd_max_write_size && m->get_data_len() > cct->_conf->osd_max_write_size << 20) {
	// journal can't hold commit!
	derr << "do_op msg data len " << m->get_data_len()
	<< " > osd_max_write_size " << (cct->_conf->osd_max_write_size << 20)
	<< " on " << *m << dendl;
	osd->reply_op_error(op, -OSD_WRITETOOBIG);
	return;
}

可以看到,以上完成基本的与操作相关的参数检查。

11) 如果是顺序写,调用函数scrubber.write_blocked_by_scrub()检查:如果head对象正在进行scrub操作,就加入waiting_for_active队列,等待scrub操作完成后继续本次请求的处理。

if (write_ordered && scrubber.write_blocked_by_scrub(head, get_sort_bitwise())) {
	dout(20) << __func__ << ": waiting for scrub" << dendl;
	waiting_for_active.push_back(op);
	op->mark_delayed("waiting for scrub");
	return;
}

12) 检查head对象是否处于缺失状态(missing)需要恢复,调用函数wait_for_unreadable_object()把当前请求加入相应的队列里等待恢复完成

// missing object?
if (is_unreadable_object(head)) {
	wait_for_unreadable_object(head, op);
	return;
}

13) 如果是顺序写,检查head对象是否is_degraded_or_backfilling_object(),也就是正在恢复状态,需要调用wait_for_degraded_object()加入相应的队列等待

// degraded object?
if (write_ordered && is_degraded_or_backfilling_object(head)) {
	wait_for_degraded_object(head, op);
	return;
}

14) 检查head对象的特殊情况

  a) 检查队列objects_blocked_on_degraded_snap里如果保存有head对象,就需要等待。该队列里保存的head对象在rollback到某个版本的快照时,该版本的snap对象处于缺失状态,必须等待该snap对象恢复,从而完成rollback操作。因此,该队列的head对象目前处于缺失状态。

  b) 队列objects_blocked_on_snap_promotion里的对象标识head对象rollback到某个版本的快照时,该版本的快照对象在Cache pool层没有,需要到Data pool层获取。

如果head对象在上述的两个队列中,head对象都不能执行写操作,需要等待获取快照对象,完成rollback后才能写入。

可知,以上11~14步骤是构建并检查head对象的状态是否正常。

15) 如果是顺序写,检查该对象是否在objects_blocked_on_cache_full队列中,该队列中的对象因Cache pool层空间满而阻塞写操作

if (write_ordered && objects_blocked_on_cache_full.count(head)) {
    block_write_on_full_cache(head, op);
    return;
}

注意:当head对象被删除时,系统自动创建一个snapdir对象用来保存快照相关的信息。head对象和snapdir对象只能有一个存在,其都可以用来保存快照相关的信息。

16) 检查该对象的snapdir对象(如果存在)是否处于missing状态

17) 检查snapdir对象是否可读,如果不能读,就调用函数wait_for_unreadable_object()等待;

18) 如果是顺序写操作,调用函数is_degraded_or_backfilling_object()检查snapdir对象是否缺失;

19) 检查如果是CEPH_SNAPDIR类型的操作,则只能是读操作。snapdir对象只能读取;

20) 检查是否是客户端replay操作

21) 构建对象oid,这才是实际要操作的对象,可能是snap对象也可能是head对象

hobject_t oid(m->get_oid(),
		m->get_object_locator().key,
		m->get_snapid(),
		m->get_pg().ps(),
		m->get_object_locator().get_pool(),
		m->get_object_locator().nspace);

22) 调用函数检查maybe_await_blocked_snapset是否被block,检查该对象缓存的ObjectContext如果设置为blocked状态,该object有可能正在flush,或者copy(由于Cache Tier),暂时不能写,需要等待

// io blocked on obc?
if (!m->has_flag(CEPH_OSD_FLAG_FLUSH) &&
	maybe_await_blocked_snapset(oid, op)) {
	return;
}

23) 调用函数find_object_context()获取object_context,如果获取成功,需要检查oid的状态

int r = find_object_context(
	oid, &obc, can_create,
	m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
	&missing_oid);

24) 如果hit_set不为空,就需要设置hit_set。hit_setagent_state都是Cache tier的机制,hit_set记录了cachepool中对象是否命中,暂时不深入分析;

25) 如果agent_state不为空,就调用函数agent_choose_mode()设置agent的状态,调用函数maybe_handle_cache来处理,如果可以处理,就返回

if (agent_state) {
	if (agent_choose_mode(false, op))
		return;
}

if (maybe_handle_cache(op,
		write_ordered,
		obc,
		r,
		missing_oid,
		false,
		in_hit_set))
	return;

26) 获取object_locator,验证是否和msg里的相同

// make sure locator is consistent
object_locator_t oloc(obc->obs.oi.soid);
if (m->get_object_locator() != oloc) {
	dout(10) << " provided locator " << m->get_object_locator() 
		<< " != object's " << obc->obs.oi.soid << dendl;
		osd->clog->warn() << "bad locator " << m->get_object_locator() 
		<< " on object " << oloc
		<< " op " << *m << "\n";
}

27) 检查该对象是否被阻塞

// io blocked on obc?
if (obc->is_blocked() && !m->has_flag(CEPH_OSD_FLAG_FLUSH)) {
	wait_for_blocked_object(obc->obs.oi.soid, op);
	return;
}

28) 获取src_obc,也就是src_oid对应的ObjectContext: 同样的方法,对src_oid做各种状态检查,然后调用find_object_context()函数获取ObjectContext。

29) 如果是操作对象snapdir,相关的操作就需要所有的clone对象,获取clone对象的ObjectContext。对每个clone对象,调用get_object_context()构建ObjectContext,并把它加入到src_objs中。

30) 创建opContext

OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);

31) 调用execute_ctx(ctx);

总之,do_op()主要检查相关对象的(head对象、snapdir对齐、src对象等)状态是否正常,并获取ObjectContext、OpContext相关的上下文信息。

4. get_object_context()

本函数获取一个对象的ObjectContext信息:

ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,       //soid要获取的对象
						  bool can_create,                                     //是否允许创建新的ObjectContext
						  map<string, bufferlist> *attrs);                     //attrs对象的属性

关键是从属性OI_ATTR中获取object_info_t信息。具体过程如下:

1) 首先从LRU缓存object_contexts中获取该对象的ObjectContext,如果获取成功,就直接返回结果;

2) 如果从LRU cache没有查找到:

  a) 如果参数attrs值不为空,就从attrs里获取OI_ATTR的属性值;

  b) 否则,调用函数pgbackend->objects_get_attr()获取该对象的OI_ATTR属性值。如果获取失败,并且不允许创建,就直接返回ObjectContextRef()的空值。

3) 如果成功获取OI_ATTR属性值,就从该属性值中decode后获取object_info_t的值;

4) 调用get_snapset_context获取SnapSetContext

5) 调用相关函数设置obc相关的参数,并返回obc

5. get_snapset_context()

本函数获取对象的snapset_context结构,其过程和函数get_object_context()类似。具体实现如下:

SnapSetContext *ReplicatedPG::get_snapset_context(
  const hobject_t& oid,
  bool can_create,
  map<string, bufferlist> *attrs,
  bool oid_existed);

1) 首先从LRU缓存snapset_contexts获取该对象的snapset_context,如果成功,直接返回结果;

2) 如果不存在,并且can_create,就调用pgbackend->objects_get_attr()函数获取SS_ATTR属性。只有head对象或者snapdir对象保存有SS_ATTR属性,如果head对象不存在,就获取snapdir对象的SS_ATTR属性值,根据获得的值,decode后获得SnapsetContext结构;

6. find_object_context()

本函数查找对象的object_context,这里需要理解snapshot相关的知识。根据snap_seq正确与否获取相应的clone对象,然后获取相应的object_context:

/*
 * If we return an error, and set *pmissing, then promoting that
 * object may help.
 *
 * If we return -EAGAIN, we will always set *pmissing to the missing
 * object to wait for.
 *
 * If we return an error but do not set *pmissing, then we know the
 * object does not exist.
 */
int ReplicatedPG::find_object_context(const hobject_t& oid,      //要查找的对象
				      ObjectContextRef *pobc,                    //输出对象的ObjectContext
				      bool can_create,                           //是否需要创建
				      bool map_snapid_to_clone,                  //映射snapid到clone对象
				      hobject_t *pmissing);                      //如果对象不存在,返回缺失的对象

参数map_snapid_to_clone指该snap是否可以直接对应一个clone对象,也就是snap对象的snap_id在SnapSet的clones列表中。

1) 如果是head对象,就调用函数get_object_context()获取head对象的ObjectContext,如果失败,设置head对象为pmissing对象,返回-ENOENT;如果获取成功,返回0;

2) 如果是snapdir对象,先获取head对象的ObjectContext,如果失败,继续获取snapdir对象的ObjectContext,如果失败,返回-ENOENT;如果成功,返回0;

3) 如果非map_snapid_to_clone并且该snap已经标记删除了,就直接返回-ENOENT,pmissing为空,意味着该对象确实不存在。

// we want a snap
if (!map_snapid_to_clone && pool.info.is_removed_snap(oid.snap)) {
	dout(10) << __func__ << " snap " << oid.snap << " is removed" << dendl;
	return -ENOENT;
}

4) 调用函数get_snapset_context()来获取SnapSetContext,如果不存在,设置pmissing为head对象,返回-ENOENT。

SnapSetContext *ssc = get_snapset_context(oid, can_create);
if (!ssc || !(ssc->exists || can_create)) {
	dout(20) << __func__ << " " << oid << " no snapset" << dendl;
	if (pmissing)
		*pmissing = head;  // start by getting the head
	if (ssc)
		put_snapset_context(ssc);
	return -ENOENT;
}

5) 如果是map_snapid_to_clone:

  a) 如果oid.snap大于ssc->snapset.seq,说明该snap是最新做的快照,osd端还没有完成相关的信息更新,直接返回head对象object_context,如果head对象存在,就返回0,否则返回-ENOENT。

  b) 否则,直接检查SnapSet的clones列表,如果没有,就直接返回-ENOENT.

  c) 如果找到,检查对象如果处于missing, pmissing就设置为该clone对象,返回-EAGAIN。如果没有,就获取该clone对象的object_context。

6) 如果不是map_snapid_to_clone,就不能从snap_id直接获取clone对象,需要根据snaps和clones列表,计算snap_id对应的clone对象:

  a) 如果oid.snap > ssc->snapset.seq,获取head对象的ObjectContext;

  b) 计算oid.snap首次大于ssc->snapset.clones列表中的clone对象,就是oid对应的clone对象;

  c) 检查该clone对象如果missing,设置pmissing为该clone对象,返回-EAGAIN。

  d) 获取该clone对象的ObjectContext;

  e) 最后检查该clone对象如果是在first到last之间,这是合理情况,返回0;否则,就是异常情况,返回-ENOENT。

本函数是获取实际对象的ObjectContext,如果不是head对象,就需要获取快照对象实际对应的clone对象的ObjectContext。

7. execute_ctx()

在do_op()函数里,做了大量的对象状态的检查和上下文相关信息的获取,本函数执行相关的操作:

void ReplicatedPG::execute_ctx(OpContext *ctx);

处理过程如下:

1) 首先在OpContext中创建一个新的事务,该事务为pgbackend定义的事务

// this method must be idempotent since we may call it several times
// before we finally apply the resulting transaction.
ctx->op_t.reset(pgbackend->get_transaction());

2) 如果是写操作,更新ctx->snapc值。ctx->snapc值保存了该操作的客户端附带的快照相关信息:

  a) 如果是给整个pool的快照操作,就设置ctx->snapc等于pool.snapc的值;

  b) 如果是用户特定快照(目前只有rbd实现),ctx->snapc值就设置为消息带的相关信息:

// client specified snapc
ctx->snapc.seq = m->get_snap_seq();
ctx->snapc.snaps = m->get_snaps();

  c) 如果设置了CEPH_OSD_FLAG_ORDERSNAP标志,客户端的snap_seq比服务端的小,就直接返回-EOLDSNAPC错误码。

3) 如果是read操作,该对象的ObjectContext加ondisk_read_lock锁;对于源对象,无论读写操作,都需要加ondisk_read_lock锁。

if (op->may_read()) {
	dout(10) << " taking ondisk_read_lock" << dendl;
	obc->ondisk_read_lock();
}

提示: 所谓源对象,就是一个操作中带两个对象,比如copy操作,源对象会有读操作。

4) 调用函数prepare_transaction()把相关的操作封装到ctx->op_t的事务中。如果是读操作,对于replicate类型,该函数直接调用pgbackend->objects_read_sync同步读取数据;如果是EC,就把请求加入pending_async_reads完成异步读取操作

int result = prepare_transaction(ctx);

5) 解除操作3)中加的相关锁;

6) 如果是读操作,并且ctx->pending_async_reads为空,说明是同步读取,调用complete_read_ctx完成读取操作,给客户端返回应答消息。如果是异步读取,就调用函数ctx->start_async_reads()完成异步读取。读操作到这里就结束,后续都是写操作的流程。

// read or error?
if (ctx->op_t->empty() || result < 0) {
	// finish side-effects
	if (result == 0)
		do_osd_op_effects(ctx, m->get_connection());

	if (ctx->pending_async_reads.empty()) {
		complete_read_ctx(result, ctx);
	} else {
		in_progress_async_reads.push_back(make_pair(op, ctx));
		ctx->start_async_reads(this);
	}

	return;
}

7) 调用calc_trim_to(),计算需要trim的pg log的版本

// trim log?
calc_trim_to();

8) 调用函数issue_repop()向各个副本发送同步操作请求;

9) 调用函数eval_repop(),检查发向各个副本的同步操作是否已经reply成功,做相应的操作。

从上可以看出,execute_ctx()操作把相关的操作打包成事务,并没有真正的对对象的数据做修改。

8. calc_trim_to()

本函数用于计算是否应该将旧的pg log日志进行trim操作:

void ReplicatedPG::calc_trim_to()
{
  size_t target = cct->_conf->osd_min_pg_log_entries;
  if (is_degraded() ||
      state_test(PG_STATE_RECOVERING |
		 PG_STATE_RECOVERY_WAIT |
		 PG_STATE_BACKFILL |
		 PG_STATE_BACKFILL_WAIT |
		 PG_STATE_BACKFILL_TOOFULL)) {
    target = cct->_conf->osd_max_pg_log_entries;
  }

  if (min_last_complete_ondisk != eversion_t() &&
      min_last_complete_ondisk != pg_trim_to &&
      pg_log.get_log().approx_size() > target) {
    size_t num_to_trim = pg_log.get_log().approx_size() - target;
    if (num_to_trim < cct->_conf->osd_pg_log_trim_min) {
      return;
    }
    list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
    eversion_t new_trim_to;
    for (size_t i = 0; i < num_to_trim; ++i) {
      new_trim_to = it->version;
      ++it;
      if (new_trim_to > min_last_complete_ondisk) {
	new_trim_to = min_last_complete_ondisk;
	dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
	break;
      }
    }
    dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
    pg_trim_to = new_trim_to;
    assert(pg_trim_to <= pg_log.get_head());
    assert(pg_trim_to <= min_last_complete_ondisk);
  }
}

处理过程如下:

1) 首先计算target值: target值为最少保留的日志条数,默认设置为配置项cct->_conf->osd_min_pg_log_entries的值。如果pg处于degraded,或者正在修复的状态,target值为cct->_conf->osd_max_pg_log_entries(默认10000条)

2) 变量min_last_complete_ondisk为本PG在本OSD上完成的最后一条日志记录的版本。如果它不为空,且不等于pg_trim_to,当前pg log的size大于target值,就计算需要trim掉的日志条数:

  a) num_to_trim为日志总数目减去target,如果它小于日志一次trim的最小值cct->_conf->osd_pg_log_trim_min,就返回;

  b) 否则,从日志头开始计算最新的pg_trim_to版本。

9. prepare_transaction()

本函数用于把相关的更新操作打包为事务,包括比较复杂的部分为对象的snapshot的处理:

int ReplicatedPG::prepare_transaction(OpContext *ctx);

处理过程如下:

1) 首先调用调用函数ctx->snapc.is_valid()来验证SnapSet的有效性;

2) 调用函数do_osd_ops()打包请求到ctx->op_t的transaction中

int ReplicatedPG::prepare_transaction(OpContext *ctx){

	....	

	// prepare the actual mutation
	int result = do_osd_ops(ctx, ctx->ops);
	if (result < 0)
		return result;
	
	...
}
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
	...
	PGBackend::PGTransaction* t = ctx->op_t.get();

	for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
		...

		switch(op.op){
			...
			case CEPH_OSD_OP_WRITE:

		}
	}
}

do_osd_ops()函数CEPH_OSD_OP_WRITE实现对写请求的封装。这里我们看到是将要写入的对象数据封装到了ctx->opt_t这样一个Transaction中了。

3) 如果事务为空,或者没有修改操作,就直接返回result

// read-op?  done?
if (ctx->op_t->empty() && !ctx->modify) {
	unstable_stats.add(ctx->delta_stats);
	return result;
}

4) 检查磁盘空间是否满

// check for full
if ((ctx->delta_stats.num_bytes > 0 ||
		ctx->delta_stats.num_objects > 0) &&  // FIXME: keys?
		(pool.info.has_flag(pg_pool_t::FLAG_FULL) ||
		get_osdmap()->test_flag(CEPH_OSDMAP_FULL))) {
	MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
	if (ctx->reqid.name.is_mds() ||   // FIXME: ignore MDS for now
		m->has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
		dout(20) << __func__ << " full, but proceeding due to FULL_FORCE or MDS"<< dendl;
	} else if (m->has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
		// they tried, they failed.
		dout(20) << __func__ << " full, replying to FULL_TRY op" << dendl;
		return pool.info.has_flag(pg_pool_t::FLAG_FULL) ? -EDQUOT : -ENOSPC;
	} else {
		// drop request
		dout(20) << __func__ << " full, dropping request (bad client)" << dendl;
		return -EAGAIN;
	}
}

5) 如果该对象是head对象,就有相关快照对象COW机制的操作,需要调用函数make_writable()来完成,在关于快照的介绍中会详细介绍到。

// clone, if necessary
if (soid.snap == CEPH_NOSNAP)
	make_writeable(ctx);

6) 调用函数finish_ctx来完成后续处理,该函数主要完成了快照相关的处理。如果head对象存在,就删除snapdir对象;如果不存在,就创建snapdir对象,用来保存快照相关的信息。此外,这里很重要的一个步骤是涉及到PGLog的处理:

int ReplicatedPG::prepare_transaction(OpContext *ctx)
{
	finish_ctx(ctx,
		ctx->new_obs.exists ? pg_log_entry_t::MODIFY :
		pg_log_entry_t::DELETE);
}
void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc,
			      bool scrub_ok){
	...
	// append to log
	ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
		ctx->obs->oi.version,
		ctx->user_at_version, ctx->reqid,
		ctx->mtime));

	if (soid.snap < CEPH_NOSNAP) {
		switch (log_op_type) {
		case pg_log_entry_t::MODIFY:
		case pg_log_entry_t::PROMOTE:
		case pg_log_entry_t::CLEAN:
			dout(20) << __func__ << " encoding snaps " << ctx->new_obs.oi.snaps << dendl;
			::encode(ctx->new_obs.oi.snaps, ctx->log.back().snaps);
			break;
		default:
			break;
		}
	}
	...
}

//src/osd/osd_types.h
struct pg_log_entry_t {
	enum {
		MODIFY = 1,       // some unspecified modification (but not *all* modifications)
		CLONE = 2,        // cloned object from head
		DELETE = 3,      // deleted object
		BACKLOG = 4,     // event invented by generate_backlog [deprecated]
		LOST_REVERT = 5, // lost new version, revert to an older version.
		LOST_DELETE = 6, // lost new version, revert to no object (deleted).
		LOST_MARK = 7,   // lost new version, now EIO
		PROMOTE = 8,     // promoted object from another tier
		CLEAN = 9,       // mark an object clean
	};

	// describes state for a locally-rollbackable entry
	ObjectModDesc mod_desc;
	bufferlist snaps;                                        // only for clone entries
	hobject_t  soid;
	osd_reqid_t reqid;                                       // caller+tid to uniquely identify request
	vector<pair<osd_reqid_t, version_t> > extra_reqids;
	eversion_t version, prior_version, reverting_to;
	version_t user_version;                                 // the user version for this entry
	utime_t     mtime;                                      // this is the _user_ mtime, mind you
	
	__s32      op;
	bool invalid_hash;                                     // only when decoding sobject_t based entries
	bool invalid_pool;                                     // only when decoding pool-less hobject based entries

	pg_log_entry_t()
		: user_version(0), op(0),
		invalid_hash(false), invalid_pool(false) {}

	pg_log_entry_t(int _op, const hobject_t& _soid,
		const eversion_t& v, const eversion_t& pv,
		version_t uv,
		const osd_reqid_t& rid, const utime_t& mt)
		: soid(_soid), reqid(rid), version(v), prior_version(pv), user_version(uv),
		mtime(mt), op(_op), invalid_hash(false), invalid_pool(false)
		{}
};

PGLog在ceph的数据一致性方面非常重要。从上面可以看到,对于每一次的数据写操作,均会构造一个pg_log_entry_t对象将其放入ctx->log中。后续在ReplicatedBackend::submit_transaction()中,实际上会向ctx->log这一vector中的日志打包放入ctx->opt_t中的。注意: 这里PGLog与实际的对象数据是存在于同一个transaction中的。

10. issue_repop()
void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  const hobject_t& soid = ctx->obs->oi.soid;
  dout(7) << "issue_repop rep_tid " << repop->rep_tid
          << " o " << soid
          << dendl;

  repop->v = ctx->at_version;
  if (ctx->at_version > eversion_t()) {
    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
	 i != actingbackfill.end();
	 ++i) {
      if (*i == get_primary()) continue;
      pg_info_t &pinfo = peer_info[*i];
      // keep peer_info up to date
      if (pinfo.last_complete == pinfo.last_update)
	pinfo.last_complete = ctx->at_version;
      pinfo.last_update = ctx->at_version;
    }
  }

  ctx->obc->ondisk_write_lock();
  if (ctx->clone_obc)
    ctx->clone_obc->ondisk_write_lock();

  bool unlock_snapset_obc = false;
  if (ctx->snapset_obc && ctx->snapset_obc->obs.oi.soid !=
      ctx->obc->obs.oi.soid) {
    ctx->snapset_obc->ondisk_write_lock();
    unlock_snapset_obc = true;
  }

  ctx->apply_pending_attrs();

  if (pool.info.require_rollback()) {
    for (vector<pg_log_entry_t>::iterator i = ctx->log.begin();
	 i != ctx->log.end();
	 ++i) {
      assert(i->mod_desc.can_rollback());
      assert(!i->mod_desc.empty());
    }
  }

  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
    ctx->obc,
    ctx->clone_obc,
    unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef());
  pgbackend->submit_transaction(
    soid,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}

本函数的处理过程如下:

1) 首先更新actingbackfill的osd对应的peer_info的相关信息:如果pinfo.last_update和pinfo.last_complete二者相等,说明该peer的状态处于clean状态,就同时更新二者,否则只更新pinfo.last_update值。(注:通过pinfo.last_update、pinfo.last_complete来反应当前PG的状态信息)

2) 对该对象的ObjectContext的ondisk_write_lock加写锁,如果有clone对象,对该clone对象的ObjectContext的ondisk_write_lock加写锁。如果snapset_obc不为空,也就是可能创建或者删除snapdir对象,对该ObjectContext的ondisk_write_lock加锁。

3) 如果pool是可以rollback的(也就是ErasureCode模式),检查pg log也应该支持rollback操作;

4) 分别设置三个回调Context,调用函数pgbackend->submit_transaction()来完成事务向从OSD的发送;

本函数调用pgbackend的submit_transaction()函数向从osd开始发送操作日志。

2.3 阶段3: PGBackend的处理

PGBackend为PG的更新操作增加了一层与PG类型相关的实现。对于Replicate类型的PG由类ReplicatedBackend实现。其核心处理过程是把封装好的事务分发到该PG对应的其他从OSD上;对于ErasureCode类型的PG由类ECBackend实现,其核心处理过程为主chunk向各个分片chunk分发数据的过程。下面着重介绍Replicate的处理方式。

ReplicatedBackend::submit_transaction()函数最终调用网络接口,把更新请求发送给从OSD:

void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &trim_rollback_to,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  std::unique_ptr<RPGTransaction> t(
    static_cast<RPGTransaction*>(_t.release()));
  assert(t);
  ObjectStore::Transaction op_t = t->get_transaction();

  assert(t->get_temp_added().size() <= 1);
  assert(t->get_temp_cleared().size() <= 1);

  assert(!in_progress_ops.count(tid));
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
	tid, on_all_commit, on_all_acked,
	orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());


  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    trim_rollback_to,
    t->get_temp_added().empty() ? hobject_t() : *(t->get_temp_added().begin()),
    t->get_temp_cleared().empty() ?
      hobject_t() : *(t->get_temp_cleared().begin()),
    log_entries,
    hset_history,
    &op,
    op_t);

  if (!(t->get_temp_added().empty())) {
    add_temp_objs(t->get_temp_added());
  }
  clear_temp_objs(t->get_temp_cleared());

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    trim_rollback_to,
    true,
    op_t);
  
  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

其处理过程如下:

1) 首先构建InProgressOp请求记录;

2) 调用函数ReplicatedBackend::issue_op把请求发送出去: 对于该PG中的每一个从OSD,

  a) 调用函数generate_subop()生成MSG_OSD_REPOP类型的请求;

  b) 调用函数get_parent()->send_message_osd_cluster()把消息发送出去;

3) 最后调用parent->queue_transactions()函数来完成自己,也就是该PG的主OSD上本地对象的数据修改。这里我们可以看到主OSD的OnCommit的结果可以通过回调C_OSD_OnOpCommit()来获得,而其又会回调ReplicatedBackend::op_commit();相似的,OnApply的结果可以通过回调C_OSD_OnOpApplied()来获得,而其又会回调ReplicatedBackend::op_applied()。在op_commit()以及on_applied()两个函数里会回调ReplicatedBackend::InProgressOp结构体中注册的Context,关于这一点我们会在本章最后一节进行分析。

2.4 从副本的处理

当PG的从副本OSD接收到MSG_OSD_REPOP类型的操作,也就是主副本发来的同步写的操作时,处理流程和上述流程都一样。在函数sub_op_modify()里,对本地存储应用相应的事务,完成本地对象的数据写入:

void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{
  MOSDRepOp *m = static_cast<MOSDRepOp *>(op->get_req());
  m->finish_decode();
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << "sub_op_modify trans"
           << " " << soid
           << " v " << m->version
	   << (m->logbl.length() ? " (transaction)" : " (parallel exec")
	   << " " << m->logbl.length()
	   << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = m->get_data().begin();
  ::decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
	       << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = m->logbl.begin();
  ::decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_trim_rollback_to,
    update_snaps,
    rm->localt);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
}

如下是从OSD接收消息并处理的流程图:

ceph-chapter6-2

此外,通过上面的代码我们可以看到,本地数据写入的结果可以通过所注册的C_OSD_RepModifyCommit()以及C_OSD_RepModifyApply()两个回调函数获得。这里onCommit的回调函数为:

void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->committed = true;

  // send commit.
  dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
	   << ", sending commit to osd." << rm->ackerosd
	   << dendl;

  assert(get_osdmap()->is_up(rm->ackerosd));
  get_parent()->update_last_complete_ondisk(rm->last_complete);

  Message *m = rm->op->get_req();
  Message *commit = NULL;
  if (m->get_type() == MSG_OSD_SUBOP) {
    // doesn't have CLIENT SUBOP feature ,use Subop
    MOSDSubOpReply  *reply = new MOSDSubOpReply(
      static_cast<MOSDSubOp*>(m),
      get_parent()->whoami_shard(),
      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
    reply->set_last_complete_ondisk(rm->last_complete);
    commit = reply;
  } else if (m->get_type() == MSG_OSD_REPOP) {
    MOSDRepOpReply *reply = new MOSDRepOpReply(
      static_cast<MOSDRepOp*>(m),
      get_parent()->whoami_shard(),
      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
    reply->set_last_complete_ondisk(rm->last_complete);
    commit = reply;
  }
  else {
    assert(0);
  }

  commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, commit, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}

从上面可以看到,最终其会通过cluster_messenger向主OSD发送应答信息。

2.5 主副本接收到从副本的应答

当PG的主副本接收到从副本的应答消息MSG_OSD_REPOPREPLY时,处理流程和上述类似,不同之处在于,在函数ReplicatedPG::do_request()里调用了函数ReplicatedBackend::handle_message(),在该函数里调用了ReplicatedBackend::sub_op_modify_reply()函数处理该请求。

注:从OSD向主OSD发送MSG_OSD_REPOPREPLY应答消息,主OSD收到应答消息后,又会经历一次OSD::ms_fast_dispatch()到ReplicatedPG::do_request()这一大段流程,可以参看“图6-2 OSD处理写操作的序列图”

sub_op_modify_reply函数的处理过程如下:

void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op);

1) 首先在in_progress_ops中查找到该请求;

2) 如果是ondisk的ACK,也就是事务已经应答,就在ip_op.waiting_for_commit删除该OSD。该事务已经应答,那么必定已经提交了,那么从ip_op.waiting_for_applied删除该OSD;

3) 如果只是事务提交到日志中的ACK,就从ip_op.waiting_for_applied删除

注: 这里特别说明的是,从副本需要给主副本发送两次ACK,一次是事务提交到日志中,并没有应用到实际的对象数据中;一次是完成应用操作返回的ACK。

4) 最后检查,如果ip_op.waiting_for_applied为空,也就是所有从OSD的请求都返回来了,并且ip_op.on_applied(其为一个Context)不为NULL,就调用该Context的complete函数。同样,检查ip_op.waiting_for_commit为空,并且ip_op.on_commit(其为一个Context)不为NULL,就调用该Context的complte函数。

下面看一下,in_progress_ops注册的回调函数。其回调函数是在ReplicatedPG::issue_repop()函数调用里注册的:

void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  ....
  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
    ctx->obc,
    ctx->clone_obc,
    unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef());
  pgbackend->submit_transaction(
    soid,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}

回调函数都最终调用了函数ReplicatedPG::eval_repop,其最终向client发送应答消息。这里强调的是,主副本必须等待所有的处于up的OSD都返回成功的ACK应答消息,才向客户端返回请求成功的应答。

注: 只要所有处于up状态的OSD都返回了onApplied应答(事务已经写入日志)即可向client发送响应消息,不必等到OnCommit应答。

下面再贴出一张官网上面关于ceph写数据的图:

ceph-chapter6-4

ceph-chapter6-5

3. 总结

本章介绍了OSD读写流程核心处理过程。通过本章的介绍,可以了解读写流程的主干流程,并对一些核心概念和数据结构的处理做了介绍。当然,读写流程是ceph文件系统的核心流程,其实现细节比较复杂,还需要读者对照代码继续研究。目前在这方面的工作,许多都集中在提供ceph的读写性能。其基本的方法更多的就是优化读写流程的关键路径,通过减少锁来提供并发,同时简化一些关键流程。



[参看]