⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 speventcb.cpp

📁 spserver 是一个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* * Copyright 2007 Stephen Liu * For license terms, see the file COPYING along with this library. */#include <fcntl.h>#include <stdio.h>#include <unistd.h>#include <syslog.h>#include <string.h>#include <assert.h>#include <errno.h>#include <stdlib.h>#include <sys/socket.h>#include "speventcb.hpp"#include "spexecutor.hpp"#include "spsession.hpp"#include "spresponse.hpp"#include "sphandler.hpp"#include "spbuffer.hpp"#include "spmsgdecoder.hpp"#include "sputils.hpp"#include "sprequest.hpp"#include "spmsgblock.hpp"#include "spiochannel.hpp"#include "spioutils.hpp"#include "config.h"   // from libevent, for event.h#include "event_msgqueue.h"#include "event.h"SP_EventArg :: SP_EventArg( int timeout ){	mEventBase = (struct event_base*)event_init();	mResponseQueue = msgqueue_new( mEventBase, 0,			SP_EventCallback::onResponse, this );	mInputResultQueue = new SP_BlockingQueue();	mOutputResultQueue = new SP_BlockingQueue();	mSessionManager = new SP_SessionManager();	mTimeout = timeout;}SP_EventArg :: ~SP_EventArg(){	delete mInputResultQueue;	delete mOutputResultQueue;	delete mSessionManager;	//msgqueue_destroy( (struct event_msgqueue*)mResponseQueue );	//event_base_free( mEventBase );}struct event_base * SP_EventArg :: getEventBase() const{	return mEventBase;}void * SP_EventArg :: getResponseQueue() const{	return mResponseQueue;}SP_BlockingQueue * SP_EventArg :: getInputResultQueue() const{	return mInputResultQueue;}SP_BlockingQueue * SP_EventArg :: getOutputResultQueue() const{	return mOutputResultQueue;}SP_SessionManager * SP_EventArg :: getSessionManager() const{	return mSessionManager;}void SP_EventArg :: setTimeout( int timeout ){	mTimeout = timeout;}int SP_EventArg :: getTimeout() const{	return mTimeout;}//-------------------------------------------------------------------void SP_EventCallback :: onAccept( int fd, short events, void * arg ){	int clientFD;	struct sockaddr_in clientAddr;	socklen_t clientLen = sizeof( clientAddr );	SP_AcceptArg_t * acceptArg = (SP_AcceptArg_t*)arg;	SP_EventArg * eventArg = acceptArg->mEventArg;	clientFD = accept( fd, (struct sockaddr *)&clientAddr, &clientLen );	if( -1 == clientFD ) {		syslog( LOG_WARNING, "accept failed" );		return;	}	if( SP_IOUtils::setNonblock( clientFD ) < 0 ) {		syslog( LOG_WARNING, "failed to set client socket non-blocking" );	}	SP_Sid_t sid;	sid.mKey = clientFD;	eventArg->getSessionManager()->get( sid.mKey, &sid.mSeq );	SP_Session * session = new SP_Session( sid );	char clientIP[ 32 ] = { 0 };	SP_IOUtils::inetNtoa( &( clientAddr.sin_addr ), clientIP, sizeof( clientIP ) );	session->getRequest()->setClientIP( clientIP );	if( NULL != session ) {		eventArg->getSessionManager()->put( sid.mKey, session, &sid.mSeq );		session->setHandler( acceptArg->mHandlerFactory->create() );		session->setIOChannel( acceptArg->mIOChannelFactory->create() );		session->setArg( eventArg );		event_set( session->getReadEvent(), clientFD, EV_READ, onRead, session );		event_set( session->getWriteEvent(), clientFD, EV_WRITE, onWrite, session );		if( eventArg->getSessionManager()->getCount() > acceptArg->mMaxConnections				|| eventArg->getInputResultQueue()->getLength() >= acceptArg->mReqQueueSize ) {			syslog( LOG_WARNING, "System busy, session.count %d [%d], queue.length %d [%d]",				eventArg->getSessionManager()->getCount(), acceptArg->mMaxConnections,				eventArg->getInputResultQueue()->getLength(), acceptArg->mReqQueueSize );			SP_Message * msg = new SP_Message();			msg->getMsg()->append( acceptArg->mRefusedMsg );			msg->getMsg()->append( "\r\n" );			session->getOutList()->append( msg );			session->setStatus( SP_Session::eExit );			addEvent( session, EV_WRITE, clientFD );		} else {			SP_EventHelper::doStart( session );		}	} else {		close( clientFD );		syslog( LOG_WARNING, "Out of memory, cannot allocate session object!" );	}}void SP_EventCallback :: onRead( int fd, short events, void * arg ){	SP_Session * session = (SP_Session*)arg;	session->setReading( 0 );	SP_Sid_t sid = session->getSid();	if( EV_READ & events ) {		int len = session->getIOChannel()->receive( session );		if( len > 0 ) {			if( 0 == session->getRunning() ) {				SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder();				if( SP_MsgDecoder::eOK == decoder->decode( session->getInBuffer() ) ) {					SP_EventHelper::doWork( session );				}			}			addEvent( session, EV_READ, -1 );		} else {			if( EINTR != errno && EAGAIN != errno ) {				if( 0 == session->getRunning() ) {					syslog( LOG_NOTICE, "session(%d.%d) read error", sid.mKey, sid.mSeq );					SP_EventHelper::doError( session );				} else {					addEvent( session, EV_READ, -1 );					syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later",							sid.mKey, sid.mSeq );				}			}		}	} else {		if( 0 == session->getRunning() ) {			SP_EventHelper::doTimeout( session );		} else {			addEvent( session, EV_READ, -1 );			syslog( LOG_NOTICE, "session(%d.%d) busy, process session timeout later",					sid.mKey, sid.mSeq );		}	}}void SP_EventCallback :: onWrite( int fd, short events, void * arg ){	SP_Session * session = (SP_Session*)arg;	SP_Handler * handler = session->getHandler();	SP_EventArg * eventArg = (SP_EventArg*)session->getArg();	session->setWriting( 0 );	SP_Sid_t sid = session->getSid();	if( EV_WRITE & events ) {		int ret = 0;		if( session->getOutList()->getCount() > 0 ) {			int len = session->getIOChannel()->transmit( session );			if( len > 0 ) {				if( session->getOutList()->getCount() > 0 ) {					// left for next write event					addEvent( session, EV_WRITE, -1 );				}			} else {				if( EINTR != errno && EAGAIN != errno ) {					ret = -1;					if( 0 == session->getRunning() ) {						syslog( LOG_NOTICE, "session(%d.%d) write error", sid.mKey, sid.mSeq );						SP_EventHelper::doError( session );					} else {						addEvent( session, EV_WRITE, -1 );						syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later, errno [%d]",								sid.mKey, sid.mSeq, errno );					}				}			}		}		if( 0 == ret && session->getOutList()->getCount() <= 0 ) {			if( SP_Session::eExit == session->getStatus() ) {				ret = -1;				if( 0 == session->getRunning() ) {					syslog( LOG_NOTICE, "session(%d.%d) close, exit", sid.mKey, sid.mSeq );					eventArg->getSessionManager()->remove( fd );					event_del( session->getReadEvent() );					handler->close();					close( fd );					delete session;				} else {					addEvent( session, EV_WRITE, -1 );					syslog( LOG_NOTICE, "session(%d.%d) busy, terminate session later",							sid.mKey, sid.mSeq );				}			}		}		if( 0 == ret ) {			if( 0 == session->getRunning() ) {				SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder();				if( SP_MsgDecoder::eOK == decoder->decode( session->getInBuffer() ) ) {					SP_EventHelper::doWork( session );				}			} else {				// If this session is running, then onResponse will add write event for this session.				// So no need to add write event here.			}		}	} else {		if( 0 == session->getRunning() ) {			SP_EventHelper::doTimeout( session );		} else {			addEvent( session, EV_WRITE, -1 );			syslog( LOG_NOTICE, "session(%d.%d) busy, process session timeout later",					sid.mKey, sid.mSeq );		}	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -