⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mylisterner_linux.c

📁 提供了rbtree ttree avltree list hashtable等常用容器的算法,代码经过uclinux + arm44b0平台验证
💻 C
📖 第 1 页 / 共 2 页
字号:
		}
		else
			ret = MyHandleSetSelect(l->handles, NULL);

		//没有事件发生,只运行定时器回调
		if(0 == ret)
		{
			listerner_process_timeout(l);
			continue;
		}

		//优先处理优先级队列的消息
		if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->promq_notifier), E_FD_READ))
			listerner_process_promsg(l);

		//处理普通消息队列
		if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->mq_notifier), E_FD_READ))
			listerner_process_msg(l);
		
		//删除定时器发生的事件
		if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->tmhp_notifier), E_FD_READ))
		{
			//从管道里读出消息
			notify_tag t = {0};
			MyPipeRead(l->tmhp_notifier, &t, sizeof(t));
		}

		//删除由于添加io fd产生的事件
		if(E_FD_READ & MyHandleSetIsSet(l->handles, MyPipeGetReadFD(l->handles_notifier), E_FD_READ))
		{
			//从管道里读出消息
			notify_tag t = {0};
			MyPipeRead(l->handles_notifier, &t, sizeof(t));
		}

		//根据fd hash表进行相应的调度
		listerner_process_io(l);
	}
}

/*
*
* Stop the listener thread (if any) and release everything owned by the
* listener, including the listener structure itself.
*
* This is also the cleanup path used by MyListernerConstruct when only part
* of the listener could be built, so every member is checked for NULL before
* it is used.
*
* @param l	listener to tear down; must not be NULL. The pointer is invalid
*		after this call because the structure is returned to l->hm.
*
*/
static __INLINE__ void destroy(mylisterner_t * l)
{
	assert(l);

	/* Clear the run flag (set to 1 by the constructor).
	 * NOTE(review): presumably polled by the listener loop - its head is not visible here. */
	l->bNotExit = 0;

	/* Post a notification on the handle-set pipe so the fd registered for it
	 * becomes readable before we wait for the thread. */
	if(l->handles_notifier)
	{
		notify_tag t = {0};
		MyPipeWrite(l->handles_notifier, &t, sizeof(t));
	}

	/* Only join/destroy a thread that was actually created. */
	if(l->thr)
	{
		MyThreadJoin(l->thr);
		MyThreadDestruct(l->thr);
	}
	l->thr = NULL;

	if(l->handles)
		MyHandleSetDestruct(l->handles);
	l->handles = NULL;

	if(l->mq)
		MyMsgQueDestruct(l->mq);
	l->mq = NULL;

	if(l->mq_notifier)
		MyPipeDestruct(l->mq_notifier);
	l->mq_notifier = NULL;

	if(l->promq)
		MyProrityMsgQueDestruct(l->promq);
	l->promq = NULL;

	if(l->promq_notifier)
		MyPipeDestruct(l->promq_notifier);
	l->promq_notifier = NULL;

	if(l->tmhp)
		MyTimerHeapDestruct(l->tmhp);
	l->tmhp = NULL;

	if(l->tmhp_notifier)
		MyPipeDestruct(l->tmhp_notifier);
	l->tmhp_notifier = NULL;

	/* The dispatch table is destroyed under its mutex when the mutex exists;
	 * if the mutex was never created the table is still released. */
	if(l->handles_protect)
		MyMutexLock(l->handles_protect);

	if(l->handles_dispatch_table)
		MyHashMapDestruct(l->handles_dispatch_table);
	l->handles_dispatch_table = NULL;

	if(l->handles_protect)
	{
		MyMutexUnLock(l->handles_protect);
		MyMutexDestruct(l->handles_protect);
	}
	l->handles_protect = NULL;

	if(l->handles_notifier)
		MyPipeDestruct(l->handles_notifier);
	l->handles_notifier = NULL;

	/* Finally give the listener structure back to the pool it came from. */
	MyMemPoolFree(l->hm, l);
}


/*
*
* Construct a listener.
*
* Allocates the listener from the memory pool, builds the timer heap, the
* handle set, the fd dispatch table with its mutex, the normal and priority
* message queues and one notifier pipe for each of them, registers the read
* end of every notifier pipe for E_FD_READ, and finally creates the listener
* thread object.
*
* @param hm		memory pool used for the listener and all its parts
* @param max_msg_count	passed to both message-queue constructors
* @return		the new listener handle, or NULL when the allocation or any
*			sub-object construction fails (everything built so far is
*			released through destroy())
*
*/
HMYLISTERNER MyListernerConstruct(HMYMEMPOOL hm, size_t max_msg_count)
{
	mylisterner_t * l = (mylisterner_t *)MyMemPoolMalloc(hm, sizeof(*l));
	if(NULL == l)
		return NULL;

	LOG_DEBUG(("create listerner"));

	l->hm = hm;
	l->bNotExit = 1;

	/* The thread is created last; give the member a defined value so the
	 * cleanup path never reads an uninitialised pointer. */
	l->thr = NULL;

	/* timer heap */
	l->tmhp = MyTimerHeapConstruct(hm);
	l->tmhp_notifier = MyPipeConstruct(hm);

	/* io handle table */
	l->handles = MyHandleSetConstruct(hm);
	l->handles_notifier = MyPipeConstruct(hm);
	l->handles_dispatch_table = MyHashMapConstruct(hm,
		handle_table_hashfun, handle_table_equalfun, 0, NULL, NULL);
	l->handles_protect = MyMutexConstruct(hm);

	/* normal message queue */
	l->mq = MyMsgQueConstruct(hm, max_msg_count);
	l->mq_notifier = MyPipeConstruct(hm);

	/* priority message queue */
	l->promq = MyProrityMsgQueConstruct(hm, max_msg_count);
	l->promq_notifier = MyPipeConstruct(hm);

	/* Validate every sub-object BEFORE it is used: the fd registration below
	 * dereferences the notifier pipes and goes through listerner_add_fd. */
	if(NULL == l->handles || NULL == l->handles_notifier || NULL == l->handles_dispatch_table || NULL == l->handles_protect ||
		NULL == l->mq || NULL == l->mq_notifier ||
		NULL == l->promq || NULL == l->promq_notifier ||
		NULL == l->tmhp || NULL == l->tmhp_notifier)
	{
		LOG_WARN(("fail create listerner"));
		destroy(l);
		return NULL;
	}

	/* Register the read end of each notifier pipe; no event handler is
	 * attached to these internal descriptors (NULL is passed). */
	listerner_add_fd(l, MyPipeGetReadFD(l->promq_notifier), E_FD_READ, NULL);
	listerner_add_fd(l, MyPipeGetReadFD(l->mq_notifier), E_FD_READ, NULL);
	listerner_add_fd(l, MyPipeGetReadFD(l->handles_notifier), E_FD_READ, NULL);
	listerner_add_fd(l, MyPipeGetReadFD(l->tmhp_notifier), E_FD_READ, NULL);

	/* listener thread.
	 * NOTE(review): the meaning of the third argument (1) is not visible here;
	 * presumably "create suspended", since MyListernerRun calls MyThreadRun - confirm. */
	l->thr = MyThreadConstruct(listerner_thread_fun, l, 1, hm);
	if(NULL == l->thr)
	{
		LOG_WARN(("fail create listerner"));
		destroy(l);
		return NULL;
	}

	return (HMYLISTERNER)l;
}

/*
*
* Destroy a listener.
*
* A NULL handle is ignored; otherwise the listener is handed to destroy().
*
* @param hlisterner	listener handle, may be NULL
*
*/
void MyListernerDestruct(HMYLISTERNER hlisterner)
{
	mylisterner_t * listener = (mylisterner_t *)hlisterner;

	if(NULL != listener)
	{
		LOG_DEBUG(("destroy listerner"));

		destroy(listener);
	}
}

/*
*
* Run the listener thread.
*
* Does nothing for a NULL handle; otherwise calls MyThreadRun on the
* listener's thread object (asserted to be non-NULL).
*
* @param hlisterner	listener handle, may be NULL
*
*/
void MyListernerRun(HMYLISTERNER hlisterner)
{
	mylisterner_t * l = (mylisterner_t *)hlisterner;

	if(NULL != l)
	{
		assert(l->thr);
		MyThreadRun(l->thr);
	}
}

/*
*
* Wait for the listener thread to exit.
*
* Does nothing for a NULL handle; otherwise joins the listener's thread
* object (asserted to be non-NULL).
*
* @param hlisterner	listener handle, may be NULL
*
*/
void MyListernerWait(HMYLISTERNER hlisterner)
{
	mylisterner_t * l = (mylisterner_t *)hlisterner;

	if(NULL != l)
	{
		assert(l->thr);
		MyThreadJoin(l->thr);
	}
}

/*
*
* Add a timer.
*
* Computes the absolute expiry of the node (current time + first_expire),
* inserts it into the timer heap and, when the new timer has become the
* earliest one in the heap, writes a notification to the timer pipe.
*
* @param hlisterner	listener handle
* @param node		timer description; node->abs_expire is overwritten
* @return		the id returned by the timer heap, or NULL when an argument is
*			NULL, the listener has no timer heap, or the insertion fails
*
*/
HTIMERID MyListernerAddTimer(HMYLISTERNER hlisterner, mytimer_node_t * node)
{
	HTIMERID timer_id = NULL;
	mylisterner_t * l = (mylisterner_t *)hlisterner;
	if(NULL == l || NULL == node || NULL == l->tmhp)
		return NULL;

	/* Cast explicitly: the width of tv_sec/tv_usec is platform dependent. */
	LOG_DEBUG(("listerner add timer sec:%ld usec:%ld",
		(long)node->first_expire.tv_sec, (long)node->first_expire.tv_usec));

	gettimeofday(&node->abs_expire, NULL);
	timeval_add(node->abs_expire, node->first_expire);

	/* add first, then wake up */
	timer_id = MyTimerHeapAdd(l->tmhp, node);

	/* A failed insertion must not be compared against the earliest key
	 * (same handling as MyListernerResetTimer). */
	if(NULL == timer_id)
		return NULL;

	if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
	{
		notify_tag t = {0};
		MyPipeWrite(l->tmhp_notifier, &t, sizeof(t));

		LOG_DEBUG(("write timer-add notify, timer id:%p", (void *)timer_id));
	}

	return timer_id;
}

/*
*
* Delete a timer.
*
* @param hlisterner	listener handle
* @param timer_id	id of the timer to remove
* @return		-1 when the handle is NULL or the listener has no timer heap,
*			otherwise whatever MyTimerHeapDel returns
*
*/
int MyListernerDelTimer(HMYLISTERNER hlisterner, HTIMERID timer_id)
{
	mylisterner_t * listener = (mylisterner_t *)hlisterner;

	if(NULL != listener && NULL != listener->tmhp)
		return MyTimerHeapDel(listener->tmhp, timer_id);

	return -1;
}

/*
*
* Reset a timer.
*
* Recomputes the absolute expiry of the node (current time + first_expire),
* resets the timer in the heap and, when that timer is now the earliest one
* in the heap, writes a notification to the timer pipe.
*
* @param hlisterner	listener handle
* @param timer_id	id of the timer to reset
* @param node		new timer description; node->abs_expire is overwritten
* @return		the id returned by MyTimerHeapReset, or NULL when an argument
*			is NULL, the listener has no timer heap, or the reset fails
*
*/
HTIMERID MyListernerResetTimer(HMYLISTERNER hlisterner, HTIMERID timer_id, mytimer_node_t * node)
{
	mylisterner_t * l = (mylisterner_t *)hlisterner;

	/* node is dereferenced below, so reject NULL here as MyListernerAddTimer does. */
	if(NULL == l || NULL == node || NULL == l->tmhp)
		return NULL;

	gettimeofday(&node->abs_expire, NULL);
	timeval_add(node->abs_expire, node->first_expire);

	timer_id = MyTimerHeapReset(l->tmhp, timer_id, node);

	if(NULL == timer_id)
		return NULL;

	/* If the new timeout is shorter than the current shortest one, the
	 * listener thread has to be woken up. */
	if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
	{
		notify_tag t = {0};
		MyPipeWrite(l->tmhp_notifier, &t, sizeof(t));
	}

	return timer_id;
}

/*
*
* Add a file descriptor.
*
* @param hlisterner	listener handle
* @param fd		descriptor to register
* @param mask		event mask for the descriptor
* @param evt_handle	event handler for the descriptor; must not be NULL
* @return		-1 when an argument is NULL or the listener has no handle set
*			or dispatch table, otherwise the result of listerner_add_fd
*
*/
int MyListernerAddFD(HMYLISTERNER hlisterner, int fd, enum E_HANDLE_SET_MASK mask, event_handle_t * evt_handle)
{
	mylisterner_t * listener = (mylisterner_t *)hlisterner;

	if(NULL == listener)
		return -1;

	if(NULL == evt_handle)
		return -1;

	if(NULL == listener->handles || NULL == listener->handles_dispatch_table)
		return -1;

	LOG_DEBUG(("add fd:%d mask:%d", fd, mask));
	return listerner_add_fd(listener, fd, mask, evt_handle);
}

/*
*
* Remove a file descriptor.
*
* @param hlisterner	listener handle
* @param fd		descriptor to unregister
* @return		-1 when the handle is NULL or the listener has no handle set or
*			dispatch table, otherwise the result of listerner_del_fd
*
*/
int MyListernerDelFD(HMYLISTERNER hlisterner, int fd)
{
	mylisterner_t * listener = (mylisterner_t *)hlisterner;

	if(NULL == listener)
		return -1;

	if(NULL == listener->handles || NULL == listener->handles_dispatch_table)
		return -1;

	LOG_DEBUG(("del fd:%d", fd));
	return listerner_del_fd(listener, fd);
}


/*
*
* Add a message to the normal message queue.
*
* A msg_tag is allocated from the listener's memory pool, filled with the
* caller's data, pushed onto the queue, and a notification is written to the
* queue's pipe.
*
* @param hlisterner	listener handle
* @param user_msg	user payload; only the pointer is stored
* @param context_data	caller-defined value stored with the message
* @param handle		callback stored with the message
* @return		0 on success, -1 when the handle is NULL, the listener has no
*			queue, or the allocation fails
*
* NOTE(review): the result of MyMsgQuePush is not inspected here - confirm
* whether it can fail.
*
*/
int MyListernerAddMsg(HMYLISTERNER hlisterner, 
							 const void * user_msg, 
							 unsigned long context_data,
							 CB_LISTERNER_HANDLE_MSG handle)
{
	mylisterner_t * listener = (mylisterner_t *)hlisterner;
	msg_tag * item = NULL;
	notify_tag wakeup = {0};

	if(NULL == listener || NULL == listener->mq)
		return -1;

	/* build the queue entry */
	item = (msg_tag *)MyMemPoolMalloc(listener->hm, sizeof(*item));
	if(NULL == item)
		return -1;

	item->msg = (void *)user_msg;
	item->handle = handle;
	item->context_data = context_data;

	MyMsgQuePush(listener->mq, item);

	/* notify through the queue's pipe */
	MyPipeWrite(listener->mq_notifier, &wakeup, sizeof(wakeup));

	return 0;
}

/*
*
* Add a message to the priority message queue.
*
* A msg_tag is allocated from the listener's memory pool, filled with the
* caller's data and pushed onto the priority queue. On a successful push a
* notification is written to the queue's pipe; on failure the msg_tag is
* returned to the pool.
*
* @param hlisterner	listener handle
* @param prority	priority passed to MyProrityMsgQuePush
* @param user_msg	user payload; only the pointer is stored
* @param context_data	caller-defined value stored with the message
* @param handle		callback stored with the message
* @return		0 on success; -1 when the handle is NULL, the listener has no
*			priority queue, or the allocation fails; otherwise the
*			non-zero result of MyProrityMsgQuePush
*
*/
int MyListernerAddProrityMsg(HMYLISTERNER hlisterner, 
									int prority, 
									const void * user_msg, 
									unsigned long context_data,
									CB_LISTERNER_HANDLE_MSG handle)
{
	msg_tag * msg = NULL;
	int ret = 0;

	mylisterner_t * l = (mylisterner_t *)hlisterner;
	if(NULL == l || NULL == l->promq)
		return -1;

	/* build the queue entry */
	msg = (msg_tag *)MyMemPoolMalloc(l->hm, sizeof(*msg));
	if(NULL == msg)
		return -1;

	msg->context_data = context_data;
	msg->handle = handle;
	msg->msg = (void *)user_msg;

	ret = MyProrityMsgQuePush(l->promq, prority, msg);
	if(0 != ret)
	{
		LOG_WARN(("add prority msg fail %d", ret));

		/* The entry never reached the queue, so nobody else will release it.
		 * NOTE(review): assumes a failed push does not keep the pointer - confirm. */
		MyMemPoolFree(l->hm, msg);
		return ret;
	}

	/* notify through the queue's pipe */
	{
		notify_tag t = {0};
		MyPipeWrite(l->promq_notifier, &t, sizeof(t));
	}

	return ret;
}
















⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -