watchdog 的聊聊概念大概是定时去做一些事情,具体的使用v实概念可以网上搜索,本文主要是现的学介绍一下使用 Libuv 实现的 watchdog,背景主要是聊聊因为 Node.js 是单线程的,一旦主线程繁忙或者陷入死循环,使用v实那么整个进程都无法工作了。现的学 虽然 Node.js 在 JS 层实现了子线程模块,聊聊但是使用v实因为子线程持有单独的 V8 Isolate 和 loop,所以存在很多限制。现的学比如我们希望在主线程繁忙时还能收集到主线程的聊聊一些数据(进程 / V8 内存、CPU 使用情况等)。使用v实这时候就需要使用 addon 去实现,现的学addon 虽然是聊聊 C、C++ 等语言写的使用v实,但是现的学依然跑在主线程里。高防服务器所以我们需要使用底层提供的原生子线程加上 addon 实现我们的需求。本文正是基于这种场景的 watchdog。 首先看一下整体的类结构。 下面看一下每个类的作用和实现。 因为存在主线程和子线程,线程间会互相向对方提交任务,这就涉及到多线程互斥访问的问题。TaskManager 是负责管理任务的类,是线程安全的。 class TaskManager { public: TaskManager() { uv_mutex_init(&tasks_mutex); }; void add_task(task_cb cb, void * data) { struct task* t = new task; t->cb = cb; t->data = data; uv_mutex_lock(&tasks_mutex); tasks.push_back(t); uv_mutex_unlock(&tasks_mutex); }; void handle_task() { std::vector uv_mutex_lock(&tasks_mutex); task_list.swap(tasks); uv_mutex_unlock(&tasks_mutex); std::vector for(iter = task_list.begin(); task_list.end() != iter;) { struct task* t = (*iter); t->cb(t->data); iter = task_list.erase(iter); delete t; } }; private: std::vector uv_mutex_t tasks_mutex; }; 实现很简单,主要是使用 uv_mutex_t 解决互斥问题,提供 add_task 和 handle_task 两个方法。add_task 是追加任务,handle_task 是处理任务。handle_task 具体的调用时机由 TaskManager 的持有者决定。 WatchDog 是对 Libuv 定时器的云服务器封装,核心数据结构是 watch_dog_ctx。 typedef void (*watch_dog_cb)(void* ctx); struct watch_dog_ctx{ void * data; // 上下文 watch_dog_cb work; // 任务函数 int poll_interval; // 定时时间 }; watch_dog_ctx 是对一个定时任务的封装。watch_dog_ctx 的任务会在子线程中定时被执行,这就解决了主线程陷入繁忙时无法工作的问题。 NodeWatchDog::WatchDog::WatchDog(uv_loop_t* loop, struct watch_dog_ctx* ctx) { this->ctx = ctx; uv_timer_init(loop, &timer); timer.data = this; is_stop = false; } void NodeWatchDog::WatchDog::start() { if (is_stop) { return; } uv_timer_start( &timer, [](uv_timer_t* timer) { NodeWatchDog::WatchDog* watch_dog = (NodeWatchDog::WatchDog*)timer->data; struct watch_dog_ctx* ctx = watch_dog->get_ctx(); ctx->work(ctx); }, ctx->poll_interval, ctx->poll_interval ); }; void NodeWatchDog::WatchDog::stop() { is_stop = true; uv_timer_stop(&timer); } WatchDogWorker 是负责管理子线程和 watchdog 的类。首先看一下定义。 class WatchDogWorker { public: WatchDogWorker(); ~WatchDogWorker() { }; void start(); void stop(); void add_watchdog(struct watch_dog_ctx* watchdog); void add_task(task_cb cb, void * data); void handle_task(); uv_loop_t* get_event_loop() { return &loop; } uv_sem_t * get_thread_sem() { return &sem; } private: uv_loop_t loop; uv_thread_t tid; uv_sem_t sem; TaskManager task_manager; uv_async_t notify_async; std::vector }; 1)add_watchdog 用于新增 watchdog。 void NodeWatchDog::WatchDogWorker::add_watchdog(struct watch_dog_ctx* watchdog_ctx) { NodeWatchDog::WatchDog *watchdog = new NodeWatchDog::WatchDog(&loop, watchdog_ctx); watch_dogs.push_back(watchdog); } 2)start 函数用于创建子线程和启动 watchdog。 void NodeWatchDog::WatchDogWorker::start() { std::vector for(iter = watch_dogs.begin(); watch_dogs.end() != iter; iter++) { (*iter)->start(); } int r = uv_thread_create(&tid, [](void *data) { NodeWatchDog::WatchDogWorker * worker = (NodeWatchDog::WatchDogWorker *)data; uv_sem_post((uv_sem_t*)worker->get_thread_sem()); uv_loop_t * loop = worker->get_event_loop(); uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); }, (void *)this); if (!r) { uv_sem_wait(&sem); } uv_sem_destroy(&sem); } 启动 watchdog 时会往子线程的 loop 里插入一个定时器。然后创建子线程,在子线程里开启一个新的 loop。在这个 loop 里就会不断执行 watchdog 的任务。 3)add_task 用于其他线程外另一个线程插入一个任务。下面是任务的定义。 typedef void (*task_cb)(void *data); struct task{ task() { cb = nullptr; data = nullptr; } task_cb cb; void *data; }; 定义很简单,一个工作函数和对应的上下文。接着看 add_task。 void NodeWatchDog::WatchDogWorker::add_task(task_cb cb, void * data) { task_manager.add_task(cb, data); uv_async_send(¬ify_async); } add_task 直接调用 task_manager 的 add_task 函数插入任务,服务器租用因为它保证了线程安全。接着通过 Libuv 提供的 async 线程间通信机制通知另一个线程有新任务。如果是在 Node.js 里的话,还需要通过另一种方式进行通知,这里就不具体展开。 此处,我们就实现了在子线程里定时执行任务,并实现主线程和子线程互相提交任务的能力。最终再实现一个 WatchDogManager 用于管理多个 worker。 4.WatchDogManagerclass WatchDogManager { public: WatchDogManager(uv_loop_t *loop); ~WatchDogManager() { }; void start(); void stop(); void add_task(task_cb cb, void *data); void handle_task(); void add_worker(WatchDogWorker* worker); WatchDogWorker* get_worker(int index) { return workers[index]; }; private: uv_async_t notify_async; TaskManager task_manager; std::vector }; 通过 WatchDogManager 的类定义,我们大概就知道它的功能。首先看一下 start 方法。 void NodeWatchDog::WatchDogManager::start(){ std::vector for(iter = workers.begin(); workers.end() != iter; iter++) { (*iter)->start(); } } start 函数就是启动多个 worker,刚才已经介绍过,worker 会创建一个子线程定时执行任务。其他方法就没有太多逻辑,就不一一介绍。 接下来看看使用方式。 #include "src/watch_dog_manager.h" #include "uv.h" #include "stdio.h" using namespace NodeWatchDog;int main() { setbuf(stdout, NULL); uv_loop_t loop; uv_loop_init(&loop); WatchDogWorker *worker = new WatchDogWorker(); struct watch_dog_ctx* ctx = new watch_dog_ctx; ctx->work = [](void *ctx) { printf("worker task execute in thread id => %ld\n", (long)pthread_self()); }; ctx->poll_interval = 1000; worker->add_watchdog(ctx); worker->start(); uv_idle_t idle; uv_idle_init(&loop, &idle); uv_idle_start(&idle, [](uv_idle_t *) { }); printf("main thread id => %ld\n", (long)pthread_self()); uv_run(&loop, UV_RUN_DEFAULT); delete worker; delete ctx; return 0; } 新建一个 watchdog 和 worker,并把 watchdog 插入到 worker 中,最后启动 worker。同时主线程也进入自己的 loop。执行输出如下。 main thread id => 4338490880 worker task execute in thread id => 123145480077312 worker task execute in thread id => 123145480077312 我们看到,printf 分别执行在不同的线程中。接下来再看一下复杂的场景。 #include "src/watch_dog_manager.h" #include "uv.h" #include "stdio.h" using namespace NodeWatchDog; int main() { setbuf(stdout, NULL); uv_loop_t loop; uv_loop_init(&loop); WatchDogManager *manager = new WatchDogManager(&loop); WatchDogWorker *worker = new WatchDogWorker(); struct watch_dog_ctx* ctx = new watch_dog_ctx; ctx->data = (void *)manager; ctx->work = [](void *ctx) { struct watch_dog_ctx* watchdog_ctx = (struct watch_dog_ctx*)ctx; WatchDogManager* manager = (WatchDogManager*)watchdog_ctx->data; // 提交任务给主线程 manager->add_task([](void *data) { printf("manager task execute in thread id => %ld\n", (long)pthread_self()); WatchDogManager* manager = (WatchDogManager*)data; // 提交任务给某个子线程 manager->get_worker(0)->add_task([](void *data) { printf("worker task execute in thread id => %ld\n", (long)pthread_self()); }, nullptr); }, manager); }; ctx->poll_interval = 1000; worker->add_watchdog(ctx); manager->add_worker(worker); manager->start(); uv_idle_t idle; uv_idle_init(&loop, &idle); uv_idle_start(&idle, [](uv_idle_t *) { // }); printf("main thread id => %ld\n", (long)pthread_self()); uv_run(&loop, UV_RUN_DEFAULT); delete manager; delete worker; delete ctx; return 0; } 上面的例子中,当在子线程中执行回调时,通过 manager->add_task 往主线程提及一个任务。然后在主线程执行这个任务时,首先输出当前线程 id,再通过 manager->get_worker(0)->add_task 给子线程提交一个任务。比如我们希望定时收集主线程的 CPU 数据,那么就可以给子线程插入一个 watchdog,然后在 watchdog 回调里给主线程提交一个任务,当主线程执行这个任务的时候,我们就可以拿到主线程的 CPU 数据。当然还有更复杂的场景,比如获取 CPU Profile 时,主线程和子线程会进行多次交互。最后再看一个多个 worker 线程的例子。 WatchDogManager *manager = new WatchDogManager(&loop); WatchDogWorker *worker1 = new WatchDogWorker(); WatchDogWorker *worker2 = new WatchDogWorker(); struct watch_dog_ctx* ctx = new watch_dog_ctx; ctx->work = [](void *ctx) { printf("manager task execute in thread id => %ld\n", (long)pthread_self()); }; ctx->poll_interval = 1000; worker1->add_watchdog(ctx); worker2->add_watchdog(ctx); manager->add_worker(worker1); manager->add_worker(worker2); manager->start(); 以上代码输出如下。 main thread id => 4469550592 manager task execute in thread id => 123145483321344 manager task execute in thread id => 123145491722240 manager task execute in thread id => 123145483321344 manager task execute in thread id => 123145491722240 我们看到存在多个子线程,并且成功执行了任务。 后记:单线程使得编码变得简单,但是也导致了一些限制,这时候我们就需要使用额外的子线程来解决单线程存在的限制。引入多线程后,就需要解决多线程互斥和通信问题。以上代码只是介绍了一个大致的思路,还有地方需要完善,如果有想法的同学也可以交流。 仓库:https://github.com/theanarkh/uv-watchdog