📄 mylisterner_linux.c
字号:
LOG_DEBUG(("listerner process begin\r\n"));
//循环select fd集合timeout为定时器的最小超时
if(0 == MyTimerHeapGetEarliestExpire(l->tmhp, &tv_earliest))
{
gettimeofday(&tv_now, NULL);
if(timeval_smaller(tv_now, tv_earliest))
{
timeval_minus(tv_earliest, tv_now);
ret = MyHandleSetSelect(l->handles, &tv_earliest);
}
else
{
ret = 0;
memset(&tv_earliest, 0, sizeof(tv_earliest));
LOG_INFO(("null loop"));
}
}
else
ret = MyHandleSetSelect(l->handles, NULL);
//没有事件发生,只运行定时器回调
if(0 == ret)
{
listerner_process_timeout(l);
continue;
}
//优先处理优先级队列的消息
if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->promq_notifier), E_FD_READ))
listerner_process_promsg(l);
//处理普通消息队列
if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->mq_notifier), E_FD_READ))
listerner_process_msg(l);
//删除定时器发生的事件
if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->tmhp_notifier), E_FD_READ))
{
//从管道里读出消息
notify_tag t = {0};
MyPipeRead(l->tmhp_notifier, &t, sizeof(t));
}
//删除由于添加io fd产生的事件
if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->handles_notifier), E_FD_READ))
{
//从管道里读出消息
notify_tag t = {0};
MyPipeRead(l->handles_notifier, &t, sizeof(t));
}
//根据fd hash表进行相应的调度
listerner_process_io(l);
}
}
/*
*
* 监听线程函数
*
*/
static __INLINE__ void destroy(mylisterner_t * l)
{
assert(l);
l->bNotExit = 0;
{
notify_tag t = {0};
MyPipeWrite(l->handles_notifier, &t, sizeof(t));
}
MyThreadJoin(l->thr);
if(l->thr)
MyThreadDestruct(l->thr);
l->thr = NULL;
if(l->handles)
MyHandleSetDestruct(l->handles);
l->handles = NULL;
if(l->mq)
MyMsgQueDestruct(l->mq);
l->mq = NULL;
if(l->mq_notifier)
MyPipeDestruct(l->mq_notifier);
l->mq_notifier = NULL;
if(l->promq)
MyProrityMsgQueDestruct(l->promq);
l->promq = NULL;
if(l->promq_notifier)
MyPipeDestruct(l->promq_notifier);
l->promq_notifier = NULL;
if(l->tmhp)
MyTimerHeapDestruct(l->tmhp);
l->tmhp = NULL;
if(l->tmhp_notifier)
MyPipeDestruct(l->tmhp_notifier);
l->tmhp_notifier = NULL;
//加锁
MyMutexLock(l->handles_protect);
if(l->handles_dispatch_table)
MyHashMapDestruct(l->handles_dispatch_table);
l->handles_dispatch_table = NULL;
//解锁
MyMutexUnLock(l->handles_protect);
if(l->handles_protect)
MyMutexDestruct(l->handles_protect);
l->handles_protect = NULL;
if(l->handles_notifier)
MyPipeDestruct(l->handles_notifier);
l->handles_notifier = NULL;
MyMemPoolFree(l->hm, l);
}
/*
*
* 构造监听线程
*
*/
HMYLISTERNER MyListernerConstruct(HMYMEMPOOL hm, size_t max_msg_count)
{
mylisterner_t * l = (mylisterner_t *)MyMemPoolMalloc(hm, sizeof(*l));
if(NULL == l)
return NULL;
LOG_DEBUG(("create listerner"));
l->hm = hm;
l->bNotExit = 1;
//定时器
l->tmhp = MyTimerHeapConstruct(hm);
l->tmhp_notifier = MyPipeConstruct(hm);
//io句柄表
l->handles = MyHandleSetConstruct(hm);
l->handles_notifier = MyPipeConstruct(hm);
l->handles_dispatch_table = MyHashMapConstruct(hm,
handle_table_hashfun, handle_table_equalfun, 0, NULL, NULL);
l->handles_protect = MyMutexConstruct(hm);
//普通消息队列
l->mq = MyMsgQueConstruct(hm, max_msg_count);
l->mq_notifier = MyPipeConstruct(hm);
//优先级消息队列
l->promq = MyProrityMsgQueConstruct(hm, max_msg_count);
l->promq_notifier = MyPipeConstruct(hm);
listerner_add_fd(l, MyPipeGetReadFD(l->promq_notifier), E_FD_READ, NULL);
listerner_add_fd(l, MyPipeGetReadFD(l->mq_notifier), E_FD_READ, NULL);
listerner_add_fd(l, MyPipeGetReadFD(l->handles_notifier), E_FD_READ, NULL);
listerner_add_fd(l, MyPipeGetReadFD(l->tmhp_notifier), E_FD_READ, NULL);
//监听线程
l->thr = MyThreadConstruct(listerner_thread_fun, l, 1, hm);
if(NULL == l->handles || NULL == l->handles_notifier || NULL == l->handles_dispatch_table || NULL == l->handles_protect ||
NULL == l->mq || NULL == l->mq_notifier ||
NULL == l->promq || NULL == l->promq_notifier ||
NULL == l->tmhp || NULL == l->tmhp_notifier ||
NULL == l->thr)
{
LOG_WARN(("fail create listerner"));
destroy(l);
return NULL;
}
return (HMYLISTERNER)l;
}
/*
*
* 析构监听线程
*
*/
void MyListernerDestruct(HMYLISTERNER hlisterner)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l)
return;
LOG_DEBUG(("destroy listerner"));
destroy(l);
}
/*
*
* 运行监听线程
*
*/
void MyListernerRun(HMYLISTERNER hlisterner)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l)
return;
assert(l->thr);
MyThreadRun(l->thr);
}
/*
*
* 等待listern线程退出
*
*/
void MyListernerWait(HMYLISTERNER hlisterner)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l)
return;
assert(l->thr);
MyThreadJoin(l->thr);
}
/*
*
* 添加定时器
*
*/
HTIMERID MyListernerAddTimer(HMYLISTERNER hlisterner, mytimer_node_t * node)
{
HTIMERID timer_id = NULL;
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == node || NULL == l->tmhp)
return NULL;
LOG_DEBUG(("listerner add timer sec:%d usec:%d", node->first_expire.tv_sec, node->first_expire.tv_usec));
gettimeofday(&node->abs_expire, NULL);
timeval_add(node->abs_expire, node->first_expire);
//先添加,再唤醒
timer_id = MyTimerHeapAdd(l->tmhp, node);
if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
{
notify_tag t = {0};
MyPipeWrite(l->tmhp_notifier, &t, sizeof(t));
LOG_DEBUG(("write timer-add notify, timer id:%x", timer_id));
}
return timer_id;
}
/*
*
* 删除定时器
*
*/
int MyListernerDelTimer(HMYLISTERNER hlisterner, HTIMERID timer_id)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == l->tmhp)
return -1;
return MyTimerHeapDel(l->tmhp, timer_id);
}
/*
*
* 重置加定时器
*
*/
HTIMERID MyListernerResetTimer(HMYLISTERNER hlisterner, HTIMERID timer_id, mytimer_node_t * node)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == l->tmhp)
return NULL;
gettimeofday(&node->abs_expire, NULL);
timeval_add(node->abs_expire, node->first_expire);
timer_id = MyTimerHeapReset(l->tmhp, timer_id, node);
if(NULL == timer_id)
return NULL;
//如果发现 <新的超时> 比 <当前最短的超时> 还要短,表示需要唤醒定时器线程
if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
{
notify_tag t = {0};
MyPipeWrite(l->tmhp_notifier, &t, sizeof(t));
}
return timer_id;
}
/*
*
* 添加文件扫描符
*
*/
int MyListernerAddFD(HMYLISTERNER hlisterner, int fd, enum E_HANDLE_SET_MASK mask, event_handle_t * evt_handle)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == l->handles_dispatch_table || NULL == evt_handle || NULL == l->handles)
return -1;
LOG_DEBUG(("add fd:%d mask:%d", fd, mask));
return listerner_add_fd(l, fd, mask, evt_handle);
}
/*
*
* 删除文件扫描符
*
*/
int MyListernerDelFD(HMYLISTERNER hlisterner, int fd)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == l->handles_dispatch_table || NULL == l->handles)
return -1;
LOG_DEBUG(("del fd:%d", fd));
return listerner_del_fd(l, fd);
}
/*
*
* 添加一条消息
*
*/
int MyListernerAddMsg(HMYLISTERNER hlisterner,
const void * user_msg,
unsigned long context_data,
CB_LISTERNER_HANDLE_MSG handle)
{
msg_tag * msg = NULL;
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == l->mq)
return -1;
//添加消息
msg = (msg_tag *)MyMemPoolMalloc(l->hm, sizeof(*msg));
if(NULL == msg)
return -1;
msg->context_data = context_data;
msg->handle = handle;
msg->msg = (void *)user_msg;
MyMsgQuePush(l->mq, msg);
//通知
{
notify_tag t = {0};
MyPipeWrite(l->mq_notifier, &t, sizeof(t));
}
return 0;
}
/*
*
* 添加一条带优先级的消息
*
*/
int MyListernerAddProrityMsg(HMYLISTERNER hlisterner,
int prority,
const void * user_msg,
unsigned long context_data,
CB_LISTERNER_HANDLE_MSG handle)
{
msg_tag * msg = NULL;
int ret = 0;
mylisterner_t * l = (mylisterner_t *)hlisterner;
if(NULL == l || NULL == l->promq)
return -1;
//添加消息
msg = (msg_tag *)MyMemPoolMalloc(l->hm, sizeof(*msg));
if(NULL == msg)
return -1;
msg->context_data = context_data;
msg->handle = handle;
msg->msg = (void *)user_msg;
ret = MyProrityMsgQuePush(l->promq, prority, msg);
//通知
if(0 == ret)
{
notify_tag t = {0};
MyPipeWrite(l->promq_notifier, &t, sizeof(t));
}
else
LOG_WARN(("add prority msg fail %d", ret));
return ret;
}
/*
*
* 获取定时器个数
*
*/
int MyListernerPrint(HMYLISTERNER hlisterner)
{
mylisterner_t * l = (mylisterner_t *)hlisterner;
printf("listerner %x\r\n timer:%d msgq:%d\r\n",
l, MyTimerHeapGetCount(l->tmhp),
MyMsgQueGetCount(l->mq));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -