📄 event_channel.cpp
字号:
// Event_Channel.cpp,v 1.74 2003/10/28 18:34:19 bala Exp
#include "ace/Service_Config.h"
#include "ace/Auto_Ptr.h"
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event/Dispatching_Modules.h"
#include "orbsvcs/Event/Memory_Pools.h"
#include "orbsvcs/Event/EC_Gateway.h"
#include "orbsvcs/Event/Module_Factory.h"
#include "orbsvcs/Event/Event_Manip.h"
#include "orbsvcs/Event/Event_Channel.h"
#if !defined (__ACE_INLINE__)
#include "Event_Channel.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Event, Event_Channel, "Event_Channel.cpp,v 1.74 2003/10/28 18:34:19 bala Exp")
#include "tao/Timeprobe.h"
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Atomic_Op<ACE_ES_MUTEX, int>;
template class ACE_Atomic_Op_Ex<ACE_ES_MUTEX, int>;
template class ACE_Map_Entry<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT>;
template class ACE_Map_Entry<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT>;
template class ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry>;
template class ACE_Map_Manager<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Manager<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator_Base<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Map_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>;
template class ACE_Map_Reverse_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>;
template class ACE_Node<ACE_ES_Consumer_Rep *>;
template class ACE_Node<ACE_Push_Consumer_Proxy *>;
template class ACE_Node<ACE_Push_Supplier_Proxy *>;
template class ACE_Unbounded_Set_Ex<ACE_ES_Consumer_Rep *>;
template class ACE_Unbounded_Set_Ex<ACE_Push_Consumer_Proxy *>;
template class ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *>;
template class ACE_Unbounded_Set_Ex_Iterator<ACE_ES_Consumer_Rep *>;
template class ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Consumer_Proxy *>;
template class ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *>;
template class ACE_Auto_Basic_Ptr<ACE_Push_Supplier_Proxy>;
template class ACE_Auto_Basic_Ptr<ACE_Push_Consumer_Proxy>;
template class auto_ptr<ACE_Push_Supplier_Proxy>;
template class auto_ptr<ACE_Push_Consumer_Proxy>;
template class ACE_Array<TAO_EC_Event>;
template class ACE_Array_Base<TAO_EC_Event>;
template class ACE_Array_Iterator<TAO_EC_Event>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Atomic_Op<ACE_ES_MUTEX, int>
#pragma instantiate ACE_Atomic_Op_Ex<ACE_ES_MUTEX, int>
#pragma instantiate ACE_Map_Entry<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT>
#pragma instantiate ACE_Map_Entry<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT>
#pragma instantiate ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry>
#pragma instantiate ACE_Map_Manager<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Manager<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator_Base<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator_Base<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::EXT, ACE_ES_Subscription_Info::INT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_ES_Subscription_Info::sEXT, ACE_ES_Subscription_Info::sINT, ACE_ES_Subscription_Info::SYNCH>
#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventChannelAdmin::Observer_Handle, ACE_EventChannel::Observer_Entry, ACE_Null_Mutex>
#pragma instantiate ACE_Node<ACE_ES_Consumer_Rep *>
#pragma instantiate ACE_Node<ACE_Push_Consumer_Proxy *>
#pragma instantiate ACE_Node<ACE_Push_Supplier_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex<ACE_ES_Consumer_Rep *>
#pragma instantiate ACE_Unbounded_Set_Ex<ACE_Push_Consumer_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex_Iterator<ACE_ES_Consumer_Rep *>
#pragma instantiate ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Consumer_Proxy *>
#pragma instantiate ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *>
#pragma instantiate ACE_Auto_Basic_Ptr<ACE_Push_Supplier_Proxy>
#pragma instantiate ACE_Auto_Basic_Ptr<ACE_Push_Consumer_Proxy>
#pragma instantiate auto_ptr<ACE_Push_Supplier_Proxy>
#pragma instantiate auto_ptr<ACE_Push_Consumer_Proxy>
#pragma instantiate ACE_Array<TAO_EC_Event>
#pragma instantiate ACE_Array_Base<TAO_EC_Event>
#pragma instantiate ACE_Array_Iterator<TAO_EC_Event>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#if defined (ACE_ENABLE_TIMEPROBES)
static const char *TAO_Event_Channel_Timeprobe_Description[] =
{
"Preemption_Priority - priority requested",
"connected - priority obtained",
"enter Push_Supplier_Proxy::push",
"enter ES_Consumer_Module::push",
"leave ES_Consumer_Module::push",
"enter ACE_ES_Correlation_Module::push",
"pushed to Correlation_Module",
"push_source_type: Dispatch Module enqueuing",
"ACE_ES_Consumer_Correlation::push, enter",
"Consumer_Correlation::push, determine NO CORR.",
"Consumer_Correlation::push, NO_CORR: alloc",
"Consumer_Rep_Timeout::execute",
"deliver to Subscription Module",
"begin push_source_type",
"end push_source_type",
"deliver to Supplier Module (thru Supplier Proxy)",
"connected - priority requested",
"Consumer_Name - priority requested",
"Consumer_Name - priority obtained",
"deliver event to consumer proxy",
"enter ACE_ES_Subscription_Module::push",
"push_source_type"
};
enum
{
// Timeprobe description table start key
TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED = 5100,
TAO_EVENT_CHANNEL_CONNECTED_PRIORITY_OBTAINED,
TAO_EVENT_CHANNEL_ENTER_PUSH_SUPPLIER_PROXY_PUSH,
TAO_EVENT_CHANNEL_ENTER_ES_CONSUMER_MODULE_PUSH,
TAO_EVENT_CHANNEL_LEAVE_ES_CONSUMER_MODULE_PUSH,
TAO_EVENT_CHANNEL_ENTER_ACE_ES_CORRELATION_MODULE_PUSH,
TAO_EVENT_CHANNEL_PUSHED_TO_CORRELATION_MODULE,
TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE_DISPATCH_MODULE_ENQUEUING,
TAO_EVENT_CHANNEL_ACE_ES_CONSUMER_CORRELATION_PUSH_ENTER,
TAO_EVENT_CHANNEL_CONSUMER_CORRELATION_PUSH_DETERMINE_NO_CORR,
TAO_EVENT_CHANNEL_CONSUMER_CORRELATION_PUSH_NO_CORR_ALLOC,
TAO_EVENT_CHANNEL_CONSUMER_REP_TIMEOUT_EXECUTE,
TAO_EVENT_CHANNEL_DELIVER_TO_SUBSCRIPTION_MODULE,
TAO_EVENT_CHANNEL_BEGIN_PUSH_SOURCE_TYPE,
TAO_EVENT_CHANNEL_END_PUSH_SOURCE_TYPE,
TAO_EVENT_CHANNEL_DELIVER_TO_SUPPLIER_MODULE_THRU_SUPPLIER_PROXY,
TAO_EVENT_CHANNEL_CONNECTED_PRIORITY_REQUESTED,
TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_REQUESTED,
TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_OBTAINED,
TAO_EVENT_CHANNEL_DELIVER_EVENT_TO_CONSUMER_PROXY,
TAO_EVENT_CHANNEL_ENTER_ACE_ES_SUBSCRIPTION_MODULE_PUSH,
TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE
};
// Setup Timeprobes
ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Event_Channel_Timeprobe_Description,
TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED);
#endif /* ACE_ENABLE_TIMEPROBES */
static RtecScheduler::Preemption_Priority_t
Preemption_Priority (RtecScheduler::Scheduler_ptr scheduler,
RtecScheduler::handle_t rtinfo
ACE_ENV_ARG_DECL)
{
RtecScheduler::OS_Priority thread_priority;
RtecScheduler::Preemption_Subpriority_t subpriority;
RtecScheduler::Preemption_Priority_t preemption_priority;
ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED);
#if 1
scheduler->priority
(rtinfo,
thread_priority,
subpriority,
preemption_priority
ACE_ENV_ARG_PARAMETER);
#else
ACE_Scheduler_Factory::server ()->priority
(rtinfo,
thread_priority,
subpriority,
preemption_priority
ACE_ENV_ARG_PARAMETER);
#endif
ACE_CHECK_RETURN (0);
return preemption_priority;
}
static RtecScheduler::OS_Priority
IntervalToPriority (RtecScheduler::Time interval)
{
for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
if (interval <= ACE_Scheduler_Rates[x])
return x;
return ACE_Scheduler_MIN_PREEMPTION_PRIORITY;
}
class TAO_RTOLDEvent_Export Shutdown_Consumer : public ACE_ES_Dispatch_Request
{
// = TITLE
// Shutdown Consumer command
//
// = DESCRIPTION
// This command object is sent through the system when a consumer
// disconnects. When the Dispatching Module dequeues this request,
// it calls execute which execute calls back to the Consumer
// Module. At that point, the Consumer Module can tell the rest of
// the system that the consumer has disconnected and delete the
// consumer proxy. This allows all events queued for the consumer
// to be flushed to the consumer proxy (which will drop them).
// Events can be queued in the ReactorEx (in a dispatch set), or in
// the Dispatching Module.
public:
// When executed, tells <consumer_module> that <consumer> has shut
// down.
Shutdown_Consumer (ACE_ES_Consumer_Module *consumer_module,
ACE_Push_Consumer_Proxy *consumer,
RtecScheduler::Scheduler_ptr scheduler)
: consumer_module_ (consumer_module)
{
consumer_ = consumer;
// Set rt_info_ to the lowest priority rt_info in consumer_.
// This is so the dispatching module can query us as a dispatch
// request to get the appropriate preemption priority.
ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies);
RtecScheduler::Preemption_Priority_t p =
ACE_Scheduler_MIN_PREEMPTION_PRIORITY;
while (iter.advance_dependency () == 0)
{
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
RtecEventComm::EventType &type = (*iter).event.header.type;
if (type != ACE_ES_GLOBAL_DESIGNATOR &&
type != ACE_ES_CONJUNCTION_DESIGNATOR &&
type != ACE_ES_DISJUNCTION_DESIGNATOR)
{
RtecScheduler::Preemption_Priority_t q =
::Preemption_Priority (scheduler, (*iter).rt_info
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (rt_info_ == 0 || q < p)
{
this->rt_info_ = ((*iter).rt_info);
p = q;
}
}
}
ACE_CATCHANY
{
// Ignore exceptions...
}
ACE_ENDTRY;
}
}
// Report to the consumer module that consumer_ has shutdown.
virtual int execute (u_long &command_action)
{
consumer_module_->shutdown_request (this);
command_action = ACE_RT_Task_Command::RELEASE;
return 0;
}
#if 0
// @@ Memory allocators
void *operator new (size_t /* nbytes */)
{ return ::new char[sizeof (Shutdown_Consumer)]; }
void operator delete (void *buf)
{ ::delete [] ACE_static_cast(char*,buf); }
#endif /* 0 */
// The module that we report to.
ACE_ES_Consumer_Module *consumer_module_;
};
class TAO_RTOLDEvent_Export Shutdown_Channel : public ACE_ES_Dispatch_Request
{
public:
Shutdown_Channel (ACE_EventChannel *channel) :
channel_ (channel) {}
// Report to the consumer module that consumer_ has shutdown.
virtual int execute (u_long &command_action)
{
#if 0
channel_->destroy_i ();
#endif
command_action = ACE_RT_Task_Command::RELEASE;
return 0;
}
#if 0
// @@ Memory allocators
void *operator new (size_t /* nbytes */)
{ return ::new char[sizeof (Shutdown_Channel)]; }
void operator delete (void *buf)
{ ::delete [] ACE_static_cast(char*,buf); }
#endif
ACE_EventChannel *channel_;
};
class TAO_RTOLDEvent_Export Flush_Queue_ACT : public ACE_Command_Base
{
// = TITLE
// Flush Queue Asynchronous Completion Token
//
// = DESCRIPTION
// Carries a single dispatch request through the ReactorEx.
// Deletes itself when execute is called.
public:
Flush_Queue_ACT (ACE_ES_Dispatch_Request *request,
ACE_ES_Dispatching_Module *dispatching_module) :
request_ (request),
dispatching_module_ (dispatching_module) { }
virtual int execute (void* /* arg = 0 */)
{
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
ACE_ES_Dispatch_Request *request = request_;
dispatching_module_->push (request ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
delete this;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Flush_Queue_ACT::execute, "
"unexpected exception");
}
ACE_ENDTRY;
return 0;
}
ACE_ES_Dispatch_Request *request_;
ACE_ES_Dispatching_Module *dispatching_module_;
};
void
dump_event (const RtecEventComm::Event &event)
{
ACE_DEBUG ((LM_DEBUG, "source_ = %ld "
"type_ = %d "
"time_ = %u.\n",
event.header.source,
event.header.type,
// The divide-by-1 is for ACE_U_LongLong support.
ORBSVCS_Time::to_hrtime (event.header.creation_time) / 1));
}
ACE_Push_Supplier_Proxy::ACE_Push_Supplier_Proxy (ACE_ES_Supplier_Module *sm)
: supplier_module_ (sm),
push_supplier_ (0)
{
}
void
ACE_Push_Supplier_Proxy::connect_push_supplier (
RtecEventComm::PushSupplier_ptr push_supplier,
const RtecEventChannelAdmin::SupplierQOS &qos
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
RtecEventChannelAdmin::AlreadyConnected))
{
if (this->connected ())
ACE_THROW (RtecEventChannelAdmin::AlreadyConnected());
this->push_supplier_ =
RtecEventComm::PushSupplier::_duplicate(push_supplier);
//ACE_DEBUG ((LM_DEBUG, "EC (%t) connect_push_supplier QOS is "));
//ACE_SupplierQOS_Factory::debug (qos);
// Copy by value.
this->qos_ = qos;
// ACE_SupplierQOS_Factory::debug (qos_);
// @@ TODO: The SupplierQOS should have a more reasonable interface to
// obtain the supplier_id(), BTW, a callback to push_supplier will
// not work: it usually results in some form of dead-lock.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -