⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mylisterner_linux.c

📁 提供了rbtree ttree avltree list hashtable等常用容器的算法,代码经过uclinux + arm44b0平台验证
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
*
*mylisterner.h 定时器线程 lin shao chuan
*
* 参照ACE的Select_Reactor实现以下功能:
*  监听设备句柄
*  监听消息队列(优先级与无优先级)
*  提供定时器接口
*
* 暂未实现的接口:
*	提供sysv消息队列监听
*	仿照ACE提供suspend resume fd类似的接口
*	删除监听fd的接口
*
*/


#include "mylisterner.h"

#include <assert.h>
#include <sys/time.h>
#include <string.h>

#include "mypipe.h"
#include "mymsgque.h"
#include "myproritymsgque.h"
#include "mymutex.h"
#include "myhashmap.h"
#include "mythread.h"
#include "mylog.h"
#include "myutility.h"


typedef struct __mylisterner_t_
{
	//fd集合 
	HMYHANDLESET handles;
	HMYPIPE handles_notifier;

	//fd事件回调函数表
	HMYHASHMAP handles_dispatch_table;
	HMYMUTEX handles_protect;

	//普通消息队列
	HMYMSGQUE mq;
	HMYPIPE mq_notifier;

	//优先级消息队列
	HMY_PRO_MQ promq;
	HMYPIPE promq_notifier;

	//定时器
	HMYTIMERHEAP tmhp;
	HMYPIPE tmhp_notifier;
	
	//监听线程
	HMYTHREAD thr;

	HMYMEMPOOL hm;
	
	int bNotExit;
}mylisterner_t;

typedef struct __msg_tag_
{
	CB_LISTERNER_HANDLE_MSG handle;
	unsigned long context_data;
	void * msg;
}msg_tag;

typedef struct __notify_tag_
{
	int unused;
}notify_tag;


static size_t handle_table_hashfun(const void * key)
{
	return (size_t)key;
}

static int handle_table_equalfun(const void * key1, const void * key2)
{
	return (key1 == key2);
}


/*
*
* 处理带优先级的消息
*
*/
static __INLINE__ void listerner_process_promsg(mylisterner_t * l)
{
	msg_tag * msg = NULL;

	//读出所有的消息
	assert(l && l->promq);

	//呼叫回调函数
	while(msg = (msg_tag *)MyProrityMsgQuePop(l->promq))
	{
		//从管道里读出消息
		notify_tag t = {0};
		MyPipeRead(l->promq_notifier, &t, sizeof(t));

		if(msg->handle)
			msg->handle(msg->context_data, msg->msg);

		MyMemPoolFree(l->hm, msg);
	}
}

/*
*
* 处理带超时消息
*
*/
static __INLINE__ void listerner_process_timeout(mylisterner_t * l)
{
	struct timeval now;

	assert(l && l->tmhp);
	
	gettimeofday(&now, NULL);	

	MyTimerHeapRunExpire(l->tmhp, &now);
}

/*
*
* 处理带普通消息
*
*/
static __INLINE__ void listerner_process_msg(mylisterner_t * l)
{
	msg_tag * msg = NULL;

	//读出所有的消息
	assert(l && l->mq);

	//呼叫回调函数
	while(msg = (msg_tag *)MyMsgQuePop(l->mq))
	{
		//从管道里读出消息
		notify_tag t = {0};
		MyPipeRead(l->mq_notifier, &t, sizeof(t));

		if(msg->handle)
			msg->handle(msg->context_data, msg->msg);

		MyMemPoolFree(l->hm, msg);
	}
}

/*
*
* 处理可读的io fd
*
*/
static __INLINE__ void listerner_dispatch_read(mylisterner_t * l, HMYVECTOR read_set)
{
	size_t i = 0;
	size_t loop = 0;

	LOG_DEBUG(("listerner process io read event"));
	assert(l && read_set);

	//轮循read_set,
	loop = MyVectorGetCount(read_set);
	for(i = 0; i < loop; i ++)
	{
		event_handle_t * evt_handle = NULL;
		HMYHASHMAP_ITER it = NULL;
		int fd = (int)MyVectorGetIndexData(read_set, i, NULL);

		//查表回调
		it = MyHashMapSearch(l->handles_dispatch_table, (void *)fd);
		if(NULL == it)
			continue;

		evt_handle = (event_handle_t *)MyHashMapGetIterData(it);
		if(NULL == evt_handle || NULL == evt_handle->input)
			continue;

		evt_handle->input(evt_handle->context_data, fd);
	}
}

/*
*
* 处理可写的io fd
*
*/
static __INLINE__ void listerner_dispatch_write(mylisterner_t * l, HMYVECTOR write_set)
{
	size_t i = 0;
	size_t loop = 0;

	LOG_DEBUG(("listerner process io write event"));
	assert(l && write_set);

	//轮循read_set,
	loop = MyVectorGetCount(write_set);
	for(i = 0; i < loop; i ++)
	{
		event_handle_t * evt_handle = NULL;
		HMYHASHMAP_ITER it = NULL;
		int fd = (int)MyVectorGetIndexData(write_set, i, NULL);

		//查表回调
		it = MyHashMapSearch(l->handles_dispatch_table, (void *)fd);
		if(NULL == it)
			continue;

		evt_handle = (event_handle_t *)MyHashMapGetIterData(it);
		if(NULL == evt_handle || NULL == evt_handle->output)
			continue;

		evt_handle->output(evt_handle->context_data, fd);
	}
}

/*
*
* 处理异常的io fd
*
*/
static __INLINE__ void listerner_dispatch_exception(mylisterner_t * l, HMYVECTOR exception_set)
{
	size_t i = 0;
	size_t loop = 0;

	LOG_DEBUG(("listerner process io exception event"));
	assert(l && exception_set);

	//轮循read_set,
	loop = MyVectorGetCount(exception_set);
	for(i = 0; i < loop; i ++)
	{
		event_handle_t * evt_handle = NULL;
		HMYHASHMAP_ITER it = NULL;
		int fd = (int)MyVectorGetIndexData(exception_set, i, NULL);

		//查表回调
		it = MyHashMapSearch(l->handles_dispatch_table, (void *)fd);
		if(NULL == it)
			continue;

		evt_handle = (event_handle_t *)MyHashMapGetIterData(it);
		if(NULL == evt_handle || NULL == evt_handle->exception)
			continue;

		evt_handle->exception(evt_handle->context_data, fd);
	}
}

static __INLINE__ int listerner_del_fd(mylisterner_t * l, int fd)
{
	assert(l && l->handles_dispatch_table && l->handles);

	//加锁
	if(0 != MyMutexLock(l->handles_protect))
		return -1;

	MyHashMapDelKey(l->handles_dispatch_table, (void *)fd);

	//解锁
	MyMutexUnLock(l->handles_protect);

	//从fd集合中删除
	MyHandleSetDelFd(l->handles, fd);

	//产生fd添加通知
	{
		notify_tag t = {0};
		MyPipeWrite(l->handles_notifier, &t, sizeof(t));
	}
	
	return 0;
}

static __INLINE__ int listerner_add_fd(mylisterner_t * l, int fd, enum E_HANDLE_SET_MASK mask, event_handle_t * evt_handle)
{
	int ret = 0;

	assert(l && l->handles_dispatch_table && l->handles);


	//添加io fd回调函数表
	if(evt_handle)
	{
		//加锁
		if(0 != MyMutexLock(l->handles_protect))
			return -1;

		if(NULL == MyHashMapInsertUnique(l->handles_dispatch_table, (void *)fd, 0, evt_handle, sizeof(*evt_handle)))
		{
			//解锁
			MyMutexUnLock(l->handles_protect);
			return -1;
		}
		else/*解锁*/
			MyMutexUnLock(l->handles_protect);
	}

	//添加到fd集合
	if((ret = MyHandleSetFdSet(l->handles, fd, mask)) != 0)
		goto listerner_add_fd_end_;

	//产生fd添加通知
	{
		notify_tag t = {0};
		MyPipeWrite(l->handles_notifier, &t, sizeof(t));
	}

	return 0;

listerner_add_fd_end_:

	//加锁
	MyMutexLock(l->handles_protect);

	//从回调表中删除
	MyHashMapDelKey(l->handles_dispatch_table, (void *)fd);

	//解锁
	MyMutexUnLock(l->handles_protect);

	//从集合中删除
	MyHandleSetDelFd(l->handles, fd);

	return ret;
}

/*
*
* 处理带io消息
*
*/
static __INLINE__ void listerner_process_io(mylisterner_t * l)
{
	HMYVECTOR read_set = NULL;
	HMYVECTOR write_set = NULL;
	HMYVECTOR exception_set = NULL;

	assert(l);

	//加锁
	if(0 != MyMutexLock(l->handles_protect))
		return;

	read_set = MyVectorConstruct(l->hm, 0, NULL, NULL);
	write_set = MyVectorConstruct(l->hm, 0, NULL, NULL);
	exception_set = MyVectorConstruct(l->hm, 0, NULL, NULL);

	//取出所有发生事件的fd
	MyHandleSetGetAllSignalFd(l->handles,
		read_set, write_set, exception_set);

	//查表,
	//呼叫相应的回调函数
	if(MyVectorGetCount(read_set))
		listerner_dispatch_read(l, read_set);
	if(MyVectorGetCount(write_set))
		listerner_dispatch_write(l, write_set);
	if(MyVectorGetCount(exception_set))
		listerner_dispatch_exception(l, exception_set);

	MyVectorDestruct(read_set);
	MyVectorDestruct(write_set);
	MyVectorDestruct(exception_set);

	//解锁
listerner_process_io_end_:
	MyMutexUnLock(l->handles_protect);
}

/*
*
* 监听线程函数
*
*/
static void * listerner_thread_fun(void * param)
{
	struct timeval tv_now = {0};
	struct timeval tv_earliest = {0};
	unsigned long mask = 0;
	int ret = 0;

	mylisterner_t * l = (mylisterner_t *)param;
	assert(l);

	LOG_DEBUG(("listerner loop run"));

	while(l->bNotExit)
	{
		LOG_DEBUG(("listerner process begin\r\n"));

		//循环select fd集合timeout为定时器的最小超时
		if(0 == MyTimerHeapGetEarliestExpire(l->tmhp, &tv_earliest))
		{
			gettimeofday(&tv_now, NULL);
			if(timeval_smaller(tv_now, tv_earliest))
			{
				timeval_minus(tv_earliest, tv_now);
				ret = MyHandleSetSelect(l->handles, &tv_earliest);
			}
			else
			{
				ret = 0;
				memset(&tv_earliest, 0, sizeof(tv_earliest));
				LOG_INFO(("null loop"));
			}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -