📄 mylisterner_win.c
字号:
listerner_process_timeout(l);
continue;
}
/* 取出发生事件的句柄,根据触发事件源进行处理 */
hsrc = l->harray[ret];
if(hsrc == (HANDLE)MyEventWin32GetHandle(l->promq_notifier))
{
/* 处理带优先级的消息队列的消息 */
listerner_process_promsg(l);
}
else if(hsrc == (HANDLE)MyEventWin32GetHandle(l->mq_notifier))
{
/* 处理不带优先级的消息队列的消息 */
listerner_process_msg(l);
}
else if(hsrc == (HANDLE)MyEventWin32GetHandle(l->handleset_notifier))
{
LOG_DEBUG(("fdset event"));
}
else if(hsrc == (HANDLE)MyEventWin32GetHandle(l->tmhp_notifier))
{
LOG_DEBUG(("timer event"));
}
else
{
/* 处理句柄调度表里的句柄消息 */
listerner_dispatch_handles(l, hsrc);
}
}
return NULL;
}
/**
 * @brief Hash function for the handle callback table.
 *
 * The hash is the key's pointer value converted to size_t; no mixing is done.
 *
 * @param key table key
 * @return the key converted to size_t
 */
static size_t handle_table_hashfun(const void * key)
{
    const size_t hash_value = (size_t)key;

    return hash_value;
}
/**
 * @brief Key comparison function for the handle callback table.
 *
 * Keys are compared by pointer identity only.
 *
 * @param key1 first key
 * @param key2 second key
 * @return 1 when both keys are the same pointer value, 0 otherwise
 */
static int handle_table_equalfun(const void * key1, const void * key2)
{
    if(key1 != key2)
        return 0;

    return 1;
}
/**
 * @brief Construct a listener object.
 *
 * Builds the timer heap, the handle dispatch table with its mutex, the plain
 * and priority message queues, one notifier event per source, and the
 * listener thread object. The four notifier events are then registered in
 * the handle array.
 *
 * Construction failures are detected at run time rather than with assert(),
 * so a build with NDEBUG no longer dereferences NULL or returns a
 * half-constructed object.
 *
 * @param hm            memory pool handle used for every allocation
 * @param max_msg_count capacity passed to both message queue constructors
 * @return the new listener, or NULL if any part could not be created
 */
HMYLISTERNER MyListernerConstruct(HMYMEMPOOL hm, size_t max_msg_count)
{
    mylisterner_t * l = MyMemPoolMalloc(hm, sizeof(*l));
    if(NULL == l)
        return NULL;

    l->hm = hm;
    l->bNotExit = 1;
    l->hcount = 0;

    /* Clear every owned member first so the error path knows what exists. */
    l->tmhp = NULL;
    l->tmhp_notifier = NULL;
    l->handles_protect = NULL;
    l->handleset_notifier = NULL;
    l->handles_dispatch_table = NULL;
    l->mq = NULL;
    l->mq_notifier = NULL;
    l->promq = NULL;
    l->promq_notifier = NULL;
    l->thr = NULL;

    /* timer heap and its notifier event */
    l->tmhp = MyTimerHeapConstruct(hm);
    l->tmhp_notifier = MyEventConstruct(hm);
    if(NULL == l->tmhp || NULL == l->tmhp_notifier)
        goto MyListernerConstruct_err_;

    /* lock protecting the handle array */
    l->handles_protect = MyMutexConstruct(hm);
    if(NULL == l->handles_protect)
        goto MyListernerConstruct_err_;

    l->handleset_notifier = MyEventConstruct(hm);
    if(NULL == l->handleset_notifier)
        goto MyListernerConstruct_err_;

    l->handles_dispatch_table = MyHashMapConstruct(hm,
        handle_table_hashfun, handle_table_equalfun, 0, NULL, NULL);
    if(NULL == l->handles_dispatch_table)
        goto MyListernerConstruct_err_;

    /* message queue and its notifier event */
    l->mq = MyMsgQueConstruct(hm, max_msg_count);
    l->mq_notifier = MyEventConstruct(hm);
    if(NULL == l->mq || NULL == l->mq_notifier)
        goto MyListernerConstruct_err_;

    /* priority message queue and its notifier event */
    l->promq = MyProrityMsgQueConstruct(hm, max_msg_count);
    l->promq_notifier = MyEventConstruct(hm);
    if(NULL == l->promq || NULL == l->promq_notifier)
        goto MyListernerConstruct_err_;

    /*
     * listener thread
     * NOTE(review): the third argument (1) presumably creates the thread
     * without starting it, since MyListernerRun() calls MyThreadRun() - confirm.
     */
    l->thr = MyThreadConstruct(listerner_thread_fun, l, 1, hm);
    if(NULL == l->thr)
        goto MyListernerConstruct_err_;

    /* register the internal notifier events; they carry no event tag */
    listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->promq_notifier), NULL);
    listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->mq_notifier), NULL);
    listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->tmhp_notifier), NULL);
    listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->handleset_notifier), NULL);

    return l;

MyListernerConstruct_err_:
    /* The thread is the last object created, so it never exists here. */
    if(l->promq_notifier)
        MyEventDestruct(l->promq_notifier);
    if(l->promq)
        MyProrityMsgQueDestruct(l->promq);
    if(l->mq_notifier)
        MyEventDestruct(l->mq_notifier);
    if(l->mq)
        MyMsgQueDestruct(l->mq);
    if(l->handles_dispatch_table)
        MyHashMapDestruct(l->handles_dispatch_table);
    if(l->handleset_notifier)
        MyEventDestruct(l->handleset_notifier);
    if(l->handles_protect)
        MyMutexDestruct(l->handles_protect);
    if(l->tmhp_notifier)
        MyEventDestruct(l->tmhp_notifier);
    if(l->tmhp)
        MyTimerHeapDestruct(l->tmhp);

    MyMemPoolFree(hm, l);
    return NULL;
}
/**
 * @brief Destroy a listener object.
 *
 * Clears the run flag, signals the handle-set notifier, joins the listener
 * thread, cancels the WSAEventSelect registration for every entry of the
 * handle array, then destroys every member and frees the listener itself.
 *
 * A NULL listener is ignored, matching MyListernerRun() and
 * MyListernerWait(). Members are checked for NULL before they are used, not
 * only before they are destroyed.
 *
 * @param l listener to destroy; may be NULL
 */
void MyListernerDestruct(HMYLISTERNER l)
{
    unsigned int i = 0;
    evt_tag et = {0};

    if(NULL == l)
        return;

    /* ask the thread to leave its loop, wake it up and wait for it */
    l->bNotExit = 0;
    if(l->handleset_notifier)
        MyEventSetSignaled(l->handleset_notifier);
    if(l->thr)
        MyThreadJoin(l->thr);

    /*
     * cancel the select on every handle
     * NOTE(review): the result of listerner_query_handle() is not checked and
     * the array also holds the internal notifier events registered with a
     * NULL tag, so et.fd may not name a socket for those entries - confirm
     * that the resulting WSAEventSelect() failure is harmless.
     */
    for(i = 0; i < l->hcount; i ++)
    {
        listerner_query_handle(l, l->harray[i], &et);
        WSAEventSelect(et.fd, l->harray[i], 0);
    }

    if(l->thr)
        MyThreadDestruct(l->thr);
    l->thr = NULL;

    if(l->mq)
        MyMsgQueDestruct(l->mq);
    l->mq = NULL;

    if(l->mq_notifier)
        MyEventDestruct(l->mq_notifier);
    l->mq_notifier = NULL;

    if(l->promq)
        MyProrityMsgQueDestruct(l->promq);
    l->promq = NULL;

    if(l->promq_notifier)
        MyEventDestruct(l->promq_notifier);
    l->promq_notifier = NULL;

    if(l->tmhp)
        MyTimerHeapDestruct(l->tmhp);
    l->tmhp = NULL;

    if(l->tmhp_notifier)
        MyEventDestruct(l->tmhp_notifier);
    l->tmhp_notifier = NULL;

    /* destroy the dispatch table under the lock, when the lock exists */
    if(l->handles_protect)
        MyMutexLock(l->handles_protect);

    if(l->handles_dispatch_table)
        MyHashMapDestruct(l->handles_dispatch_table);
    l->handles_dispatch_table = NULL;

    if(l->handles_protect)
    {
        MyMutexUnLock(l->handles_protect);
        MyMutexDestruct(l->handles_protect);
    }
    l->handles_protect = NULL;

    if(l->handleset_notifier)
        MyEventDestruct(l->handleset_notifier);
    l->handleset_notifier = NULL;

    MyMemPoolFree(l->hm, l);
}
/**
 * @brief Start the listener thread.
 *
 * Does nothing when the listener handle is NULL; otherwise hands the thread
 * object to MyThreadRun().
 *
 * @param hlisterner listener handle; may be NULL
 */
void MyListernerRun(HMYLISTERNER hlisterner)
{
    if(hlisterner != NULL)
    {
        assert(hlisterner->thr);
        MyThreadRun(hlisterner->thr);
    }
}
/**
 * @brief Wait for the listener thread to exit.
 *
 * Does nothing when the listener handle is NULL; otherwise joins the thread
 * object with MyThreadJoin().
 *
 * @param hlisterner listener handle; may be NULL
 */
void MyListernerWait(HMYLISTERNER hlisterner)
{
    if(hlisterner != NULL)
    {
        assert(hlisterner->thr);
        MyThreadJoin(hlisterner->thr);
    }
}
/**
 * @brief Add a timer.
 *
 * Stamps node->abs_expire with the current time, combines it with
 * node->first_expire through timeval_add() (presumably abs_expire +=
 * first_expire - confirm against the macro), and inserts the node into the
 * timer heap. When the new timer becomes the earliest one in the heap, the
 * timer notifier event is signaled so the listener thread can re-evaluate
 * its wait timeout.
 *
 * @param l    listener handle
 * @param node timer description; abs_expire is overwritten
 * @return the id returned by MyTimerHeapAdd(), or NULL on bad arguments or
 *         when the heap returns a NULL id
 */
HTIMERID MyListernerAddTimer(HMYLISTERNER l, mytimer_node_t * node)
{
    HTIMERID timer_id = NULL;

    if(NULL == l || NULL == node || NULL == l->tmhp)
        return NULL;

    /* cast so the arguments match the conversion specifiers exactly */
    LOG_DEBUG(("listerner add timer sec:%ld usec:%ld",
        (long)node->first_expire.tv_sec, (long)node->first_expire.tv_usec));

    gettimeofday(&node->abs_expire, NULL);
    timeval_add(node->abs_expire, node->first_expire);

    /* add first, wake up afterwards */
    timer_id = MyTimerHeapAdd(l->tmhp, node);

    /*
     * Same handling as MyListernerResetTimer(): a NULL id must not be
     * compared with the earliest key, which could cause a spurious wake-up.
     */
    if(NULL == timer_id)
        return NULL;

    /* the new timeout is shorter than the current shortest one: wake the timer thread */
    if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
    {
        MyEventSetSignaled(l->tmhp_notifier);
        LOG_DEBUG(("write timer-add notify, timer id:%p", (void *)timer_id));
    }

    return timer_id;
}
/**
 * @brief Delete a timer.
 *
 * @param l        listener handle
 * @param timer_id id of the timer to remove
 * @return -1 when the listener or its timer heap is NULL, otherwise the
 *         result of MyTimerHeapDel()
 */
int MyListernerDelTimer(HMYLISTERNER l, HTIMERID timer_id)
{
    if(l != NULL && l->tmhp != NULL)
        return MyTimerHeapDel(l->tmhp, timer_id);

    return -1;
}
/**
 * @brief Reset an existing timer.
 *
 * Stamps node->abs_expire with the current time, combines it with
 * node->first_expire through timeval_add() (presumably abs_expire +=
 * first_expire - confirm against the macro), and passes the node to
 * MyTimerHeapReset(). When the reset timer becomes the earliest one in the
 * heap, the timer notifier event is signaled.
 *
 * @param l        listener handle
 * @param timer_id id of the timer to reset
 * @param node     new timer description; abs_expire is overwritten
 * @return the id returned by MyTimerHeapReset(), or NULL on bad arguments or
 *         when the reset fails
 */
HTIMERID MyListernerResetTimer(HMYLISTERNER l, HTIMERID timer_id, mytimer_node_t * node)
{
    /* node is dereferenced below, so reject NULL as MyListernerAddTimer() does */
    if(NULL == l || NULL == node || NULL == l->tmhp)
        return NULL;

    gettimeofday(&node->abs_expire, NULL);
    timeval_add(node->abs_expire, node->first_expire);

    timer_id = MyTimerHeapReset(l->tmhp, timer_id, node);
    if(NULL == timer_id)
        return NULL;

    /* the new timeout is shorter than the current shortest one: wake the timer thread */
    if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
    {
        MyEventSetSignaled(l->tmhp_notifier);
        LOG_DEBUG(("write timer-add reset notify, timer id:%p", (void *)timer_id));
    }

    return timer_id;
}
/**
 * @brief Add a socket descriptor to the listener.
 *
 * Windows uses HANDLE as its uniform waiting interface, so the socket is
 * associated with a freshly created WSA event, and that event is registered
 * in the listener's handle table together with a copy of the callbacks.
 *
 * @param hl         listener handle
 * @param fd         socket descriptor
 * @param mask       event mask stored in the event tag; the WSAEventSelect()
 *                   call itself always subscribes to the fixed set below
 * @param evt_handle callbacks to copy into the event tag
 * @return 0 on success, -1 on bad arguments or when the event cannot be
 *         created or associated with the socket
 */
int MyListernerAddFD(HMYLISTERNER hl, int fd, enum E_HANDLE_SET_MASK mask, event_handle_t * evt_handle)
{
    WSAEVENT he = WSA_INVALID_EVENT;
    /* zero the tag so no field is left indeterminate */
    evt_tag et = {0};

    if(NULL == hl || NULL == evt_handle || (int)INVALID_FD == fd)
        return -1;

    memcpy(&et.e, evt_handle, sizeof(et.e));
    et.mask = mask;
    et.fd = fd;

    /*
     * WSACreateEvent() reports failure with WSA_INVALID_EVENT (a NULL
     * handle), not INVALID_HANDLE_VALUE, so the old comparison never fired.
     */
    he = WSACreateEvent();
    if(WSA_INVALID_EVENT == he)
        return -1;

    if(0 != WSAEventSelect(fd, he, FD_OOB | FD_READ | FD_WRITE | FD_ACCEPT | FD_CONNECT | FD_CLOSE))
        goto MyListernerAddFD_err_;

    listerner_add_handle(hl, he, &et);

    return 0;

MyListernerAddFD_err_:

    WSACloseEvent(he);

    return -1;
}
/**
 * @brief Remove a socket descriptor from the listener.
 *
 * Looks up the event handle registered for the descriptor, cancels the
 * WSAEventSelect() association and removes the handle from the callback
 * table.
 *
 * NOTE(review): the event created in MyListernerAddFD() is not closed here;
 * presumably listerner_del_handle() or the caller does that - confirm.
 *
 * @param hl listener handle
 * @param fd socket descriptor to remove
 * @return 0 on success, -1 on bad arguments, when no handle is found for the
 *         descriptor, or when the association cannot be cancelled
 */
int MyListernerDelFD(HMYLISTERNER hl, int fd)
{
    HANDLE evt = INVALID_HANDLE_VALUE;

    if(NULL == hl)
        return -1;
    if((int)INVALID_FD == fd)
        return -1;

    /* the lookup must succeed and must yield a usable handle */
    if(0 != listerner_query_handle_by_fd(hl, fd, NULL, &evt) || INVALID_HANDLE_VALUE == evt)
        return -1;

    /* stop listening on the socket */
    if(WSAEventSelect(fd, evt, 0) != 0)
        return -1;

    /* drop the matching record from the callback table */
    listerner_del_handle(hl, evt);

    return 0;
}
/**
 * @brief Post a message to the listener.
 *
 * Allocates a message record from the listener's memory pool, fills it in,
 * pushes it onto the plain message queue and signals the queue's notifier
 * event.
 *
 * NOTE(review): the result of MyMsgQuePush(), if it has one, is not checked
 * here - confirm that the push cannot fail.
 *
 * @param l            listener handle
 * @param user_msg     caller's message pointer, stored as-is (not copied)
 * @param context_data caller-defined value stored with the message
 * @param handle       callback stored with the message
 * @return 0 on success, -1 on bad arguments or allocation failure
 */
int MyListernerAddMsg(HMYLISTERNER l,
    const void * user_msg,
    unsigned long context_data,
    CB_LISTERNER_HANDLE_MSG handle)
{
    msg_tag * entry;

    if(l == NULL || l->mq == NULL)
        return -1;

    /* build the queue record */
    entry = MyMemPoolMalloc(l->hm, sizeof(*entry));
    if(entry == NULL)
        return -1;

    entry->msg = (void *)user_msg;
    entry->handle = handle;
    entry->context_data = context_data;

    /* enqueue, then notify the listener thread */
    MyMsgQuePush(l->mq, entry);
    MyEventSetSignaled(l->mq_notifier);

    return 0;
}
/**
 * @brief Post a message with a priority to the listener.
 *
 * Allocates a message record from the listener's memory pool, fills it in
 * and pushes it onto the priority message queue. On success the priority
 * queue's notifier event is signaled. On failure the record is returned to
 * the pool, since nothing else holds it, and a warning is logged.
 *
 * @param l            listener handle
 * @param prority      priority passed to MyProrityMsgQuePush()
 * @param user_msg     caller's message pointer, stored as-is (not copied)
 * @param context_data caller-defined value stored with the message
 * @param handle       callback stored with the message
 * @return 0 on success, -1 on bad arguments or allocation failure, otherwise
 *         the non-zero result of MyProrityMsgQuePush()
 */
int MyListernerAddProrityMsg(HMYLISTERNER l,
    int prority,
    const void * user_msg,
    unsigned long context_data,
    CB_LISTERNER_HANDLE_MSG handle)
{
    msg_tag * msg = NULL;
    int ret = 0;

    if(NULL == l || NULL == l->promq)
        return -1;

    /* build the queue record */
    msg = (msg_tag *)MyMemPoolMalloc(l->hm, sizeof(*msg));
    if(NULL == msg)
        return -1;

    msg->context_data = context_data;
    msg->handle = handle;
    msg->msg = (void *)user_msg;

    ret = MyProrityMsgQuePush(l->promq, prority, msg);
    if(0 != ret)
    {
        /* the record was not queued, so free it here to avoid a leak */
        LOG_WARN(("add prority msg fail %d", ret));
        MyMemPoolFree(l->hm, msg);
        return ret;
    }

    /* notify the listener thread */
    MyEventSetSignaled(l->promq_notifier);

    return ret;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -