⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 speventcb.cpp

📁 spserver 是一个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架
💻 CPP
📖 第 1 页 / 共 2 页
字号:
void SP_EventCallback :: onResponse( void * queueData, void * arg ){	SP_Response * response = (SP_Response*)queueData;	SP_EventArg * eventArg = (SP_EventArg*)arg;	SP_SessionManager * manager = eventArg->getSessionManager();	SP_Sid_t fromSid = response->getFromSid();	u_int16_t seq = 0;	if( ! SP_EventHelper::isSystemSid( &fromSid ) ) {		SP_Session * session = manager->get( fromSid.mKey, &seq );		if( seq == fromSid.mSeq && NULL != session ) {			if( SP_Session::eWouldExit == session->getStatus() ) {				session->setStatus( SP_Session::eExit );			}			if( SP_Session::eNormal != session->getStatus() ) {				event_del( session->getReadEvent() );			}			// always add a write event for sender, 			// so the pending input can be processed in onWrite			addEvent( session, EV_WRITE, -1 );			addEvent( session, EV_READ, -1 );		} else {			syslog( LOG_WARNING, "session(%d.%d) invalid, unknown FROM",					fromSid.mKey, fromSid.mSeq );		}	}	for( SP_Message * msg = response->takeMessage();			NULL != msg; msg = response->takeMessage() ) {		SP_SidList * sidList = msg->getToList();		if( msg->getTotalSize() > 0 ) {			for( int i = sidList->getCount() - 1; i >= 0; i-- ) {				SP_Sid_t sid = sidList->get( i );				SP_Session * session = manager->get( sid.mKey, &seq );				if( seq == sid.mSeq && NULL != session ) {					if( 0 != memcmp( &fromSid, &sid, sizeof( sid ) )							&& SP_Session::eExit == session->getStatus() ) {						sidList->take( i );						msg->getFailure()->add( sid );						syslog( LOG_WARNING, "session(%d.%d) would exit, invalid TO", sid.mKey, sid.mSeq );					} else {						session->getOutList()->append( msg );						addEvent( session, EV_WRITE, -1 );					}				} else {					sidList->take( i );					msg->getFailure()->add( sid );					syslog( LOG_WARNING, "session(%d.%d) invalid, unknown TO", sid.mKey, sid.mSeq );				}			}		} else {			for( ; sidList->getCount() > 0; ) {				msg->getFailure()->add( sidList->take( SP_ArrayList::LAST_INDEX ) );			}		}		if( msg->getToList()->getCount() <= 0 ) {			SP_EventHelper::doCompletion( eventArg, msg );		}	}	delete response;}void SP_EventCallback :: addEvent( SP_Session * session, short events, int fd ){	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	if( ( events & EV_WRITE ) && 0 == session->getWriting() ) {		session->setWriting( 1 );		if( fd < 0 ) fd = EVENT_FD( session->getWriteEvent() );		event_set( session->getWriteEvent(), fd, events, onWrite, session );		event_base_set( eventArg->getEventBase(), session->getWriteEvent() );		struct timeval timeout;		memset( &timeout, 0, sizeof( timeout ) );		timeout.tv_sec = eventArg->getTimeout();		event_add( session->getWriteEvent(), &timeout );	}	if( events & EV_READ && 0 == session->getReading() ) {		session->setReading( 1 );		if( fd < 0 ) fd = EVENT_FD( session->getWriteEvent() );		event_set( session->getReadEvent(), fd, events, onRead, session );		event_base_set( eventArg->getEventBase(), session->getReadEvent() );		struct timeval timeout;		memset( &timeout, 0, sizeof( timeout ) );		timeout.tv_sec = eventArg->getTimeout();		event_add( session->getReadEvent(), &timeout );	}}//-------------------------------------------------------------------int SP_EventHelper :: isSystemSid( SP_Sid_t * sid ){	return sid->mKey == SP_Sid_t::eTimerKey && sid->mSeq == SP_Sid_t::eTimerSeq;}void SP_EventHelper :: doWork( SP_Session * session ){	if( SP_Session::eNormal == session->getStatus() ) {		session->setRunning( 1 );		SP_EventArg * eventArg = (SP_EventArg*)session->getArg();		eventArg->getInputResultQueue()->push( new SP_SimpleTask( worker, session, 1 ) );	} else {		SP_Sid_t sid = session->getSid();		char buffer[ 16 ] = { 0 };		session->getInBuffer()->take( buffer, sizeof( buffer ) );		syslog( LOG_WARNING, "session(%d.%d) status is %d, ignore [%s...] (%dB)",			sid.mKey, sid.mSeq, session->getStatus(), buffer, session->getInBuffer()->getSize() );		session->getInBuffer()->reset();	}}void SP_EventHelper :: worker( void * arg ){	SP_Session * session = (SP_Session*)arg;	SP_Handler * handler = session->getHandler();	SP_EventArg * eventArg = (SP_EventArg *)session->getArg();	SP_Response * response = new SP_Response( session->getSid() );	if( 0 != handler->handle( session->getRequest(), response ) ) {		session->setStatus( SP_Session::eWouldExit );	}	session->setRunning( 0 );	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );}void SP_EventHelper :: doError( SP_Session * session ){	SP_EventArg * eventArg = (SP_EventArg *)session->getArg();	event_del( session->getWriteEvent() );	event_del( session->getReadEvent() );	SP_Sid_t sid = session->getSid();	SP_ArrayList * outList = session->getOutList();	for( ; outList->getCount() > 0; ) {		SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX );		int index = msg->getToList()->find( sid );		if( index >= 0 ) msg->getToList()->take( index );		msg->getFailure()->add( sid );		if( msg->getToList()->getCount() <= 0 ) {			doCompletion( eventArg, msg );		}	}	// remove session from SessionManager, onResponse will ignore this session	eventArg->getSessionManager()->remove( sid.mKey );	eventArg->getInputResultQueue()->push( new SP_SimpleTask( error, session, 1 ) );}void SP_EventHelper :: error( void * arg ){	SP_Session * session = ( SP_Session * )arg;	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	SP_Sid_t sid = session->getSid();	SP_Response * response = new SP_Response( sid );	session->getHandler()->error( response );	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );	// onResponse will ignore this session, so it's safe to destroy session here	session->getHandler()->close();	close( EVENT_FD( session->getWriteEvent() ) );	delete session;	syslog( LOG_WARNING, "session(%d.%d) error, exit", sid.mKey, sid.mSeq );}void SP_EventHelper :: doTimeout( SP_Session * session ){	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	event_del( session->getWriteEvent() );	event_del( session->getReadEvent() );	SP_Sid_t sid = session->getSid();	SP_ArrayList * outList = session->getOutList();	for( ; outList->getCount() > 0; ) {		SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX );		int index = msg->getToList()->find( sid );		if( index >= 0 ) msg->getToList()->take( index );		msg->getFailure()->add( sid );		if( msg->getToList()->getCount() <= 0 ) {			doCompletion( eventArg, msg );		}	}	// remove session from SessionManager, onResponse will ignore this session	eventArg->getSessionManager()->remove( sid.mKey );	eventArg->getInputResultQueue()->push( new SP_SimpleTask( timeout, session, 1 ) );}void SP_EventHelper :: timeout( void * arg ){	SP_Session * session = ( SP_Session * )arg;	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	SP_Sid_t sid = session->getSid();	SP_Response * response = new SP_Response( sid );	session->getHandler()->timeout( response );	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );	// onResponse will ignore this session, so it's safe to destroy session here	session->getHandler()->close();	close( EVENT_FD( session->getWriteEvent() ) );	delete session;	syslog( LOG_WARNING, "session(%d.%d) timeout, exit", sid.mKey, sid.mSeq );}void SP_EventHelper :: doStart( SP_Session * session ){	session->setRunning( 1 );	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	eventArg->getInputResultQueue()->push( new SP_SimpleTask( start, session, 1 ) );}void SP_EventHelper :: start( void * arg ){	SP_Session * session = ( SP_Session * )arg;	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	SP_IOChannel * ioChannel = session->getIOChannel();	int initRet = ioChannel->init( EVENT_FD( session->getWriteEvent() ) );	// always call SP_Handler::start	SP_Response * response = new SP_Response( session->getSid() );	int startRet = session->getHandler()->start( session->getRequest(), response );	int status = SP_Session::eWouldExit;	if( 0 == initRet ) {		if( 0 == startRet ) status = SP_Session::eNormal;	} else {		delete response;		// make an empty response		response = new SP_Response( session->getSid() );	}	session->setStatus( status );	session->setRunning( 0 );	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );}void SP_EventHelper :: doCompletion( SP_EventArg * eventArg, SP_Message * msg ){	eventArg->getOutputResultQueue()->push( msg );}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -