一、global相关
service_spec: global specification
成员变量:
std::vector
成员函数: bool init() 1.从配置文件的[core] section读取配置参数 2.threadpool_spec::init(threadpool_spec)获取所有threadpool的specification –> threadpool_spec 3.task_spec::init()
二、threadpool相关 threadpool_code: threadpool_spec的index,内部只有一个int型_internal_code保存customizable id DEFINE_THREAD_POOL_CODE用于定义一个threadpool_code, 例如:代码中可以调用DEFINE_THREAD_POOL_CODE(THREAD_POOL_INVALID)去创建一个name=THREAD_POOL_INVALID的threadpool_code(根据name去注册customizable id)
threadpool_spec: thread pool specification
static bool init(std::vector
note: 目前只有task engine用到了thread pool
三、task相关
static std::array<std::unique_ptr
task_code: 仅有一个int类型成员用于保存task code 构造时会调用task_spec::register_storage_task_code, 注册task code并创建相应的task_spec 可以使用DEFINE_TASK_CODE/DEFINE_TASK_CODE_RPC/DEFINE_TASK_CODE_AIO等宏去注册task code(并创建相应的task_spec), 其将会去创建相应的task_code 所以task_code和task_spec是在系统启动时候就创建好的,所以toollets可以将join point加入到所有的task_spec中
task_spec: 保存task specification(包括task code, 以及各种join point) static函数task_spec::register_storage_task_code; 用于注册task code并创建相应的task_spec, 如果task_type是TASK_TYPE_RPC_REQUEST,还要注册ack_code
task: 在创建时,会去s_task_spec_store中找相应的task_spec, task执行的不同阶段, 会去调用spec中不同的join_point.execute
task_worker: task线程,创建时为其指定一个task queue void start() 标记_is_running=true, 并创建一个线程运行run_internal()函数。最终线程进入loop()函数,不断从task queue中取出task执行task->exec_internal(根据task_worker_pool::_spec.dequeue_batch_size去batch size个) note: 由于queue有可能是共享的,所以queue的内部实现需要加锁
simple_timer_service::timer_service 定时任务服务 void add_timer(task *task) 将task加入到定时服务中 1.创建一个boost::asio::deadline_timer类型的timer 2.timer->expires_from_now(boost::posix_time::milliseconds(task->delay_milliseconds())) 设置delay时间 3.调用timer->async_wait, wait指定的delay时间. 同时指定一个callback,该callback将执行task->enqueue(timer_task::enqueue) timer_task::enqueue最终将改task放入到了task_worker_pool中的对应的task queue中,随后task_worker会异步的取出该task执行,执行完task的callback后将task的状态设置为READY,以便再次执行(task::exec_interval在运行完其exec()函数后,会判断其状态,如果不是Finish状态,则继续将task入队). 并将其delay时间为interval_milliseconds, 只是delay时间变为了timer_interval, 这样task::enqueue再次入队时,由于delay时间不为0,则调用add_timer,跳到了1。 note: 1.所以timer_task的执行最终也是放入到task_queue中,正如其名字,timer_service只是提供了一个定时服务,最终执行的还是task_worker_pool中定义的线程 2.这样看定时任务也不是特别精确准时
task_worker_pool: task线程池 成员变量: threadpool_spec _spec threadpool specification task_engine *owner 指向task_engine的指针 service_node *_node 指向service_node的指针 std::vector<task_worker *> _workers 所有的task线程 std::vector<task_queue *> _queues 所有的task_queue(每个分片一个queue) std::vector<timer_service *> _per_queue_timer_svcs 所有的定时任务服务(每个分片一个queue)
成员函数:
void create() 紧跟在构造函数之后使用
1.如果_spec.partitioned等于true,分片数量qCount=_spec.worker_count(每个分片一个queue), 否则qCount=1. 创建qCount个queue, 放入_queues中
2.创建qCount个timer_service,放入_per_queue_timer_svcs
3.根据_spec.worker_count来创建task_worker,执行task_worker::create函数,并将所有worker放入_workers。
在创建task_worker时需要为其指定task_queue,如果是分片模式,则每个worker一个queue,否则所有的worker都指定同一个queue(因为只有一个)
void start() 启动task_work_pool
1.启动所有的定时任务
2.启动所有的task_worker
void add_timer(task *t)
1.idx = (_spec.partitioned ? static_cast
task_engine: 每个service_node有且仅有一个task_engine, 在service_node::start函数中,会创建一个task_engine,并调用其create接口 成员变量: std::vector<task_worker_pool *> _pools 所有的thread pool, 一个task_engine有多个thread pool volatile bool _is_running 是否正在运行 service_node *node 指向service_node的指针,每个service都有一个task_engine 成员函数: void create(const std::list<dsn::threadpool_code> &pools) 创建所有的thread pool 遍历所有的threadpool_code, 1.获取改threadpool_code对应的threadpool_spec 2.根据threadpool_spec new一个task_worker_pool 3.调用workerPool->create() 4.将workerPool放入到_pools中 void start() 对所有的thread pool执行task_worker_pool::start
TODO:
其他: join_point: join_point里面包含一个advice_entry双向循环链表,可以通过put_back,put_front等函数向其中添加advice_entry,在调用execute时将所有advice_entry执行一遍。 可以用于错误注入等,向代码的执行中注入各种逻辑操作。例如task中就有各种join_point,用于在task运行时的不同阶段,注入不同的代码
toollets: 在配置文件中通过toollets定义,在service_api_c.cpp的run()函数中,会初始化toollets, 遍历所有的task_spec, install各种join point到task_spec中