📄 driver.cpp
字号:
void
EC_Driver::build_consumer_qos (
int i,
RtecEventChannelAdmin::ConsumerQOS& qos,
int& shutdown_event_type
ACE_ENV_ARG_DECL_NOT_USED)
{
RtecBase::handle_t rt_info = 0;
int type_start =
this->consumer_type_start_
+ i * this->consumer_type_shift_;
shutdown_event_type = type_start + this->consumer_type_count_;
ACE_ConsumerQOS_Factory qos_factory;
qos_factory.start_disjunction_group (1 + this->consumer_type_count_);
qos_factory.insert_type (shutdown_event_type, rt_info);
for (int j = 0; j != this->consumer_type_count_; ++j)
qos_factory.insert_type (type_start + j, rt_info);
qos = qos_factory.get_ConsumerQOS ();
}
void
EC_Driver::connect_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
this->event_channel_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
for (int i = 0; i < this->n_suppliers_; ++i)
{
this->connect_supplier (supplier_admin.in (), i ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) connected supplier(s)\n"));
}
void
EC_Driver::connect_supplier (
RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin,
int i
ACE_ENV_ARG_DECL)
{
RtecEventChannelAdmin::SupplierQOS qos;
int shutdown_event_type;
this->build_supplier_qos (i, qos, shutdown_event_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->suppliers_[i]->connect (supplier_admin,
qos,
shutdown_event_type
ACE_ENV_ARG_PARAMETER);
}
void
EC_Driver::build_supplier_qos (
int i,
RtecEventChannelAdmin::SupplierQOS& qos,
int& shutdown_event_type
ACE_ENV_ARG_DECL_NOT_USED)
{
int type_start = this->supplier_type_start_ + i*this->supplier_type_shift_;
int supplier_id = i + 1;
shutdown_event_type = type_start + this->supplier_type_count_;
RtecBase::handle_t rt_info = 0;
ACE_SupplierQOS_Factory qos_factory;
for (int j = 0; j != this->supplier_type_count_; ++j)
qos_factory.insert (supplier_id,
type_start + j,
rt_info, 1);
qos_factory.insert (supplier_id,
shutdown_event_type,
rt_info, 1);
qos = qos_factory.get_SupplierQOS ();
}
void
EC_Driver::execute_test (ACE_ENV_SINGLE_ARG_DECL)
{
if (this->allocate_tasks () == -1)
return;
this->activate_tasks (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) suppliers are active\n"));
// Wait for the supplier threads...
if (ACE_Thread_Manager::instance ()->wait () == -1)
{
ACE_ERROR ((LM_ERROR, "EC_Driver (%P|%t) Thread_Manager wait failed\n"));
}
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) suppliers finished\n"));
}
int
EC_Driver::allocate_tasks (void)
{
if (this->tasks_ != 0)
return 0;
ACE_NEW_RETURN (this->tasks_,
ACE_Task_Base*[this->n_suppliers_],
-1);
for (int i = 0; i < this->n_suppliers_; ++i)
this->tasks_[i] =
this->allocate_task (i);
return 0;
}
ACE_Task_Base*
EC_Driver::allocate_task (int i)
{
int start = this->supplier_type_start_ + i*this->supplier_type_shift_;
return new EC_Supplier_Task (this->suppliers_[i],
this,
this->suppliers_ + i,
this->burst_count_,
this->burst_size_,
this->burst_pause_,
this->payload_size_,
start + this->supplier_type_count_);
}
void
EC_Driver::activate_tasks (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
int priority =
(ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
if (ACE_BIT_DISABLED (this->thr_create_flags_, THR_SCHED_FIFO))
{
priority =
ACE_Sched_Params::priority_min (ACE_SCHED_OTHER);
}
for (int i = 0; i < this->n_suppliers_; ++i)
{
if (this->tasks_[i]->activate (this->thr_create_flags_,
1, 0, priority) == -1)
{
ACE_ERROR ((LM_ERROR,
"EC_Driver (%P|%t) Cannot activate thread "
"for supplier %d\n%p\n",
i, "EC_Driver - OS error is:"));
}
}
}
void
EC_Driver::disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
if (this->suppliers_ == 0)
return;
for (int i = 0; i < this->n_suppliers_; ++i)
{
this->suppliers_[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) suppliers disconnected\n"));
}
void
EC_Driver::disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL)
{
if (this->consumers_ == 0)
return;
for (int i = 0; i < this->n_consumers_; ++i)
{
this->consumers_[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) consumers disconnected\n"));
}
void
EC_Driver::shutdown_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
if (this->suppliers_ == 0)
return;
for (int i = 0; i < this->n_suppliers_; ++i)
{
this->suppliers_[i]->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) suppliers deactivated\n"));
}
void
EC_Driver::shutdown_consumers (ACE_ENV_SINGLE_ARG_DECL)
{
if (this->consumers_ == 0)
return;
for (int i = 0; i < this->n_consumers_; ++i)
{
this->consumers_[i]->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
if (this->verbose ())
ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) consumers deactivated\n"));
}
void
EC_Driver::dump_results (void)
{
ACE_Throughput_Stats throughput;
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
char buf[BUFSIZ];
for (int j = 0; j < this->n_consumers_; ++j)
{
ACE_OS::sprintf (buf, "Consumer [%02d]", j);
this->consumers_[j]->dump_results (buf, gsf);
this->consumers_[j]->accumulate (throughput);
}
ACE_DEBUG ((LM_DEBUG, "\n"));
ACE_Throughput_Stats suppliers;
for (int i = 0; i < this->n_suppliers_; ++i)
{
ACE_OS::sprintf (buf, "Supplier [%02d]", i);
this->suppliers_[i]->dump_results (buf, gsf);
this->suppliers_[i]->accumulate (suppliers);
}
ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
throughput.dump_results ("EC_Consumer/totals", gsf);
ACE_DEBUG ((LM_DEBUG, "\n"));
suppliers.dump_results ("EC_Supplier/totals", gsf);
}
int
EC_Driver::parse_args (int &argc, char *argv [])
{
ACE_Arg_Shifter arg_shifter (argc, argv);
while (arg_shifter.is_anything_left ())
{
const char *arg = arg_shifter.get_current ();
if (ACE_OS::strcmp (arg, "-verbose") == 0)
{
arg_shifter.consume_arg ();
this->verbose_ = 1;
}
else if (ACE_OS::strcmp (arg, "-remote") == 0)
{
arg_shifter.consume_arg ();
this->use_remote_ec_ = 1;
if (arg_shifter.is_parameter_next ())
{
this->event_service_name_ = arg_shifter.get_current ();
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-old_reactive") == 0)
{
arg_shifter.consume_arg ();
this->use_old_ec_ = 1;
}
else if (ACE_OS::strcmp (arg, "-old_threaded") == 0)
{
arg_shifter.consume_arg ();
this->use_old_ec_ = 1;
}
else if (ACE_OS::strcmp (arg, "-suppliers") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->n_suppliers_ = ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-consumers") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->n_consumers_ = ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-burstcount") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->burst_count_ = ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-burstsize") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->burst_size_ = ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-payloadsize") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->payload_size_ = ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-burstpause") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->burst_pause_ = ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-consumer_tstart") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->consumer_type_start_ =
ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-consumer_tcount") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->consumer_type_count_ =
ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-consumer_tshift") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->consumer_type_shift_ =
ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-supplier_tstart") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->supplier_type_start_ =
ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-supplier_tcount") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->supplier_type_count_ =
ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else if (ACE_OS::strcmp (arg, "-supplier_tshift") == 0)
{
arg_shifter.consume_arg ();
if (arg_shifter.is_parameter_next ())
{
this->supplier_type_shift_ =
ACE_OS::atoi (arg_shifter.get_current ());
arg_shifter.consume_arg ();
}
}
else
{
arg_shifter.ignore_arg ();
}
}
return 0;
}
void
EC_Driver::print_usage (void)
{
ACE_DEBUG ((LM_DEBUG,
"EC_Driver Usage:\n"
" -verbose\n"
" -remote <ec_name>\n"
" -old_reactive\n"
" -old_threaded\n"
" -suppliers <nsuppliers>\n"
" -consumers <nsuppliers>\n"
" -burstcount <bursts>\n"
" -burstsize <size>\n"
" -payloadsize <size>\n"
" -burstpause <usecs>\n"
" -consumer_tstart <type>\n"
" -consumer_tcount <count>\n"
" -consumer_tshift <shift>\n"
" -supplier_tstart <type>\n"
" -supplier_tcount <count>\n"
" -supplier_tshift <shift>\n"
));
}
void
EC_Driver::modify_attributes (TAO_EC_Event_Channel_Attributes& attr)
{
ACE_UNUSED_ARG(attr);
// This method can be overruled by derived tests to set the event channel
// attributes
}
void
EC_Driver::cleanup_tasks (void)
{
if (this->tasks_ != 0)
{
for (int i = 0; i != this->n_suppliers_; ++i)
{
delete this->tasks_[i];
this->tasks_[i] = 0;
}
delete[] this->tasks_;
this->tasks_ = 0;
}
}
void
EC_Driver::cleanup_suppliers (void)
{
if (this->suppliers_ != 0)
{
for (int i = 0; i != this->n_suppliers_; ++i)
{
delete this->suppliers_[i];
this->suppliers_[i] = 0;
}
delete[] this->suppliers_;
this->suppliers_ = 0;
}
}
void
EC_Driver::cleanup_consumers (void)
{
if (this->consumers_ != 0)
{
for (int i = 0; i != this->n_consumers_; ++i)
{
delete this->consumers_[i];
this->consumers_[i] = 0;
}
delete[] this->consumers_;
this->consumers_ = 0;
}
}
void
EC_Driver::cleanup_ec (void)
{
delete this->ec_impl_;
#if !defined(EC_DISABLE_OLD_EC)
delete this->module_factory_;
#endif
}
int
EC_Driver::decode_consumer_cookie (void* cookie) const
{
return ACE_static_cast(EC_Consumer**,cookie) - this->consumers_;
}
int
EC_Driver::decode_supplier_cookie (void* cookie) const
{
return ACE_static_cast(EC_Supplier**,cookie) - this->suppliers_;
}
void
EC_Driver::consumer_push (void*,
const RtecEventComm::EventSet&
ACE_ENV_ARG_DECL_NOT_USED)
{
}
void
EC_Driver::consumer_shutdown (void*
ACE_ENV_ARG_DECL_NOT_USED)
{
}
void
EC_Driver::consumer_disconnect (void*
ACE_ENV_ARG_DECL_NOT_USED)
{
}
void
EC_Driver::supplier_disconnect (void*
ACE_ENV_ARG_DECL_NOT_USED)
{
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -