之前写过一篇文章叫《写了一段高端C++代码》,这篇文章的背景和它完全相同,我这里再复述一遍。

背景:
在音视频方向中,线程分为普通线程和GL线程(OpenGL线程),GL线程中可以执行OpenGL相关的语句,做一些图像渲染的工作,也可以理解为所有GL语句都要在GL线程中执行;而在普通线程中,只能执行那些我们平时经常接触的普通语句。
在具体项目开发中会有些需求:在普通线程中突然想要执行某些必须要在GL线程下执行的任务(比如某些初始化工作,释放某些GL相关的对象),执行完此任务后又继续执行自己的任务,像在同一个线程执行一样:
void func() {task1();task2(); // 需要在GL线程执行task3();}
分析:
这里有个关键点:task3()一定要等到task2()执行完毕后才可执行,但是由于task2()是被抛到了其他线程运行,没有起到阻塞执行的效果。
怎么能达到目的呢?可以这样使用条件变量:
void task2() {...notify();}void func() {task1();task2(); // 需要在GL线程执行wait();task3();}
普通线程在task2()后使用wait()阻塞线程,待GL线程中的任务执行完后使用notity()打断普通线程的阻塞,可达到顺序执行的目的。
但这样非常麻烦,而且不通用,代码还相当难看。

在之前的文章里我使用C++的future封装了一套函数,可以方便的跨线程阻塞调度某个任务执行,然而我还有个项目是使用纯C语言开发的,没有了C++的future,要完成类似需求就比较困难,但是,困难也得搞阿,于是有了下面的代码。
先看下如何函数的设计:
typedef struct Dispatcher Dispatcher;typedef void (*DispatcherFunc)(void* arg);/*** @brief 创建一个实例,内部会常驻一个GL线程,外部可以将某些任务丢到此线程里执行,可以选择是否阻塞执行*/Dispatcher* Dispatcher_create();/*** @brief 销毁实例*/void Dispatcher_destroy(Dispatcher** dispatcher_p);/*** @brief 利用此函数将任务丢到线程里执行** @param dispatcher 实例* @param func 要执行的函数* @param arg 函数参数* @param block 选择是否阻塞执行*/void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block);
主要功能就在最后一个函数,该函数可以选择是否阻塞执行,这里大家可以思考一下,如何实现阻塞的需求?条件变量是肯定的,但是如何保证条件变量等待的是当前事件呢?还是直接看代码实现吧:
#include <stdatomic.h>#include "libctools.h"#include "gldispatch.h"typedef struct Task {DispatcherFunc func;void* arg;Mutex* mutex;Cond* cond;bool is_continue;bool is_block;} Task;struct Dispatcher {Thread* thread_id;Thread _thread_id;Mutex* mutex;Cond* cond;List* task_queue;atomic_bool is_interrupt;};static void taskDestroy(Task* task) {if (task->mutex) {mutex_destroyp(&task->mutex);}if (task->cond) {cond_destroyp(&task->cond);}free((void*)task);}static Task* taskCreate(DispatcherFunc func, void* arg, bool is_block) {Task* task = (Task*)calloc(1, sizeof(Task));if (!task) return NULL;task->func = func;task->arg = arg;task->is_block = is_block;if (is_block) {task->mutex = mutex_create();task->cond = cond_create();}return task;}static int Dispatcher_process(void* arg) {log_info("%s start", __func__);Dispatcher* self = (Dispatcher*)arg;while (!atomic_load(&self->is_interrupt)) {mutex_lock(self->mutex);while (self->task_queue->len == 0 && !atomic_load(&self->is_interrupt)) {cond_wait(self->cond, self->mutex);}ListNode* node = list_pop_front(self->task_queue);mutex_unlock(self->mutex);if (atomic_load(&self->is_interrupt)) {break;}if (node) {Task* task = (Task*)node->val;task->func(task->arg);if (task->is_block) {mutex_lock(task->mutex);task->is_continue = true;cond_signal(task->cond);mutex_unlock(task->mutex);} else {taskDestroy(task);}free(node);}}log_info("%s stop", __func__);return 0;}static void Dispatcher_start(Dispatcher* dispatcher) {dispatcher->thread_id =thread_create_with_name(&dispatcher->_thread_id, Dispatcher_process, dispatcher, "Dispatcher_process");}static void Dispatcher_stop(Dispatcher* dispatcher) {atomic_store(&dispatcher->is_interrupt, true);cond_signal(dispatcher->cond);list_clear(dispatcher->task_queue, true);if (dispatcher->thread_id) {thread_wait(dispatcher->thread_id, NULL);}}Dispatcher* Dispatcher_create() {Dispatcher* self = (Dispatcher*)calloc(1, sizeof(Dispatcher));if (!self) return NULL;self->mutex = mutex_create();self->cond = cond_create();self->task_queue = list_create();self->task_queue->free_func = (void (*)(int64_t))taskDestroy;Dispatcher_start(self);return self;}void Dispatcher_destroy(Dispatcher** dispatcher_p) {if (NULL == dispatcher_p || NULL == *dispatcher_p) return;Dispatcher* self = *dispatcher_p;Dispatcher_stop(self);mutex_destroyp(&self->mutex);cond_destroyp(&self->cond);list_destroy(self->task_queue);freep((void**)dispatcher_p);}void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block) {if (!dispatcher) return;Task* task = taskCreate(func, arg, block);if (task) {ListNode* node = list_node_new((int64_t)task);mutex_lock(dispatcher->mutex);list_push_back(dispatcher->task_queue, node);cond_signal(dispatcher->cond);mutex_unlock(dispatcher->mutex);if (task->is_block) {mutex_lock(task->mutex);while (!task->is_continue) {cond_wait(task->cond, task->mutex);}mutex_unlock(task->mutex);taskDestroy(task);}}}
使用方式:
void func1(void* arg) {print("hello func1");}void func2(void* arg) {print("hello func2");}int main() {Dispatcher* dispatcher = Dispatcher_create();Dispatcher_run(dispatcher, func1, NULL, true);Dispatcher_run(dispatcher, func2, NULL, true);Dispatcher_destroy(&dispatcher);return 0;}
tips:
代码中的log、mutex、cond、thread、list都是二次封装的函数,功能无非就是log、加解锁、条件变量、创建线程以及C语言的链表。这里就不贴出他们的实现了,大家可以自己实现一套,当作个小练习,不难。

代码我也就不过多介绍了,相信有点水平的朋友都能看懂,有问题可以留言!