ceph客户端
本章我们介绍Ceph的客户端实现。客户端是系统对外提供的功能接口,上层应用通过它来访问ceph存储系统。本章首先介绍librados和Osdc两个模块,通过它们可以直接访问RADOS对象存储系统。其次介绍Cls扩展模块,使用它们可方便地扩展现有的接口。最后介绍librbd模块。由于librados和librbd的多数实现流程都比较类似,本章在介绍相关数据结构后,只选取一些典型的操作流程介绍。
1. librados
librados是RADOS对象存储系统访问的接口库,它提供了pool的创建、删除,对象的创建、删除、读写等基本操作接口。架构如下图5-5所示:
最上层是类RadosClient,它是Librados的核心管理类,处理整个RADOS系统层面以及pool层面的管理。类IoctxImpl实现单个pool层对象的读写等操作。OSDC模块实现了请求的封装和通过网络模块发送请求的逻辑,其核心类Objecter完成对象的地址计算、消息的发送等工作。
1.1 RadosClient
代码如下(src/librados/radosclient.h):
class librados::RadosClient : public Dispatcher
{
public:
using Dispatcher::cct;
md_config_t *conf; //配置文件
private:
enum {
DISCONNECTED,
CONNECTING,
CONNECTED,
} state; //和monitor的网络连接状态
MonClient monclient; //Monitor客户端
Messenger *messenger; //网络消息接口
uint64_t instance_id; //rados客户端实例的ID
Objecter *objecter; //objecter对象指针
Mutex lock;
Cond cond;
SafeTimer timer; //定时器
int refcnt; //引用计数
version_t log_last_version;
rados_log_callback_t log_cb;
void *log_cb_arg;
string log_watch;
public:
Finisher finisher; //用于执行回调函数的Finisher类
...
};
通过RadosClient的成员函数,可以了解RadosClient的实现功能如下:
1) 网络连接
connect()函数是RadosClient的初始化函数,完成了许多的初始化工作:
a) 调用函数monclient.build_initial_monmap(),从配置文件里检查是否有初始的Monitor地址信息;
b) 创建网络通信模块messenger,并设置相关的Policy信息;
c) 创建Objecter对象并初始化;
objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
&finisher,
cct->_conf->rados_mon_op_timeout,
cct->_conf->rados_osd_op_timeout);
if (!objecter)
goto out;
objecter->set_balanced_budget();
objecter->init();
d) 调用monclient.init()函数初始化monclient
err = monclient.init();
e) Timer定时器初始化,Finisher对象初始化
2) pool的同步和异步创建
int librados::RadosClient::pool_create(string& name, unsigned long long auid,
int16_t crush_rule);
int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c,
unsigned long long auid,
int16_t crush_rule);
a) 函数pool_create同步创建pool。其实现过程为调用Objecter::create_pool()函数,构造PoolOp操作,通过monitor的客户端monc发送请求给Monitor创建一个pool,并同步等待请求的返回;
b) 函数pool_create_async异步创建。与同步方式的区别在于注册了回调函数,当创建成功后,执行回调函数通知完成。
3) pool的同步和异步删除
函数delete_pool完成同步删除,函数delete_pool_async异步删除。其过程和pool的创建过程相同,向Monitor发送删除请求。
4) 查找pool和列举pool
函数lookup_pool()用于查找pool,函数pool_list用于列出所有的pool。pool相关的信息都保存在OsdMap中。
5) 获取pool和系统的信息
函数get_pool_stats()用于获取pool的统计信息,函数get_fs_stats()用于获取系统的统计信息。
int librados::RadosClient::get_pool_stats(std::list<string>& pools,
map<string,::pool_stat_t>& result);
int librados::RadosClient::get_fs_stats(ceph_statfs& stats);
6) 命令处理
函数mon_command()处理Monitor相关的命令,它调用函数monclient.start_mon_command()把命令发送给monitor处理;函数osd_command处理OSD相关的命令,它调用函数objecter->osd_command()把命令发送给对应OSD处理。函数pg_command()处理PG相关命令,它调用函数objecter->pg_command()把命令发送给该PG的主OSD来处理。
7) 创建IoCtxImpl对象
函数create_ioctx()创建一个pool相关的山下文信息IoCtxImpl对象。
1.2 IoCtxImpl
类IoCtxImpl(src/librados/IoCtxImpl.h)是pool操作相关的上下文信息,一个IoCtxImpl对象对应着一个pool(注: 对于一个pool,我们可以创建多个IoCtxImpl对象),可以在该pool里创建、删除对象,完成对象的数据读写等各种操作,包括同步和异步的实现。其处理过程都比较简单,而且过程类似:
1) 把请求封装成ObjectOperation类(该类定义在src/osdc/Objecter.h中)
2) 然后再添加pool的地址信息,封装成Objecter::Op对象
3) 调用函数objecter->op_submit发送给相应的OSD。如果是同步操作,就等待操作完成;如果是异步操作,就不用等待,直接返回。当操作完成后,调用相应的回调函数通知。
2. OSDC
OSDC是客户端比较底层的模块,其核心在于封装操作数据,计算对象的地址,发送请求和处理超时。代码位于src/osdc目录下:
# ls Filer.cc Filer.h Journaler.cc Journaler.h Makefile.am ObjectCacher.cc ObjectCacher.h Objecter.cc Objecter.h Striper.cc Striper.h WritebackHandler.h
2.1 ObjectOperation
类ObjectOperation用于将操作相关的参数统一封装在该类里,该类可以一次封装多个对象的操作(src/osdc/objecter.h):
struct ObjectOperation {
vector<OSDOp> ops; //多个操作
int flags; //操作的标志
int priority; //优先级
vector<bufferlist*> out_bl; //每个操作对应的输出缓冲区队列
vector<Context*> out_handler; //每个操作对应的回调函数队列
vector<int*> out_rval; //每个操作对应的操作结果队列
};
封装的对象操作主要包括如下几类:
-
object的创建、读写、遍历
-
pg的遍历
-
object的xattr的创建、遍历
-
object的omap的创建、遍历
类OSDOp封装对象的一个操作。结构体ceph_osd_op封装一个操作的操作码和相关的输入和输出参数(src/osd/osd_types.h):
struct OSDOp {
ceph_osd_op op; //各种操作码和操作参数
sobject_t soid; //操作对象
bufferlist indata, outdata; //输入和输出bufferlist
int32_t rval; //操作结果
};
注: 这里OSDOp通常只是操作的封装,具体操作的对象名可能不会在这里设置,一般需要搭配Objecter一起使用
2.2 op_target
结构op_target_t封装了对象所在的PG,以及PG对应的OSD列表等地址信息(src/osdc/objecter.h):
struct op_target_t {
int flags; //标志
object_t base_oid; //所操作的对象
object_locator_t base_oloc; //对象的pool信息
object_t target_oid; //最终操作的目标对象
object_locator_t target_oloc; //最终目标对象的pool信息。这里由于Cache tier的存在,导致产生最终操作的目标和pool的不同
bool precalc_pgid; //是否使用预先计算好的PG id
pg_t base_pgid; //
pg_t pgid; //上一次所映射的PG
unsigned pg_num; //last pg_num we mapped to
unsigned pg_num_mask; //last pg_num_mask we mapped to
vector<int> up; //set of up osds for last pg we mapped to
vector<int> acting; //set of acting osds for last pg we mapped to
int up_primary; //primary for last pg we mapped to based on the up set
int acting_primary; //primary for last pg we mapped to based on the acting set
int size; //the size of the pool when were were last mapped
int min_size; //the min size of the pool when were were last mapped
bool sort_bitwise; //whether the hobject_t sort order is bitwise
bool used_replica;
bool paused;
int osd; ///< the final target osd, or -1
};
2.3 Op
结构Op封装了完成一个操作的相关上下文信息,包括target地址信息、链接信息等(src/osdc/objecter.h):
struct Op : public RefCountedObject {
OSDSession *session; //osd相关的session信息
int incarnation; //引用次数
op_target_t target; //操作的目标地址信息
ConnectionRef con; // for rx buffer only
uint64_t features; // explicitly specified op features
vector<OSDOp> ops; //对应多个操作的封装
snapid_t snapid; //快照的id
SnapContext snapc; //pool层级的快照信息
ceph::real_time mtime;
bufferlist *outbl; //输出的bufferlist
vector<bufferlist*> out_bl; //每个操作对应的bufferlist
vector<Context*> out_handler; //每个操作对应的回调函数
vector<int*> out_rval; //每个操作对应的输出结果
int priority;
Context *onack, *oncommit;
uint64_t ontimeout;
Context *oncommit_sync; // used internally by watch/notify
ceph_tid_t tid;
eversion_t replay_version; // for op replay
int attempts;
version_t *objver;
epoch_t *reply_epoch;
ceph::mono_time stamp;
epoch_t map_dne_bound;
bool budgeted;
/// true if we should resend this message on failure
bool should_resend;
/// true if the throttle budget is get/put on a series of OPs,
/// instead of per OP basis, when this flag is set, the budget is
/// acquired before sending the very first OP of the series and
/// released upon receiving the last OP reply.
bool ctx_budgeted;
int *data_offset;
epoch_t last_force_resend;
osd_reqid_t reqid; // explicitly setting reqid
};
2.4 Objecter
类Objecter主要完成对象的地址计算、消息的发送等工作(src/osdc/objecter.h):
class Objecter : public md_config_obs_t, public Dispatcher {
public:
Messenger *messenger; //网络消息发送接收器
MonClient *monc; //monitor客户端
Finisher *finisher; //消息异步处理的Finisher线程
private:
OSDMap *osdmap; //osd map
public:
using Dispatcher::cct; //ceph上下文
std::multimap<string,string> crush_location; //指定的crush_location来读取数据
atomic_t initialized;
private:
atomic64_t last_tid; //事务id
atomic_t inflight_ops; //当前处于flight状态的op数量
atomic_t client_inc; //用于唯一标识一个client
uint64_t max_linger_id;
atomic_t num_unacked;
atomic_t num_uncommitted;
atomic_t global_op_flags; // flags which are applied to each IO op
bool keep_balanced_budget;
bool honor_osdmap_full;
};
注:关于crush_location的使用,可以参看https://ceph.com/planet/ceph%E6%A0%B9%E6%8D%AEcrush%E4%BD%8D%E7%BD%AE%E8%AF%BB%E5%8F%96%E6%95%B0%E6%8D%AE/
2.4 Striper
对象有分片(stripe)时,类Stripe用于完成对象分片数据的计算。数据结构ceph_file_layout用来保存文件或者image的分片信息(src/include/fs_types.h):
struct ceph_file_layout { //文件 -> 对象的映射
__le32 fl_stripe_unit; //条带单元块大小(单位:字节),必须是page_size的倍数
__le32 fl_stripe_count; //stripe跨越的对象数
__le32 fl_object_size; //object的大小
__le32 fl_cas_hash; /* UNUSED. 0 = none; 1 = sha256 */
/* pg -> disk layout */
__le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */
/* object -> pg layout */
__le32 fl_unused; /* unused; used to be preferred primary for pg (-1 for none) */
__le32 fl_pg_pool; /* namespace, crush ruleset, rep level */
} __attribute__ ((packed));
对象ObjectExtent用来记录对象内的分片信息(src/osd/osd_types.h):
class ObjectExtent {
public:
object_t oid; //对象的id
uint64_t objectno; //属于object set中的哪一个Object
uint64_t offset; //对象内的偏移
uint64_t length; //长度
uint64_t truncate_size; //对象truncate的操作的size
object_locator_t oloc; //对象的位置信息,例如在哪个pool中等等
vector<pair<uint64_t,uint64_t> > buffer_extents; //Extents在buffer中的偏移和长度(off -> len),可能有多个extents
};
void Striper::file_to_extents(
CephContext *cct, const char *object_format,
const file_layout_t *layout, //分片信息
uint64_t offset, uint64_t len, //文件的偏移、长度
uint64_t trunc_size,
map<object_t,vector<ObjectExtent> >& object_extents, //分布到每个对象的数据段
uint64_t buffer_offset) //在buffer中的偏移量
函数file_to_extents()完成了file到对象stripe后的映射。只有了解清楚了每个概念,计算方法都比较简单。下面举例说明。
例5-1
file_to_extents示例
如上图5-5所示,要计算的文件的offset为2KB,length为16KB。文件的分片信息: stripe unit为4KB,stripe count为3,object size为8KB。则对象Object 0对应的ObjectExtent为:
object_extents["Object 0"] = {
oid = "Object 0",
objectno = 0,
offset = 2KB,
length = 6KB,
buffer_extents = { [0, 2KB], [10KB, 4KB]}
}
其中,oid就是映射对象的id,objectno为stripe对象的序号, offset为映射的数据段在对象内的起始偏移,length为对象内的长度。buffer_extents为映射的数据在buffer内的偏移和长度。
2.5 ObjectCacher
类ObjectCacher提供了客户端的基于LRU算法的对象数据缓存功能,其实比较简单,这里就不深入分析了(src/osdc/ObjectCacher.h)。
2.6 小结
librados层的整体架构如下:
对于librados我们可以分如下两个层面来看:
1) 从大的接口封装层面来说,提供了C语言的封装rados_t,以及C++语言的抽象librados::Rados(注意: Objecter处于全局名称空间,而其他处于librados名称空间)
2) 从RadosClient、IoCtxImpl、Objecter层面来说,是一个抽象到具体的一个实现的过程。RadosClient是粗粒度的Rados操作接口;IoCtxImpl是针对某一个pool的较为细粒度的操作接口;而Objecter则是object层面的更为细粒度的操作实现。
3. 客户端写操作分析
以下代码是通过librados库的接口写入数据到对象中的典型例程,对象的其他操作过程都类似:
rados_t cluster;
rados_ioctx_t ioctx;
rados_create(&cluster, NULL);
rados_conf_read_file(cluster, NULL);
rados_connect(cluster);
rados_ioctx_create(cluster, pool_name.c_str(), &ioctx);
rados_write(ioctx, "foo", buf, sizeof(buf), 0);
上述代码是C语言接口完成的,其流程如下:
1) 首先调用rados_create()函数创建一个RadosClient对象,输出类型为rados_t,它是一个void类型的指针,通过librados::RadosClient对象的强制转换产生。第二个参数id为一个标识符,一般传入为NULL。
2) 调用函数rados_conf_read来读取配置文件。第二个参数为配置文件的路径,如果是NULL,就搜索默认的配置文件。
3) 调用rados_connect()函数,它调用了RadosClient的connect()函数,做相关的初始化工作。
4) 调用函数rados_ioctx_create(),它调用RadosClient的create_ioctx()函数,创建pool相关的IoCtxImpl类,其输出类型为rados_ioctx_t,它也是void类型的指针,有IoCtxImpl对象转换而来;
5) 调用函数rados_write()函数,向该pool的名为foo
的对象写入数据。此调用IoCtxImpl的write()操作
3.1 写操作消息封装
本函数完成具体的写操作,代码如下(src/librados/IoCtxImpl.cc):
int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
size_t len, uint64_t off)
{
if (len > UINT_MAX/2)
return -E2BIG;
::ObjectOperation op;
prepare_assert_ops(&op);
bufferlist mybl;
mybl.substr_of(bl, 0, len);
op.write(off, mybl);
return operate(oid, &op, NULL);
}
其实现过程如下:
1) 创建ObjectOperation对象,封装写操作的相关参数;
2) 调用函数operate()完成处理:
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
ceph::real_time *pmtime, int flags);
a) 调用函数objecter->prepare_mutate_op()把ObjectOperation类型封装成Op类型,添加了object_locator_t相关的pool信息;
// a locator constrains the placement of an object. mainly, which pool
// does it go in.
struct object_locator_t {
// You specify either the hash or the key -- not both
int64_t pool; ///< pool id
string key; ///< key string (if non-empty)
string nspace; ///< namespace
int64_t hash; ///< hash position (if >= 0)
};
b) 调用objecter->op_submit()把消息发送出去;
c) 等待操作完成;
3.2 发送数据op_submit
函数op_submit()用来把封装好的操作Op通过网络发送出去:
void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
shunique_lock rl(rwlock, ceph::acquire_shared);
ceph_tid_t tid = 0;
if (!ptid)
ptid = &tid;
_op_submit_with_budget(op, rl, ptid, ctx_budget);
}
函数_op_submit_with_budget()用来处理Throttle相关的流量限制。如果osd_timeout大于0,就设置定时器,当操作超时,就调用定时器回调函数op_cancel取消操作:
void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
ceph_tid_t *ptid,
int *ctx_budget)
{
assert(initialized.read());
assert(op->ops.size() == op->out_bl.size());
assert(op->ops.size() == op->out_rval.size());
assert(op->ops.size() == op->out_handler.size());
// throttle. before we look at any state, because
// _take_op_budget() may drop our lock while it blocks.
if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
int op_budget = _take_op_budget(op, sul);
// take and pass out the budget for the first OP
// in the context session
if (ctx_budget && (*ctx_budget == -1)) {
*ctx_budget = op_budget;
}
}
if (osd_timeout > timespan(0)) {
if (op->tid == 0)
op->tid = last_tid.inc();
auto tid = op->tid;
op->ontimeout = timer.add_event(osd_timeout,
[this, tid]() {
op_cancel(tid, -ETIMEDOUT); });
}
_op_submit(op, sul, ptid);
}
函数_op_submit完成了关键的地址寻址和发送工作,其处理过程如下:
1) 调用函数_calc_target来计算对象的目标OSD
2) 调用函数_get_session获取目标OSD的链接,如果返回值为-EAGAIN,就升级为写锁,重新获取;
3) 检查当前的状态标志,如果当前是CEPH_OSDMAP_PAUSEWR
或者OSD空间满
(注: 因为这里我们讲述的是对象的写操作,因此flag中CEPH_OSD_FLAG_WRITE肯定被设置),就暂时不发送请求;否则调用函数_prepare_osd_op()准备请求的消息,然后调用函数_send_op()发送出去。
3.3 对象寻址_calc_target
函数_calc_target用于完成对象到OSD的寻址过程:
int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend,
bool any_change);
其处理过程如下:
1) 首先根据t->base_oloc.pool
的pool信息,获取pg_pool_t对象;
const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
if (!pi) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
2) 检查如果强制重发,force_resend设置为true;
3) 检查cache tier,如果是读操作,并且有读缓存,就设置target_oloc.pool为该pool的read_tier值;如果是写操作,并且有写缓存,就设置target_oloc.pool为该pool的write_tier值
if (need_check_tiering &&
(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
if (is_read && pi->has_read_tier())
t->target_oloc.pool = pi->read_tier;
if (is_write && pi->has_write_tier())
t->target_oloc.pool = pi->write_tier;
}
4) 调用函数osdmap->object_locator_to_pg()获取目标对象所在的PG
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
pgid);
5) 调用函数osdmap->pg_to_up_acting_osds(),通过CRUSH算法,获取该PG对应的OSD列表;
osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
&acting, &acting_primary);
6) 如果是写操作,target的OSD就设置为主OSD;如果是读操作,如果设置了CEPH_OSD_FLAG_BALANCE_READS标志,就随机选择一个副本读取;如果设置了CEPH_OSD_FLAG_LOCALIZE_READS标志,就尽可能选择本地副本来读取;否则选择主OSD来进行读取
int osd;
bool read = is_read && !is_write;
if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
int p = rand() % acting.size();
if (p)
t->used_replica = true;
osd = acting[p];
ldout(cct, 10) << " chose random osd." << osd << " of " << acting << dendl;
}
else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) && acting.size() > 1) {
// look for a local replica. prefer the primary if the
// distance is the same.
int best = -1;
int best_locality = 0;
for (unsigned i = 0; i < acting.size(); ++i) {
int locality = osdmap->crush->get_common_ancestor_distance(
cct, acting[i], crush_location);
ldout(cct, 20) << __func__ << " localize: rank " << i << " osd." << acting[i]
<< " locality " << locality << dendl;
if (i == 0 || (locality >= 0 && best_locality >= 0 && locality < best_locality) ||
(best_locality < 0 && locality >= 0)) {
best = i;
best_locality = locality;
if (i)
t->used_replica = true;
}
}
assert(best >= 0);
osd = acting[best];
}
else {
osd = acting_primary;
}
t->osd = osd;
3.4 查询object对应的PG
object到PG的映射是通过object_locator_to_pg()函数来实现的(src/osd/osdmap.cc):
// mapping
int OSDMap::object_locator_to_pg(
const object_t& oid,
const object_locator_t& loc,
pg_t &pg) const
{
// calculate ps (placement seed)
const pg_pool_t *pool = get_pg_pool(loc.get_pool());
if (!pool)
return -ENOENT;
ps_t ps;
if (loc.hash >= 0) {
ps = loc.hash;
} else {
if (!loc.key.empty())
ps = pool->hash_key(loc.key, loc.nspace);
else
ps = pool->hash_key(oid.name, loc.nspace);
}
pg = pg_t(ps, loc.get_pool(), -1);
return 0;
}
从上面来看,过程比较简单,首先计算ps(placement seed),然后根据pool id就可以获得PG。
3.5 通过pg求OSD列表
pg到OSD列表的映射是通过pg_to_up_acting_osds()来实现的(src/osd/osdmap.cc):
/**
* map a pg to its acting set as well as its up set. You must use
* the acting set for data mapping purposes, but some users will
* also find the up set useful for things like deciding what to
* set as pg_temp.
* Each of these pointers must be non-NULL.
*/
void pg_to_up_acting_osds(pg_t pg, vector<int> *up, int *up_primary,
vector<int> *acting, int *acting_primary) const {
_pg_to_up_acting_osds(pg, up, up_primary, acting, acting_primary);
}
void pg_to_up_acting_osds(pg_t pg, vector<int>& up, vector<int>& acting) const {
int up_primary, acting_primary;
pg_to_up_acting_osds(pg, &up, &up_primary, &acting, &acting_primary);
}
void OSDMap::_pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary,
vector<int> *acting, int *acting_primary) const
{
const pg_pool_t *pool = get_pg_pool(pg.pool());
if (!pool) {
if (up)
up->clear();
if (up_primary)
*up_primary = -1;
if (acting)
acting->clear();
if (acting_primary)
*acting_primary = -1;
return;
}
vector<int> raw;
vector<int> _up;
vector<int> _acting;
int _up_primary;
int _acting_primary;
ps_t pps;
_pg_to_osds(*pool, pg, &raw, &_up_primary, &pps);
_raw_to_up_osds(*pool, raw, &_up, &_up_primary);
_apply_primary_affinity(pps, *pool, &_up, &_up_primary);
_get_temp_osds(*pool, pg, &_acting, &_acting_primary);
if (_acting.empty()) {
_acting = _up;
if (_acting_primary == -1) {
_acting_primary = _up_primary;
}
}
if (up)
up->swap(_up);
if (up_primary)
*up_primary = _up_primary;
if (acting)
acting->swap(_acting);
if (acting_primary)
*acting_primary = _acting_primary;
}
pg_to_up_acting_osds()函数实现将pg映射为acting set。我们在进行对象读写时,都需要通过acting set来进行(对于pg_temp等少数情况,我们可能只需要知道PG对应的up set即可,也可以通过本函数来返回)。其实现步骤如下:
1) _pg_to_osds()会根据pool所对应的crush rule来计算出PG所对应的OSD
int ruleno = crush->find_rule(pool.get_crush_ruleset(), pool.get_type(), size);
if (ruleno >= 0)
crush->do_rule(ruleno, pps, *osds, size, osd_weight);
此时计算出来的OSD其实只是原始的仍在osd map列表中存在的OSD(这里在调用该函数时我们传递的是raw),之间是没有primary、secondary、tertiary这样区分的。
2) _raw_to_up_osds()函数从raw中返回当前仍处于UP状态的OSD,把处于down状态的进行标记
或移除
。同时这里简单的将返回的OSD列表中的第一个作为up_primary
3) _apply_primary_affinity()用于计算up_primary的亲和性,就是根据一定的规则从up osds里面选出其中一个作为primay。
4) _get_temp_osds()用于求出PG对应的acting set
void OSDMap::_get_temp_osds(const pg_pool_t& pool, pg_t pg,
vector<int> *temp_pg, int *temp_primary) const
{
pg = pool.raw_pg_to_pg(pg);
map<pg_t,vector<int32_t> >::const_iterator p = pg_temp->find(pg);
temp_pg->clear();
if (p != pg_temp->end()) {
for (unsigned i=0; i<p->second.size(); i++)
{
if (!exists(p->second[i]) || is_down(p->second[i]))
{
if (pool.can_shift_osds()) {
continue;
} else {
temp_pg->push_back(CRUSH_ITEM_NONE);
}
}
else {
temp_pg->push_back(p->second[i]);
}
}
}
map<pg_t,int32_t>::const_iterator pp = primary_temp->find(pg);
*temp_primary = -1;
if (pp != primary_temp->end()) {
*temp_primary = pp->second;
}
else if (!temp_pg->empty()) { // apply pg_temp's primary
for (unsigned i = 0; i < temp_pg->size(); ++i)
{
if ((*temp_pg)[i] != CRUSH_ITEM_NONE) {
*temp_primary = (*temp_pg)[i];
break;
}
}
}
}
上面我们看到首先调用pool.raw_pg_to_pg(pg)将raw pg转换,然后再在pg_temp中查找转换后的PG所对应的OSD列表即为acting set。
3.5 写操作的应答
上面讲写请求发送出去之后,就会等待相应的应答。当从网络收到应答之后,首先会回调:
bool Objecter::ms_dispatch(Message *m)
{
ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
if (!initialized.read())
return false;
switch (m->get_type()) {
// these we exlusively handle
case CEPH_MSG_OSD_OPREPLY:
handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
}
....
}
对于对象的读写操作,应答都是CEPH_MSG_OSD_OPREPLY
消息,其会调用handle_osd_op_reply()来进行处理。
4. Cls
Cls是Ceph的一个扩展模块,它允许用户自定义对象的操作接口和实现方法,为用户提供了一种比较方便的接口扩展方式。目前rbd和lock等模块都采用了这种机制(即通过加载额外的动态链接库的方式来进行处理,类似于Nginx中的dynamic module)。
4.1 模块以及方法的注册
类ClassHandler用来管理所有的扩展模块。函数register_class用来注册模块(src/osd/ClassHandler.h):
class ClassHandler
{
public:
CephContext *cct;
Mutex mutex;
map<string, ClassData> classes; //所有注册的模块: 模块名->模块元数据信息。
};
一个ClassData代表一个模块,一个模块中可以有很多方法。可以在一个ClassHandler中注册多个模块。
类ClassData描述了一个模块相关的元数据信息。它描述了一个扩展模块的相关信息,包括模块名、模块相关的操作方法以及依赖的模块:
struct ClassData {
enum Status {
CLASS_UNKNOWN, //初始未知状态
CLASS_MISSING, //缺失状态(动态链接库找不到)
CLASS_MISSING_DEPS, //依赖的模块缺失
CLASS_INITIALIZING, //正在初始化
CLASS_OPEN, //已经初始化(动态链接库以及加载成功)
} status; //当前模块的加载状态
string name; //模块的名字
ClassHandler *handler; //管理模块的指针
void *handle; //指向加载动动态链接库(dlopen())
map<string, ClassMethod> methods_map; //模块下所有注册的ClassMethod
map<string, ClassFilter> filters_map; //模块下所有的过滤器ClassFilter
set<ClassData *> dependencies; //本模块所依赖的模块
set<ClassData *> missing_dependencies; //缺失的依赖模块
};
ClassMethod定义一个模块具体的方法名,以及函数类型:
struct ClassMethod {
struct ClassHandler::ClassData *cls; //所属模块的ClassData的指针
string name; //方法的名字
int flags; //方法相关的标志
cls_method_call_t func; //C类型函数指针
cls_method_cxx_call_t cxx_func; //C++类型函数指针
};
在src/objclass/Objectclass.h, src/objclass/class_api.c里定义了一些辅助函数用来注册模块以及方法:
- 注册一个模块如下
int cls_register(const char *name, cls_handle_t *handle);
- 注册一个模块的方法如下
int cls_register_method(cls_handle_t hclass, const char *method,
int flags,
cls_method_call_t class_call, cls_method_handle_t *handle);
4.2 cls模块的工作原理
1) ClassHandler的初始化 在src/osd/OSD.cc的init()函数中有如下:
int OSD::init()
{
....
class_handler = new ClassHandler(cct);
cls_initialize(class_handler);
...
}
通过上面我们看到是调用cls_initialize()来完成初始化,现在我们来看看该函数(src/objclass/Class_api.cc):
static ClassHandler *ch;
#define dout_subsys ceph_subsys_objclass
void cls_initialize(ClassHandler *h)
{
ch = h;
}
这里可以看到,是将OSD::init()中创建的ClassHandler对象赋给了一个静态全局变量。
2) 加载动态链接库
通过上面创建出的ClassHandler对象调用open_all_classes()来加载ceph配置文件中以libcls_
开头的动态链接库,加载步骤如下:
int ClassHandler::_load_class(ClassData *cls)
{
// already open
if (cls->status == ClassData::CLASS_OPEN)
return 0;
if (cls->status == ClassData::CLASS_UNKNOWN ||
cls->status == ClassData::CLASS_MISSING) {
char fname[PATH_MAX];
snprintf(fname, sizeof(fname), "%s/" CLS_PREFIX "%s" CLS_SUFFIX,
cct->_conf->osd_class_dir.c_str(),
cls->name.c_str());
dout(10) << "_load_class " << cls->name << " from " << fname << dendl;
cls->handle = dlopen(fname, RTLD_NOW);
if (!cls->handle) {
struct stat st;
int r = ::stat(fname, &st);
if (r < 0) {
r = -errno;
dout(0) << __func__ << " could not stat class " << fname
<< ": " << cpp_strerror(r) << dendl;
} else {
dout(0) << "_load_class could not open class " << fname
<< " (dlopen failed): " << dlerror() << dendl;
r = -EIO;
}
cls->status = ClassData::CLASS_MISSING;
return r;
}
cls_deps_t *(*cls_deps)();
cls_deps = (cls_deps_t *(*)())dlsym(cls->handle, "class_deps");
if (cls_deps) {
cls_deps_t *deps = cls_deps();
while (deps) {
if (!deps->name)
break;
ClassData *cls_dep = _get_class(deps->name);
cls->dependencies.insert(cls_dep);
if (cls_dep->status != ClassData::CLASS_OPEN)
cls->missing_dependencies.insert(cls_dep);
deps++;
}
}
}
// resolve dependencies
set<ClassData*>::iterator p = cls->missing_dependencies.begin();
while (p != cls->missing_dependencies.end()) {
ClassData *dc = *p;
int r = _load_class(dc);
if (r < 0) {
cls->status = ClassData::CLASS_MISSING_DEPS;
return r;
}
dout(10) << "_load_class " << cls->name << " satisfied dependency " << dc->name << dendl;
cls->missing_dependencies.erase(p++);
}
// initialize
void (*cls_init)() = (void (*)())dlsym(cls->handle, "__cls_init");
if (cls_init) {
cls->status = ClassData::CLASS_INITIALIZING;
cls_init();
}
dout(10) << "_load_class " << cls->name << " success" << dendl;
cls->status = ClassData::CLASS_OPEN;
return 0;
}
从上面我们可以看到步骤较为简单:
-
调用dlopen()打开动态链接库
-
调用dlsym()读取动态链接库中的class_deps()函数,从而获得该动态链接库所需要的一些依赖;
-
调用dlsym()读取动态链接库中的
__cls_init()
函数,从而完成对应模块的初始化。下面我们以src/cls/rbd/cls_rbd.cc为例,大体看一下__cls_init()的实现:
void __cls_init()
{
CLS_LOG(20, "Loaded rbd class!");
cls_register("rbd", &h_class);
cls_register_cxx_method(h_class, "create",
CLS_METHOD_RD | CLS_METHOD_WR,
create, &h_create);
...
}
从上面我们可以看到,其首先调用cls_register()向ClassHandler注册该模块,其实就是在ClassHandler中创建了一个ClassData。然后,调用cls_register_cxx_method()来向该模块注册C++方法,这样该ClassData中就有该模块所有相关的方法了。
4.3 模块的方法执行
模块方法的执行在类ReplicatedPG的函数do_osd_ops()里实现(src/osd/ReplicatedPG.cc)。执行方法对应的操作码为CEPH_OSD_OP_CALL值:
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
....
case CEPH_OSD_OP_CALL:
ClassHandler::ClassData *cls;
result = osd->class_handler->open_class(cname, &cls);
assert(result == 0); //函数init_op_flags()已经对结果做了验证
//根据方法名获取方法
ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
//执行方法
result = method->exec((cls_method_context_t)&ctx, indata, outdata);
....
}
[参看]