⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketport.cpp

📁 贡献一份commoncpp2,有兴趣的可以研究一下
💻 CPP
📖 第 1 页 / 共 2 页
字号:
	HANDLE GetSync() const 	{ 		return sync; 	}	void update(unsigned char flag)	{		// FIXME: cancellation		WaitForSingleObject(semWrite,INFINITE);		this->flag = flag;		SetEvent(sync);	}	int getFlag()	{		int res = flag;		flag = -1;		if (res > 0)		{			ReleaseSemaphore(semWrite,1,NULL);			ResetEvent(sync);		}		return res;	}private:	HANDLE sync;	HANDLE semWrite;	int flag;};#endifSocketService::SocketService(int pri, size_t stack, const char *id) :Thread(pri, stack), Mutex(id){	first = last = NULL;	count = 0;#ifndef WIN32	FD_ZERO(&connect);	long opt;	::pipe(iosync);	hiwater = iosync[0] + 1;	FD_SET(iosync[0], &connect);	opt = fcntl(iosync[0], F_GETFL);	fcntl(iosync[0], F_SETFL, opt | O_NDELAY);	#else	sync = new Sync();#endif}SocketService::~SocketService(){	update(0);#ifdef WIN32	// FIXME: thread is finished ???	delete sync;#endif	terminate();	while(first)		delete first;}void SocketService::onUpdate(unsigned char buf){}void SocketService::onEvent(void){}void SocketService::onCallback(SocketPort *port){}void SocketService::attach(SocketPort *port){	enterMutex();#ifdef	USE_POLL	port->ufd = 0;#endif	if(last)		last->next = port;	port->prev = last;	last = port;#ifndef WIN32	FD_SET(port->so, &connect);	if(port->so >= hiwater)		hiwater = port->so + 1;#endif	port->service = this;	++count;	if(!first) first = port;		// start thread if necessary	if (count == 1)	{		if (!isRunning())		{			leaveMutex();			start();			return;		}	}	leaveMutex();	update();}void SocketService::detach(SocketPort *port){	enterMutex();#if !defined(USE_POLL) && !defined(WIN32)	FD_CLR(port->so, &connect);#endif	if(port->prev) {		port->prev->next = port->next;	} else {		first = port->next;	}		if(port->next) {		port->next->prev = port->prev;	} else {		last = port->prev;	}	port->service = NULL;		--count;	leaveMutex();	update();}void SocketService::update(unsigned char flag){#ifndef WIN32	::write(iosync[1], (char *)&flag, 1);#else	sync->update(flag);#endif}#define MUTEX_START { MutexLock _lock_(*this);#define MUTEX_END }void SocketService::run(void){	timeout_t timer, expires;	SocketPort *port;	unsigned char buf;#ifndef WIN32#ifdef 	USE_POLL		Poller			  mfd;	pollfd			* p_ufd;	int				  lastcount = 0;	// initialize ufd in all attached ports :	// probably don't need this but it can't hurt.	enterMutex();	port = first;	while(port)	{		port->ufd = 0;		port = port->next;	}	leaveMutex();	#else	struct timeval timeout, *tvp;	fd_set inp, out, err;	FD_ZERO(&inp);	FD_ZERO(&out);	FD_ZERO(&err);	int so;#endif#else // WIN32	int numHandle = 0;	HANDLE hv[MAXIMUM_WAIT_OBJECTS];#endif#ifdef WIN32	// FIXME: needed ?	ResetEvent(sync->GetSync());#endif	setCancel(cancelDeferred);	for(;;)	{		timer = TIMEOUT_INF;#ifndef WIN32		while(1 == ::read(iosync[0], (char *)&buf, 1))		{#else		for(;;)		{			int f = sync->getFlag();			if (f < 0)				break;			buf = f;#endif			if(buf)			{				onUpdate(buf);				continue;			}			setCancel(cancelImmediate);			sleep(TIMEOUT_INF);			exit();		}#ifndef WIN32#ifdef	USE_POLL		bool	reallocate = false;				MUTEX_START		onEvent();		port = first;		while(port)		{			onCallback(port);			if ( ( p_ufd = port->ufd ) ) {				if ( ( POLLHUP | POLLNVAL ) & p_ufd->revents ) {					// Avoid infinite loop from disconnected sockets					port->detect_disconnect = false;					p_ufd->events &= ~POLLHUP;					SocketPort* p = port;					port = port->next;					detach(p);					reallocate = true;					p->disconnect();					continue;				}					if ( ( POLLIN | POLLPRI ) & p_ufd->revents )					port->pending();					if ( POLLOUT & p_ufd->revents )					port->output();			} else {				reallocate = true;			}retry:			expires = port->getTimer();			if(expires > 0)				if(expires < timer)					timer = expires;			if(!expires)			{				port->endTimer();				port->expired();				goto retry;			}			port = port->next;		}		//		// reallocate things if we saw a ServerPort without		// ufd set !		if ( reallocate || ( ( count + 1 ) != lastcount ) ) {			lastcount = count + 1;			p_ufd = mfd.getList( count + 1 );				// Set up iosync polling			p_ufd->fd = iosync[0];			p_ufd->events = POLLIN | POLLHUP;			p_ufd ++;						port = first;			while(port)			{				p_ufd->fd = port->so;				p_ufd->events =					( port->detect_disconnect ? POLLHUP : 0 )					| ( port->detect_output ? POLLOUT : 0 )					| ( port->detect_pending ? POLLIN : 0 )				;				port->ufd = p_ufd;				p_ufd ++;				port = port->next;			}		}		MUTEX_END		poll( mfd.getList(), lastcount, timer );#else		MUTEX_START		onEvent();		port = first;		while(port)		{			onCallback(port);			so = port->so;			if(FD_ISSET(so, &err)) {				port->detect_disconnect = false;								SocketPort* p = port;				port = port->next;				p->disconnect();				continue;			}			if(FD_ISSET(so, &inp))				port->pending();			if(FD_ISSET(so, &out))				port->output();retry:			expires = port->getTimer();			if(expires > 0)				if(expires < timer)					timer = expires;			// if we expire, get new scheduling now			if(!expires)			{				port->endTimer();				port->expired();				goto retry;			}			port = port->next;		}		FD_ZERO(&inp);		FD_ZERO(&out);		FD_ZERO(&err);		FD_SET(iosync[0],&inp);		port = first;		while(port)		{			so = port->so;			if(port->detect_pending)				FD_SET(so, &inp);			if(port->detect_output)				FD_SET(so, &out);			if(port->detect_disconnect)				FD_SET(so, &err);			port = port->next;		}				MUTEX_END		if(timer == TIMEOUT_INF)			tvp = NULL;		else		{			tvp = &timeout;			timeout.tv_sec = timer / 1000;			timeout.tv_usec = (timer % 1000) * 1000;		}		select(hiwater, &inp, &out, &err, tvp);		#endif#else // WIN32		MUTEX_START		onEvent();		hv[0] = sync->GetSync();		numHandle = 1;		port = first;		while(port)		{			onCallback(port);			long events = 0;			if(port->detect_pending)				events |= FD_READ;			if(port->detect_output)				events |= FD_WRITE;			if(port->detect_disconnect)				events |= FD_CLOSE;			// !!! ignore some socket on overflow !!!			if (events && numHandle < MAXIMUM_WAIT_OBJECTS)			{				WSAEventSelect(port->so,port->event,events);				hv[numHandle++] = port->event;			}retry:			expires = port->getTimer();			if(expires > 0)				if(expires < timer)					timer = expires;			// if we expire, get new scheduling now			if(!expires)			{				port->endTimer();				port->expired();				goto retry;			}			port = port->next;		}				MUTEX_END		// FIXME: handle thread cancellation correctly		DWORD res = WaitForMultipleObjects(numHandle,hv,FALSE,timer);		switch (res)		{		case WAIT_OBJECT_0:			break;		case WAIT_TIMEOUT:			break;		default:			// FIXME: handle failures (detach SocketPort)			if (res >= WAIT_OBJECT_0+1 && res <= WAIT_OBJECT_0+MAXIMUM_WAIT_OBJECTS)			{				int curr = res - (WAIT_OBJECT_0);				WSANETWORKEVENTS events;				// search port				MUTEX_START				port = first;				while(port)				{					if (port->event == hv[curr])						break;					port = port->next;				}				MUTEX_END				// if port not found ignore				if (!port || port->event != hv[curr])					break;				WSAEnumNetworkEvents(port->so,port->event,&events);				if(events.lNetworkEvents & FD_CLOSE) 				{					port->detect_disconnect = false;					port->disconnect();					continue;				}				if(events.lNetworkEvents & FD_READ)					port->pending();				if(events.lNetworkEvents & FD_WRITE)					port->output();			}		}#endif	}}						#ifdef	CCXX_NAMESPACES}#endif			/** EMACS ** * Local variables: * mode: c++ * c-basic-offset: 8 * End: */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -