📄 vxworks.cc
字号:
//////////////////////////////////////////////////////////////////////////////// Filename: vxWorks.cc// Author: Tihomir Sokcevic// Acterna, Eningen.// Description: vxWorks adaptation of the omnithread wrapper classes// Notes: Munching strategy is imperative//////////////////////////////////////////////////////////////////////////////// $Log: vxWorks.cc,v $// Revision 1.1.1.1 2004/04/10 18:00:52 eb// Initial subversions check in of GNU Radio 2.x work-in-progress//// Revision 1.1.1.1 2004/03/01 00:20:27 eb// initial checkin//// Revision 1.1 2003/05/25 05:29:04 eb// see ChangeLog//// Revision 1.1.2.1 2003/02/17 02:03:11 dgrisby// vxWorks port. (Thanks Michael Sturm / Acterna Eningen GmbH).//// Revision 1.1.1.1 2002/11/19 14:58:04 sokcevti// OmniOrb4.0.0 VxWorks port//// Revision 1.4 2002/10/15 07:54:09 kuttlest// change semaphore from SEM_FIFO to SEM_PRIO// ---//// Revision 1.3 2002/07/05 07:38:52 engeln// made priority redefinable on load time by defining int variables// omni_thread_prio_low = 220;// omni_thread_prio_normal = 110;// omni_thread_prio_high = 55;// the default priority is prio_normal.// The normal priority default has been increased from 200 to 110 and the// high priority from 100 to 55.// ---//// Revision 1.2 2002/06/14 12:44:57 engeln// replaced possibly unsafe wakeup procedure in broadcast.// ---//// Revision 1.1.1.1 2002/04/02 10:09:34 sokcevti// omniORB4 initial realease//// Revision 1.0 2001/10/23 14:22:45 sokcevti// Initial Version 4.00// ---//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Include files//////////////////////////////////////////////////////////////////////////////#include <stdlib.h>#include <stdio.h>#include <errno.h>#include <time.h>#include <omnithread.h>#include <sysLib.h>#include <assert.h> // assert#include <intLib.h> // intContext//////////////////////////////////////////////////////////////////////////////// Local defines//////////////////////////////////////////////////////////////////////////////#define ERRNO(x) (((x) != 0) ? (errno) : 0)#define THROW_ERRORS(x) { if((x) != OK) throw omni_thread_fatal(errno); }#define OMNI_THREAD_ID 0x7F7155AAl#define OMNI_STACK_SIZE 32768l#ifdef _DEBUG #include <fstream> #define DBG_TRACE(X) X#else // _DEBUG #define DBG_TRACE(X)#endif // _DEBUG#define DBG_ASSERT(X)#define DBG_THROW(X) Xint omni_thread_prio_low = 220;int omni_thread_prio_normal = 110;int omni_thread_prio_high = 55;/////////////////////////////////////////////////////////////////////////////// Mutex/////////////////////////////////////////////////////////////////////////////omni_mutex::omni_mutex(void):m_bConstructed(false){ mutexID = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE); DBG_ASSERT(assert(mutexID != NULL)); if(mutexID==NULL) { DBG_TRACE(cout<<"Exception: omni_mutex::omni_mutex() tid: "<<(int)taskIdSelf()<<endl); DBG_THROW(throw omni_thread_fatal(-1)); } m_bConstructed = true;}omni_mutex::~omni_mutex(void){ m_bConstructed = false; STATUS status = semDelete(mutexID); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_mutex::~omni_mutex() mutexID: "<<(int)mutexID<<" tid: "<<(int)taskIdSelf()<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}/*void omni_mutex::lock(void){ DBG_ASSERT(assert(!intContext())); // not in ISR context DBG_ASSERT(assert(m_bConstructed)); STATUS status = semTake(mutexID, WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_mutex::lock() mutexID: "<<(int)mutexID<<" tid: "<<(int)taskIdSelf()<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}void omni_mutex::unlock(void){ DBG_ASSERT(assert(m_bConstructed)); STATUS status = semGive(mutexID); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_mutex::unlock() mutexID: "<<(int)mutexID<<" tid: "<<(int)taskIdSelf()<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}*//////////////////////////////////////////////////////////////////////////////// Condition variable/////////////////////////////////////////////////////////////////////////////omni_condition::omni_condition(omni_mutex* m) : mutex(m){ DBG_TRACE(cout<<"omni_condition::omni_condition mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl); waiters_ = 0; sema_ = semCCreate(SEM_Q_PRIORITY, 0); if(sema_ == NULL) { DBG_TRACE(cout<<"Exception: omni_condition::omni_condition() tid: "<<(int)taskIdSelf()<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } waiters_lock_ = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE); if(waiters_lock_ == NULL) { DBG_TRACE(cout<<"Exception: omni_condition::omni_condition() tid: "<<(int)taskIdSelf()<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}omni_condition::~omni_condition(void){ STATUS status = semDelete(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::~omni_condition"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } status = semDelete(sema_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::~omni_condition"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}void omni_condition::wait(void){ DBG_TRACE(cout<<"omni_condition::wait mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl); // Prevent race conditions on the <waiters_> count. STATUS status = semTake(waiters_lock_,WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } ++waiters_; status = semGive(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } // disable task lock to have an atomic unlock+semTake taskLock(); // We keep the lock held just long enough to increment the count of // waiters by one. Note that we can't keep it held across the call // to wait() since that will deadlock other calls to signal(). mutex->unlock(); // Wait to be awakened by a cond_signal() or cond_broadcast(). status = semTake(sema_,WAIT_FOREVER); // reenable task rescheduling taskUnlock(); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } // Reacquire lock to avoid race conditions on the <waiters_> count. status = semTake(waiters_lock_,WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } // We're ready to return, so there's one less waiter. --waiters_; // Release the lock so that other collaborating threads can make // progress. status = semGive(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } // Bad things happened, so let's just return below. // We must always regain the <external_mutex>, even when errors // occur because that's the guarantee that we give to our callers. mutex->lock();}// The time given is absolute. Return 0 is timeoutint omni_condition::timedwait(unsigned long secs, unsigned long nanosecs){ STATUS result = OK; timespec now; unsigned long timeout; int ticks; // Prevent race conditions on the <waiters_> count. STATUS status = semTake(waiters_lock_, WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } ++waiters_; status = semGive(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } clock_gettime(CLOCK_REALTIME, &now); if(((unsigned long)secs <= (unsigned long)now.tv_sec) && (((unsigned long)secs < (unsigned long)now.tv_sec) || (nanosecs < (unsigned long)now.tv_nsec))) timeout = 0; else timeout = (secs-now.tv_sec) * 1000 + (nanosecs-now.tv_nsec) / 1000000l; // disable task lock to have an atomic unlock+semTake taskLock(); // We keep the lock held just long enough to increment the count // of waiters by one. mutex->unlock(); // Wait to be awakened by a signal() or broadcast(). ticks = (timeout * sysClkRateGet()) / 1000L; result = semTake(sema_, ticks); // reenable task rescheduling taskUnlock(); // Reacquire lock to avoid race conditions. status = semTake(waiters_lock_, WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } --waiters_; status = semGive(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } // A timeout has occured - fires exception if the origin is other than timeout if(result!=OK && !(errno == S_objLib_OBJ_TIMEOUT || errno == S_objLib_OBJ_UNAVAILABLE)) { DBG_TRACE(cout<<"omni_condition::timedwait! - thread:"<<omni_thread::self()->id()<<" SemID:"<<(int)sema_<<" errno:"<<errno<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } // We must always regain the <external_mutex>, even when errors // occur because that's the guarantee that we give to our callers. mutex->lock(); if(result!=OK) // timeout return 0; return 1;}void omni_condition::signal(void){ DBG_TRACE(cout<<"omni_condition::signal mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl); STATUS status = semTake(waiters_lock_, WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } int have_waiters = waiters_ > 0; status = semGive(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } if(have_waiters != 0) { status = semGive(sema_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } }}void omni_condition::broadcast(void){ DBG_TRACE(cout<<"omni_condition::broadcast mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl); int have_waiters = 0; // The <external_mutex> must be locked before this call is made. // This is needed to ensure that <waiters_> and <was_broadcast_> are // consistent relative to each other. STATUS status = semTake(waiters_lock_, WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } if(waiters_ > 0) { // We are broadcasting, even if there is just one waiter... // Record the fact that we are broadcasting. This helps the // cond_wait() method know how to optimize itself. Be sure to // set this with the <waiters_lock_> held. have_waiters = 1; } status = semGive(waiters_lock_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } if(have_waiters) { // Wake up all the waiters. status = semFlush(sema_); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"omni_condition::broadcast1! - thread:"<<omni_thread::self()->id()<<" SemID:"<<(int)sema_<<" errno:"<<errno<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } }}/////////////////////////////////////////////////////////////////////////////// Counting semaphore/////////////////////////////////////////////////////////////////////////////omni_semaphore::omni_semaphore(unsigned int initial){ DBG_ASSERT(assert(0 <= (int)initial)); // POSIX expects only unsigned init values semID = semCCreate(SEM_Q_PRIORITY, (int)initial); DBG_ASSERT(assert(semID!=NULL)); if(semID==NULL) { DBG_TRACE(cout<<"Exception: omni_semaphore::omni_semaphore"<<endl); DBG_THROW(throw omni_thread_fatal(-1)); }}omni_semaphore::~omni_semaphore(void){ STATUS status = semDelete(semID); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_semaphore::~omni_semaphore"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}void omni_semaphore::wait(void){ DBG_ASSERT(assert(!intContext())); // no wait in ISR STATUS status = semTake(semID, WAIT_FOREVER); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_semaphore::wait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}int omni_semaphore::trywait(void){ STATUS status = semTake(semID, NO_WAIT); DBG_ASSERT(assert(status == OK)); if(status != OK) { if(errno == S_objLib_OBJ_UNAVAILABLE) { return 0; } else { DBG_ASSERT(assert(false)); DBG_TRACE(cout<<"Exception: omni_semaphore::trywait"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); } } return 1;}void omni_semaphore::post(void){ STATUS status = semGive(semID); DBG_ASSERT(assert(status == OK)); if(status != OK) { DBG_TRACE(cout<<"Exception: omni_semaphore::post"<<endl); DBG_THROW(throw omni_thread_fatal(errno)); }}/////////////////////////////////////////////////////////////////////////////// Thread///////////////////////////////////////////////////////////////////////////////// static variables//omni_mutex* omni_thread::next_id_mutex = 0;int omni_thread::next_id = 0;// omniORB requires a larger stack size than the default (21120) on OSF/1static size_t stack_size = OMNI_STACK_SIZE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -