// poller.cpp
#include "poller.h"
#include "skfwd.h"
#include <assert.h>
#include <limits.h>
#include <string.h>
#if !zPLAT_WIN
#include <sys/poll.h>
#endif
#if zPLAT_WIN
// Event flags used by the select()-based poll() replacement in this file.
// They are only defined in the zPLAT_WIN build, where <sys/poll.h> is not included.
// POLLIN: the descriptor is placed in (and reported from) select()'s read set.
#define POLLIN 1
// POLLOUT: the descriptor is placed in (and reported from) select()'s write set.
#define POLLOUT 2
// POLLERR: the descriptor is placed in (and reported from) select()'s exception set.
#define POLLERR 4
// One watched descriptor, laid out like the POSIX record of the same name so
// the rest of this file can use a single code path on every platform.
struct pollfd
{
    int   fd;       // descriptor to watch; negative values mark an unused slot
    short events;   // event flags the caller is interested in
    short revents;  // event flags reported back by poll()
};
/**
 * @brief Translate select() result sets into pollfd::revents flags.
 * @param polls array of poll records; only entries with fd >= 0 are examined
 * @param cnt   number of entries in polls
 * @param pr    read set returned by select(), or null if none was used
 * @param pw    write set returned by select(), or null if none was used
 * @param pe    exception set returned by select(), or null if none was used
 * @remark
 * Flags are OR-ed into revents; the caller is expected to have cleared
 * revents beforehand.
 */
static void set_poll_events(struct pollfd* polls, size_t cnt, fd_set* pr, fd_set* pw, fd_set* pe)
{
    for (struct pollfd* entry = polls; entry != polls + cnt; ++entry)
    {
        const int fd = entry->fd;
        if (fd < 0)
            continue;   // unused slot, never looked up in any set

        if (pr && FD_ISSET(fd, pr))
            entry->revents |= POLLIN;
        if (pw && FD_ISSET(fd, pw))
            entry->revents |= POLLOUT;
        if (pe && FD_ISSET(fd, pe))
            entry->revents |= POLLERR;
    }
}
/**
 * @brief poll() replacement for the zPLAT_WIN build, implemented with select().
 * @param polls   array of poll records; entries with fd < 0 are ignored
 * @param cnt     number of entries in polls
 * @param timeout wait limit in milliseconds; a negative value passes a null
 *                timeval to select(), i.e. no time limit
 * @return number of entries whose revents is non-zero, 0 if select() reported
 *         nothing ready, or select()'s negative result on error
 * @remark
 * revents is cleared for every entry before waiting. POLLERR is only reported
 * for entries that asked for it in events, because the exception set is only
 * populated on request.
 * NOTE(review): when no entry is active all three set pointers stay null.
 * Winsock's select() is documented to fail in that case instead of sleeping
 * for the timeout -- confirm callers tolerate an immediate error return.
 */
static int poll(struct pollfd* polls, size_t cnt, int timeout)
{
    fd_set rset, wset, eset;
    fd_set* pr = 0;
    fd_set* pw = 0;
    fd_set* pe = 0;
    FD_ZERO(&rset);
    FD_ZERO(&wset);
    FD_ZERO(&eset);

    for (size_t i = 0; i < cnt; ++i)
    {
        polls[i].revents = 0;

        const int fd = polls[i].fd;
        if (fd < 0)
            continue;   // unused slot: do not even read its events field

        const int events = polls[i].events;
        if (events & POLLIN){
            FD_SET(fd, &rset);
            pr = &rset;
        }
        if (events & POLLOUT){
            FD_SET(fd, &wset);
            pw = &wset;
        }
        if (events & POLLERR){
            FD_SET(fd, &eset);
            pe = &eset;
        }
    }

    // Split the millisecond timeout into the seconds/microseconds pair select() expects.
    struct timeval tv;
    struct timeval* ptv = 0;
    if (timeout >= 0){
        tv.tv_sec = timeout / 1000;
        tv.tv_usec = (timeout % 1000) * 1000;
        ptv = &tv;
    }

    const int ret = select(0, pr, pw, pe, ptv);
    if (ret <= 0)
        return ret;

    set_poll_events(polls, cnt, pr, pw, pe);

    // select() counts every (descriptor, set) hit; poll() reports how many
    // entries have something pending, so recount per entry.
    int ready = 0;
    for (size_t i = 0; i < cnt; ++i)
    {
        if (polls[i].revents != 0)
            ++ready;
    }
    return ready;
}
#endif
/**
 * @brief Convert an FDEVENT_* bit mask into the matching POLL* bit mask.
 * @param events combination of FDEVENT_IN / FDEVENT_OUT / FDEVENT_ERR
 * @return combination of POLLIN / POLLOUT / POLLERR; bits other than the
 *         three above are dropped
 */
inline int fdevent2poll(int events){
    const int in_bit  = (events & FDEVENT_IN)  ? POLLIN  : 0;
    const int out_bit = (events & FDEVENT_OUT) ? POLLOUT : 0;
    const int err_bit = (events & FDEVENT_ERR) ? POLLERR : 0;
    return in_bit | out_bit | err_bit;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// class poller
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * @brief (Re)initialise the poller with room for bufSize descriptors.
 * @param bufSize number of slots to allocate
 * @return 0 on success; the negative result of m_set.create() if that fails;
 *         -1 if the pollfd array cannot be allocated
 * @remark
 * Any previous content is dropped via clear() first. Every slot starts out
 * unused (fd == -1) with events and revents zeroed, so no indeterminate
 * values are ever handed to poll() or copied by expand().
 */
HRET poller::create(size_t bufSize)
{
    clear();
    HRET hret = m_set.create(bufSize);
    if (hret < 0)
        return hret;
    try{
        m_polls = new pollfd[bufSize];
    }catch(...){
        // Allocation failed: undo the set creation so both stay in step.
        m_polls = 0;
        m_set.clear();
        return -1;
    }
    for (size_t i = 0; i < bufSize; ++i)
    {
        m_polls[i].fd = -1;
        m_polls[i].events = 0;
        m_polls[i].revents = 0;
    }
    return 0;
}
/**
 * @brief Resize the poller (grow only).
 * @param bufSize requested number of slots; values not larger than the
 *                current capacity are a no-op
 * @return 0 on success or when nothing had to be done; -1 if the new pollfd
 *         array cannot be allocated; the negative result of m_set.expand()
 *         if that fails
 * @remark
 * On failure the existing pollfd array is left untouched.
 * Newly added slots are marked unused (fd == -1) with events and revents
 * zeroed, so no indeterminate values are ever handed to poll().
 */
HRET poller::expand(size_t bufSize)
{
    const size_t oldBufSize = m_set.capacity();
    if (bufSize <= oldBufSize)
        return 0;

    // Allocate first so a failure here leaves both m_set and m_polls as they were.
    pollfd* polls = 0;
    try{
        polls = new pollfd[bufSize];
    }catch(...){
        return -1;
    }

    HRET hret = m_set.expand(bufSize);
    if (hret < 0){
        delete[] polls;
        return hret;
    }

    // Fill the freshly allocated array: carry over the old records ...
    if (oldBufSize > 0){
        memcpy(polls, m_polls, sizeof(pollfd)*oldBufSize);
    }
    // ... and mark the additional tail as unused.
    for (size_t i = oldBufSize; i < bufSize; ++i)
    {
        polls[i].fd = -1;
        polls[i].events = 0;
        polls[i].revents = 0;
    }

    // Switch over to the new array.
    delete[] m_polls;
    m_polls = polls;
    return 0;
}
/**
 * @brief Register a descriptor object with the poller.
 * @param fb     object to register; its get_fd() value is what gets polled
 * @param events poll event flags to watch for
 * @return the slot index handed out by m_set.add(), or its negative result
 *         on failure (in which case no poll record is touched)
 */
HRET poller::add(fdbase& fb, int events /* = POLLIN | POLLOUT */)
{
    HRET slot = m_set.add(fb);
    if (slot < 0)
        return slot;

    const int fd = fb.get_fd();
    pollfd& entry = m_polls[slot];
    entry.fd = fd;
    entry.events = events;
    entry.revents = 0;
    return slot;
}
/**
 * @brief Unregister the descriptor stored in slot idx.
 * @param idx slot index; must lie within [0, capacity) (checked by assert only)
 * @return 0 on success, or the negative result of m_set.remove(); on failure
 *         the poll record is left as it was
 */
HRET poller::remove(int idx)
{
    assert(idx >= 0 && idx < (int)m_set.capacity());

    HRET status = m_set.remove(idx);
    const bool removed = !(status < 0);
    if (removed){
        // Mark the slot unused so poll() skips it from now on.
        m_polls[idx].fd = -1;
        return 0;
    }
    return status;
}
/**
 * @brief Replace the watched event flags of slot idx.
 * @param idx    slot index; must lie within [0, capacity) (checked by assert only)
 * @param events new poll event flags for that slot
 */
void poller::set_events(int idx, int events)
{
    assert(idx >= 0 && idx < (int)m_set.capacity());

    pollfd& entry = m_polls[idx];
    entry.events = events;
}
/**
 * @brief Clear the descriptor set and release the pollfd array.
 * @remark m_polls is null afterwards.
 */
void poller::clear()
{
    m_set.clear();

    pollfd* const released = m_polls;
    m_polls = 0;
    delete[] released;
}
/**
 * @brief Destroy the descriptor set and release the pollfd array.
 * @remark m_polls is null afterwards.
 */
void poller::destroy()
{
    m_set.destroy();

    pollfd* const released = m_polls;
    m_polls = 0;
    delete[] released;
}
void poller::dispatch_poll()
{
size_t bufSize = m_set.capacity();
for(size_t i = 0 ; i < bufSize ; ++i)
{
if (m_polls[i].fd >= 0){
if (fdbase* p = m_set.get(i)){
int revents = m_polls[i].revents;
if ((revents & POLLIN) && p->do_read() < 0){
continue;
}
if ((revents & POLLOUT) && p->do_write() < 0){
continue;
}
if ((revents & POLLERR) && p->do_error() < 0){
continue;
}
}
}
}
}
/**
 * @brief Wait for events on all registered descriptors and dispatch them.
 * @param ptv maximum time to wait; null means wait without a time limit
 *            (a timeout of -1 is passed to poll())
 * @return the value returned by poll(); handlers are dispatched only when it
 *         is positive
 * @remark
 * The timeval is rounded up to whole milliseconds. The conversion is done in
 * 64 bits and clamped to [0, INT_MAX]: a very large timeval can no longer
 * overflow or wrap, and a negative timeval yields an immediate poll instead
 * of being mistaken for "no time limit".
 */
HRET poller::wait(const struct timeval* ptv)
{
    int timeout = -1;
    if (ptv){
        long long ms = (long long)ptv->tv_sec * 1000
                     + ((long long)ptv->tv_usec + 999) / 1000;
        if (ms < 0)
            ms = 0;
        else if (ms > INT_MAX)
            ms = INT_MAX;
        timeout = (int)ms;
    }

    const int ret = poll(m_polls, m_set.capacity(), timeout);
    if (ret > 0){
        dispatch_poll();
    }
    return ret;
}
/**
 * @brief Rebuild the poll records from the registered objects.
 * @remark
 * For every slot that holds an object, the value returned by its
 * get_events() becomes the polled fd, the mask it fills in becomes the
 * watched events, and revents is cleared. Slots without an object are left
 * untouched.
 */
void poller::refresh()
{
    const size_t total = m_set.capacity();
    for (size_t slot = 0; slot < total; ++slot)
    {
        if (!m_set.get(slot))
            continue;

        int wanted = 0;
        const int fd = m_set.get(slot)->get_events(wanted);
        m_polls[slot].fd = fd;
        m_polls[slot].events = wanted;
        m_polls[slot].revents = 0;
    }
}
#if zDEBUG
void poller::print_freeze() const
{
size_t cnt = m_set.capacity();
for(size_t i = 0 ; i < cnt ; ++i)
{
if (m_polls[i].fd >= 0){
sockaddr_in addr;
socklen_t len = sizeof(addr);
int ret = ::getsockname(m_polls[i].fd, (sockaddr*)&addr, &len);
if (ret < 0){
DUMP3("error: poller find a freeze socket, index: %d, fd: %d, ptr: %p\n", i, m_polls[i].fd, m_set.get(i));
}
}
}
}
#endif
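// Code-viewer keyboard shortcuts (page residue, not part of poller.cpp):
//   Copy code             Ctrl + C
//   Search code           Ctrl + F
//   Fullscreen mode       F11
//   Toggle theme          Ctrl + Shift + D
//   Show shortcuts        ?
//   Increase font size    Ctrl + =
//   Decrease font size    Ctrl + -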