📄 observer.cpp
字号:
// Observer.cpp,v 1.15 2002/01/29 20:20:54 okellogg Exp
#include "Observer.h"
#include "Consumer.h"
#include "Supplier.h"
#include "orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/Event/EC_Default_Factory.h"
#include "ace/Arg_Shifter.h"
#include "ace/High_Res_Timer.h"
ACE_RCSID(EC_Tests_Basic, Observer, "Observer.cpp,v 1.15 2002/01/29 20:20:54 okellogg Exp")
int
main (int argc, char *argv [])
{
TAO_EC_Default_Factory::init_svcs ();
EC_Master master;
return master.run (argc, argv);
}
// ****************************************************************
EC_Master::EC_Master (void)
: seed_ (0),
n_channels_ (4),
channels_ (0)
{
}
EC_Master::~EC_Master (void)
{
if (this->channels_ != 0)
{
for (int i = 0; i < this->n_channels_; ++i)
delete this->channels_[i];
delete[] this->channels_;
}
}
int
EC_Master::run (int argc, char* argv[])
{
ACE_TRY_NEW_ENV
{
// Calibrate the high resolution timer *before* starting the
// test.
ACE_High_Res_Timer::calibrate ();
this->seed_ = ACE_OS::time (0);
this->initialize_orb_and_poa (argc, argv ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (this->parse_args (argc, argv))
return 1;
ACE_DEBUG ((LM_DEBUG,
"The seed value is %d\n", this->seed_));
ACE_NEW_RETURN (this->channels_,
EC_Observer*[this->n_channels_],
1);
{
for (int i = 0; i != this->n_channels_; ++i)
{
ACE_OS::rand_r (this->seed_);
ACE_NEW_RETURN (this->channels_[i],
EC_Observer (this,
this->seed_,
this->orb_.in (),
this->root_poa_.in (),
i),
1);
}
}
{
char** targv;
ACE_NEW_RETURN (targv, char*[argc], 1);
for (int i = 0; i != this->n_channels_; ++i)
{
int targc = argc;
for (int j = 0; j < targc; ++j)
targv[j] = argv[j];
this->channels_[i]->run_init (targc, targv ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
delete[] targv;
}
{
for (int i = 0; i != this->n_channels_; ++i)
{
this->channels_[i]->execute_test (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
if (ACE_Thread_Manager::instance ()->wait () == -1)
{
ACE_ERROR ((LM_ERROR,
"EC_Master (%P|%t) thread manager wait failed\n"));
return 1;
}
{
for (int i = 0; i != this->n_channels_; ++i)
{
this->channels_[i]->dump_results ();
}
}
{
for (int i = 0; i != this->n_channels_; ++i)
{
this->channels_[i]->run_cleanup (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
{
for (int i = 0; i != this->n_channels_; ++i)
{
this->channels_[i]->disconnect_clients (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->channels_[i]->shutdown_clients (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->channels_[i]->destroy_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->channels_[i]->deactivate_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->channels_[i]->cleanup_tasks ();
this->channels_[i]->cleanup_suppliers ();
this->channels_[i]->cleanup_consumers ();
this->channels_[i]->cleanup_ec ();
}
}
this->root_poa_->destroy (1,
1
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->orb_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "EC_Driver::run");
}
ACE_CATCHALL
{
ACE_ERROR ((LM_ERROR, "EC_Driver (%P|%t) non-corba exception raised\n"));
}
ACE_ENDTRY;
return 0;
}
void
EC_Master::initialize_orb_and_poa (int &argc, char* argv[]
ACE_ENV_ARG_DECL)
{
this->orb_ =
CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
CORBA::Object_var poa_object =
this->orb_->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
if (CORBA::is_nil (poa_object.in ()))
{
ACE_ERROR ((LM_ERROR,
"EC_Driver (%P|%t) Unable to initialize the POA.\n"));
return;
}
this->root_poa_ =
PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
PortableServer::POAManager_var poa_manager =
this->root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
int
EC_Master::parse_args (int &argc, char *argv [])
{
ACE_Arg_Shifter arg_shifter (argc, argv);
while (arg_shifter.is_anything_left ())
{
const char *arg = arg_shifter.get_current ();
if (ACE_OS::strcmp (arg, "-channels") == 0)
{
arg_shifter.consume_arg ();
this->n_channels_ = ACE_OS::atoi (arg_shifter.get_current ());
}
else if (ACE_OS::strcmp (arg, "-seed") == 0)
{
arg_shifter.consume_arg ();
this->seed_ = ACE_OS::atoi (arg_shifter.get_current ());
}
arg_shifter.ignore_arg ();
}
return 0;
}
int
EC_Master::channel_count (void) const
{
return this->n_channels_;
}
EC_Observer*
EC_Master::channel (int i) const
{
return this->channels_[i];
}
// ****************************************************************
EC_Observer::EC_Observer (EC_Master *master,
ACE_RANDR_TYPE seed,
CORBA::ORB_ptr orb,
PortableServer::POA_ptr root_poa,
int id)
: master_ (master),
seed_ (seed),
id_ (id),
gwys_ (0)
{
this->orb_ = CORBA::ORB::_duplicate (orb);
this->root_poa_ = PortableServer::POA::_duplicate (root_poa);
}
EC_Observer::~EC_Observer (void)
{
if (this->gwys_ != 0)
delete[] this->gwys_;
}
void
EC_Observer::initialize_orb_and_poa (int&, char*[]
ACE_ENV_ARG_DECL_NOT_USED)
{
}
int
EC_Observer::parse_args (int& argc, char* argv[])
{
return this->EC_Driver::parse_args (argc, argv);
}
void
EC_Observer::print_args (void) const
{
this->EC_Driver::print_args ();
}
void
EC_Observer::print_usage (void)
{
this->EC_Driver::print_usage ();
}
void
EC_Observer::execute_test (ACE_ENV_SINGLE_ARG_DECL)
{
int peer_count = this->master_->channel_count ();
ACE_NEW (this->gwys_, TAO_EC_Gateway_IIOP[peer_count]);
for (int i = 0; i != peer_count; ++i)
{
if (i == this->id_)
continue;
RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
this->master_->channel (i)->event_channel_.in ();
this->gwys_[i].init (rmt_ec,
this->event_channel_.in ()
ACE_ENV_ARG_PARAMETER);
RtecEventChannelAdmin::Observer_var obs =
this->gwys_[i]._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
RtecEventChannelAdmin::Observer_Handle h =
rmt_ec->append_observer (obs.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->gwys_[i].observer_handle (h);
ACE_CHECK;
}
if (this->allocate_tasks () == -1)
return;
this->activate_tasks (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Observer[%d] (%P|%t) suppliers are active\n",
this->id_));
}
void
EC_Observer::run_cleanup (ACE_ENV_SINGLE_ARG_DECL)
{
for (int j = 0; j != this->master_->channel_count (); ++j)
{
if (j == this->id_)
continue;
RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
this->master_->channel (j)->event_channel_.in ();
rmt_ec->remove_observer (this->gwys_[j].observer_handle ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->gwys_[j].shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
}
void
EC_Observer::dump_results (void)
{
ACE_DEBUG ((LM_DEBUG, "===== Results for %d =====\n", this->id_));
ACE_Throughput_Stats throughput;
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
for (int j = 0; j < this->n_consumers_; ++j)
{
this->consumers_[j]->accumulate (throughput);
}
ACE_DEBUG ((LM_DEBUG, "\n"));
ACE_Throughput_Stats suppliers;
for (int i = 0; i < this->n_suppliers_; ++i)
{
this->suppliers_[i]->accumulate (suppliers);
}
ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
throughput.dump_results ("EC_Consumer/totals", gsf);
ACE_DEBUG ((LM_DEBUG, "\n"));
suppliers.dump_results ("EC_Supplier/totals", gsf);
}
void
EC_Observer::connect_consumer (
RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin,
int i
ACE_ENV_ARG_DECL)
{
if (i == 0)
{
this->EC_Driver::connect_consumer (consumer_admin, i
ACE_ENV_ARG_PARAMETER);
return;
}
unsigned int x = ACE_OS::rand_r (this->seed_);
if (x < RAND_MAX / 8)
this->EC_Driver::connect_consumer (consumer_admin, i
ACE_ENV_ARG_PARAMETER);
}
void
EC_Observer::consumer_push (void*,
const RtecEventComm::EventSet&
ACE_ENV_ARG_DECL)
{
unsigned int x = ACE_OS::rand_r (this->seed_);
if (x < (RAND_MAX / 64))
{
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG,
"EC_Observer[%d] (%P|%t) reconnecting\n", this->id_));
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
this->event_channel_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
for (int i = 1; i < this->n_consumers_; ++i)
{
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
if (this->consumers_[i]->connected ())
{
this->consumers_[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
else
{
this->EC_Driver::connect_consumer (consumer_admin.in (),
i ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
}
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -