本章我们介绍一下Ceph的网络通信模块,这是客户端和服务器通信的底层模块,用来在客户端和服务器之间接收和发送请求。其实现功能比较清晰,是一个相对比较独立的模块,理解起来比较容易。
1. Ceph网络通信框架
一个分布式存储系统需要一个稳定的底层网络通信模块,用于各节点之间互联互通。对于一个网络通信系统,要求如下:
网络通信模块实现的源代码在src/msg目录下,其首先定义了一个网络通信的框架,三个子目录里分别对应:Simple、Async、XIO三种不同的实现方式。
Simple是比较简单,目前比较稳定的实现,系统默认的用于生产环境的方式。它最大的特点是:每一个网络连接都会创建两个线程,一个专门用于接收,一个专门用于发送。这种模式实现比较简单,但是对于大规模的集群部署,大量的连接会产生大量的线程,会消耗CPU资源,影响性能。
Async模式使用了基于事件的IO多路复用模式。这是网络通信中广泛采用的方式。但是在Ceph中,官方宣称这种方式还处于试验阶段,不够稳定,还不能用于生产环境。
XIO方式使用了开源的网络通信库accelio
来实现。这种方式需要依赖第三方的库accelio的稳定性,需要对accelio的使用方式以及代码都比较熟悉。目前也处于试验阶段。特别注意的是,前两种方式只支持TCP/IP协议,而XIO可以支持Infiniband网络。
在msg目录下定义了网络通信的抽象框架,它完成了通信接口和具体实现的分离。在其下分别由msg/simple子目录、msg/async子目录、msg/xio子目录,分别对应三种不同的实现。
1.1 Message
类Message是所有消息的基类(位于:src/msg/message.cc),任何要发送的消息,都要继承该类,格式如下图3-1所示:
我们可以通过Pipe::write_message()看消息发出时候的打包顺序,从而知道上面的消息发送格式。但这里,我们看到message.cc中有一个encode_message()函数,该函数其实只是在对消息进行转发
或者路由
时对原来消息的二次打包,其真正发送出去的时候还是满足上面的消息发送格式的,如下所示:
下面我们详细介绍一下消息的结构: header是消息头,类似一个消息的信封(envelope),user_data是用于要发送的实际数据,footer是一个消息的结束标记,如下所示:
下面分别介绍其中的重要参数:
ceph_msg_header为消息头,它定义了消息传输相关的元数据:
ceph_msg_footer为消息的尾部,附加了一些crc校验数据和消息结束标志:
消息带的数据分别保存在payload、middle和data这三个bufferlist中。payload一般保存ceph操作相关的元数据,作为主净荷数据存在;middle目前没有使用到;data一般为读写的数据。
注: 在源代码src/messages下定义了系统需要的相关消息,其都是Message类的子类。
1.2 Connection
类Connection对应端到端的socket链接的封装,用于跟踪链接的状态。其最重要的接口是可以发送消息(位于src/msg/connection.h):
注: 对于rx_buffers,我们可以为某一个ceph_tid分配接收缓存,也可以不为其分配。当为其分配接收缓存时,就需要使用rx_buffers_version来标识接收缓存,以标识缓存是否进行过修改。
其最重要的功能就是发送消息的接口:
1.3 Dispatcher
类Dispatcher是消息分发的接口(src/msg/dispatcher.h),其分发消息的接口为:
Server端注册该Dispatcher类用于把接收到的Message请求分发给具体处理的应用层。Client端需要实现一个Dispatcher函数,用于处理收到的ACK应答消息。
我们可以通过测试脚本:
src/test/messenger/simple_dispatcher.cc
src/test/messenger/simple_client.cc
src/test/messenger/simple_server.cc
来进一步了解Dispatcher接口的使用。
1.4 Messenger
Messenger是整个网络抽象模块,定义了网络模块的基本API接口。网络模块对外提供的基本功能,就是能在节点之间发送和接收消息。
先一个节点发送消息的命令如下:
注: 但是此方法文档标识已经过期(deprecated),建议使用Connection::send_message()方法
注册一个Dispatcher用来分发消息的接口如下:
1.5 网络连接策略
Policy定义了Messenger处理Connection的一些策略:
1.6 网络模块的使用
通过下面最基本的服务器和客户端的示例程序,了解如何调用网络通信模块提供的接口来完成收发请求消息的功能。
Server程序分析
Server程序源代码在test/simple_server.cc里,这里只展示有关网络部分的核心流程:
1) 调用Messenger的函数create创建一个Messenger的实例,配置选项g_conf->ms_type为配置的实现类型,目前有三种实现方式: simple、async、xio
2) 设置Messenger的属性
3) 对于Server,需要bind服务端地址
4) 创建一个Dispatcher,并添加到Messenger
5) 启动messenger
SimpleDispatcher函数里实现了ms_dispatch
,用于把接收到的各种请求消息分发给相关的处理函数。
1.7 Client程序分析
源代码在test/simple_client.cc里,这里只展示有关网络部分的核心流程:
1) 调用Messenger的create()函数创建一个Messenger实例:
2) 设置相关的策略
3) 创建Dispatcher类并添加,用于接收消息
4) 启动消息
5) 下面开始发送请求,先获取目标Server的链接
6) 通过Connection来发送请求消息。这里的消息发送方式都是异步发送,接收到请求消息的ACK应答消息后,将在Dispatcher的ms_dispatch或者ms_fast_dispatch处理函数里做相关的处理
综上所述,通过Ceph的网络框架发送消息比较简单。在Server端,只需要创建一个Messenger实例,设置相应的策略并绑定服务端口,然后设置一个Dispatcher来处理接收到的请求。在Client端,只需要创建一个Messenger实例,设置相关的策略和Dispatcher用于处理返回的应答消息。通过获取对应Server的connection来发送消息即可。
1.7 小结
下面我们画出ceph网络通信模块的整体架构图:
从上面我们可以看到,SimpleMessenger作为一个中心类,管理着rank_pipe以及dispatchers,rank_pipe中的Pipe负责数据的收发,并将收到的数据放入in_q中以等待相应的线程进行转发。
2. Simple的实现
Simple在Ceph里实现比较早,目前也比较稳定,是在生产环境中使用的网络通信模块。如其名字所示,实现相对简单。下面具体分析一下,Simple如何实现Ceph网络通信框架的各个模块。
2.1 SimpleMessenger
类SimpleMessenger实现了Messenger接口:
2.2 Acceptor
类Acceptor用来在Server端监听端口,接收链接,它继承了Thread类,本身是一个线程,来不断监听Server的端口:
2.3 DispatchQueue
DispatchQueue类用于把接收到的请求保存在内部,通过其内部的线程,调用SimpleMessenger类注册的Dispatchers来处理相应的消息:
其内部的mqueue为优先级队列,用来保存消息;marrival保存了接收到的消息;marrival_map保存消息在集合中的位置。
函数DispatchQueue::enqueue()用来把接收到的消息添加到消息队列中,函数DispatchQueue::entry()为线程的处理函数,用于处理消息。
2.4 Pipe
类Pipe实现了PipeConnection的接口,它实现了两个端口之间的类似管道的功能。
对于每一个pipe,内部都有一个Reader和一个Writer线程,分别用来处理这个Pipe有关的消息接收和请求的发送。线程DelayedDelivery用于故障注入测试:
2.5 消息的发送
1) 当发送一个消息时,首先要通过Messenger类,获取对应的Connection
具体到SimpleMessenger的实现如下:
a) 首先比较,如果dest.addr是my_inst.addr,就直接返回local_connection
b) 调用函数_lookup_pipe在已经存在的Pipe中查找。如果找到,就直接返回PipeConnectionRef;否则调用函数connect_rank新创建一个Pipe,并加入到msgr的register_pipe里。
2) 当获得一个Connection之后,就可以调用Connection的发送函数来发送消息
其最终调用了SimpleMessenger::submit_message()函数:
a) 如果Pipe不为空,并且状态不是Pipe::STATE_CLOSED状态,调用函数pipe->_send()把发送的消息添加到out_q发送队列里,触发发送线程。
b) 如果Pipe为空,就调用connect_rank创建Pipe,并把消息添加到out_q发送队列中。
3) 发送线程writer把消息发送出去。通过步骤2,要发送的消息已经保存在相应Pipe的out_q队列里,并触发了发送线程。每个Pipe的Writer线程负责发送out_q的消息,其线程入口函数为Pipe::writer,实现功能:
a) 调用函数_get_next_outgoing()从out_q中获取消息;
b) 调用函数write_message(header, footer, blist)把消息的header、footer、数据blist发送出去。
2.6 消息的接收
1) 每个Pipe对应的线程Reader用于接收消息。入口函数为Pipe::reader(),其功能如下
a) 判断当前的state,如果为STATE_ACCEPTING,就调用函数Pipe::accept来接受连接;如果不是STATE_CLOSED,并且不是STATE_CONNECTING状态,就接收消息。
b) 先调用函数tcp_read()来接收一个tag
c) 根据tag,来接收不同类型的消息如下所示
CEPH_MSGR_TAG_KEEPALIVE: keepalive消息;
CEPH_MSGR_TAG_KEEPALIVE2: 在CEPH_MSGR_TAG_KEEPALIVE的基础上添加了时间;
CEPH_MSGR_TAG_KEEPALIVE2_ACK: keepalive2的响应
CEPH_MSGR_TAG_ACK:
CEPH_MSGR_TAG_MSG: 这里才是接收到的消息
CEPH_MSGR_TAG_CLOSE: 关闭消息
d) 调用函数read_message()来接收消息,当本函数返回后,就完成了接收消息
2) 调用函数in_q->fast_preprocess(m)预处理消息
3) 调用函数in_q->can_fast_dispatch(m),如果可以进行fast_dispatch,就in_q->fast_dispatch(m)处理。fast_dispatch并不把消息加入到mqueue里,而是直接调用msgr->ms_fast_dispatch()函数,并最终调用注册的fast_dispatcher来进行处理。
4) 如果不能fast_dispatch,就调用函数in_q->enqueue(m, m->get_priority(), conn_id)把接收到的消息加入到DispatchQueue的mqueue队列里,由DispatchQueue的分发线程调用ms_dispatch处理。
ms_fast_dispatch和ms_dispatch两种处理的区别在于:ms_dispatch是由DispatchQueue的线程处理的,它是一个单线程;ms_fast_dispatch函数是由Pipe接收线程直接调用处理的,因此性能比前者好。
注: 目前其实只有OSD实现了Dispatcher接口。OSD中需要高效处理的消息,一般就需要fast_dispatch。
2.7 错误处理
网络模块复杂的功能是如何处理网络错误。无论是接收还是发送,会出现各种异常错误,包括返回异常错误码,接收数据的magic验证不一致,接收的数据的校验验证不一致,等等。错误的原因主要是由于网络本身的错误(物理链路等),或者字节跳变引起的。
目前错误处理的方法比较简单,处理流程如下:
1) 关闭当前socket链接
2) 重新建立一个socket链接
3) 重新发送没有接收到ACK响应的消息
函数Pipe::fault用来处理错误:
1) 调用shutdown_socket关闭pipe的socket
2) 调用函数requeue_sent把没有收到ACK的消息重新加入发送队列,当发送队列有请求时,发送线程会不断尝试重新发送。
[参看]
- 非常详细的 Ceph 介绍、原理、架构