📄 condition.cpp
字号:
if (res != 0) throw thread_resource_error();}condition::~condition(){ int res = 0; res = pthread_cond_destroy(&m_condition); assert(res == 0);}void condition::notify_one(){ int res = 0; res = pthread_cond_signal(&m_condition); assert(res == 0);}void condition::notify_all(){ int res = 0; res = pthread_cond_broadcast(&m_condition); assert(res == 0);}void condition::do_wait(pthread_mutex_t* pmutex){ int res = 0; res = pthread_cond_wait(&m_condition, pmutex); assert(res == 0);}bool condition::do_timed_wait(const xtime& xt, pthread_mutex_t* pmutex){ timespec ts; to_timespec(xt, ts); int res = 0; res = pthread_cond_timedwait(&m_condition, pmutex, &ts); assert(res == 0 || res == ETIMEDOUT); return res != ETIMEDOUT;}#elif defined(BOOST_HAS_MPTASKS)using threads::mac::detail::safe_enter_critical_region;using threads::mac::detail::safe_wait_on_semaphore;condition::condition() : m_gone(0), m_blocked(0), m_waiting(0){ threads::mac::detail::thread_init(); OSStatus lStatus = noErr; lStatus = MPCreateSemaphore(1, 1, &m_gate); if(lStatus == noErr) lStatus = MPCreateSemaphore(ULONG_MAX, 0, &m_queue); if(lStatus != noErr || !m_gate || !m_queue) { if (m_gate) { lStatus = MPDeleteSemaphore(m_gate); assert(lStatus == noErr); } if (m_queue) { lStatus = MPDeleteSemaphore(m_queue); assert(lStatus == noErr); } throw thread_resource_error(); }}condition::~condition(){ OSStatus lStatus = noErr; lStatus = MPDeleteSemaphore(m_gate); assert(lStatus == noErr); lStatus = MPDeleteSemaphore(m_queue); assert(lStatus == noErr);}void condition::notify_one(){ unsigned signals = 0; OSStatus lStatus = noErr; lStatus = safe_enter_critical_region(m_mutex, kDurationForever, m_mutex_mutex); assert(lStatus == noErr); if (m_waiting != 0) // the m_gate is already closed { if (m_blocked == 0) { lStatus = MPExitCriticalRegion(m_mutex); assert(lStatus == noErr); return; } ++m_waiting; --m_blocked; } else { lStatus = safe_wait_on_semaphore(m_gate, kDurationForever); assert(lStatus == noErr); if (m_blocked > m_gone) { if (m_gone != 0) { m_blocked -= m_gone; m_gone = 0; } signals = m_waiting = 1; --m_blocked; } else { lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr); } lStatus = MPExitCriticalRegion(m_mutex); assert(lStatus == noErr); while (signals) { lStatus = MPSignalSemaphore(m_queue); assert(lStatus == noErr); --signals; } }}void condition::notify_all(){ unsigned signals = 0; OSStatus lStatus = noErr; lStatus = safe_enter_critical_region(m_mutex, kDurationForever, m_mutex_mutex); assert(lStatus == noErr); if (m_waiting != 0) // the m_gate is already closed { if (m_blocked == 0) { lStatus = MPExitCriticalRegion(m_mutex); assert(lStatus == noErr); return; } m_waiting += (signals = m_blocked); m_blocked = 0; } else { lStatus = safe_wait_on_semaphore(m_gate, kDurationForever); assert(lStatus == noErr); if (m_blocked > m_gone) { if (m_gone != 0) { m_blocked -= m_gone; m_gone = 0; } signals = m_waiting = m_blocked; m_blocked = 0; } else { lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr); } lStatus = MPExitCriticalRegion(m_mutex); assert(lStatus == noErr); while (signals) { lStatus = MPSignalSemaphore(m_queue); assert(lStatus == noErr); --signals; } }}void condition::enter_wait(){ OSStatus lStatus = noErr; lStatus = safe_wait_on_semaphore(m_gate, kDurationForever); assert(lStatus == noErr); ++m_blocked; lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr);}void condition::do_wait(){ OSStatus lStatus = noErr; lStatus = safe_wait_on_semaphore(m_queue, kDurationForever); assert(lStatus == noErr); unsigned was_waiting=0; unsigned was_gone=0; lStatus = safe_enter_critical_region(m_mutex, kDurationForever, m_mutex_mutex); assert(lStatus == noErr); was_waiting = m_waiting; was_gone = m_gone; if (was_waiting != 0) { if (--m_waiting == 0) { if (m_blocked != 0) { lStatus = MPSignalSemaphore(m_gate); // open m_gate assert(lStatus == noErr); was_waiting = 0; } else if (m_gone != 0) m_gone = 0; } } else if (++m_gone == (std::numeric_limits<unsigned>::max() / 2)) { // timeout occured, normalize the m_gone count // this may occur if many calls to wait with a timeout are made and // no call to notify_* is made lStatus = safe_wait_on_semaphore(m_gate, kDurationForever); assert(lStatus == noErr); m_blocked -= m_gone; lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr); m_gone = 0; } lStatus = MPExitCriticalRegion(m_mutex); assert(lStatus == noErr); if (was_waiting == 1) { for (/**/ ; was_gone; --was_gone) { // better now than spurious later lStatus = safe_wait_on_semaphore(m_queue, kDurationForever); assert(lStatus == noErr); } lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr); }}bool condition::do_timed_wait(const xtime& xt){ int milliseconds; to_duration(xt, milliseconds); OSStatus lStatus = noErr; lStatus = safe_wait_on_semaphore(m_queue, milliseconds); assert(lStatus == noErr || lStatus == kMPTimeoutErr); bool ret = (lStatus == noErr); unsigned was_waiting=0; unsigned was_gone=0; lStatus = safe_enter_critical_region(m_mutex, kDurationForever, m_mutex_mutex); assert(lStatus == noErr); was_waiting = m_waiting; was_gone = m_gone; if (was_waiting != 0) { if (!ret) // timeout { if (m_blocked != 0) --m_blocked; else ++m_gone; // count spurious wakeups } if (--m_waiting == 0) { if (m_blocked != 0) { lStatus = MPSignalSemaphore(m_gate); // open m_gate assert(lStatus == noErr); was_waiting = 0; } else if (m_gone != 0) m_gone = 0; } } else if (++m_gone == (std::numeric_limits<unsigned>::max() / 2)) { // timeout occured, normalize the m_gone count // this may occur if many calls to wait with a timeout are made and // no call to notify_* is made lStatus = safe_wait_on_semaphore(m_gate, kDurationForever); assert(lStatus == noErr); m_blocked -= m_gone; lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr); m_gone = 0; } lStatus = MPExitCriticalRegion(m_mutex); assert(lStatus == noErr); if (was_waiting == 1) { for (/**/ ; was_gone; --was_gone) { // better now than spurious later lStatus = safe_wait_on_semaphore(m_queue, kDurationForever); assert(lStatus == noErr); } lStatus = MPSignalSemaphore(m_gate); assert(lStatus == noErr); } return ret;}#endif} // namespace boost// Change Log:// 8 Feb 01 WEKEMPF Initial version.// 22 May 01 WEKEMPF Modified to use xtime for time outs.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -