📄 speventcb.cpp
字号:
void SP_EventCallback :: onResponse( void * queueData, void * arg ){ SP_Response * response = (SP_Response*)queueData; SP_EventArg * eventArg = (SP_EventArg*)arg; SP_SessionManager * manager = eventArg->getSessionManager(); SP_Sid_t fromSid = response->getFromSid(); u_int16_t seq = 0; if( ! SP_EventHelper::isSystemSid( &fromSid ) ) { SP_Session * session = manager->get( fromSid.mKey, &seq ); if( seq == fromSid.mSeq && NULL != session ) { if( SP_Session::eWouldExit == session->getStatus() ) { session->setStatus( SP_Session::eExit ); } if( SP_Session::eNormal != session->getStatus() ) { event_del( session->getReadEvent() ); } // always add a write event for sender, // so the pending input can be processed in onWrite addEvent( session, EV_WRITE, -1 ); addEvent( session, EV_READ, -1 ); } else { syslog( LOG_WARNING, "session(%d.%d) invalid, unknown FROM", fromSid.mKey, fromSid.mSeq ); } } for( SP_Message * msg = response->takeMessage(); NULL != msg; msg = response->takeMessage() ) { SP_SidList * sidList = msg->getToList(); if( msg->getTotalSize() > 0 ) { for( int i = sidList->getCount() - 1; i >= 0; i-- ) { SP_Sid_t sid = sidList->get( i ); SP_Session * session = manager->get( sid.mKey, &seq ); if( seq == sid.mSeq && NULL != session ) { if( 0 != memcmp( &fromSid, &sid, sizeof( sid ) ) && SP_Session::eExit == session->getStatus() ) { sidList->take( i ); msg->getFailure()->add( sid ); syslog( LOG_WARNING, "session(%d.%d) would exit, invalid TO", sid.mKey, sid.mSeq ); } else { session->getOutList()->append( msg ); addEvent( session, EV_WRITE, -1 ); } } else { sidList->take( i ); msg->getFailure()->add( sid ); syslog( LOG_WARNING, "session(%d.%d) invalid, unknown TO", sid.mKey, sid.mSeq ); } } } else { for( ; sidList->getCount() > 0; ) { msg->getFailure()->add( sidList->take( SP_ArrayList::LAST_INDEX ) ); } } if( msg->getToList()->getCount() <= 0 ) { SP_EventHelper::doCompletion( eventArg, msg ); } } delete response;}void SP_EventCallback :: addEvent( SP_Session * session, short events, int fd ){ SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); if( ( events & EV_WRITE ) && 0 == session->getWriting() ) { session->setWriting( 1 ); if( fd < 0 ) fd = EVENT_FD( session->getWriteEvent() ); event_set( session->getWriteEvent(), fd, events, onWrite, session ); event_base_set( eventArg->getEventBase(), session->getWriteEvent() ); struct timeval timeout; memset( &timeout, 0, sizeof( timeout ) ); timeout.tv_sec = eventArg->getTimeout(); event_add( session->getWriteEvent(), &timeout ); } if( events & EV_READ && 0 == session->getReading() ) { session->setReading( 1 ); if( fd < 0 ) fd = EVENT_FD( session->getWriteEvent() ); event_set( session->getReadEvent(), fd, events, onRead, session ); event_base_set( eventArg->getEventBase(), session->getReadEvent() ); struct timeval timeout; memset( &timeout, 0, sizeof( timeout ) ); timeout.tv_sec = eventArg->getTimeout(); event_add( session->getReadEvent(), &timeout ); }}//-------------------------------------------------------------------int SP_EventHelper :: isSystemSid( SP_Sid_t * sid ){ return sid->mKey == SP_Sid_t::eTimerKey && sid->mSeq == SP_Sid_t::eTimerSeq;}void SP_EventHelper :: doWork( SP_Session * session ){ if( SP_Session::eNormal == session->getStatus() ) { session->setRunning( 1 ); SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); eventArg->getInputResultQueue()->push( new SP_SimpleTask( worker, session, 1 ) ); } else { SP_Sid_t sid = session->getSid(); char buffer[ 16 ] = { 0 }; session->getInBuffer()->take( buffer, sizeof( buffer ) ); syslog( LOG_WARNING, "session(%d.%d) status is %d, ignore [%s...] (%dB)", sid.mKey, sid.mSeq, session->getStatus(), buffer, session->getInBuffer()->getSize() ); session->getInBuffer()->reset(); }}void SP_EventHelper :: worker( void * arg ){ SP_Session * session = (SP_Session*)arg; SP_Handler * handler = session->getHandler(); SP_EventArg * eventArg = (SP_EventArg *)session->getArg(); SP_Response * response = new SP_Response( session->getSid() ); if( 0 != handler->handle( session->getRequest(), response ) ) { session->setStatus( SP_Session::eWouldExit ); } session->setRunning( 0 ); msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );}void SP_EventHelper :: doError( SP_Session * session ){ SP_EventArg * eventArg = (SP_EventArg *)session->getArg(); event_del( session->getWriteEvent() ); event_del( session->getReadEvent() ); SP_Sid_t sid = session->getSid(); SP_ArrayList * outList = session->getOutList(); for( ; outList->getCount() > 0; ) { SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX ); int index = msg->getToList()->find( sid ); if( index >= 0 ) msg->getToList()->take( index ); msg->getFailure()->add( sid ); if( msg->getToList()->getCount() <= 0 ) { doCompletion( eventArg, msg ); } } // remove session from SessionManager, onResponse will ignore this session eventArg->getSessionManager()->remove( sid.mKey ); eventArg->getInputResultQueue()->push( new SP_SimpleTask( error, session, 1 ) );}void SP_EventHelper :: error( void * arg ){ SP_Session * session = ( SP_Session * )arg; SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); SP_Sid_t sid = session->getSid(); SP_Response * response = new SP_Response( sid ); session->getHandler()->error( response ); msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response ); // onResponse will ignore this session, so it's safe to destroy session here session->getHandler()->close(); close( EVENT_FD( session->getWriteEvent() ) ); delete session; syslog( LOG_WARNING, "session(%d.%d) error, exit", sid.mKey, sid.mSeq );}void SP_EventHelper :: doTimeout( SP_Session * session ){ SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); event_del( session->getWriteEvent() ); event_del( session->getReadEvent() ); SP_Sid_t sid = session->getSid(); SP_ArrayList * outList = session->getOutList(); for( ; outList->getCount() > 0; ) { SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX ); int index = msg->getToList()->find( sid ); if( index >= 0 ) msg->getToList()->take( index ); msg->getFailure()->add( sid ); if( msg->getToList()->getCount() <= 0 ) { doCompletion( eventArg, msg ); } } // remove session from SessionManager, onResponse will ignore this session eventArg->getSessionManager()->remove( sid.mKey ); eventArg->getInputResultQueue()->push( new SP_SimpleTask( timeout, session, 1 ) );}void SP_EventHelper :: timeout( void * arg ){ SP_Session * session = ( SP_Session * )arg; SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); SP_Sid_t sid = session->getSid(); SP_Response * response = new SP_Response( sid ); session->getHandler()->timeout( response ); msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response ); // onResponse will ignore this session, so it's safe to destroy session here session->getHandler()->close(); close( EVENT_FD( session->getWriteEvent() ) ); delete session; syslog( LOG_WARNING, "session(%d.%d) timeout, exit", sid.mKey, sid.mSeq );}void SP_EventHelper :: doStart( SP_Session * session ){ session->setRunning( 1 ); SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); eventArg->getInputResultQueue()->push( new SP_SimpleTask( start, session, 1 ) );}void SP_EventHelper :: start( void * arg ){ SP_Session * session = ( SP_Session * )arg; SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); SP_IOChannel * ioChannel = session->getIOChannel(); int initRet = ioChannel->init( EVENT_FD( session->getWriteEvent() ) ); // always call SP_Handler::start SP_Response * response = new SP_Response( session->getSid() ); int startRet = session->getHandler()->start( session->getRequest(), response ); int status = SP_Session::eWouldExit; if( 0 == initRet ) { if( 0 == startRet ) status = SP_Session::eNormal; } else { delete response; // make an empty response response = new SP_Response( session->getSid() ); } session->setStatus( status ); session->setRunning( 0 ); msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );}void SP_EventHelper :: doCompletion( SP_EventArg * eventArg, SP_Message * msg ){ eventArg->getOutputResultQueue()->push( msg );}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -