📄 speventcb.cpp
字号:
/* * Copyright 2007 Stephen Liu * For license terms, see the file COPYING along with this library. */#include <fcntl.h>#include <stdio.h>#include <unistd.h>#include <syslog.h>#include <string.h>#include <assert.h>#include <errno.h>#include <stdlib.h>#include <sys/socket.h>#include "speventcb.hpp"#include "spexecutor.hpp"#include "spsession.hpp"#include "spresponse.hpp"#include "sphandler.hpp"#include "spbuffer.hpp"#include "spmsgdecoder.hpp"#include "sputils.hpp"#include "sprequest.hpp"#include "spmsgblock.hpp"#include "spiochannel.hpp"#include "spioutils.hpp"#include "config.h" // from libevent, for event.h#include "event_msgqueue.h"#include "event.h"SP_EventArg :: SP_EventArg( int timeout ){ mEventBase = (struct event_base*)event_init(); mResponseQueue = msgqueue_new( mEventBase, 0, SP_EventCallback::onResponse, this ); mInputResultQueue = new SP_BlockingQueue(); mOutputResultQueue = new SP_BlockingQueue(); mSessionManager = new SP_SessionManager(); mTimeout = timeout;}SP_EventArg :: ~SP_EventArg(){ delete mInputResultQueue; delete mOutputResultQueue; delete mSessionManager; //msgqueue_destroy( (struct event_msgqueue*)mResponseQueue ); //event_base_free( mEventBase );}struct event_base * SP_EventArg :: getEventBase() const{ return mEventBase;}void * SP_EventArg :: getResponseQueue() const{ return mResponseQueue;}SP_BlockingQueue * SP_EventArg :: getInputResultQueue() const{ return mInputResultQueue;}SP_BlockingQueue * SP_EventArg :: getOutputResultQueue() const{ return mOutputResultQueue;}SP_SessionManager * SP_EventArg :: getSessionManager() const{ return mSessionManager;}void SP_EventArg :: setTimeout( int timeout ){ mTimeout = timeout;}int SP_EventArg :: getTimeout() const{ return mTimeout;}//-------------------------------------------------------------------void SP_EventCallback :: onAccept( int fd, short events, void * arg ){ int clientFD; struct sockaddr_in clientAddr; socklen_t clientLen = sizeof( clientAddr ); SP_AcceptArg_t * acceptArg = (SP_AcceptArg_t*)arg; SP_EventArg * eventArg = acceptArg->mEventArg; clientFD = accept( fd, (struct sockaddr *)&clientAddr, &clientLen ); if( -1 == clientFD ) { syslog( LOG_WARNING, "accept failed" ); return; } if( SP_IOUtils::setNonblock( clientFD ) < 0 ) { syslog( LOG_WARNING, "failed to set client socket non-blocking" ); } SP_Sid_t sid; sid.mKey = clientFD; eventArg->getSessionManager()->get( sid.mKey, &sid.mSeq ); SP_Session * session = new SP_Session( sid ); char clientIP[ 32 ] = { 0 }; SP_IOUtils::inetNtoa( &( clientAddr.sin_addr ), clientIP, sizeof( clientIP ) ); session->getRequest()->setClientIP( clientIP ); if( NULL != session ) { eventArg->getSessionManager()->put( sid.mKey, session, &sid.mSeq ); session->setHandler( acceptArg->mHandlerFactory->create() ); session->setIOChannel( acceptArg->mIOChannelFactory->create() ); session->setArg( eventArg ); event_set( session->getReadEvent(), clientFD, EV_READ, onRead, session ); event_set( session->getWriteEvent(), clientFD, EV_WRITE, onWrite, session ); if( eventArg->getSessionManager()->getCount() > acceptArg->mMaxConnections || eventArg->getInputResultQueue()->getLength() >= acceptArg->mReqQueueSize ) { syslog( LOG_WARNING, "System busy, session.count %d [%d], queue.length %d [%d]", eventArg->getSessionManager()->getCount(), acceptArg->mMaxConnections, eventArg->getInputResultQueue()->getLength(), acceptArg->mReqQueueSize ); SP_Message * msg = new SP_Message(); msg->getMsg()->append( acceptArg->mRefusedMsg ); msg->getMsg()->append( "\r\n" ); session->getOutList()->append( msg ); session->setStatus( SP_Session::eExit ); addEvent( session, EV_WRITE, clientFD ); } else { SP_EventHelper::doStart( session ); } } else { close( clientFD ); syslog( LOG_WARNING, "Out of memory, cannot allocate session object!" ); }}void SP_EventCallback :: onRead( int fd, short events, void * arg ){ SP_Session * session = (SP_Session*)arg; session->setReading( 0 ); SP_Sid_t sid = session->getSid(); if( EV_READ & events ) { int len = session->getIOChannel()->receive( session ); if( len > 0 ) { if( 0 == session->getRunning() ) { SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder(); if( SP_MsgDecoder::eOK == decoder->decode( session->getInBuffer() ) ) { SP_EventHelper::doWork( session ); } } addEvent( session, EV_READ, -1 ); } else { if( EINTR != errno && EAGAIN != errno ) { if( 0 == session->getRunning() ) { syslog( LOG_NOTICE, "session(%d.%d) read error", sid.mKey, sid.mSeq ); SP_EventHelper::doError( session ); } else { addEvent( session, EV_READ, -1 ); syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later", sid.mKey, sid.mSeq ); } } } } else { if( 0 == session->getRunning() ) { SP_EventHelper::doTimeout( session ); } else { addEvent( session, EV_READ, -1 ); syslog( LOG_NOTICE, "session(%d.%d) busy, process session timeout later", sid.mKey, sid.mSeq ); } }}void SP_EventCallback :: onWrite( int fd, short events, void * arg ){ SP_Session * session = (SP_Session*)arg; SP_Handler * handler = session->getHandler(); SP_EventArg * eventArg = (SP_EventArg*)session->getArg(); session->setWriting( 0 ); SP_Sid_t sid = session->getSid(); if( EV_WRITE & events ) { int ret = 0; if( session->getOutList()->getCount() > 0 ) { int len = session->getIOChannel()->transmit( session ); if( len > 0 ) { if( session->getOutList()->getCount() > 0 ) { // left for next write event addEvent( session, EV_WRITE, -1 ); } } else { if( EINTR != errno && EAGAIN != errno ) { ret = -1; if( 0 == session->getRunning() ) { syslog( LOG_NOTICE, "session(%d.%d) write error", sid.mKey, sid.mSeq ); SP_EventHelper::doError( session ); } else { addEvent( session, EV_WRITE, -1 ); syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later, errno [%d]", sid.mKey, sid.mSeq, errno ); } } } } if( 0 == ret && session->getOutList()->getCount() <= 0 ) { if( SP_Session::eExit == session->getStatus() ) { ret = -1; if( 0 == session->getRunning() ) { syslog( LOG_NOTICE, "session(%d.%d) close, exit", sid.mKey, sid.mSeq ); eventArg->getSessionManager()->remove( fd ); event_del( session->getReadEvent() ); handler->close(); close( fd ); delete session; } else { addEvent( session, EV_WRITE, -1 ); syslog( LOG_NOTICE, "session(%d.%d) busy, terminate session later", sid.mKey, sid.mSeq ); } } } if( 0 == ret ) { if( 0 == session->getRunning() ) { SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder(); if( SP_MsgDecoder::eOK == decoder->decode( session->getInBuffer() ) ) { SP_EventHelper::doWork( session ); } } else { // If this session is running, then onResponse will add write event for this session. // So no need to add write event here. } } } else { if( 0 == session->getRunning() ) { SP_EventHelper::doTimeout( session ); } else { addEvent( session, EV_WRITE, -1 ); syslog( LOG_NOTICE, "session(%d.%d) busy, process session timeout later", sid.mKey, sid.mSeq ); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -