Ceph AsyncMessenger: A Brief Analysis (Part I)

As a distributed storage system, Ceph relies on the Msg module (src/msg) as one of its cornerstones. As of Luminous, Ceph supports three messaging implementations: simple, async and xio. simple has the longest history and was Ceph's earliest messenger; its design is straightforward but its performance is poor. async, the newcomer, offers much better performance and has been the default messenger since Luminous. xio carries many experimental features and is still far from production-ready.

When designing a communication module with OOP, the following abstractions usually come up (a minimal sketch of these interfaces follows the list):

  • Messenger manages all communication at the top level, typically covering messaging policies, worker threads and so on
  • Connection is the abstraction of a single connection; a state machine is usually built around it so the upper layer can manage that Connection
  • Message encapsulates a message, usually made up of 'Header + Data + Checking'; conversions between a Message and the byte stream are needed in both directions: Message->buf via encode(), Message<-buf via decode()
  • Stack implements the real transport, e.g. the TCP/IP stack or an RDMA stack
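
Before diving into the Ceph classes, here is a minimal, self-contained C++ sketch of these four abstractions. The names and signatures are hypothetical simplifications, not the actual Ceph interfaces; the point is only to make the division of responsibilities concrete.

// Sketch only: hypothetical interfaces mirroring Messenger/Connection/Message/Stack.
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

struct Message {                          // Header + Data + Checking
  virtual ~Message() = default;
  virtual void encode(std::vector<uint8_t>& buf) const = 0;  // Message -> buf
  virtual void decode(const std::vector<uint8_t>& buf) = 0;  // buf -> Message
};

struct Connection {                       // one peer-to-peer link, driven by a state machine
  enum class State { NONE, CONNECTING, ACCEPTING, OPEN, CLOSED };
  State state = State::NONE;
  virtual ~Connection() = default;
  virtual int send_message(Message* m) = 0;
};

struct Stack {                            // the real transport: TCP/IP, RDMA, ...
  virtual ~Stack() = default;
  virtual std::shared_ptr<Connection> connect(const std::string& addr) = 0;
};

struct Messenger {                        // top level: policies, worker threads, connection table
  virtual ~Messenger() = default;
  virtual int send_message(Message* m, const std::string& dest) = 0;
};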

With these abstractions in mind, the Ceph Msg framework becomes quite clear. The core building blocks of the async machinery are: AsyncMessenger, Processor, AsyncConnection, NetworkStack, Worker, EventCenter and Message. This article discusses each of these roles in turn.

The figure above is a block diagram of the Ceph Msg module. As it shows, the msg module splits into three layers: async/simple/xio + Generic NetworkStack + Specific NetworkStack. The upper layer abstracts the "messaging mechanism", while the lower layers focus on the network stack, covering both the hardware-independent and the hardware-specific parts: RDMAStack targets storage nodes equipped with IB cards, DPDKStack targets nodes using x86 DPDK, and PosixStack is the native Linux socket interface.

AsyncMessenger

AsyncMessenger derives from Messenger via SimplePolicyMessenger: `class AsyncMessenger : public SimplePolicyMessenger`, where `SimplePolicyMessenger` itself derives from `Messenger` and deals with the connection policies. AsyncMessenger is the core and hub of the whole messaging module: the NetworkStack doing the low-level I/O, the Connection abstracting a link, and the Processor handling connection requests from peers all revolve around it. Taking the main function of the OSD process as an example (src/ceph_osd.cc), the entire OSD process uses only 7 Messenger instances: ms_public, ms_cluster, ms_hb_back_client, ms_hb_front_client, ms_hb_back_server, ms_hb_front_server and ms_objecter, each handling a different class of messages.

//src/msg/async/AsyncMessenger.h
 75 class AsyncMessenger : public SimplePolicyMessenger {
139   int send_message(Message *m, const entity_inst_t& dest) override {
176   void ready() override;
197   AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
221   NetworkStack *stack;
222   std::vector<Processor*> processors;
223   friend class Processor;
224   DispatchQueue dispatch_queue;
273   ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
280   set<AsyncConnectionRef> accepting_conns;
343   AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
346   }
348   int accept_conn(AsyncConnectionRef conn) {
368   }
371   void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
448 };

–176–> starts the Processor threads and the dispatch_queue thread
–197–> the interface for creating a Connection instance
–139–> the interface through which the upper layer submits a send request: AsyncMessenger::send_message()–>AsyncMessenger::_send_message()–>AsyncMessenger::submit_message()–>AsyncConnection::send_message()
–221–> the NetworkStack instance associated with this Messenger; both the send and the receive path go through it
–224–> the queue used to hand received Messages up to the upper layer
–273–> the data structure that tracks Connection instances; every successfully established Connection can be found here
–280–> on the server side of connection setup, Connections for which accept() has returned but which are still negotiating are kept here; once a connection is fully established it is moved into conns
–343–> looks up the Connection instance for a given address in conns
–348–> removes the connection from accepting_conns and adds it to conns (see the sketch after this list)
–371–> called by Processor::accept(); as soon as the server side detects an incoming client connection, it creates a Connection instance and adds it to accepting_conns for later processing; both the creation and the insertion happen in this function
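
The conns/accepting_conns bookkeeping can be illustrated with a small standalone sketch. The types are deliberately simplified (a plain address string instead of entity_addr_t, shared_ptr instead of AsyncConnectionRef); it is not the Ceph code, only the same promotion pattern.

// Sketch: server-side connections move from the accepting set to the established map.
#include <cassert>
#include <map>
#include <memory>
#include <set>
#include <string>

struct Conn { std::string peer_addr; };
using ConnRef = std::shared_ptr<Conn>;

struct ConnTable {
  std::map<std::string, ConnRef> conns;   // established connections, keyed by peer address
  std::set<ConnRef> accepting_conns;      // accepted but still negotiating

  void add_accept(ConnRef c) {            // cf. AsyncMessenger::add_accept()
    accepting_conns.insert(c);
  }
  void accept_conn(ConnRef c) {           // cf. AsyncMessenger::accept_conn()
    conns[c->peer_addr] = c;              // promote to the established map
    accepting_conns.erase(c);             // and drop it from the accepting set
  }
  ConnRef lookup_conn(const std::string& addr) {  // cf. AsyncMessenger::lookup_conn()
    auto it = conns.find(addr);
    return it == conns.end() ? nullptr : it->second;
  }
};

int main() {
  ConnTable t;
  auto c = std::make_shared<Conn>(Conn{"10.0.0.2:6800"});
  t.add_accept(c);                        // accept() returned, negotiation in progress
  t.accept_conn(c);                       // handshake done: now visible to lookup_conn()
  assert(t.lookup_conn("10.0.0.2:6800") == c);
}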

AsyncMessenger::AsyncMessenger()

//src/msg/async/AsyncMessenger.cc
245 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, string mname, uint64_t _nonce)
253 {
254   std::string transport_type = "posix";
255   if (type.find("rdma") != std::string::npos)
256     transport_type = "rdma";
257   else if (type.find("dpdk") != std::string::npos)
258     transport_type = "dpdk";
259 
260   ceph_spin_init(&global_seq_lock);
261   StackSingleton *single;
262   cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
263   single->ready(transport_type);
264   stack = single->stack.get();
265   stack->start();
266   local_worker = stack->get_worker();
267   local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
268   init_local_connection();
269   reap_handler = new C_handle_reap(this);
270   unsigned processor_num = 1;
271   if (stack->support_local_listen_table())
272     processor_num = stack->get_num_worker();
273   for (unsigned i = 0; i < processor_num; ++i)
274     processors.push_back(new Processor(this, stack->get_worker(i), cct));
275 }

–254-258–> the stack defaults to Posix; depending on what is specified in ceph.conf it can be rdma or dpdk
–261-262–> a singleton, single, is used here (to construct the NetworkStack object), so within one Ceph process (MON/MDS/OSD/Client) there is exactly one instance of each stack type
–263–> constructs the NetworkStack object through the singleton; this essentially calls "NetworkStack::create(cct, type)", which instantiates a different subclass depending on type, e.g. RDMAStack, PosixStack or DPDKStack. While the NetworkStack instance is being constructed, a set of Workers is also constructed for the AsyncMessengers associated with it; likewise, these Workers are instantiated as different subclasses according to ceph.conf, e.g. RDMAWorker. They are managed via "AsyncMessenger->NetworkStack::vector<Worker*> workers"
–264–> associates the constructed NetworkStack with the AsyncMessenger; a process can hold several Messenger instances (see ceph_osd.cc), but if there is only one network type, e.g. both public and cluster are Posix, the process holds only one NetworkStack instance
–265–> "starts" the stack; its main job is to spawn one thread per workers[i], and the core task of each thread is to loop over "Worker.EventCenter.process_events()"; the related resources are all encapsulated in Worker and its subclasses (see the sketch after this list)
–266–> get_worker() picks the Worker thread with the lightest current workload
–271-274–> constructs a set of (or a single) Processor instance(s); similar to Worker, each Processor wraps the resources of one thread; these threads are started later in "Processor::start()" (not here). They are managed via "AsyncMessenger::vector<Processor*> processors"
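
The "one thread per worker, each looping process_events()" idea behind stack->start() can be sketched in standalone C++. Worker and Stack below are hypothetical stand-ins, not the Ceph classes; the loop body is a placeholder for an event poll.

// Sketch: start() spawns one thread per worker, each looping process_events() until stopped.
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>
#include <vector>

struct Worker {
  unsigned id;
  std::atomic<bool> done{false};
  explicit Worker(unsigned i) : id(i) {}
  void process_events(int timeout_ms) {
    // placeholder: a real worker would poll fds and run callbacks here
    std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms));
  }
};

struct Stack {
  std::vector<std::unique_ptr<Worker>> workers;
  std::vector<std::thread> threads;

  explicit Stack(unsigned n) {
    for (unsigned i = 0; i < n; ++i)
      workers.push_back(std::make_unique<Worker>(i));
  }
  void start() {                              // cf. NetworkStack::start()
    for (auto& w : workers)
      threads.emplace_back([wp = w.get()] {
        while (!wp->done) wp->process_events(30);   // the thread's whole life is this loop
      });
  }
  void stop() {
    for (auto& w : workers) w->done = true;
    for (auto& t : threads) t.join();
  }
};

int main() {
  Stack s(3);          // e.g. ms_async_op_threads = 3
  s.start();
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  s.stop();
}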

Processor

The Processor's main job is "listening", similar to listen() in socket programming. The async machinery starts a group of Processor threads, each watching one port; as soon as a connection is established, it constructs an AsyncConnection instance and hands it over to the Worker thread associated with that Processor. A side note: Processor threads and Worker threads have a many-to-one relationship, so a Worker thread handles not only the tasks that Processors send it but potentially also the tasks from a pile of AsyncConnections. In Ceph there is no client/server distinction between processes (e.g. OSD-OSD), but between two processes there are many Connections, and each individual Connection does have a client side and a server side: when no connection exists yet, the side that actively initiates the connection is the client, and the side that passively accepts it is the server. The Processor is exactly what enables a process to accept connections passively, like a server.

 47 class Processor {
 48   AsyncMessenger *msgr;
 49   NetHandler net;
 50   Worker *worker;
 51   ServerSocket listen_socket;
 52   EventCallbackRef listen_handler;
 54   class C_processor_accept;
 57   Processor(AsyncMessenger *r, Worker *w, CephContext *c);
 60   void stop();
 61   int bind(const entity_addr_t &bind_addr,
 62            const set<int>& avoid_ports,
 63            entity_addr_t* bound_addr);
 64   void start();
 65   void accept();
 66 };

–48–> the AsyncMessenger instance this Processor belongs to
–49–> under the hood the Processor establishes connections with the socket listen/accept mechanism, which is wrapped in NetHandler
–50–> the Worker thread associated with this Processor; like a Connection, the Processor submits all of its events to its associated Worker
–51–> the listening socket; in essence a wrapper around the server-side socket of the Specific Stack layer, e.g. RDMAServerSocketImpl::server_setup_socket (listen_socket.fd())
–52–> actually a C_processor_accept instance, registered as the handler for listen_socket.fd() among the Msg file events; when a client connects, that fd() becomes readable and this listen_handler is invoked, which in essence calls Processor::accept()–>AsyncMessenger::add_accept()–>AsyncConnection::accept() to establish a usable connection (see the sketch after this list)
–61–> binds an address for listen_socket
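
To make the listen/accept flow concrete, here is a standalone sketch using plain POSIX sockets and poll(): register the listening fd, and call accept() only when it turns readable. This is not the Ceph NetHandler code, and the port number is made up; a real Processor also loops forever instead of polling once.

// Sketch: event-driven accept, the core idea behind Processor + listen_handler.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
  int lfd = socket(AF_INET, SOCK_STREAM, 0);
  int one = 1;
  setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(16800);                    // hypothetical port, cf. Processor::bind()
  if (bind(lfd, (sockaddr*)&addr, sizeof(addr)) < 0 || listen(lfd, 128) < 0) {
    perror("bind/listen");
    return 1;
  }

  // The "listen_handler": in Ceph this is C_processor_accept registered on
  // listen_socket.fd(); here it is simply a branch inside a poll() call.
  pollfd pfd{lfd, POLLIN, 0};
  if (poll(&pfd, 1, 1000) > 0 && (pfd.revents & POLLIN)) {   // wait up to 1s for a client
    sockaddr_in peer{};
    socklen_t len = sizeof(peer);
    int cfd = accept(lfd, (sockaddr*)&peer, &len);           // cf. Processor::accept()
    if (cfd >= 0) {
      // In Ceph the new socket would be wrapped in an AsyncConnection and handed to a
      // Worker: AsyncMessenger::add_accept() -> AsyncConnection::accept().
      printf("accepted connection from %s\n", inet_ntoa(peer.sin_addr));
      close(cfd);
    }
  }
  close(lfd);
  return 0;
}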

AsyncConnection


A Connection instance corresponds to one end-to-end connection. AsyncConnection, a subclass of Connection, encapsulates a connection state machine, the connection context, and the read/write methods operating on that connection.

//src/msg/async/AsyncConnection.h
 50 class AsyncConnection : public Connection {
 52   ssize_t read_bulk(char *buf, unsigned len);
 61   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
 62   ssize_t read_until(unsigned needed, char *p);
 63   ssize_t _process_connection();
 64   void _connect();
 65   void _stop();
 66   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
 67   ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
 76   ssize_t write_message(Message *m, bufferlist& bl, bool more);
 78   ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,bufferlist &authorizer_reply) {
 96   }
194   bool is_connected() override {
195     return can_write.load() == WriteStatus::CANWRITE;
196   }
197 
198   // Only call when AsyncConnection first construct
199   void connect(const entity_addr_t& addr, int type) {
204   }
205   // Only call when AsyncConnection first construct
206   void accept(ConnectedSocket socket, entity_addr_t &addr);
207   int send_message(Message *m) override;
291   AsyncMessenger *async_msgr;
296   std::atomic<uint64_t> out_seq{0};
298   int state;
299   int state_after_send;
300   ConnectedSocket cs;
301   int port;
302   Messenger::Policy policy;
304   DispatchQueue *dispatch_queue;
307   bufferlist outcoming_bl;
318   list<Message*> sent; 
319   map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
324   EventCallbackRef read_handler;
325   EventCallbackRef write_handler;
326   EventCallbackRef wakeup_handler;
327   EventCallbackRef tick_handler;
328   struct iovec msgvec[ASYNC_IOV_MAX];
329   char *recv_buf;
330   uint32_t recv_max_prefetch;
331   uint32_t recv_start;
332   uint32_t recv_end;
333   set<uint64_t> register_time_events; // need to delete it if stop
346   bufferlist data_buf;
347   bufferlist::iterator data_blp;
348   bufferlist front, middle, data;
349   ceph_msg_connect connect_msg;
350   // Connecting state
351   bool got_bad_auth;
352   AuthAuthorizer *authorizer;
353   bufferlist authorizer_buf;
354   ceph_msg_connect_reply connect_reply;
355   // Accepting state
356   entity_addr_t socket_addr;
357   CryptoKey session_key;
358   bool replacing;    
364   bool is_reset_from_peer;
365   bool once_ready;
368   char *state_buffer;
370   uint64_t state_offset;
371   Worker *worker;
372   EventCenter *center;
377   void handle_write();
378   void process();
404 };

–52–> calls ConnectedSocket->read() to fill the supplied buffer; usually called by read_until(), either to fill a buffer used to build a Message or to fill AsyncConnection::recv_buf
–61–> preparation work before a message is sent
–62–> either reads already-prefetched data out of recv_buf into the supplied buffer (usually a buffer used to build a Message), or reads directly from the underlying NetworkStack via AsyncConnection::read_until()–>AsyncConnection::read_bulk()–>ConnectedSocket::read(); it is the core building block of AsyncConnection::read_handler.process()
–63–> the connection-handling routine; it drives the connection from the moment the local AsyncConnection instance is constructed until it reaches STATE_OPEN
–64–> when a send finds the connection unreachable, connect to the server (the peer) as a client to establish the connection: AsyncMessenger::send_message()->AsyncMessenger::get_connection()->AsyncMessenger::create_connect()->AsyncConnection::connect()->AsyncConnection::_connect()
–66–> if the local side set up this AsyncConnection as a client, this handles the states that follow STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
–67–> if the local side set up this AsyncConnection as a server, this handles the states that follow STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
–76–> one step of sending a message; its main job is to copy the Message's data into outcoming_bl and hand it to the lower-level logic (AsyncConnection::_try_send())
–78–> one step of server-side connection setup: AsyncConnection::handle_connect_msg()->AsyncConnection::_reply_accept()
–199–> one step of client-side connection setup: AsyncMessenger::send_message()->AsyncMessenger::get_connection()->AsyncMessenger::create_connect()->AsyncConnection::connect()->AsyncConnection::_connect()
–206–> one step of server-side connection setup: Processor::do_request()->Processor::accept()->AsyncMessenger::add_accept()->AsyncConnection::accept()
–207–> the interface the upper layer uses to send a message: AsyncMessenger::submit_message()->AsyncConnection::send_message()->AsyncConnection::write_handler
–217-251–> the enum of AsyncConnection states, covering every possible state of both the server and the client side
–291–> the AsyncMessenger this AsyncConnection belongs to
–298–> the current state of this AsyncConnection
–300–> the interface to the Stack layer; ConnectedSocket wraps the abstract class ConnectedSocketImpl, whose implementation differs per stack, e.g. RDMAConnectedSocketImpl in the RDMAStack
–302–> the policy of this AsyncConnection, normally set in AsyncConnection::connect() via `policy=msgr->get_policy(type)` to the policy of the owning AsyncMessenger
–304–> the queue that delivers Messages to the upper layer; a dedicated thread maintains it
–307–> the bufferlist holding the data this AsyncConnection is about to send, consumed by AsyncConnection::_try_send()->ConnectedSocket::send()->ConnectedSocketImpl::send() to pass the data down to the Stack layer
–318–> the list of Messages about to be sent; they are all taken out of out_q
–319–> the priority-keyed map of outbound Messages and their bufferlists
–324-327–> the callback handlers for the various asynchronous events of this AsyncConnection
–329–> the receive buffer used by this AsyncConnection, consumed by read_until()
–330–> read_until(addr, len) essentially fetches len bytes from the lower layer into addr, but the async layer keeps a cache buffer, recv_buf: small chunks have usually been cached there by a previous read and can be served directly without bothering the Stack; whether a chunk goes through the cache is decided by recv_max_prefetch (see the sketch after this list)
–331-332–> track the read/write positions inside recv_buf; used only in read_until()
–333–> the registered time events; they must be deleted when the connection stops
–346-348–> what is needed to construct a Message
–349–> the connect_msg the server side received from the client
–358–> set to true while this AsyncConnection is going through connection racing
–371–> the Worker this AsyncConnection belongs to; every event that occurs on this AsyncConnection instance is handled by that Worker
–377–> the asynchronous write-event callback: AsyncConnection.write_handler/C_handle_write.do_request()->handle_write()
–378–> the asynchronous read-event callback: AsyncConnection.read_handler/C_handle_read.do_request()->process()
–379–> the asynchronous time-event callback: AsyncConnection.wakeup_handler/C_time_wakeup.do_request()->wakeup_from()
–380–> the asynchronous tick-event callback: AsyncConnection.tick_handler/C_tick_wakeup.do_request()->tick()

Handling of an AsyncConnection in Ceph is STATE-driven: almost every interface that operates on an AsyncConnection checks its current state before doing any real work. The state is driven mainly in two functions, AsyncConnection::process() and AsyncConnection::_process_connection(); the latter is called by the former, and the former is registered as the read_handler into the file events together with the notify fd, or is invoked in the form of external events. The figure below is the state transition diagram I put together for the phase after the AsyncConnection instance has been constructed but while it is still in C/S negotiation and not yet usable; what the two ends actually do in this phase can be read in _process_connection(), and the policy-based connection handling lives in handle_connect_msg() (server side) and handle_connect_reply() (client side).
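
Below is a standalone sketch of the read_until() prefetch idea described in the –330–> note: small reads are served from a cached buffer filled in bulk, while reads larger than recv_max_prefetch bypass the cache and go straight to the transport. It is a simplification (it assumes the underlying read always returns enough data), not the Ceph implementation.

// Sketch: a prefetch buffer of size 2*recv_max_prefetch in front of the raw read.
#include <sys/types.h>
#include <algorithm>
#include <cstdio>
#include <cstring>
#include <functional>
#include <vector>

struct PrefetchReader {
  std::function<ssize_t(char*, size_t)> read_bulk;  // the underlying transport read
  std::vector<char> recv_buf;
  size_t recv_start = 0, recv_end = 0;              // valid cached bytes: [recv_start, recv_end)
  size_t recv_max_prefetch;

  PrefetchReader(std::function<ssize_t(char*, size_t)> r, size_t max_prefetch)
    : read_bulk(std::move(r)), recv_buf(2 * max_prefetch), recv_max_prefetch(max_prefetch) {}

  // Fill 'needed' bytes into p (simplified: assumes the data is available).
  int read_until(size_t needed, char* p) {
    size_t cached = recv_end - recv_start;
    size_t n = std::min(cached, needed);
    memcpy(p, recv_buf.data() + recv_start, n);      // 1) drain what was prefetched earlier
    recv_start += n;
    size_t left = needed - n;
    if (left == 0) return 0;
    if (left > recv_max_prefetch) {
      read_bulk(p + n, left);                        // 2) large remainder: read directly
    } else {
      ssize_t got = read_bulk(recv_buf.data(), recv_buf.size());  // 3) small: prefetch in bulk
      recv_start = 0;
      recv_end = got > 0 ? size_t(got) : 0;
      size_t m = std::min(recv_end, left);
      memcpy(p + n, recv_buf.data(), m);
      recv_start = m;
    }
    return 0;
  }
};

int main() {
  const char* src = "ceph-async-messenger";
  size_t off = 0;
  PrefetchReader r([&](char* buf, size_t len) -> ssize_t {
    size_t n = std::min(len, strlen(src) - off);     // fake transport: reads from a string
    memcpy(buf, src + off, n);
    off += n;
    return ssize_t(n);
  }, 8);                                             // recv_max_prefetch = 8
  char out[5] = {};
  r.read_until(4, out);                              // small read: served via the prefetch buffer
  printf("%s\n", out);                               // prints "ceph"
}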

AsyncConnection::AsyncConnection()

//src/msg/async/AsyncConnection.cc
 121 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
 122                                  Worker *w)
 123   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
 124     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
 125     state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
 126     dispatch_queue(q), can_write(WriteStatus::NOWRITE),
 127     keepalive(false), recv_buf(NULL),
 128     recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
 129     recv_start(0), recv_end(0),
 130     last_active(ceph::coarse_mono_clock::now()),
 131     inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
 132     got_bad_auth(false), authorizer(NULL), replacing(false),
 133     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
 134     worker(w), center(&w->center)
 135 {
 136   read_handler = new C_handle_read(this);
 137   write_handler = new C_handle_write(this);
 138   wakeup_handler = new C_time_wakeup(this);
 139   tick_handler = new C_tick_wakeup(this);
 140   memset(msgvec, 0, sizeof(msgvec));
 141   // double recv_max_prefetch see "read_until"
 142   recv_buf = new char[2*recv_max_prefetch];
 143   state_buffer = new char[4096];
 144   logger->inc(l_msgr_created_connections);
 145 }

–123–> constructing an AsyncConnection and its parent class Connection both require an AsyncMessenger
–125–> right after construction, an AsyncConnection's state and state_after_send are both STATE_NONE; once AsyncConnection::accept() or _connect() returns, the state is set to STATE_ACCEPTING or STATE_CONNECTING respectively
–128–> configures recv_max_prefetch according to ceph.conf
–136-139–> constructs the handlers for this AsyncConnection's various asynchronous events
–142–> allocates recv_buf
–143–> allocates state_buffer

NetworkStack

NetworkStack is the core class of the Generic NetworkStack layer. Its main job is bridging: it takes the requests coming down from the async layer and drives the methods of the Specific NetworkStack layer.

//src/msg/Stack.h
288 class NetworkStack : public CephContext::ForkWatcher {
290   unsigned num_workers = 0;
294   std::function<void ()> add_thread(unsigned i);
298   vector<Worker*> workers;
300   explicit NetworkStack(CephContext *c, const string &t);
309   static std::shared_ptr<NetworkStack> create(
310           CephContext *c, const string &type);
312   static Worker* create_worker(
313           CephContext *c, const string &t, unsigned i);
327   virtual Worker *get_worker();
328   Worker *get_worker(unsigned i) {
329     return workers[i];
330   }
350 };

–290–> the number of Workers owned by this NetworkStack
–294–> builds the thread function that runs on top of NetworkStack.workers[i] and loops over Worker->EventCenter.process_events(); called by start()
–298–> the Workers owned by this NetworkStack; through it, these Worker threads serve the Processors and AsyncConnections associated with this NetworkStack
–309–> Ceph msg uses the singleton pattern so that there is one NetworkStack instance per stack type; create() is the construction interface (see the sketch after this list)
–312–> constructs a PosixWorker, an RDMAWorker or a DPDKWorker depending on type
–325–> loops over add_thread() to start all of the owned Worker threads
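
The factory idea behind NetworkStack::create() can be shown with a small standalone sketch: dispatch on the type string to a concrete stack class. The classes here are hypothetical stand-ins for PosixStack/RDMAStack/DPDKStack, not the Ceph hierarchy.

// Sketch: create() picks the concrete stack implementation from the type string.
#include <memory>
#include <stdexcept>
#include <string>

struct Stack { virtual ~Stack() = default; virtual const char* name() const = 0; };
struct PosixLikeStack : Stack { const char* name() const override { return "posix"; } };
struct RdmaLikeStack  : Stack { const char* name() const override { return "rdma"; } };
struct DpdkLikeStack  : Stack { const char* name() const override { return "dpdk"; } };

std::shared_ptr<Stack> create_stack(const std::string& type) {
  if (type == "posix") return std::make_shared<PosixLikeStack>();
  if (type == "rdma")  return std::make_shared<RdmaLikeStack>();
  if (type == "dpdk")  return std::make_shared<DpdkLikeStack>();
  throw std::invalid_argument("unknown stack type: " + type);
}

int main() {
  auto s = create_stack("posix");   // in Ceph, the type ultimately comes from ceph.conf
  return s->name()[0] == 'p' ? 0 : 1;
}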

NetworkStack::NetworkStack()

//src/msg/async/Stack.cc
102 NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
103 {
104   assert(cct->_conf->ms_async_op_threads > 0);
105 
106   const uint64_t InitEventNumber = 5000;
107   num_workers = cct->_conf->ms_async_op_threads;
108   if (num_workers >= EventCenter::MAX_EVENTCENTER) {
113     num_workers = EventCenter::MAX_EVENTCENTER;
114   }
115 
116   for (unsigned i = 0; i < num_workers; ++i) {
117     Worker *w = create_worker(cct, type, i);
118     w->center.init(InitEventNumber, i, type);
119     workers.push_back(w);
120   }
121   cct->register_fork_watcher(this);         
122 }

–107-114–> the number of Worker threads is the smaller of EventCenter::MAX_EVENTCENTER and ms_async_op_threads from the configuration file
–116-120–> constructs that many Workers and initializes each one's EventCenter

Worker

//src/msg/async/Stack.h
205 class Worker {
218   EventCenter center;
223   Worker(CephContext *c, unsigned i)
224     : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
244   }
252   virtual int listen(entity_addr_t &addr,
253                      const SocketOptions &opts, ServerSocket *) = 0;
254   virtual int connect(const entity_addr_t &addr,
255                       const SocketOptions &opts, ConnectedSocket *socket) = 0;
286 };

–218–> the embedded EventCenter
–252–> listen(), used by the server side of a Connection
–254–> connect(), used by the client side of a Connection

EventCenter

Encapsulates the asynchronous, event-based processing machinery used by the Workers, including the interfaces for registering events.

//src/msg/async/Event.h
 87 class EventCenter {
 90   static const int MAX_EVENTCENTER = 24;
 95   struct AssociatedCenters {
100   };
102   struct FileEvent {
103     int mask;
104     EventCallbackRef read_cb;
105     EventCallbackRef write_cb;
106     FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
107   };
109   struct TimeEvent {
110     uint64_t id;
111     EventCallbackRef time_cb;
113     TimeEvent(): id(0), time_cb(NULL) {}
114   };
158   pthread_t owner;
160   std::atomic_ulong external_num_events;
161   deque<EventCallbackRef> external_events;
162   vector<FileEvent> file_events;
163   EventDriver *driver;
170   uint64_t time_event_next_id;
171   int notify_receive_fd;
172   int notify_send_fd;
173   NetHandler net;
174   EventCallbackRef notify_handler;
178   int process_time_events();
194   int init(int nevent, unsigned idx, const std::string &t);
199   EventDriver *get_driver() { return driver; }   
202   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
203   uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
206   int process_events(int timeout_microseconds, ceph::timespan *working_dur = nullptr);
207   void wakeup();
210   void dispatch_event_external(EventCallbackRef e);
211   inline bool in_thread() const {
212     return pthread_equal(pthread_self(), owner);
213   }
245   template <typename func>
246   void submit_to(int i, func &&f, bool nowait = false) {
262   };
263 };

–90–> an upper bound on the number of EventCenters in a process
–102–> the file event structure
–109–> the time event structure
–158–> the Worker thread that owns this EventCenter
–160–> the number of pending external events
–163–> the abstract driver object for the asynchronous mechanism; the concrete subclass is decided in EventCenter::init() according to the configuration, e.g. EpollDriver when epoll is supported
–171–> the read end of the notify pipe; it is registered as one of the file_events, i.e. added to the watch list
–172–> the write end of the notify pipe, acting as a proxy for the read end; writing to it wakes the Worker
–174–> the handler invoked when notify_receive_fd becomes READABLE; it acts as the file-event proxy for external events, and all it does is drain the character from the pipe (see the sketch after this list)
–162–> the file_events registered in this EventCenter, managed through the EventDriver
–161–> the external_events registered in this EventCenter, queued in a deque and consumed in process_events()
–164–> a counting fd based on the Linux eventfd() mechanism
–176–> the global array of EventCenter*
–202–> registers a file event (its counterpart delete_file_event, not shown in the excerpt, removes one)
–203–> registers a time event
–206–> processes the file events, time events and external events
–207–> wakes up the Worker; in essence it just writes to the write fd so that the read fd becomes readable and the thread wakes up
–211–> checks whether the caller is the Worker thread itself
–246–> constructs an EventCallback from f and submits it to the EventCenter with index i
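
The notify-pipe trick described above can be illustrated with a standalone sketch (pipe + poll + a deque of callbacks). It is a simplification of the dispatch_event_external()/wakeup()/process_events() interplay, not the Ceph EventCenter code.

// Sketch: queue an external event, write one byte to wake the poller, drain and run.
#include <poll.h>
#include <unistd.h>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>

struct MiniCenter {
  int notify_receive_fd = -1, notify_send_fd = -1;
  std::deque<std::function<void()>> external_events;
  std::mutex lock;

  MiniCenter() {
    int fds[2];
    if (pipe(fds) != 0) { perror("pipe"); return; }
    notify_receive_fd = fds[0];               // watched by the event loop
    notify_send_fd = fds[1];                  // written by wakeup()
  }
  void dispatch_event_external(std::function<void()> cb) {
    { std::lock_guard<std::mutex> g(lock); external_events.push_back(std::move(cb)); }
    wakeup();
  }
  void wakeup() {                             // write the write fd so the read fd turns readable
    char c = 'x';
    (void)write(notify_send_fd, &c, 1);
  }
  void process_events(int timeout_ms) {
    pollfd pfd{notify_receive_fd, POLLIN, 0};
    if (poll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLIN)) {
      char c;
      (void)read(notify_receive_fd, &c, 1);   // the notify handler only drains the pipe
    }
    std::deque<std::function<void()>> todo;
    { std::lock_guard<std::mutex> g(lock); todo.swap(external_events); }
    for (auto& cb : todo) cb();               // then the queued external events are run
  }
};

int main() {
  MiniCenter c;
  c.dispatch_event_external([] { printf("external event handled\n"); });
  c.process_events(1000);
}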

— Thanks to Joe.HU for pointing out mistakes in this article
