📄 tlibthrd.cxx
字号:
maxCountVar = sem.GetMaxCount();
if(pxClass == PXSemaphore) {
#if defined(P_HAS_SEMAPHORES)
PAssertPTHREAD(sem_init, (&semId, 0, initialVar));
#elif defined(P_HAS_NAMED_SEMAPHORES)
semId = CreateSem(initialVar);
#else
PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
PAssert(maxCountVar > 0, "Invalid semaphore maximum.");
if (initialVar > maxCountVar)
initialVar = maxCountVar;
currentCount = initialVar;
maximumCount = maxCountVar;
queuedLocks = 0;
#endif
}
}
PSemaphore::~PSemaphore()
{
if(pxClass == PXSemaphore) {
#if defined(P_HAS_SEMAPHORES)
PAssertPTHREAD(sem_destroy, (&semId));
#elif defined(P_HAS_NAMED_SEMAPHORES)
PAssertPTHREAD(sem_close, (semId));
#else
PAssert(queuedLocks == 0, "Semaphore destroyed with queued locks");
PAssertPTHREAD(pthread_mutex_destroy, (&mutex));
PAssertPTHREAD(pthread_cond_destroy, (&condVar));
#endif
}
}
#if defined(P_HAS_NAMED_SEMAPHORES)
sem_t * PSemaphore::CreateSem(unsigned initialValue)
{
sem_t *sem;
// Since sem_open and sem_unlink are two operations, there is a small
// window of opportunity that two simultaneous accesses may return
// the same semaphore. Therefore, the static mutex is used to
// prevent this, even if the chances are small
static pthread_mutex_t semCreationMutex = PTHREAD_MUTEX_INITIALIZER;
PAssertPTHREAD(pthread_mutex_lock, (&semCreationMutex));
sem_unlink("/pwlib_sem");
sem = sem_open("/pwlib_sem", (O_CREAT | O_EXCL), 700, initialValue);
PAssertPTHREAD(pthread_mutex_unlock, (&semCreationMutex));
PAssert(((int)sem != SEM_FAILED), "Couldn't create named semaphore");
return sem;
}
#endif
void PSemaphore::Wait()
{
#if defined(P_HAS_SEMAPHORES)
PAssertPTHREAD(sem_wait, (&semId));
#elif defined(P_HAS_NAMED_SEMAPHORES)
PAssertPTHREAD(sem_wait, (semId));
#else
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
queuedLocks++;
PThread::Current()->PXSetWaitingSemaphore(this);
while (currentCount == 0) {
int err = pthread_cond_wait(&condVar, &mutex);
PAssert(err == 0 || err == EINTR, psprintf("wait error = %i", err));
}
PThread::Current()->PXSetWaitingSemaphore(NULL);
queuedLocks--;
currentCount--;
PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
#endif
}
PBoolean PSemaphore::Wait(const PTimeInterval & waitTime)
{
if (waitTime == PMaxTimeInterval) {
Wait();
return PTrue;
}
// create absolute finish time
PTime finishTime;
finishTime += waitTime;
#if defined(P_HAS_SEMAPHORES)
#ifdef P_HAS_SEMAPHORES_XPG6
// use proper timed spinlocks if supported.
// http://www.opengroup.org/onlinepubs/007904975/functions/sem_timedwait.html
struct timespec absTime;
absTime.tv_sec = finishTime.GetTimeInSeconds();
absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
if (sem_timedwait(&semId, &absTime) == 0) {
return PTrue;
}
else {
return PFalse;
}
#else
// loop until timeout, or semaphore becomes available
// don't use a PTimer, as this causes the housekeeping
// thread to get very busy
do {
if (sem_trywait(&semId) == 0)
return PTrue;
#if defined(P_LINUX)
// sched_yield in a tight loop is bad karma
// for the linux scheduler: http://www.ussg.iu.edu/hypermail/linux/kernel/0312.2/1127.html
PThread::Current()->Sleep(10);
#else
PThread::Yield();
#endif
} while (PTime() < finishTime);
return PFalse;
#endif
#elif defined(P_HAS_NAMED_SEMAPHORES)
do {
if(sem_trywait(semId) == 0)
return PTrue;
PThread::Current()->Sleep(10);
} while (PTime() < finishTime);
return PFalse;
#else
struct timespec absTime;
absTime.tv_sec = finishTime.GetTimeInSeconds();
absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
PThread * thread = PThread::Current();
thread->PXSetWaitingSemaphore(this);
queuedLocks++;
PBoolean ok = PTrue;
while (currentCount == 0) {
int err = pthread_cond_timedwait(&condVar, &mutex, &absTime);
if (err == ETIMEDOUT) {
ok = PFalse;
break;
}
else
PAssert(err == 0 || err == EINTR, psprintf("timed wait error = %i", err));
}
thread->PXSetWaitingSemaphore(NULL);
queuedLocks--;
if (ok)
currentCount--;
PAssertPTHREAD(pthread_mutex_unlock, ((pthread_mutex_t *)&mutex));
return ok;
#endif
}
void PSemaphore::Signal()
{
#if defined(P_HAS_SEMAPHORES)
PAssertPTHREAD(sem_post, (&semId));
#elif defined(P_HAS_NAMED_SEMAPHORES)
PAssertPTHREAD(sem_post, (semId));
#else
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
if (currentCount < maximumCount)
currentCount++;
if (queuedLocks > 0)
PAssertPTHREAD(pthread_cond_signal, (&condVar));
PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
#endif
}
PBoolean PSemaphore::WillBlock() const
{
#if defined(P_HAS_SEMAPHORES)
if (sem_trywait((sem_t *)&semId) != 0) {
PAssertOS(errno == EAGAIN || errno == EINTR);
return PTrue;
}
PAssertPTHREAD(sem_post, ((sem_t *)&semId));
return PFalse;
#elif defined(P_HAS_NAMED_SEMAPHORES)
if (sem_trywait(semId) != 0) {
PAssertOS(errno == EAGAIN || errno == EINTR);
return PTrue;
}
PAssertPTHREAD(sem_post, (semId));
return PFalse;
#else
return currentCount == 0;
#endif
}
PTimedMutex::PTimedMutex()
// : PSemaphore(PXMutex)
{
#if P_HAS_RECURSIVE_MUTEX
pthread_mutexattr_t attr;
PAssertPTHREAD(pthread_mutexattr_init, (&attr));
PAssertPTHREAD(pthread_mutexattr_settype, (&attr,
#if P_HAS_RECURSIVE_MUTEX == 2
PTHREAD_MUTEX_RECURSIVE
#else
PTHREAD_MUTEX_RECURSIVE_NP
#endif
));
PAssertPTHREAD(pthread_mutex_init, (&mutex, &attr));
PAssertPTHREAD(pthread_mutexattr_destroy, (&attr));
#else
PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
#endif
}
PTimedMutex::PTimedMutex(const PTimedMutex & /*mut*/)
// : PSemaphore(PXMutex)
{
#if P_HAS_RECURSIVE_MUTEX
pthread_mutexattr_t attr;
PAssertPTHREAD(pthread_mutexattr_init, (&attr));
PAssertPTHREAD(pthread_mutexattr_settype, (&attr,
#if P_HAS_RECURSIVE_MUTEX == 2
PTHREAD_MUTEX_RECURSIVE
#else
PTHREAD_MUTEX_RECURSIVE_NP
#endif
));
PAssertPTHREAD(pthread_mutex_init, (&mutex, &attr));
PAssertPTHREAD(pthread_mutexattr_destroy, (&attr));
#else
pthread_mutex_init(&mutex, NULL);
#endif
}
PTimedMutex::~PTimedMutex()
{
int result = pthread_mutex_destroy(&mutex);
PINDEX i = 0;
while ((result == EBUSY) && (i++ < 20)) {
pthread_mutex_unlock(&mutex);
result = pthread_mutex_destroy(&mutex);
}
#ifdef _DEBUG
PAssert((result == 0), "Error destroying mutex");
#endif
}
void PTimedMutex::Wait()
{
pthread_t currentThreadId = pthread_self();
#if P_HAS_RECURSIVE_MUTEX == 0
// if the mutex is already acquired by this thread,
// then just increment the lock count
if (pthread_equal(lockerId, currentThreadId)) {
// Note this does not need a lock as it can only be touched by the thread
// which already has the mutex locked.
++lockCount;
return;
}
#endif
// acquire the lock for real
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
#if P_HAS_RECURSIVE_MUTEX == 0
PAssert((lockerId == (pthread_t)-1) && (lockCount.IsZero()),
"PMutex acquired whilst locked by another thread");
// Note this is protected by the mutex itself only the thread with
// the lock can alter it.
#endif
lockerId = currentThreadId;
}
PBoolean PTimedMutex::Wait(const PTimeInterval & waitTime)
{
// get the current thread ID
pthread_t currentThreadId = pthread_self();
// if waiting indefinitely, then do so
if (waitTime == PMaxTimeInterval) {
Wait();
lockerId = currentThreadId;
return PTrue;
}
#if P_HAS_RECURSIVE_MUTEX == 0
// if we already have the mutex, return immediately
if (pthread_equal(lockerId, currentThreadId)) {
// Note this does not need a lock as it can only be touched by the thread
// which already has the mutex locked.
++lockCount;
return PTrue;
}
#endif
// create absolute finish time
PTime finishTime;
finishTime += waitTime;
#if P_PTHREADS_XPG6
struct timespec absTime;
absTime.tv_sec = finishTime.GetTimeInSeconds();
absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
if (pthread_mutex_timedlock(&mutex, &absTime) != 0)
return PFalse;
#if P_HAS_RECURSIVE_MUTEX == 0
PAssert((lockerId == (pthread_t)-1) && (lockCount.IsZero()),
"PMutex acquired whilst locked by another thread");
#endif
// Note this is protected by the mutex itself only the thread with
// the lock can alter it.
lockerId = currentThreadId;
return PTrue;
#else // P_PTHREADS_XPG6
do {
if (pthread_mutex_trylock(&mutex) == 0) {
#if P_HAS_RECURSIVE_MUTEX == 0
PAssert((lockerId == (pthread_t)-1) && (lockCount.IsZero()),
"PMutex acquired whilst locked by another thread");
#endif
lockerId = currentThreadId;
return PTrue;
}
PThread::Current()->Sleep(10); // sleep for 10ms
} while (PTime() < finishTime);
return PFalse;
#endif // P_PTHREADS_XPG6
}
void PTimedMutex::Signal()
{
#if P_HAS_RECURSIVE_MUTEX == 0
if (!pthread_equal(lockerId, pthread_self())) {
PAssertAlways("PMutex signal failed - no matching wait or signal by wrong thread");
return;
}
// if lock was recursively acquired, then decrement the counter
// Note this does not need a separate lock as it can only be touched by the thread
// which already has the mutex locked.
if (!lockCount.IsZero()) {
--lockCount;
return;
}
// otherwise mark mutex as available
lockerId = (pthread_t)-1;
#endif
PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
}
PBoolean PTimedMutex::WillBlock() const
{
#if P_HAS_RECURSIVE_MUTEX == 0
pthread_t currentThreadId = pthread_self();
if (currentThreadId == lockerId)
return PFalse;
#endif
pthread_mutex_t * mp = (pthread_mutex_t*)&mutex;
if (pthread_mutex_trylock(mp) != 0)
return PTrue;
PAssertPTHREAD(pthread_mutex_unlock, (mp));
return PFalse;
}
PSyncPoint::PSyncPoint()
: PSemaphore(PXSyncPoint)
{
PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
signalled = false;
}
PSyncPoint::PSyncPoint(const PSyncPoint &)
: PSemaphore(PXSyncPoint)
{
PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
signalled = false;
}
PSyncPoint::~PSyncPoint()
{
PAssertPTHREAD(pthread_mutex_destroy, (&mutex));
PAssertPTHREAD(pthread_cond_destroy, (&condVar));
}
void PSyncPoint::Wait()
{
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
while (!signalled)
pthread_cond_wait(&condVar, &mutex);
signalled = false;
PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
}
PBoolean PSyncPoint::Wait(const PTimeInterval & waitTime)
{
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
PTime finishTime;
finishTime += waitTime;
struct timespec absTime;
absTime.tv_sec = finishTime.GetTimeInSeconds();
absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
int err = 0;
while (!signalled) {
err = pthread_cond_timedwait(&condVar, &mutex, &absTime);
if (err == 0 || err == ETIMEDOUT)
break;
PAssertOS(err == EINTR && errno == EINTR);
}
if (err == 0)
signalled = false;
PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
return err == 0;
}
void PSyncPoint::Signal()
{
PAssertPTHREAD(pthread_mutex_lock, (&mutex));
signalled = true;
PAssertPTHREAD(pthread_cond_signal, (&condVar));
PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
}
PBoolean PSyncPoint::WillBlock() const
{
return !signalled;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -