📄 tlibthrd.cxx
字号:
* * Revision 1.13 1999/03/16 10:30:37 robertj * Added missing PThread::WaitForTermination function. * * Revision 1.12 1999/01/12 12:09:51 robertj * Removed redundent member variable, was in common. * Fixed BSD threads compatibility. * * Revision 1.11 1999/01/11 12:05:56 robertj * Fixed some more race conditions in threads. * * Revision 1.10 1999/01/11 03:42:26 robertj * Fixed problem with destroying thread automatically. * * Revision 1.9 1999/01/09 03:37:28 robertj * Fixed problem with closing thread waiting on semaphore. * Improved efficiency of mutex to use pthread functions directly. * * Revision 1.8 1999/01/08 01:31:03 robertj * Support for pthreads under FreeBSD * * Revision 1.7 1998/12/15 12:41:07 robertj * Fixed signal handling so can now ^C a pthread version. * * Revision 1.6 1998/11/05 09:45:04 robertj * Removed StartImmediate option in thread construction. * * Revision 1.5 1998/09/24 04:12:25 robertj * Added open software license. * */#include <ptlib/socket.h>#include <sched.h> // for sched_yield#include <pthread.h>#include <sys/resource.h>#ifdef P_RTEMS#define SUSPEND_SIG SIGALRM#include <sched.h>#else#define SUSPEND_SIG SIGVTALRM#endif#ifdef P_MACOSX#include <mach/mach.h>#include <mach/thread_policy.h>#include <sys/param.h>#include <sys/sysctl.h>// going to need the main thread for adjusting relative prioritystatic pthread_t baseThread;#endif#ifdef P_HAS_SEMAPHORES_XPG6#include "semaphore.h"#endifint PX_NewHandle(const char *, int);#define PAssertPTHREAD(func, args) \ { \ unsigned threadOpRetry = 0; \ while (PAssertThreadOp(func args, threadOpRetry, #func, __FILE__, __LINE__)); \ }static BOOL PAssertThreadOp(int retval, unsigned & retry, const char * funcname, const char * file, unsigned line){ if (retval == 0) { PTRACE_IF(2, retry > 0, "PWLib\t" << funcname << " required " << retry << " retries!"); return FALSE; } if (errno == EINTR || errno == EAGAIN) { if (++retry < 1000) {#if defined(P_RTEMS) sched_yield();#else usleep(10000); // Basically just swap out thread to try and clear blockage#endif return TRUE; // Return value to try again } // Give up and assert } PAssertFunc(file, line, NULL, psprintf("Function %s failed", funcname)); return FALSE;}PDECLARE_CLASS(PHouseKeepingThread, PThread) public: PHouseKeepingThread() : PThread(1000, NoAutoDeleteThread, NormalPriority, "Housekeeper") { closing = FALSE; Resume(); } void Main(); void SetClosing() { closing = TRUE; } protected: BOOL closing;};static pthread_mutex_t MutexInitialiser = PTHREAD_MUTEX_INITIALIZER;static pthread_cond_t CondInitialiser = PTHREAD_COND_INITIALIZER;#define new PNEWvoid PHouseKeepingThread::Main(){ PProcess & process = PProcess::Current(); while (!closing) { PTimeInterval delay = process.timers.Process(); int fd = process.timerChangePipe[0]; P_fd_set read_fds = fd; P_timeval tval = delay; if (::select(fd+1, read_fds, NULL, NULL, tval) == 1) { BYTE ch; ::read(fd, &ch, 1); } process.PXCheckSignals(); }}void PProcess::SignalTimerChange(){ if (housekeepingThread == NULL) {#if PMEMORY_CHECK BOOL oldIgnoreAllocations = PMemoryHeap::SetIgnoreAllocations(TRUE);#endif housekeepingThread = new PHouseKeepingThread;#if PMEMORY_CHECK PMemoryHeap::SetIgnoreAllocations(oldIgnoreAllocations);#endif } BYTE ch; write(timerChangePipe[1], &ch, 1);}void PProcess::Construct(){#ifndef P_RTEMS // get the file descriptor limit struct rlimit rl; PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0); maxHandles = rl.rlim_cur; PTRACE(4, "PWLib\tMaximum per-process file handles is " << maxHandles); ::pipe(timerChangePipe);#else maxHandles = 500; // arbitrary value socketpair(AF_INET,SOCK_STREAM,0,timerChangePipe);#endif // initialise the housekeeping thread housekeepingThread = NULL;#ifdef P_MACOSX // records the main thread for priority adjusting baseThread = pthread_self();#endif CommonConstruct();}BOOL PProcess::SetMaxHandles(int newMax){#ifndef P_RTEMS // get the current process limit struct rlimit rl; PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0); // set the new current limit rl.rlim_cur = newMax; if (setrlimit(RLIMIT_NOFILE, &rl) == 0) { PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0); maxHandles = rl.rlim_cur; if (maxHandles == newMax) { PTRACE(2, "PWLib\tNew maximum per-process file handles set to " << maxHandles); return TRUE; } }#endif // !P_RTEMS PTRACE(1, "PWLib\tCannot set per-process file handle limit to " << newMax << " (is " << maxHandles << ") - check permissions"); return FALSE;}PProcess::~PProcess(){ PreShutdown(); // Don't wait for housekeeper to stop if Terminate() is called from it. if (housekeepingThread != NULL && PThread::Current() != housekeepingThread) { housekeepingThread->SetClosing(); SignalTimerChange(); housekeepingThread->WaitForTermination(); delete housekeepingThread; } CommonDestruct(); PTRACE(5, "PWLib\tDestroyed process " << this);}//////////////////////////////////////////////////////////////////////////////PThread::PThread(){ // see InitialiseProcessThread()}void PThread::InitialiseProcessThread(){ autoDelete = FALSE; PX_origStackSize = 0; PX_threadId = pthread_self(); PX_priority = NormalPriority; PX_suspendCount = 0;#ifndef P_HAS_SEMAPHORES PX_waitingSemaphore = NULL; PX_WaitSemMutex = MutexInitialiser;#endif PX_suspendMutex = MutexInitialiser;#ifdef P_RTEMS PAssertOS(socketpair(AF_INET,SOCK_STREAM,0,unblockPipe) == 0);#else PAssertOS(::pipe(unblockPipe) == 0);#endif ((PProcess *)this)->activeThreads.DisallowDeleteObjects(); ((PProcess *)this)->activeThreads.SetAt((unsigned)PX_threadId, this); traceBlockIndentLevel = 0;}PThread::PThread(PINDEX stackSize, AutoDeleteFlag deletion, Priority priorityLevel, const PString & name) : threadName(name){ autoDelete = (deletion == AutoDeleteThread); PAssert(stackSize > 0, PInvalidParameter); PX_origStackSize = stackSize; PX_threadId = 0; PX_priority = priorityLevel; PX_suspendCount = 1;#ifndef P_HAS_SEMAPHORES PX_waitingSemaphore = NULL; PX_WaitSemMutex = MutexInitialiser;#endif PX_suspendMutex = MutexInitialiser;#ifdef P_RTEMS PAssertOS(socketpair(AF_INET,SOCK_STREAM,0,unblockPipe) == 0);#else PAssertOS(::pipe(unblockPipe) == 0);#endif PX_NewHandle("Thread unblock pipe", PMAX(unblockPipe[0], unblockPipe[1])); // new thread is actually started the first time Resume() is called. PX_firstTimeStart = TRUE; traceBlockIndentLevel = 0; PTRACE(5, "PWLib\tCreated thread " << this << ' ' << threadName);}PThread::~PThread(){ if (PX_threadId != 0 && PX_threadId != pthread_self()) Terminate(); PAssertPTHREAD(::close, (unblockPipe[0])); PAssertPTHREAD(::close, (unblockPipe[1]));#ifndef P_HAS_SEMAPHORES pthread_mutex_destroy(&PX_WaitSemMutex);#endif#ifdef P_NETBSD // If the mutex was not locked, the unlock will fail */ pthread_mutex_trylock(&PX_suspendMutex);#endif pthread_mutex_unlock(&PX_suspendMutex); pthread_mutex_destroy(&PX_suspendMutex); if (this != &PProcess::Current()) PTRACE(5, "PWLib\tDestroyed thread " << this << ' ' << threadName);}void PThread::Restart(){ if (!IsTerminated()) return; pthread_attr_t threadAttr; pthread_attr_init(&threadAttr); pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED);#if defined(P_LINUX) // Set a decent (256K) stack size that won't eat all virtual memory pthread_attr_setstacksize(&threadAttr, 16*PTHREAD_STACK_MIN); /* Set realtime scheduling if our effective user id is root (only then is this allowed) AND our priority is Highest. As far as I can see, we could use either SCHED_FIFO or SCHED_RR here, it doesn't matter. I don't know if other UNIX OSs have SCHED_FIFO and SCHED_RR as well. WARNING: a misbehaving thread (one that never blocks) started with Highest priority can hang the entire machine. That is why root permission is neccessary. */ if ((geteuid() == 0) && (PX_priority == HighestPriority)) PAssertPTHREAD(pthread_attr_setschedpolicy, (&threadAttr, SCHED_FIFO));#elif defined(P_RTEMS) pthread_attr_setstacksize(&threadAttr, 2*PTHREAD_MINIMUM_STACK_SIZE); pthread_attr_setinheritsched(&threadAttr, PTHREAD_EXPLICIT_SCHED); pthread_attr_setschedpolicy(&threadAttr, SCHED_OTHER); struct sched_param sched_param; sched_param.sched_priority = 125; /* set medium priority */ pthread_attr_setschedparam(&threadAttr, &sched_param);#endif PAssertPTHREAD(pthread_create, (&PX_threadId, &threadAttr, PX_ThreadStart, this));#ifdef P_MACOSX if (PX_priority == HighestPriority) { PTRACE(1, "set thread to have the highest priority (MACOSX)"); SetPriority(HighestPriority); }#endif}void PX_SuspendSignalHandler(int){ PThread * thread = PThread::Current(); if (thread == NULL) return; BOOL notResumed = TRUE; while (notResumed) { BYTE ch; notResumed = ::read(thread->unblockPipe[0], &ch, 1) < 0 && errno == EINTR;#if !( defined(P_NETBSD) && defined(P_NO_CANCEL) ) pthread_testcancel();#endif }}void PThread::Suspend(BOOL susp){ PAssertPTHREAD(pthread_mutex_lock, (&PX_suspendMutex)); // Check for start up condition, first time Resume() is called if (PX_firstTimeStart) { if (susp) PX_suspendCount++; else { if (PX_suspendCount > 0) PX_suspendCount--; if (PX_suspendCount == 0) { PX_firstTimeStart = FALSE; Restart(); } } PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex)); return; }#if defined(P_MACOSX) && (P_MACOSX <= 55) // Suspend - warn the user with an Assertion PAssertAlways("Cannot suspend threads on Mac OS X due to lack of pthread_kill()");#else if (pthread_kill(PX_threadId, 0) == 0) { // if suspending, then see if already suspended if (susp) { PX_suspendCount++; if (PX_suspendCount == 1) { if (PX_threadId != pthread_self()) { signal(SUSPEND_SIG, PX_SuspendSignalHandler); pthread_kill(PX_threadId, SUSPEND_SIG); } else { PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex)); PX_SuspendSignalHandler(SUSPEND_SIG); return; // Mutex already unlocked } } } // if resuming, then see if to really resume else if (PX_suspendCount > 0) { PX_suspendCount--; if (PX_suspendCount == 0) PXAbortBlock(); } } PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));#endif // P_MACOSX}void PThread::Resume(){ Suspend(FALSE);}BOOL PThread::IsSuspended() const{ if (PX_firstTimeStart) return TRUE; if (IsTerminated()) return FALSE; PAssertPTHREAD(pthread_mutex_lock, ((pthread_mutex_t *)&PX_suspendMutex)); BOOL suspended = PX_suspendCount != 0; PAssertPTHREAD(pthread_mutex_unlock, ((pthread_mutex_t *)&PX_suspendMutex)); return suspended;}void PThread::SetAutoDelete(AutoDeleteFlag deletion)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -