⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spdispatcher.cpp

📁 spserver 是一个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架
💻 CPP
字号:
/* * Copyright 2007 Stephen Liu * For license terms, see the file COPYING along with this library. */#include <stdlib.h>#include <stdio.h>#include <string.h>#include <pthread.h>#include <assert.h>#include <syslog.h>#include <errno.h>#include <signal.h>#include <unistd.h>#include "spdispatcher.hpp"#include "speventcb.hpp"#include "sphandler.hpp"#include "spsession.hpp"#include "spexecutor.hpp"#include "sputils.hpp"#include "spiochannel.hpp"#include "spioutils.hpp"#include "config.h"#include "event_msgqueue.h"SP_Dispatcher :: SP_Dispatcher( SP_CompletionHandler * completionHandler, int maxThreads ){	/* Don't die with SIGPIPE on remote read shutdown. That's dumb. */	signal( SIGPIPE, SIG_IGN );	mIsShutdown = 0;	mIsRunning = 0;	mEventArg = new SP_EventArg( 600 );	mMaxThreads = maxThreads > 0 ? maxThreads : 64;	mCompletionHandler = completionHandler;	mPushQueue = msgqueue_new( mEventArg->getEventBase(), 0, onPush, mEventArg );}SP_Dispatcher :: ~SP_Dispatcher(){	if( 0 == mIsRunning ) sleep( 1 );	shutdown();	for( ; mIsRunning; ) sleep( 1 );	//msgqueue_destroy( (struct event_msgqueue*)mPushQueue );	delete mEventArg;	mEventArg = NULL;}void SP_Dispatcher :: setTimeout( int timeout ){	mEventArg->setTimeout( timeout );}void SP_Dispatcher :: shutdown(){	mIsShutdown = 1;}int SP_Dispatcher :: isRunning(){	return mIsRunning;}int SP_Dispatcher :: getSessionCount(){	return mEventArg->getSessionManager()->getCount();}int SP_Dispatcher :: getReqQueueLength(){	return mEventArg->getInputResultQueue()->getLength();}int SP_Dispatcher :: dispatch(){	int ret = -1;	pthread_attr_t attr;	pthread_attr_init( &attr );	assert( pthread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );	pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );	pthread_t thread = 0;	ret = pthread_create( &thread, &attr, reinterpret_cast<void*(*)(void*)>(eventLoop), this );	pthread_attr_destroy( &attr );	if( 0 == ret ) {		syslog( LOG_NOTICE, "Thread #%ld has been created for dispatcher", thread );	} else {		mIsRunning = 0;		syslog( LOG_WARNING, "Unable to create a thread for dispatcher, %s",			strerror( errno ) ) ;	}	return ret;}void * SP_Dispatcher :: eventLoop( void * arg ){	SP_Dispatcher * dispatcher = (SP_Dispatcher*)arg;	dispatcher->mIsRunning = 1;	dispatcher->start();	dispatcher->mIsRunning = 0;	return NULL;}void SP_Dispatcher :: outputCompleted( void * arg ){	SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];	SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];	handler->completionMessage( msg );	free( arg );}int SP_Dispatcher :: start(){	SP_Executor workerExecutor( mMaxThreads, "work" );	SP_Executor actExecutor( 1, "act" );	/* Start the event loop. */	while( 0 == mIsShutdown ) {		event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );		for( ; NULL != mEventArg->getInputResultQueue()->top(); ) {			SP_Task * task = (SP_Task*)mEventArg->getInputResultQueue()->pop();			workerExecutor.execute( task );		}		for( ; NULL != mEventArg->getOutputResultQueue()->top(); ) {			SP_Message * msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();			void ** arg = ( void** )malloc( sizeof( void * ) * 2 );			arg[ 0 ] = (void*)mCompletionHandler;			arg[ 1 ] = (void*)msg;			actExecutor.execute( outputCompleted, arg );		}	}	syslog( LOG_NOTICE, "Dispatcher is shutdown." );	return 0;}typedef struct tagSP_PushArg {	int mType;      // 0 : fd, 1 : timer	// for push fd	int mFd;	SP_Handler * mHandler;	SP_IOChannel * mIOChannel;	int mNeedStart;	// for push timer	struct timeval mTimeout;	struct event mTimerEvent;	SP_TimerHandler * mTimerHandler;	SP_EventArg * mEventArg;	void * mPushQueue;} SP_PushArg_t;void SP_Dispatcher :: onPush( void * queueData, void * arg ){	SP_PushArg_t * pushArg = (SP_PushArg_t*)queueData;	SP_EventArg * eventArg = (SP_EventArg*)arg;	if( 0 == pushArg->mType ) {		SP_Sid_t sid;		sid.mKey = pushArg->mFd;		eventArg->getSessionManager()->get( sid.mKey, &sid.mSeq );		SP_Session * session = new SP_Session( sid );		eventArg->getSessionManager()->put( sid.mKey, session, &sid.mSeq );		session->setHandler( pushArg->mHandler );		session->setIOChannel( pushArg->mIOChannel );		session->setArg( eventArg );		event_set( session->getReadEvent(), pushArg->mFd, EV_READ,				SP_EventCallback::onRead, session );		event_set( session->getWriteEvent(), pushArg->mFd, EV_WRITE,				SP_EventCallback::onWrite, session );		if( pushArg->mNeedStart ) {			SP_EventHelper::doStart( session );		} else {			SP_EventCallback::addEvent( session, EV_WRITE, pushArg->mFd );			SP_EventCallback::addEvent( session, EV_READ, pushArg->mFd );		}		free( pushArg );	} else {		event_set( &( pushArg->mTimerEvent ), -1, 0, onTimer, pushArg );		event_base_set( eventArg->getEventBase(), &( pushArg->mTimerEvent ) );		event_add( &( pushArg->mTimerEvent ), &( pushArg->mTimeout ) );	}}int SP_Dispatcher :: push( int fd, SP_Handler * handler, int needStart ){	SP_IOChannel * ioChannel = new SP_DefaultIOChannel();	return push( fd, handler, ioChannel, needStart );}int SP_Dispatcher :: push( int fd, SP_Handler * handler,		SP_IOChannel * ioChannel, int needStart ){	uint16_t seq = 0;	if( NULL != mEventArg->getSessionManager()->get( fd, &seq ) ) {		return -1;	}	SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );	arg->mType = 0;	arg->mFd = fd;	arg->mHandler = handler;	arg->mIOChannel = ioChannel;	arg->mNeedStart = needStart;	SP_IOUtils::setNonblock( fd );	return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );}void SP_Dispatcher :: onTimer( int, short, void * arg ){	SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;	pushArg->mEventArg->getInputResultQueue()->push(		new SP_SimpleTask( timer, pushArg, 1 ) );}void SP_Dispatcher :: timer( void * arg ){	SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;	SP_TimerHandler * handler = pushArg->mTimerHandler;	SP_EventArg * eventArg = pushArg->mEventArg;	SP_Sid_t sid;	sid.mKey = SP_Sid_t::eTimerKey;	sid.mSeq = SP_Sid_t::eTimerSeq;	SP_Response * response = new SP_Response( sid );	if( 0 == handler->handle( response, &( pushArg->mTimeout ) ) ) {		msgqueue_push( (struct event_msgqueue*)pushArg->mPushQueue, arg );	} else {		delete pushArg->mTimerHandler;		free( pushArg );	}	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );}int SP_Dispatcher :: push( const struct timeval * timeout, SP_TimerHandler * handler ){	SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );	arg->mType = 1;	arg->mTimeout = *timeout;	arg->mTimerHandler = handler;	arg->mEventArg = mEventArg;	arg->mPushQueue = mPushQueue;	return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -