📄 mpthread.cpp
字号:
return err == wxSEMA_TIMEOUT ? wxCOND_TIMEOUT : wxCOND_MISC_ERROR;
}
return wxCOND_NO_ERROR;
}
// Wakes one waiter, or every current waiter when signalAll is true, by
// posting m_semaphore once per thread to be released.
//
// Returns wxCOND_NO_ERROR on every path; a failing semaphore post is only
// caught by the wxASSERT inside the loop.
wxCondError wxConditionInternal::DoSignal( bool signalAll)
{
// take the gate first, then the section guarding the counters
m_gate.Wait();
m_varSection.Enter();
wxASSERT( m_signals == m_canceled);
// every registered waiter has already canceled: nobody to wake, so release
// both locks and report success
if ( m_waiters == m_canceled)
{
m_varSection.Leave();
m_gate.Post();
return wxCOND_NO_ERROR;
}
// discount the canceled waiters and reset the bookkeeping before computing
// how many threads to release
if ( m_canceled > 0)
{
m_waiters -= m_canceled;
m_signals = 0;
m_canceled = 0;
}
// number of threads to release; n is a local copy so the posting loop can run
// after m_varSection has been left
m_signals = signalAll ? m_waiters : 1;
size_t n = m_signals;
m_varSection.Leave();
// Let the waiters inherit the gate lock.
// NOTE(review): m_gate is not posted on this path; presumably the woken
// waiters release it - confirm against the Wait() implementation.
do
{
wxSemaError err = m_semaphore.Post();
wxASSERT( err == wxSEMA_NO_ERROR);
} while ( -- n);
return wxCOND_NO_ERROR;
}
#else
// Condition variable implementation built from a counting semaphore and a
// count of waiting threads; it works together with the wxMutex handed to the
// constructor.
class wxConditionInternal
{
public:
// binds the condition to mutex; only a reference to it is stored
wxConditionInternal(wxMutex& mutex);
// true when both the associated mutex and the internal semaphore are usable
bool IsOk() const { return m_mutex.IsOk() && m_semaphore.IsOk(); }
// block until signalled
wxCondError Wait();
// block until signalled or until the given number of milliseconds elapsed
wxCondError WaitTimeout(unsigned long milliseconds);
// wake up one waiting thread, if there is any
wxCondError Signal();
// wake up all threads currently waiting
wxCondError Broadcast();
private:
// the number of threads currently waiting for this condition
SInt32 m_numWaiters;
// the critical section protecting m_numWaiters
wxCriticalSection m_csWaiters;
// the user mutex associated with this condition (not owned)
wxMutex& m_mutex;
// the semaphore the waiting threads block on
wxSemaphore m_semaphore;
DECLARE_NO_COPY_CLASS(wxConditionInternal)
};
// Associates the condition with the caller's mutex and starts out with no
// waiting threads.
wxConditionInternal::wxConditionInternal(wxMutex& mutex)
    : m_numWaiters(0),
      m_mutex(mutex)
{
    // The waiter count needs no protection here: no other thread can reach
    // this object before the constructor has returned.
}
// Blocks the calling thread until the condition is signalled.
//
// The associated mutex is released for the duration of the wait and locked
// again before returning.  Returns wxCOND_NO_ERROR when the semaphore wait
// succeeded and wxCOND_MISC_ERROR otherwise.
wxCondError wxConditionInternal::Wait()
{
    // register as a waiter before giving up the mutex
    IncrementAtomic(&m_numWaiters);
    m_mutex.Unlock();

    // A signal may be generated after the mutex has been unlocked but before
    // the semaphore wait below has started.  This is harmless: the signalling
    // side posts the semaphore only while the waiter count is positive, and a
    // semaphore "remembers" posts, so the wait below returns at once in that
    // case instead of missing the wake-up.
    const wxSemaError waitResult = m_semaphore.Wait();

    m_mutex.Lock();

    if ( waitResult != wxSEMA_NO_ERROR )
        return wxCOND_MISC_ERROR;

    return wxCOND_NO_ERROR;
}
// Blocks the calling thread until the condition is signalled or until
// 'milliseconds' have elapsed.
//
// The associated mutex is released for the duration of the wait and locked
// again before returning.
//
// Returns:
//  - wxCOND_NO_ERROR   if a signal was received (including one caught by the
//                      zero-timeout retry described below),
//  - wxCOND_TIMEOUT    if the wait timed out without a signal,
//  - wxCOND_MISC_ERROR if the semaphore reported any other error.
wxCondError wxConditionInternal::WaitTimeout(unsigned long milliseconds)
{
    IncrementAtomic(&m_numWaiters);
    m_mutex.Unlock();

    // a race condition can occur at this point in the code
    //
    // please see the comments in Wait(), for details
    wxSemaError err = m_semaphore.WaitTimeout(milliseconds);

    // A timed wait on the semaphore reports expiry as wxSEMA_TIMEOUT.
    // wxSEMA_BUSY is still accepted because this code historically tested for
    // it; treating both as "timed out" keeps either semaphore behaviour
    // working.
    bool timedOut = false;
    if ( err == wxSEMA_TIMEOUT || err == wxSEMA_BUSY )
    {
        // Another potential race exists here: this thread has timed out but
        // has not yet decremented the waiter count.  If another thread
        // signals right now, the semaphore is posted on our behalf and we
        // would miss it.
        //
        // To handle this, wait once more with a zero timeout while holding
        // the waiters lock.  The call cannot block, but it picks up a post
        // made in the meantime.
        wxCriticalSectionLocker lock(m_csWaiters);

        err = m_semaphore.WaitTimeout(0);
        if ( err != wxSEMA_NO_ERROR )
        {
            // nobody signalled us: take ourselves off the waiter count so
            // that a later Signal()/Broadcast() does not post for a thread
            // which is no longer waiting
            m_numWaiters--;
            timedOut = ( err == wxSEMA_TIMEOUT || err == wxSEMA_BUSY );
        }
    }

    m_mutex.Lock();

    if ( err == wxSEMA_NO_ERROR )
        return wxCOND_NO_ERROR;

    return timedOut ? wxCOND_TIMEOUT : wxCOND_MISC_ERROR;
}
// Wakes up at most one waiting thread.
//
// Returns wxCOND_MISC_ERROR if posting the semaphore failed and
// wxCOND_NO_ERROR otherwise, including when there was nobody to wake.
wxCondError wxConditionInternal::Signal()
{
    wxCriticalSectionLocker lock(m_csWaiters);

    // nothing to do without waiters
    if ( m_numWaiters <= 0 )
        return wxCOND_NO_ERROR;

    // release exactly one waiter
    if ( m_semaphore.Post() != wxSEMA_NO_ERROR )
        return wxCOND_MISC_ERROR;

    --m_numWaiters;

    return wxCOND_NO_ERROR;
}
// Wakes up every thread currently counted as waiting.
//
// Stops and returns wxCOND_MISC_ERROR at the first failing semaphore post
// (waiters not yet released stay counted); returns wxCOND_NO_ERROR once the
// waiter count has reached zero.
wxCondError wxConditionInternal::Broadcast()
{
    wxCriticalSectionLocker lock(m_csWaiters);

    // one post per waiter; the count is decremented only after a successful
    // post
    for ( ; m_numWaiters > 0; --m_numWaiters )
    {
        if ( m_semaphore.Post() != wxSEMA_NO_ERROR )
            return wxCOND_MISC_ERROR;
    }

    return wxCOND_NO_ERROR;
}
#endif
// ----------------------------------------------------------------------------
// wxCriticalSection implementation
// ----------------------------------------------------------------------------
// XXX currently implemented as mutex in headers. Change to critical section.
// ----------------------------------------------------------------------------
// wxThread implementation
// ----------------------------------------------------------------------------
// wxThreadInternal class
// ----------------------
class wxThreadInternal
{
public:
wxThreadInternal()
{
m_tid = kInvalidID;
m_state = STATE_NEW;
m_prio = WXTHREAD_DEFAULT_PRIORITY;
m_notifyQueueId = kInvalidID;
m_exitcode = 0;
m_cancelled = false ;
// set to true only when the thread starts waiting on m_semSuspend
m_isPaused = false;
// defaults for joinable threads
m_shouldBeJoined = true;
m_isDetached = false;
}
~wxThreadInternal()
{
if ( m_notifyQueueId)
{
MPDeleteQueue( m_notifyQueueId);
m_notifyQueueId = kInvalidID ;
}
}
// thread function
static OSStatus MacThreadStart(void* arg);
// create a new (suspended) thread (for the given thread object)
bool Create(wxThread *thread, unsigned int stackSize);
// thread actions
// start the thread
wxThreadError Run();
// unblock the thread allowing it to run
void SignalRun() { m_semRun.Post(); }
// ask the thread to terminate
void Wait();
// go to sleep until Resume() is called
void Pause();
// resume the thread
void Resume();
// accessors
// priority
int GetPriority() const { return m_prio; }
void SetPriority(int prio) ;
// state
wxThreadState GetState() const { return m_state; }
void SetState(wxThreadState state) { m_state = state; }
// Get the ID of this thread's underlying MP Services task.
MPTaskID GetId() const { return m_tid; }
void SetCancelFlag() { m_cancelled = true; }
bool WasCancelled() const { return m_cancelled; }
// exit code
void SetExitCode(wxThread::ExitCode exitcode) { m_exitcode = exitcode; }
wxThread::ExitCode GetExitCode() const { return m_exitcode; }
// the pause flag
void SetReallyPaused(bool paused) { m_isPaused = paused; }
bool IsReallyPaused() const { return m_isPaused; }
// tell the thread that it is a detached one
void Detach()
{
wxCriticalSectionLocker lock(m_csJoinFlag);
m_shouldBeJoined = false;
m_isDetached = true;
}
private:
// the thread we're associated with
wxThread * m_thread;
MPTaskID m_tid; // thread id
MPQueueID m_notifyQueueId; // its notification queue
wxThreadState m_state; // see wxThreadState enum
int m_prio; // in wxWidgets units: from 0 to 100
// this flag is set when the thread should terminate
bool m_cancelled;
// this flag is set when the thread is blocking on m_semSuspend
bool m_isPaused;
// the thread exit code - only used for joinable (!detached) threads and
// is only valid after the thread termination
wxThread::ExitCode m_exitcode;
// many threads may call Wait(), but only one of them should call
// pthread_join(), so we have to keep track of this
wxCriticalSection m_csJoinFlag;
bool m_shouldBeJoined;
bool m_isDetached;
// this semaphore is posted by Run() and the threads Entry() is not
// called before it is done
wxSemaphore m_semRun;
// this one is signaled when the thread should resume after having been
// Pause()d
wxSemaphore m_semSuspend;
};
// Entry point handed to MPCreateTask(); 'parameter' is the wxThread object
// the task belongs to.
//
// The task blocks until Run() posts m_semRun.  If the thread was cancelled
// while still in STATE_NEW, Entry() is skipped (a detached thread object is
// deleted) and -1 is returned.  Otherwise Entry() runs, the state becomes
// STATE_EXITED and the thread finishes through wxThread::Exit().
OSStatus wxThreadInternal::MacThreadStart(void *parameter)
{
    wxThread* const thread = (wxThread*) parameter ;
    wxThreadInternal* const internal = thread->m_internal;

    // add to TLS so that This() will work
    verify_noerr( MPSetTaskStorageValue( gs_tlsForWXThread , (long) thread ) ) ;

    // wait for the semaphore to be posted from Run()
    internal->m_semRun.Wait();

    // test whether we should run at all - maybe the thread was deleted
    // before it started to Run()?
    bool skipEntry;
    {
        wxCriticalSectionLocker lock(thread->m_critsect);

        skipEntry = internal->GetState() == STATE_NEW &&
                    internal->WasCancelled();
    }

    if ( skipEntry )
    {
        // nobody is going to join a detached thread, so clean it up here
        if ( internal->m_isDetached )
            delete thread;

        return -1 ;
    }

    // run the user code
    internal->m_exitcode = thread->Entry();

    {
        wxCriticalSectionLocker lock(thread->m_critsect);

        internal->SetState(STATE_EXITED);
    }

    // on mac for the running code the correct thread termination is to
    // return

    // terminate the thread
    thread->Exit(internal->m_exitcode);

    return (OSStatus) NULL ; // pthread->m_exitcode;
}
// Creates the MP Services task for 'thread' with the requested stack size.
//
// The notification queue is created on first use.  The new task starts in
// MacThreadStart(), where it blocks until Run() is called.  Returns false,
// after logging a system error, if either the queue or the task cannot be
// created; returns true otherwise.
bool wxThreadInternal::Create(wxThread *thread, unsigned int stackSize)
{
    wxMacMPThreadsInitVerify() ;

    wxASSERT_MSG( m_state == STATE_NEW && !m_tid,
                  _T("Create()ing thread twice?") );

    m_thread = thread;

    // lazily create the queue used to wait for the task termination
    if ( m_notifyQueueId == kInvalidID )
    {
        const OSStatus queueStatus = MPCreateQueue( &m_notifyQueueId );
        if ( queueStatus != noErr )
        {
            wxLogSysError(_("Cant create the thread event queue"));
            return false;
        }
    }

    m_state = STATE_NEW;

    // &m_exitcode is passed as the first termination parameter
    const OSStatus taskStatus = MPCreateTask( MacThreadStart,
                                              (void*) m_thread,
                                              stackSize,
                                              m_notifyQueueId,
                                              &m_exitcode,
                                              0,
                                              0,
                                              &m_tid );
    if ( taskStatus != noErr )
    {
        wxLogSysError(_("Can't create thread"));
        return false;
    }

    // apply a priority requested before the task existed
    if ( m_prio != WXTHREAD_DEFAULT_PRIORITY )
        SetPriority(m_prio);

    return true;
}
void wxThreadInternal::SetPriority( int priority)
{
m_prio = priority;
if ( m_tid)
{
// Mac priorities range from 1 to 10,000, with a default of 100.
// wxWidgets priorities range from 0 to 100 with a default of 50.
// We can map wxWidgets to Mac priorities easily by assuming
// the former uses a logarithmic scale.
const unsigned int macPriority = ( int)( exp( priority / 25.0 * log( 10.0)) + 0.5);
MPSetTaskWeight( m_tid, macPriority);
}
}
// Starts a thread previously set up by Create().
//
// Returns wxTHREAD_RUNNING if the thread is not in STATE_NEW, otherwise marks
// it as running, posts the start semaphore and returns wxTHREAD_NO_ERROR.
wxThreadError wxThreadInternal::Run()
{
    wxCHECK_MSG( m_state == STATE_NEW, wxTHREAD_RUNNING,
                 wxT("thread may only be started once after Create()") );

    m_state = STATE_RUNNING;

    // let the task blocked on the start semaphore proceed
    m_semRun.Post();

    return wxTHREAD_NO_ERROR;
}
// Waits for the thread to terminate and stores its exit code in m_exitcode.
//
// Must not be called for a detached thread.  Only the first caller that finds
// m_shouldBeJoined set actually waits on the notification queue; the flag is
// cleared afterwards so later callers return without waiting.
void wxThreadInternal::Wait()
{
wxCHECK_RET( !m_isDetached, _T("can't wait for a detached thread") );
// if the thread we're waiting for is waiting for the GUI mutex, we will
// deadlock so make sure we release it temporarily
if ( wxThread::IsMain() )
wxMutexGuiLeave();
{
// m_csJoinFlag stays locked for the whole (unbounded) wait below
wxCriticalSectionLocker lock(m_csJoinFlag);
if ( m_shouldBeJoined )
{
void * param1;
void * param2;
void * rc;
// block without timeout until a message arrives on the notification queue
OSStatus err = MPWaitOnQueue ( m_notifyQueueId,
& param1,
& param2,
& rc,
kDurationForever);
if ( err)
{
// the wait itself failed: log it and report -1 as the exit code
wxLogSysError( _( "Cannot wait for thread termination."));
rc = (void*) -1;
}
// actually param1 would be the address of m_exitcode
// but we don't need this here
m_exitcode = rc;
m_shouldBeJoined = false;
}
}
// reacquire GUI mutex
if ( wxThread::IsMain() )
wxMutexGuiEnter();
}
// Blocks the calling thread on m_semSuspend until Resume() posts it.
// Does nothing (beyond the check failure) unless the state is STATE_PAUSED.
void wxThreadInternal::Pause()
{
// the state is set from the thread which pauses us first, this function
// is called later so the state should have been already set
wxCHECK_RET( m_state == STATE_PAUSED,
wxT("thread must first be paused with wxThread::Pause().") );
// wait until the semaphore is Post()ed from Resume()
m_semSuspend.Wait();
}
void wxThreadInternal::Resume()
{
wxCHECK_RET( m_state == STATE_PAUSED,
wxT("can't resume thread which is not suspended.") );
// the thread might be not actually paused yet - if there were no call to
// TestDestroy() since the last call to Pause() for example
if ( IsReallyPaused() )
{
// wake up Pause()
m_semSuspend.Post();
// reset the flag
SetReallyPaused(FALSE);
}
SetState(STATE_RUNNING);
}
// static functions
// ----------------
// Returns the wxThread stored in the calling task's storage slot
// gs_tlsForWXThread (set up in MacThreadStart()).
wxThread *wxThread::This()
{
    return (wxThread*) MPGetTaskStorageValue( gs_tlsForWXThread ) ;
}
// Returns true when the calling thread's id, as reported by GetCurrentId(),
// equals the recorded main thread id gs_idMainThread.
bool wxThread::IsMain()
{
return GetCurrentId() == gs_idMainThread;
}
#ifdef Yield
#undef Yield
#endif
void wxThread::Yield()
{
#if TARGET_API_MAC_OSX
CFRunLoopRunInMode( kCFRunLoopDefaultMode , 0 , true ) ;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -