📄 tlibthrd.cxx
字号:
}
unsigned PThread::PX_GetThreadId() const
{
return (unsigned)PX_threadId;
}
void PThread::Restart()
{
PAssert(IsTerminated(), "Cannot restart running thread");
PX_NewThread(FALSE);
}
void PThread::Terminate()
{
if (PX_origStackSize <= 0)
return;
PAssert(!IsTerminated(), "Cannot terminate a thread which is already terminated");
if (Current() == this)
pthread_exit(NULL);
else {
#ifndef P_HAS_SEMAPHORES
PAssertOS(pthread_mutex_lock(&PX_WaitSemMutex) == 0);
if (PX_waitingSemaphore != NULL) {
PAssertOS(pthread_mutex_lock(&PX_waitingSemaphore->mutex) == 0);
PX_waitingSemaphore->queuedLocks--;
PAssertOS(pthread_mutex_unlock(&PX_waitingSemaphore->mutex) == 0);
PX_waitingSemaphore = NULL;
}
PAssertOS(pthread_mutex_unlock(&PX_WaitSemMutex) == 0);
#endif
#ifndef P_FREEBSD
pthread_cancel(PX_threadId);
#else
pthread_kill(PX_threadId, SIGKILL);
#endif
}
}
void PThread::PXSetWaitingSemaphore(PSemaphore * sem)
{
#ifndef P_HAS_SEMAPHORES
PAssertOS(pthread_mutex_lock(&PX_WaitSemMutex) == 0);
PX_waitingSemaphore = sem;
PAssertOS(pthread_mutex_unlock(&PX_WaitSemMutex) == 0);
#endif
}
BOOL PThread::IsTerminated() const
{
if (PX_threadId == 0)
return TRUE;
if (pthread_kill(PX_threadId, 0) != 0)
return TRUE;
return FALSE;
}
void PThread::Suspend(BOOL susp)
{
PAssertOS(pthread_mutex_lock(&PX_suspendMutex) == 0);
BOOL unlock = TRUE;
if (pthread_kill(PX_threadId, 0) == 0) {
// if suspending, then see if already suspended
if (susp) {
PX_suspendCount++;
if (PX_suspendCount == 1) {
if (PX_threadId != pthread_self())
pthread_kill(PX_threadId, SUSPEND_SIG);
else {
unlock = FALSE;
PAssertOS(pthread_mutex_unlock(&PX_suspendMutex) == 0);
sigset_t waitSignals;
sigemptyset(&waitSignals);
sigaddset(&waitSignals, RESUME_SIG);
#ifdef P_LINUX
int sig;
sigwait(&waitSignals, &sig);
#else
sigwait(&waitSignals);
#endif
}
}
}
// if resuming, then see if to really resume
else if (PX_suspendCount > 0) {
PX_suspendCount--;
if (PX_suspendCount == 0)
pthread_kill(PX_threadId, RESUME_SIG);
}
}
if (unlock)
PAssertOS(pthread_mutex_unlock(&PX_suspendMutex) == 0);
}
void PThread::Resume()
{
Suspend(FALSE);
}
BOOL PThread::IsSuspended() const
{
PAssert(!IsTerminated(), "Operation on terminated thread");
PAssertOS(pthread_mutex_lock((pthread_mutex_t *)&PX_suspendMutex) == 0);
BOOL suspended = PX_suspendCount > 0;
PAssertOS(pthread_mutex_unlock((pthread_mutex_t *)&PX_suspendMutex) == 0);
return suspended;
}
void PThread::SetPriority(Priority /*priorityLevel*/)
{
PAssert(!IsTerminated(), "Cannot set priority of terminated thread");
}
PThread::Priority PThread::GetPriority() const
{
PAssert(!IsTerminated(), "Cannot get priority of terminated thread");
return LowestPriority;
}
void PThread::Yield()
{
::sleep(0);
}
PThread * PThread::Current()
{
PProcess & process = PProcess::Current();
process.threadMutex.Wait();
PThread * thread = process.activeThreads.GetAt((unsigned)pthread_self());
process.threadMutex.Signal();
return PAssertNULL(thread);
}
void PThread::Sleep(const PTimeInterval & timeout)
{
struct timeval * tptr = NULL;
struct timeval timeout_val;
if (timeout != PMaxTimeInterval) {
if (timeout.GetMilliSeconds() < 1000L*60L*60L*24L) {
timeout_val.tv_usec = (timeout.GetMilliSeconds() % 1000) * 1000;
timeout_val.tv_sec = timeout.GetSeconds();
tptr = &timeout_val;
}
}
while (::select(0, NULL, NULL, NULL, tptr) != 0)
PProcess::Current().PXCheckSignals();
}
void PThread::WaitForTermination() const
{
BYTE ch;
::write(termPipe[1], &ch, 1);
while (!IsTerminated())
Current()->Sleep(10);
}
BOOL PThread::WaitForTermination(const PTimeInterval & maxWait) const
{
BYTE ch;
::write(termPipe[1], &ch, 1);
PTimer timeout = maxWait;
while (!IsTerminated()) {
if (timeout == 0)
return FALSE;
Current()->Sleep(10);
}
return TRUE;
}
///////////////////////////////////////////////////////////////////////////////
PSemaphore::PSemaphore(unsigned initial, unsigned maxCount)
{
#ifdef P_HAS_SEMAPHORES
PAssertOS(sem_init(&semId, 0, initial) == 0);
#else
PAssert(maxCount > 0, "Invalid semaphore maximum.");
if (initial > maxCount)
initial = maxCount;
currentCount = initial;
maximumCount = maxCount;
queuedLocks = 0;
//pthread_mutexattr_t mutexAttr;
//pthread_mutexattr_init(&mutexAttr);
//pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_PRIVATE);
pthread_mutex_init(&mutex, NULL);
//pthread_condattr_t condAttr;
//pthread_condattr_init(&condAttr);
PAssertOS(pthread_cond_init(&condVar, NULL) == 0);
#endif
}
PSemaphore::~PSemaphore()
{
#ifdef P_HAS_SEMAPHORES
PAssertOS(sem_destroy(&semId) == 0);
#else
PAssertOS(pthread_mutex_lock(&mutex) == 0);
PAssert(queuedLocks == 0, "Semaphore destroyed with queued locks");
#ifdef P_LINUX
pthread_cond_destroy(&condVar);
pthread_mutex_destroy(&mutex);
#else
PAssertOS(pthread_cond_destroy(&condVar) == 0);
PAssertOS(pthread_mutex_destroy(&mutex) == 0);
#endif
#endif
}
void PSemaphore::Wait()
{
#ifdef P_HAS_SEMAPHORES
PAssertOS(sem_wait(&semId) == 0);
#else
PAssertOS(pthread_mutex_lock(&mutex) == 0);
queuedLocks++;
PThread::Current()->PXSetWaitingSemaphore(this);
while (currentCount == 0) {
int err = pthread_cond_wait(&condVar, &mutex);
PProcess::Current().PXCheckSignals();
PAssert(err == 0 || err == EINTR, psprintf("wait error = %i", err));
}
PThread::Current()->PXSetWaitingSemaphore(NULL);
queuedLocks--;
currentCount--;
PAssertOS(pthread_mutex_unlock(&mutex) == 0);
#endif
}
BOOL PSemaphore::Wait(const PTimeInterval & waitTime)
{
if (waitTime == PMaxTimeInterval) {
Wait();
return TRUE;
}
// create absolute finish time
struct timeval finishTime;
::gettimeofday(&finishTime, NULL);
finishTime.tv_sec += waitTime.GetSeconds();
finishTime.tv_usec += waitTime.GetMilliSeconds() % 1000L;
if (finishTime.tv_usec > 1000000) {
finishTime.tv_usec -= 1000000;
finishTime.tv_sec++;
}
#ifdef P_HAS_SEMAPHORES
// loop until timeout, or semaphore becomes available
// don't use a PTimer, as this causes the housekeeping
// thread to get very busy
for (;;) {
if (sem_trywait(&semId) == 0)
return TRUE;
PThread::Current()->Sleep(10);
struct timeval now;
::gettimeofday(&now, NULL);
if (now.tv_sec > finishTime.tv_sec)
return FALSE;
else if ((now.tv_sec == finishTime.tv_sec) && (now.tv_usec >= finishTime.tv_usec))
return FALSE;
}
return FALSE;
#else
struct timespec absTime;
absTime.tv_sec = finishTime.tv_sec;
absTime.tv_nsec = finishTime.tv_usec * 1000;
PAssertOS(pthread_mutex_lock(&mutex) == 0);
PThread::Current()->PXSetWaitingSemaphore(this);
queuedLocks++;
BOOL ok = TRUE;
while (currentCount == 0) {
int err = pthread_cond_timedwait(&condVar, &mutex, &absTime);
PProcess::Current().PXCheckSignals();
if (err == ETIMEDOUT) {
ok = FALSE;
break;
}
else
PAssert(err == 0 || err == EINTR, psprintf("timed wait error = %i", err));
}
PThread::Current()->PXSetWaitingSemaphore(NULL);
queuedLocks--;
if (ok)
currentCount--;
PAssertOS(pthread_mutex_unlock((pthread_mutex_t *)&mutex) == 0);
return ok;
#endif
}
void PSemaphore::Signal()
{
#ifdef P_HAS_SEMAPHORES
PAssertOS(sem_post(&semId) == 0);
#else
PAssertOS(pthread_mutex_lock(&mutex) == 0);
if (currentCount < maximumCount)
currentCount++;
if (queuedLocks > 0)
pthread_cond_signal(&condVar);
PAssertOS(pthread_mutex_unlock(&mutex) == 0);
#endif
}
BOOL PSemaphore::WillBlock() const
{
#ifdef P_HAS_SEMAPHORES
return sem_trywait((sem_t *)&semId) != 0;
#else
return currentCount == 0;
#endif
}
PMutex::PMutex()
: PSemaphore(1, 1)
{
#ifdef P_HAS_SEMAPHORES
pthread_mutex_init(&mutex, NULL);
#endif
}
PMutex::~PMutex()
{
pthread_mutex_unlock(&mutex);
#ifdef P_HAS_SEMAPHORES
PAssertOS(pthread_mutex_destroy(&mutex) == 0);
#endif
}
void PMutex::Wait()
{
PAssertOS(pthread_mutex_lock(&mutex) == 0);
}
BOOL PMutex::Wait(const PTimeInterval & waitTime)
{
if (waitTime == PMaxTimeInterval) {
Wait();
return TRUE;
}
PTimeInterval sleepTime = waitTime/100;
if (sleepTime > 1000)
sleepTime = 1000;
int subdivision = waitTime.GetMilliSeconds()/sleepTime.GetMilliSeconds();
for (int count = 0; count < subdivision; count++) {
if (pthread_mutex_trylock(&mutex) == 0)
return TRUE;
PThread::Current()->Sleep(sleepTime);
}
return FALSE;
}
void PMutex::Signal()
{
PAssertOS(pthread_mutex_unlock(&mutex) == 0);
}
BOOL PMutex::WillBlock() const
{
pthread_mutex_t * mp = (pthread_mutex_t*)&mutex;
if (pthread_mutex_trylock(mp) != 0)
return TRUE;
return pthread_mutex_unlock(mp) != 0;
}
PSyncPoint::PSyncPoint()
: PSemaphore(0, 1)
{
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -