整体结构:
上层逻辑处理
| ^
call | |
V |
rpc_engine
| ^
network::send_message | | rpc_engine::on_recv_request
V |
发送出去 --> network <-- 从网络读取
| ^
rpc_session::send_message | | network::on_recv_request
V | rpc_session::do_read
rpc_session
---------
| network |
-^-----^-
| |
| |
------------- -------------
| |
| |
----------------------------- -------------------
| connection_oriented_network | | asio_udp_provider |
--------------^-------------- -------------------
|
|
-----------------------
| asio_network_provider |
-----------------------
asio_network_provider有两个成员变量map, 分别表示相应的client和server的<rpc address, rpc_serssion>映射
rpc_engine:
成员变量:
std::vector<std::vector<std::unique_ptr
成员函数: error_code start(const service_app_spec &aspec); 根据配置文件创建_client_nets和_server_nets network, 其中_server_nets包括asio_network_provider void call(message_ex *request, const rpc_response_task_ptr &call); 发送请求,会根据request->server_address来决定是调用call_ip和call_group void call_ip 根据format和CHANNEL从_client_nets中选取相应的network,然后使用network::send_message将消息发送出去 void reply(message_ex *response, error_code err = ERR_OK) 发送回复,根据port和CHANNEL从_server_nets选取相应的network,然后使用network::send_message将消息发送出去 void forward(message_ex *request, rpc_address address) 通过调用call_ip转发请求到address void on_recv_request(network *net, message_ex *msg, int delay_ms) 接收到请求时的处理, 创建一个task,将其放入队列中。随后队列将该task取出,该task主要执行replication_service_app::on_intercepted_request函数, 根据是读或者写响应请求调用replica_stub::on_client_write或者replica_stub::on_client_read note: Q: 为什么reply接口从_server_nets中获取network? A: 因为server的代表的是我们是网络提供方,这里客户端发来的请求,reply的时候当然需要通过server再发送回去
network: 一个虚类, 表示一个网络连接 成员变量: int _message_buffer_block_size; buffer block大小 int _max_buffer_block_count_per_send; 单次发送的最大buffer block大小 rpc_engine *_engine; 指向rpc_engine的指针 成员函数: void on_recv_request(message_ex *msg, int delay_ms); 接收信息的回调函数,将会直接调用rpc_engine::on_recv_request void on_recv_reply(message_ex *msg, int delay_ms); 回复信息的回调函数,将会直接调用rpc_engine::on_reply_request virtual void send_message(message_ex *request) = 0; 发送信息,rpc_engine将会调用该函数 virtual void inject_drop_message(message_ex *msg, bool is_send) = 0; drop掉该msg message_parser *new_message_parser(network_header_format hdr_format); 创建一个新的message parser,用于:1.接收到一个rpc request message时解析出blob信息。2.parse blob信息用于获取rpc_message
connection_oriented_network : public network 用于tcp连接
成员变量:
client_sessions _clients; to_address与rpc_session之间的map(rpc_address–rpc_session_ptr)
server_sessions _servers; from_address与rpc_session之间的map(rpc_address–rpc_session_ptr)
ip_connection_count _ip_conn_count; from_ip与connection_count之间的map(uint32_t–uint32_t)
uint64_t _cfg_conn_threshold_per_ip; 配置中指定的每个ip上最大的连接数
成员函数:
rpc_session_ptr get_server_session(::dsn::rpc_address ep); 根据rpc_address ep从_servers中获取到其对应的rpc_session_ptr
void on_server_session_accepted(rpc_session_ptr &s); 根据rpc_session_ptr &s获取from_address与rpc_session之间的映射,存入_servers中,_ip_conn_count中为相应的ip的连接count+1
void on_server_session_disconnected(rpc_session_ptr &s); 将rpc_session_ptr &s从_server中移除,并在_ip_conn_count为相应的ip的连接count减1
bool is_conn_threshold_exceeded(::dsn::rpc_address ep); 从_ip_conn_count获取ep的ip的连接数量, 判断其连接数量是否超过配置的上限。
bool on_client_session_connected(rpc_session_ptr &s); 根据rpc_session_ptr &s获取to_address与rpc_session之间的映射,存入_clients中
void on_client_session_disconnected(rpc_session_ptr &s); 将rpc_session_ptr &s从_clients中移除。
virtual void send_message(message_ex *request) override;
1.根据to_address从_clients中获取rpc_session_ptr,如果没有,则创建一个新的
2.如果是新创建的,则调用其asio_rpc_session::connect用于建立网络连接, 这里使用boost的async_connect,传入一个callback,当connect成功后,先调用send_message把消息全部发送出去,然后调用read_next循环去读
3.不管是不是新创建的session,都rpc_session::send_message。如果不是新的session则直接把消息发送出去; 如果是新创建的,由于还没有连接好,则把消息挂入发送队列中,暂时不发送
void inject_drop_message(message_ex *msg, bool is_send); 从message_ex *msg中获取rpc_session_ptr, 如果为空,则根据to_address从_clients中去查找, 最后通过rpc_session::close关闭rpc session
asio_network_provider : public connection_oriented_network 监听端口,当有连接到来时创建连接session,并使用session读连接中的数据 virtual error_code start(rpc_channel channel, int port, bool client_only) 1.listen端口 2.调用do_accept,该函数调boost的async_accept用于异步accpet连接,每当有连接到来时都执行其指定的callback: 1.该callback首先为新的连接创建一个asio_rpc_session, 2.调用on_server_session_accepted向_servers中添加from_address与rpc_session之间的映射 3.并调用asio_rpc_session的start_read_next接口,用于读取连接的数据 rpc_session_ptr create_client_session(::dsn::rpc_address server_addr) 根据地址server_addr创建一个asio_rpc_session note: 有一个natvierun的tool_app,在系统启动时,service_api_c.cpp的run()函数369行,会获取nativerun并调用其install函数,用于初始化spec.network_default_server_cfs, 后面传递给service_app_spec::init函数,初始化spec.network_client_config和spec.network_server_config, 随后在rpc_engine::start中具体创建asio_network_provider
rpc_session: 虚基类, 表示一个rpc session
成员函数:
bool unlink_message_for_send();
1.auto n = _messages.next(); 从_messages中获取下一个message_ex中的dlink* (在_messages中各个message_ex通过dlink*连接)
2.auto lmsg = CONTAINING_RECORD(n, message_ex, dl); 根据dlink *获取包含他的message_ex指针
3.将lmsg放入到_sending_buffers中,准备发送
4.如果_messages中还存在消息,则跳到2继续
void send_message(message_ex *msg);
1.msg->io_session = this 设置msg的rpc session
2.msg->dl.insert_before(&_messages) 将msg插入到_messages中, 并将_message_count+1
3.如果处于连接状态(SS_CONNECTED == _connect_state)、没有其他进程在发送(!is_sending_next)并且发送队列_sending_msgs中有消息待发送,
则使用unlink_message_for_send从_messages摘下消息放入发送缓冲区_sending_buffers和_sending_msgs中,准备发送消息;
否则直接退出, 此时只是将消息放入了_messges中并没有发送,等连接建立时,会将其发送出去
4.调用send函数进行发送(send是个虚函数,各子类有不同的定义, 发送成功后会调用on_send_completed)
void on_send_completed(uint64_t signature) 发送消息,直到发送消息列表都发送完
1.清空发送缓冲区sending_buffers
2.如果没有其他进程在发送(!is_sending_next), 则unlink_message_for_send从_messages摘下消息放入发送缓冲区_sending_buffers和_sending_msgs中,并调用send函数继续发送。
void on_failure(bool is_write)
1.on_disconnected 设置_connect_state = SS_DISCONNECTED
2.根据是否是client来判断调用_net.on_client_session_disconnected或者_net.on_server_session_disconnected。这两个函数用于分别清空connection_oriented_network中的_clients或者_servers等成员数据
3.clear_send_queue(bool resend_msgs) 对于_sending_buffers和_messages中的剩余的消息,如果resend_msgs=true, 则将其重新发送;
如果resend_mesgs=false,并且is_request(是个请求)、!is_forwarded(不是转发请求),则通过接受一个空emptry的方式模拟failure(因为如果不resend, callback将会等到超时才会invoke, 太慢了)
virtual void do_read(int read_next) = 0
bool on_recv_message(message_ex *msg, int delay_ms)
note:
Q: 为什么发送缓冲区有_sending_buffers和_sending_msgs两个?
A: _sending_buffers内的buffer是void *buf保存,可以直接用于发送; sending_msgs内保存的是message_ex, 可以用于计算发送的messge count
asio_rpc_session void do_read(int read_next) 1.调用boost接口异步读,异步读传递一个callback, 该callback处理读取完之后的逻辑。 2.如果读取成功, on_message_read并解析 … n.调用start_read_next继续读 void connect() 1.调用boost接口去异步创建连接, 创建完之后调用其callback 2.如果创建成功: 1.调set_connected,执行connection_oriented_network::on_client_session_connected,向_clients中添加to_address与rpc_session之间的映射 2.rpc_session::on_send_complete 处理发送消息,直到所有消息发送完毕 3.start_read_next 读取消息
rpc_client_matcher: 用于匹配rpc request和rpc response、以及处理超时。 成员变量: typedef std::unordered_map<uint64_t, match_entry> rpc_requests; rpc_requests _requests[MATCHER_BUCKET_NR]; key-request id, value-match_entry(callback、timeout task、超时时间戳) 成员函数: void on_call(message_ex *request, const rpc_response_task_ptr &call): 当执行rpc call时,注册request id、callback和超时timeout task bool on_recv_reply(network *net, uint64_t key, message_ex *reply, int delay_ms), 当收到rpc response时,调用该函数用于触发callback调用(通过将callback task enqueue, enqueue的task会被异步调用执行)
note: 主动创建连接:connection_oriented_network::send_message发送消息时,如果没有session,则创建一个新的,放入_clients中, 连接创建后,先调用send_message将消息发送完,然后调用rpc_session::start_read_next去循环读 监听连接到来:asio_network_provider::start,在系统启动时调用该函数,调用async_accept接收连接session,放入_servers中, 连接创建后直接调用rpc_session::start_read_next去循环读