📄 ec_gateway_iiop.cpp
字号:
}
}
// Also build the subscriptions that are *not* by source when we use the
// consumer proxy map, and all subscriptions when we don't use the map and
// then connect to the default consumer proxy.
RtecEventChannelAdmin::SupplierQOS pub;
pub.publications.length (sub.dependencies.length () + 1);
pub.is_gateway = 1;
int c = 0;
for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
{
const RtecEventComm::EventHeader& h =
sub.dependencies[k].event.header;
RtecEventComm::EventSourceID sid = h.source;
// Skip all subscriptions with a specific source when we use the map
if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
continue;
// Skip all the magic event types.
if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
continue;
pub.publications[c].event.header = h;
pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
pub.publications[c].dependency_info.dependency_type =
RtecBase::TWO_WAY_CALL;
pub.publications[c].dependency_info.number_of_calls = 1;
pub.publications[c].dependency_info.rt_info = this->consumer_info_;
c++;
}
if (c > 0)
{
this->supplier_is_active_ = 1;
// Obtain a reference to our supplier personality...
RtecEventComm::PushSupplier_var supplier_ref =
this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// Obtain the consumer....
this->default_consumer_proxy_ =
supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
pub.publications.length (c);
// ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
// ACE_SupplierQOS_Factory::debug (pub);
this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
pub
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
this->supplier_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->supplier_proxy_ =
consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->consumer_is_active_ = 1;
RtecEventComm::PushConsumer_var consumer_ref =
this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
// ACE_ConsumerQOS_Factory::debug (sub);
this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
sub
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
// Accepts a SupplierQOS and deliberately ignores it: this gateway takes no
// action on supplier-side updates.
void
TAO_EC_Gateway_IIOP::update_supplier (
    const RtecEventChannelAdmin::SupplierQOS&
    ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Intentionally empty.
}
// Disconnect notification for the gateway's push-consumer personality.
// No action is taken.
void
TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  // Intentionally empty.  Disabled trace kept for debugging sessions:
  //
  //   ACE_DEBUG ((LM_DEBUG,
  //               "ECG (%t): Supplier-consumer received "
  //               "disconnect from channel.\n"));
}
// Disconnect notification for the gateway's push-supplier personality.
// No action is taken.
void
TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  // Intentionally empty.  Disabled trace kept for debugging sessions:
  //
  //   ACE_DEBUG ((LM_DEBUG,
  //               "ECG (%t): Supplier received "
  //               "disconnect from channel.\n"));
}
// Forwards every event in `events` to a consumer proxy, one event per call
// to push_to_consumer ().
//
// An empty set returns immediately.  Otherwise busy_count_ is incremented
// under lock_ before the loop and decremented under lock_ afterwards; when
// the count is back at zero, work flagged through cleanup_posted_ and
// update_posted_ is carried out here (presumably posted by callers that
// found the gateway busy -- verify against the code that sets those flags).
//
// NOTE(review): the ACE_CHECK following push_to_consumer () can leave this
// function before the decrement block runs (when the macro expands to an
// early return on a pending exception).  In that case busy_count_ stays
// incremented -- confirm whether that path is reachable in supported builds.
void
TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events
ACE_ENV_ARG_DECL)
{
// ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
if (events.length () == 0)
{
// ACE_DEBUG ((LM_DEBUG, "no events\n"));
return;
}
// Mark the gateway as busy; the lock is held only for the increment.
{
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
this->busy_count_++;
}
// ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
// @@ TODO, there is an extra data copy here, we should do the event
// modification without it and only compact the necessary events.
// One-element scratch set that is reused for every forwarded event.
RtecEventComm::EventSet out (1);
out.length (1);
for (CORBA::ULong i = 0; i < events.length (); ++i)
{
// With TTL handling enabled, events whose ttl is already zero are dropped.
if (this->use_ttl_ == 1)
{
if (events[i].header.ttl == 0)
continue;
}
// Select the proxy: the per-source map entry when the map is in use and
// has one for this source, the default consumer proxy otherwise.
RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
RtecEventComm::EventSourceID sid = events[i].header.source;
if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
|| this->consumer_proxy_map_.find (sid, proxy) == -1)
{
// If the source is not in our map or we should not use the map then
// use the default consumer proxy.
proxy = this->default_consumer_proxy_.in ();
}
// No proxy available for this event: skip it.
if (CORBA::is_nil (proxy))
continue;
// Copy the event into the scratch set; the ttl is decremented on the copy
// because the incoming set is const.
out[0] = events[i];
if (this->use_ttl_ == 1)
out[0].header.ttl--;
// ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
this->push_to_consumer(proxy, out ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
// Pushing is finished: release the busy count and, once it is zero, run
// whatever was posted in the meantime.  Both *_i calls run with lock_ held.
{
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
this->busy_count_--;
if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
{
this->cleanup_posted_ = 0;
this->cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
if (this->busy_count_ == 0 && this->update_posted_ != 0)
{
this->update_posted_ = 0;
this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
}
// Pushes `event` to `consumer` and routes exceptions raised by that push to
// ec_control_:
//   - CORBA::OBJECT_NOT_EXIST   -> ec_control_->event_channel_not_exist (this)
//   - CORBA::SystemException    -> ec_control_->system_exception (this, sysex)
//     (reached only for system exceptions not matched by the handler above)
//   - anything else             -> ignored
void
TAO_EC_Gateway_IIOP::push_to_consumer (
RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
const RtecEventComm::EventSet& event
ACE_ENV_ARG_DECL)
{
ACE_TRY
{
consumer->push (event ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
// The target object is gone: let the control strategy decide what to do.
ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
{
ec_control_->event_channel_not_exist (this ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
// Any other system exception is handed to the control strategy as well.
ACE_CATCH (CORBA::SystemException, sysex)
{
ec_control_->system_exception (this,
sysex
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
// Shouldn't happen.
}
ACE_ENDTRY;
}
// Shuts the gateway down while holding lock_.
//
// Steps, in order: ec_control_->shutdown (), close_i (), deactivation of the
// supplier_ and consumer_ servants in their default POAs (each only when its
// *_is_active_ flag is set), and finally resetting both event channel
// references.
//
// Returns 0 when every step completed.  -1 is the value handed to
// ACE_GUARD_RETURN and to each ACE_CHECK_RETURN for their failure paths; on
// such a path the remaining steps are not executed.
int
TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
ec_control_->shutdown();
this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
// Deactivate the supplier servant if it was activated.
if (this->supplier_is_active_)
{
PortableServer::POA_var poa =
this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
PortableServer::ObjectId_var id =
poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
// Cleared only after deactivation went through.
this->supplier_is_active_ = 0;
}
// Same sequence for the consumer servant.
if (this->consumer_is_active_)
{
PortableServer::POA_var poa =
this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
PortableServer::ObjectId_var id =
poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
// Cleared only after deactivation went through.
this->consumer_is_active_ = 0;
}
// Drop both event channel references (lock_ is already held, hence the
// *_i variants).
this->cleanup_consumer_ec_i ();
this->cleanup_supplier_ec_i ();
return 0;
}
int
TAO_EC_Gateway_IIOP::cleanup_consumer_ec (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
this->cleanup_consumer_ec_i ();
return 0;
}
int
TAO_EC_Gateway_IIOP::cleanup_supplier_ec (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
this->cleanup_supplier_ec_i ();
return 0;
}
// Resets consumer_ec_ to a nil EventChannel reference.  Takes no lock
// itself; the visible callers acquire lock_ before calling it.
void
TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i (void)
{
  this->consumer_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
}
// Resets supplier_ec_ to a nil EventChannel reference.  Takes no lock
// itself; the visible callers acquire lock_ before calling it.
void
TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i (void)
{
  this->supplier_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
}
// Tells whether the gateway currently holds a consumer event channel
// reference: 1 when consumer_ec_ is non-nil, 0 when it is nil.
CORBA::Boolean
TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i (void) const
{
  if (CORBA::is_nil (this->consumer_ec_.in ()))
    {
      return 0;
    }

  return 1;
}
// Checks whether the consumer event channel object no longer exists.
//
// `disconnected` is set to 1 when the gateway holds no consumer event
// channel reference (the function then returns 0 without querying anything)
// and to 0 otherwise.  The reference is duplicated while lock_ is held and
// queried with _non_existent () after the lock has been released.  Builds
// where TAO_HAS_MINIMUM_CORBA is non-zero skip the query and return 0.
CORBA::Boolean
TAO_EC_Gateway_IIOP::consumer_ec_non_existent (
    CORBA::Boolean_out disconnected
    ACE_ENV_ARG_DECL)
{
  CORBA::Object_var channel;

  {
    ACE_GUARD_THROW_EX (
        TAO_SYNCH_MUTEX, ace_mon, this->lock_,
        CORBA::INTERNAL ());
    ACE_CHECK_RETURN (0);

    if (!this->is_consumer_ec_connected_i ())
      {
        disconnected = 1;
        return 0;
      }

    disconnected = 0;
    channel = CORBA::Object::_duplicate (this->consumer_ec_.in ());
  }

#if (TAO_HAS_MINIMUM_CORBA == 0)
  return channel->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
#else
  return 0;
#endif /* TAO_HAS_MINIMUM_CORBA */
}
// Suspends the connection held through supplier_proxy_.
//
// Does nothing when there is no supplier proxy or when the connection is
// already recorded as suspended.  supplier_ec_suspended_ is set to 1 only
// after suspend_connection () has passed the ACE_CHECK.
void
TAO_EC_Gateway_IIOP::suspend_supplier_ec (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->supplier_proxy_.in ()) || supplier_ec_suspended_ != 0)
    {
      return;
    }

  this->supplier_proxy_->suspend_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  supplier_ec_suspended_ = 1;
}
// Resumes the connection held through supplier_proxy_.
//
// Does nothing when there is no supplier proxy or when the connection is
// not recorded as suspended.  supplier_ec_suspended_ is set back to 0 only
// after resume_connection () has passed the ACE_CHECK.
void
TAO_EC_Gateway_IIOP::resume_supplier_ec (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->supplier_proxy_.in ()) || supplier_ec_suspended_ != 1)
    {
      return;
    }

  this->supplier_proxy_->resume_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  supplier_ec_suspended_ = 0;
}
// Explicit instantiations of the adapter and map templates for
// TAO_EC_Gateway_IIOP, emitted only for toolchains configured with
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION (template class form) or
// ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA (#pragma instantiate form).
// The two lists name the same set of templates and must be kept in sync.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>;
template class ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>;
template class ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>;
template class ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
template class ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
template class ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
template class ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>
#pragma instantiate ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>
#pragma instantiate ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>
#pragma instantiate ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -