📄 thread.cpp
字号:
m_mutex( mutex ), m_semaphore( 0, 1 ), m_gate( 1, 1 ) { m_waiters = 0; m_signals = 0; m_canceled = 0; } virtual ~wxConditionInternal() {} bool IsOk() const { return m_mutex.IsOk(); } wxCondError Wait() { return WaitTimeout( kDurationForever ); } wxCondError WaitTimeout( unsigned long msectimeout ); wxCondError Signal() { return DoSignal( false); } wxCondError Broadcast() { return DoSignal( true ); }private: wxCondError DoSignal( bool signalAll ); wxMutex& m_mutex; wxSemaphoreInternal m_semaphore; // Signals the waiting threads. wxSemaphoreInternal m_gate; wxCriticalSection m_varSection; size_t m_waiters; // Number of threads waiting for a signal. size_t m_signals; // Number of signals to send. size_t m_canceled; // Number of canceled waiters in m_waiters.};wxCondError wxConditionInternal::WaitTimeout( unsigned long msectimeout ){ m_gate.Wait(); if ( ++ m_waiters == INT_MAX ) { m_varSection.Enter(); m_waiters -= m_canceled; m_signals -= m_canceled; m_canceled = 0; m_varSection.Leave(); } m_gate.Post(); m_mutex.Unlock(); wxSemaError err = m_semaphore.WaitTimeout( msectimeout); wxASSERT( err == wxSEMA_NO_ERROR || err == wxSEMA_TIMEOUT); m_varSection.Enter(); if ( err != wxSEMA_NO_ERROR ) { if ( m_signals > m_canceled ) { // A signal is being sent after we timed out. if ( m_waiters == m_signals ) { // There are no excess waiters to catch the signal, so // we must throw it away. wxSemaError err2 = m_semaphore.Wait(); if ( err2 != wxSEMA_NO_ERROR ) { wxLogSysError( wx("Error while waiting on semaphore") ); } wxASSERT( err2 == wxSEMA_NO_ERROR); --m_waiters; if ( --m_signals == m_canceled ) { // This was the last signal. open the gate. wxASSERT( m_waiters == m_canceled ); m_gate.Post(); } } else { // There are excess waiters to catch the signal, leave it be. --m_waiters; } } else { // No signals is being sent: // the gate may be open or closed, so we can't touch m_waiters. ++m_canceled; ++m_signals; } } else { // We caught a signal. wxASSERT( m_signals > m_canceled ); --m_waiters; if ( --m_signals == m_canceled) { // This was the last signal. open the gate. wxASSERT( m_waiters == m_canceled ); m_gate.Post(); } } m_varSection.Leave(); m_mutex.Lock(); if (err != noErr) return err == wxSEMA_TIMEOUT ? wxCOND_TIMEOUT : wxCOND_MISC_ERROR; return wxCOND_NO_ERROR;}wxCondError wxConditionInternal::DoSignal( bool signalAll){ m_gate.Wait(); m_varSection.Enter(); wxASSERT( m_signals == m_canceled ); if ( m_waiters == m_canceled) { m_varSection.Leave(); m_gate.Post(); return wxCOND_NO_ERROR; } if ( m_canceled > 0) { m_waiters -= m_canceled; m_signals = 0; m_canceled = 0; } m_signals = signalAll ? m_waiters : 1; size_t n = m_signals; m_varSection.Leave(); // Let the waiters inherit the gate lock. do { wxSemaError err = m_semaphore.Post(); wxASSERT( err == wxSEMA_NO_ERROR ); } while ( --n ); return wxCOND_NO_ERROR;}#elseclass wxConditionInternal{public: wxConditionInternal( wxMutex& mutex ); bool IsOk() const { return m_mutex.IsOk() && m_semaphore.IsOk(); } wxCondError Wait(); wxCondError WaitTimeout( unsigned long milliseconds ); wxCondError Signal(); wxCondError Broadcast();private: // the number of threads currently waiting for this condition SInt32 m_numWaiters; // the critical section protecting m_numWaiters wxCriticalSection m_csWaiters; wxMutex& m_mutex; wxSemaphore m_semaphore; DECLARE_NO_COPY_CLASS(wxConditionInternal)};wxConditionInternal::wxConditionInternal( wxMutex& mutex ) : m_mutex(mutex){ // another thread can't access it until we return from ctor, so no need to // protect access to m_numWaiters here m_numWaiters = 0;}wxCondError wxConditionInternal::Wait(){ // increment the number of waiters IncrementAtomic( &m_numWaiters ); m_mutex.Unlock(); // a potential race condition can occur here // // after a thread increments nwaiters, and unlocks the mutex and before the // semaphore.Wait() is called, if another thread can cause a signal to be // generated // // this race condition is handled by using a semaphore and incrementing the // semaphore only if 'nwaiters' is greater that zero since the semaphore, // can 'remember' signals the race condition will not occur // wait ( if necessary ) and decrement semaphore wxSemaError err = m_semaphore.Wait(); m_mutex.Lock(); return err == wxSEMA_NO_ERROR ? wxCOND_NO_ERROR : wxCOND_MISC_ERROR;}wxCondError wxConditionInternal::WaitTimeout( unsigned long milliseconds ){ IncrementAtomic( &m_numWaiters ); m_mutex.Unlock(); // a race condition can occur at this point in the code // // please see the comments in Wait(), for details wxSemaError err = m_semaphore.WaitTimeout(milliseconds); if ( err == wxSEMA_BUSY ) { // another potential race condition exists here it is caused when a // 'waiting' thread timesout, and returns from WaitForSingleObject, but // has not yet decremented 'nwaiters'. // // at this point if another thread calls signal() then the semaphore // will be incremented, but the waiting thread will miss it. // // to handle this particular case, the waiting thread calls // WaitForSingleObject again with a timeout of 0, after locking // 'nwaiters_mutex'. this call does not block because of the zero // timeout, but will allow the waiting thread to catch the missed // signals. wxCriticalSectionLocker lock(m_csWaiters); err = m_semaphore.WaitTimeout(0); if ( err != wxSEMA_NO_ERROR ) { m_numWaiters--; } } m_mutex.Lock(); return err == wxSEMA_NO_ERROR ? wxCOND_NO_ERROR : wxCOND_MISC_ERROR;}wxCondError wxConditionInternal::Signal(){ wxCriticalSectionLocker lock(m_csWaiters); if ( m_numWaiters > 0 ) { // increment the semaphore by 1 if ( m_semaphore.Post() != wxSEMA_NO_ERROR ) return wxCOND_MISC_ERROR; m_numWaiters--; } return wxCOND_NO_ERROR;}wxCondError wxConditionInternal::Broadcast(){ wxCriticalSectionLocker lock(m_csWaiters); while ( m_numWaiters > 0 ) { if ( m_semaphore.Post() != wxSEMA_NO_ERROR ) return wxCOND_MISC_ERROR; m_numWaiters--; } return wxCOND_NO_ERROR;}#endif// ----------------------------------------------------------------------------// wxCriticalSection implementation// ----------------------------------------------------------------------------// XXX currently implemented as mutex in headers. Change to critical section.// ----------------------------------------------------------------------------// wxThread implementation// ----------------------------------------------------------------------------// wxThreadInternal class// ----------------------class wxThreadInternal{public: wxThreadInternal() { m_tid = kInvalidID; m_state = STATE_NEW; m_prio = WXTHREAD_DEFAULT_PRIORITY; m_notifyQueueId = kInvalidID; m_exitcode = 0; m_cancelled = false ; // set to true only when the thread starts waiting on m_semSuspend m_isPaused = false; // defaults for joinable threads m_shouldBeJoined = true; m_isDetached = false; } virtual ~wxThreadInternal() { if ( m_notifyQueueId) { MPDeleteQueue( m_notifyQueueId ); m_notifyQueueId = kInvalidID ; } } // thread function static OSStatus MacThreadStart(void* arg); // create a new (suspended) thread (for the given thread object) bool Create(wxThread *thread, unsigned int stackSize); // thread actions // start the thread wxThreadError Run(); // unblock the thread allowing it to run void SignalRun() { m_semRun.Post(); } // ask the thread to terminate void Wait(); // go to sleep until Resume() is called void Pause(); // resume the thread void Resume(); // accessors // priority int GetPriority() const { return m_prio; } void SetPriority(int prio); // state wxThreadState GetState() const { return m_state; } void SetState(wxThreadState state) { m_state = state; } // Get the ID of this thread's underlying MP Services task. MPTaskID GetId() const { return m_tid; } void SetCancelFlag() { m_cancelled = true; } bool WasCancelled() const { return m_cancelled; } // exit code void SetExitCode(wxThread::ExitCode exitcode) { m_exitcode = exitcode; } wxThread::ExitCode GetExitCode() const { return m_exitcode; } // the pause flag void SetReallyPaused(bool paused) { m_isPaused = paused; } bool IsReallyPaused() const { return m_isPaused; } // tell the thread that it is a detached one void Detach() { wxCriticalSectionLocker lock(m_csJoinFlag); m_shouldBeJoined = false; m_isDetached = true; }private: // the thread we're associated with wxThread * m_thread; MPTaskID m_tid; // thread id MPQueueID m_notifyQueueId; // its notification queue wxThreadState m_state; // see wxThreadState enum int m_prio; // in wxWidgets units: from 0 to 100 // this flag is set when the thread should terminate bool m_cancelled; // this flag is set when the thread is blocking on m_semSuspend bool m_isPaused; // the thread exit code - only used for joinable (!detached) threads and // is only valid after the thread termination wxThread::ExitCode m_exitcode; // many threads may call Wait(), but only one of them should call // pthread_join(), so we have to keep track of this wxCriticalSection m_csJoinFlag; bool m_shouldBeJoined; bool m_isDetached; // this semaphore is posted by Run() and the threads Entry() is not // called before it is done wxSemaphore m_semRun; // this one is signaled when the thread should resume after having been // Pause()d wxSemaphore m_semSuspend;};OSStatus wxThreadInternal::MacThreadStart(void *parameter){ wxThread* thread = (wxThread*) parameter ; wxThreadInternal *pthread = thread->m_internal; // add to TLS so that This() will work verify_noerr( MPSetTaskStorageValue( gs_tlsForWXThread , (TaskStorageValue) thread ) ) ; // have to declare this before pthread_cleanup_push() which defines a // block! bool dontRunAtAll; // wait for the semaphore to be posted from Run() pthread->m_semRun.Wait(); // test whether we should run the run at all - may be it was deleted // before it started to Run()? { wxCriticalSectionLocker lock(thread->m_critsect); dontRunAtAll = pthread->GetState() == STATE_NEW && pthread->WasCancelled(); } if ( !dontRunAtAll ) { pthread->m_exitcode = thread->Entry(); { wxCriticalSectionLocker lock(thread->m_critsect); pthread->SetState( STATE_EXITED ); } } if ( dontRunAtAll ) { if ( pthread->m_isDetached ) delete thread; return -1; } else { // on Mac for the running code, // the correct thread termination is to return // terminate the thread thread->Exit( pthread->m_exitcode ); return (OSStatus) NULL; // pthread->m_exitcode; }}bool wxThreadInternal::Create( wxThread *thread, unsigned int stackSize ){ wxASSERT_MSG( m_state == STATE_NEW && !m_tid, wxT("Create()ing thread twice?") ); OSStatus err = noErr; m_thread = thread; if ( m_notifyQueueId == kInvalidID ) { OSStatus err = MPCreateQueue( &m_notifyQueueId ); if (err != noErr) { wxLogSysError( wxT("Cant create the thread event queue") ); return false; } } m_state = STATE_NEW; err = MPCreateTask( MacThreadStart, (void*)m_thread, stackSize, m_notifyQueueId, &m_exitcode, 0, 0, &m_tid ); if (err != noErr) { wxLogSysError( wxT("Can't create thread") ); return false; } if ( m_prio != WXTHREAD_DEFAULT_PRIORITY ) SetPriority( m_prio ); return true;}void wxThreadInternal::SetPriority( int priority ){ m_prio = priority; if (m_tid) { // Mac priorities range from 1 to 10,000, with a default of 100. // wxWidgets priorities range from 0 to 100 with a default of 50. // We can map wxWidgets to Mac priorities easily by assuming // the former uses a logarithmic scale. const unsigned int macPriority = (int)( exp( priority / 25.0 * log( 10.0)) + 0.5); MPSetTaskWeight( m_tid, macPriority ); }}wxThreadError wxThreadInternal::Run(){ wxCHECK_MSG( GetState() == STATE_NEW, wxTHREAD_RUNNING, wxT("thread may only be started once after Create()") ); SetState( STATE_RUNNING ); // wake up threads waiting for our start SignalRun(); return wxTHREAD_NO_ERROR;}void wxThreadInternal::Wait(){ wxCHECK_RET( !m_isDetached, wxT("can't wait for a detached thread") ); // if the thread we're waiting for is waiting for the GUI mutex, we will // deadlock so make sure we release it temporarily if ( wxThread::IsMain() ) { // give the thread we're waiting for chance to do the GUI call // it might be in, we don't do this conditionally as the to be waited on // thread might have to acquire the mutex later but before terminating if ( wxGuiOwnedByMainThread() ) wxMutexGuiLeave(); } { wxCriticalSectionLocker lock(m_csJoinFlag); if ( m_shouldBeJoined ) { void *param1, *param2, *rc; OSStatus err = MPWaitOnQueue( m_notifyQueueId, ¶m1, ¶m2, &rc, kDurationForever ); if (err != noErr) { wxLogSysError( wxT( "Cannot wait for thread termination.")); rc = (void*) -1; } // actually param1 would be the address of m_exitcode // but we don't need this here m_exitcode = rc;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -