📄 mpthread.cpp
字号:
return err == wxSEMA_TIMEOUT ? wxCOND_TIMEOUT : wxCOND_MISC_ERROR; } return wxCOND_NO_ERROR;}wxCondError wxConditionInternal::DoSignal( bool signalAll){ m_gate.Wait(); m_varSection.Enter(); wxASSERT( m_signals == m_canceled); if ( m_waiters == m_canceled) { m_varSection.Leave(); m_gate.Post(); return wxCOND_NO_ERROR; } if ( m_canceled > 0) { m_waiters -= m_canceled; m_signals = 0; m_canceled = 0; } m_signals = signalAll ? m_waiters : 1; size_t n = m_signals; m_varSection.Leave(); // Let the waiters inherit the gate lock. do { wxSemaError err = m_semaphore.Post(); wxASSERT( err == wxSEMA_NO_ERROR); } while ( -- n); return wxCOND_NO_ERROR;}#elseclass wxConditionInternal{public: wxConditionInternal(wxMutex& mutex); bool IsOk() const { return m_mutex.IsOk() && m_semaphore.IsOk(); } wxCondError Wait(); wxCondError WaitTimeout(unsigned long milliseconds); wxCondError Signal(); wxCondError Broadcast();private: // the number of threads currently waiting for this condition SInt32 m_numWaiters; // the critical section protecting m_numWaiters wxCriticalSection m_csWaiters; wxMutex& m_mutex; wxSemaphore m_semaphore; DECLARE_NO_COPY_CLASS(wxConditionInternal)};wxConditionInternal::wxConditionInternal(wxMutex& mutex) : m_mutex(mutex){ // another thread can't access it until we return from ctor, so no need to // protect access to m_numWaiters here m_numWaiters = 0;}wxCondError wxConditionInternal::Wait(){ // increment the number of waiters IncrementAtomic(&m_numWaiters); m_mutex.Unlock(); // a potential race condition can occur here // // after a thread increments nwaiters, and unlocks the mutex and before the // semaphore.Wait() is called, if another thread can cause a signal to be // generated // // this race condition is handled by using a semaphore and incrementing the // semaphore only if 'nwaiters' is greater that zero since the semaphore, // can 'remember' signals the race condition will not occur // wait ( if necessary ) and decrement semaphore wxSemaError err = m_semaphore.Wait(); m_mutex.Lock(); return err == wxSEMA_NO_ERROR ? wxCOND_NO_ERROR : wxCOND_MISC_ERROR;}wxCondError wxConditionInternal::WaitTimeout(unsigned long milliseconds){ IncrementAtomic(&m_numWaiters); m_mutex.Unlock(); // a race condition can occur at this point in the code // // please see the comments in Wait(), for details wxSemaError err = m_semaphore.WaitTimeout(milliseconds); if ( err == wxSEMA_BUSY ) { // another potential race condition exists here it is caused when a // 'waiting' thread timesout, and returns from WaitForSingleObject, but // has not yet decremented 'nwaiters'. // // at this point if another thread calls signal() then the semaphore // will be incremented, but the waiting thread will miss it. // // to handle this particular case, the waiting thread calls // WaitForSingleObject again with a timeout of 0, after locking // 'nwaiters_mutex'. this call does not block because of the zero // timeout, but will allow the waiting thread to catch the missed // signals. wxCriticalSectionLocker lock(m_csWaiters); err = m_semaphore.WaitTimeout(0); if ( err != wxSEMA_NO_ERROR ) { m_numWaiters--; } } m_mutex.Lock(); return err == wxSEMA_NO_ERROR ? wxCOND_NO_ERROR : wxCOND_MISC_ERROR;}wxCondError wxConditionInternal::Signal(){ wxCriticalSectionLocker lock(m_csWaiters); if ( m_numWaiters > 0 ) { // increment the semaphore by 1 if ( m_semaphore.Post() != wxSEMA_NO_ERROR ) return wxCOND_MISC_ERROR; m_numWaiters--; } return wxCOND_NO_ERROR;}wxCondError wxConditionInternal::Broadcast(){ wxCriticalSectionLocker lock(m_csWaiters); while ( m_numWaiters > 0 ) { if ( m_semaphore.Post() != wxSEMA_NO_ERROR ) return wxCOND_MISC_ERROR; m_numWaiters--; } return wxCOND_NO_ERROR;}#endif// ----------------------------------------------------------------------------// wxCriticalSection implementation// ----------------------------------------------------------------------------// XXX currently implemented as mutex in headers. Change to critical section.// ----------------------------------------------------------------------------// wxThread implementation// ----------------------------------------------------------------------------// wxThreadInternal class// ----------------------class wxThreadInternal{public: wxThreadInternal() { m_tid = kInvalidID; m_state = STATE_NEW; m_prio = WXTHREAD_DEFAULT_PRIORITY; m_notifyQueueId = kInvalidID; m_exitcode = 0; m_cancelled = false ; // set to true only when the thread starts waiting on m_semSuspend m_isPaused = false; // defaults for joinable threads m_shouldBeJoined = true; m_isDetached = false; } ~wxThreadInternal() { if ( m_notifyQueueId) { MPDeleteQueue( m_notifyQueueId); m_notifyQueueId = kInvalidID ; } } // thread function static OSStatus MacThreadStart(void* arg); // create a new (suspended) thread (for the given thread object) bool Create(wxThread *thread, unsigned int stackSize); // thread actions // start the thread wxThreadError Run(); // unblock the thread allowing it to run void SignalRun() { m_semRun.Post(); } // ask the thread to terminate void Wait(); // go to sleep until Resume() is called void Pause(); // resume the thread void Resume(); // accessors // priority int GetPriority() const { return m_prio; } void SetPriority(int prio) ; // state wxThreadState GetState() const { return m_state; } void SetState(wxThreadState state) { m_state = state; } // Get the ID of this thread's underlying MP Services task. MPTaskID GetId() const { return m_tid; } void SetCancelFlag() { m_cancelled = true; } bool WasCancelled() const { return m_cancelled; } // exit code void SetExitCode(wxThread::ExitCode exitcode) { m_exitcode = exitcode; } wxThread::ExitCode GetExitCode() const { return m_exitcode; } // the pause flag void SetReallyPaused(bool paused) { m_isPaused = paused; } bool IsReallyPaused() const { return m_isPaused; } // tell the thread that it is a detached one void Detach() { wxCriticalSectionLocker lock(m_csJoinFlag); m_shouldBeJoined = false; m_isDetached = true; }private: // the thread we're associated with wxThread * m_thread; MPTaskID m_tid; // thread id MPQueueID m_notifyQueueId; // its notification queue wxThreadState m_state; // see wxThreadState enum int m_prio; // in wxWidgets units: from 0 to 100 // this flag is set when the thread should terminate bool m_cancelled; // this flag is set when the thread is blocking on m_semSuspend bool m_isPaused; // the thread exit code - only used for joinable (!detached) threads and // is only valid after the thread termination wxThread::ExitCode m_exitcode; // many threads may call Wait(), but only one of them should call // pthread_join(), so we have to keep track of this wxCriticalSection m_csJoinFlag; bool m_shouldBeJoined; bool m_isDetached; // this semaphore is posted by Run() and the threads Entry() is not // called before it is done wxSemaphore m_semRun; // this one is signaled when the thread should resume after having been // Pause()d wxSemaphore m_semSuspend;};OSStatus wxThreadInternal::MacThreadStart(void *parameter){ wxThread* thread = (wxThread*) parameter ; wxThreadInternal *pthread = thread->m_internal; // add to TLS so that This() will work verify_noerr( MPSetTaskStorageValue( gs_tlsForWXThread , (long) thread ) ) ; // have to declare this before pthread_cleanup_push() which defines a // block! bool dontRunAtAll; // wait for the semaphore to be posted from Run() pthread->m_semRun.Wait(); // test whether we should run the run at all - may be it was deleted // before it started to Run()? { wxCriticalSectionLocker lock(thread->m_critsect); dontRunAtAll = pthread->GetState() == STATE_NEW && pthread->WasCancelled(); } if ( !dontRunAtAll ) { pthread->m_exitcode = thread->Entry(); { wxCriticalSectionLocker lock(thread->m_critsect); pthread->SetState(STATE_EXITED); } } if ( dontRunAtAll ) { if ( pthread->m_isDetached ) delete thread; return -1 ; } else { // on mac for the running code the correct thread termination is to // return // terminate the thread thread->Exit(pthread->m_exitcode); return (OSStatus) NULL ; // pthread->m_exitcode; }}bool wxThreadInternal::Create(wxThread *thread, unsigned int stackSize){ wxMacMPThreadsInitVerify() ; wxASSERT_MSG( m_state == STATE_NEW && !m_tid, _T("Create()ing thread twice?") ); OSStatus err = noErr ; m_thread = thread; if ( m_notifyQueueId == kInvalidID ) { OSStatus err = MPCreateQueue( & m_notifyQueueId); if( err) { wxLogSysError(_("Cant create the thread event queue")); return false; } } m_state = STATE_NEW; err = MPCreateTask( MacThreadStart, (void*) m_thread, stackSize, m_notifyQueueId, &m_exitcode, 0, 0, &m_tid); if ( err) { wxLogSysError(_("Can't create thread")); return false; } if ( m_prio != WXTHREAD_DEFAULT_PRIORITY ) { SetPriority(m_prio); } return true;}void wxThreadInternal::SetPriority( int priority){ m_prio = priority; if ( m_tid) { // Mac priorities range from 1 to 10,000, with a default of 100. // wxWidgets priorities range from 0 to 100 with a default of 50. // We can map wxWidgets to Mac priorities easily by assuming // the former uses a logarithmic scale. const unsigned int macPriority = ( int)( exp( priority / 25.0 * log( 10.0)) + 0.5); MPSetTaskWeight( m_tid, macPriority); }}wxThreadError wxThreadInternal::Run(){ wxCHECK_MSG( GetState() == STATE_NEW, wxTHREAD_RUNNING, wxT("thread may only be started once after Create()") ); SetState(STATE_RUNNING); // wake up threads waiting for our start SignalRun(); return wxTHREAD_NO_ERROR;}void wxThreadInternal::Wait(){ wxCHECK_RET( !m_isDetached, _T("can't wait for a detached thread") ); // if the thread we're waiting for is waiting for the GUI mutex, we will // deadlock so make sure we release it temporarily if ( wxThread::IsMain() ) wxMutexGuiLeave(); { wxCriticalSectionLocker lock(m_csJoinFlag); if ( m_shouldBeJoined ) { void * param1; void * param2; void * rc; OSStatus err = MPWaitOnQueue ( m_notifyQueueId, & param1, & param2, & rc, kDurationForever); if ( err) { wxLogSysError( _( "Cannot wait for thread termination.")); rc = (void*) -1; } // actually param1 would be the address of m_exitcode // but we don't need this here m_exitcode = rc; m_shouldBeJoined = false; } } // reacquire GUI mutex if ( wxThread::IsMain() ) wxMutexGuiEnter();}void wxThreadInternal::Pause(){ // the state is set from the thread which pauses us first, this function // is called later so the state should have been already set wxCHECK_RET( m_state == STATE_PAUSED, wxT("thread must first be paused with wxThread::Pause().") ); // wait until the semaphore is Post()ed from Resume() m_semSuspend.Wait();}void wxThreadInternal::Resume(){ wxCHECK_RET( m_state == STATE_PAUSED, wxT("can't resume thread which is not suspended.") ); // the thread might be not actually paused yet - if there were no call to // TestDestroy() since the last call to Pause() for example if ( IsReallyPaused() ) { // wake up Pause() m_semSuspend.Post(); // reset the flag SetReallyPaused(FALSE); } SetState(STATE_RUNNING);}// static functions// ----------------wxThread *wxThread::This(){ wxThread* thr = (wxThread*) MPGetTaskStorageValue( gs_tlsForWXThread ) ; return thr;}bool wxThread::IsMain(){ return GetCurrentId() == gs_idMainThread;}#ifdef Yield#undef Yield#endifvoid wxThread::Yield(){#if TARGET_API_MAC_OSX CFRunLoopRunInMode( kCFRunLoopDefaultMode , 0 , true ) ;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -