⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reactor.cpp

📁 dcom机制在vxworks上的实现源代码
💻 CPP
字号:
/* Reactor - IO Multiplexor *//* Copyright (c) 1999 Wind River Systems, Inc. *//*modification history--------------------01v,17dec01,nel  Add include symbol for diab build.01u,10dec01,dbs  diab build01t,20sep01,nel  Fix compilation for ARM.01s,30jul01,dbs  fix assertion logic01r,13jul01,dbs  fix up includes01q,19jan00,nel  Modifications for Linux debug build01p,21sep99,aim  changed pipeDevCreate(1024, 1024) => (1,1); this fixes the                 MBX86001o,19aug99,aim  change assert to VXDCOM_ASSERT01n,13aug99,aim  added ARG_UNUSED to clear compiler warnings01m,12aug99,aim  MT fixes01l,09aug99,aim  change wakeup behaviour01k,02aug99,aim  added wakeup event handler01j,21jul99,aim  quantify tweaks01i,13jul99,aim  clear errno before entering select01h,12jul99,aim  fix returned nHandles in select01g,08jul99,aim  added timers01f,07jul99,aim  added critical section protection01e,29jun99,aim  reset timeout on eventLoopReset01d,07jun99,aim  remove dubious event loop termination01c,04jun99,aim  removed debug01b,03jun99,aim  remove abort01a,07may99,aim  created*/#include "EventHandler.h"#include "Reactor.h"#include "Syslog.h"#include "TraceCall.h"#include "private/comMisc.h"/* Include symbol for diab */extern "C" int include_vxdcom_Reactor (void)    {    return 0;    }#ifdef VXDCOM_PLATFORM_VXWORKS#include "pipeDrv.h"#include "selectLib.h"#endifReactor::Reactor ()  : m_rdHandles (),    m_wrHandles (),    m_exHandles (),    m_endEventLoop (false),    m_wakeupHandler (this),    m_handle2handlerMap (),    m_handler2handleMap (),    m_mutex (),    m_timerMap ()    {    TRACE_CALL;    if (m_wakeupHandler.handleGet () != INVALID_REACTOR_HANDLE)	{	timerAdd (&m_wakeupHandler, TimeValue (60));	handlerAdd (&m_wakeupHandler, EventHandler::READ_MASK);	}    else	m_endEventLoop = true;    COM_ASSERT (m_wakeupHandler.handleGet () > 0);    }Reactor::~Reactor ()    {    TRACE_CALL;    Handle2HandlerMapIter iter (m_handle2handlerMap.begin ());    while (iter != m_handle2handlerMap.end ())	{	REACTOR_HANDLE handle = (*iter).first;	EventHandler* eventHandler = (*iter).second;	// note: handleClose could remove an entry from the map we are	// currently iterating over and the entry it will delete is the	// current position of the iterator.  Therefore we must	// advance the iter and then call handleClose.	++iter;	COM_ASSERT (eventHandler);	if (eventHandler)	    eventHandler->handleClose (handle);	}    }int Reactor::run ()    {    TRACE_CALL;    return eventLoopRun ();    }int Reactor::eventLoopRun ()    {    TRACE_CALL;    int result = 0;    while (!m_endEventLoop)	{	if ((result = handleEvents ()) < 0)	    break;	}    return result;    }void Reactor::eventLoopEnd ()    {    TRACE_CALL;    m_endEventLoop = true;    }bool Reactor::eventLoopDone ()    {    TRACE_CALL;    return m_endEventLoop;    }void Reactor::close ()    {    TRACE_CALL;    eventLoopEnd ();    }int Reactor::handlerAdd    (    EventHandler*	eventHandler,    REACTOR_EVENT_MASK	eventMask    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);        REACTOR_HANDLE handle = eventHandler->handleGet ();    COM_ASSERT (handle != INVALID_REACTOR_HANDLE);    int result = handlerBind (handle, eventMask);    if (result != -1)	{	m_handle2handlerMap[handle] = eventHandler;	m_handler2handleMap[eventHandler] = handle;	}    wakeup ();        return result;    }int Reactor::handlerRemove    (    EventHandler*	eventHandler,    REACTOR_EVENT_MASK	eventMask    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);        REACTOR_HANDLE handle;    int result = -1;    if (handleFind (eventHandler, handle) == 0)	{	m_handle2handlerMap.erase (handle);	m_handler2handleMap.erase (eventHandler);	result = handlerUnbind (handle, eventMask, eventHandler);	}    wakeup ();    return result;    }int Reactor::handleFind    (    EventHandler*	eventHandler,    REACTOR_HANDLE&	handle    )    {    VxCritSec cs (m_mutex);    Handler2HandleMapIter iter;    iter = m_handler2handleMap.find (eventHandler);    if (iter == m_handler2handleMap.end ())	return -1;    handle = (*iter).second;    return 0;    }int Reactor::handlerFind    (    REACTOR_HANDLE handle,    EventHandler*& eventHandler    )    {    VxCritSec cs (m_mutex);    Handle2HandlerMapIter iter;    iter = m_handle2handlerMap.find (handle);    if (iter == m_handle2handlerMap.end ())	return -1;    eventHandler = (*iter).second;    return 0;    }int Reactor::select    (    int		nHandles,    HandleSet*	rdSet,    HandleSet*	wrSet,    HandleSet*  exSet,    TimeValue*	timeout    )    {    timeval* t = 0;    if (timeout)	t = *timeout;    REACTOR_HANDLE_SET_TYPE* r = 		rdSet ? *rdSet : (REACTOR_HANDLE_SET_TYPE *)NULL;    REACTOR_HANDLE_SET_TYPE* w = 		wrSet ? *wrSet : (REACTOR_HANDLE_SET_TYPE *)NULL;    REACTOR_HANDLE_SET_TYPE* e = 		exSet ? *exSet : (REACTOR_HANDLE_SET_TYPE *)NULL;#ifdef VXDCOM_PLATFORM_VXWORKS    errno = 0;#endif        return ::select (nHandles, r, w, e, t);    }int Reactor::handleEvents ()    {    TRACE_CALL;    HandleSet rdSet (m_rdHandles);    HandleSet wrSet (m_wrHandles);    HandleSet exSet; // (m_exHandles);    int haveTimer = 0;    int maxHandle = max (rdSet.maxHandle (), wrSet.maxHandle ());    TimeValue timeout;    if (nextTimerGet (timeout) == 0)	{	haveTimer = 1;	if (timeout < TimeValue::zero ())	    timeout = TimeValue::zero (); // + TimeValue (0, 100);	}    TimeValue startTime = TimeValue::now ();    int selectFds = select (maxHandle +1,			    (rdSet.count () > 0) ? &rdSet : 0,			    (wrSet.count () > 0) ? &wrSet : 0,			    (exSet.count () > 0) ? &exSet : 0,			    haveTimer ? &timeout : 0);    if (m_endEventLoop)	return 0;    if (selectFds < 0)	return -1;    if (haveTimer)	updateTimers ((TimeValue::now() - startTime));        if (haveTimer)	dispatchTimers ();    if (selectFds > 0)	{	rdSet.sync (maxHandle +1);	wrSet.sync (maxHandle +1);	exSet.sync (maxHandle +1);	dispatchFdEvents (rdSet, wrSet, exSet);	}    return 0;    }int Reactor::dispatchFdEvents    (    HandleSet&	rdSet,    HandleSet&	wrSet,    HandleSet&	exSet    )    {    // exceptions not supported on vxWorks    // dispatchFdEvents (exSet,    //	               m_exHandles,    //	               EventHandler::EXCEPT_MASK,    //	               &EventHandler::handleException);    if (rdSet.count () > 0)	{	dispatchFdEvents (rdSet,			  m_rdHandles,			  EventHandler::READ_MASK,			  &EventHandler::handleInput);	}#if 0    if (wrSet.count () > 0)	{	dispatchFdEvents (wrSet,			  m_wrHandles,			  EventHandler::WRITE_MASK,			  &EventHandler::handleOutput);	}#endif    return 0;    }int Reactor::dispatchFdEvents    (    HandleSet&			selectHandles,    HandleSet&	       		reactorHandles,    REACTOR_EVENT_MASK		mask,    EventHandlerCallback	callback    )    {    TRACE_CALL;    REACTOR_HANDLE handle;    HandleSetIterator hsIter (selectHandles);    while ((handle = hsIter ()) != INVALID_REACTOR_HANDLE)	dispatchFdEvent (reactorHandles, handle, mask, callback);    return 0;    }int Reactor::dispatchFdEvent    (    HandleSet&			reactorHandles,    REACTOR_HANDLE		handle,    REACTOR_EVENT_MASK		eventMask,    EventHandlerCallback	callback    )    {    TRACE_CALL;    int result = -1;    EventHandler* eventHandler = 0;    handlerFind (handle, eventHandler);    if (eventHandler)	{	result = (eventHandler->*callback) (handle);	if (result < 0)	    handlerRemove (eventHandler, eventMask);	else if (result > 0)	    COM_ASSERT (0);	// XXX reactorHandles.clr (handle);	}    else	{	S_DEBUG(0, "Missing eventHandler: " << handle << (*this));        COM_ASSERT (eventHandler);	}    return result;    }int Reactor::handlerBind    (    REACTOR_HANDLE	handle,    REACTOR_EVENT_MASK	eventMask    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);    if (handle == INVALID_REACTOR_HANDLE)	return -1;    if (eventMask & EventHandler::READ_MASK)	m_rdHandles.set (handle);    if (eventMask & EventHandler::ACCEPT_MASK)	m_rdHandles.set (handle);    if (eventMask & EventHandler::CONNECT_MASK)	m_rdHandles.set (handle);    if (eventMask & EventHandler::WRITE_MASK)	m_wrHandles.set (handle);    if (eventMask & EventHandler::EXCEPT_MASK)	m_exHandles.set (handle);    return 0;    }int Reactor::handlerUnbind    (    REACTOR_HANDLE	handle,    REACTOR_EVENT_MASK	eventMask,    EventHandler*	eventHandler    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);    if (handle == INVALID_REACTOR_HANDLE)	return -1;    if (eventMask & EventHandler::READ_MASK)	m_rdHandles.clr (handle);    if (eventMask & EventHandler::ACCEPT_MASK)	m_rdHandles.clr (handle);    if (eventMask & EventHandler::CONNECT_MASK)	m_rdHandles.clr (handle);    if (eventMask & EventHandler::WRITE_MASK)	m_wrHandles.clr (handle);    if (eventMask & EventHandler::EXCEPT_MASK)	m_exHandles.clr (handle);    if ((eventMask & EventHandler::DONT_CALL) == 0)	eventHandler->handleClose (handle, eventMask);    return 0;    }Reactor* Reactor::instance ()    {    static Reactor r;    return &r;    }const HandleSet& Reactor::rdHandles () const    {    TRACE_CALL;    return m_rdHandles;    }const HandleSet& Reactor::wrHandles () const    {    TRACE_CALL;    return m_wrHandles;    }const HandleSet& Reactor::exHandles () const    {    TRACE_CALL;    return m_exHandles;    }ostream& operator<< (ostream& os, const Reactor& r)    {    os << "read-mask ("       << r.rdHandles ()       << ") ";#if 0    os << "write-mask ("       << r.wrHandles ()       << ") ";    os << "event-mask ("       << r.exHandles ()       << ")";#endif    return os;    }void Reactor::timerAdd    (    EventHandler*	eventHandler,    const TimeValue&	timeValue    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);    m_timerMap [eventHandler] = TimerValuePair (timeValue, timeValue);    wakeup ();    }void Reactor::timerRemove    (    EventHandler*	eventHandler    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);    m_timerMap.erase (eventHandler);    }int Reactor::nextTimerGet    (    TimeValue&	timeValue    )    {    TRACE_CALL;    VxCritSec cs (m_mutex);    if (m_timerMap.size () == 0)	return -1;        TimerMap::const_iterator iter (m_timerMap.begin ());    while (iter != m_timerMap.end ())	{	const TimerValuePair& tp = (*iter).second;	if (iter == m_timerMap.begin ())	    timeValue = tp.second;	else	    timeValue = min (timeValue, tp.second);	++iter;	}    return 0;    }int Reactor::updateTimers (const TimeValue& elapsedTime)    {    TRACE_CALL;    VxCritSec cs (m_mutex);    TimerMap::iterator iter (m_timerMap.begin ());    while (iter != m_timerMap.end ())	{	TimerValuePair& tp = (*iter).second;	++iter;	tp.second -= elapsedTime;	}    return 0;    }int Reactor::dispatchTimers ()    {    TRACE_CALL;    VxCritSec cs (m_mutex);    TimerMap::iterator iter (m_timerMap.begin ());    while (iter != m_timerMap.end ())	{	EventHandler* eventHandler = (*iter).first;	TimerValuePair& tp = (*iter).second;	// note: dispatchTimer (could) remove an entry from the map we	// are currently iterating over.  The entry it will delete is	// the current position of the iterator.  Therefore we must	// advance the iter and then call dispatchTimer.	++iter;	if (tp.second <= TimeValue::zero ())	    {	    dispatchTimer (tp.first, eventHandler);	    tp.second = tp.first;	      // reset timer	    }	}    return 0;    }int Reactor::dispatchTimer    (    const TimeValue&	timeValue,    EventHandler*	eventHandler    )    {    TRACE_CALL;        COM_ASSERT (eventHandler);    int result = eventHandler->handleTimeout (timeValue);    if (result < 0)	timerRemove (eventHandler);    return result;    }int Reactor::wakeup ()    {    TRACE_CALL;    return m_wakeupHandler.reactorWakeup ();    }Reactor::WakeupHandler::WakeupHandler (Reactor* reactor)  : m_wakeupPending (false),    m_wakeupPendingLock ()    {    TRACE_CALL;    m_handles[0] = INVALID_REACTOR_HANDLE;    m_handles[1] = INVALID_REACTOR_HANDLE;    reactorSet (reactor);#if defined (VXDCOM_PLATFORM_SOLARIS) || defined (VXDCOM_PLATFORM_LINUX)    int result = ::pipe (m_handles);    if (result != -1)	{	handleSet (m_handles[0]);	}#elif defined (VXDCOM_PLATFORM_VXWORKS)    char* filename = "/pipe/vxdcom";    if (::pipeDevCreate (filename, 1, 1) == OK)	{	m_handles[0] = ::open (filename, O_RDONLY, 0);	m_handles[1] = ::open (filename, O_WRONLY, 0);	if (m_handles[0] < 0 || m_handles[1] < 0)	    {	    // tidy up on any error.	    	    ::close (m_handles[0]);	    ::close (m_handles[1]);	    m_handles[0] = INVALID_REACTOR_HANDLE;	    m_handles[1] = INVALID_REACTOR_HANDLE;	    }	}    else	{	S_EMERG (LOG_REACTOR, "cannot open: " << filename);	}#endif    }REACTOR_HANDLE Reactor::WakeupHandler::handleGet () const    {    return m_handles[0];    }REACTOR_HANDLE Reactor::WakeupHandler::handleSet (REACTOR_HANDLE handle)    {    return m_handles[0] = handle;    }    int Reactor::WakeupHandler::handleInput (REACTOR_HANDLE handle)    {    TRACE_CALL;        VxCritSec cs (m_wakeupPendingLock);        char buf [1];    COM_ASSERT (m_wakeupPending);        int n = ::read (m_handles[0], buf, 1);    if (n != 1)	{	S_ERR (LOG_REACTOR | LOG_ERRNO,	       "Reactor::WakeupHandler read failed");	}    // Mark reactorWakeup() that so that a new event may be posted.            m_wakeupPending = false;    // Always return 0 for this EventHandler.    return 0;    }int Reactor::WakeupHandler::reactorWakeup ()    {    TRACE_CALL;    VxCritSec cs (m_wakeupPendingLock);    // Don't bombard the Reactor if there is an event outstanding.        if (m_wakeupPending)	return 0;    m_wakeupPending = true;    // Now, wakeup the Reactor    int n = ::write (m_handles[1], "", 1); // write one null byte    if (n != 1)	{	S_ERR (LOG_REACTOR | LOG_ERRNO,	       "Reactor::Wakeup failed to write null byte");	}    // Always return 0 for this EventHandler.        return 0;    }int Reactor::WakeupHandler::handleTimeout (const TimeValue&)    {    Reactor* reactor = reactorGet ();    COM_ASSERT (reactor);    S_DEBUG(LOG_REACTOR, (*reactor));    return 0;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -