📄 splfserver.cpp
字号:
/* * Copyright 2007 Stephen Liu * For license terms, see the file COPYING along with this library. */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <signal.h>#include <unistd.h>#include "splfserver.hpp"#include "speventcb.hpp"#include "spthreadpool.hpp"#include "sphandler.hpp"#include "spexecutor.hpp"#include "sputils.hpp"#include "spioutils.hpp"#include "spiochannel.hpp"#include "config.h"#include "event_msgqueue.h"SP_LFServer :: SP_LFServer( const char * bindIP, int port, SP_HandlerFactory * handlerFactory ){ snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP ); mPort = port; mIsShutdown = 0; mIsRunning = 0; mEventArg = new SP_EventArg( 600 ); mMaxThreads = 64; mAcceptArg = (SP_AcceptArg_t*)malloc( sizeof( SP_AcceptArg_t ) ); memset( mAcceptArg, 0, sizeof( SP_AcceptArg_t ) ); mAcceptArg->mMaxConnections = 256; mAcceptArg->mReqQueueSize = 128; mAcceptArg->mRefusedMsg = strdup( "System busy, try again later." ); mAcceptArg->mHandlerFactory = handlerFactory; mAcceptArg->mEventArg = mEventArg; mThreadPool = NULL; mEvAccept = mEvSigTerm = mEvSigInt = NULL; mCompletionHandler = NULL; pthread_mutex_init( &mMutex, NULL );}SP_LFServer :: ~SP_LFServer(){ shutdown(); if( NULL != mThreadPool ) delete mThreadPool; mThreadPool = NULL; if( NULL != mCompletionHandler ) delete mCompletionHandler; mCompletionHandler = NULL; event_del( mEvAccept ); free( mEvAccept ); mEvAccept = NULL; signal_del( mEvSigTerm ); free( mEvSigTerm ); mEvSigTerm = NULL; signal_del( mEvSigInt ); free( mEvSigInt ); mEvSigInt = NULL; delete mAcceptArg->mHandlerFactory; free( mAcceptArg->mRefusedMsg ); free( mAcceptArg ); mAcceptArg = NULL; delete mEventArg; mEventArg = NULL; pthread_mutex_destroy( &mMutex );}void SP_LFServer :: setTimeout( int timeout ){ mEventArg->setTimeout( timeout );}void SP_LFServer :: setMaxConnections( int maxConnections ){ mAcceptArg->mMaxConnections = maxConnections > 0 ? maxConnections : mAcceptArg->mMaxConnections;}void SP_LFServer :: setMaxThreads( int maxThreads ){ mMaxThreads = maxThreads > 0 ? maxThreads : mMaxThreads;}void SP_LFServer :: setReqQueueSize( int reqQueueSize, const char * refusedMsg ){ mAcceptArg->mReqQueueSize = reqQueueSize > 0 ? reqQueueSize : mAcceptArg->mReqQueueSize; if( NULL != mAcceptArg->mRefusedMsg ) free( mAcceptArg->mRefusedMsg ); mAcceptArg->mRefusedMsg = strdup( refusedMsg );}void SP_LFServer :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory ){ mAcceptArg->mIOChannelFactory = ioChannelFactory;}void SP_LFServer :: shutdown(){ mIsShutdown = 1;}int SP_LFServer :: isRunning(){ return mIsRunning;}void SP_LFServer :: sigHandler( int, short, void * arg ){ SP_LFServer * server = (SP_LFServer*)arg; server->shutdown();}void SP_LFServer :: lfHandler( void * arg ){ SP_LFServer * server = (SP_LFServer*)arg; for( ; 0 == server->mIsShutdown; ) { server->handleOneEvent(); }}void SP_LFServer :: handleOneEvent(){ SP_Task * task = NULL; SP_Message * msg = NULL; pthread_mutex_lock( &mMutex ); for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) { if( mEventArg->getInputResultQueue()->getLength() > 0 ) { task = (SP_Task*)mEventArg->getInputResultQueue()->pop(); } else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) { msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop(); } if( NULL == task && NULL == msg ) { event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE ); } } pthread_mutex_unlock( &mMutex ); if( NULL != task ) task->run(); if( NULL != msg ) mCompletionHandler->completionMessage( msg );}int SP_LFServer :: run(){ /* Don't die with SIGPIPE on remote read shutdown. That's dumb. */ signal( SIGPIPE, SIG_IGN ); int ret = 0; int listenFD = -1; ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 ); if( 0 == ret ) { // Clean close on SIGINT or SIGTERM. mEvSigInt = (struct event*)malloc( sizeof( struct event ) ); signal_set( mEvSigInt, SIGINT, sigHandler, this ); event_base_set( mEventArg->getEventBase(), mEvSigInt ); signal_add( mEvSigInt, NULL); mEvSigTerm = (struct event*)malloc( sizeof( struct event ) ); signal_set( mEvSigTerm, SIGTERM, sigHandler, this ); event_base_set( mEventArg->getEventBase(), mEvSigTerm ); signal_add( mEvSigTerm, NULL); mEvAccept = (struct event*)malloc( sizeof( struct event ) ); event_set( mEvAccept, listenFD, EV_READ|EV_PERSIST, SP_EventCallback::onAccept, mAcceptArg ); event_base_set( mEventArg->getEventBase(), mEvAccept ); event_add( mEvAccept, NULL ); mCompletionHandler = mAcceptArg->mHandlerFactory->createCompletionHandler(); if( NULL == mAcceptArg->mIOChannelFactory ) { mAcceptArg->mIOChannelFactory = new SP_DefaultIOChannelFactory(); } mThreadPool = new SP_ThreadPool( mMaxThreads ); for( int i = 0; i < mMaxThreads; i++ ) { mThreadPool->dispatch( lfHandler, this ); } } return ret;}void SP_LFServer :: runForever(){ run(); pause();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -