📄 ec_gateway_iiop.cpp
字号:
// EC_Gateway_IIOP.cpp,v 1.14 2003/10/28 18:34:19 bala Exp
#include "orbsvcs/Event/EC_Gateway_IIOP.h"
#include "orbsvcs/Event/ECG_Defaults.h"
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Time_Utilities.h"
#include "EC_Gateway_IIOP_Factory.h"
#include "ECG_ConsumerEC_Control.h"
#include "ace/Dynamic_Service.h"
ACE_RCSID (Event,
EC_Gateway_IIOP,
"EC_Gateway_IIOP.cpp,v 1.14 2003/10/28 18:34:19 bala Exp")
// Default constructor.
//
// All counters, flags and info handles start at zero; use_ttl_ and
// use_consumer_proxy_map_ start at 1.  The gateway factory is then
// resolved and, when one is available, asked for the values of
// use_ttl_ and use_consumer_proxy_map_.
TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
  : busy_count_ (0),
    update_posted_ (0),
    cleanup_posted_ (0),
    supplier_ec_suspended_ (0),
    supplier_info_ (0),
    consumer_info_ (0),
    consumer_ (this),
    consumer_is_active_ (0),
    supplier_ (this),
    supplier_is_active_ (0),
    ec_control_ (0),
    factory_ (0),
    use_ttl_ (1),
    use_consumer_proxy_map_ (1)
{
  // First try to find a factory by name through ACE_Dynamic_Service.
  TAO_EC_Gateway_IIOP_Factory *gateway_factory =
    ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");

  if (gateway_factory == 0)
    {
      // The lookup produced nothing: allocate a default factory instead.
      ACE_NEW (gateway_factory,
               TAO_EC_Gateway_IIOP_Factory);
    }

  this->factory_ = gateway_factory;

  // Without a factory the defaults from the initializer list stay in effect.
  if (this->factory_ == 0)
    return;

  this->use_ttl_ = this->factory_->use_ttl ();
  this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map ();
}
// Destructor: deletes the consumer EC control object and clears the
// pointer.  Nothing else is released here.
// NOTE(review): factory_ may have been heap-allocated by the
// constructor and is not deleted here -- confirm who owns it.
TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
{
  delete this->ec_control_;
  this->ec_control_ = 0;
}
// Locked entry point for initialization: takes lock_ (using -1 as the
// guard's failure return value) and forwards both event channel
// references to init_i(), whose result is returned unchanged.
int
TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
                           RtecEventChannelAdmin::EventChannel_ptr consumer_ec
                           ACE_ENV_ARG_DECL)
{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, monitor, this->lock_, -1);

  const int status =
    this->init_i (supplier_ec, consumer_ec ACE_ENV_ARG_PARAMETER);
  return status;
}
// Unlocked initialization helper.
//
// Stores duplicates of the supplier and consumer event channel
// references and, if no consumer EC control object exists yet, asks
// the factory to create one and activates it.
//
// Returns 0 on success and -1 when
//   - either event channel reference is already set,
//   - a control object is needed but no factory is available, or
//   - the factory fails to produce a control object.
// On the last failure the stored event channel references are reset
// to nil so that a later init attempt is not rejected.
int
TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
                             RtecEventChannelAdmin::EventChannel_ptr consumer_ec
                             ACE_ENV_ARG_DECL_NOT_USED)
{
  // The gateway can only be initialized while both references are nil.
  if (!CORBA::is_nil (this->supplier_ec_.in ())
      || !CORBA::is_nil (this->consumer_ec_.in ()))
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO_EC_Gateway_IIOP - init_i "
                       "Supplier and consumer event channel reference "
                       "should be nil.\n"), -1);

  // The constructor can leave factory_ null; check it before touching
  // any state instead of dereferencing a null pointer further down.
  if (this->ec_control_ == 0 && this->factory_ == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO_EC_Gateway_IIOP - init_i "
                       "No gateway factory available.\n"), -1);

  this->supplier_ec_ =
    RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
  this->consumer_ec_ =
    RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);

  if (this->ec_control_ == 0)
    {
      this->ec_control_ = this->factory_->create_consumerec_control (this);

      if (this->ec_control_ == 0)
        {
          // Undo the assignments above so the gateway stays uninitialized.
          this->supplier_ec_ =
            RtecEventChannelAdmin::EventChannel::_nil ();
          this->consumer_ec_ =
            RtecEventChannelAdmin::EventChannel::_nil ();
          ACE_ERROR_RETURN ((LM_ERROR,
                             "TAO_EC_Gateway_IIOP - init_i "
                             "Unable to create the consumer EC control.\n"), -1);
        }

      this->ec_control_->activate ();
    }

  return 0;
}
// Locked entry point for shutting the gateway down: takes lock_ and
// delegates to close_i().
void
TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, monitor, this->lock_);

  this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
}
// Locked entry point for dropping the consumer proxies.  While
// busy_count_ is non-zero the cleanup is only recorded in
// cleanup_posted_; otherwise cleanup_consumer_proxies_i() runs
// immediately.
void
TAO_EC_Gateway_IIOP::cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, monitor, this->lock_);

  if (this->busy_count_ == 0)
    {
      this->cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
    }
  else
    {
      // In case we are still pushing, don't cleanup the proxies; just
      // remember that a cleanup was requested.
      this->cleanup_posted_ = 1;
    }
}
// Unlocked shutdown helper: disconnects the consumer proxies first and
// the supplier proxy second.
void
TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL)
{
  // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));

  // Step 1: consumer side.
  this->disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Step 2: supplier side.
  this->disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
// Disconnects and releases every proxy held in consumer_proxy_map_,
// empties the map, and finally disconnects and clears
// default_consumer_proxy_ when it is set.
void
TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL)
{
  if (this->consumer_proxy_map_.current_size () > 0)
    {
      for (Consumer_Map_Iterator entry = this->consumer_proxy_map_.begin ();
           entry != this->consumer_proxy_map_.end ();
           ++entry)
        {
          RtecEventComm::PushConsumer_ptr proxy = (*entry).int_id_;

          if (!CORBA::is_nil (proxy))
            {
              ACE_TRY
                {
                  proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
                  ACE_TRY_CHECK;
                }
              ACE_CATCHANY
                {
                  // A failure to disconnect one proxy is ignored so the
                  // remaining proxies are still processed.
                }
              ACE_ENDTRY;

              // The reference is released whether or not the
              // disconnect succeeded.
              CORBA::release (proxy);
            }
        }

      // Remove all the elements on the map.  Calling close() does not
      // work because the map is left in an inconsistent state.
      this->consumer_proxy_map_.open ();
    }

  // Nothing more to do when there is no default proxy.
  if (CORBA::is_nil (this->default_consumer_proxy_.in ()))
    return;

  this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->default_consumer_proxy_ =
    RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
}
// Disconnects supplier_proxy_ (if it is set) and resets it to nil.
void
TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_DECL)
{
  // No supplier proxy, nothing to disconnect.
  if (CORBA::is_nil (this->supplier_proxy_.in ()))
    return;

  this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_ =
    RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
}
// Re-runs the consumer update with the stored subscription (c_qos_).
// While busy_count_ is non-zero the request is only recorded in
// update_posted_.
void
TAO_EC_Gateway_IIOP::reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, monitor, this->lock_);

  if (this->busy_count_ == 0)
    {
      this->update_consumer_i (c_qos_ ACE_ENV_ARG_PARAMETER);
    }
  else
    {
      // Busy: post the update instead of performing it now.
      this->update_posted_ = 1;
    }
}
// Receives a new consumer subscription.  An empty dependency list is
// ignored outright.  Otherwise the subscription is stored in c_qos_
// under lock_ and either applied through update_consumer_i() or, while
// busy_count_ is non-zero, recorded in update_posted_.
void
TAO_EC_Gateway_IIOP::update_consumer (
    const RtecEventChannelAdmin::ConsumerQOS& c_qos
    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // This check happens before the lock is taken.
  if (c_qos.dependencies.length () == 0)
    return;

  ACE_GUARD (TAO_SYNCH_MUTEX, monitor, this->lock_);

  // Keep a copy of the subscription in c_qos_.
  this->c_qos_ = c_qos;

  if (this->busy_count_ == 0)
    {
      this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER);
    }
  else
    {
      // Busy: post the update instead of performing it now.
      this->update_posted_ = 1;
    }
}
// Releases every proxy reference held in consumer_proxy_map_ without
// calling disconnect on them, empties the map, and resets
// default_consumer_proxy_ to nil when it is set.
void
TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  if (this->consumer_proxy_map_.current_size () > 0)
    {
      for (Consumer_Map_Iterator entry = this->consumer_proxy_map_.begin ();
           entry != this->consumer_proxy_map_.end ();
           ++entry)
        {
          RtecEventComm::PushConsumer_ptr proxy = (*entry).int_id_;

          // Nil entries have nothing to release.
          if (!CORBA::is_nil (proxy))
            {
              CORBA::release (proxy);
            }
        }

      // Remove all the elements on the map.  Calling close() does not
      // work because the map is left in an inconsistent state.
      this->consumer_proxy_map_.open ();
    }

  if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
    {
      this->default_consumer_proxy_ =
        RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
    }
}
// Unlocked update helper: closes the current connections through
// close_i() and, provided both event channel references are set,
// reopens them with the given subscription through open_i().
void
TAO_EC_Gateway_IIOP::update_consumer_i (
    const RtecEventChannelAdmin::ConsumerQOS& c_qos
    ACE_ENV_ARG_DECL)
{
  this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Reopen only when both the consumer and the supplier event channel
  // are available.
  if (!CORBA::is_nil (this->consumer_ec_.in ())
      && !CORBA::is_nil (this->supplier_ec_.in ()))
    {
      // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
      this->open_i (c_qos ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}
void
TAO_EC_Gateway_IIOP::open_i (
const RtecEventChannelAdmin::ConsumerQOS& c_qos
ACE_ENV_ARG_DECL)
{
// = Connect as a supplier to the consumer EC
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
this->consumer_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
sub.is_gateway = 1;
// Change the RT_Info in the consumer QoS.
// On the same loop we discover the subscriptions by event source,
// and fill the consumer proxy map if we have to use this map.
for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
{
sub.dependencies[i].rt_info = this->supplier_info_;
RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
const RtecEventComm::EventHeader &h =
sub.dependencies[i].event.header;
RtecEventComm::EventSourceID sid = h.source;
//ACE_DEBUG ((LM_DEBUG,
// "ECG (%t) trying (%d,%d)\n",
// sid, h.type));
// Skip all subscriptions that do not require an specific source
// id or skip all subscriptions when we don't need to use the consumer
// proxy map.
if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
continue;
// Skip all the magic event types.
if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
continue;
if (this->consumer_proxy_map_.find (sid, proxy) == -1)
{
//ACE_DEBUG ((LM_DEBUG,
// "ECG (%t) binding source %d\n",
// sid));
proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->consumer_proxy_map_.bind (sid, proxy);
}
}
//ACE_DEBUG ((LM_DEBUG,
// "ECG (%t) consumer map computed (%d entries)\n",
// this->consumer_proxy_map_.current_size ()));
if (this->consumer_proxy_map_.current_size () > 0)
{
this->supplier_is_active_ = 1;
// Obtain a reference to our supplier personality...
RtecEventComm::PushSupplier_var supplier_ref =
this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
// For each subscription by source build the set of publications
// (they may several, by type, for instance) and connect to the
// consumer proxy.
for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
j != this->consumer_proxy_map_.end ();
++j)
{
RtecEventChannelAdmin::SupplierQOS pub;
pub.publications.length (sub.dependencies.length () + 1);
pub.is_gateway = 1;
int c = 0;
RtecEventComm::EventSourceID sid = (*j).ext_id_;
for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
{
const RtecEventComm::EventHeader& h =
sub.dependencies[k].event.header;
if (h.source != sid
|| (ACE_ES_EVENT_ANY < h.type
&& h.type < ACE_ES_EVENT_UNDEFINED))
continue;
pub.publications[c].event.header = h;
pub.publications[c].dependency_info.dependency_type =
RtecBase::TWO_WAY_CALL;
pub.publications[c].dependency_info.number_of_calls = 1;
pub.publications[c].dependency_info.rt_info = this->consumer_info_;
c++;
}
//ACE_DEBUG ((LM_DEBUG,
// "ECG (%t) supplier id %d has %d elements\n",
// sid, c));
if (c == 0)
continue;
pub.publications.length (c);
// ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
// ACE_SupplierQOS_Factory::debug (pub);
(*j).int_id_->connect_push_supplier (supplier_ref.in (),
pub
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -