Rdsn Rpc

2019-09-27

整体结构:

                                  上层逻辑处理

                                     |  ^
                               call  |  |  
                                     V  |
                                  rpc_engine
                                     |  ^
              network::send_message  |  | rpc_engine::on_recv_request   
                                     V  |
 发送出去 -->                       network                             <-- 从网络读取
                                     |  ^
          rpc_session::send_message  |  |  network::on_recv_request
                                     V  |  rpc_session::do_read
                                  rpc_session



                                   ---------
                                  | network |
                                   -^-----^-
                                    |     |
                                    |     |
                        -------------     -------------
                        |                             |
                        |                             |
          -----------------------------      -------------------
         | connection_oriented_network |    | asio_udp_provider |
          --------------^--------------      -------------------
                        |
                        |
             -----------------------      
            | asio_network_provider |    
             -----------------------     

asio_network_provider有两个成员变量map, 分别表示相应的client和server的<rpc address, rpc_serssion>映射

rpc_engine: 成员变量: std::vector<std::vector<std::unique_ptr>> _client_nets; // <format, <CHANNEL, network*> format: network_header_format,目前只有NET_HDR_DSN; CHANNEL: 有RPC_CHANNEL_TCP和RPC_CHANNEL_UDP两个 std::unordered_map<int, std::vector<std::unique_ptr>> _server_nets; // <port, <CHANNEL, network*>> 不同的CHANNEL使用不同的network, 也就是udp和tcp有不同的网络连接

成员函数: error_code start(const service_app_spec &aspec); 根据配置文件创建_client_nets和_server_nets network, 其中_server_nets包括asio_network_provider void call(message_ex *request, const rpc_response_task_ptr &call); 发送请求,会根据request->server_address来决定是调用call_ip和call_group void call_ip 根据format和CHANNEL从_client_nets中选取相应的network,然后使用network::send_message将消息发送出去 void reply(message_ex *response, error_code err = ERR_OK) 发送回复,根据port和CHANNEL从_server_nets选取相应的network,然后使用network::send_message将消息发送出去 void forward(message_ex *request, rpc_address address) 通过调用call_ip转发请求到address void on_recv_request(network *net, message_ex *msg, int delay_ms) 接收到请求时的处理, 创建一个task,将其放入队列中。随后队列将该task取出,该task主要执行replication_service_app::on_intercepted_request函数, 根据是读或者写响应请求调用replica_stub::on_client_write或者replica_stub::on_client_read note: Q: 为什么reply接口从_server_nets中获取network? A: 因为server的代表的是我们是网络提供方,这里客户端发来的请求,reply的时候当然需要通过server再发送回去

network: 一个虚类, 表示一个网络连接 成员变量: int _message_buffer_block_size; buffer block大小 int _max_buffer_block_count_per_send; 单次发送的最大buffer block大小 rpc_engine *_engine; 指向rpc_engine的指针 成员函数: void on_recv_request(message_ex *msg, int delay_ms); 接收信息的回调函数,将会直接调用rpc_engine::on_recv_request void on_recv_reply(message_ex *msg, int delay_ms); 回复信息的回调函数,将会直接调用rpc_engine::on_reply_request virtual void send_message(message_ex *request) = 0; 发送信息,rpc_engine将会调用该函数 virtual void inject_drop_message(message_ex *msg, bool is_send) = 0; drop掉该msg message_parser *new_message_parser(network_header_format hdr_format); 创建一个新的message parser,用于:1.接收到一个rpc request message时解析出blob信息。2.parse blob信息用于获取rpc_message

connection_oriented_network : public network 用于tcp连接 成员变量: client_sessions _clients; to_address与rpc_session之间的map(rpc_address–rpc_session_ptr) server_sessions _servers; from_address与rpc_session之间的map(rpc_address–rpc_session_ptr) ip_connection_count _ip_conn_count; from_ip与connection_count之间的map(uint32_t–uint32_t) uint64_t _cfg_conn_threshold_per_ip; 配置中指定的每个ip上最大的连接数 成员函数: rpc_session_ptr get_server_session(::dsn::rpc_address ep); 根据rpc_address ep从_servers中获取到其对应的rpc_session_ptr void on_server_session_accepted(rpc_session_ptr &s); 根据rpc_session_ptr &s获取from_address与rpc_session之间的映射,存入_servers中,_ip_conn_count中为相应的ip的连接count+1 void on_server_session_disconnected(rpc_session_ptr &s); 将rpc_session_ptr &s从_server中移除,并在_ip_conn_count为相应的ip的连接count减1 bool is_conn_threshold_exceeded(::dsn::rpc_address ep); 从_ip_conn_count获取ep的ip的连接数量, 判断其连接数量是否超过配置的上限。 bool on_client_session_connected(rpc_session_ptr &s); 根据rpc_session_ptr &s获取to_address与rpc_session之间的映射,存入_clients中 void on_client_session_disconnected(rpc_session_ptr &s); 将rpc_session_ptr &s从_clients中移除。 virtual void send_message(message_ex *request) override;
1.根据to_address从_clients中获取rpc_session_ptr,如果没有,则创建一个新的 2.如果是新创建的,则调用其asio_rpc_session::connect用于建立网络连接, 这里使用boost的async_connect,传入一个callback,当connect成功后,先调用send_message把消息全部发送出去,然后调用read_next循环去读 3.不管是不是新创建的session,都rpc_session::send_message。如果不是新的session则直接把消息发送出去; 如果是新创建的,由于还没有连接好,则把消息挂入发送队列中,暂时不发送 void inject_drop_message(message_ex *msg, bool is_send); 从message_ex *msg中获取rpc_session_ptr, 如果为空,则根据to_address从_clients中去查找, 最后通过rpc_session::close关闭rpc session

asio_network_provider : public connection_oriented_network 监听端口,当有连接到来时创建连接session,并使用session读连接中的数据 virtual error_code start(rpc_channel channel, int port, bool client_only) 1.listen端口 2.调用do_accept,该函数调boost的async_accept用于异步accpet连接,每当有连接到来时都执行其指定的callback: 1.该callback首先为新的连接创建一个asio_rpc_session, 2.调用on_server_session_accepted向_servers中添加from_address与rpc_session之间的映射 3.并调用asio_rpc_session的start_read_next接口,用于读取连接的数据 rpc_session_ptr create_client_session(::dsn::rpc_address server_addr) 根据地址server_addr创建一个asio_rpc_session note: 有一个natvierun的tool_app,在系统启动时,service_api_c.cpp的run()函数369行,会获取nativerun并调用其install函数,用于初始化spec.network_default_server_cfs, 后面传递给service_app_spec::init函数,初始化spec.network_client_config和spec.network_server_config, 随后在rpc_engine::start中具体创建asio_network_provider

rpc_session: 虚基类, 表示一个rpc session 成员函数: bool unlink_message_for_send();
1.auto n = _messages.next(); 从_messages中获取下一个message_ex中的dlink* (在_messages中各个message_ex通过dlink*连接) 2.auto lmsg = CONTAINING_RECORD(n, message_ex, dl); 根据dlink *获取包含他的message_ex指针 3.将lmsg放入到_sending_buffers中,准备发送 4.如果_messages中还存在消息,则跳到2继续 void send_message(message_ex *msg);
1.msg->io_session = this 设置msg的rpc session 2.msg->dl.insert_before(&_messages) 将msg插入到_messages中, 并将_message_count+1 3.如果处于连接状态(SS_CONNECTED == _connect_state)、没有其他进程在发送(!is_sending_next)并且发送队列_sending_msgs中有消息待发送, 则使用unlink_message_for_send从_messages摘下消息放入发送缓冲区_sending_buffers和_sending_msgs中,准备发送消息; 否则直接退出, 此时只是将消息放入了_messges中并没有发送,等连接建立时,会将其发送出去 4.调用send函数进行发送(send是个虚函数,各子类有不同的定义, 发送成功后会调用on_send_completed) void on_send_completed(uint64_t signature) 发送消息,直到发送消息列表都发送完 1.清空发送缓冲区sending_buffers 2.如果没有其他进程在发送(!is_sending_next), 则unlink_message_for_send从_messages摘下消息放入发送缓冲区_sending_buffers和_sending_msgs中,并调用send函数继续发送。 void on_failure(bool is_write) 1.on_disconnected 设置_connect_state = SS_DISCONNECTED 2.根据是否是client来判断调用_net.on_client_session_disconnected或者_net.on_server_session_disconnected。这两个函数用于分别清空connection_oriented_network中的_clients或者_servers等成员数据 3.clear_send_queue(bool resend_msgs) 对于_sending_buffers和_messages中的剩余的消息,如果resend_msgs=true, 则将其重新发送; 如果resend_mesgs=false,并且is_request(是个请求)、!is_forwarded(不是转发请求),则通过接受一个空emptry的方式模拟failure(因为如果不resend, callback将会等到超时才会invoke, 太慢了) virtual void do_read(int read_next) = 0 bool on_recv_message(message_ex *msg, int delay_ms) note: Q: 为什么发送缓冲区有_sending_buffers和_sending_msgs两个? A: _sending_buffers内的buffer是void *buf保存,可以直接用于发送; sending_msgs内保存的是message_ex, 可以用于计算发送的messge count

asio_rpc_session void do_read(int read_next) 1.调用boost接口异步读,异步读传递一个callback, 该callback处理读取完之后的逻辑。 2.如果读取成功, on_message_read并解析 … n.调用start_read_next继续读 void connect() 1.调用boost接口去异步创建连接, 创建完之后调用其callback 2.如果创建成功: 1.调set_connected,执行connection_oriented_network::on_client_session_connected,向_clients中添加to_address与rpc_session之间的映射 2.rpc_session::on_send_complete 处理发送消息,直到所有消息发送完毕 3.start_read_next 读取消息

rpc_client_matcher: 用于匹配rpc request和rpc response、以及处理超时。 成员变量: typedef std::unordered_map<uint64_t, match_entry> rpc_requests; rpc_requests _requests[MATCHER_BUCKET_NR]; key-request id, value-match_entry(callback、timeout task、超时时间戳) 成员函数: void on_call(message_ex *request, const rpc_response_task_ptr &call): 当执行rpc call时,注册request id、callback和超时timeout task bool on_recv_reply(network *net, uint64_t key, message_ex *reply, int delay_ms), 当收到rpc response时,调用该函数用于触发callback调用(通过将callback task enqueue, enqueue的task会被异步调用执行)

note: 主动创建连接:connection_oriented_network::send_message发送消息时,如果没有session,则创建一个新的,放入_clients中, 连接创建后,先调用send_message将消息发送完,然后调用rpc_session::start_read_next去循环读 监听连接到来:asio_network_provider::start,在系统启动时调用该函数,调用async_accept接收连接session,放入_servers中, 连接创建后直接调用rpc_session::start_read_next去循环读