ect_throughput.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 730 行 · 第 1/2 页
CPP
730 行
{
// int ID =
// (ACE_reinterpret_cast(Test_Consumer**,consumer_cookie)
// - this->consumers_);
//
// ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
this->active_count_--;
if (this->active_count_ <= 0)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) shutting down the ORB\n"));
// Not needed: this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER);
}
}
void
ECT_Throughput::connect_consumers
(RtecScheduler::Scheduler_ptr scheduler,
RtecEventChannelAdmin::EventChannel_ptr channel
ACE_ENV_ARG_DECL)
{
{
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
this->active_count_ = this->n_consumers_;
}
for (int i = 0; i < this->n_consumers_; ++i)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "consumer_%02d", i);
ACE_NEW (this->consumers_[i],
Test_Consumer (this,
this->consumers_ + i,
this->n_suppliers_));
int start = this->consumer_type_start_
+ i * this->consumer_type_shift_;
this->consumers_[i]->connect (scheduler,
buf,
start,
this->consumer_type_count_,
channel
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
void
ECT_Throughput::connect_suppliers
(RtecScheduler::Scheduler_ptr scheduler,
RtecEventChannelAdmin::EventChannel_ptr channel
ACE_ENV_ARG_DECL)
{
for (int i = 0; i < this->n_suppliers_; ++i)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "supplier_%02d", i);
ACE_NEW (this->suppliers_[i], Test_Supplier (this));
int start = this->supplier_type_start_ + i*this->supplier_type_shift_;
this->suppliers_[i]->connect (scheduler,
buf,
this->burst_count_,
this->burst_size_,
this->event_size_,
this->burst_pause_,
start,
this->supplier_type_count_,
channel
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
void
ECT_Throughput::activate_suppliers (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
int priority =
(ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
for (int i = 0; i < this->n_suppliers_; ++i)
{
if (this->suppliers_[i]->activate (this->thr_create_flags_,
1, 0, priority) == -1)
{
ACE_ERROR ((LM_ERROR,
"Cannot activate thread for supplier %d\n",
i));
}
}
}
void
ECT_Throughput::disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
for (int i = 0; i < this->n_suppliers_; ++i)
{
this->suppliers_[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
}
void
ECT_Throughput::disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL)
{
for (int i = 0; i < this->n_consumers_; ++i)
{
this->consumers_[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
}
void
ECT_Throughput::dump_results (void)
{
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
ACE_Throughput_Stats consumers;
for (int j = 0; j < this->n_consumers_; ++j)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "consumer_%02d", j);
this->consumers_[j]->dump_results (buf, gsf);
this->consumers_[j]->accumulate (consumers);
}
consumers.dump_results ("ECT_Consumer/totals", gsf);
ACE_Throughput_Stats suppliers;
for (int i = 0; i < this->n_suppliers_; ++i)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "supplier_%02d", i);
this->suppliers_[i]->dump_results (buf, gsf);
this->suppliers_[i]->accumulate (suppliers);
}
suppliers.dump_results ("ECT_Supplier/totals", gsf);
}
int
ECT_Throughput::parse_args (int argc, char *argv [])
{
ACE_Get_Opt get_opt (argc, argv, "rdc:s:u:n:t:b:h:l:p:m:w:");
int opt;
while ((opt = get_opt ()) != EOF)
{
switch (opt)
{
case 'r':
this->new_ec_ = 0;
this->reactive_ec_ = 1;
break;
case 'm':
if (ACE_OS::strcasecmp (get_opt.opt_arg (), "rt") == 0)
{
this->new_ec_ = 0;
this->reactive_ec_ = 0;
}
else if (ACE_OS::strcasecmp (get_opt.opt_arg (), "st") == 0)
{
this->new_ec_ = 0;
this->reactive_ec_ = 1;
}
else if (ACE_OS::strcasecmp (get_opt.opt_arg (), "new") == 0)
{
this->new_ec_ = 1;
this->reactive_ec_ = 1;
}
else
{
ACE_DEBUG ((LM_DEBUG,
"Unknown mode <%s> "
"default is rt\n",
get_opt.opt_arg ()));
this->new_ec_ = 0;
this->reactive_ec_ = 0;
}
break;
case 'c':
this->n_consumers_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 's':
this->n_suppliers_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'u':
this->burst_count_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'n':
this->burst_size_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'b':
this->event_size_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 't':
this->burst_pause_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'h':
{
char* aux;
char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);
this->consumer_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->consumer_type_count_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->consumer_type_shift_ = ACE_OS::atoi (arg);
}
break;
case 'l':
{
char* aux;
char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);
this->supplier_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->supplier_type_count_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->supplier_type_shift_ = ACE_OS::atoi (arg);
}
break;
case 'p':
this->pid_file_name_ = get_opt.opt_arg ();
break;
case 'w':
this->ec_concurrency_hwm_ = ACE_OS::atoi (get_opt.opt_arg ());
break;
case '?':
default:
ACE_DEBUG ((LM_DEBUG,
"Usage: %s "
"[ORB options] "
"-r -d -x "
"-c <n_consumers> "
"-s <n_suppliers> "
"-u <burst count> "
"-n <burst size> "
"-b <event payload size> "
"-t <burst pause (usecs)> "
"-h <consumer_start,consumer_count,consumer_shift> "
"-l <supplier_start,supplier_count,supplier_shift> "
"-p <pid file name> "
"-w <concurrency HWM> "
"-r "
"\n",
argv[0]));
return -1;
}
}
if (this->burst_count_ <= 0)
{
ACE_DEBUG ((LM_DEBUG,
"%s: burst count (%d) is out of range, "
"reset to default (%d)\n",
argv[0], this->burst_count_,
100));
this->burst_count_ = 100;
}
if (this->burst_size_ <= 0)
{
ACE_DEBUG ((LM_DEBUG,
"%s: burst size (%d) is out of range, "
"reset to default (%d)\n",
argv[0], this->burst_size_,
10));
this->burst_size_ = 10;
}
if (this->event_size_ < 0)
{
ACE_DEBUG ((LM_DEBUG,
"%s: event size (%d) is out of range, "
"reseting to default (%d)\n",
argv[0], this->event_size_,
128));
this->event_size_ = 128;
}
if (this->n_consumers_ < 0
|| this->n_consumers_ >= ECT_Throughput::MAX_CONSUMERS)
{
this->n_consumers_ = 1;
ACE_ERROR_RETURN ((LM_ERROR,
"%s: number of consumers or "
"suppliers out of range, "
"reset to default (%d)\n",
argv[0], 1), -1);
}
if (this->n_suppliers_ < 0
|| this->n_suppliers_ >= ECT_Throughput::MAX_SUPPLIERS)
{
this->n_suppliers_ = 1;
ACE_ERROR_RETURN ((LM_ERROR,
"%s: number of suppliers out of range, "
"reset to default (%d)\n",
argv[0], 1), -1);
}
if (this->n_suppliers_ == 0 && this->n_consumers_ == 0)
{
this->n_suppliers_ = 1;
this->n_consumers_ = 1;
ACE_ERROR_RETURN ((LM_ERROR,
"%s: no suppliers or consumers, "
"reset to default (%d of each)\n",
argv[0], 1), -1);
}
if (this->ec_concurrency_hwm_ <= 0)
{
this->ec_concurrency_hwm_ = 1;
ACE_ERROR_RETURN ((LM_ERROR,
"%s: invalid concurrency HWM, "
"reset to default (%d)\n",
argv[0], 1), -1);
}
return 0;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Auto_Basic_Ptr<POA_RtecScheduler::Scheduler>;
template class auto_ptr<POA_RtecScheduler::Scheduler>;
template class ACE_Auto_Basic_Ptr<POA_RtecEventChannelAdmin::EventChannel>;
template class auto_ptr<POA_RtecEventChannelAdmin::EventChannel>;
template class ACE_Auto_Basic_Ptr<TAO_Module_Factory>;
template class auto_ptr<TAO_Module_Factory>;
template class ACE_Auto_Basic_Ptr<TAO_EC_Factory>;
template class auto_ptr<TAO_EC_Factory>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Auto_Basic_Ptr<POA_RtecScheduler::Scheduler>
#pragma instantiate auto_ptr<POA_RtecScheduler::Scheduler>
#pragma instantiate ACE_Auto_Basic_Ptr<POA_RtecEventChannelAdmin::EventChannel>
#pragma instantiate auto_ptr<POA_RtecEventChannelAdmin::EventChannel>
#pragma instantiate ACE_Auto_Basic_Ptr<TAO_Module_Factory>
#pragma instantiate auto_ptr<TAO_Module_Factory>
#pragma instantiate ACE_Auto_Basic_Ptr<TAO_EC_Factory>
#pragma instantiate auto_ptr<TAO_EC_Factory>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?