Ceph AsyncMessenger 简析 I

作为分布式存储系统,Msg(src/msg)模块可谓是Ceph的基石之一。Ceph发展到Luminous ,已经支持的3大通信机制:simple,async和xio,其中simple历史最为悠久,是Ceph最早的通信模块,原理简单但性能较差。async作为后起之秀,优良的性能使其自Luminous开始已经作为了缺省msg方案。xio拥有众多实验特性,目前距离生产环境还有很大距离。

在使用OOP设计一个通信模块时,往往少不了以下几个抽象:

  • Messenger 用于在最高层次管理所有通信,通常包括通信策略,工作线程等等
  • Connection表示对一个连接的抽象,经常会设计一个状态机来供上层管理该Connection
  • Message是对消息的封装,通常包含’Header + Data + Checking’ 几部分,Message和报文流的常常需要转换方法,Message->buf:encode(),Message<-buf:decode()
  • Stack 负责实现真实的通信,比如TCP/IP协议栈、RDMA协议栈

以此为基础,Ceph Msg框架就显得十分清晰,下面就是Async机制核心的封装:AsyncMessenger,Processor,AsyncConnection,NetworkStack,Worker,EventCenter,Message,本文将逐个讨论这些角色。

上图是Ceph Msg模块框图,可以看出,msg模块可以分为3个层次:async/simple/xio + Generic NetworkStack + Specific NetworkStack,上层将”通信机制”抽象出来,下层聚焦于协议栈,包括硬件无关的部分以及硬件相关的部分,比如RDMAStack针对配置了IB卡的存储节点,DPDKStack用于使用X86 DPDK技术的存储节点,而PosixStack则是Linux原生的Socket通信接口。

AsyncMessenger

AsynMessenger类继承自Messenger:class AsyncMessenger:public SimplePolicyMessenger:public Messenger SimplePolicyMessenger是和”连接策略”相关的类。AsyncMessenger作为整个通信模块的核心和中转,不论是作为底层通信的NetworkStack,还是作为连接的抽象的Connection,抑或处理对端链接请求的Processor,都要围着它转。以OSD进程的main函数为例(src/ceph_osd.cc),整个OSD进程也不过7个Messenger实例:ms_public,ms_cluster,ms_hb_back_client,ms_hb_front_client, ms_hb_back_server,ms_hb_front_server,ms_objecter,这7个Messenger分别用来处理不同类型的消息。

–176–>启动Processor和dispatch_queue线程
–197–>创建Connection实例的接口
–139–>提供给上层提交发送请求的接口,AsyncMessenger::send_message()–>AsyncMessenger::_send_message()–>AsyncMessenger::submit_message()–>AsyncConnection::send_message()
–221–>Messenger关联的Network实例,收发流程都要经过该实例的传递
–224–>向上层提交收到的Message的队列
–273–>管理Connection实例的数据结构,所有已经成功建立起来的Connection都可以在这里找到
–280–>对于Connection建立过程中的Server端,accept返回但还在协商阶段的Connection被放置于此,连接建立成功后将从这里移到conns中
–343–>根据地址在conns中查找对应的Connection实例
–348–>将链接从accepting_conns中删除,并添加到conns中
–371–>被Processor::accept()调用,Server端一旦侦听到Client端的链接,就会在Server端建立一个Connection实例,并将其加入到accepting_conns中以待后续处理,这个建立和加入的过程就是在该函数中完成的。

AsyncMessenger::AsyncMessenger()

–48–>隶属的AsyncMessenger实例
–49–>Processor建立连接的底层是使用Socket的listen-accept机制,这些机制被封装在了NetHandler中
–50–>Processor关联的Worker线程,Processor和Connection一样,所有Event最后都提交给关联的Worker处理
–51–>监听socket,本质是对Specific Stack层中的Server端Socket进行封装,比如RDMAServerScoketImpl::server_setup_socket(listen_socket.fd())
–52–>实质是C_processor_accept实例,作为listen_socket.fd()的handler被注册到Msg中的file events中,当Client端有连接过来时,该fd()会变得可读,此时就会回调该listen_handler,本质是调用Processor.accept()–>AsyncMessenger.add_accept()–>AsyncConnection.accept()来建立一个可用连接。
–61–>绑定一个地址作为listen_socket。

Processor

Processor的主要工作是”监视”,类似于Socket编程中的listen,Async机制会启动一组Processor线程,每个线程”监视”一个端口,一旦有链接建立,即构造一个AsynConnection实例并交由该Processor线程对应的Worker线程处理。这里插句题外话,Processor线程和Worker线程是”多对一”的关系,一个Worker线程不但处理Processor发给它的任务,还可能会有一堆AsyncConnection发给它的任务。在Ceph中,进程之间(eg, OSD-OSD)没有C/S之分,但两个进程之间有很多的Connection,对于每个Connection确是有C/S之分的。在没有链接的情况下,主动建立链接的为Client,被动接受链接的为Server,而Processor就是那个使一个进程可以像Server一样被动建立链接的前提。

–48–>隶属的AsyncMessenger实例
–49–>Processor建立连接的底层是使用Socket的listen-accept机制,这些机制被封装在了NetHandler中
–50–>Processor关联的Worker线程,Processor和Connection一样,所有Event最后都提交给关联的Worker处理
–51–>监听socket,本质是对Specific Stack层中的Server端Socket进行封装,比如RDMAServerScoketImpl::server_setup_socket(listen_socket.fd())
–52–>实质是C_processor_accept实例,作为listen_socket.fd()的handler被注册到Msg中的file events中,当Client端有连接过来时,该fd()会变得可读,此时就会回调该listen_handler,本质是调用Processor.accept()–>AsyncMessenger.add_accept()–>AsyncConnection.accept()来建立一个可用连接。
–61–>绑定一个地址作为listen_socket。

AsyncConnection


一个Connection实例对应着一个端对端的连接,AsyncConnection作为Connection的子类,其中封装了一个连接状态机、上下文以及基于该AsyncConnection的读写方法。

–52–>调用ConnectedSocket->read()填充传入的buffer,通常被read_until()调用,用于填充构造Message的buffer或AsyncConnection::recv_buf
–61–>发送消息前的准备工作
–62–>或从recv_buf中读取事前缓冲的足够的数据,填充到传入的buffer(通常是用于构造Message的一个buffer)中,或通过AsyncConnection::read_until()–>AsynConnection::read_bulk()–>ConnectedSocket::read()调用底层NetworkStack的接口直接读取数据。是AsyncConnection::read_handler.process()的核心基础函数。
–63–>处理连接的方法,负责本地AsyncConnection实例构造之后,STATE_OPEN状态之前连接处理
–64–>当发消息时发现连接不可达时,以Client身份去连接Server(对端)来建立连接,AsyncConnection::send_message->AsyncMessenger::get_connection()->AsyncMessenger::add_connect()->AsyncConnection::connect()->AsyncConnection::_connect()–66–>如果本地以Client的身份建立一个AsyncConnection,则会在AsyncConnection处于STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH之后处理接下来的状态
–67–>如果本地以Server的身份建立一个AsyncConnection,则会在AsyncConnection处于STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH之后处理接下来的状态
–76–>发送消息的一环,主要工作是将Message中的数据拷贝到outcoming_bl中,提交给下层逻辑(AsyncConnection::_try_send())处理。
–78–>Server端建立连接的一环,AsyncConnection::handle_connect_msg()->AsyncConnection::_reply_accept()
–199–>Client端建立连接的一环,AsyncConnection::send_message()->AsyncMessenger::get_connection()->AsyncMessenger::add_connect()->AsyncConnection::connect()->AsyncConnection::_connect()–206–>Server端建立连接的一环,Processor::do_request()->Processor::accept()->AsyncMessenger::add_accept()->AsyncConnection::accept()
–207–>提供给上层发送消息的接口AsyncMessenger::submit_message()->AsyncConnection::send_message()->AsyncConnection::write_handler。
–217-251–> 一个AsyncConnection状态枚举,包含了Server和Client的所有可能状态
–291–>该AsyncConnection归属的AsyncMessenger
–298–>AsyncConnection的当前状态
–300–>与Stack层的接口,ConnectedSocket是对抽象类ConnectedSocketImpl的封装,而后者的实现随Stack而异,比如,RDMAStack中就是RDMAConnectedSocketImpl
–302–>AsynConnection的policy,通常由AsyncConnection::connect()中policy=msgr->get_policy(type)设置为归属的AsyncMessenger::Policy
–304–>向上层提供Message的队列,有专门的线程维护这个队列
–307–>AsyncConnection将要发送数据的bufferlist,供AsyncConnection::_try_send()->ConnectedSocket::send()->ConnectedSocketImpl::send()使用来传给Stack层处理
–318–>将发Message的list,这些Message都是从out_q中取出的
–319–>基于优先级的Message与bufferlist映射–324-327–>该AsyncConnection发生异步事件的各种回调handler
–329–>该AsyncConnection使用的接收buffer,供read_until()使用
–330–>read_until(addr,len)的本质从底层获取len数据到addr,但是会在Async层维持一个缓存buffer—recv_buf,对于小块的数据,通常已经在上次读取时已经在recv_buf被缓存,读取这部分数据直接从recv_buf中获取而不必兴师动众劳烦Stack。决定一个数据块是否被缓存的标准就是recv_max_prefetch–331-332–>用于管理recv_buf的读写位置,只在read_until()中使用–333–>已经注册的time event
–346-348–>构造一个Message所需
–349–>Server端收到的Client端的connect_msg
–358–>当前AsyncConnection发生Connection racing的时候,此处会为true
–371–>该AsyncConnection归属的Worker,即发生在AsyncConnection实例中的所有events都由该Worker处理–377–>异步写事件回调函数,AsyncConnection.write_handler/C_handle_write.do_request()->handler_write()
–378–>异步读事件回调函数,AsyncConnection.read_handler/C_handle_read.do_request()->process()–379–>异步时间事件回调函数,AsyncConnection.wakeup_handler/C_time_wakeup.do_request()->wakeup_from()
–380–>异步tick事件回调函数,AsyncConnection.tick_handler/C_tick_wakeup.do_request()->tick(), Ceph中对AsyncConnection的处理是STATE驱动的,几乎所有操作AsyncConnection的接口在干活之前都会check一下AsyncConnection当前的状态。AsyncConnection的状态主要在两个函数:AsyncConnection::process()和AsyncConnection::_process_connection(),后者被前者调用,前者作为read_handler随notify_fd一同被注册到file events中,或以external events的形式被调用。下图是我整理的在连接实例AsyncConnection已经构造,尙处在C/S协商阶段,仍未可用之前的状态迁移图,C/S两端在此阶段的实际操作可以参考_process_connection()的实现,基于策略的连接处理方案在handle_connect_msg()(Server端)和handle_connect_reply()(Client中)

AsyncConnection::AsyncConnection()

–123–>构造AsyncConnection及其父类Connection都需要一个AsyncMessenger –125–>一个AsyncConnection的构造之初,它的state和state_after_send都是STATE_NONE,该状态将在AsyncConnection::accept()或_connect()返回之后,被相应的置为STATE_ACCEPTING以及STATE_CONNECTING
–128–>根据ceph.conf配置recv_max_prefetch
–136-139–>构造该AsyncConnection的各种异步Event的handler
–142–>申请recv_buf –143–>申请state_buffer

NetworkStack

NetworkStack是Generic NetworkStack层的核心类,主要的工作是承上启下:承Async层的请求,启Specific NetworkStack的方法。

–290–>隶属于该NetworkStack的Worker的数量
–294–>构造一个基于NetworkStack.workers[i],循环执行Worker->EventCenter.process_events()的线程,由start()调用
–298–>隶属于该NetworkStack的Worker们,这些Worker线程们会通过该NetworkStack服务于关联于该NetworkStack的Processor们以及AsyncConnection们
–309–>Ceph msg中使用单例模式构造一种类型的NetworkStack实例,create()即是构造接口
–312–>根据type的不同构造或PosixWorker,或RDMAWorker,或DPDKWorker
–325–>循环调用add_thread()启动所有拥有的Worker线程

NetworkStack::NetworkStack()

–107-114–>取EventCenter::MAX_EVENTCENTER和配置文件中ms_async_op_threads较小的来确定Worker线程的数量
–116-120–>构造相应数量的Worker并初始化其Center

Worker

–218–>内嵌EventCenter
–252–>给Server Connection用的listen()
–254–>给Client Connection用的connect()

EventCenter

封装了基于异步事件的处理方法,包括Worker,注册事件等接口

–90–>进程的EventCenter数量
–102–>file event结构
–109–>time event结构
–158–>归属的Worker线程
–160–>外部事件个数
–163–>异步机制抽象类对象,在EventCenter::init()中根据配置确定,比如支持EPOLL,就会使用EpollDriver
–171–>管道的写fd,作为读fd的代理,用于唤醒Worker
–172–>管道的读fd,作为file_events被加入到监视列表中
–174–>notify_receive_fd的READABLE时的handler,作为external events在file events中的代理,这个handler只是将字符从管道中取出而已
–162–>该EventCenter中注册的file_events,用EventDriver管理
–163–>该EventCenter中注册的external_events,用EventDriver管理
–164–>计数fd,基于Linux的eventfd()机制
–176–>全局的EventCenter* 数组
–203–>注册文件事件 –204–>删除文件事件
–205–>注册time事件 –206–>处理file_event和external_events
–207–>唤醒Worker,本质就是写写fd,这样读fd可读,线程得以唤醒
–211–>判断调用者是Worker线程
–246–>使用f构造EventCallback

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: