📄 tlibthrd.cxx
字号:
/*
* tlibthrd.cxx
*
* Routines for pre-emptive threading system
*
* Portable Windows Library
*
* Copyright (c) 1993-1998 Equivalence Pty. Ltd.
*
* The contents of this file are subject to the Mozilla Public License
* Version 1.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* The Original Code is Portable Windows Library.
*
* The Initial Developer of the Original Code is Equivalence Pty. Ltd.
*
* Portions are Copyright (C) 1993 Free Software Foundation, Inc.
* All Rights Reserved.
*
* Contributor(s): ______________________________________.
*
* $Log: tlibthrd.cxx,v $
* Revision 1.31 1999/11/18 14:02:57 craigs
* Fixed problem with housekeeping thread termination
*
* Revision 1.30 1999/11/15 01:12:56 craigs
* Fixed problem with PSemaphore::Wait consuming 100% CPU
*
* Revision 1.29 1999/10/30 13:44:11 craigs
* Added correct method of aborting socket operations asynchronously
*
* Revision 1.28 1999/10/24 13:03:30 craigs
* Changed to capture io break signal
*
* Revision 1.27 1999/09/23 06:52:16 robertj
* Changed PSemaphore to use Posix semaphores.
*
* Revision 1.26 1999/09/03 02:26:25 robertj
* Changes to aid in breaking I/O locks on thread termination. Still needs more work esp in BSD!
*
* Revision 1.25 1999/09/02 11:56:35 robertj
* Fixed problem with destroying PMutex that is already locked.
*
* Revision 1.24 1999/08/24 13:40:56 craigs
* Fixed problem with condwait destroys failing on Linux
*
* Revision 1.23 1999/08/23 05:33:45 robertj
* Made last threading changes Linux only.
*
* Revision 1.22 1999/08/23 05:14:13 robertj
* Removed blocking of interrupt signals as does not work in Linux threads.
*
* Revision 1.21 1999/07/30 00:40:32 robertj
* Fixed problem with signal variable in non-Linux platforms
*
* Revision 1.20 1999/07/19 01:32:24 craigs
* Changed signals used in pthreads code, is used by linux version.
*
* Revision 1.19 1999/07/15 13:10:55 craigs
* Fixed problem with EINTR in nontimed semaphore waits
*
* Revision 1.18 1999/07/15 13:05:33 robertj
* Fixed problem with getting EINTR in semaphore wait, is normal, not error.
*
* Revision 1.17 1999/07/11 13:42:13 craigs
* pthreads support for Linux
*
* Revision 1.16 1999/05/12 03:29:20 robertj
* Fixed problem with semaphore free, done at wrong time.
*
* Revision 1.15 1999/04/29 08:41:26 robertj
* Fixed problems with uninitialised mutexes in PProcess.
*
* Revision 1.14 1999/03/16 10:54:16 robertj
* Added parameterless version of WaitForTermination.
*
* Revision 1.13 1999/03/16 10:30:37 robertj
* Added missing PThread::WaitForTermination function.
*
* Revision 1.12 1999/01/12 12:09:51 robertj
* Removed redundant member variable, was in common.
* Fixed BSD threads compatibility.
*
* Revision 1.11 1999/01/11 12:05:56 robertj
* Fixed some more race conditions in threads.
*
* Revision 1.10 1999/01/11 03:42:26 robertj
* Fixed problem with destroying thread automatically.
*
* Revision 1.9 1999/01/09 03:37:28 robertj
* Fixed problem with closing thread waiting on semaphore.
* Improved efficiency of mutex to use pthread functions directly.
*
* Revision 1.8 1999/01/08 01:31:03 robertj
* Support for pthreads under FreeBSD
*
* Revision 1.7 1998/12/15 12:41:07 robertj
* Fixed signal handling so can now ^C a pthread version.
*
* Revision 1.6 1998/11/05 09:45:04 robertj
* Removed StartImmediate option in thread construction.
*
* Revision 1.5 1998/09/24 04:12:25 robertj
* Added open software license.
*
*/
#if !defined(P_VXWORKS)
#include <pthread.h>
#include <sys/resource.h>
#else
#include <taskLib.h>
#include <signal.h>
#endif
// Signals used by the thread suspend/resume mechanism in this file.
//   SUSPEND_SIG - its handler (sigSuspendHandler) parks the receiving thread
//                 until RESUME_SIG is delivered.
//   RESUME_SIG  - kept blocked in each thread and collected with sigwait().
// Linux builds use the timer signals rather than SIGUSR1/SIGUSR2.
// NOTE(review): presumably because the Linux thread library reserves the
// SIGUSR signals for itself - confirm against the target platform.
#ifdef P_LINUX
#define SUSPEND_SIG SIGALRM
#define RESUME_SIG SIGVTALRM
#else
#define SUSPEND_SIG SIGUSR1
#define RESUME_SIG SIGUSR2
#endif
// Signal reserved for breaking blocked I/O in pthreads builds.
// NOTE(review): in the visible part of this file it is only referenced from
// a commented-out sigaddset() call - verify where it is actually used.
#ifdef P_PTHREADS
#define P_IO_BREAK_SIGNAL SIGPROF
#endif
// Background thread that services the process timer list (see Main()).
// It is created with NoAutoDeleteThread, so whoever creates it is responsible
// for deleting it; it is resumed as the last step of construction.
PDECLARE_CLASS(HouseKeepingThread, PThread)
  public:
    // Construct the thread with the close flag cleared, then let it run.
    HouseKeepingThread()
      : PThread(1000, NoAutoDeleteThread),
        closing(FALSE)
    {
      Resume();
    }

    // Thread body: process timers until SetClosing() has been called.
    void Main();

    // Ask Main() to leave its loop the next time it tests the flag.
    void SetClosing()
    {
      closing = TRUE;
    }

  protected:
    // Set to TRUE by SetClosing(); tested by Main() on every iteration.
    BOOL closing;
};
int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
// make sure we flush the buffer before doing a write
fd_set tmp_rfd, tmp_wfd, tmp_efd;
fd_set * read_fds = &tmp_rfd;
fd_set * write_fds = &tmp_wfd;
fd_set * exception_fds = &tmp_efd;
FD_ZERO (read_fds);
FD_ZERO (write_fds);
FD_ZERO (exception_fds);
switch (type) {
case PChannel::PXReadBlock:
case PChannel::PXAcceptBlock:
FD_SET (handle, read_fds);
break;
case PChannel::PXWriteBlock:
FD_SET (handle, write_fds);
break;
case PChannel::PXConnectBlock:
FD_SET (handle, write_fds);
FD_SET (handle, exception_fds);
break;
default:
PAssertAlways(PLogicError);
return 0;
}
struct timeval * tptr = NULL;
struct timeval timeout_val;
if (timeout != PMaxTimeInterval) {
static const PTimeInterval oneDay(0, 0, 0, 0, 1);
if (timeout < oneDay) {
timeout_val.tv_usec = (timeout.GetMilliSeconds() % 1000) * 1000;
timeout_val.tv_sec = timeout.GetSeconds();
tptr = &timeout_val;
}
}
// include the termination pipe into all blocking I/O functions
int width = handle+1;
FD_SET(termPipe[0], read_fds);
width = PMAX(width, termPipe[0]+1);
int retval = ::select(width, read_fds, write_fds, exception_fds, tptr);
PProcess::Current().PXCheckSignals();
if ((retval == 1) && FD_ISSET(termPipe[0], read_fds)) {
BYTE ch;
::read(termPipe[0], &ch, 1);
errno = EINTR;
retval = -1;
}
return retval;
}
// Signal handler installed for SUSPEND_SIG. It parks the calling thread in
// sigwait() until RESUME_SIG is received. SIGINT, SIGQUIT and SIGTERM also
// wake the wait so that the process can check its pending signals, after
// which the thread goes back to waiting.
static void sigSuspendHandler(int)
{
  sigset_t wakeSet;
  sigemptyset(&wakeSet);
  sigaddset(&wakeSet, RESUME_SIG);
  sigaddset(&wakeSet, SIGINT);
  sigaddset(&wakeSet, SIGQUIT);
  sigaddset(&wakeSet, SIGTERM);

  int received;
  do {
#ifdef P_LINUX
    sigwait(&wakeSet, &received);
#else
    received = sigwait(&wakeSet);
#endif
    // give the process a chance to act on whatever signal woke us
    PProcess::Current().PXCheckSignals();
  } while (received != RESUME_SIG);
}
void HouseKeepingThread::Main()
{
PProcess & process = PProcess::Current();
while (!closing) {
PTimeInterval waitTime = process.timers.Process();
if (waitTime == PMaxTimeInterval)
process.timerChangeSemaphore.Wait();
else
process.timerChangeSemaphore.Wait(waitTime);
}
}
// Platform specific part of process construction: blocks RESUME_SIG in the
// calling thread, raises the soft open-file limit to the hard limit, marks
// the housekeeping thread as not yet created and then runs CommonConstruct().
void PProcess::Construct()
{
  // RESUME_SIG must stay blocked so that it does not disturb this thread
  sigset_t resumeMask;
  sigemptyset(&resumeMask);
  sigaddset(&resumeMask, RESUME_SIG);
  PAssertOS(pthread_sigmask(SIG_BLOCK, &resumeMask, NULL) == 0);

  // lift the current descriptor limit up to the maximum permitted
  struct rlimit fileLimit;
  PAssertOS(getrlimit(RLIMIT_NOFILE, &fileLimit) == 0);
  fileLimit.rlim_cur = fileLimit.rlim_max;
  PAssertOS(setrlimit(RLIMIT_NOFILE, &fileLimit) == 0);

  // the housekeeping thread is created on demand by SignalTimerChange()
  housekeepingThread = NULL;

  CommonConstruct();
}
// Shut the process object down. If a housekeeping thread was created it is
// flagged as closing, woken through SignalTimerChange(), waited for and then
// deleted; CommonDestruct() runs afterwards in every case.
PProcess::~PProcess()
{
  HouseKeepingThread * keeper = (HouseKeepingThread *)housekeepingThread;
  if (keeper != NULL) {
    // set the flag first, then wake the thread so that it notices it
    keeper->SetClosing();
    SignalTimerChange();

    housekeepingThread->WaitForTermination();
    delete housekeepingThread;
  }

  CommonDestruct();
}
// Default constructor. It deliberately performs no initialisation here: the
// members are set up by InitialiseProcessThread() instead.
// NOTE(review): presumably only used for the thread object that represents
// the process itself - InitialiseProcessThread() casts "this" to PProcess;
// confirm against the callers.
PThread::PThread()
{
// see InitialiseProcessThread()
}
// Initialise the thread object that represents the already running calling
// thread (the object is treated as a PProcess below). Sets the bookkeeping
// members, creates the termination pipe and the two mutexes, and registers
// the object in the process's active thread list under its own thread id.
void PThread::InitialiseProcessThread()
{
  PX_origStackSize    = 0;
  autoDelete          = FALSE;
  PX_waitingSemaphore = NULL;
  PX_threadId         = pthread_self();
  PX_suspendCount     = 0;

  // The pipe descriptors are used by PXBlockOnIO() and closed by the
  // destructor, so a failure to create them must not pass unnoticed:
  // otherwise termPipe would hold indeterminate descriptor values.
  PAssertOS(::pipe(termPipe) == 0);

  PAssertOS(pthread_mutex_init(&PX_WaitSemMutex, NULL) == 0);
  PAssertOS(pthread_mutex_init(&PX_suspendMutex, NULL) == 0);

  // the list must not take ownership of the thread objects stored in it
  ((PProcess *)this)->activeThreads.DisallowDeleteObjects();
  ((PProcess *)this)->activeThreads.SetAt((unsigned)PX_threadId, this);
}
// Construct a new thread object and create its underlying thread in the
// suspended state.
//
//   stackSize - must be greater than zero; recorded in PX_origStackSize.
//   deletion  - AutoDeleteThread makes the object delete itself when the
//               thread ends.
//   priority  - accepted for interface compatibility; not used here.
//
// Every OS resource created here (both mutexes and the termination pipe) is
// checked with PAssertOS, in the same way as in InitialiseProcessThread().
PThread::PThread(PINDEX stackSize,
                 AutoDeleteFlag deletion,
                 Priority /*priorityLevel*/)
{
  PAssert(stackSize > 0, PInvalidParameter);

  PX_origStackSize    = stackSize;
  autoDelete          = (deletion == AutoDeleteThread);
  PX_waitingSemaphore = NULL;

  PAssertOS(pthread_mutex_init(&PX_WaitSemMutex, NULL) == 0);
  PAssertOS(pthread_mutex_init(&PX_suspendMutex, NULL) == 0);

  // The pipe descriptors are used by PXBlockOnIO() and closed by the
  // destructor, so a failure to create them must not pass unnoticed:
  // otherwise termPipe would hold indeterminate descriptor values.
  PAssertOS(::pipe(termPipe) == 0);

  // create the thread; it starts suspended until it is resumed
  PX_NewThread(TRUE);
}
// Destroy the thread object: terminate the thread if it has not already
// terminated, close both ends of the termination pipe and destroy the two
// mutexes owned by the object.
PThread::~PThread()
{
  if (!IsTerminated()) {
    Terminate();
  }

  // read end first, then write end
  for (int end = 0; end < 2; end++)
    ::close(termPipe[end]);

  PAssertOS(pthread_mutex_destroy(&PX_WaitSemMutex) == 0);
  PAssertOS(pthread_mutex_destroy(&PX_suspendMutex) == 0);
}
// Create the underlying pthread for this object, running PX_ThreadStart with
// "this" as its argument.
//
//   startSuspended - when TRUE the suspend count starts at 1, which makes the
//                    new thread wait for a resume before it runs Main().
//
// The thread is created with default attributes (NULL attribute pointer).
void PThread::PX_NewThread(BOOL startSuspended)
{
  // the suspend count must be in place before the new thread can inspect it
  if (startSuspended)
    PX_suspendCount = 1;
  else
    PX_suspendCount = 0;

  int createResult = pthread_create(&PX_threadId, NULL, PX_ThreadStart, this);
  PAssertOS(createResult == 0);
}
void * PThread::PX_ThreadStart(void * arg)
{
pthread_t threadId = pthread_self();
// self-detach
pthread_detach(threadId);
PThread * thread = (PThread *)arg;
PProcess & process = PProcess::Current();
// block RESUME_SIG
sigset_t blockedSignals;
sigemptyset(&blockedSignals);
sigaddset(&blockedSignals, RESUME_SIG);
//sigaddset(&blockedSignals, P_IO_BREAK_SIGNAL);
PAssertOS(pthread_sigmask(SIG_BLOCK, &blockedSignals, NULL) == 0);
// add thread to thread list
process.threadMutex.Wait();
process.activeThreads.SetAt((unsigned)threadId, thread);
process.threadMutex.Signal();
// make sure the cleanup routine is called when the thread exits
pthread_cleanup_push(PThread::PX_ThreadEnd, arg);
// if we are not supposed to start suspended, then don't wait
// if we are supposed to start suspended, then wait for a resume
PAssertOS(pthread_mutex_lock(&thread->PX_suspendMutex) == 0);
if (thread->PX_suspendCount == 0)
PAssertOS(pthread_mutex_unlock(&thread->PX_suspendMutex) == 0);
else {
PAssertOS(pthread_mutex_unlock(&thread->PX_suspendMutex) == 0);
sigset_t waitSignals;
sigemptyset(&waitSignals);
sigaddset(&waitSignals, RESUME_SIG);
#ifdef P_LINUX
int sig;
sigwait(&waitSignals, &sig);
#else
sigwait(&waitSignals);
#endif
}
// set the signal handler for SUSPEND_SIG
struct sigaction action;
memset(&action, 0, sizeof(action));
action.sa_handler = sigSuspendHandler;
sigaction(SUSPEND_SIG, &action, 0);
// now call the the thread main routine
thread->Main();
// execute the cleanup routine
pthread_cleanup_pop(1);
return NULL;
}
// Notify the housekeeping thread that the timer list has changed. If the
// thread already exists it is woken through the timer-change semaphore;
// otherwise it is created now (its constructor resumes it) and no signal is
// sent.
void PProcess::SignalTimerChange()
{
  if (housekeepingThread != NULL) {
    timerChangeSemaphore.Signal();
  }
  else {
    housekeepingThread = PNEW HouseKeepingThread;
  }
}
void PThread::PX_ThreadEnd(void * arg)
{
PThread * thread = (PThread *)arg;
PProcess & process = PProcess::Current();
// remove this thread from the thread list
process.threadMutex.Wait();
process.activeThreads.SetAt(thread->PX_GetThreadId(), NULL);
process.threadMutex.Signal();
thread->PX_threadId = 0; // Prevent terminating terminated thread
// delete the thread if required
if (thread->autoDelete)
delete thread;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -