📄 ec_mcast.cpp
字号:
: federation_ (federation),
supplier_proxy_ (0),
consumer_admin_ (0)
{
}
// Attach this consumer to the event channel <ec>.
//
// The first argument (a name) is accepted for interface compatibility but
// is not used.  The consumer admin obtained from <ec> is stored in
// consumer_admin_ and the subscription itself is set up by connect(),
// which draws from <seed> to decide which groups to join.
void
ECM_Consumer::open (const char*,
                    RtecEventChannelAdmin::EventChannel_ptr ec,
                    ACE_RANDR_TYPE &seed
                    ACE_ENV_ARG_DECL)
{
  // = Connect as a consumer.
  this->consumer_admin_ = ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->connect (seed ACE_ENV_ARG_PARAMETER);
}
// Obtain a fresh push-supplier proxy and connect to it with a randomly
// chosen subscription.
//
// For every consumer type of the federation one value is drawn from
// <seed>: draws below RAND_MAX / 2 leave the group, all others join it.
// The decision is recorded through subscribed_bit() and joined groups are
// added to the consumer QoS.  The shutdown event type is always part of
// the subscription.  Does nothing when no consumer admin is held.
void
ECM_Consumer::connect (ACE_RANDR_TYPE &seed
                       ACE_ENV_ARG_DECL)
{
  if (CORBA::is_nil (this->consumer_admin_.in ()))
    return;

  this->supplier_proxy_ =
    this->consumer_admin_->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_ConsumerQOS_Factory subscriptions;
  subscriptions.start_disjunction_group ();
  subscriptions.insert_type (ACE_ES_EVENT_SHUTDOWN, 0);

  const ECM_Federation* fed = this->federation_->federation ();
  for (int group = 0; group < fed->consumer_types (); ++group)
    {
      const unsigned int draw = ACE_OS::rand_r (seed);
      if (draw >= RAND_MAX / 2)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Federation %s joins group %s\n",
                      fed->name (),
                      fed->consumer_name (group)));
          this->federation_->subscribed_bit (group, 1);
          subscriptions.insert_type (fed->consumer_ipaddr (group), 0);
        }
      else
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Federation %s leaves group %s\n",
                      fed->name (),
                      fed->consumer_name (group)));
          this->federation_->subscribed_bit (group, 0);
        }
    }

  // Register ourselves as the push consumer behind the proxy.
  RtecEventComm::PushConsumer_var self =
    this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_->connect_push_consumer (self.in (),
                                                subscriptions.get_ConsumerQOS ()
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Disconnect from the current supplier proxy, if there is one.
//
// Nothing happens unless both the proxy and the consumer admin are
// non-nil.  The proxy reference is moved out of the member with _retn()
// into a local _var before disconnect_push_supplier() is invoked on it.
void
ECM_Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->supplier_proxy_.in ()))
    return;
  if (CORBA::is_nil (this->consumer_admin_.in ()))
    return;

  RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
    this->supplier_proxy_._retn ();

  proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
// Disconnect from the event channel and drop the consumer admin.
//
// consumer_admin_ is reset to nil on both the normal and the exceptional
// path; an exception raised by disconnect() is re-thrown to the caller
// after the reset.
void
ECM_Consumer::close (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_TRY
{
this->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// Normal path: forget the admin once the disconnect succeeded.
this->consumer_admin_ =
RtecEventChannelAdmin::ConsumerAdmin::_nil ();
}
ACE_CATCHANY
{
// Failure path: still forget the admin, then propagate the exception.
this->consumer_admin_ =
RtecEventChannelAdmin::ConsumerAdmin::_nil ();
ACE_RE_THROW;
}
ACE_ENDTRY;
}
// Event channel callback: forward the received <events> to the owning
// federation, together with a high-resolution arrival timestamp.
void
ECM_Consumer::push (const RtecEventComm::EventSet& events
                    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Read the clock before doing anything else.
  const ACE_hrtime_t received_at = ACE_OS::gethrtime ();

  this->federation_->consumer_push (received_at,
                                    events
                                    ACE_ENV_ARG_PARAMETER);
}
// Callback invoked when this push consumer is being disconnected.
// Intentionally a no-op: this implementation performs no action here.
void
ECM_Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
}
// ****************************************************************
// Build the local side of <federation>, driven by <driver>.
//
// All counters and timestamps start at zero and both change periods
// default to 10000.  The constructor also creates the UDP receiver, the
// multicast event handler bound to it, and the per-consumer-type
// subscription flag array (one CORBA::Boolean per consumer type).
ECM_Local_Federation::ECM_Local_Federation (ECM_Federation *federation,
                                            ECM_Driver *driver)
  : federation_ (federation),
    driver_ (driver),
    consumer_ (this),
    supplier_ (this),
    recv_count_ (0),
    unfiltered_count_ (0),
    invalid_count_ (0),
    send_count_ (0),
    event_count_ (0),
    last_publication_change_ (0),
    last_subscription_change_ (0),
    mcast_eh_ (0),
    seed_ (0),
    subscription_change_period_ (10000),
    publication_change_period_ (10000)
{
  // ACE_NEW returns from the constructor when an allocation fails.  Give
  // the flag array a defined value first, so that the destructor never
  // runs delete[] on an indeterminate pointer.
  this->subscription_subset_ = 0;

  receiver_ = TAO_ECG_UDP_Receiver::create (true);
  ACE_NEW (mcast_eh_, TAO_ECG_Mcast_EH((&*receiver_)));

  const int types = this->consumer_types ();
  ACE_NEW (this->subscription_subset_,
           CORBA::Boolean[types]);

  // Start with every group unsubscribed instead of leaving the flags
  // with indeterminate contents.
  for (int i = 0; i < types; ++i)
    this->subscription_subset_[i] = 0;
}
// Release the heap objects allocated by the constructor: the multicast
// event handler and the subscription flag array.
ECM_Local_Federation::~ECM_Local_Federation (void)
{
  delete this->mcast_eh_;
  delete [] this->subscription_subset_;
}
// Open the supplier and the consumer of this federation on <ec>.
//
// <event_count> is stored as the number of events left to send.  The
// supplier and consumer are opened under the names "<federation>/supplier"
// and "<federation>/consumer".  The names are assembled with bounded
// copies, so an over-long federation name is truncated instead of
// overflowing the stack buffer.  Finally the time of the last
// subscription change is reset to now.
void
ECM_Local_Federation::open (int event_count,
                            RtecEventChannelAdmin::EventChannel_ptr ec
                            ACE_ENV_ARG_DECL)
{
  this->event_count_ = event_count;

  const int bufsize = 512;
  char buf[bufsize];

  // strsncpy() always NUL-terminates; strncat() is limited to the space
  // that remains after the prefix (minus the terminator).
  ACE_OS::strsncpy (buf, this->federation_->name (), bufsize);
  ACE_OS::strncat (buf, "/supplier", bufsize - ACE_OS::strlen (buf) - 1);
  this->supplier_.open (buf, ec ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_OS::strsncpy (buf, this->federation_->name (), bufsize);
  ACE_OS::strncat (buf, "/consumer", bufsize - ACE_OS::strlen (buf) - 1);
  this->consumer_.open (buf, ec, this->seed_ ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->last_subscription_change_ = ACE_OS::gettimeofday ();
}
// Shut this federation down: close the consumer first, then the supplier.
// An exception from either step aborts the sequence.
void
ECM_Local_Federation::close (ACE_ENV_SINGLE_ARG_DECL)
{
  // Consumer side.
  this->consumer_.close (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Supplier side.
  this->supplier_.close (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
// Activate the supplier on <ec> with the given <interval>; the call is
// forwarded unchanged to the supplier member.
void
ECM_Local_Federation::activate (RtecEventChannelAdmin::EventChannel_ptr ec,
                                RtecEventComm::Time interval
                                ACE_ENV_ARG_DECL)
{
  this->supplier_.activate (ec,
                            interval
                            ACE_ENV_ARG_PARAMETER);
}
// Handle a supplier timeout: send one event through <consumer> and
// occasionally re-roll the consumer subscription.
//
// A single-event set is prepared (source = supplier id, ttl = 1, creation
// time = now) and event_count_ is decremented.  Once the count drops below
// zero the driver is told that this federation has shut down and nothing
// is sent.  Otherwise the event type is picked by cycling through the
// supplier types, the event is pushed and send_count_ is incremented.
//
// Afterwards a random value p in [0, 1] is compared against the time
// elapsed since the last subscription change, expressed as a fraction of
// subscription_change_period_.  When 4 * p is below that fraction the
// consumer is disconnected and reconnected, and the change time is reset.
void
ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer
                                        ACE_ENV_ARG_DECL)
{
  RtecEventComm::EventSet outgoing (1);
  outgoing.length (1);

  RtecEventComm::Event& ev = outgoing[0];
  ev.header.source = this->supplier_.supplier_id();
  ev.header.ttl = 1;

  const ACE_hrtime_t now = ACE_OS::gethrtime ();
  ORBSVCS_Time::hrtime_to_TimeT (ev.header.creation_time, now);

  --this->event_count_;

  // ACE_DEBUG ((LM_DEBUG, "Federation <%s> event count <%d>\n",
  //             this->name (), this->event_count_));

  if (this->event_count_ < 0)
    {
      // Nothing left to send: report the shutdown and stop here.
      this->driver_->federation_has_shutdown (this ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      return;
    }

  const int type_index =
    this->event_count_ % this->federation_->supplier_types ();
  ev.header.type = this->federation_->supplier_ipaddr (type_index);

  consumer->push (outgoing ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ++this->send_count_;

  // Decide whether it is time to change the subscription.
  ACE_Time_Value elapsed =
    ACE_OS::gettimeofday () - this->last_subscription_change_;

  unsigned int draw = ACE_OS::rand_r (this->seed_);
  double p = double (draw) / RAND_MAX;
  double maxp =
    double (elapsed.msec ()) / this->subscription_change_period_;

  if (4 * p < maxp)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Reconfiguring federation %s: %f %f [%d]\n",
                  this->name (), p, maxp, draw));

      this->consumer_.disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      this->consumer_.connect (this->seed_ ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      this->last_subscription_change_ = ACE_OS::gettimeofday ();
    }
}
// Account for a set of events delivered to the consumer.
//
// The arrival timestamp (first argument) is not used.  For every event
// recv_count_ is incremented and the event type is looked up among the
// federation's consumer addresses:
//   - known type whose subscription bit is clear -> unfiltered_count_++
//   - type matching no consumer address          -> invalid_count_++
void
ECM_Local_Federation::consumer_push (ACE_hrtime_t,
                                     const RtecEventComm::EventSet &event
                                     ACE_ENV_ARG_DECL_NOT_USED)
{
  // An empty set simply skips the loop.
  for (CORBA::ULong n = 0; n < event.length (); ++n)
    {
      const RtecEventComm::Event& current = event[n];
      ++this->recv_count_;

      int known = 0;
      int slot = 0;
      while (!known && slot < this->federation_->consumer_types ())
        {
          CORBA::ULong type = current.header.type;
          if (type == this->federation_->consumer_ipaddr(slot))
            known = 1;
          else
            ++slot;
        }

      if (!known)
        {
          // The type belongs to none of our consumer groups.
          ++this->invalid_count_;
        }
      else if (this->subscribed_bit (slot) == 0)
        {
          // Received although we are not subscribed to this group.
          ++this->unfiltered_count_;
        }
    }
}
// Set up the UDP receiver and the multicast event handler on <ec>.
//
// The receiver is initialised with <ec>, the <ignore_from> endpoint and
// the federation's address server.  The multicast handler is attached to
// the ORB core's reactor and opened on <ec>.  The receiver is then
// connected as a supplier that publishes every consumer type of this
// federation, using the CRC32 of "<name>/receiver" as its source id.
//
// The name is assembled with bounded copies, so an over-long federation
// name is truncated instead of overflowing the stack buffer.
void
ECM_Local_Federation::open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec,
                                     TAO_ECG_Refcounted_Endpoint ignore_from
                                     ACE_ENV_ARG_DECL)
{
  RtecUDPAdmin::AddrServer_var addr_server =
    this->federation_->addr_server (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_Reactor* reactor = TAO_ORB_Core_instance ()->reactor ();

  this->receiver_->init (ec,
                         ignore_from,
                         addr_server.in ()
                         ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  const int bufsize = 512;
  char buf[bufsize];

  // strsncpy() always NUL-terminates; strncat() is limited to the space
  // that remains after the prefix (minus the terminator).
  ACE_OS::strsncpy (buf, this->name (), bufsize);
  ACE_OS::strncat (buf, "/receiver", bufsize - ACE_OS::strlen (buf) - 1);

  RtecEventComm::EventSourceID source = ACE::crc32 (buf);

  this->mcast_eh_->reactor (reactor);
  this->mcast_eh_->open (ec ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Publish one entry per consumer type under the receiver's source id.
  ACE_SupplierQOS_Factory qos;
  for (int i = 0; i < this->consumer_types (); ++i)
    {
      qos.insert (source,
                  this->consumer_ipaddr (i),
                  0, 1);
    }

  RtecEventChannelAdmin::SupplierQOS qos_copy =
    qos.get_SupplierQOS ();
  this->receiver_->connect (qos_copy ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Tear down the receiving side: shut down the UDP receiver first and,
// if that did not raise an exception, the multicast event handler.
void
ECM_Local_Federation::close_receiver (ACE_ENV_SINGLE_ARG_DECL)
{
  // Receiver.
  this->receiver_->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Multicast event handler.
  this->mcast_eh_->shutdown ();
}
// Log the statistics collected for this federation: events received,
// unfiltered and invalid events (each with its share of the received
// events) and events sent.  Both ratios are reported as 0 when nothing
// was received.
void
ECM_Local_Federation::dump_results (void) const
{
  double unfiltered_ratio = 0;
  double invalid_ratio = 0;

  // One guard covers both divisions.
  if (this->recv_count_ != 0)
    {
      unfiltered_ratio =
        double(this->unfiltered_count_)/this->recv_count_;
      invalid_ratio =
        double(this->invalid_count_)/this->recv_count_;
    }

  ACE_DEBUG ((LM_DEBUG,
              "Federation: %s\n"
              " events received: %d\n"
              " unfiltered events received: %d\n"
              " ratio: %f\n"
              " invalid events received: %d\n"
              " ratio: %f\n"
              " events sent: %d\n",
              this->name (),
              this->recv_count_,
              this->unfiltered_count_,
              unfiltered_ratio,
              this->invalid_count_,
              invalid_ratio,
              this->send_count_));
}
// Record whether consumer group <i> is currently subscribed (<x> != 0).
//
// The flag array holds consumer_types() entries, so the valid indices are
// 0 .. consumer_types() - 1.  Out-of-range indices are silently ignored.
void
ECM_Local_Federation::subscribed_bit (int i, CORBA::Boolean x)
{
  // Reject negative indices as well as i == consumer_types(), which lies
  // one past the end of the array.
  if (i < 0 || i >= this->consumer_types ())
    return;
  this->subscription_subset_[i] = x;
}
// Return the subscription flag of consumer group <i>.
//
// The flag array holds consumer_types() entries, so the valid indices are
// 0 .. consumer_types() - 1.  Out-of-range indices yield 0.
CORBA::Boolean
ECM_Local_Federation::subscribed_bit (int i) const
{
  // Reject negative indices as well as i == consumer_types(), which lies
  // one past the end of the array.
  if (i < 0 || i >= this->consumer_types ())
    return 0;
  return this->subscription_subset_[i];
}
// Program entry point: register the default event channel factory
// services, then hand control to the test driver and return its status.
int
main (int argc, char *argv [])
{
  TAO_EC_Default_Factory::init_svcs ();

  ECM_Driver driver;
  const int status = driver.run (argc, argv);
  return status;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -