⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mylisterner_win.c

📁 sourceforge历史版本完整下载: http://sourceforge.net/project/showfiles.php?group_id=202044 提供了基于b树索引算法的文件数据数据
💻 C
📖 第 1 页 / 共 2 页
字号:
			listerner_process_timeout(l);
			continue;
		}

		/* 取出发生事件的句柄,根据触发事件源进行处理 */
		hsrc = l->harray[ret];

		if(hsrc == (HANDLE)MyEventWin32GetHandle(l->promq_notifier))
		{
			/* 处理带优先级的消息队列的消息 */
			listerner_process_promsg(l);
		}
		else if(hsrc == (HANDLE)MyEventWin32GetHandle(l->mq_notifier))
		{
			/* 处理不带优先级的消息队列的消息 */
			listerner_process_msg(l);
		}
		else if(hsrc == (HANDLE)MyEventWin32GetHandle(l->handleset_notifier))
		{
			LOG_DEBUG(("fdset event"));
		}
		else if(hsrc == (HANDLE)MyEventWin32GetHandle(l->tmhp_notifier))
		{
			LOG_DEBUG(("timer event"));
		}
		else
		{
			/* 处理句柄调度表里的句柄消息 */
			listerner_dispatch_handles(l, hsrc);
		}
	}

	return NULL;
}

/**
 * @brief Hash function for the handle callback table.
 *
 * The key itself is the hash: its pointer value is converted to size_t
 * without any further mixing.
 *
 * @param key Key to hash.
 * @return The value of @p key converted to size_t.
 */
static size_t handle_table_hashfun(const void * key)
{
	const size_t hash_value = (size_t)key;

	return hash_value;
}

/**
 * @brief Key comparison function for the handle callback table.
 *
 * Keys are compared by pointer identity only; nothing is dereferenced.
 *
 * @param key1 First key.
 * @param key2 Second key.
 * @return 1 when both keys are the same pointer value, 0 otherwise.
 */
static int handle_table_equalfun(const void * key1, const void * key2)
{
	if(key1 == key2)
		return 1;

	return 0;
}


/**
 * @brief Construct a listener: timer heap, handle dispatch table, both message
 *        queues, their notifier events and the listener thread object.
 *
 * Failures are reported by returning NULL instead of relying on assert(), so
 * that builds with NDEBUG neither dereference a failed allocation nor hand out
 * a half-built listener. Everything created before the failure is released.
 *
 * @param hm            Memory pool used for the listener and all of its members.
 * @param max_msg_count Capacity passed to both message queue constructors.
 * @return The new listener, or NULL if any allocation/construction step failed.
 */
HMYLISTERNER MyListernerConstruct(HMYMEMPOOL hm, size_t max_msg_count)
{
	mylisterner_t * l = MyMemPoolMalloc(hm, sizeof(*l));
	if(NULL == l)
		return NULL;

	l->hm = hm;
	l->bNotExit = 1;
	l->hcount = 0;

	/* Clear every owned member first so the failure path knows what exists. */
	l->tmhp = NULL;
	l->tmhp_notifier = NULL;
	l->handles_protect = NULL;
	l->handleset_notifier = NULL;
	l->handles_dispatch_table = NULL;
	l->mq = NULL;
	l->mq_notifier = NULL;
	l->promq = NULL;
	l->promq_notifier = NULL;
	l->thr = NULL;

	/* Timer heap and the event used to wake the thread for timer changes. */
	l->tmhp = MyTimerHeapConstruct(hm);
	l->tmhp_notifier = MyEventConstruct(hm);
	if(NULL == l->tmhp || NULL == l->tmhp_notifier)
		goto MyListernerConstruct_err_;

	/* Lock protecting the handle array, its notifier and the dispatch table. */
	l->handles_protect = MyMutexConstruct(hm);
	if(NULL == l->handles_protect)
		goto MyListernerConstruct_err_;

	l->handleset_notifier = MyEventConstruct(hm);
	if(NULL == l->handleset_notifier)
		goto MyListernerConstruct_err_;

	l->handles_dispatch_table = MyHashMapConstruct(hm,
		handle_table_hashfun, handle_table_equalfun, 0, NULL, NULL);
	if(NULL == l->handles_dispatch_table)
		goto MyListernerConstruct_err_;

	/* Plain message queue and its notification event. */
	l->mq = MyMsgQueConstruct(hm, max_msg_count);
	l->mq_notifier = MyEventConstruct(hm);
	if(NULL == l->mq || NULL == l->mq_notifier)
		goto MyListernerConstruct_err_;

	/* Priority message queue and its notification event. */
	l->promq = MyProrityMsgQueConstruct(hm, max_msg_count);
	l->promq_notifier = MyEventConstruct(hm);
	if(NULL == l->promq || NULL == l->promq_notifier)
		goto MyListernerConstruct_err_;

	/*
	 * Listener thread. It is the last fallible step, so the failure path
	 * below never has to deal with an existing thread object.
	 * NOTE(review): the third argument (1) presumably creates the thread
	 * suspended until MyListernerRun() is called - confirm against MyThreadConstruct.
	 */
	l->thr = MyThreadConstruct(listerner_thread_fun, l, 1, hm);
	if(NULL == l->thr)
		goto MyListernerConstruct_err_;

	/* Register the internal notifier events; they carry no evt_tag. */
	listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->promq_notifier), NULL);
	listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->mq_notifier), NULL);
	listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->tmhp_notifier), NULL);
	listerner_add_handle(l, (HANDLE)MyEventWin32GetHandle(l->handleset_notifier), NULL);

	return l;

MyListernerConstruct_err_:

	/* Release whatever was created, in reverse order of construction. */
	if(l->promq_notifier)
		MyEventDestruct(l->promq_notifier);
	if(l->promq)
		MyProrityMsgQueDestruct(l->promq);
	if(l->mq_notifier)
		MyEventDestruct(l->mq_notifier);
	if(l->mq)
		MyMsgQueDestruct(l->mq);
	if(l->handles_dispatch_table)
		MyHashMapDestruct(l->handles_dispatch_table);
	if(l->handleset_notifier)
		MyEventDestruct(l->handleset_notifier);
	if(l->handles_protect)
		MyMutexDestruct(l->handles_protect);
	if(l->tmhp_notifier)
		MyEventDestruct(l->tmhp_notifier);
	if(l->tmhp)
		MyTimerHeapDestruct(l->tmhp);

	MyMemPoolFree(hm, l);

	return NULL;
}

/**
 * @brief Stop the listener thread and destroy the listener.
 *
 * Clears the run flag, signals handleset_notifier, joins the thread, cancels
 * the socket event selection for every entry of the handle array, then
 * destroys each member object and finally frees the listener itself from
 * its memory pool.
 *
 * @param l Listener to destroy; must not be NULL (checked with assert only).
 */
void MyListernerDestruct(HMYLISTERNER l)
{
	unsigned int i = 0;
	evt_tag et = {0};

	assert(l);
	
	/*
	 * Ask the thread to stop and wake it up before joining.
	 * NOTE(review): presumably the thread loop tests bNotExit after being
	 * woken by handleset_notifier - the loop head is not visible here.
	 */
	l->bNotExit = 0;
	MyEventSetSignaled(l->handleset_notifier);
	MyThreadJoin(l->thr);

	/*
	 * Cancel the select on every handle.
	 * NOTE(review): the return value of listerner_query_handle is ignored and
	 * et is not reset between iterations, so an entry for which the query
	 * fills nothing in (possibly the notifier events that
	 * MyListernerConstruct registers with a NULL evt_tag) would reuse the
	 * previous et.fd - confirm against listerner_query_handle.
	 */
	for(i = 0; i < l->hcount; i ++)
	{
		listerner_query_handle(l, l->harray[i], &et);
		WSAEventSelect(et.fd, l->harray[i], 0);
	}
	
	/* l->thr was already passed to MyThreadJoin above without a NULL check. */
	if(l->thr)
		MyThreadDestruct(l->thr);
	l->thr = NULL;
		
	if(l->mq)
		MyMsgQueDestruct(l->mq);
	l->mq = NULL;

	if(l->mq_notifier)
		MyEventDestruct(l->mq_notifier);
	l->mq_notifier = NULL;

	if(l->promq)
		MyProrityMsgQueDestruct(l->promq);
	l->promq = NULL;

	if(l->promq_notifier)
		MyEventDestruct(l->promq_notifier);
	l->promq_notifier = NULL;

	if(l->tmhp)
		MyTimerHeapDestruct(l->tmhp);
	l->tmhp = NULL;

	if(l->tmhp_notifier)
		MyEventDestruct(l->tmhp_notifier);
	l->tmhp_notifier = NULL;
	
	// lock (the mutex is used here before the NULL check further down)
	MyMutexLock(l->handles_protect);

	if(l->handles_dispatch_table)
		MyHashMapDestruct(l->handles_dispatch_table);
	l->handles_dispatch_table = NULL;

	// unlock
	MyMutexUnLock(l->handles_protect);

	if(l->handles_protect)
		MyMutexDestruct(l->handles_protect);
	l->handles_protect = NULL;

	if(l->handleset_notifier)
		MyEventDestruct(l->handleset_notifier);
	l->handleset_notifier = NULL;

	/* Return the listener structure itself to the pool it was allocated from. */
	MyMemPoolFree(l->hm, l);
}

/**
 * @brief Start the listener thread.
 *
 * A NULL listener is silently ignored; a listener without a thread object
 * trips the assert.
 *
 * @param hlisterner Listener whose thread is handed to MyThreadRun.
 */
void MyListernerRun(HMYLISTERNER hlisterner)
{
	if(hlisterner != NULL)
	{
		assert(hlisterner->thr);
		MyThreadRun(hlisterner->thr);
	}
}

/**
 * @brief Wait for the listener thread to exit.
 *
 * A NULL listener is silently ignored; a listener without a thread object
 * trips the assert.
 *
 * @param hlisterner Listener whose thread is handed to MyThreadJoin.
 */
void MyListernerWait(HMYLISTERNER hlisterner)
{
	if(hlisterner != NULL)
	{
		assert(hlisterner->thr);
		MyThreadJoin(hlisterner->thr);
	}
}

/**
 * @brief Add a timer to the listener.
 *
 * Stores the absolute expiry (current time plus node->first_expire) in
 * node->abs_expire, inserts the node into the timer heap and, when the new
 * timer became the earliest one, signals tmhp_notifier.
 *
 * @param l    Listener owning the timer heap.
 * @param node Timer description; its abs_expire field is overwritten.
 * @return The id returned by MyTimerHeapAdd, or NULL when an argument is NULL,
 *         the listener has no timer heap, or the heap returned no id.
 */
HTIMERID MyListernerAddTimer(HMYLISTERNER l, mytimer_node_t * node)
{
	HTIMERID timer_id = NULL;

	if(NULL == l || NULL == node || NULL == l->tmhp)
		return NULL;

	/* Cast so the arguments really match the %d conversions. */
	LOG_DEBUG(("listerner add timer sec:%d usec:%d",
		(int)node->first_expire.tv_sec, (int)node->first_expire.tv_usec));

	gettimeofday(&node->abs_expire, NULL);
	timeval_add(node->abs_expire, node->first_expire);

	/* Add first, wake up afterwards. */
	timer_id = MyTimerHeapAdd(l->tmhp, node);

	/* Same guard as MyListernerResetTimer: do not signal for a timer that was not added. */
	if(NULL == timer_id)
		return NULL;

	/* The new timeout is now the earliest one, so the waiting thread has to be woken. */
	if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
	{
		MyEventSetSignaled(l->tmhp_notifier);

		/* %x takes an unsigned int; passing the handle itself is a type mismatch. */
		LOG_DEBUG(("write timer-add notify, timer id:%x", (unsigned int)(size_t)timer_id));
	}

	return timer_id;
}

/**
 * @brief Remove a timer from the listener.
 *
 * @param l        Listener owning the timer heap.
 * @param timer_id Id of the timer to remove.
 * @return -1 when the listener or its timer heap is NULL, otherwise whatever
 *         MyTimerHeapDel returns.
 */
int MyListernerDelTimer(HMYLISTERNER l, HTIMERID timer_id)
{
	if(l != NULL && l->tmhp != NULL)
		return MyTimerHeapDel(l->tmhp, timer_id);

	return -1;
}

/**
 * @brief Re-arm an existing timer with a new description.
 *
 * Stores the absolute expiry (current time plus node->first_expire) in
 * node->abs_expire, resets the timer in the heap and, when the reset timer is
 * the earliest one, signals tmhp_notifier.
 *
 * @param l        Listener owning the timer heap.
 * @param timer_id Id of the timer to reset.
 * @param node     New timer description; its abs_expire field is overwritten.
 * @return The id returned by MyTimerHeapReset, or NULL when an argument is
 *         NULL, the listener has no timer heap, or the reset returned no id.
 */
HTIMERID MyListernerResetTimer(HMYLISTERNER l, HTIMERID timer_id, mytimer_node_t * node)
{
	/* node is dereferenced below, so reject NULL exactly like MyListernerAddTimer does. */
	if(NULL == l || NULL == node || NULL == l->tmhp)
		return NULL;

	gettimeofday(&node->abs_expire, NULL);
	timeval_add(node->abs_expire, node->first_expire);

	timer_id = MyTimerHeapReset(l->tmhp, timer_id, node);

	if(NULL == timer_id)
		return NULL;

	/* The reset timeout is now the earliest one, so the waiting thread has to be woken. */
	if(MyTimeHeapGetEarliestKey(l->tmhp) == timer_id)
	{
		MyEventSetSignaled(l->tmhp_notifier);

		/* %x takes an unsigned int; passing the handle itself is a type mismatch. */
		LOG_DEBUG(("write timer-add reset notify, timer id:%x", (unsigned int)(size_t)timer_id));
	}

	return timer_id;
}

/**
 * @brief Register a socket with the listener.
 *
 * Windows waits on HANDLEs, so the socket is tied to a freshly created WSA
 * event object; that event together with a copy of the callbacks, the mask
 * and the fd is then handed to listerner_add_handle.
 *
 * @param hl         Listener to register with.
 * @param fd         Socket descriptor; INVALID_FD is rejected.
 * @param mask       Event mask stored in the handle's tag.
 * @param evt_handle Callback description; copied, must not be NULL.
 * @return 0 on success, -1 on invalid arguments or when the event object
 *         cannot be created or associated with the socket.
 */
int MyListernerAddFD(HMYLISTERNER hl, int fd, enum E_HANDLE_SET_MASK mask, event_handle_t * evt_handle)
{
	WSAEVENT he = WSA_INVALID_EVENT;
	evt_tag et = {0};	/* zeroed so no field is handed over uninitialized */

	if(NULL == hl || NULL == evt_handle || (int)INVALID_FD == fd)
		return -1;

	memcpy(&et.e, evt_handle, sizeof(et.e));
	et.mask = mask;
	et.fd = fd;

	/* WSACreateEvent reports failure with WSA_INVALID_EVENT, not INVALID_HANDLE_VALUE. */
	he = WSACreateEvent();
	if(WSA_INVALID_EVENT == he)
		return -1;

	if(0 != WSAEventSelect(fd, he, FD_OOB | FD_READ | FD_WRITE | FD_ACCEPT | FD_CONNECT | FD_CLOSE))
		goto MyListernerAddFD_err_;

	listerner_add_handle(hl, he, &et);

	return 0;

MyListernerAddFD_err_:

	/* The event was never registered, so it is still owned here. */
	WSACloseEvent(he);

	return -1;
}

/**
 * @brief Unregister a socket from the listener.
 *
 * Looks up the event handle associated with the socket, cancels the event
 * selection on it and removes the handle from the callback table.
 *
 * @param hl Listener the socket was registered with.
 * @param fd Socket descriptor; INVALID_FD is rejected.
 * @return 0 on success, -1 on invalid arguments, when no handle is found for
 *         the socket, or when cancelling the selection fails.
 */
int MyListernerDelFD(HMYLISTERNER hl, int fd)
{
	HANDLE event_handle = INVALID_HANDLE_VALUE;

	if(NULL == hl)
		return -1;
	if((int)INVALID_FD == fd)
		return -1;

	/* Find the event tied to this socket; a failed lookup or an unset handle both abort. */
	if(0 != listerner_query_handle_by_fd(hl, fd, NULL, &event_handle)
		|| INVALID_HANDLE_VALUE == event_handle)
	{
		return -1;
	}

	/* Stop listening on the socket. */
	if(0 != WSAEventSelect(fd, event_handle, 0))
	{
		return -1;
	}

	/*
	 * Drop the matching record from the callback table.
	 * NOTE(review): the event object is not closed here - confirm whether
	 * listerner_del_handle takes care of that.
	 */
	listerner_del_handle(hl, event_handle);

	return 0;
}

/**
 * @brief Queue a message on the listener's plain message queue.
 *
 * A msg_tag is allocated from the listener's memory pool, filled in, pushed
 * onto the queue, and mq_notifier is signaled.
 *
 * @param l            Listener to post to.
 * @param user_msg     User payload pointer; stored as is, not copied.
 * @param context_data Caller-defined value stored with the message.
 * @param handle       Callback stored with the message.
 * @return 0 once the message was pushed and the notifier signaled, -1 when
 *         the listener or its queue is NULL or the allocation fails.
 */
int MyListernerAddMsg(HMYLISTERNER l,
					  const void * user_msg,
					  unsigned long context_data,
					  CB_LISTERNER_HANDLE_MSG handle)
{
	msg_tag * entry = NULL;

	if(NULL == l)
		return -1;
	if(NULL == l->mq)
		return -1;

	/* Build the queue entry. */
	entry = (msg_tag *)MyMemPoolMalloc(l->hm, sizeof(*entry));
	if(entry == NULL)
	{
		return -1;
	}

	entry->msg = (void *)user_msg;
	entry->handle = handle;
	entry->context_data = context_data;

	/* Enqueue, then wake the listener thread. */
	MyMsgQuePush(l->mq, entry);
	MyEventSetSignaled(l->mq_notifier);

	return 0;
}

/**
 * @brief Queue a message on the listener's priority message queue.
 *
 * A msg_tag is allocated from the listener's memory pool, filled in and
 * pushed with the given priority; promq_notifier is signaled on success.
 * When the push fails the msg_tag is returned to the pool instead of being
 * leaked.
 *
 * @param l            Listener to post to.
 * @param prority      Priority passed to MyProrityMsgQuePush.
 * @param user_msg     User payload pointer; stored as is, not copied.
 * @param context_data Caller-defined value stored with the message.
 * @param handle       Callback stored with the message.
 * @return 0 on success, -1 when the listener or its queue is NULL or the
 *         allocation fails, otherwise the non-zero result of MyProrityMsgQuePush.
 */
int MyListernerAddProrityMsg(HMYLISTERNER l,
							 int prority,
							 const void * user_msg,
							 unsigned long context_data,
							 CB_LISTERNER_HANDLE_MSG handle)
{
	msg_tag * msg = NULL;
	int ret = 0;

	if(NULL == l || NULL == l->promq)
		return -1;

	/* Build the queue entry. */
	msg = (msg_tag *)MyMemPoolMalloc(l->hm, sizeof(*msg));
	if(NULL == msg)
		return -1;

	msg->context_data = context_data;
	msg->handle = handle;
	msg->msg = (void *)user_msg;

	ret = MyProrityMsgQuePush(l->promq, prority, msg);
	if(0 != ret)
	{
		LOG_WARN(("add prority msg fail %d", ret));

		/*
		 * The entry never made it into the queue, so nobody else will free it.
		 * NOTE(review): assumes a failed push does not keep the pointer - confirm.
		 */
		MyMemPoolFree(l->hm, msg);

		return ret;
	}

	/* Wake the listener thread. */
	MyEventSetSignaled(l->promq_notifier);

	return ret;
}




















⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -