📄 tlibthrd.cxx
字号:
}void PThread::PX_ThreadEnd(void * arg){ PThread * thread = (PThread *)arg; PProcess & process = PProcess::Current(); // remove this thread from the thread list process.threadMutex.Wait(); process.activeThreads.SetAt(thread->PX_GetThreadId(), NULL); process.threadMutex.Signal(); thread->PX_threadId = 0; // Prevent terminating terminated thread // delete the thread if required if (thread->autoDelete) delete thread;}unsigned PThread::PX_GetThreadId() const{ return (unsigned)PX_threadId;}void PThread::Restart(){ if (IsTerminated()) return; PX_NewThread(FALSE);}void PThread::Terminate(){ if (PX_origStackSize <= 0) return; if (IsTerminated()) return; if (Current() == this) pthread_exit(NULL); else {#ifndef P_HAS_SEMAPHORES PAssertOS(pthread_mutex_lock(&PX_WaitSemMutex) == 0); if (PX_waitingSemaphore != NULL) { PAssertOS(pthread_mutex_lock(&PX_waitingSemaphore->mutex) == 0); PX_waitingSemaphore->queuedLocks--; PAssertOS(pthread_mutex_unlock(&PX_waitingSemaphore->mutex) == 0); PX_waitingSemaphore = NULL; } PAssertOS(pthread_mutex_unlock(&PX_WaitSemMutex) == 0);#endif#if defined(P_FREEBSD) || defined(P_OPENBSD) || defined(P_NETBSD) || defined (P_AIX) pthread_kill(PX_threadId, SIGKILL);#else pthread_cancel(PX_threadId);#endif }}void PThread::PXSetWaitingSemaphore(PSemaphore * sem){#ifndef P_HAS_SEMAPHORES PAssertOS(pthread_mutex_lock(&PX_WaitSemMutex) == 0); PX_waitingSemaphore = sem; PAssertOS(pthread_mutex_unlock(&PX_WaitSemMutex) == 0);#endif}BOOL PThread::IsTerminated() const{ if (PX_threadId == 0) return TRUE; if (pthread_kill(PX_threadId, 0) != 0) return TRUE; return FALSE;}void PThread::Suspend(BOOL susp){ PAssertOS(pthread_mutex_lock(&PX_suspendMutex) == 0); BOOL unlock = TRUE; if (pthread_kill(PX_threadId, 0) == 0) { // if suspending, then see if already suspended if (susp) { PX_suspendCount++; if (PX_suspendCount == 1) { if (PX_threadId != pthread_self()) pthread_kill(PX_threadId, SUSPEND_SIG); else { unlock = FALSE; PAssertOS(pthread_mutex_unlock(&PX_suspendMutex) == 0); sigset_t waitSignals; sigemptyset(&waitSignals); sigaddset(&waitSignals, RESUME_SIG);#if defined(P_LINUX) || defined(P_FREEBSD) || defined(P_OPENBSD) || defined(P_NETBSD) || defined (P_AIX) int sig; sigwait(&waitSignals, &sig);#else sigwait(&waitSignals);#endif } } } // if resuming, then see if to really resume else if (PX_suspendCount > 0) { PX_suspendCount--; if (PX_suspendCount == 0) pthread_kill(PX_threadId, RESUME_SIG); } } if (unlock) PAssertOS(pthread_mutex_unlock(&PX_suspendMutex) == 0);}void PThread::Resume(){ Suspend(FALSE);}BOOL PThread::IsSuspended() const{ if (IsTerminated()) return FALSE; PAssertOS(pthread_mutex_lock((pthread_mutex_t *)&PX_suspendMutex) == 0); BOOL suspended = PX_suspendCount > 0; PAssertOS(pthread_mutex_unlock((pthread_mutex_t *)&PX_suspendMutex) == 0); return suspended;}void PThread::SetPriority(Priority /*priorityLevel*/){}PThread::Priority PThread::GetPriority() const{ return LowestPriority;}void PThread::Yield(){ ::sleep(0);}PThread * PThread::Current(){ PProcess & process = PProcess::Current(); process.threadMutex.Wait(); PThread * thread = process.activeThreads.GetAt((unsigned)pthread_self()); process.threadMutex.Signal(); return PAssertNULL(thread);}void PThread::Sleep(const PTimeInterval & timeout){ struct timeval * tptr = NULL; struct timeval timeout_val; if (timeout != PMaxTimeInterval) { if (timeout.GetMilliSeconds() < 1000L*60L*60L*24L) { timeout_val.tv_usec = (timeout.GetMilliSeconds() % 1000) * 1000; timeout_val.tv_sec = timeout.GetSeconds(); tptr = &timeout_val; } } while (::select(0, NULL, NULL, NULL, tptr) != 0) PProcess::Current().PXCheckSignals();}void PThread::WaitForTermination() const{ BYTE ch; ::write(termPipe[1], &ch, 1); while (!IsTerminated()) Current()->Sleep(10);}BOOL PThread::WaitForTermination(const PTimeInterval & maxWait) const{ BYTE ch; ::write(termPipe[1], &ch, 1); PTimer timeout = maxWait; while (!IsTerminated()) { if (timeout == 0) return FALSE; Current()->Sleep(10); } return TRUE;}///////////////////////////////////////////////////////////////////////////////PSemaphore::PSemaphore(unsigned initial, unsigned maxCount){#ifdef P_HAS_SEMAPHORES PAssertOS(sem_init(&semId, 0, initial) == 0);#else PAssert(maxCount > 0, "Invalid semaphore maximum."); if (initial > maxCount) initial = maxCount; currentCount = initial; maximumCount = maxCount; queuedLocks = 0; //pthread_mutexattr_t mutexAttr; //pthread_mutexattr_init(&mutexAttr); //pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_PRIVATE); pthread_mutex_init(&mutex, NULL); //pthread_condattr_t condAttr; //pthread_condattr_init(&condAttr); PAssertOS(pthread_cond_init(&condVar, NULL) == 0);#endif}PSemaphore::~PSemaphore(){#ifdef P_HAS_SEMAPHORES PAssertOS(sem_destroy(&semId) == 0);#else PAssertOS(pthread_mutex_lock(&mutex) == 0); PAssert(queuedLocks == 0, "Semaphore destroyed with queued locks");#if defined (P_LINUX) || defined(P_FREEBSD) || defined(P_OPENBSD) || defined(P_NETBSD) || defined (P_AIX) pthread_cond_destroy(&condVar); pthread_mutex_destroy(&mutex);#else PAssertOS(pthread_cond_destroy(&condVar) == 0); PAssertOS(pthread_mutex_destroy(&mutex) == 0);#endif#endif}void PSemaphore::Wait(){#ifdef P_HAS_SEMAPHORES PAssertOS(sem_wait(&semId) == 0);#else PAssertOS(pthread_mutex_lock(&mutex) == 0); queuedLocks++; PThread::Current()->PXSetWaitingSemaphore(this); while (currentCount == 0) { int err = pthread_cond_wait(&condVar, &mutex); PProcess::Current().PXCheckSignals(); PAssert(err == 0 || err == EINTR, psprintf("wait error = %i", err)); } PThread::Current()->PXSetWaitingSemaphore(NULL); queuedLocks--; currentCount--; PAssertOS(pthread_mutex_unlock(&mutex) == 0);#endif}BOOL PSemaphore::Wait(const PTimeInterval & waitTime){ if (waitTime == PMaxTimeInterval) { Wait(); return TRUE; } // create absolute finish time struct timeval finishTime; ::gettimeofday(&finishTime, NULL); finishTime.tv_sec += waitTime.GetSeconds(); finishTime.tv_usec += waitTime.GetMilliSeconds() % 1000L; if (finishTime.tv_usec > 1000000) { finishTime.tv_usec -= 1000000; finishTime.tv_sec++; }#ifdef P_HAS_SEMAPHORES // loop until timeout, or semaphore becomes available // don't use a PTimer, as this causes the housekeeping // thread to get very busy for (;;) { if (sem_trywait(&semId) == 0) return TRUE; PThread::Current()->Sleep(10); struct timeval now; ::gettimeofday(&now, NULL); if (now.tv_sec > finishTime.tv_sec) return FALSE; else if ((now.tv_sec == finishTime.tv_sec) && (now.tv_usec >= finishTime.tv_usec)) return FALSE; } return FALSE;#else struct timespec absTime; absTime.tv_sec = finishTime.tv_sec; absTime.tv_nsec = finishTime.tv_usec * 1000; PAssertOS(pthread_mutex_lock(&mutex) == 0); PThread::Current()->PXSetWaitingSemaphore(this); queuedLocks++; BOOL ok = TRUE; while (currentCount == 0) { int err = pthread_cond_timedwait(&condVar, &mutex, &absTime); PProcess::Current().PXCheckSignals(); if (err == ETIMEDOUT) { ok = FALSE; break; } else PAssert(err == 0 || err == EINTR, psprintf("timed wait error = %i", err)); } PThread::Current()->PXSetWaitingSemaphore(NULL); queuedLocks--; if (ok) currentCount--; PAssertOS(pthread_mutex_unlock((pthread_mutex_t *)&mutex) == 0); return ok;#endif}void PSemaphore::Signal(){#ifdef P_HAS_SEMAPHORES PAssertOS(sem_post(&semId) == 0);#else PAssertOS(pthread_mutex_lock(&mutex) == 0); if (currentCount < maximumCount) currentCount++; if (queuedLocks > 0) pthread_cond_signal(&condVar); PAssertOS(pthread_mutex_unlock(&mutex) == 0);#endif}BOOL PSemaphore::WillBlock() const{#ifdef P_HAS_SEMAPHORES return sem_trywait((sem_t *)&semId) != 0;#else return currentCount == 0;#endif}PMutex::PMutex() : PSemaphore(1, 1){#ifdef P_HAS_SEMAPHORES pthread_mutex_init(&mutex, NULL);#endif}PMutex::~PMutex(){ pthread_mutex_unlock(&mutex);#ifdef P_HAS_SEMAPHORES PAssertOS(pthread_mutex_destroy(&mutex) == 0);#endif}void PMutex::Wait(){ PAssertOS(pthread_mutex_lock(&mutex) == 0);}BOOL PMutex::Wait(const PTimeInterval & waitTime){ if (waitTime == PMaxTimeInterval) { Wait(); return TRUE; } PTimeInterval sleepTime = waitTime/100; if (sleepTime > 1000) sleepTime = 1000; int subdivision = waitTime.GetMilliSeconds()/sleepTime.GetMilliSeconds(); for (int count = 0; count < subdivision; count++) { if (pthread_mutex_trylock(&mutex) == 0) return TRUE; PThread::Current()->Sleep(sleepTime); } return FALSE;}void PMutex::Signal(){ PAssertOS(pthread_mutex_unlock(&mutex) == 0);}BOOL PMutex::WillBlock() const{ pthread_mutex_t * mp = (pthread_mutex_t*)&mutex; if (pthread_mutex_trylock(mp) != 0) return TRUE; return pthread_mutex_unlock(mp) != 0;}PSyncPoint::PSyncPoint() : PSemaphore(0, 1){}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -