⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poller.cpp

📁 socket的事件分发模型
💻 CPP
字号:
#include "poller.h"
#include "skfwd.h"
#include <assert.h>
#if !zPLAT_WIN
#include <sys/poll.h>
#endif

#if zPLAT_WIN

#define POLLIN			1
#define POLLOUT			2
#define POLLERR			4

struct pollfd{
	int     fd;        /* file descriptor */
	short   events;    /* requested events */
	short   revents;   /* returned events */
};

static void set_poll_events(struct pollfd* polls, size_t cnt, fd_set* pr, fd_set* pw, fd_set* pe)
{
	size_t i;
	if (pr){
		for(i = 0 ; i < cnt ; ++i)
		{
			if (polls[i].fd >= 0 && FD_ISSET(polls[i].fd, pr)){
				polls[i].revents |= POLLIN;
			}
		}
	}
	if (pw){
		for(i = 0 ; i < cnt ; ++i)
		{
			if (polls[i].fd >= 0 && FD_ISSET(polls[i].fd, pw)){
				polls[i].revents |= POLLOUT;
			}
		}
	}
	if (pe){
		for(i = 0 ; i < cnt ; ++i)
		{
			if (polls[i].fd >= 0 && FD_ISSET(polls[i].fd, pe)){
				polls[i].revents |= POLLERR;
			}
		}
	}
}

static int poll(struct pollfd* polls, size_t cnt, int timeout)
{
	fd_set rset, wset, eset;
	fd_set* pr = 0;
	fd_set* pw = 0;
	fd_set* pe = 0;

	FD_ZERO(&rset);
	FD_ZERO(&wset);
	FD_ZERO(&eset);
	
	for(size_t i = 0 ; i < cnt ; ++i)
	{
		int fd = polls[i].fd;
		int events = polls[i].events;
		if (fd >= 0){
			if (events & POLLIN){
				FD_SET(fd, &rset);
				pr = &rset;
			}
			if (events & POLLOUT){
				FD_SET(fd, &wset);
				pw = &wset;
			}
			if (events & POLLERR){
				FD_SET(fd, &eset);
				pe = &eset;
			}
		}
		polls[i].revents = 0;
	}
	struct timeval tv;
	if (timeout >= 0){
		tv.tv_sec = timeout/1000;
		tv.tv_usec = (timeout%1000)*1000;
	}
	int ret = select(0, pr, pw, pe, timeout >= 0 ? &tv : 0);
	if (ret <= 0)
		return ret;
	set_poll_events(polls, cnt, pr, pw, pe);
	return ret;
}
#endif

inline int fdevent2poll(int events){
	int revents = 0;
	if (events & FDEVENT_IN)
		revents |= POLLIN;
	if (events & FDEVENT_OUT)
		revents |= POLLOUT;
	if (events & FDEVENT_ERR)
		revents |= POLLERR;
	return revents;
}

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//	class poller
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
HRET poller::create(size_t bufSize)
{
	clear();
	
	HRET hret = m_set.create(bufSize);
	if (hret < 0)
		return hret;
	try{
		m_polls = new pollfd[bufSize];
	}catch(...){
		m_polls = 0;
		m_set.clear();
		return -1;
	}
	for(size_t i = 0 ; i < bufSize ; ++i)
		m_polls[i].fd = -1;
	return 0;
}
/**
 *	@brief	调整poller的大小(扩大)
 *	@remark	
 *		如果失败,就保持原有内容不变
 */
HRET poller::expand(size_t bufSize)
{
	size_t oldBufSize = m_set.capacity();
	if (bufSize <= oldBufSize)
		return 0;
	pollfd* polls = 0;
	try{
		polls = new pollfd[bufSize];
	}catch(...){
		return -1;
	}
	HRET hret = m_set.expand(bufSize);
	if (hret < 0){
		delete[] polls;
		return hret;
	}
	//	更新刚分配的这片数据区
	if (oldBufSize > 0){
		memcpy(polls, m_polls, sizeof(pollfd)*oldBufSize);
	}
	for(size_t i = oldBufSize ; i < bufSize ; ++i)
		polls[i].fd = -1;
	//	指向新的数据区
	delete[] m_polls;
	m_polls = polls;
	return 0;
}

HRET poller::add(fdbase& fb, int events /* = POLLIN | POLLOUT */)
{
	HRET hret = m_set.add(fb);
	if (hret < 0)
		return hret;
	m_polls[hret].fd = fb.get_fd();
	m_polls[hret].events = events;
	m_polls[hret].revents = 0;
	return hret;
}

HRET poller::remove(int idx)
{
	assert(idx >= 0 && idx < (int)m_set.capacity());

	HRET hret = m_set.remove(idx);
	if (hret < 0)
		return hret;
	m_polls[idx].fd = -1;
	return 0;
}

void poller::set_events(int idx, int events)
{
	assert(idx >= 0 && idx < (int)m_set.capacity());
	m_polls[idx].events = events;
}

void poller::clear()
{
	m_set.clear();

	delete[] m_polls;
	m_polls = 0;
}

void poller::destroy()
{
	m_set.destroy();
	delete[] m_polls;
	m_polls = 0;
}

void poller::dispatch_poll()
{
	size_t bufSize = m_set.capacity();
	for(size_t i = 0 ; i < bufSize ; ++i)
	{
		if (m_polls[i].fd >= 0){
			if (fdbase* p = m_set.get(i)){
				int revents = m_polls[i].revents;
				if ((revents & POLLIN) && p->do_read() < 0){
					continue;
				}
				if ((revents & POLLOUT) && p->do_write() < 0){
					continue;
				}
				if ((revents & POLLERR) && p->do_error() < 0){
					continue;
				}
			}
		}
	}
}


HRET poller::wait(const struct timeval* ptv)
{
	int timeout = -1;
	if (ptv)
		timeout = ptv->tv_sec*1000+(ptv->tv_usec+999)/1000;
	int ret = poll(m_polls, m_set.capacity(), timeout);
	if (ret > 0){
		dispatch_poll();
	}
	return ret;
}


void poller::refresh()
{
	size_t bufSize = m_set.capacity();
	for(size_t i = 0 ; i < bufSize ; ++i)
	{
		if (m_set.get(i)){
			int events = 0;
			m_polls[i].fd = m_set.get(i)->get_events(events);
			m_polls[i].events = events;
			m_polls[i].revents = 0;
		}
	}
}

#if zDEBUG

void poller::print_freeze() const
{
	size_t cnt = m_set.capacity();
	for(size_t i = 0 ; i < cnt ; ++i)
	{
		if (m_polls[i].fd >= 0){
			sockaddr_in addr;
			socklen_t len = sizeof(addr);
			int ret = ::getsockname(m_polls[i].fd, (sockaddr*)&addr, &len);
			if (ret < 0){
				DUMP3("error: poller find a freeze socket, index: %d, fd: %d, ptr: %p\n", i, m_polls[i].fd, m_set.get(i));
			}
		}		
	}
}

#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -