📄 mmthread.cpp
字号:
/* ---
Accept new IO connection
--- */
PIObject *pIOObject = PIObject_copy( pThreadRecord->pIOObject, 0, 0 );
if ( !pIOObject )
{
/* --- the program is terminating --- */
break;
};
/* ---
Dispatch to worker thread
--- */
pThreadRecord->iRet = pServer->DoDispatchConnection( pIOObject );
/* ---
Unlock main accept loop
--- */
#if THREAD_PER_REQUEST
pThreadRecord->pServer = 0;
#endif
MUSTBETRUE( !PISync_unlock( pServer->pAcceptThreadsSemaphore ) );
};
return PIAPI_COMPLETED;
}
static int ParameterFn( void *pData, const char *pVar, const char *pVal,
const char *pWhere )
{
assert( pData );
return ((MTMIOServer *)pData)->Parameter( pVar, pVal,
pWhere ? pWhere : "" );
};
protected:
int Parameter( const char *pVariable, const char *pValue,
const char *pWhere )
{
assert( pVariable && pValue );
PIOStrStream os;
os << pWhere << "MTMIOServer: ";
if ( !PIUtil_stricmp( KEY_CONF_EXITAFTER, pVariable ) )
{
iRequestsToLive = atoi( pValue );
iRequestsToLive = iRequestsToLive > 0 ? iRequestsToLive : -1 ;
}
else if ( !PIUtil_stricmp( KEY_CONF_MAXTHREADS, pVariable ) )
{
iMaxThreads = atoi( pValue );
if ( iMaxThreads<1 || iMaxThreads>1000 )
{
CONFIG_WARN( pObject, "MTMIOServer: Invalid number for \
'MaxThreads', defaulting to 25");
};
}
else if ( !PIUtil_stricmp( KEY_CONF_SECONDARYIO, pVariable ) )
{
lAccept.Append( (DblList::type) PI_NEW( PIString( pValue ) ) );
iAcceptThreads++;
}
else if ( !PIUtil_stricmp( KEY_CONF_IOOBJECT, pVariable ) )
{
// has already been loaded by parent class
iAcceptThreads++;
}
else if ( !PIUtil_stricmp( KEY_CONF_LOGICOBJECT, pVariable ) )
{
// has already been loaded by parent class
}
else
{
os << "Unknown parameter '" << pVariable << "'" << ends;
CONFIG_ERR( pObject, os.str() );
return 0;
};
return 1;
};
public:
MTMIOServer( const PIObject *pTheObject, int iArgc, const char *pArgv[] )
: IOServerBase( pTheObject ),
pFreeThreadsMutex( PIPlatform_allocLocalMutex() ),
pFreeThreadsSemaphore( 0 ),
pAcceptThreadsSemaphore( PIPlatform_allocLocalSemaphore( 0, 2 )),
iNumBusyThreads( 0 ),
ppAllThreads( 0 ),
ppFreeThreads( 0 ),
ppAcceptThreads( 0 ),
iMaxThreads( 25 ),
iAcceptThreads( 0 ),
iRequestsToLive( -1 )
{
if ( !bOK ) { return; /* Base class initialization failed */ };
bOK = PIObject_readParameters( (PIObject *)pObject, iArgc, pArgv, ParameterFn, this );
if ( !bOK ) { return; /* parameter function failed */ };
bOK = 0; /* until initialization is complete */
if ( !pFreeThreadsMutex )
{
CONFIG_ERR( pObject, "MTMIOServer: Could not allocate mutex.");
return;
};
pFreeThreadsSemaphore = PIPlatform_allocLocalSemaphore( iMaxThreads,
iMaxThreads );
if ( !pFreeThreadsSemaphore )
{
CONFIG_ERR( pObject, "MTMIOServer: Could not allocate semaphore.");
return;
};
ppAllThreads = new ThreadRecord *[iMaxThreads];
ppFreeThreads = new ThreadRecord *[iMaxThreads];
ppAcceptThreads = new ThreadRecord *[iAcceptThreads];
if ( !ppFreeThreads || !ppAllThreads || !ppAcceptThreads )
{ assert( 0 ); return; };
memset( ppAllThreads, 0, sizeof( ThreadRecord * ) * iMaxThreads );
memset( ppFreeThreads, 0, sizeof( ThreadRecord * ) * iMaxThreads );
memset( ppAcceptThreads, 0, sizeof( ThreadRecord * ) * iAcceptThreads );
int iGood = 1;
/* --- begin accept thread for main I/O object --- */
PIThread *pNew = PIThread_new( 0, 0 );
ThreadRecord *pRecord = new ThreadRecord( pNew );
pRecord->pIOObject = GetPrototypeIOObject();
#if THREAD_PER_REQUEST
pRecord->pServer = 0;
#endif
#if !THREAD_PER_REQUEST
pRecord->pServer = this;
#endif
int i = 0;
ppAcceptThreads[i++] = pRecord;
if ( PIThread_begin(
pNew,
(PIThreadFn)MTMIOServer::StaticAcceptConnection,
(unsigned long)pRecord,
PITHREAD_PRIORITY_HIGH, 0 ) )
{
PIThread_delete( pNew );
/* --- consider this a critical error --- */
iGood = 0;
assert( 0 );
CONFIG_ERR( pObject,
"MTMIOServer: Failed to prime thread, PIThread_begin() failed.");
};
/* --- begin accept threads for secondary I/O objects --- */
for( DblListIterator a( lAccept ); !a.BadIndex(); a++)
{
if ( iAcceptThreads == i ) break;
PIThread *pNew = PIThread_new( 0, 0 );
ThreadRecord *pRecord = new ThreadRecord( pNew );
pRecord->pIOObject = PIObject_loadFromLine( PIObject_getDB(pObject),
PIObject_getConfigurationDB(pObject), (const char *)a.Current() );
if ( !pRecord->pIOObject )
{
iGood = 0;
pRecord->pServer = 0;
CONFIG_ERR( pObject,
"MTMIOServer: Failed to load IOObject, PIObject_loadFromLine() failed." );
break;
}
#if THREAD_PER_REQUEST
pRecord->pServer = 0;
#endif
#if !THREAD_PER_REQUEST
pRecord->pServer = this;
#endif
ppAcceptThreads[i++] = pRecord;
if ( PIThread_begin(
pNew,
(PIThreadFn)MTMIOServer::StaticAcceptConnection,
(unsigned long)pRecord,
PITHREAD_PRIORITY_HIGH, 0 ) )
{
PIThread_delete( pNew );
/* --- consider this a critical error --- */
iGood = 0;
assert( 0 );
CONFIG_ERR( pObject,
"MTMIOServer: Failed to prime thread, PIThread_begin() failed.");
};
}
for( i=0; iGood && i<iMaxThreads; i++ )
{
PIThread *pNew = PIThread_new( 0, 0 );
ThreadRecord *pRecord = new ThreadRecord( pNew );
ppFreeThreads[i] = pRecord;
ppAllThreads[i] = pRecord;
#if !THREAD_PER_REQUEST
pRecord->pServer = this;
if ( PIThread_begin(
pNew,
(PIThreadFn)MTMIOServer::StaticDispatchConnection,
(unsigned long)pRecord,
PITHREAD_PRIORITY_MED, 0 ) )
{
assert( 0 );
CONFIG_ERR( pObject,
"MTMIOServer: Failed to prime thread, PIThread_begin() failed.");
iGood = 0;
};
#endif
};
if ( ppAllThreads && ppFreeThreads && iGood )
{ bOK = 1; };
};
virtual ~MTMIOServer()
{
for( DblListIterator a( lAccept ); !a.BadIndex(); a++)
{ PI_DELETE( (PIString *)a.Current() ); };
if ( !ppFreeThreads || !ppAllThreads || !ppAcceptThreads )
{ assert( 0 ); return; };
int i;
for( i=0; i<iAcceptThreads; i++ )
{
if ( !ppAcceptThreads[i] ) continue;
PIThread *pThread = ppAcceptThreads[i]->pThread;
#if !THREAD_PER_REQUEST
/* --- signal thread to die --- */
ppAcceptThreads[i]->pServer = 0;
// MUSTBETRUE( !PIThread_terminate( pThread, 0 ) );
if ( PIObject_delete( ppAcceptThreads[i]->pIOObject, 0, 0 ) == PIAPI_COMPLETED
&& pIO && ppAcceptThreads[i]->pIOObject == pIO )
{
pIO = 0;
}
PISync_unlock( ppAcceptThreads[i]->pSema );
#endif
MUSTBETRUE( !PIThread_waitForThread( pThread ) );
MUSTBETRUE( !PIThread_delete( pThread ) );
delete ppAcceptThreads[i];
}
for( i=0; i<iMaxThreads; i++ )
{
if ( !ppAllThreads[i] ) continue;
PIThread *pThread = ppAllThreads[i]->pThread;
#if !THREAD_PER_REQUEST
/* --- signal thread to die --- */
ppAllThreads[i]->pServer = 0;
PISync_unlock( ppAllThreads[i]->pSema );
MUSTBETRUE( !PIThread_waitForThread( pThread ) );
#endif
MUSTBETRUE( !PIThread_delete( pThread ) );
delete ppAllThreads[i];
}
delete [] ppAllThreads;
delete [] ppFreeThreads;
delete [] ppAcceptThreads;
MUSTBETRUE(
!pFreeThreadsMutex || !PISync_delete( pFreeThreadsMutex ) );
MUSTBETRUE(
!pFreeThreadsSemaphore || !PISync_delete( pFreeThreadsSemaphore ) );
MUSTBETRUE(
!pAcceptThreadsSemaphore || !PISync_delete( pAcceptThreadsSemaphore ) );
};
virtual int Accept()
{
/* ---
Check is it time for this server to exit
--- */
#if POSIX
if ( iRequestsToLive )
{
if ( iRequestsToLive>0 )
{ iRequestsToLive--; };
}
else
{
/* ---
Time to go, gracefully exit.
--- */
while( iNumBusyThreads>0 )
{
MUSTBETRUE( !PISync_lock( pFreeThreadsSemaphore ) );
};
#if 1
/*
** Why even bother?
*/
/* ---
Allow all threads to die
--- */
for( int i=0; i<iMaxThreads; i++ )
{
/* --- signal thread to die --- */
ppAllThreads[i]->pServer = 0;
#if !THREAD_PER_REQUEST
PISync_unlock( ppAllThreads[i]->pSema );
MUSTBETRUE( !PIThread_terminate( pThread, 0 ) );
#endif
};
#endif
::exit( 0 );
};
#endif
// run the accept threads
int iRet = PIAPI_COMPLETED;
#if !THREAD_PER_REQUEST
for( int i=0; i<iAcceptThreads; i++ )
{
if ( PISync_tryLock(ppAcceptThreads[i]->pSema)==PIAPI_WOULDBLOCK )
{
int iRet = ppAcceptThreads[i]->iRet;
if ( iRet == PIAPI_ABORT ) break;
/* --- signal thread to run --- */
PISync_unlock( ppAcceptThreads[i]->pSema );
}
}
#endif
#if THREAD_PER_REQUEST
for( int i=0; i<iAcceptThreads; i++ )
{
if ( ppAcceptThreads[i]->pServer == 0 )
{
int iRet = ppAcceptThreads[i]->iRet;
if ( iRet == PIAPI_ABORT ) break;
/* --- signal thread to run --- */
ppAcceptThreads[i]->pServer = this;
PIThread_waitForThread( ppAcceptThreads[i]->pThread );
if ( PIThread_begin(
ppAcceptThreads[i]->pThread,
(PIThreadFn)MTMIOServer::StaticAcceptConnection,
(unsigned long)ppAcceptThreads[i],
PITHREAD_PRIORITY_HIGH, 0 ) )
{
iRet = PIAPI_ABORT;
};
}
}
#endif
/* ---
Block until one accept thread did its work
--- */
do {}
while ( PISync_tryLock(pAcceptThreadsSemaphore)==PIAPI_WOULDBLOCK );
// MUSTBETRUE( !PISync_lock( pAcceptThreadsSemaphore ) );
return iRet;
};
};
/*____________________________________________________________________________*\
*
Function:
Synopsis:
Description:
\*____________________________________________________________________________*/
PUBLIC_PIAPI int MTMIOServer_onClassLoad( PIClass_LoadAction eLoad, void * )
{
switch( eLoad )
{
case STARTUP:
default:;
};
return PIAPI_COMPLETED;
}
/*____________________________________________________________________________*\
*
Function:
Synopsis:
Description:
\*____________________________________________________________________________*/
PUBLIC_PIAPI int MTMIOServer_constructor( PIObject *pObj, int iArgc, const char *pArgv[] )
{
MTMIOServer *pServer = new MTMIOServer( pObj, iArgc, pArgv );
if (!( pServer && pServer->IsOK() ))
{
delete pServer;
return PIAPI_ERROR;
};
if ( PIObject_setUserData( pObj, pServer ) )
{
CONFIG_ERR( pObj, "MTMIOServer_constructor: PIObject_setUserData() failed" );
return PIAPI_ERROR;
};
return PIAPI_COMPLETED;
}
/*____________________________________________________________________________*\
*
Function:
Synopsis:
Description:
\*____________________________________________________________________________*/
PUBLIC_PIAPI int MTMIOServer_copyConstructor( PIObject *, int,
const char *[] )
{
/* --- forbid copy construction --- */
return PIAPI_ERROR;
}
#if 0
/*___+++CNF_BEGIN+++___*/
<Class>
Name MTMIOServerClass
Type LogicExtension
Library Server
OnClassLoad MTMIOServer_onClassLoad
Constructor MTMIOServer_constructor
CopyConstructor MTMIOServer_copyConstructor
Destructor IOServerBase_destructor
Execute IOServerBase_execute
</Class>
<Object>
Name MTMIOServer
Class MTMIOServerClass
</Object>
/*___+++CNF_END+++___*/
#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -