📄 ec_multiple.cpp
字号:
this->hpc_event_a_,
this->hpc_event_b_,
local_ec
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->stats_[i].total_time_ = 0;
this->stats_[i].lcl_count_ = 0;
this->stats_[i].rmt_count_ = 0;
}
for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "lp_consumer_%02d@%s",
i - this->hp_consumers_, this->lcl_name_);
ACE_NEW (this->consumers_[i],
Test_Consumer (this, this->consumers_ + i));
this->consumers_[i]->open (buf,
this->lpc_event_a_,
this->lpc_event_b_,
local_ec
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->stats_[i].total_time_ = 0;
this->stats_[i].lcl_count_ = 0;
this->stats_[i].rmt_count_ = 0;
}
this->running_consumers_ = this->hp_consumers_ + this->lp_consumers_;
}
void
Test_ECG::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec,
RtecEventChannelAdmin::EventChannel_ptr remote_ec,
RtecScheduler::Scheduler_ptr remote_sch
ACE_ENV_ARG_DECL)
{
RtecScheduler::Scheduler_ptr local_sch =
ACE_Scheduler_Factory::server ();
// ECG name.
const int bufsize = 512;
char ecg_name[bufsize];
ACE_OS::strcpy (ecg_name, "ecg_");
ACE_OS::strcat (ecg_name, this->lcl_name_);
// We could use the same name on the local and remote scheduler,
// but that fails when using a global scheduler.
char rmt[BUFSIZ];
ACE_OS::strcpy (rmt, ecg_name);
ACE_OS::strcat (rmt, "@");
ACE_OS::strcat (rmt, this->rmt_name_);
// We could use the same name on the local and remote scheduler,
// but that fails when using a global scheduler.
char lcl[bufsize];
ACE_OS::strcpy (lcl, ecg_name);
ACE_OS::strcat (lcl, "@");
ACE_OS::strcat (lcl, this->lcl_name_);
this->ecg_.init (remote_ec, local_ec, remote_sch, local_sch,
rmt, lcl ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
void
Test_ECG::push_supplier (void * /* cookie */,
RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
const RtecEventComm::EventSet &events
ACE_ENV_ARG_DECL)
{
this->wait_until_ready ();
// ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n"));
// @@ TODO we could keep somekind of stats here...
if (!this->short_circuit_)
{
consumer->push (events ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
else
{
int i = 0;
for (; i < this->hp_consumers_; ++i)
{
this->consumers_[i]->push (events ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
{
this->consumers_[i]->push (events ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
}
void
Test_ECG::push_consumer (void *consumer_cookie,
ACE_hrtime_t arrival,
const RtecEventComm::EventSet &events
ACE_ENV_ARG_DECL_NOT_USED)
{
int ID =
(ACE_reinterpret_cast(Test_Consumer**,consumer_cookie)
- this->consumers_);
// ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));
if (events.length () == 0)
{
// ACE_DEBUG ((LM_DEBUG, "no events\n"));
return;
}
// ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
#if 0
const int bufsize = 128;
char buf[bufsize];
ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID);
print_priority_info (buf);
#endif
for (u_int i = 0; i < events.length (); ++i)
{
const RtecEventComm::Event& e = events[i];
if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
{
this->shutdown_consumer (ID);
continue;
}
ACE_hrtime_t s;
ORBSVCS_Time::TimeT_to_hrtime (s, e.header.creation_time);
ACE_hrtime_t nsec = arrival - s;
if (this->local_source (e.header.source))
{
int& count = this->stats_[ID].lcl_count_;
this->stats_[ID].lcl_latency_[count] = nsec;
int workload = this->hp_workload_;
int interval = this->hp_interval_;
if (ID >= this->hp_consumers_)
{
workload = this->lp_workload_;
interval = this->lp_interval_;
}
for (int j = 0; j < workload; ++j)
{
// Eat a little CPU so the Utilization test can measure the
// consumed time....
/* takes about 40.2 usecs on a 167 MHz Ultra2 */
u_long n = 1279UL;
ACE::is_prime (n, 2, n / 2);
}
// Increment the elapsed time on this consumer.
ACE_hrtime_t now = ACE_OS::gethrtime ();
this->stats_[ID].total_time_ += now - arrival;
this->stats_[ID].end_[count] = now;
// We estimate our laxity based on the event creation
// time... it may not be very precise, but will do; other
// strategies include:
// + Keep track of the "current frame", then then deadline
// is the end of the frame.
// + Use the start of the test to keep the current frame.
// + Use the last execution.
CORBA::ULong tmp = ACE_U64_TO_U32 (s - now);
this->stats_[ID].laxity_[count] = 1 + tmp/1000.0F/interval;
count++;
}
else
{
int& count = this->stats_[ID].rmt_count_;
this->stats_[ID].rmt_latency_[count] = nsec;
count++;
}
}
}
void
Test_ECG::wait_until_ready (void)
{
ACE_GUARD (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_);
while (!this->ready_)
this->ready_cnd_.wait ();
}
void
Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
RtecEventComm::PushConsumer_ptr consumer
ACE_ENV_ARG_DECL)
{
this->running_suppliers_--;
if (this->running_suppliers_ != 0)
return;
// We propagate a shutdown event through the system...
RtecEventComm::EventSet shutdown (1);
shutdown.length (1);
RtecEventComm::Event& s = shutdown[0];
s.header.source = 0;
s.header.ttl = 1;
ACE_hrtime_t t = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
s.header.type = ACE_ES_EVENT_SHUTDOWN;
consumer->push (shutdown ACE_ENV_ARG_PARAMETER);
}
void
Test_ECG::shutdown_consumer (int id)
{
ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id));
this->running_consumers_--;
if (this->running_consumers_ == 0)
if (TAO_ORB_Core_instance ()->orb () == 0)
{
ACE_ERROR ((LM_ERROR,
"(%P|%t) Test_ECG::shutdown_consumer, "
"ORB instance is 0\n"));
}
else
TAO_ORB_Core_instance ()->orb ()->shutdown ();
}
int
Test_ECG::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n"));
if (this->rmt_name_ != 0)
{
this->ecg_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
}
TAO_ORB_Core_instance ()->orb ()->shutdown ();
return 0;
}
void
Test_ECG::dump_results (void)
{
const int bufsize = 512;
char buf[bufsize];
int i;
for (i = 0; i < this->hp_consumers_; ++i)
{
ACE_OS::sprintf (buf, "HP%02d", i);
this->dump_results (buf, this->stats_[i]);
}
for (i = 0; i < this->lp_consumers_; ++i)
{
ACE_OS::sprintf (buf, "LP%02d", i);
this->dump_results (buf, this->stats_[i + this->hp_consumers_]);
}
CORBA::ULong tmp = ACE_U64_TO_U32 (this->test_stop_ - this->test_start_);
double usec = tmp / 1000.0;
ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec));
}
void
Test_ECG::dump_results (const char* name, Stats& stats)
{
// @@ We are reporting the information without specifics about
double usec = ACE_U64_TO_U32 (stats.total_time_) / 1000.0;
ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec));
int i;
for (i = 1; i < stats.lcl_count_ - 1; ++i)
{
usec = ACE_U64_TO_U32 (stats.lcl_latency_[i]) / 1000.0;
ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec));
double percent = stats.laxity_[i] * 100.0;
ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent));
usec = ACE_U64_TO_U32 (stats.end_[i] - this->test_start_) / 1000.0;
ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec));
}
for (i = 1; i < stats.rmt_count_ - 1; ++i)
{
double usec = ACE_U64_TO_U32 (stats.rmt_latency_[i]) / 1000.0;
ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec));
}
}
int
Test_ECG::local_source (RtecEventComm::EventSourceID id) const
{
for (int i = 0; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
{
if (this->suppliers_[i]->supplier_id () == id)
return 1;
}
return 0;
}
int
Test_ECG::parse_args (int argc, char *argv [])
{
ACE_Get_Opt get_opt (argc, argv, "l:r:s:i:xh:w:p:d:");
int opt;
while ((opt = get_opt ()) != EOF)
{
switch (opt)
{
case 'l':
this->lcl_name_ = get_opt.opt_arg ();
break;
case 'r':
this->rmt_name_ = get_opt.opt_arg ();
break;
case 's':
if (ACE_OS::strcasecmp (get_opt.opt_arg (), "global") == 0)
{
this->scheduling_type_ = Test_ECG::ss_global;
}
else if (ACE_OS::strcasecmp (get_opt.opt_arg (), "local") == 0)
{
this->scheduling_type_ = Test_ECG::ss_local;
}
else if (ACE_OS::strcasecmp (get_opt.opt_arg (), "runtime") == 0)
{
this->scheduling_type_ = Test_ECG::ss_runtime;
}
else
{
ACE_DEBUG ((LM_DEBUG,
"Unknown scheduling type <%s> "
"defaulting to local\n",
get_opt.opt_arg ()));
this->scheduling_type_ = Test_ECG::ss_local;
}
break;
case 'x':
this->short_circuit_ = 1;
break;
case 'i':
{
char* aux = 0;
char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);
this->consumer_disconnects_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->supplier_disconnects_ = ACE_OS::atoi (arg);
}
break;
case 'h':
{
char* aux = 0;
char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);
this->hp_suppliers_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hp_consumers_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hp_workload_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hp_interval_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hp_message_count_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->hpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
}
break;
case 'w':
{
char* aux = 0;
char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);
this->lp_suppliers_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lp_consumers_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lp_workload_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lp_interval_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lp_message_count_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
this->lpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
}
break;
case 'p':
this->pid_file_name_ = get_opt.opt_arg ();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -