📄 mylisterner_linux.c
字号:
/*
*
*mylisterner.h 定时器线程 lin shao chuan
*
* 参照ACE的Select_Reactor实现以下功能:
* 监听设备句柄
* 监听消息队列(优先级与无优先级)
* 提供定时器接口
*
* 暂未实现的接口:
* 提供sysv消息队列监听
* 仿照ACE提供suspend resume fd类似的接口
* 删除监听fd的接口
*
*/
#include "mylisterner.h"
#include <assert.h>
#include <sys/time.h>
#include <string.h>
#include "mypipe.h"
#include "mymsgque.h"
#include "myproritymsgque.h"
#include "mymutex.h"
#include "myhashmap.h"
#include "mythread.h"
#include "mylog.h"
#include "myutility.h"
typedef struct __mylisterner_t_
{
//fd集合
HMYHANDLESET handles;
HMYPIPE handles_notifier;
//fd事件回调函数表
HMYHASHMAP handles_dispatch_table;
HMYMUTEX handles_protect;
//普通消息队列
HMYMSGQUE mq;
HMYPIPE mq_notifier;
//优先级消息队列
HMY_PRO_MQ promq;
HMYPIPE promq_notifier;
//定时器
HMYTIMERHEAP tmhp;
HMYPIPE tmhp_notifier;
//监听线程
HMYTHREAD thr;
HMYMEMPOOL hm;
int bNotExit;
}mylisterner_t;
typedef struct __msg_tag_
{
CB_LISTERNER_HANDLE_MSG handle;
unsigned long context_data;
void * msg;
}msg_tag;
typedef struct __notify_tag_
{
int unused;
}notify_tag;
static size_t handle_table_hashfun(const void * key)
{
return (size_t)key;
}
static int handle_table_equalfun(const void * key1, const void * key2)
{
return (key1 == key2);
}
/*
*
* 处理带优先级的消息
*
*/
static __INLINE__ void listerner_process_promsg(mylisterner_t * l)
{
msg_tag * msg = NULL;
//读出所有的消息
assert(l && l->promq);
//呼叫回调函数
while(msg = (msg_tag *)MyProrityMsgQuePop(l->promq))
{
//从管道里读出消息
notify_tag t = {0};
MyPipeRead(l->promq_notifier, &t, sizeof(t));
if(msg->handle)
msg->handle(msg->context_data, msg->msg);
MyMemPoolFree(l->hm, msg);
}
}
/*
*
* 处理带超时消息
*
*/
static __INLINE__ void listerner_process_timeout(mylisterner_t * l)
{
struct timeval now;
assert(l && l->tmhp);
gettimeofday(&now, NULL);
MyTimerHeapRunExpire(l->tmhp, &now);
}
/*
*
* 处理带普通消息
*
*/
static __INLINE__ void listerner_process_msg(mylisterner_t * l)
{
msg_tag * msg = NULL;
//读出所有的消息
assert(l && l->mq);
//呼叫回调函数
while(msg = (msg_tag *)MyMsgQuePop(l->mq))
{
//从管道里读出消息
notify_tag t = {0};
MyPipeRead(l->mq_notifier, &t, sizeof(t));
if(msg->handle)
msg->handle(msg->context_data, msg->msg);
MyMemPoolFree(l->hm, msg);
}
}
/*
*
* 处理可读的io fd
*
*/
static __INLINE__ void listerner_dispatch_read(mylisterner_t * l, HMYVECTOR read_set)
{
size_t i = 0;
size_t loop = 0;
LOG_DEBUG(("listerner process io read event"));
assert(l && read_set);
//轮循read_set,
loop = MyVectorGetCount(read_set);
for(i = 0; i < loop; i ++)
{
event_handle_t * evt_handle = NULL;
HMYHASHMAP_ITER it = NULL;
int fd = (int)MyVectorGetIndexData(read_set, i, NULL);
//查表回调
it = MyHashMapSearch(l->handles_dispatch_table, (void *)fd);
if(NULL == it)
continue;
evt_handle = (event_handle_t *)MyHashMapGetIterData(it);
if(NULL == evt_handle || NULL == evt_handle->input)
continue;
evt_handle->input(evt_handle->context_data, fd);
}
}
/*
*
* 处理可写的io fd
*
*/
static __INLINE__ void listerner_dispatch_write(mylisterner_t * l, HMYVECTOR write_set)
{
size_t i = 0;
size_t loop = 0;
LOG_DEBUG(("listerner process io write event"));
assert(l && write_set);
//轮循read_set,
loop = MyVectorGetCount(write_set);
for(i = 0; i < loop; i ++)
{
event_handle_t * evt_handle = NULL;
HMYHASHMAP_ITER it = NULL;
int fd = (int)MyVectorGetIndexData(write_set, i, NULL);
//查表回调
it = MyHashMapSearch(l->handles_dispatch_table, (void *)fd);
if(NULL == it)
continue;
evt_handle = (event_handle_t *)MyHashMapGetIterData(it);
if(NULL == evt_handle || NULL == evt_handle->output)
continue;
evt_handle->output(evt_handle->context_data, fd);
}
}
/*
*
* 处理异常的io fd
*
*/
static __INLINE__ void listerner_dispatch_exception(mylisterner_t * l, HMYVECTOR exception_set)
{
size_t i = 0;
size_t loop = 0;
LOG_DEBUG(("listerner process io exception event"));
assert(l && exception_set);
//轮循read_set,
loop = MyVectorGetCount(exception_set);
for(i = 0; i < loop; i ++)
{
event_handle_t * evt_handle = NULL;
HMYHASHMAP_ITER it = NULL;
int fd = (int)MyVectorGetIndexData(exception_set, i, NULL);
//查表回调
it = MyHashMapSearch(l->handles_dispatch_table, (void *)fd);
if(NULL == it)
continue;
evt_handle = (event_handle_t *)MyHashMapGetIterData(it);
if(NULL == evt_handle || NULL == evt_handle->exception)
continue;
evt_handle->exception(evt_handle->context_data, fd);
}
}
static __INLINE__ int listerner_del_fd(mylisterner_t * l, int fd)
{
assert(l && l->handles_dispatch_table && l->handles);
//加锁
if(0 != MyMutexLock(l->handles_protect))
return -1;
MyHashMapDelKey(l->handles_dispatch_table, (void *)fd);
//解锁
MyMutexUnLock(l->handles_protect);
//从fd集合中删除
MyHandleSetDelFd(l->handles, fd);
//产生fd添加通知
{
notify_tag t = {0};
MyPipeWrite(l->handles_notifier, &t, sizeof(t));
}
return 0;
}
static __INLINE__ int listerner_add_fd(mylisterner_t * l, int fd, enum E_HANDLE_SET_MASK mask, event_handle_t * evt_handle)
{
int ret = 0;
assert(l && l->handles_dispatch_table && l->handles);
//添加io fd回调函数表
if(evt_handle)
{
//加锁
if(0 != MyMutexLock(l->handles_protect))
return -1;
if(NULL == MyHashMapInsertUnique(l->handles_dispatch_table, (void *)fd, 0, evt_handle, sizeof(*evt_handle)))
{
//解锁
MyMutexUnLock(l->handles_protect);
return -1;
}
else/*解锁*/
MyMutexUnLock(l->handles_protect);
}
//添加到fd集合
if((ret = MyHandleSetFdSet(l->handles, fd, mask)) != 0)
goto listerner_add_fd_end_;
//产生fd添加通知
{
notify_tag t = {0};
MyPipeWrite(l->handles_notifier, &t, sizeof(t));
}
return 0;
listerner_add_fd_end_:
//加锁
MyMutexLock(l->handles_protect);
//从回调表中删除
MyHashMapDelKey(l->handles_dispatch_table, (void *)fd);
//解锁
MyMutexUnLock(l->handles_protect);
//从集合中删除
MyHandleSetDelFd(l->handles, fd);
return ret;
}
/*
*
* 处理带io消息
*
*/
static __INLINE__ void listerner_process_io(mylisterner_t * l)
{
HMYVECTOR read_set = NULL;
HMYVECTOR write_set = NULL;
HMYVECTOR exception_set = NULL;
assert(l);
//加锁
if(0 != MyMutexLock(l->handles_protect))
return;
read_set = MyVectorConstruct(l->hm, 0, NULL, NULL);
write_set = MyVectorConstruct(l->hm, 0, NULL, NULL);
exception_set = MyVectorConstruct(l->hm, 0, NULL, NULL);
//取出所有发生事件的fd
MyHandleSetGetAllSignalFd(l->handles,
read_set, write_set, exception_set);
//查表,
//呼叫相应的回调函数
if(MyVectorGetCount(read_set))
listerner_dispatch_read(l, read_set);
if(MyVectorGetCount(write_set))
listerner_dispatch_write(l, write_set);
if(MyVectorGetCount(exception_set))
listerner_dispatch_exception(l, exception_set);
MyVectorDestruct(read_set);
MyVectorDestruct(write_set);
MyVectorDestruct(exception_set);
//解锁
listerner_process_io_end_:
MyMutexUnLock(l->handles_protect);
}
/*
*
* 监听线程函数
*
*/
static void * listerner_thread_fun(void * param)
{
struct timeval tv_now = {0};
struct timeval tv_earliest = {0};
unsigned long mask = 0;
int ret = 0;
mylisterner_t * l = (mylisterner_t *)param;
assert(l);
LOG_DEBUG(("listerner loop run"));
while(l->bNotExit)
{
LOG_DEBUG(("listerner process begin\r\n"));
//循环select fd集合timeout为定时器的最小超时
if(0 == MyTimerHeapGetEarliestExpire(l->tmhp, &tv_earliest))
{
gettimeofday(&tv_now, NULL);
if(timeval_smaller(tv_now, tv_earliest))
{
timeval_minus(tv_earliest, tv_now);
ret = MyHandleSetSelect(l->handles, &tv_earliest);
}
else
{
ret = 0;
memset(&tv_earliest, 0, sizeof(tv_earliest));
LOG_INFO(("null loop"));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -