📄 tlibthrd.cxx
字号:
{ PAssert(deletion != AutoDeleteThread || this != &PProcess::Current(), PLogicError); autoDelete = deletion == AutoDeleteThread;}#ifdef P_MACOSX// obtain thread priority of the main threadstatic unsigned longGetThreadBasePriority (){ thread_basic_info_data_t threadInfo; policy_info_data_t thePolicyInfo; unsigned int count; if (baseThread == 0) { return 0; } // get basic info count = THREAD_BASIC_INFO_COUNT; thread_info (pthread_mach_thread_np (baseThread), THREAD_BASIC_INFO, (integer_t*)&threadInfo, &count); switch (threadInfo.policy) { case POLICY_TIMESHARE: count = POLICY_TIMESHARE_INFO_COUNT; thread_info(pthread_mach_thread_np (baseThread), THREAD_SCHED_TIMESHARE_INFO, (integer_t*)&(thePolicyInfo.ts), &count); return thePolicyInfo.ts.base_priority; break; case POLICY_FIFO: count = POLICY_FIFO_INFO_COUNT; thread_info(pthread_mach_thread_np (baseThread), THREAD_SCHED_FIFO_INFO, (integer_t*)&(thePolicyInfo.fifo), &count); if (thePolicyInfo.fifo.depressed) { return thePolicyInfo.fifo.depress_priority; } else { return thePolicyInfo.fifo.base_priority; } break; case POLICY_RR: count = POLICY_RR_INFO_COUNT; thread_info(pthread_mach_thread_np (baseThread), THREAD_SCHED_RR_INFO, (integer_t*)&(thePolicyInfo.rr), &count); if (thePolicyInfo.rr.depressed) { return thePolicyInfo.rr.depress_priority; } else { return thePolicyInfo.rr.base_priority; } break; } return 0;}#endifvoid PThread::SetPriority(Priority priorityLevel){ PX_priority = priorityLevel;#if defined(P_LINUX) if (IsTerminated()) return; struct sched_param sched_param; if ((priorityLevel == HighestPriority) && (geteuid() == 0) ) { sched_param.sched_priority = sched_get_priority_min( SCHED_FIFO ); PAssertPTHREAD(pthread_setschedparam, (PX_threadId, SCHED_FIFO, &sched_param)); } else if (priorityLevel != HighestPriority) { /* priority 0 is the only permitted value for the SCHED_OTHER scheduler */ sched_param.sched_priority = 0; PAssertPTHREAD(pthread_setschedparam, (PX_threadId, SCHED_OTHER, &sched_param)); }#endif#if defined(P_MACOSX) if (IsTerminated()) return; if (priorityLevel == HighestPriority) { /* get fixed priority */ { int result; thread_extended_policy_data_t theFixedPolicy; thread_precedence_policy_data_t thePrecedencePolicy; long relativePriority; theFixedPolicy.timeshare = false; // set to true for a non-fixed thread result = thread_policy_set (pthread_mach_thread_np(PX_threadId), THREAD_EXTENDED_POLICY, (thread_policy_t)&theFixedPolicy, THREAD_EXTENDED_POLICY_COUNT); if (result != KERN_SUCCESS) { PTRACE(1, "thread_policy - Couldn't set thread as fixed priority."); } // set priority // precedency policy's "importance" value is relative to // spawning thread's priority relativePriority = 62 - GetThreadBasePriority(); PTRACE(1, "relativePriority is " << relativePriority << " base priority is " << GetThreadBasePriority()); thePrecedencePolicy.importance = relativePriority; result = thread_policy_set (pthread_mach_thread_np(PX_threadId), THREAD_PRECEDENCE_POLICY, (thread_policy_t)&thePrecedencePolicy, THREAD_PRECEDENCE_POLICY_COUNT); if (result != KERN_SUCCESS) { PTRACE(1, "thread_policy - Couldn't set thread priority."); } } }#endif}PThread::Priority PThread::GetPriority() const{#if defined(LINUX) int schedulingPolicy; struct sched_param schedParams; PAssertPTHREAD(pthread_getschedparam, (PX_threadId, &schedulingPolicy, &schedParams)); switch( schedulingPolicy ) { case SCHED_OTHER: break; case SCHED_FIFO: case SCHED_RR: return HighestPriority; default: /* Unknown scheduler. We don't know what priority this thread has. */ PTRACE(1, "PWLib\tPThread::GetPriority: unknown scheduling policy #" << schedulingPolicy); }#endif return NormalPriority; /* as good a guess as any */}#ifndef P_HAS_SEMAPHORESvoid PThread::PXSetWaitingSemaphore(PSemaphore * sem){ PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex)); PX_waitingSemaphore = sem; PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));}#endif#ifdef P_GNU_PTH// GNU PTH threads version (used by NetBSD)// Taken from NetBSD pkg patchesvoid PThread::Sleep(const PTimeInterval & timeout){ PTime lastTime; PTime targetTime = PTime() + timeout; sched_yield(); lastTime = PTime(); while (lastTime < targetTime) { P_timeval tval = targetTime - lastTime; if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR) break; pthread_testcancel(); lastTime = PTime(); }}#else// Normal Posix threads versionvoid PThread::Sleep(const PTimeInterval & timeout){ PTime lastTime; PTime targetTime = lastTime + timeout; do { P_timeval tval = targetTime - lastTime; if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR) break;#if !( defined(P_NETBSD) && defined(P_NO_CANCEL) ) pthread_testcancel();#endif lastTime = PTime(); } while (lastTime < targetTime);}#endifvoid PThread::Yield(){ sched_yield();}PThread * PThread::Current(){ PProcess & process = PProcess::Current(); process.threadMutex.Wait(); PThread * thread = process.activeThreads.GetAt((unsigned)pthread_self()); process.threadMutex.Signal(); return thread;}void PThread::Terminate(){ if (PX_origStackSize <= 0) return; // don't use PThread::Current, as the thread may already not be in the // active threads list if (PX_threadId == pthread_self()) { pthread_exit(0); return; } if (IsTerminated()) return; PTRACE(2, "PWLib\tForcing termination of thread " << (void *)this); PXAbortBlock(); WaitForTermination(20);#ifndef P_HAS_SEMAPHORES PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex)); if (PX_waitingSemaphore != NULL) { PAssertPTHREAD(pthread_mutex_lock, (&PX_waitingSemaphore->mutex)); PX_waitingSemaphore->queuedLocks--; PAssertPTHREAD(pthread_mutex_unlock, (&PX_waitingSemaphore->mutex)); PX_waitingSemaphore = NULL; } PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));#endif#if ( defined(P_NETBSD) && defined(P_NO_CANCEL) ) pthread_kill(PX_threadId,SIGKILL);#else if (PX_threadId) { pthread_cancel(PX_threadId); }#endif}BOOL PThread::IsTerminated() const{ if (PX_threadId == 0) return TRUE;#if defined(P_MACOSX) && (P_MACOSX <= 55) // MacOS X (darwin 5.5) does not support pthread_kill so we cannot use it // to test the validity of the thread#else if (pthread_kill(PX_threadId, 0) != 0) return TRUE;#endif PTRACE(7, "PWLib\tIsTerminated(" << (void *)this << ") not dead yet"); return FALSE;}void PThread::WaitForTermination() const{ if (this == Current()) { PTRACE(2, "WaitForTermination short circuited"); return; } PXAbortBlock(); // this assist in clean shutdowns on some systems while (!IsTerminated()) { Sleep(10); // sleep for 10ms. This slows down the busy loop removing 100% // CPU usage and also yeilds so other threads can run. } }BOOL PThread::WaitForTermination(const PTimeInterval & maxWait) const{ if (this == Current()) { PTRACE(2, "WaitForTermination(t) short circuited"); return TRUE; } PTRACE(6, "PWLib\tWaitForTermination(" << maxWait << ')'); PXAbortBlock(); // this assist in clean shutdowns on some systems PTimer timeout = maxWait; while (!IsTerminated()) { if (timeout == 0) return FALSE; Sleep(10); // sleep for 10ms. This slows down the busy loop removing 100% // CPU usage and also yeilds so other threads can run. } return TRUE;}void * PThread::PX_ThreadStart(void * arg){ pthread_t threadId = pthread_self(); // self-detach pthread_detach(threadId); PThread * thread = (PThread *)arg; // Added this to guarantee that the thread creation (PThread::Restart) // has completed before we start the thread. Then the PX_threadId has // been set. pthread_mutex_lock(&thread->PX_suspendMutex); thread->SetThreadName(thread->GetThreadName()); pthread_mutex_unlock(&thread->PX_suspendMutex); PProcess & process = PProcess::Current(); PINDEX newHighWaterMark = 0; static PINDEX highWaterMark = 0; // add thread to thread list process.threadMutex.Wait(); process.activeThreads.SetAt((unsigned)threadId, thread); if (process.activeThreads.GetSize() > highWaterMark) newHighWaterMark = highWaterMark = process.activeThreads.GetSize(); process.threadMutex.Signal(); PTRACE_IF(4, newHighWaterMark > 0, "PWLib\tThread high water mark set: " << newHighWaterMark); // make sure the cleanup routine is called when the thread exits pthread_cleanup_push(PThread::PX_ThreadEnd, arg); PTRACE(5, "PWLib\tStarted thread " << thread << ' ' << thread->threadName); // now call the the thread main routine thread->Main(); // execute the cleanup routine pthread_cleanup_pop(1); return NULL;}void PThread::PX_ThreadEnd(void * arg){ PThread * thread = (PThread *)arg; pthread_t id = thread->GetThreadId(); if (id == 0) { // Don't know why, but pthreads under Linux at least can call this function // multiple times! Probably a bug, but we have to allow for it. PTRACE(2, "PWLib\tAttempted to multiply end thread " << thread << " ThreadID=" << (void *)id); return; } PTRACE(5, "PWLib\tEnded thread " << thread << ' ' << thread->threadName); // remove this thread from the active thread list PProcess & process = PProcess::Current(); process.threadMutex.Wait(); process.activeThreads.SetAt((unsigned)id, NULL); process.threadMutex.Signal(); // delete the thread if required, note this is done this way to avoid // a race condition, the thread ID cannot be zeroed before the if! if (thread->autoDelete) { thread->PX_threadId = 0; // Prevent terminating terminated thread // Now should be safe to delete the thread! delete thread; } else thread->PX_threadId = 0;}int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout){ PTRACE(7, "PWLib\tPThread::PXBlockOnIO(" << handle << ',' << type << ')'); if ((handle < 0) || (handle >= PProcess::Current().GetMaxHandles())) { PTRACE(2, "PWLib\tAttempt to use illegal handle in PThread::PXBlockOnIO, handle=" << handle); errno = EBADF; return -1; } // make sure we flush the buffer before doing a write P_fd_set read_fds; P_fd_set write_fds; P_fd_set exception_fds; int retval; do { switch (type) { case PChannel::PXReadBlock: case PChannel::PXAcceptBlock: read_fds = handle; write_fds.Zero(); exception_fds.Zero(); break; case PChannel::PXWriteBlock: read_fds.Zero(); write_fds = handle; exception_fds.Zero(); break; case PChannel::PXConnectBlock: read_fds.Zero(); write_fds = handle; exception_fds = handle; break; default: PAssertAlways(PLogicError); return 0; } // include the termination pipe into all blocking I/O functions read_fds += unblockPipe[0]; P_timeval tval = timeout; retval = ::select(PMAX(handle, unblockPipe[0])+1, read_fds, write_fds, exception_fds, tval); } while (retval < 0 && errno == EINTR); if ((retval == 1) && read_fds.IsPresent(unblockPipe[0])) { BYTE ch; ::read(unblockPipe[0], &ch, 1); errno = EINTR; retval = -1; PTRACE(6, "PWLib\tUnblocked I/O fd=" << unblockPipe[0]); } return retval;}void PThread::PXAbortBlock() const{ BYTE ch; ::write(unblockPipe[1], &ch, 1); PTRACE(6, "PWLib\tUnblocking I/O fd=" << unblockPipe[0] << " thread=" << GetThreadName());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -