📄 socketport.cpp
字号:
HANDLE GetSync() const { return sync; } void update(unsigned char flag) { // FIXME: cancellation WaitForSingleObject(semWrite,INFINITE); this->flag = flag; SetEvent(sync); } int getFlag() { int res = flag; flag = -1; if (res > 0) { ReleaseSemaphore(semWrite,1,NULL); ResetEvent(sync); } return res; }private: HANDLE sync; HANDLE semWrite; int flag;};#endifSocketService::SocketService(int pri, size_t stack, const char *id) :Thread(pri, stack), Mutex(id){ first = last = NULL; count = 0;#ifndef WIN32 FD_ZERO(&connect); long opt; ::pipe(iosync); hiwater = iosync[0] + 1; FD_SET(iosync[0], &connect); opt = fcntl(iosync[0], F_GETFL); fcntl(iosync[0], F_SETFL, opt | O_NDELAY); #else sync = new Sync();#endif}SocketService::~SocketService(){ update(0);#ifdef WIN32 // FIXME: thread is finished ??? delete sync;#endif terminate(); while(first) delete first;}void SocketService::onUpdate(unsigned char buf){}void SocketService::onEvent(void){}void SocketService::onCallback(SocketPort *port){}void SocketService::attach(SocketPort *port){ enterMutex();#ifdef USE_POLL port->ufd = 0;#endif if(last) last->next = port; port->prev = last; last = port;#ifndef WIN32 FD_SET(port->so, &connect); if(port->so >= hiwater) hiwater = port->so + 1;#endif port->service = this; ++count; if(!first) first = port; // start thread if necessary if (count == 1) { if (!isRunning()) { leaveMutex(); start(); return; } } leaveMutex(); update();}void SocketService::detach(SocketPort *port){ enterMutex();#if !defined(USE_POLL) && !defined(WIN32) FD_CLR(port->so, &connect);#endif if(port->prev) { port->prev->next = port->next; } else { first = port->next; } if(port->next) { port->next->prev = port->prev; } else { last = port->prev; } port->service = NULL; --count; leaveMutex(); update();}void SocketService::update(unsigned char flag){#ifndef WIN32 ::write(iosync[1], (char *)&flag, 1);#else sync->update(flag);#endif}#define MUTEX_START { MutexLock _lock_(*this);#define MUTEX_END }void SocketService::run(void){ timeout_t timer, expires; SocketPort *port; unsigned char buf;#ifndef WIN32#ifdef USE_POLL Poller mfd; pollfd * p_ufd; int lastcount = 0; // initialize ufd in all attached ports : // probably don't need this but it can't hurt. enterMutex(); port = first; while(port) { port->ufd = 0; port = port->next; } leaveMutex(); #else struct timeval timeout, *tvp; fd_set inp, out, err; FD_ZERO(&inp); FD_ZERO(&out); FD_ZERO(&err); int so;#endif#else // WIN32 int numHandle = 0; HANDLE hv[MAXIMUM_WAIT_OBJECTS];#endif#ifdef WIN32 // FIXME: needed ? ResetEvent(sync->GetSync());#endif setCancel(cancelDeferred); for(;;) { timer = TIMEOUT_INF;#ifndef WIN32 while(1 == ::read(iosync[0], (char *)&buf, 1)) {#else for(;;) { int f = sync->getFlag(); if (f < 0) break; buf = f;#endif if(buf) { onUpdate(buf); continue; } setCancel(cancelImmediate); sleep(TIMEOUT_INF); exit(); }#ifndef WIN32#ifdef USE_POLL bool reallocate = false; MUTEX_START onEvent(); port = first; while(port) { onCallback(port); if ( ( p_ufd = port->ufd ) ) { if ( ( POLLHUP | POLLNVAL ) & p_ufd->revents ) { // Avoid infinite loop from disconnected sockets port->detect_disconnect = false; p_ufd->events &= ~POLLHUP; SocketPort* p = port; port = port->next; detach(p); reallocate = true; p->disconnect(); continue; } if ( ( POLLIN | POLLPRI ) & p_ufd->revents ) port->pending(); if ( POLLOUT & p_ufd->revents ) port->output(); } else { reallocate = true; }retry: expires = port->getTimer(); if(expires > 0) if(expires < timer) timer = expires; if(!expires) { port->endTimer(); port->expired(); goto retry; } port = port->next; } // // reallocate things if we saw a ServerPort without // ufd set ! if ( reallocate || ( ( count + 1 ) != lastcount ) ) { lastcount = count + 1; p_ufd = mfd.getList( count + 1 ); // Set up iosync polling p_ufd->fd = iosync[0]; p_ufd->events = POLLIN | POLLHUP; p_ufd ++; port = first; while(port) { p_ufd->fd = port->so; p_ufd->events = ( port->detect_disconnect ? POLLHUP : 0 ) | ( port->detect_output ? POLLOUT : 0 ) | ( port->detect_pending ? POLLIN : 0 ) ; port->ufd = p_ufd; p_ufd ++; port = port->next; } } MUTEX_END poll( mfd.getList(), lastcount, timer );#else MUTEX_START onEvent(); port = first; while(port) { onCallback(port); so = port->so; if(FD_ISSET(so, &err)) { port->detect_disconnect = false; SocketPort* p = port; port = port->next; p->disconnect(); continue; } if(FD_ISSET(so, &inp)) port->pending(); if(FD_ISSET(so, &out)) port->output();retry: expires = port->getTimer(); if(expires > 0) if(expires < timer) timer = expires; // if we expire, get new scheduling now if(!expires) { port->endTimer(); port->expired(); goto retry; } port = port->next; } FD_ZERO(&inp); FD_ZERO(&out); FD_ZERO(&err); FD_SET(iosync[0],&inp); port = first; while(port) { so = port->so; if(port->detect_pending) FD_SET(so, &inp); if(port->detect_output) FD_SET(so, &out); if(port->detect_disconnect) FD_SET(so, &err); port = port->next; } MUTEX_END if(timer == TIMEOUT_INF) tvp = NULL; else { tvp = &timeout; timeout.tv_sec = timer / 1000; timeout.tv_usec = (timer % 1000) * 1000; } select(hiwater, &inp, &out, &err, tvp); #endif#else // WIN32 MUTEX_START onEvent(); hv[0] = sync->GetSync(); numHandle = 1; port = first; while(port) { onCallback(port); long events = 0; if(port->detect_pending) events |= FD_READ; if(port->detect_output) events |= FD_WRITE; if(port->detect_disconnect) events |= FD_CLOSE; // !!! ignore some socket on overflow !!! if (events && numHandle < MAXIMUM_WAIT_OBJECTS) { WSAEventSelect(port->so,port->event,events); hv[numHandle++] = port->event; }retry: expires = port->getTimer(); if(expires > 0) if(expires < timer) timer = expires; // if we expire, get new scheduling now if(!expires) { port->endTimer(); port->expired(); goto retry; } port = port->next; } MUTEX_END // FIXME: handle thread cancellation correctly DWORD res = WaitForMultipleObjects(numHandle,hv,FALSE,timer); switch (res) { case WAIT_OBJECT_0: break; case WAIT_TIMEOUT: break; default: // FIXME: handle failures (detach SocketPort) if (res >= WAIT_OBJECT_0+1 && res <= WAIT_OBJECT_0+MAXIMUM_WAIT_OBJECTS) { int curr = res - (WAIT_OBJECT_0); WSANETWORKEVENTS events; // search port MUTEX_START port = first; while(port) { if (port->event == hv[curr]) break; port = port->next; } MUTEX_END // if port not found ignore if (!port || port->event != hv[curr]) break; WSAEnumNetworkEvents(port->so,port->event,&events); if(events.lNetworkEvents & FD_CLOSE) { port->detect_disconnect = false; port->disconnect(); continue; } if(events.lNetworkEvents & FD_READ) port->pending(); if(events.lNetworkEvents & FD_WRITE) port->output(); } }#endif }} #ifdef CCXX_NAMESPACES}#endif /** EMACS ** * Local variables: * mode: c++ * c-basic-offset: 8 * End: */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -