📄 tlibthrd.cxx
字号:
* Revision 1.24 1999/08/24 13:40:56 craigs
* Fixed problem with condwait destorys failing on linux
*
* Revision 1.23 1999/08/23 05:33:45 robertj
* Made last threading changes Linux only.
*
* Revision 1.22 1999/08/23 05:14:13 robertj
* Removed blocking of interrupt signals as does not work in Linux threads.
*
* Revision 1.21 1999/07/30 00:40:32 robertj
* Fixed problem with signal variable in non-Linux platforms
*
* Revision 1.20 1999/07/19 01:32:24 craigs
* Changed signals used in pthreads code, is used by linux version.
*
* Revision 1.19 1999/07/15 13:10:55 craigs
* Fixed problem with EINTR in nontimed sempahore waits
*
* Revision 1.18 1999/07/15 13:05:33 robertj
* Fixed problem with getting EINTR in semaphore wait, is normal, not error.
*
* Revision 1.17 1999/07/11 13:42:13 craigs
* pthreads support for Linux
*
* Revision 1.16 1999/05/12 03:29:20 robertj
* Fixed problem with semaphore free, done at wrong time.
*
* Revision 1.15 1999/04/29 08:41:26 robertj
* Fixed problems with uninitialised mutexes in PProcess.
*
* Revision 1.14 1999/03/16 10:54:16 robertj
* Added parameterless version of WaitForTermination.
*
* Revision 1.13 1999/03/16 10:30:37 robertj
* Added missing PThread::WaitForTermination function.
*
* Revision 1.12 1999/01/12 12:09:51 robertj
* Removed redundent member variable, was in common.
* Fixed BSD threads compatibility.
*
* Revision 1.11 1999/01/11 12:05:56 robertj
* Fixed some more race conditions in threads.
*
* Revision 1.10 1999/01/11 03:42:26 robertj
* Fixed problem with destroying thread automatically.
*
* Revision 1.9 1999/01/09 03:37:28 robertj
* Fixed problem with closing thread waiting on semaphore.
* Improved efficiency of mutex to use pthread functions directly.
*
* Revision 1.8 1999/01/08 01:31:03 robertj
* Support for pthreads under FreeBSD
*
* Revision 1.7 1998/12/15 12:41:07 robertj
* Fixed signal handling so can now ^C a pthread version.
*
* Revision 1.6 1998/11/05 09:45:04 robertj
* Removed StartImmediate option in thread construction.
*
* Revision 1.5 1998/09/24 04:12:25 robertj
* Added open software license.
*
*/
#include <ptlib/socket.h>
#include <sched.h> // for sched_yield
#include <pthread.h>
#include <sys/resource.h>
#ifdef P_RTEMS
#define SUSPEND_SIG SIGALRM
#include <sched.h>
#else
#define SUSPEND_SIG SIGVTALRM
#endif
#ifdef P_MACOSX
#include <mach/mach.h>
#include <mach/thread_policy.h>
#include <sys/param.h>
#include <sys/sysctl.h>
// going to need the main thread for adjusting relative priority
static pthread_t baseThread;
#endif
#ifdef P_HAS_SEMAPHORES_XPG6
#include "semaphore.h"
#endif
int PX_NewHandle(const char *, int);
#define PPThreadKill(id, sig) PProcess::Current().PThreadKill(id, sig)
#define PAssertPTHREAD(func, args) \
{ \
unsigned threadOpRetry = 0; \
while (PAssertThreadOp(func args, threadOpRetry, #func, __FILE__, __LINE__)); \
}
static BOOL PAssertThreadOp(int retval,
unsigned & retry,
const char * funcname,
const char * file,
unsigned line)
{
if (retval == 0) {
PTRACE_IF(2, retry > 0, "PWLib\t" << funcname << " required " << retry << " retries!");
return FALSE;
}
if (errno == EINTR || errno == EAGAIN) {
if (++retry < 1000) {
#if defined(P_RTEMS)
sched_yield();
#else
usleep(10000); // Basically just swap out thread to try and clear blockage
#endif
return TRUE; // Return value to try again
}
// Give up and assert
}
PAssertFunc(file, line, NULL, psprintf("Function %s failed", funcname));
return FALSE;
}
PDECLARE_CLASS(PHouseKeepingThread, PThread)
public:
PHouseKeepingThread()
: PThread(1000, NoAutoDeleteThread, NormalPriority, "Housekeeper")
{ closing = FALSE; Resume(); }
void Main();
void SetClosing() { closing = TRUE; }
protected:
BOOL closing;
};
static pthread_mutex_t MutexInitialiser = PTHREAD_MUTEX_INITIALIZER;
#define new PNEW
void PHouseKeepingThread::Main()
{
PProcess & process = PProcess::Current();
while (!closing) {
PTimeInterval delay = process.timers.Process();
int fd = process.timerChangePipe[0];
P_fd_set read_fds = fd;
P_timeval tval = delay;
if (::select(fd+1, read_fds, NULL, NULL, tval) == 1) {
BYTE ch;
::read(fd, &ch, 1);
}
process.PXCheckSignals();
}
}
#ifndef _DEBUG
#undef PMEMORY_CHECK
#endif
void PProcess::SignalTimerChange()
{
if (housekeepingThread == NULL) {
#if PMEMORY_CHECK
BOOL oldIgnoreAllocations = PMemoryHeap::SetIgnoreAllocations(TRUE);
#endif
housekeepingThread = new PHouseKeepingThread;
#if PMEMORY_CHECK
PMemoryHeap::SetIgnoreAllocations(oldIgnoreAllocations);
#endif
}
static BYTE ch = 0;
write(timerChangePipe[1], &ch, 1);
}
void PProcess::Construct()
{
#ifndef P_RTEMS
// get the file descriptor limit
struct rlimit rl;
PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
maxHandles = rl.rlim_cur;
PTRACE(4, "PWLib\tMaximum per-process file handles is " << maxHandles);
::pipe(timerChangePipe);
#else
maxHandles = 500; // arbitrary value
socketpair(AF_INET,SOCK_STREAM,0,timerChangePipe);
#endif
// initialise the housekeeping thread
housekeepingThread = NULL;
#ifdef P_MACOSX
// records the main thread for priority adjusting
baseThread = pthread_self();
#endif
CommonConstruct();
}
BOOL PProcess::SetMaxHandles(int newMax)
{
#ifndef P_RTEMS
// get the current process limit
struct rlimit rl;
PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
// set the new current limit
rl.rlim_cur = newMax;
if (setrlimit(RLIMIT_NOFILE, &rl) == 0) {
PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
maxHandles = rl.rlim_cur;
if (maxHandles == newMax) {
PTRACE(2, "PWLib\tNew maximum per-process file handles set to " << maxHandles);
return TRUE;
}
}
#endif // !P_RTEMS
PTRACE(1, "PWLib\tCannot set per-process file handle limit to "
<< newMax << " (is " << maxHandles << ") - check permissions");
return FALSE;
}
PProcess::~PProcess()
{
PreShutdown();
// Don't wait for housekeeper to stop if Terminate() is called from it.
if (housekeepingThread != NULL && PThread::Current() != housekeepingThread) {
housekeepingThread->SetClosing();
SignalTimerChange();
housekeepingThread->WaitForTermination();
delete housekeepingThread;
}
CommonDestruct();
PTRACE(5, "PWLib\tDestroyed process " << this);
}
BOOL PProcess::PThreadKill(pthread_t id, unsigned sig)
{
PWaitAndSignal m(threadMutex);
if (!activeThreads.Contains((unsigned)id))
return FALSE;
return pthread_kill(id, sig) == 0;
}
//////////////////////////////////////////////////////////////////////////////
PThread::PThread()
{
// see InitialiseProcessThread()
}
void PThread::InitialiseProcessThread()
{
autoDelete = FALSE;
PX_origStackSize = 0;
PX_threadId = pthread_self();
PX_priority = NormalPriority;
PX_suspendCount = 0;
#ifndef P_HAS_SEMAPHORES
PX_waitingSemaphore = NULL;
PX_WaitSemMutex = MutexInitialiser;
#endif
PX_suspendMutex = MutexInitialiser;
#ifdef P_RTEMS
PAssertOS(socketpair(AF_INET,SOCK_STREAM,0,unblockPipe) == 0);
#else
PAssertOS(::pipe(unblockPipe) == 0);
#endif
((PProcess *)this)->activeThreads.DisallowDeleteObjects();
((PProcess *)this)->activeThreads.SetAt((unsigned)PX_threadId, this);
PX_firstTimeStart = FALSE;
traceBlockIndentLevel = 0;
}
PThread::PThread(PINDEX stackSize,
AutoDeleteFlag deletion,
Priority priorityLevel,
const PString & name)
: threadName(name)
{
autoDelete = (deletion == AutoDeleteThread);
PAssert(stackSize > 0, PInvalidParameter);
PX_origStackSize = stackSize;
PX_threadId = 0;
PX_priority = priorityLevel;
PX_suspendCount = 1;
#ifndef P_HAS_SEMAPHORES
PX_waitingSemaphore = NULL;
PX_WaitSemMutex = MutexInitialiser;
#endif
PX_suspendMutex = MutexInitialiser;
#ifdef P_RTEMS
PAssertOS(socketpair(AF_INET,SOCK_STREAM,0,unblockPipe) == 0);
#else
PAssertOS(::pipe(unblockPipe) == 0);
#endif
PX_NewHandle("Thread unblock pipe", PMAX(unblockPipe[0], unblockPipe[1]));
// new thread is actually started the first time Resume() is called.
PX_firstTimeStart = TRUE;
traceBlockIndentLevel = 0;
PTRACE(5, "PWLib\tCreated thread " << this << ' ' << threadName);
}
PThread::~PThread()
{
if (PX_threadId != 0 && PX_threadId != pthread_self())
Terminate();
PAssertPTHREAD(::close, (unblockPipe[0]));
PAssertPTHREAD(::close, (unblockPipe[1]));
#ifndef P_HAS_SEMAPHORES
pthread_mutex_destroy(&PX_WaitSemMutex);
#endif
// If the mutex was not locked, the unlock will fail */
pthread_mutex_trylock(&PX_suspendMutex);
pthread_mutex_unlock(&PX_suspendMutex);
pthread_mutex_destroy(&PX_suspendMutex);
if (this != &PProcess::Current())
PTRACE(5, "PWLib\tDestroyed thread " << this << ' ' << threadName);
}
void PThread::Restart()
{
if (!IsTerminated())
return;
pthread_attr_t threadAttr;
pthread_attr_init(&threadAttr);
pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED);
#if defined(P_LINUX)
// Set a decent (256K) stack size that won't eat all virtual memory
pthread_attr_setstacksize(&threadAttr, 16*PTHREAD_STACK_MIN);
/*
Set realtime scheduling if our effective user id is root (only then is this
allowed) AND our priority is Highest.
As far as I can see, we could use either SCHED_FIFO or SCHED_RR here, it
doesn't matter.
I don't know if other UNIX OSs have SCHED_FIFO and SCHED_RR as well.
WARNING: a misbehaving thread (one that never blocks) started with Highest
priority can hang the entire machine. That is why root permission is
neccessary.
*/
if ((geteuid() == 0) && (PX_priority == HighestPriority))
PAssertPTHREAD(pthread_attr_setschedpolicy, (&threadAttr, SCHED_FIFO));
#elif defined(P_RTEMS)
pthread_attr_setstacksize(&threadAttr, 2*PTHREAD_MINIMUM_STACK_SIZE);
pthread_attr_setinheritsched(&threadAttr, PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy(&threadAttr, SCHED_OTHER);
struct sched_param sched_param;
sched_param.sched_priority = 125; /* set medium priority */
pthread_attr_setschedparam(&threadAttr, &sched_param);
#endif
PProcess & process = PProcess::Current();
PINDEX newHighWaterMark = 0;
static PINDEX highWaterMark = 0;
// lock the thread list
process.threadMutex.Wait();
// create the thread
PAssertPTHREAD(pthread_create, (&PX_threadId, &threadAttr, PX_ThreadStart, this));
// put the thread into the thread list
process.activeThreads.SetAt((unsigned)PX_threadId, this);
if (process.activeThreads.GetSize() > highWaterMark)
newHighWaterMark = highWaterMark = process.activeThreads.GetSize();
// unlock the thread list
process.threadMutex.Signal();
PTRACE_IF(4, newHighWaterMark > 0, "PWLib\tThread high water mark set: " << newHighWaterMark);
#ifdef P_MACOSX
if (PX_priority == HighestPriority) {
PTRACE(1, "set thread to have the highest priority (MACOSX)");
SetPriority(HighestPriority);
}
#endif
}
void PX_SuspendSignalHandler(int)
{
PThread * thread = PThread::Current();
if (thread == NULL)
return;
BOOL notResumed = TRUE;
while (notResumed) {
BYTE ch;
notResumed = ::read(thread->unblockPipe[0], &ch, 1) < 0 && errno == EINTR;
#if !( defined(P_NETBSD) && defined(P_NO_CANCEL) )
pthread_testcancel();
#endif
}
}
void PThread::Suspend(BOOL susp)
{
PAssertPTHREAD(pthread_mutex_lock, (&PX_suspendMutex));
// Check for start up condition, first time Resume() is called
if (PX_firstTimeStart) {
if (susp)
PX_suspendCount++;
else {
if (PX_suspendCount > 0)
PX_suspendCount--;
if (PX_suspendCount == 0) {
PX_firstTimeStart = FALSE;
Restart();
}
}
PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));
return;
}
#if defined(P_MACOSX) && (P_MACOSX <= 55)
// Suspend - warn the user with an Assertion
PAssertAlways("Cannot suspend threads on Mac OS X due to lack of pthread_kill()");
#else
if (PPThreadKill(PX_threadId, 0)) {
// if suspending, then see if already suspended
if (susp) {
PX_suspendCount++;
if (PX_suspendCount == 1) {
if (PX_threadId != pthread_self()) {
signal(SUSPEND_SIG, PX_SuspendSignalHandler);
PPThreadKill(PX_threadId, SUSPEND_SIG);
}
else {
PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));
PX_SuspendSignalHandler(SUSPEND_SIG);
return; // Mutex already unlocked
}
}
}
// if resuming, then see if to really resume
else if (PX_suspendCount > 0) {
PX_suspendCount--;
if (PX_suspendCount == 0)
PXAbortBlock();
}
}
PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));
#endif // P_MACOSX
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -