📄 tlibthrd.cxx
字号:
#ifdef P_MACOSX
// obtain thread priority of the main thread
static unsigned long
GetThreadBasePriority ()
{
thread_basic_info_data_t threadInfo;
policy_info_data_t thePolicyInfo;
unsigned int count;
if (baseThread == 0) {
return 0;
}
// get basic info
count = THREAD_BASIC_INFO_COUNT;
thread_info (pthread_mach_thread_np (baseThread), THREAD_BASIC_INFO,
(integer_t*)&threadInfo, &count);
switch (threadInfo.policy) {
case POLICY_TIMESHARE:
count = POLICY_TIMESHARE_INFO_COUNT;
thread_info(pthread_mach_thread_np (baseThread),
THREAD_SCHED_TIMESHARE_INFO,
(integer_t*)&(thePolicyInfo.ts), &count);
return thePolicyInfo.ts.base_priority;
case POLICY_FIFO:
count = POLICY_FIFO_INFO_COUNT;
thread_info(pthread_mach_thread_np (baseThread),
THREAD_SCHED_FIFO_INFO,
(integer_t*)&(thePolicyInfo.fifo), &count);
if (thePolicyInfo.fifo.depressed)
return thePolicyInfo.fifo.depress_priority;
return thePolicyInfo.fifo.base_priority;
case POLICY_RR:
count = POLICY_RR_INFO_COUNT;
thread_info(pthread_mach_thread_np (baseThread),
THREAD_SCHED_RR_INFO,
(integer_t*)&(thePolicyInfo.rr), &count);
if (thePolicyInfo.rr.depressed)
return thePolicyInfo.rr.depress_priority;
return thePolicyInfo.rr.base_priority;
}
return 0;
}
#endif
void PThread::SetPriority(Priority priorityLevel)
{
PX_priority = priorityLevel;
#if defined(P_LINUX)
if (IsTerminated())
return;
struct sched_param sched_param;
if ((priorityLevel == HighestPriority) && (geteuid() == 0) ) {
sched_param.sched_priority = sched_get_priority_min( SCHED_FIFO );
PAssertPTHREAD(pthread_setschedparam, (PX_threadId, SCHED_FIFO, &sched_param));
}
else if (priorityLevel != HighestPriority) {
/* priority 0 is the only permitted value for the SCHED_OTHER scheduler */
sched_param.sched_priority = 0;
PAssertPTHREAD(pthread_setschedparam, (PX_threadId, SCHED_OTHER, &sched_param));
}
#endif
#if defined(P_MACOSX)
if (IsTerminated())
return;
if (priorityLevel == HighestPriority) {
/* get fixed priority */
{
int result;
thread_extended_policy_data_t theFixedPolicy;
thread_precedence_policy_data_t thePrecedencePolicy;
long relativePriority;
theFixedPolicy.timeshare = false; // set to true for a non-fixed thread
result = thread_policy_set (pthread_mach_thread_np(PX_threadId),
THREAD_EXTENDED_POLICY,
(thread_policy_t)&theFixedPolicy,
THREAD_EXTENDED_POLICY_COUNT);
if (result != KERN_SUCCESS) {
PTRACE(1, "thread_policy - Couldn't set thread as fixed priority.");
}
// set priority
// precedency policy's "importance" value is relative to
// spawning thread's priority
relativePriority = 62 - GetThreadBasePriority();
PTRACE(1, "relativePriority is " << relativePriority << " base priority is " << GetThreadBasePriority());
thePrecedencePolicy.importance = relativePriority;
result = thread_policy_set (pthread_mach_thread_np(PX_threadId),
THREAD_PRECEDENCE_POLICY,
(thread_policy_t)&thePrecedencePolicy,
THREAD_PRECEDENCE_POLICY_COUNT);
if (result != KERN_SUCCESS) {
PTRACE(1, "thread_policy - Couldn't set thread priority.");
}
}
}
#endif
}
PThread::Priority PThread::GetPriority() const
{
#if defined(LINUX)
int schedulingPolicy;
struct sched_param schedParams;
PAssertPTHREAD(pthread_getschedparam, (PX_threadId, &schedulingPolicy, &schedParams));
switch( schedulingPolicy )
{
case SCHED_OTHER:
break;
case SCHED_FIFO:
case SCHED_RR:
return HighestPriority;
default:
/* Unknown scheduler. We don't know what priority this thread has. */
PTRACE(1, "PWLib\tPThread::GetPriority: unknown scheduling policy #" << schedulingPolicy);
}
#endif
return NormalPriority; /* as good a guess as any */
}
#ifndef P_HAS_SEMAPHORES
void PThread::PXSetWaitingSemaphore(PSemaphore * sem)
{
PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex));
PX_waitingSemaphore = sem;
PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));
}
#endif
#ifdef P_GNU_PTH
// GNU PTH threads version (used by NetBSD)
// Taken from NetBSD pkg patches
void PThread::Sleep(const PTimeInterval & timeout)
{
PTime lastTime;
PTime targetTime = PTime() + timeout;
sched_yield();
lastTime = PTime();
while (lastTime < targetTime) {
P_timeval tval = targetTime - lastTime;
if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR)
break;
pthread_testcancel();
lastTime = PTime();
}
}
#else
// Normal Posix threads version
void PThread::Sleep(const PTimeInterval & timeout)
{
PTime lastTime;
PTime targetTime = lastTime + timeout;
do {
P_timeval tval = targetTime - lastTime;
if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR)
break;
#if !( defined(P_NETBSD) && defined(P_NO_CANCEL) )
pthread_testcancel();
#endif
lastTime = PTime();
} while (lastTime < targetTime);
}
#endif
void PThread::Yield()
{
sched_yield();
}
PThread * PThread::Current()
{
PProcess & process = PProcess::Current();
process.threadMutex.Wait();
PThread * thread = process.activeThreads.GetAt((unsigned)pthread_self());
process.threadMutex.Signal();
return thread;
}
void PThread::Terminate()
{
if (PX_origStackSize <= 0)
return;
// don't use PThread::Current, as the thread may already not be in the
// active threads list
if (PX_threadId == pthread_self()) {
pthread_exit(0);
return;
}
if (IsTerminated())
return;
PTRACE(2, "PWLib\tForcing termination of thread " << (void *)this);
PXAbortBlock();
WaitForTermination(20);
#if !defined(P_HAS_SEMAPHORES) && !defined(P_HAS_NAMED_SEMAPHORES)
PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex));
if (PX_waitingSemaphore != NULL) {
PAssertPTHREAD(pthread_mutex_lock, (&PX_waitingSemaphore->mutex));
PX_waitingSemaphore->queuedLocks--;
PAssertPTHREAD(pthread_mutex_unlock, (&PX_waitingSemaphore->mutex));
PX_waitingSemaphore = NULL;
}
PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));
#endif
#if ( defined(P_NETBSD) && defined(P_NO_CANCEL) )
PPThreadKill(PX_threadId,SIGKILL);
#else
if (PX_threadId) {
pthread_cancel(PX_threadId);
}
#endif
}
PBoolean PThread::IsTerminated() const
{
pthread_t id = PX_threadId;
return (id == 0) || !PPThreadKill(id, 0);
}
void PThread::WaitForTermination() const
{
if (this == Current()) {
PTRACE(2, "WaitForTermination short circuited");
return;
}
PXAbortBlock(); // this assist in clean shutdowns on some systems
while (!IsTerminated()) {
Sleep(10); // sleep for 10ms. This slows down the busy loop removing 100%
// CPU usage and also yeilds so other threads can run.
}
}
PBoolean PThread::WaitForTermination(const PTimeInterval & maxWait) const
{
if (this == Current()) {
PTRACE(2, "WaitForTermination(t) short circuited");
return PTrue;
}
PTRACE(6, "PWLib\tWaitForTermination(" << maxWait << ')');
PXAbortBlock(); // this assist in clean shutdowns on some systems
PTimer timeout = maxWait;
while (!IsTerminated()) {
if (timeout == 0)
return PFalse;
Sleep(10); // sleep for 10ms. This slows down the busy loop removing 100%
// CPU usage and also yeilds so other threads can run.
}
return PTrue;
}
void * PThread::PX_ThreadStart(void * arg)
{
PThread * thread = (PThread *)arg;
//don't need to detach the the thread, it was created in the PTHREAD_CREATE_DETACHED state
// Added this to guarantee that the thread creation (PThread::Restart)
// has completed before we start the thread. Then the PX_threadId has
// been set.
pthread_mutex_lock(&thread->PX_suspendMutex);
thread->SetThreadName(thread->GetThreadName());
pthread_mutex_unlock(&thread->PX_suspendMutex);
// make sure the cleanup routine is called when the thread exits
pthread_cleanup_push(&PThread::PX_ThreadEnd, arg);
PTRACE(5, "PWLib\tStarted thread " << thread << ' ' << thread->threadName);
PProcess::Current().OnThreadStart(*thread);
// now call the the thread main routine
thread->Main();
// execute the cleanup routine
pthread_cleanup_pop(1);
return NULL;
}
void PThread::PX_ThreadEnd(void * arg)
{
PThread * thread = (PThread *)arg;
PProcess & process = PProcess::Current();
process.OnThreadEnded(*thread);
process.threadMutex.Wait();
pthread_t id = thread->GetThreadId();
if (id == 0) {
// Don't know why, but pthreads under Linux at least can call this function
// multiple times! Probably a bug, but we have to allow for it.
process.threadMutex.Signal();
PTRACE(2, "PWLib\tAttempted to multiply end thread " << thread << " ThreadID=" << (void *)id);
return;
}
// remove this thread from the active thread list
process.activeThreads.SetAt((unsigned)id, NULL);
// delete the thread if required, note this is done this way to avoid
// a race condition, the thread ID cannot be zeroed before the if!
if (thread->autoDelete) {
thread->PX_threadId = 0; // Prevent terminating terminated thread
process.threadMutex.Signal();
PTRACE(5, "PWLib\tEnded thread " << thread << ' ' << thread->threadName);
/* It is now safe to delete this thread. Note that this thread
is deleted after the process.threadMutex.Signal(), which means
PWaitAndSignal(process.threadMutex) could not be used */
delete thread;
}
else {
thread->PX_threadId = 0;
PString threadName = thread->threadName;
process.threadMutex.Signal();
PTRACE(5, "PWLib\tEnded thread " << thread << ' ' << threadName);
}
}
int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
PTRACE(7, "PWLib\tPThread::PXBlockOnIO(" << handle << ',' << type << ')');
if ((handle < 0) || (handle >= PProcess::Current().GetMaxHandles())) {
PTRACE(2, "PWLib\tAttempt to use illegal handle in PThread::PXBlockOnIO, handle=" << handle);
errno = EBADF;
return -1;
}
// make sure we flush the buffer before doing a write
P_fd_set read_fds;
P_fd_set write_fds;
P_fd_set exception_fds;
int retval;
do {
switch (type) {
case PChannel::PXReadBlock:
case PChannel::PXAcceptBlock:
read_fds = handle;
write_fds.Zero();
exception_fds.Zero();
break;
case PChannel::PXWriteBlock:
read_fds.Zero();
write_fds = handle;
exception_fds.Zero();
break;
case PChannel::PXConnectBlock:
read_fds.Zero();
write_fds = handle;
exception_fds = handle;
break;
default:
PAssertAlways(PLogicError);
return 0;
}
// include the termination pipe into all blocking I/O functions
read_fds += unblockPipe[0];
P_timeval tval = timeout;
retval = ::select(PMAX(handle, unblockPipe[0])+1,
read_fds, write_fds, exception_fds, tval);
} while (retval < 0 && errno == EINTR);
if ((retval == 1) && read_fds.IsPresent(unblockPipe[0])) {
BYTE ch;
::read(unblockPipe[0], &ch, 1);
errno = EINTR;
retval = -1;
PTRACE(6, "PWLib\tUnblocked I/O fd=" << unblockPipe[0]);
}
return retval;
}
void PThread::PXAbortBlock() const
{
static BYTE ch = 0;
::write(unblockPipe[1], &ch, 1);
PTRACE(6, "PWLib\tUnblocking I/O fd=" << unblockPipe[0] << " thread=" << GetThreadName());
}
///////////////////////////////////////////////////////////////////////////////
PSemaphore::PSemaphore(PXClass pxc)
{
pxClass = pxc;
// these should never be used, as this constructor is
// only used for PMutex and PSyncPoint and they have their
// own copy constructors
initialVar = maxCountVar = 0;
if(pxClass == PXSemaphore) {
#if defined(P_HAS_SEMAPHORES)
/* call sem_init, otherwise sem_destroy fails*/
PAssertPTHREAD(sem_init, (&semId, 0, 0));
#elif defined(P_HAS_NAMED_SEMAPHORES)
semId = CreateSem(0);
#else
currentCount = maximumCount = 0;
queuedLocks = 0;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condVar, NULL);
#endif
}
}
PSemaphore::PSemaphore(unsigned initial, unsigned maxCount)
{
pxClass = PXSemaphore;
initialVar = initial;
maxCountVar = maxCount;
#if defined(P_HAS_SEMAPHORES)
PAssertPTHREAD(sem_init, (&semId, 0, initial));
#elif defined(P_HAS_NAMED_SEMAPHORES)
semId = CreateSem(initialVar);
#else
PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
PAssert(maxCount > 0, "Invalid semaphore maximum.");
if (initial > maxCount)
initial = maxCount;
currentCount = initial;
maximumCount = maxCount;
queuedLocks = 0;
#endif
}
PSemaphore::PSemaphore(const PSemaphore & sem)
{
pxClass = sem.GetSemClass();
initialVar = sem.GetInitial();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -