📄 event_channel.cpp
字号:
this->source_id_ = qos_.publications[0].event.header.source;
supplier_module_->connected (this ACE_ENV_ARG_PARAMETER);
}
// Receives an event set from the supplier attached to this proxy,
// time-stamps every event via time_stamp() and hands the set to the
// supplier module.
void
ACE_Push_Supplier_Proxy::push (const RtecEventComm::EventSet &event
                               ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_ENTER_PUSH_SUPPLIER_PROXY_PUSH);

  // NOTE: Detecting that the supplier is collocated is a TAOism.
  if (this->push_supplier_->_is_collocated ())
    {
      // Collocated supplier: work on a private copy of the event set
      // instead of modifying the caller's argument.
      RtecEventComm::EventSet duplicate = event;
      this->time_stamp (duplicate);
      this->supplier_module_->push (this, duplicate ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // NOTE: This is *extremely* non-portable, we know that the ORB
      // core allocates this buffer from the global heap, hence it is
      // safe to steal it (further the EC will release the buffer, but
      // in another thread!). Other ORBs may do different things and
      // this may not work!
      RtecEventComm::EventSet &stolen =
        ACE_const_cast (RtecEventComm::EventSet&, event);
      this->time_stamp (stolen);
      this->supplier_module_->push (this, stolen ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}
// Writes the current high-resolution time into header.ec_recv_time of
// every event in the set.  Does nothing (beyond silencing the unused
// argument) when TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS is defined.
void
ACE_Push_Supplier_Proxy::time_stamp (RtecEventComm::EventSet& event)
{
#if defined(TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS)
  ACE_UNUSED_ARG (event);
#else
  // The clock is read once, so all events in the set carry the same
  // receive time.
  ACE_hrtime_t received_at = ACE_OS::gethrtime ();
  const CORBA::ULong count = event.length ();
  for (CORBA::ULong idx = 0; idx != count; ++idx)
    {
      ORBSVCS_Time::hrtime_to_TimeT (event[idx].header.ec_recv_time,
                                     received_at);
    }
#endif /* TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS */
}
// Detaches the supplier from this proxy: clears the stored supplier
// reference and notifies the supplier module.  Does nothing when the
// proxy is not connected.
void
ACE_Push_Supplier_Proxy::disconnect_push_consumer (
    ACE_ENV_SINGLE_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_TIMEPROBE_PRINT;

  if (!this->connected ())
    return;

  this->push_supplier_ = 0;
  this->supplier_module_->disconnecting (this ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Asks the supplier attached to this proxy to disconnect.  Any CORBA
// exception raised by that call is printed and swallowed, so this
// method never propagates a failure to its caller.
void
ACE_Push_Supplier_Proxy::shutdown (void)
{
// The method receives no CORBA environment, so it declares its own.
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
// NOTE(review): push_supplier_ is dereferenced without a nil check --
// confirm callers only invoke shutdown() on connected proxies.
push_supplier_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
// Best effort: report the problem and carry on.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"ACE_Push_Supplier_Proxy::shutdown failed.\n");
}
ACE_ENDTRY;
}
// Creates a proxy with no consumer attached yet and remembers the
// consumer module <cm> that it reports to.
ACE_Push_Consumer_Proxy::ACE_Push_Consumer_Proxy (ACE_ES_Consumer_Module *cm)
  : push_consumer_ (0),
    consumer_module_ (cm)
{
}
// Nothing to release explicitly here; the destructor body is empty.
ACE_Push_Consumer_Proxy::~ACE_Push_Consumer_Proxy (void)
{
}
// Delivers <events> to the consumer attached to this proxy.  When no
// consumer is attached the events are dropped and a debug message is
// logged instead.
void
ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events
                               ACE_ENV_ARG_DECL)
{
  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_DELIVER_EVENT_TO_CONSUMER_PROXY);

  if (!CORBA::is_nil (push_consumer_.in ()))
    {
      push_consumer_->push (events ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      ACE_DEBUG ((LM_DEBUG,
                  "EC (%t) Push to disconnected consumer\n"));
    }
}
// Attaches <push_consumer> to this proxy with the subscription
// described by <qos>, then notifies the consumer module.
// Raises RtecEventChannelAdmin::AlreadyConnected if the proxy already
// has a consumer.
void
ACE_Push_Consumer_Proxy::connect_push_consumer (
    RtecEventComm::PushConsumer_ptr push_consumer,
    const RtecEventChannelAdmin::ConsumerQOS &qos
    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RtecEventChannelAdmin::AlreadyConnected,
                   RtecEventChannelAdmin::TypeError))
{
  if (this->connected ())
    {
      ACE_THROW (RtecEventChannelAdmin::AlreadyConnected());
    }

  this->push_consumer_ =
    RtecEventComm::PushConsumer::_duplicate (push_consumer);

  // @@ TODO Find out why are two duplicates needed...
  // The extra reference taken here is intentional for now; its result
  // is deliberately not stored.
  RtecEventComm::PushConsumer::_duplicate (push_consumer);

  // Debugging aid, normally disabled:
  //   ACE_DEBUG ((LM_DEBUG, "EC (%t) connect_push_consumer QOS is "));
  //   ACE_ConsumerQOS_Factory::debug (qos);

  // Keep our own copy of the QoS (copy by value).
  this->qos_ = qos;

  // ACE_ConsumerQOS_Factory::debug (qos_);

  this->consumer_module_->connected (this ACE_ENV_ARG_PARAMETER);
}
// Detaches the consumer from this proxy: resets the stored consumer
// reference to nil and tells the consumer module that this proxy is
// disconnecting.
void
ACE_Push_Consumer_Proxy::disconnect_push_supplier (
    ACE_ENV_SINGLE_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_TIMEPROBE_PRINT;

  this->push_consumer_ = RtecEventComm::PushConsumer::_nil ();

  this->consumer_module_->disconnecting (this ACE_ENV_ARG_PARAMETER);
}
// Forwards the suspend request to this proxy's correlation object.
void
ACE_Push_Consumer_Proxy::suspend_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  this->correlation_.suspend ();
}
// Forwards the resume request to this proxy's correlation object.
void
ACE_Push_Consumer_Proxy::resume_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  this->correlation_.resume ();
}
// Asks the consumer attached to this proxy to disconnect.  Any CORBA
// exception raised by that call is printed and swallowed, so this
// method never propagates a failure to its caller.
void
ACE_Push_Consumer_Proxy::shutdown (void)
{
// The method receives no CORBA environment, so it declares its own.
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
// NOTE(review): push_consumer_ is dereferenced without the nil check
// that push() performs -- confirm shutdown() is only reached while a
// consumer is attached.
this->push_consumer_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
// Best effort: report the problem and carry on.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"ACE_Push_Consumer_Proxy::shutdown failed.\n");
}
ACE_ENDTRY;
}
// Builds an event channel that uses the caller-supplied <scheduler>.
//   scheduler        - scheduler reference; duplicated, so the caller
//                      keeps its own reference.
//   activate_threads - forwarded to init(); non-zero activates the
//                      channel right away.
//   type             - stored in type_.
//   factory          - module factory to use; when 0, init() creates a
//                      default one.
ACE_EventChannel::ACE_EventChannel (RtecScheduler::Scheduler_ptr scheduler,
                                    CORBA::Boolean activate_threads,
                                    u_long type,
                                    TAO_Module_Factory* factory)
  : rtu_manager_ (0),
    type_ (type),
    state_ (INITIAL_STATE),
    destroyed_ (0),
    handle_generator_ (0),
    own_factory_ (0),
    module_factory_ (factory)
{
  this->scheduler_ =
    RtecScheduler::Scheduler::_duplicate (scheduler);

  this->init (activate_threads);
}
// Builds an event channel that uses the scheduler returned by
// ACE_Scheduler_Factory::server ().
//   activate_threads - forwarded to init(); non-zero activates the
//                      channel right away.
//   type             - stored in type_.
//   factory          - module factory to use; when 0, init() creates a
//                      default one.
ACE_EventChannel::ACE_EventChannel (CORBA::Boolean activate_threads,
                                    u_long type,
                                    TAO_Module_Factory* factory)
  : rtu_manager_ (0),
    type_ (type),
    state_ (INITIAL_STATE),
    destroyed_ (0),
    handle_generator_ (0),
    own_factory_ (0),
    module_factory_ (factory)
{
  this->scheduler_ =
    RtecScheduler::Scheduler::_duplicate (ACE_Scheduler_Factory::server ());

  this->init (activate_threads);
}
// Shared construction logic: makes sure a module factory exists,
// creates the channel's modules through it, wires them to their
// neighbours and, when <activate_threads> is non-zero, activates the
// channel.
void
ACE_EventChannel::init (int activate_threads)
{
  // No factory supplied by the caller: create a default one and record
  // that we own it so the destructor can delete it.
  if (!this->module_factory_)
    {
      this->own_factory_ = 1;
      ACE_NEW (this->module_factory_, TAO_Default_Module_Factory);
    }

  // Create the modules.
  this->consumer_module_ =
    this->module_factory_->create_consumer_module (this);
  this->timer_module_ =
    this->module_factory_->create_timer_module (this);
  this->dispatching_module_ =
    this->module_factory_->create_dispatching_module (this);
  this->correlation_module_ =
    this->module_factory_->create_correlation_module (this);
  this->subscription_module_ =
    this->module_factory_->create_subscription_module (this);
  this->supplier_module_ =
    this->module_factory_->create_supplier_module (this);

  // Connect each module to its neighbour(s).
  this->consumer_module_->open (this->dispatching_module_);
  this->dispatching_module_->open (this->consumer_module_,
                                   this->correlation_module_);
  this->correlation_module_->open (this->dispatching_module_,
                                   this->subscription_module_);
  this->subscription_module_->open (this->correlation_module_,
                                    this->supplier_module_);
  this->supplier_module_->open (this->subscription_module_);

  if (activate_threads)
    {
      this->activate ();
    }
}
// Tears the channel down: runs destroy(), cleans up observers, shuts
// down the timer and dispatching modules, returns every module to the
// factory and finally deletes the factory if this channel created it.
ACE_EventChannel::~ACE_EventChannel (void)
{
ACE_DEBUG ((LM_DEBUG,
"EC (%t) ACE_EventChannel deleting all modules.\n"));
// @@ This should go away, it is too late to raise a CORBA
// exception, at this point we should only be cleaning up memory,
// not sending messages.
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
// destroy() returns early if the channel was already destroyed.
this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
// A destructor must not propagate: report the exception and continue.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"ACE_EventChannel::~ACE_EventChannel");
}
ACE_ENDTRY;
this->cleanup_observers ();
// Shut down the timer and dispatching modules before any module is
// handed back to the factory.
this->timer_module_->shutdown ();
this->dispatching_module_->shutdown ();
// Return every module to the factory that created it.
this->module_factory_->destroy_timer_module (this->timer_module_);
this->module_factory_->destroy_supplier_module (this->supplier_module_);
this->module_factory_->destroy_subscription_module (this->subscription_module_);
this->module_factory_->destroy_correlation_module (this->correlation_module_);
this->module_factory_->destroy_dispatching_module(this->dispatching_module_);
this->module_factory_->destroy_consumer_module (this->consumer_module_);
// own_factory_ is set by init() when it had to create the factory itself.
if (this->own_factory_)
delete this->module_factory_;
}
// Starts the shutdown of the channel: marks it destroyed, cleans up
// observers and shuts down the supplier module.  Only the first call
// does any work; later calls see destroyed_ set and return at once.
void
ACE_EventChannel::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
{
// The lock is held only for this inner scope, i.e. while destroyed_
// is tested and set.
ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);
if (this->destroyed_ != 0)
return;
this->destroyed_ = 1;
ACE_DEBUG ((LM_DEBUG, "EC (%t) Event Channel shutting down.\n"));
}
this->cleanup_observers ();
// Send a shutdown message through the modules.
this->supplier_module_->shutdown ();
// The block below is compiled out (#if 0) and kept for reference only.
#if 0
// Flush all messages in the channel.
Shutdown_Channel *sc = new Shutdown_Channel (this);
if (sc == 0)
ACE_THROW (CORBA::NO_MEMORY ());
// Create a wrapper around the dispatch request.
Flush_Queue_ACT *act = new Flush_Queue_ACT (sc, dispatching_module_);
if (act == 0)
ACE_THROW (CORBA::NO_MEMORY ());
// Set a 100ns timer.
if (this->timer_module ()->schedule_timer (0, // no rt-info
act,
ACE_Scheduler_MIN_PREEMPTION_PRIORITY,
100, // 10 usec delta
0) == -1) // no interval
{
ACE_ERROR ((LM_ERROR, "%p queue_request failed.\n", "ACE_ES_Consumer_Module"));
delete sc;
delete act;
}
#endif
}
// Activates the dispatching module (with THREADS_PER_DISPATCH_QUEUE)
// and then the timer module.
void
ACE_EventChannel::activate (void)
{
  this->dispatching_module_->activate (THREADS_PER_DISPATCH_QUEUE);

  this->timer_module_->activate ();
}
// Cleans up the observers, then shuts down the timer module followed
// by the dispatching module.
void
ACE_EventChannel::shutdown (void)
{
  this->cleanup_observers ();

  this->timer_module_->shutdown ();

  this->dispatching_module_->shutdown ();
}
// Locking wrapper: acquires the channel lock and delegates the state
// update to report_connect_i().
void
ACE_EventChannel::report_connect (u_long event)
{
  ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);

  this->report_connect_i (event);
}
// Clears the bits given by <event> in state_.  Takes no lock itself.
void
ACE_EventChannel::report_connect_i (u_long event)
{
  ACE_CLR_BITS (this->state_, event);
}
// Locking wrapper: acquires the channel lock and delegates the state
// update to report_disconnect_i(), mirroring report_connect().
// Code that already holds lock_ should presumably call
// report_disconnect_i() directly rather than this method.
void
ACE_EventChannel::report_disconnect (u_long event)
{
  ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);

  // Must be the _i variant: calling report_disconnect() here would
  // re-enter this function and try to take lock_ again on every level,
  // never reaching the code that updates state_.
  this->report_disconnect_i (event);
}
// Sets the bits given by <event> in state_ and logs a debug message
// once state_ equals SHUTDOWN.  Takes no lock itself.
void
ACE_EventChannel::report_disconnect_i (u_long event)
{
  ACE_SET_BITS (this->state_, event);

  if (this->state_ != SHUTDOWN)
    return;

  ACE_DEBUG ((LM_DEBUG,
              "EC (%t) Event Channel has no consumers or suppliers.\n"));
}
// Registers gateway <gw> as an observer of this channel and stores the
// handle returned by append_observer() in the gateway.
void
ACE_EventChannel::add_gateway (TAO_EC_Gateway* gw
                               ACE_ENV_ARG_DECL)
{
  // Obtain the gateway's Observer object reference.
  RtecEventChannelAdmin::Observer_var gateway_observer =
    gw->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  gw->observer_handle (
    this->append_observer (gateway_observer.in () ACE_ENV_ARG_PARAMETER));
  ACE_CHECK;
}
void
ACE_EventChannel::del_gateway (TAO_EC_Gateway* gw
ACE_ENV_ARG_DECL)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -