本章我们介绍一下Ceph的网络通信模块,这是客户端和服务器通信的底层模块,用来在客户端和服务器之间接收和发送请求。其实现功能比较清晰,是一个相对比较独立的模块,理解起来比较容易。

1. Messenger

class Messenger{
private:
	list<Dispatcher *> dispatchers;
	list<Dispatcher *> fast_dispatchers;

public:
	
	//获取dispatcher列表的当前长度
	virtual int get_dispatch_queue_len() = 0;

	/**
	* Get age of oldest undelivered message
	* (0 if the queue is empty)
	*/
	virtual double get_dispatch_queue_max_age(utime_t now) = 0;
	
	
	//添加新的dispatcher到列表头
	void add_dispatcher_head(Dispatcher *d) { 
		bool first = dispatchers.empty();
		dispatchers.push_front(d);
		
		if (d->ms_can_fast_dispatch_any())
			fast_dispatchers.push_front(d);
			
		if (first)
			ready();
	}
	
	//添加新的dispatcher到列表尾
	void add_dispatcher_tail(Dispatcher *d) { 
		bool first = dispatchers.empty();
		dispatchers.push_back(d);
		
		if (d->ms_can_fast_dispatch_any())
			fast_dispatchers.push_back(d);
		if (first)
			ready();
	}
	
	/*
	 * 将message放入队列。
	 * (注:此方法已经过时,在新的代码中不要使用,请用Connection::send_message()来替代)
	 */
	virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
	

public:

	//遍历fast_dispatchers列表,查看其是否可以通过fast dispatch来分发消息
	bool ms_can_fast_dispatch(Message *m) {
		for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();p != fast_dispatchers.end();++p) {
			if ((*p)->ms_can_fast_dispatch(m))
				return true;
		}
		
		return false;
	}
	
	//通过fast dispatch来分发消息
	void ms_fast_dispatch(Message *m) {
		m->set_dispatch_stamp(ceph_clock_now(cct));
		
		for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();p != fast_dispatchers.end();++p) {
			if ((*p)->ms_can_fast_dispatch(m)) {
				(*p)->ms_fast_dispatch(m);
				return;
			}
		}
		
		assert(0);
	}
	
	//在fast dispatch之前,进行消息的预处理
	void ms_fast_preprocess(Message *m) {
		for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();p != fast_dispatchers.end();++p) {
			(*p)->ms_fast_preprocess(m);
		}
	}
	
	//遍历dispatchers列表,直到其中一个dispatcher可以处理,就return退出。如果没有一个
	//dispatcher可以处理,则assert()报错
	void ms_deliver_dispatch(Message *m) {
		m->set_dispatch_stamp(ceph_clock_now(cct));
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p) {
			if ((*p)->ms_dispatch(m))
				return;
		}
		
		lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
			<< m->get_source_inst() << dendl;
		assert(!cct->_conf->ms_die_on_unhandled_msg);
		m->put();
	}
	
	//当有新的连接产生时,调用此函数通知dispatcher
	void ms_deliver_handle_connect(Connection *con) {
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p)
			(*p)->ms_handle_connect(con);
	}
	
	//通知每一个fast dispatcher有新的连接产生
	void ms_deliver_handle_fast_connect(Connection *con) {
		for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();p != fast_dispatchers.end();++p)
			(*p)->ms_handle_fast_connect(con);
	}
	
	/**
	 * Notify each Dispatcher of a new incomming Connection. Call
	 * this function whenever a new Connection is accepted.
	*/
	void ms_deliver_handle_accept(Connection *con) {
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p)
			(*p)->ms_handle_accept(con);
	}
	

	/**
	 * Notify each fast Dispatcher of a new incoming Connection. Call
	 * this function whenever a new Connection is accepted.
	*/
	void ms_deliver_handle_fast_accept(Connection *con) {
		for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();p != fast_dispatchers.end();++p)
			(*p)->ms_handle_fast_accept(con);
	}
	
	//当侦测到一个lossy connection断连之后,回调此函数报告dispatcher有可能丢失消息
	void ms_deliver_handle_reset(Connection *con) {
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p) {
			if ((*p)->ms_handle_reset(con))
				return;
		}
	}
	
	//报告每一个dispatcher,connection可能被远程端forgotten,这暗含着消息可能丢失
	void ms_deliver_handle_remote_reset(Connection *con) {
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p)
			(*p)->ms_handle_remote_reset(con);
	}
	
	//为一个新的outgoing连接查找到AuthAuthorizer
	AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) {
		AuthAuthorizer *a = 0;
		
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p) {
			if ((*p)->ms_get_authorizer(peer_type, &a, force_new))
				return a;
		}
		return NULL;
	}
	
	
	//校验一个新的incomming连接是否正确
	bool ms_deliver_verify_authorizer(Connection *con, int peer_type,
	  int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
	  bool& isvalid, CryptoKey& session_key) {
	  
		for (list<Dispatcher*>::iterator p = dispatchers.begin();p != dispatchers.end();++p) {
			if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid, session_key))
				return true;
		}
		return false;
	}
};

Messenger管理着两类dispatcher:

  • 普通dispatcher

  • fast dispater

代码实现较为简单,主要功能就是当有新的message到来时,分发消息;当有新的ingoing连接进来时,回调accept以及verify authorizer等;当有新的outgoing连接建立时,回调connect以及get authorizer等。

这里存在的一个疑问是,当调用add_dispatcher_head()或add_dispatcher_tail()加入一个dispatcher时,有可能同时加入到普通dispatcher列表,也有可能加入到fast dispatcher列表。那么当一个消息发送到dispatcher queue时,是否会回调两次呢?

答案是不会的。因为在消息进行投递时,其首先会调用ms_can_fast_dispatch()来判断是否可以进行快速投递,如果可以则不进行普通投递了。

2. SimpleMessenger

class SimplePolicyMessenger : public Messenger{
};


/**
 * SimpleMessenger负责消息的发送与接收。一般来说,其主要包含如下几个components:
 *
 * 1) Connection
 *    每一个逻辑会话(session)都关联着一个Connection
 *
 * 2) Pipe
 *    每一个网络连接都是通过pipe来进行处理的,负责消息的收发。通常情况下Pipe与Connection
 *    之间的比例关系是1:1,但是在socket重连或者connection竞争时逻辑sessions可能会在Pipes
 *    之间分离。
 *
 * 3)IncommingQueue
 *    Incomming消息与IncommingQueue相关联,并且每一个Pipe都关联着这样一个队列
 *
 * 4)DispatchQueue
 *   IncommingQueues本身也存放于DispatchQueue中,并由DispatchQueue的工作线程来负责消息的清理
 *   以及处理。
 *
 * 5)SimpleMessenger
 *   作为对外暴露的类传递给其他的消息处理器,并提供大部分API的细节
 */
class SimpleMessenger : public SimplePolicyMessenger {
public:
	/*
	 * 初始化SimpleMessenger
	 *
	 * @param cct: 所对应的CephContext
     * @param name: 用户所指定的name值
     * @param _nonce: 本SimpleMessenger所使用的唯一ID。在守护进程重启时,该值不能重复
     * @param features: local_connection的features	 
	 */
	SimpleMessenger(CephContext *cct, entity_name_t name,
		  string mname, uint64_t _nonce, uint64_t features);

	int get_dispatch_queue_len() {
		return dispatch_queue.get_queue_len();
	}

	double get_dispatch_queue_max_age(utime_t now) {
		return dispatch_queue.get_max_age(now);
	}
	
	/*
	 * SimpleMessage拥有一个accepter,让accepter绑定对应的网络地址
	 */
	int bind(const entity_addr_t& bind_addr);
	int rebind(const set<int>& avoid_ports);
	
	/*
	 * 启动回收线程(reaper_thread),做pipe回收等相关工作
	 */
	virtual int start();
	virtual void wait();
	virtual int shutdown();
	
	
	/*
	 * 发送消息到指定的目标地址
	 */
	virtual int send_message(Message *m, const entity_inst_t& dest) {
		return _send_message(m, dest);
	}

	int send_message(Message *m, Connection *con) {
		return _send_message(m, con);
	}
	
	/*
	 * 根据目标地址获取一个Connection
	 */
	virtual ConnectionRef get_connection(const entity_inst_t& dest);
	
	/*
	 * 获取local_connection
	 */
	virtual ConnectionRef get_loopback_connection();

protected:
	/*
	 * 启动DispatchQueue来分发消息,并且如果accepter已经绑定,也启动accepter线程监听socket连接
	 */
	 virtual void ready();
	 
public:
	/*
	 * 一个SimpleMessenger可以拥有一个accepter,用于接受远程连接
	 */
	Accepter accepter;
	
	/*
	 * 所拥有的消息分发队列(该分发队列是一个带优先级的队列)
	 */
	DispatchQueue dispatch_queue;
	
	/*
	 * 当accepter接受了一个新的连接,调用此函数将对应的pipe加入到pipes和accept_pipes列表,
	 * 并且启动读线程
	 */
	Pipe *add_accept_pipe(int sd);
	
	
private:

	/**
	* A thread used to tear down Pipes when they're complete.
	*/
	class ReaperThread : public Thread {
		SimpleMessenger *msgr;
		
	public:
		explicit ReaperThread(SimpleMessenger *m) : msgr(m) {}
		
		void *entry() {
			msgr->reaper_entry();
			return 0;
		}
	} reaper_thread;
	
	
	/*
	 * 采用addr创建一个新的Pipe,并将con与该pipe关联。
	 * (注:本函数返回成功,并不保证底层的socket实际建立成功)
	 *
	 * @param addr: 所要建立的连接的目标地址
	 * @param type: 对端的类型(OSD/MDS/MON等)
	 * @param con: 所创建的pipe与con相关连,用于存储该pipe的一些状态信息
	 * @param first: 向该pipe发送的第一条消息
	 */
	Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
		     Message *first);
			 
	//最终通过pipe来发送消息
	int _send_message(Message *m, const entity_inst_t& dest);
	
	//最终通过pipe来发送消息
	int _send_message(Message *m, Connection *con);
	
	/*
	 * 将message放入队列以发送到指定的目标地址。
	 * (如有必要,submit_message()也负责创建新的Pipes,以及关闭老的pipes)
	 *
	 * @param m: 所要发送的消息
	 * @param con: 已存在的目标连接,如果不清楚,请设置为NULL,这样就会根据目标地址创建一个新的pip来发送
	 * @param addr: 所要发送到的目标地址
	 * @param dest_type: 目标类型(osd/mds/mon等)
	 * @param already_locked: 假如为false的话,那么SimpleMessenger在访问共享数据结构之前会先获取锁;否则其
	 *        会假设当前已经获取了锁。值得注意的是,假如通过already_locked为false来调用此函数,那么con不能为NULL
	 *
	 */
	void submit_message(Message *m, PipeConnection *con,
		      const entity_addr_t& addr, int dest_type,
		      bool already_locked);
			  
	/*
	 * 回收不必要的pipe
	 */
	void reaper();
	
private:
	//accepter是否绑定地址
	bool did_bind;
	
	//存放pipe的列表(包括自己本身发出去的连接,以及accept的连接)
	ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
	
	//当前正在accepting的pipes
	set<Pipe*> accepting_pipes;
	
	//所有的pipe列表
	set<Pipe*>      pipes;
	
	//等待回收的pipe列表
	list<Pipe*>     pipe_reap_queue;
	
	
	//根据目标地址查找当前已存在的pipe
	Pipe *_lookup_pipe(const entity_addr_t& k) {
		ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
		if (p == rank_pipe.end())
			return NULL;
			
		// see lock cribbing in Pipe::fault()
		if (p->second->state_closed.read())
			return NULL;
		return p->second;
	}
	
public:

  //local_connection,可用于简化消息的投递(不需要通过socket来发送,可以直接投递到队列)
  ConnectionRef local_connection;
  uint64_t local_features;
};

3. Connection

/*
 * Connection代表的是一个抽象的连接,用于保存连接的状态信息。
 * (注:与我们通常意义上的连接含义有些不同,主要用于保存连接状态信息,并不直接与socket fd相关联)
 */
struct Connection : public RefCountedObject {
public:
	//所对应的Messenger
	Messenger *msgr;
	
	//所对应的连接的类型(osd/mon/mds等)
	int peer_type
	
	//对端地址
	entity_addr_t peer_addr;
	
private:
	//用于保存连接的一些feature
	uint64_t features;
	
public:
	//假如当前连接是lossy connection,那么当其状态为failed时,本字段值为true
	bool failed; // true if we are a lossy connection that has failed.

	int rx_buffers_version;
	
	//接收缓存(根据事务ID来索引)
	map<ceph_tid_t,pair<bufferlist,int> > rx_buffers;
	
public:
	/*
	 * 发送消息到connection
	 * (注:send_message()函数比较奇怪,因为connection的含义为表示实际连接的状态,因此
	 * 本不应该在这里设置消息发送函数)
	 */
	virtual int send_message(Message *m) = 0;
};

4. PipeConnection

class PipeConnection : public Connection {
public:
	Pipe* pipe;

public:
	int send_message(Message *m) override;
};

PipeConnection继承自Connection,最主要是其可以与pipe相关连,从而可与实际的socket建立联系。

上面讲到Connection::send_message()函数有些奇怪,这里我们来看一下其实现:

int PipeConnection::send_message(Message *m)
{
	assert(msgr);
	return static_cast<SimpleMessenger*>(msgr)->send_message(m, this);
}

可以看到其并不能直接发送消息,而是调用SimpleMessenger来发送,最终是调用所关联的pipe来发送。

5. Accepter

class Accepter : public Thread {
	
	//所对应的simple messenger
	SimpleMessenger *msgr;

	//是否退出标志
	bool done;
	
	//所监听的socket句柄
	int listen_sd;
	
public:
	/*
	 * 监听socket句柄,并接受新来的连接
	 */
	void *entry();
	
	//绑定指定的地址
	int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
	int rebind(const set<int>& avoid_port);
};

6. Pipe

/*
 * Pipe是SimpleMessenger中最复杂的组件。Pipe拥有读写两个线程,并且负责对应socket
 * 上的一切。除了负责消息的投递之外,Pipe还负责将socket上的错误信息propagate到
 * SimpleMessenger,并且为SimpleMessenger提供可靠的消息传递机制
 * 
 */
class Pipe : public RefCountedObject {

	/*
	 * 读线程负责socket上的所有读操作---不仅仅是消息,也包括acks以及其他一些protocol bits
	 * (注:在启动阶段,write线程也会进行一些读操作,主要是为了完成相应连接的建立)
	 */
	class Reader : public Thread {
	Pipe *pipe;
	public:
		explicit Reader(Pipe *p) : pipe(p) {}
		
		void *entry() { pipe->reader(); return 0; }
	} reader_thread;
	
	
	//负责socket建立完成之后的写操作
	class Writer : public Thread {
	Pipe *pipe;
	public:
		explicit Writer(Pipe *p) : pipe(p) {}
		void *entry() { pipe->writer(); return 0; }
	} writer_thread;
	
	/*
	 * 延迟消息队列,主要是为了在消息投递时注入延迟。
	 * 只有在请求中指定了延迟,才会将消息投入进延迟队列。在延迟时间到了之后,才会将消息
	 * 从延迟队列出去,放入in_q(即SimpleMessage::dispatch_queue)中。
	 */
	class DelayedDelivery: public Thread {
	}*delay_thread;
	
	
public: 
	//所对应的simple messenger
	SimpleMessenger *msgr;
	
 private:
 
	//对应的socket句柄
	int sd;
	
	struct iovec msgvec[SM_IOV_MAX];	
	
public:
	/*
	 * 连接对应的端口、peer type信息
	 */
	int port;
	int peer_type;
	entity_addr_t peer_addr;
	
	/*
	 * 当前连接的状态
	 *  
	 * enum {
	 *	STATE_ACCEPTING,
	 *	STATE_CONNECTING,
	 *	STATE_OPEN,
	 *	STATE_STANDBY,
	 *	STATE_CLOSED,
	 *	STATE_CLOSING,
	 *	STATE_WAIT       // just wait for racing connection
	 * };
	 */ 
	int state;
	
protected:
	/*
	 * 连接的Connection state
	 */
	PipeConnectionRef connection_state;
	
	/*
	 * 消息发送的优先级队列
	 */
	map<int, list<Message*> > out_q;  // priority queue for outbound msgs
	
	/*
	 * 接受到的消息的投递队列
	 */
	DispatchQueue *in_q;
	
	//已经发送的消息
    list<Message*> sent;
	
	/*
	 * 当accepter接受到新连接进来,就会调用reader()来进行处理,然后在reader中调用本
	 * 函数完成服务端的握手
	 */
	int accept();   // server handshake
	
	/*
	 * 当客户端连接后,在writer()函数中调用本函数完成客户端握手
	 */
    int connect();  // client handshake
	
	//具体的读函数
    void reader();
	
	//具体的写函数
    void writer();
};

7. Dispatcher

/*
 * 消息发送的目标接收者需要实现此接口
 */
class Dispatcher {
public:

	/*
	 * Messenger会调用此函数来查询是否有能力fast dispatch消息。如果想要一条
	 * 消息能够fast dispatch,你需要满足:
	 *
	 * 1)能够快速的处理消息,并且不需要花长时间来竞争lock
	 *
	 * 2) 即使在对应Connection没有获得ms_handle_accept()通知的情况下(比如已经执行了
	 * mark_down(),或者在连接上执行了ms_handle_reset()等),仍能够接受该消息并进行处理。
	 *
	 * 3)在不依赖特定系统状态的情况下,要能够决定消息是否能够快速投递。
	 * (注:一条消息可能会多次调用ms_can_fast_dispatch,而状态可能可能会在多次调用过程
	 *  中发生改变)
	 */
	virtual bool ms_can_fast_dispatch(Message *m) const { return false;}
	
	/*
	 * 用于决定一个dispatcher是否包含在fast-dispatcher列表中。
	 * 假如可以通过fast_dispatch()处理任何消息的话,那么本函数返回true,否则返回false;
	 */
	virtual bool ms_can_fast_dispatch_any() const { return false; }
	
	/*
	 * 对消息进行fast_dispatch
	 */
	virtual void ms_fast_dispatch(Message *m) { assert(0); }
	
	/*
	 * 在消息被dispatch之前,让dispatcher可以先对消息进行预处理。本函数针对每一个message都会
	 * 被调用,且是在决定进行fast/regular dispatch之前被调用,但值得注意的是其只用于拥有fast-dispatch
	 * 能力的系统。
	 */
	virtual void ms_fast_preprocess(Message *m) {}
	
	//普通的消息分发
	virtual bool ms_dispatch(Message *m) = 0;
	
	/*
	 * 当一个连接新创建,或者连接重连时,本函数会被同步的调用。
	 */
	virtual void ms_handle_connect(Connection *con) {}
	
	/*
	 * 假如Connection支持fast dispatch的话,那么当连接新创建或者连接重连时,本函数就会被同步地回调
	 */
	virtual void ms_handle_fast_connect(Connection *con) {}
	
	/*
	 * 当接受到一个incomming连接时,就会回调此函数
	 */
	virtual void ms_handle_accept(Connection *con) {}
	
	/*
	 * 当接受到一个incomming连接,且该连接支持fast dispatch时本函数就会被回调
	 *
	 * 系统会保证在该incomming connection的任何消息被投递之前,本函数会被回调
	 */
	virtual void ms_handle_fast_accept(Connection *con) {}
	
	/*
	 * 如果本函数被回调,则说明ordered+reliable的消息投递语义被破坏。可能由于网络连接
	 * 故障导致消息被丢失
	 *
	 * (注:只针对lossy connection,本函数才可能被回调)
	 */
	virtual bool ms_handle_reset(Connection *con) = 0;
	
	/*
	 * 如果本函数被回调,则说明ordered+reliable的消息投递语义被破坏,原因是连接被remote端reset。
	 * 
	 * 这通常隐含着incomming消息丢失,并且前面发出去的一些outgoing消息也可能丢失
	 */
	virtual void ms_handle_remote_reset(Connection *con) = 0;
	
	/*
	 * 获取给定peer type的AuthAuthorizer。假如对应的peer type不需要认证信息的话,则返回false
	 */
	virtual bool ms_get_authorizer(int dest_type, AuthAuthorizer **a, bool force_new) { return false; }
	
	/*
	 * 对新的incomming连接校验其authorizer信息
	 */
	virtual bool ms_verify_authorizer(Connection *con,
					int peer_type,
					int protocol,
					ceph::bufferlist& authorizer,
					ceph::bufferlist& authorizer_reply,
					bool& isvalid,
					CryptoKey& session_key) { return false; }
	
};

8. DispatchQueue

/*
 * DispatchQueue含有所有Pipe中需要分发的消息,其是一个带有优先级的队列,
 * 因此在使用时请仔细的组织各类消息的优先级,
 */
class DispatchQueue {

	/*
	 * 消息队列中的每一个Item类型
	 */
	class QueueItem {
		int type;
		ConnectionRef con;
		MessageRef m;
	};
	
	
	//对应的simple messenger
	SimpleMessenger *msgr;
	
	//对应的优先级队列
	PrioritizedQueue<QueueItem, uint64_t> mqueue;
	
	//用于存放收到的incomming消息
	set<pair<double, Message*> > marrival;
	map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
	
	
	/*
	 * 负责分发消息的线程
	 */
	class DispatchThread : public Thread {
		DispatchQueue *dq;
	public:
		explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
		void *entry() {
			dq->entry();
			return 0;
		}
	} dispatch_thread;
	
	//存放发往本地的消息
	list<pair<Message *, int> > local_messages;
	
	/*
	 * 负责投递到本地消息的线程
	 */
	class LocalDeliveryThread : public Thread {
	DispatchQueue *dq;
	public:
		explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
		void *entry() {
		dq->run_local_delivery();
		return 0;
		}
	} local_delivery_thread;
	
	//消息投递之前,打印消息,申请相应的资源等
	uint64_t pre_dispatch(Message *m);
	
	//消息投递完成之后,释放相应的资源
	void post_dispatch(Message *m, uint64_t msize);
	
public:
	/*
	 * 进行本地消息投递,即将消息插入到last_messages中,然后由local_delivery_thread真正负责投递
	 */
	void local_delivery(Message *m, int priority);
	
	//本地消息投递的真正实现
	void run_local_delivery();
	
	//调用对应的SimpleMessenger判断消息是否可以进行fast dispatch
	bool can_fast_dispatch(Message *m) const;
	
	//调用SimpleMessenger对消息进行快速投递
	void fast_dispatch(Message *m);
	
	//调用SimpleMessenger对消息进行预处理
	void fast_preprocess(Message *m);
	
	//将消息插入到优先级队列
	void enqueue(Message *m, int priority, uint64_t id);
	
	//丢弃某一种类型的消息
	void discard_queue(uint64_t id);
	
	//丢弃本地消息
	void discard_local();
	
	//创建dispatch_thread以及local_delivery_thread
	void start();
	
	//dispatch_thread的回调函数
	void entry();
};

现在我们来看一下消息投递的具体流程:

1)本地消息

void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
				     const entity_addr_t& dest_addr, int dest_type,
				     bool already_locked)
{
	...
	// local?
	if (my_inst.addr == dest_addr) {
		// local
		ldout(cct,20) << "submit_message " << *m << " local" << dendl;
		dispatch_queue.local_delivery(m, m->get_priority());
		return;
	}

	...
}

void DispatchQueue::local_delivery(Message *m, int priority)
{
	m->set_connection(msgr->local_connection.get());
	m->set_recv_stamp(ceph_clock_now(msgr->cct));
	Mutex::Locker l(local_delivery_lock);

	if (local_messages.empty())
		local_delivery_cond.Signal();
	local_messages.push_back(make_pair(m, priority));
	return;
}

从上面我们可以看到,对于发送的目标地址是本地的消息,这直接将消息插入到local_messages队列。

2) 收到的incomming消息的投递

主要有如下两个地方:

  • 对投入到DelayQueue中的消息进行再投递
void *Pipe::DelayedDelivery::entry(){
	...
	if (pipe->in_q->can_fast_dispatch(m)) {
		if (!stop_fast_dispatching_flag) {
			delay_dispatching = true;
			delay_lock.Unlock();
			pipe->in_q->fast_dispatch(m);
			delay_lock.Lock();
			delay_dispatching = false;

			if (stop_fast_dispatching_flag) {
				// we need to let the stopping thread proceed
				delay_cond.Signal();
				delay_lock.Unlock();
				delay_lock.Lock();
			}
		}
	} else {
		pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
	}

	...
}
  • reader()读取到的消息投递进mqueue
void Pipe::reader()
{
	...
	in_q->fast_preprocess(m);
	
	if (delay_thread) {
		utime_t release;
		if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
			release = m->get_recv_stamp();
			release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
			lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
		}
		delay_thread->queue(release, m);
	} else {
		if (in_q->can_fast_dispatch(m)) {
			reader_dispatching = true;
			pipe_lock.Unlock();
			in_q->fast_dispatch(m);
			pipe_lock.Lock();
			reader_dispatching = false;

			if (state == STATE_CLOSED ||
			  notify_on_dispatch_done) { // there might be somebody waiting
				notify_on_dispatch_done = false;
				cond.Signal();
			}
		} else {
			in_q->enqueue(m, m->get_priority(), conn_id);
		}
	}

	...
}



[参看]