As a distributed storage system, Ceph rests on the Msg module (src/msg) as one of its cornerstones. By Luminous, Ceph supports three messenger implementations: simple, async and xio. simple is the oldest, Ceph's earliest messenger; its design is straightforward but its performance is poor. async, the newcomer, performs well enough that it has been the default messenger since Luminous. xio carries many experimental features and is still far from production-ready.
When designing a communication module in an OOP style, a few abstractions usually appear:
- Messenger manages all communication at the top level, and typically owns the messaging policies, the worker threads, and so on
- Connection abstracts a single connection; a state machine is usually built around it so the upper layer can manage the Connection
- Message encapsulates a message, typically 'Header + Data + Checking'; conversions between a Message and the byte stream are always needed: Message->buf: encode(), Message<-buf: decode() (a minimal sketch follows this list)
- Stack implements the actual transport, e.g. the TCP/IP stack or an RDMA stack
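To make the Message<->buffer conversion concrete, here is a minimal sketch of the encode()/decode() idea, assuming a hypothetical SimpleMsg struct and a plain std::vector<char> buffer rather than Ceph's actual Message/bufferlist API:

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical message: a fixed header plus a variable payload.
struct SimpleMsg {
    uint32_t type = 0;
    std::vector<char> data;

    // Message -> buf: serialize the header fields, then the payload.
    void encode(std::vector<char> &buf) const {
        uint32_t len = data.size();
        buf.insert(buf.end(), reinterpret_cast<const char*>(&type),
                   reinterpret_cast<const char*>(&type) + sizeof(type));
        buf.insert(buf.end(), reinterpret_cast<const char*>(&len),
                   reinterpret_cast<const char*>(&len) + sizeof(len));
        buf.insert(buf.end(), data.begin(), data.end());
    }

    // Message <- buf: parse the header, then copy out the payload.
    static SimpleMsg decode(const std::vector<char> &buf) {
        SimpleMsg m;
        uint32_t len = 0;
        std::memcpy(&m.type, buf.data(), sizeof(m.type));
        std::memcpy(&len, buf.data() + sizeof(m.type), sizeof(len));
        m.data.assign(buf.begin() + sizeof(m.type) + sizeof(len),
                      buf.begin() + sizeof(m.type) + sizeof(len) + len);
        return m;
    }
};
```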
With these in place, the Ceph Msg framework becomes quite clear. The core abstractions of the async machinery are AsyncMessenger, Processor, AsyncConnection, NetworkStack, Worker, EventCenter and Message, and this post discusses each of them in turn.
The figure above is a block diagram of the Ceph Msg module. As it shows, the msg module splits into three layers: async/simple/xio + generic NetworkStack + specific NetworkStack. The upper layer abstracts the "messenger mechanism"; the lower layers focus on the protocol stack, both its hardware-independent and hardware-specific parts. For example, RDMAStack targets storage nodes fitted with IB cards, DPDKStack serves nodes using x86 DPDK, and PosixStack is the native Linux socket interface.
AsyncMessenger
AsyncMessenger derives from Messenger via SimplePolicyMessenger: `class AsyncMessenger : public SimplePolicyMessenger`, with `class SimplePolicyMessenger : public Messenger`; SimplePolicyMessenger deals with the connection policies. AsyncMessenger is the core and the hub of the whole messaging module: the NetworkStack doing the low-level communication, the Connection abstracting a connection, and the Processor handling the peer's connection requests all revolve around it. Take the OSD's main function as an example (src/ceph_osd.cc): the whole OSD process has just seven Messenger instances: ms_public, ms_cluster, ms_hb_back_client, ms_hb_front_client, ms_hb_back_server, ms_hb_front_server and ms_objecter, each handling a different class of messages.
```cpp
//src/msg/async/AsyncMessenger.h
 75 class AsyncMessenger : public SimplePolicyMessenger {
139   int send_message(Message *m, const entity_inst_t& dest) override {
176   void ready() override;
197   AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
221   NetworkStack *stack;
222   std::vector<Processor*> processors;
223   friend class Processor;
224   DispatchQueue dispatch_queue;
273   ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
280   set<AsyncConnectionRef> accepting_conns;
343   AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
346   }
348   int accept_conn(AsyncConnectionRef conn) {
368   }
371   void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
448 };
```
–176–> starts the Processor and dispatch_queue threads
–197–> the interface for creating a Connection instance
–139–> the interface the upper layer uses to submit a send request: AsyncMessenger::send_message()–>AsyncMessenger::_send_message()–>AsyncMessenger::submit_message()–>AsyncConnection::send_message()
–221–> the NetworkStack instance associated with this Messenger; both the send and the receive path pass through it
–224–> the queue through which received Messages are delivered to the upper layer
–273–> the data structure that tracks Connection instances; every successfully established Connection can be found here
–280–> on the server side of connection setup, Connections that have been accepted but are still negotiating live here; once the connection is established they are moved into conns
–343–> looks up the Connection instance for a given address in conns
–348–> removes the connection from accepting_conns and adds it to conns (a sketch follows this list)
–371–> called by Processor::accept(); as soon as the server side sees an incoming connection from a client, it creates a Connection instance on the server side and puts it into accepting_conns for later processing; both the creation and the insertion happen in this function.
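To illustrate what lookup_conn()/add_accept()/accept_conn() amount to, here is a simplified sketch of the conns/accepting_conns bookkeeping; MiniMessenger, Conn and the string-keyed map are stand-ins, and the locking, connection replacement and error paths of the real code are omitted:

```cpp
#include <map>
#include <memory>
#include <set>
#include <string>

struct Conn { std::string peer_addr; };
using ConnRef = std::shared_ptr<Conn>;

struct MiniMessenger {
    std::map<std::string, ConnRef> conns;   // established connections
    std::set<ConnRef> accepting_conns;      // accepted, still negotiating

    ConnRef lookup_conn(const std::string &addr) {
        auto it = conns.find(addr);
        return it == conns.end() ? nullptr : it->second;
    }

    // Server side: a freshly accepted connection starts out as "accepting".
    void add_accept(ConnRef c) { accepting_conns.insert(c); }

    // Once negotiation succeeds, promote it from accepting_conns into conns.
    int accept_conn(ConnRef c) {
        conns[c->peer_addr] = c;
        accepting_conns.erase(c);
        return 0;
    }
};
```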
AsyncMessenger::AsyncMessenger()
```cpp
//src/msg/async/AsyncMessenger.cc
245 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                   const std::string &type, string mname, uint64_t _nonce)
253 {
254   std::string transport_type = "posix";
255   if (type.find("rdma") != std::string::npos)
256     transport_type = "rdma";
257   else if (type.find("dpdk") != std::string::npos)
258     transport_type = "dpdk";
259
260   ceph_spin_init(&global_seq_lock);
261   StackSingleton *single;
262   cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
263   single->ready(transport_type);
264   stack = single->stack.get();
265   stack->start();
266   local_worker = stack->get_worker();
267   local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
268   init_local_connection();
269   reap_handler = new C_handle_reap(this);
270   unsigned processor_num = 1;
271   if (stack->support_local_listen_table())
272     processor_num = stack->get_num_worker();
273   for (unsigned i = 0; i < processor_num; ++i)
274     processors.push_back(new Processor(this, stack->get_worker(i), cct));
275 }
```
–254-258–> the transport type defaults to posix and can be rdma or dpdk, depending on what ceph.conf specifies
–261-262–> the StackSingleton single (used to construct the NetworkStack object) is a singleton, so within one Ceph process (MON/MDS/OSD/client) there is exactly one instance of each stack type (a sketch follows this list)
–263–> constructs the NetworkStack object through the singleton; this boils down to calling `NetworkStack::create(cct, type)`, which instantiates the matching subclass (RDMAStack, PosixStack or DPDKStack) depending on type. While the NetworkStack instance is being constructed, a group of Workers is also created for the AsyncMessengers attached to it; likewise, these Workers are instantiated as the subclass matching ceph.conf, e.g. RDMAWorker, and are tracked in `AsyncMessenger->NetworkStack::vector<Worker*> workers`
–264–> associates the constructed NetworkStack with the AsyncMessenger; a process may hold several Messenger instances (see ceph_osd.cc), but if it uses only one kind of network, e.g. public and cluster are both Posix, the process still has a single NetworkStack instance
–265–> "starts" the stack, which mainly means spawning one thread per workers[i] whose core task is to loop on `Worker.EventCenter.process_events()`; the related resources are wrapped up in Worker and its subclasses
–266–> get_worker() picks the worker thread with the lightest current work load
–271-274–> constructs the group of Processor instances (possibly just one); like Worker, each Processor wraps the resources of one thread, and the threads are started later in `Processor::start()` (not here); they are tracked in `AsyncMessenger::vector<Processor*> processors`
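A minimal sketch of the lookup-or-create singleton pattern behind lines 261-264, assuming a simplified StackRegistry in place of CephContext::lookup_or_create_singleton_object; every Messenger in the process that asks for the same transport type gets the same Stack instance:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct Stack { explicit Stack(std::string t) : type(std::move(t)) {} std::string type; };

class StackRegistry {
    std::mutex lock;
    std::map<std::string, std::shared_ptr<Stack>> stacks;  // one per transport type
public:
    // Every caller asking for the same transport type gets the same Stack.
    std::shared_ptr<Stack> lookup_or_create(const std::string &type) {
        std::lock_guard<std::mutex> l(lock);
        auto &s = stacks[type];
        if (!s)
            s = std::make_shared<Stack>(type);
        return s;
    }
};
```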
Processor
Processor's main job is "listening", much like listen() in socket programming. The async machinery starts a group of Processor threads, each listening on one port; as soon as a connection comes in, it constructs an AsyncConnection instance and hands it to the Worker thread associated with that Processor. As an aside, Processor threads and Worker threads are in a many-to-one relationship: a Worker thread not only handles the tasks handed to it by Processors, it may also get a pile of tasks from AsyncConnections. In Ceph there is no client/server distinction between processes (e.g. OSD-OSD), but two processes share many Connections, and each individual Connection does have a client side and a server side: with no connection yet in place, the side that actively initiates the connection is the client and the side that passively accepts it is the server, and the Processor is what lets a process accept connections passively, like a server.
```cpp
47 class Processor {
48   AsyncMessenger *msgr;
49   NetHandler net;
50   Worker *worker;
51   ServerSocket listen_socket;
52   EventCallbackRef listen_handler;
54   class C_processor_accept;
57   Processor(AsyncMessenger *r, Worker *w, CephContext *c);
60   void stop();
61   int bind(const entity_addr_t &bind_addr,
62            const set<int>& avoid_ports,
63            entity_addr_t* bound_addr);
64   void start();
65   void accept();
66 };
```
–48–> the AsyncMessenger this Processor belongs to
–49–> underneath, Processor establishes connections with the socket listen/accept machinery, which is wrapped in NetHandler
–50–> the Worker thread associated with this Processor; as with Connection, all of its events are eventually handed to the associated Worker
–51–> the listening socket; essentially a wrapper around the server-side socket of the specific stack layer, e.g. RDMAServerSocketImpl::server_setup_socket behind listen_socket.fd()
–52–> in practice a C_processor_accept instance, registered among the Msg file events as the handler for listen_socket.fd(); when a client connects, that fd becomes readable and listen_handler is invoked, which boils down to Processor::accept()–>AsyncMessenger::add_accept()–>AsyncConnection::accept() to set up a usable connection (a sketch follows this list).
–61–> binds an address for listen_socket.
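The listen_handler mechanism boils down to: register the listening fd with the worker's event loop, and once it becomes readable, accept() and hand the new socket to a worker. A minimal poll()-based sketch of that pattern (the port and the printf are placeholders; this is not the EventCenter API itself):

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(6800);                 // hypothetical port
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    bind(listen_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));  // Processor::bind()
    listen(listen_fd, 128);

    pollfd pfd{listen_fd, POLLIN, 0};
    while (true) {
        if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN)) {
            // The fd is readable: this is where listen_handler would fire and
            // Processor::accept() -> AsyncMessenger::add_accept() would run.
            int cli = accept(listen_fd, nullptr, nullptr);
            printf("accepted fd %d, hand it to a worker\n", cli);
            close(cli);
        }
    }
}
```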
AsyncConnection
A Connection instance corresponds to one end-to-end connection. AsyncConnection, a subclass of Connection, wraps a connection state machine, the connection context, and the read/write methods operating on that AsyncConnection.
```cpp
//src/msg/async/AsyncConnection.h
 50 class AsyncConnection : public Connection {
 52   ssize_t read_bulk(char *buf, unsigned len);
 61   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
 62   ssize_t read_until(unsigned needed, char *p);
 63   ssize_t _process_connection();
 64   void _connect();
 65   void _stop();
 66   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
 67   ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
 76   ssize_t write_message(Message *m, bufferlist& bl, bool more);
 78   ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, bufferlist &authorizer_reply) {
 96   }
194   bool is_connected() override {
195     return can_write.load() == WriteStatus::CANWRITE;
196   }
197
198   // Only call when AsyncConnection first construct
199   void connect(const entity_addr_t& addr, int type) {
204   }
205   // Only call when AsyncConnection first construct
206   void accept(ConnectedSocket socket, entity_addr_t &addr);
207   int send_message(Message *m) override;
291   AsyncMessenger *async_msgr;
296   std::atomic<uint64_t> out_seq{0};
298   int state;
299   int state_after_send;
300   ConnectedSocket cs;
301   int port;
302   Messenger::Policy policy;
304   DispatchQueue *dispatch_queue;
307   bufferlist outcoming_bl;
318   list<Message*> sent;
319   map<int, list<pair<bufferlist, Message*> > > out_q; // priority queue for outbound msgs
324   EventCallbackRef read_handler;
325   EventCallbackRef write_handler;
326   EventCallbackRef wakeup_handler;
327   EventCallbackRef tick_handler;
328   struct iovec msgvec[ASYNC_IOV_MAX];
329   char *recv_buf;
330   uint32_t recv_max_prefetch;
331   uint32_t recv_start;
332   uint32_t recv_end;
333   set<uint64_t> register_time_events; // need to delete it if stop
346   bufferlist data_buf;
347   bufferlist::iterator data_blp;
348   bufferlist front, middle, data;
349   ceph_msg_connect connect_msg;
350   // Connecting state
351   bool got_bad_auth;
352   AuthAuthorizer *authorizer;
353   bufferlist authorizer_buf;
354   ceph_msg_connect_reply connect_reply;
355   // Accepting state
356   entity_addr_t socket_addr;
357   CryptoKey session_key;
358   bool replacing;
364   bool is_reset_from_peer;
365   bool once_ready;
368   char *state_buffer;
370   uint64_t state_offset;
371   Worker *worker;
372   EventCenter *center;
377   void handle_write();
378   void process();
404 };
```
–52–> calls ConnectedSocket->read() to fill the buffer passed in; usually invoked by read_until() to fill a buffer used to build a Message, or AsyncConnection::recv_buf
–61–> the preparation work done before a message is sent
–62–> either reads data previously cached in recv_buf into the buffer passed in (usually a buffer used to build a Message), or reads directly from the underlying NetworkStack via AsyncConnection::read_until()–>AsyncConnection::read_bulk()–>ConnectedSocket::read(); it is the basic building block of AsyncConnection::read_handler.process()
–63–> the connection-handling routine; it drives the connection from right after the local AsyncConnection instance is constructed until it reaches STATE_OPEN
–64–> when sending a message finds the connection unreachable, connect to the server (the peer) as a client to establish one: AsyncMessenger::send_message()->AsyncMessenger::get_connection()->AsyncMessenger::create_connect()->AsyncConnection::connect()->AsyncConnection::_connect()
–66–> if the local side set up this AsyncConnection as a client, this handles the states that follow STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
–67–> if the local side set up this AsyncConnection as a server, this handles the states that follow STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
–76–> part of the send path; its main job is to copy the Message's data into outcoming_bl and hand it to the lower-level logic (AsyncConnection::_try_send())
–78–> part of connection establishment on the server side: AsyncConnection::handle_connect_msg()->AsyncConnection::_reply_accept()
–199–> part of connection establishment on the client side: AsyncMessenger::send_message()->AsyncMessenger::get_connection()->AsyncMessenger::create_connect()->AsyncConnection::connect()->AsyncConnection::_connect()
–206–> part of connection establishment on the server side: Processor::do_request()->Processor::accept()->AsyncMessenger::add_accept()->AsyncConnection::accept()
–207–> the interface the upper layer uses to send messages: AsyncMessenger::submit_message()->AsyncConnection::send_message()->AsyncConnection::write_handler
–217-251–> the enum of AsyncConnection states, covering every possible server and client state
–291–> the AsyncMessenger this AsyncConnection belongs to
–298–> the current state of the AsyncConnection
–300–> the interface to the Stack layer; ConnectedSocket wraps the abstract class ConnectedSocketImpl, whose implementation varies with the Stack, e.g. RDMAConnectedSocketImpl in RDMAStack
–302–> the AsyncConnection's policy, normally set in AsyncConnection::connect() via `policy=msgr->get_policy(type)` to the owning AsyncMessenger's Policy
–304–> the queue that delivers Messages to the upper layer; a dedicated thread maintains it
–307–> the bufferlist holding the data this AsyncConnection is about to send, consumed by AsyncConnection::_try_send()->ConnectedSocket::send()->ConnectedSocketImpl::send() to pass the data down to the Stack layer
–318–> the list of Messages being sent; these Messages are taken from out_q
–319–> the priority-keyed map of outbound (bufferlist, Message) pairs
–324-327–> the callback handlers for the various asynchronous events happening on this AsyncConnection
–329–> the receive buffer used by this AsyncConnection, consumed by read_until()
–330–> read_until(needed, p) essentially fetches needed bytes from the lower layer into p, but the async layer keeps a cache buffer, recv_buf: small chunks have usually already been prefetched into recv_buf by an earlier read, so they can be served straight from recv_buf without bothering the Stack; recv_max_prefetch is the threshold that decides whether a chunk goes through the cache (a sketch follows this list)
–331-332–> track the read/write positions inside recv_buf; only used by read_until()
–333–> the time events that have been registered
–346-348–> what is needed to assemble a Message
–349–> the connect_msg the server side received from the client
–358–> true while this AsyncConnection is going through connection racing
–371–> the Worker this AsyncConnection belongs to; every event happening on this AsyncConnection instance is handled by that Worker
–377–> the asynchronous write-event callback: AsyncConnection.write_handler/C_handle_write.do_request()->handle_write()
–378–> the asynchronous read-event callback: AsyncConnection.read_handler/C_handle_read.do_request()->process()
–379–> the asynchronous time-event callback: AsyncConnection.wakeup_handler/C_time_wakeup.do_request()->wakeup_from()
–380–> the asynchronous tick-event callback: AsyncConnection.tick_handler/C_tick_wakeup.do_request()->tick()

The handling of an AsyncConnection in Ceph is state-driven: almost every interface that touches an AsyncConnection checks its current state before doing any work. The state machine lives mainly in two functions, AsyncConnection::process() and AsyncConnection::_process_connection(); the latter is called by the former, and the former is registered as read_handler into the file events alongside the notify fd, or is invoked in the form of external events. The figure below is my summary of the state transitions after the AsyncConnection instance has been constructed, while it is still in the client/server negotiation phase and not yet usable; what the two ends actually do during this phase can be read in _process_connection(), and the policy-based connection handling is in handle_connect_msg() (server side) and handle_connect_reply() (client side).
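A minimal sketch of the recv_buf prefetch idea behind read_until(): serve small reads from a local cache and refill it from the socket only when needed. The read_socket() helper, the buffer sizes and the error handling are simplified stand-ins, not the actual Ceph implementation:

```cpp
#include <algorithm>
#include <cstring>
#include <sys/types.h>
#include <vector>

constexpr unsigned RECV_MAX_PREFETCH = 4096;

struct MiniConn {
    std::vector<char> recv_buf = std::vector<char>(2 * RECV_MAX_PREFETCH);
    unsigned recv_start = 0, recv_end = 0;   // cached region is [recv_start, recv_end)

    // Stand-in for ConnectedSocket::read(); here it just fakes a full read.
    ssize_t read_socket(char *buf, unsigned len) {
        std::memset(buf, 0, len);
        return len;
    }

    ssize_t read_until(unsigned needed, char *p) {
        // 1. Serve as much as possible from the already prefetched bytes.
        unsigned cached = recv_end - recv_start;
        unsigned from_cache = std::min(cached, needed);
        std::memcpy(p, recv_buf.data() + recv_start, from_cache);
        recv_start += from_cache;
        unsigned copied = from_cache;
        if (copied == needed)
            return 0;                         // fully satisfied from the cache
        unsigned left = needed - copied;
        if (left > RECV_MAX_PREFETCH) {
            // 2a. Large read: go straight to the socket, bypassing the cache.
            return read_socket(p + copied, left) == (ssize_t)left ? 0 : -1;
        }
        // 2b. Small read: refill the cache first, then copy out of it.
        ssize_t r = read_socket(recv_buf.data(), RECV_MAX_PREFETCH);
        if (r < (ssize_t)left)
            return -1;
        recv_start = 0;
        recv_end = r;
        std::memcpy(p + copied, recv_buf.data(), left);
        recv_start += left;
        return 0;
    }
};
```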
AsyncConnection::AsyncConnection()
```cpp
//src/msg/async/AsyncConnection.cc
121 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
122                                  Worker *w)
123   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
124     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
125     state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
126     dispatch_queue(q), can_write(WriteStatus::NOWRITE),
127     keepalive(false), recv_buf(NULL),
128     recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
129     recv_start(0), recv_end(0),
130     last_active(ceph::coarse_mono_clock::now()),
131     inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
132     got_bad_auth(false), authorizer(NULL), replacing(false),
133     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
134     worker(w), center(&w->center)
135 {
136   read_handler = new C_handle_read(this);
137   write_handler = new C_handle_write(this);
138   wakeup_handler = new C_time_wakeup(this);
139   tick_handler = new C_tick_wakeup(this);
140   memset(msgvec, 0, sizeof(msgvec));
141   // double recv_max_prefetch see "read_until"
142   recv_buf = new char[2*recv_max_prefetch];
143   state_buffer = new char[4096];
144   logger->inc(l_msgr_created_connections);
145 }
```
–123–> constructing an AsyncConnection, and its parent class Connection, requires an AsyncMessenger
–125–> right after construction, both state and state_after_send are STATE_NONE; the state is switched to STATE_ACCEPTING or STATE_CONNECTING once AsyncConnection::accept() or _connect() returns, respectively
–128–> recv_max_prefetch is set from the ceph.conf configuration
–136-139–> construct the handlers for this AsyncConnection's various asynchronous events
–142–> allocate recv_buf
–143–> allocate state_buffer
NetworkStack
NetworkStack is the core class of the generic NetworkStack layer. Its main job is to bridge the two sides: it takes requests from the async layer above and drives the methods of the specific NetworkStack below.
```cpp
//src/msg/async/Stack.h
288 class NetworkStack : public CephContext::ForkWatcher {
290   unsigned num_workers = 0;
294   std::function<void ()> add_thread(unsigned i);
298   vector<Worker*> workers;
300   explicit NetworkStack(CephContext *c, const string &t);
309   static std::shared_ptr<NetworkStack> create(
310     CephContext *c, const string &type);
312   static Worker* create_worker(
313     CephContext *c, const string &t, unsigned i);
327   virtual Worker *get_worker();
328   Worker *get_worker(unsigned i) {
329     return workers[i];
330   }
350 };
```
–290–> the number of Workers owned by this NetworkStack
–294–> builds the function executed by the thread backing NetworkStack.workers[i], which loops on Worker->EventCenter.process_events(); called by start()
–298–> the Workers owned by this NetworkStack; through it, these Worker threads serve the Processors and AsyncConnections associated with this NetworkStack
–309–> Ceph msg constructs each type of NetworkStack as a singleton; create() is that construction interface
–312–> constructs a PosixWorker, an RDMAWorker or a DPDKWorker depending on type
–325–> loops over add_thread() to start all of the owned Worker threads (a sketch follows this list)
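A minimal sketch of what start() amounts to: spawn one std::thread per worker whose body loops on process_events(). MiniWorker/MiniStack and the 30 ms timeout are simplified stand-ins, not Ceph's classes:

```cpp
#include <array>
#include <atomic>
#include <thread>
#include <vector>

struct MiniWorker {
    std::atomic<bool> done{false};
    void process_events(int timeout_us) { /* poll fds, run callbacks ... */ (void)timeout_us; }
};

struct MiniStack {
    std::array<MiniWorker, 3> workers;
    std::vector<std::thread> threads;

    void start() {
        for (auto &w : workers) {
            threads.emplace_back([&w] {
                // Core task of a worker thread: keep processing events.
                while (!w.done.load())
                    w.process_events(30000 /* 30 ms */);
            });
        }
    }
    void stop() {
        for (auto &w : workers) w.done = true;
        for (auto &t : threads) t.join();
    }
};
```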
NetworkStack::NetworkStack()
```cpp
//src/msg/async/Stack.cc
102 NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
103 {
104   assert(cct->_conf->ms_async_op_threads > 0);
105
106   const uint64_t InitEventNumber = 5000;
107   num_workers = cct->_conf->ms_async_op_threads;
108   if (num_workers >= EventCenter::MAX_EVENTCENTER) {
113     num_workers = EventCenter::MAX_EVENTCENTER;
114   }
115
116   for (unsigned i = 0; i < num_workers; ++i) {
117     Worker *w = create_worker(cct, type, i);
118     w->center.init(InitEventNumber, i, type);
119     workers.push_back(w);
120   }
121   cct->register_fork_watcher(this);
122 }
```
–107-114–> the number of Worker threads is the smaller of EventCenter::MAX_EVENTCENTER and ms_async_op_threads from the configuration file
–116-120–> construct that many Workers and initialize each one's EventCenter
Worker
```cpp
//src/msg/async/Stack.h
205 class Worker {
218   EventCenter center;
223   Worker(CephContext *c, unsigned i)
224     : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
244   }
252   virtual int listen(entity_addr_t &addr,
253                      const SocketOptions &opts, ServerSocket *) = 0;
254   virtual int connect(const entity_addr_t &addr,
255                       const SocketOptions &opts, ConnectedSocket *socket) = 0;
286 };
```
–218–> the embedded EventCenter
–252–> the listen() used by the server side of a Connection
–254–> the connect() used by the client side of a Connection
EventCenter
EventCenter encapsulates the asynchronous-event handling: the owning Worker, the interfaces for registering events, and so on.
```cpp
//src/msg/async/Event.h
 87 class EventCenter {
 90   static const int MAX_EVENTCENTER = 24;
 95   struct AssociatedCenters {
100   };
102   struct FileEvent {
103     int mask;
104     EventCallbackRef read_cb;
105     EventCallbackRef write_cb;
106     FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
107   };
109   struct TimeEvent {
110     uint64_t id;
111     EventCallbackRef time_cb;
113     TimeEvent(): id(0), time_cb(NULL) {}
114   };
158   pthread_t owner;
160   std::atomic_ulong external_num_events;
161   deque<EventCallbackRef> external_events;
162   vector<FileEvent> file_events;
163   EventDriver *driver;
170   uint64_t time_event_next_id;
171   int notify_receive_fd;
172   int notify_send_fd;
173   NetHandler net;
174   EventCallbackRef notify_handler;
178   int process_time_events();
194   int init(int nevent, unsigned idx, const std::string &t);
199   EventDriver *get_driver() { return driver; }
202   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
203   uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
206   int process_events(int timeout_microseconds, ceph::timespan *working_dur = nullptr);
207   void wakeup();
210   void dispatch_event_external(EventCallbackRef e);
211   inline bool in_thread() const {
212     return pthread_equal(pthread_self(), owner);
213   }
245   template <typename func>
246   void submit_to(int i, func &&f, bool nowait = false) {
262   }
263 };
```
–90–> the upper bound on the number of EventCenters in a process
–102–> the file-event structure
–109–> the time-event structure
–158–> the Worker thread that owns this EventCenter
–160–> the number of pending external events
–163–> the abstract object behind the asynchronous mechanism; the concrete subclass is chosen in EventCenter::init() according to the configuration, e.g. EpollDriver when EPOLL is supported
–171–> the read end of the notification pipe, added to the watch list as a file_event
–172–> the write end of the notification pipe; acting as a proxy for the read end, it is used to wake the Worker up
–174–> the handler invoked when notify_receive_fd becomes READABLE; it acts among the file events as the proxy for external events, and all it does is drain the character(s) from the pipe
–162–> the file_events registered with this EventCenter, managed through the EventDriver
–161–> the external_events submitted to this EventCenter, drained in process_events()
–164–> a counting fd, based on Linux's eventfd() mechanism
–176–> the global array of EventCenter pointers
–202–> register a file event (its counterpart delete_file_event(), not shown above, removes one)
–203–> register a time event –206–> process the file_events and the external_events
–207–> wakes the Worker up; essentially it writes to the write fd so that the read fd becomes readable and the thread wakes up (a sketch follows this list)
–211–> checks whether the caller is the Worker thread itself
–246–> constructs an EventCallback from f
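A minimal sketch of the notify-pipe wakeup described above, assuming a plain pipe() and poll() instead of the EventCenter/EventDriver machinery: dispatch_event_external() queues a callback and writes one byte to the write end; the event loop sees the read end become readable, drains it, and runs the queued callbacks.

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <poll.h>
#include <unistd.h>
#include <utility>

struct MiniCenter {
    int notify_fds[2];                        // [0] = receive end, [1] = send end
    std::mutex lock;
    std::deque<std::function<void()>> external_events;

    MiniCenter() { pipe(notify_fds); }

    // Called from any thread: queue the callback, then wake the loop.
    void dispatch_event_external(std::function<void()> cb) {
        { std::lock_guard<std::mutex> l(lock); external_events.push_back(std::move(cb)); }
        char c = 'x';
        ssize_t w = write(notify_fds[1], &c, 1);   // wakeup(): make the read end readable
        (void)w;
    }

    // The worker loop: wait on the read end, drain it, run queued callbacks.
    void process_events(int timeout_ms) {
        pollfd pfd{notify_fds[0], POLLIN, 0};
        if (poll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLIN)) {
            char buf[64];                          // notify_handler: just drain the pipe
            ssize_t n = read(notify_fds[0], buf, sizeof(buf));
            (void)n;
        }
        std::deque<std::function<void()>> todo;
        { std::lock_guard<std::mutex> l(lock); todo.swap(external_events); }
        for (auto &cb : todo) cb();
    }
};
```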
— Thanks to Joe.HU for pointing out mistakes in this post