📄 throughput.cpp
字号:
const ACE_TCHAR* current_arg = 0;
while (arg_shifter.is_anything_left ())
{
if (arg_shifter.cur_arg_strncasecmp ("-collocated_ec") == 0)
{
this->collocated_ec_ = 1;
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-consumers")))
{
this->consumer_count_ = ACE_OS::atoi (current_arg);
// The number of events to send/receive.
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-suppliers")))
{
this->supplier_count_ = ACE_OS::atoi (current_arg);
// The number of events to send/receive.
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-burst_size")))
{
this->burst_size_ = ACE_OS::atoi (current_arg);
// The number of events to send/receive.
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-burst_count")))
{
this->burst_count_ = ACE_OS::atoi (current_arg);
//
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-burst_pause")))
{
this->burst_pause_ = ACE_OS::atoi (current_arg);
//
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-payload")))
{
this->payload_size_ = ACE_OS::atoi (current_arg);
ACE_NEW_RETURN (this->payload_,
char [this->payload_size_],
-1);
//
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-EC")))
{
this->ec_name_ = current_arg;
//
arg_shifter.consume_arg ();
}
else if ((current_arg =
arg_shifter.get_the_parameter ("-ExpectedCount")))
{
this->perconsumer_count_ = ACE_OS::atoi (current_arg);
//
arg_shifter.consume_arg ();
}
else if (arg_shifter.cur_arg_strncasecmp ("-?") == 0)
{
ACE_DEBUG((LM_DEBUG,
"usage: %s "
"-collocated_ec, "
"-consumers [count], "
"-suppliers [count], "
"-burst_size [size], "
"-burst_count [count], "
"-burst_pause [time(uS)], "
"-payload [size]"
"-EC [Channel Name]"
"-ExpectedCount [count]\n",
argv[0], argv[0]));
arg_shifter.consume_arg ();
return -1;
}
else
{
arg_shifter.ignore_arg ();
}
}
// Recalculate.
peer_done_count_ = consumer_count_ + supplier_count_;
return 0;
}
void
Notify_Throughput::create_EC (ACE_ENV_SINGLE_ARG_DECL)
{
if (this->collocated_ec_ == 1)
{
TAO_Notify_Service* notify_service = ACE_Dynamic_Service<TAO_Notify_Service>::instance (TAO_NOTIFICATION_SERVICE_NAME);
if (notify_service == 0)
{
ACE_DEBUG ((LM_DEBUG, "Service not found! check conf. file\n"));
return;
}
// Activate the factory
this->notify_factory_ =
notify_service->create (this->root_poa_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
ACE_ASSERT (!CORBA::is_nil (this->notify_factory_.in ()));
}
else
{
this->resolve_naming_service (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->resolve_Notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
// A channel name was specified, use that to resolve the service.
if (this->ec_name_.length () != 0)
{
CosNaming::Name name (1);
name.length (1);
name[0].id = CORBA::string_dup (ec_name_.c_str ());
CORBA::Object_var obj =
this->naming_context_->resolve (name
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->ec_ =
CosNotifyChannelAdmin::EventChannel::_narrow (obj.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
else
{
CosNotifyChannelAdmin::ChannelID id;
ec_ = notify_factory_->create_channel (initial_qos_,
initial_admin_,
id
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
}
void
Notify_Throughput::run_test (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_DEBUG ((LM_DEBUG, "collocated_ec_ %d ,"
"burst_count_ %d, "
"burst_pause_ %d, "
"burst_size_ %d, "
"payload_size_ %d, "
"consumer_count_ %d, "
"supplier_count_ %d "
"expected count %d\n",
collocated_ec_,
burst_count_ ,
burst_pause_ ,
burst_size_ ,
payload_size_,
consumer_count_ ,
supplier_count_ ,
perconsumer_count_));
for (int i = 0; i < this->supplier_count_; ++i)
{
suppliers_[i]->
TAO_Notify_Tests_StructuredPushSupplier::init (root_poa_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
if (suppliers_[i]->ACE_Task_Base::activate (THR_NEW_LWP | THR_JOINABLE) != 0)
{
ACE_ERROR ((LM_ERROR,
"Cannot activate client threads\n"));
}
}
// Wait till we're signalled done.
{
ACE_DEBUG ((LM_DEBUG, "(%t)Waiting for shutdown signal in main..\n"));
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_);
while (this->peer_done_count_ != 0)
{
condition_.wait ();
}
}
if (this->ec_name_.length () == 0) // we are not using a global EC
{
// Destroy the ec.
this->ec_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
// Signal the workers.
this->worker_.done_ = 1;
}
void
Notify_Throughput::peer_done (void)
{
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_);
if (--this->peer_done_count_ == 0)
{
ACE_DEBUG ((LM_DEBUG, "calling shutdown\n"));
condition_.broadcast ();
}
}
void
Notify_Throughput::dump_results (void)
{
ACE_Throughput_Stats throughput;
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
char buf[BUFSIZ];
for (int j = 0; j < this->consumer_count_; ++j)
{
ACE_OS::sprintf (buf, "Consumer [%02d]", j);
this->consumers_[j]->dump_stats (buf, gsf);
this->consumers_[j]->accumulate_into (throughput);
}
ACE_DEBUG ((LM_DEBUG, "\n"));
ACE_Throughput_Stats suppliers;
for (int i = 0; i < this->supplier_count_; ++i)
{
ACE_OS::sprintf (buf, "Supplier [%02d]", i);
this->suppliers_[i]->dump_stats (buf, gsf);
this->suppliers_[i]->accumulate_into (suppliers);
}
ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
throughput.dump_results ("Notify_Consumer/totals", gsf);
ACE_DEBUG ((LM_DEBUG, "\n"));
suppliers.dump_results ("Notify_Supplier/totals", gsf);
}
/***************************************************************************/
int
main (int argc, char* argv[])
{
ACE_High_Res_Timer::calibrate ();
Notify_Throughput events;
if (events.parse_args (argc, argv) == -1)
{
return 1;
}
ACE_TRY_NEW_ENV
{
events.init (argc, argv
ACE_ENV_ARG_PARAMETER); //Init the Client
ACE_TRY_CHECK;
events.run_test (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "Waiting for threads to exit...\n"));
ACE_Thread_Manager::instance ()->wait ();
events.dump_results();
ACE_DEBUG ((LM_DEBUG, "ending main...\n"));
}
ACE_CATCH (CORBA::UserException, ue)
{
ACE_PRINT_EXCEPTION (ue,
"Events user error: ");
return 1;
}
ACE_CATCH (CORBA::SystemException, se)
{
ACE_PRINT_EXCEPTION (se,
"Events system error: ");
return 1;
}
ACE_ENDTRY;
return 0;
}
// ****************************************************************
Worker::Worker (void)
:done_ (0)
{
}
void
Worker::orb (CORBA::ORB_ptr orb)
{
orb_ = CORBA::ORB::_duplicate (orb);
}
int
Worker::svc (void)
{
ACE_Time_Value tv(5);
do
{
this->orb_->run (tv);
}while (!this->done_);
ACE_DEBUG ((LM_DEBUG, "(%P) (%t) done\n"));
return 0;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -