📄 mthread.cpp
字号:
#endif
return PIAPI_COMPLETED;
};
/* ---
Called by StaticDispatchConnection(), this function marks the
given thread record as free so that it may be reused.
--- */
void FreeThread( ThreadRecord *pFree )
{
    /* --- the free list is only touched while holding its mutex --- */
    assert( pFreeThreadsMutex );
    MUSTBETRUE( !PISync_lock( pFreeThreadsMutex ) );

    /* --- walk forward to the first unoccupied slot of the free list --- */
    int iSlot = 0;
    while ( iSlot<iMaxThreads && ppFreeThreads[iSlot] )
    {
        iSlot++;
    };

    /* --- park the record there, provided a slot was found --- */
    if ( iSlot<iMaxThreads )
    {
        ppFreeThreads[iSlot] = pFree;
    };

    /* --- one fewer thread is busy now --- */
    iNumBusyThreads--;

    /* --- drop the mutex, then release the free-thread semaphore --- */
    MUSTBETRUE( !PISync_unlock( pFreeThreadsMutex ) );
    MUSTBETRUE( !PISync_unlock( pFreeThreadsSemaphore ) );

    /* --- the free list is expected to always have room for the record --- */
    assert( iSlot<iMaxThreads );
};
/* ---
Called by the thread's begin function, this runs in a thread
created for this connection.
--- */
inline static int StaticDispatchConnection( ThreadRecord *pThreadRecord )
{
    /* --- a thread record is mandatory; fail softly if asserts are off --- */
    assert( pThreadRecord );
    if ( pThreadRecord==0 )
    {
        return PIAPI_ERROR;
    };

    for(;;)
    {
#if !THREAD_PER_REQUEST
        /* --- block on this record's semaphore until it is released --- */
        if ( PISync_lock( pThreadRecord->pSema ) )
        {
            MUSTBETRUE( 0 );
        };
#endif
        /* --- a cleared server pointer is the request to leave the loop --- */
        MultiThreadedIOServer *pOwner = pThreadRecord->pServer;
        if ( pOwner==0 )
        {
            break;
        };

        /* --- serve the connection, then hand this record back --- */
        PIObject *pConnection = pThreadRecord->pIOObject;
        pOwner->DispatchConnection( pConnection );
        pOwner->FreeThread( pThreadRecord );
#if THREAD_PER_REQUEST
        /* --- in this mode the function handles a single connection --- */
        break;
#endif
    };

    return PIAPI_COMPLETED;
}
public:
/* ---
Build the server from its configuration: read the optional exit-after
count and the mandatory 'MaxThreads' value, allocate the free-thread
semaphore and the two thread record tables and, unless
THREAD_PER_REQUEST is set, start every worker thread. bOK is set to 1
only once all of this has succeeded.
--- */
MultiThreadedIOServer( const PIObject *pTheObject )
:   IOServerBase( pTheObject ),
    pFreeThreadsMutex( PIPlatform_allocLocalMutex() ),
    pFreeThreadsSemaphore( 0 ),
    iNumBusyThreads( 0 ),
    ppAllThreads( 0 ),
    ppFreeThreads( 0 ),
    iMaxThreads( 0 ),
    iRequestsToLive( -1 )
{
    if ( !bOK ) { return; /* Base class initialization failed */ };
    bOK = 0;    /* until initialization is complete */

    if ( !pFreeThreadsMutex )
    {
        CONFIG_ERR( pObject, "MultiThreadedIOServer: Could not \
allocate mutex.");
        return;
    };

    PIConfig *pConfigDB = PIObject_getConfigurationDB( pObject );

    /* ---
    Get exit after count; a missing or non-positive value leaves
    iRequestsToLive at -1
    --- */
    const char *pExitAfter = PIConfig_lookupValue( pConfigDB,
        KEY_CONF_EXITAFTER, 0, 0 );
    if ( pExitAfter )
    {
        iRequestsToLive = atoi( pExitAfter );
        iRequestsToLive = iRequestsToLive > 0 ? iRequestsToLive : -1 ;
    };

    /* --- 'MaxThreads' must be present --- */
    const char *pMaxThreads = PIConfig_lookupValue( pConfigDB,
        KEY_CONF_MAXTHREADS, 0, 0 );
    if ( !pMaxThreads )
    {
        CONFIG_ERR( pObject,
            "MultiThreadedIOServer: Missing definition for 'MaxThreads'");
        return;
    };

    iMaxThreads = atoi( pMaxThreads );
    if ( iMaxThreads<1 || iMaxThreads>1000 )
    {
        /* ---
        An out-of-range value is only a warning: apply the advertised
        default and carry on with initialization. Returning here would
        abort construction with iMaxThreads set to 25 while both thread
        tables are still null.
        --- */
        CONFIG_WARN( pObject, "MultiThreadedIOServer: Invalid number for 'MaxThreads', \
defaulting to 25");
        iMaxThreads = 25;
    };

    /* --- semaphore created with iMaxThreads for both arguments --- */
    pFreeThreadsSemaphore = PIPlatform_allocLocalSemaphore( iMaxThreads,
        iMaxThreads );
    if ( !pFreeThreadsSemaphore )
    {
        CONFIG_ERR( pObject, "MultiThreadedIOServer: Could not allocate semaphore.");
        return;
    };

    /* --- tables of all thread records and of the currently free ones --- */
    ppAllThreads = new ThreadRecord *[iMaxThreads];
    ppFreeThreads = new ThreadRecord *[iMaxThreads];
    if ( !ppFreeThreads || !ppAllThreads )
    { assert( 0 ); return; };
    memset( ppAllThreads, 0, sizeof( ThreadRecord * ) * iMaxThreads );
    memset( ppFreeThreads, 0, sizeof( ThreadRecord * ) * iMaxThreads );

    /* --- create one record per thread; stop at the first failure --- */
    int iGood = 1;
    for( int i=0; iGood && i<iMaxThreads; i++ )
    {
        PIThread *pNew = PIThread_new( 0, 0 );
        ThreadRecord *pRecord = new ThreadRecord( pNew );
        ppFreeThreads[i] = pRecord;
        ppAllThreads[i] = pRecord;
#if !THREAD_PER_REQUEST
        /* --- pooled mode: start the thread now, running the dispatch loop --- */
        pRecord->pServer = this;
        if ( PIThread_begin(
            pNew,
            (PIThreadFn)MultiThreadedIOServer::StaticDispatchConnection,
            (unsigned long)pRecord,
            PITHREAD_PRIORITY_HIGH, 0 ) )
        {
            assert( 0 );
            CONFIG_ERR( pObject,
                "MultiThreadedIOServer: Failed to prime thread, PIThread_begin() failed.");
            iGood = 0;
        };
#endif
    };

    if ( ppAllThreads && ppFreeThreads && iGood )
    { bOK = 1; };
};
/* ---
Stop and delete every thread record, then release the thread tables,
the free-list mutex and the free-thread semaphore.
--- */
virtual ~MultiThreadedIOServer()
{
    /* ---
    The constructor can return early after iMaxThreads has been set but
    before the thread table has been allocated (for example when the
    semaphore cannot be allocated), so the table may only be walked if
    it actually exists.
    --- */
    if ( ppAllThreads )
    {
        for( int i=0; i<iMaxThreads; i++ )
        {
            /* --- slots past a failed thread start are still empty --- */
            if ( !ppAllThreads[i] ) continue;
            PIThread *pThread = ppAllThreads[i]->pThread;
#if !THREAD_PER_REQUEST
            /* --- signal thread to die, wake it and wait for it --- */
            ppAllThreads[i]->pServer = 0;
            PISync_unlock( ppAllThreads[i]->pSema );
            MUSTBETRUE( !PIThread_waitForThread( pThread ) );
#endif
            MUSTBETRUE( !PIThread_delete( pThread ) );
            delete ppAllThreads[i];
        };
    };

    /* --- ppFreeThreads holds the same records, so only the table goes --- */
    delete [] ppAllThreads;
    delete [] ppFreeThreads;

    /* --- either object may be missing if construction failed early --- */
    MUSTBETRUE(
        !pFreeThreadsMutex || !PISync_delete( pFreeThreadsMutex ) );
    MUSTBETRUE(
        !pFreeThreadsSemaphore || !PISync_delete( pFreeThreadsSemaphore ) );
};
/* ---
Accept one connection. Under POSIX the remaining request budget is
counted down first and the process exits once it has reached zero.
Otherwise wait for a free thread, copy the prototype IO object and pass
the copy to DoDispatchConnection(), whose result is returned.
Returns PIAPI_ABORT if the IO object cannot be copied.
--- */
virtual int Accept()
{
    /* ---
    Check is it time for this server to exit
    --- */
#if POSIX
    if ( iRequestsToLive )
    {
        /* --- a negative budget is never counted down --- */
        if ( iRequestsToLive>0 )
        { iRequestsToLive--; };
    }
    else
    {
        /* ---
        Time to go, gracefully exit: wait until no thread is busy.
        --- */
        while( iNumBusyThreads>0 )
        {
            MUSTBETRUE( !PISync_lock( pFreeThreadsSemaphore ) );
        };
#if 1
        /*
        ** Why even bother?
        */
        /* ---
        Allow all threads to die
        --- */
        for( int i=0; i<iMaxThreads; i++ )
        {
            /* ---
            Signal thread to die. The server pointer has to be cleared
            before a pooled thread is woken: a null pointer is what
            StaticDispatchConnection() treats as the request to exit,
            and waking the thread with the pointer still set would make
            it dispatch its previous IO object again.
            --- */
            ppAllThreads[i]->pServer = 0;
#if !THREAD_PER_REQUEST
            PISync_unlock( ppAllThreads[i]->pSema );
#endif
        };
#endif
        ::exit( 0 );
    };
#endif

    /* ---
    Before attempting to accept a connection, make sure there
    is a thread available to handle it
    --- */
    MUSTBETRUE( !PISync_lock( pFreeThreadsSemaphore ) );

    /* ---
    Accept a new child IO object
    --- */
    PIObject *pChildIO = PIObject_copy( GetPrototypeIOObject(), 0, 0 );
    if ( !pChildIO )
    {
        /* --- consider this a critical error --- */
        CONFIG_ERR( pObject, "MultiThreadedIOServer: PIObject_copy() \
failed for IO Object");
        /* --- give back the thread slot reserved above --- */
        MUSTBETRUE( !PISync_unlock( pFreeThreadsSemaphore ) );
        return PIAPI_ABORT;
    };

    return DoDispatchConnection( pChildIO );
};
};
/*____________________________________________________________________________*\
*
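Function: MultiThreadedIOServer_onClassLoad
Synopsis: Class load hook for the MultiThreadedIOServer class.
Description: Performs no work for any load action and returns PIAPI_COMPLETED.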
\*____________________________________________________________________________*/
PUBLIC_PIAPI int MultiThreadedIOServer_onClassLoad( PIClass_LoadAction eLoad, void * )
{
    /* --- no load action needs any work; all of them just fall out --- */
    switch( eLoad )
    {
        case STARTUP:
        default:
            break;
    };

    return PIAPI_COMPLETED;
}
/*____________________________________________________________________________*\
*
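Function: MultiThreadedIOServer_constructor
Synopsis: Creates a MultiThreadedIOServer for the given object.
Description: Allocates the server and stores it as the object's user data; returns PIAPI_ERROR if the server fails to initialize or cannot be stored.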
\*____________________________________________________________________________*/
PUBLIC_PIAPI int MultiThreadedIOServer_constructor( PIObject *pObj, int, const char *[] )
{
    /* ---
    Create the server for pObj and store it as the object's user data.
    Returns PIAPI_COMPLETED on success and PIAPI_ERROR if the server
    could not be created, failed to initialize or could not be stored;
    in every failure case the server is destroyed again.
    --- */
    MultiThreadedIOServer *pServer = new MultiThreadedIOServer( pObj );
    if (!( pServer && pServer->IsOK() ))
    {
        delete pServer;
        return PIAPI_ERROR;
    };

    if ( PIObject_setUserData( pObj, pServer ) )
    {
        CONFIG_ERR( pObj, "MultiThreadedIOServer_constructor: PIObject_setUserData() failed" );
        /* --- the object did not take the server, so release it here --- */
        delete pServer;
        return PIAPI_ERROR;
    };

    return PIAPI_COMPLETED;
}
/*____________________________________________________________________________*\
*
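Function: MultiThreadedIOServer_copyConstructor
Synopsis: Copy constructor hook for the MultiThreadedIOServer class.
Description: Copy construction is forbidden; always returns PIAPI_ERROR.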
\*____________________________________________________________________________*/
PUBLIC_PIAPI int MultiThreadedIOServer_copyConstructor( PIObject *, int,
    const char *[] )
{
    /* --- this server may not be copied: refuse every request --- */
    const int iRefused = PIAPI_ERROR;
    return iRefused;
}
#if 0
/*___+++CNF_BEGIN+++___*/
<Class>
Name MultiThreadedIOServerClass
Type LogicExtension
Library Server
OnClassLoad MultiThreadedIOServer_onClassLoad
Constructor MultiThreadedIOServer_constructor
CopyConstructor MultiThreadedIOServer_copyConstructor
Destructor IOServerBase_destructor
Execute IOServerBase_execute
</Class>
<Object>
Name MultiThreadedIOServer
Class MultiThreadedIOServerClass
</Object>
/*___+++CNF_END+++___*/
#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -