⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 spserver.cpp

📁 spserver 是一个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架
💻 CPP
字号:
/* * Copyright 2007 Stephen Liu * For license terms, see the file COPYING along with this library. */#include <stdio.h>#include <string.h>#include <stdlib.h>#include <assert.h>#include <syslog.h>#include <errno.h>#include <unistd.h>#include <signal.h>#include <netinet/tcp.h>#include "spserver.hpp"#include "speventcb.hpp"#include "sphandler.hpp"#include "spsession.hpp"#include "spexecutor.hpp"#include "sputils.hpp"#include "spiochannel.hpp"#include "spioutils.hpp"#include "config.h"#include "event_msgqueue.h"SP_Server :: SP_Server( const char * bindIP, int port,		SP_HandlerFactory * handlerFactory ){	snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP );	mPort = port;	mIsShutdown = 0;	mIsRunning = 0;	mHandlerFactory = handlerFactory;	mIOChannelFactory = NULL;	mTimeout = 600;	mMaxThreads = 64;	mReqQueueSize = 128;	mMaxConnections = 256;	mRefusedMsg = strdup( "System busy, try again later." );}SP_Server :: ~SP_Server(){	if( NULL != mHandlerFactory ) delete mHandlerFactory;	mHandlerFactory = NULL;	if( NULL != mIOChannelFactory ) delete mIOChannelFactory;	mIOChannelFactory = NULL;	if( NULL != mRefusedMsg ) free( mRefusedMsg );	mRefusedMsg = NULL;}void SP_Server :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory ){	mIOChannelFactory = ioChannelFactory;}void SP_Server :: setTimeout( int timeout ){	mTimeout = timeout > 0 ? timeout : mTimeout;}void SP_Server :: setMaxThreads( int maxThreads ){	mMaxThreads = maxThreads > 0 ? maxThreads : mMaxThreads;}void SP_Server :: setMaxConnections( int maxConnections ){	mMaxConnections = maxConnections > 0 ? maxConnections : mMaxConnections;}void SP_Server :: setReqQueueSize( int reqQueueSize, const char * refusedMsg ){	mReqQueueSize = reqQueueSize > 0 ? reqQueueSize : mReqQueueSize;	if( NULL != mRefusedMsg ) free( mRefusedMsg );	mRefusedMsg = strdup( refusedMsg );}void SP_Server :: shutdown(){	mIsShutdown = 1;}int SP_Server :: isRunning(){	return mIsRunning;}int SP_Server :: run(){	int ret = -1;	pthread_attr_t attr;	pthread_attr_init( &attr );	assert( pthread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );	pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );	pthread_t thread = 0;	ret = pthread_create( &thread, &attr, reinterpret_cast<void*(*)(void*)>(eventLoop), this );	pthread_attr_destroy( &attr );	if( 0 == ret ) {		syslog( LOG_NOTICE, "Thread #%ld has been created to listen on port [%d]", thread, mPort );	} else {		mIsRunning = 0;		syslog( LOG_WARNING, "Unable to create a thread for TCP server on port [%d], %s",			mPort, strerror( errno ) ) ;	}	return ret;}void SP_Server :: runForever(){	eventLoop( this );}void * SP_Server :: eventLoop( void * arg ){	SP_Server * server = (SP_Server*)arg;	server->mIsRunning = 1;	server->start();	server->mIsRunning = 0;	return NULL;}void SP_Server :: sigHandler( int, short, void * arg ){	SP_Server * server = (SP_Server*)arg;	server->shutdown();}void SP_Server :: outputCompleted( void * arg ){	SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];	SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];	handler->completionMessage( msg );	free( arg );}int SP_Server :: start(){	/* Don't die with SIGPIPE on remote read shutdown. That's dumb. */	signal( SIGPIPE, SIG_IGN );	int ret = 0;	int listenFD = -1;	ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );	if( 0 == ret ) {		SP_EventArg eventArg( mTimeout );		// Clean close on SIGINT or SIGTERM.		struct event evSigInt, evSigTerm;		signal_set( &evSigInt, SIGINT,  sigHandler, this );		event_base_set( eventArg.getEventBase(), &evSigInt );		signal_add( &evSigInt, NULL);		signal_set( &evSigTerm, SIGTERM, sigHandler, this );		event_base_set( eventArg.getEventBase(), &evSigTerm );		signal_add( &evSigTerm, NULL);		SP_AcceptArg_t acceptArg;		memset( &acceptArg, 0, sizeof( SP_AcceptArg_t ) );		if( NULL == mIOChannelFactory ) {			mIOChannelFactory = new SP_DefaultIOChannelFactory();		}		acceptArg.mEventArg = &eventArg;		acceptArg.mHandlerFactory = mHandlerFactory;		acceptArg.mIOChannelFactory = mIOChannelFactory;		acceptArg.mReqQueueSize = mReqQueueSize;		acceptArg.mMaxConnections = mMaxConnections;		acceptArg.mRefusedMsg = mRefusedMsg;		struct event evAccept;		event_set( &evAccept, listenFD, EV_READ|EV_PERSIST,				SP_EventCallback::onAccept, &acceptArg );		event_base_set( eventArg.getEventBase(), &evAccept );		event_add( &evAccept, NULL );		SP_Executor workerExecutor( mMaxThreads, "work" );		SP_Executor actExecutor( 1, "act" );		SP_CompletionHandler * completionHandler = mHandlerFactory->createCompletionHandler();		/* Start the event loop. */		while( 0 == mIsShutdown ) {			event_base_loop( eventArg.getEventBase(), EVLOOP_ONCE );			for( ; NULL != eventArg.getInputResultQueue()->top(); ) {				SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();				workerExecutor.execute( task );			}			for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {				SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();				void ** arg = ( void** )malloc( sizeof( void * ) * 2 );				arg[ 0 ] = (void*)completionHandler;				arg[ 1 ] = (void*)msg;				actExecutor.execute( outputCompleted, arg );			}		}		delete completionHandler;		syslog( LOG_NOTICE, "Server is shutdown." );		event_del( &evAccept );		signal_del( &evSigTerm );		signal_del( &evSigInt );		close( listenFD );	}	return ret;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -