📄 ec_multiple.cpp
字号:
if (ACE_Scheduler_Factory::use_config (naming_context.in (),
buf) == -1)
return -1;
}
break;
}
// Create the EventService implementation, but don't start its
// internal threads.
// Explicit cat to CORBA::Boolean to disambiguate call.
ACE_EventChannel ec_impl (CORBA::Boolean(0));
// Register Event_Service with the Naming Service.
RtecEventChannelAdmin::EventChannel_var ec =
ec_impl._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
CORBA::String_var str =
orb->object_to_string (ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_OS::sleep (5);
ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ()));
ACE_OS::strcpy (buf, "EventChannel@");
ACE_OS::strcat (buf, this->lcl_name_);
CosNaming::Name channel_name (1);
channel_name.length (1);
channel_name[0].id = CORBA::string_dup (buf);
naming_context->bind (channel_name, ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "waiting to start\n"));
ACE_Time_Value tv (15, 0);
if (this->rmt_name_ != 0)
{
orb->run (&tv ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_DEBUG ((LM_DEBUG, "starting....\n"));
RtecEventChannelAdmin::EventChannel_var local_ec =
this->get_ec (naming_context.in (),
this->lcl_name_
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "located local EC\n"));
for (int sd = 0; sd < this->supplier_disconnects_; ++sd)
{
this->connect_suppliers (local_ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->disconnect_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_OS::sleep (5);
ACE_DEBUG ((LM_DEBUG, "Supplier disconnection %d\n", sd));
}
this->connect_suppliers (local_ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "connected supplier\n"));
RtecEventChannelAdmin::Observer_Handle observer_handle = 0;
if (this->rmt_name_ != 0)
{
tv.set (5, 0);
orb->run (&tv ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
RtecEventChannelAdmin::EventChannel_var remote_ec =
this->get_ec (naming_context.in (),
this->rmt_name_
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "located remote EC\n"));
CosNaming::Name rsch_name (1);
rsch_name.length (1);
ACE_OS::strcpy (buf, "ScheduleService");
if (this->scheduling_type_ != Test_ECG::ss_global)
{
ACE_OS::strcat (buf, "@");
ACE_OS::strcat (buf, this->rmt_name_);
}
rsch_name[0].id = CORBA::string_dup (buf);
CORBA::Object_var tmpobj =
naming_context->resolve (rsch_name ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
RtecScheduler::Scheduler_var remote_sch =
RtecScheduler::Scheduler::_narrow (tmpobj.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->connect_ecg (local_ec.in (),
remote_ec.in (),
remote_sch.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "connected proxy\n"));
tv.set (5, 0);
orb->run (&tv ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
RtecEventChannelAdmin::Observer_ptr observer =
this->ecg_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
observer_handle = ec_impl.append_observer (observer
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
for (int cd = 0; cd < this->consumer_disconnects_; ++cd)
{
this->connect_consumers (local_ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->disconnect_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_OS::sleep (5);
ACE_DEBUG ((LM_DEBUG, "Consumer disconnection %d\n", cd));
}
this->connect_consumers (local_ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "connected consumer\n"));
this->activate_suppliers (local_ec.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
this->running_suppliers_ = this->hp_suppliers_ + this->lp_suppliers_;
// Acquire the mutex for the ready mutex, blocking any supplier
// that may start after this point.
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_, 1);
this->ready_ = 1;
this->test_start_ = ACE_OS::gethrtime ();
this->ready_cnd_.broadcast ();
ready_mon.release ();
ACE_DEBUG ((LM_DEBUG, "activate the EC\n"));
if (this->rmt_name_ != 0)
{
ec_impl.remove_observer (observer_handle ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
// Create the EC internal threads
ec_impl.activate ();
ACE_DEBUG ((LM_DEBUG, "running the test\n"));
orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->test_stop_ = ACE_OS::gethrtime ();
ACE_DEBUG ((LM_DEBUG, "shutdown the EC\n"));
ec_impl.shutdown ();
this->dump_results ();
if (this->schedule_file_ != 0)
{
RtecScheduler::RT_Info_Set_var infos;
RtecScheduler::Dependency_Set_var deps;
RtecScheduler::Config_Info_Set_var configs;
RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
#if defined (__SUNPRO_CC)
// Sun C++ 4.2 warns with the code below:
// Warning (Anachronism): Temporary used for non-const
// reference, now obsolete.
// Note: Type "CC -migration" for more on anachronisms.
// Warning (Anachronism): The copy constructor for argument
// infos of type RtecScheduler::RT_Info_Set_out should take
// const RtecScheduler::RT_Info_Set_out&.
// But, this code is not CORBA conformant, because users should
// not define instances of _out types.
RtecScheduler::RT_Info_Set_out infos_out (infos);
RtecScheduler::Dependency_Set_out deps_out (deps);
RtecScheduler::Config_Info_Set_out configs_out (configs);
RtecScheduler::Scheduling_Anomaly_Set_out anomalies_out (anomalies);
ACE_Scheduler_Factory::server ()->compute_scheduling
(ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
ACE_SCOPE_THREAD),
ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
ACE_SCOPE_THREAD),
infos_out, deps_out,
configs_out, anomalies_out ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
#else /* ! __SUNPRO_CC */
ACE_Scheduler_Factory::server ()->compute_scheduling
(ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
ACE_SCOPE_THREAD),
ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
ACE_SCOPE_THREAD),
infos.out (), deps.out (),
configs.out (), anomalies.out ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
#endif /* ! __SUNPRO_CC */
ACE_Scheduler_Factory::dump_schedule (infos.in (),
deps.in (),
configs.in (),
anomalies.in (),
this->schedule_file_);
}
naming_context->unbind (channel_name ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (this->rmt_name_ != 0)
{
this->ecg_.close (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->ecg_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
this->disconnect_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
this->disconnect_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n"));
tv.set (5, 0);
orb->run (&tv ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCH (CORBA::SystemException, sys_ex)
{
ACE_PRINT_EXCEPTION (sys_ex, "SYS_EX");
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "NON SYS EX");
}
ACE_ENDTRY;
return 0;
}
RtecEventChannelAdmin::EventChannel_ptr
Test_ECG::get_ec (CosNaming::NamingContext_ptr naming_context,
const char* process_name
ACE_ENV_ARG_DECL)
{
const int bufsize = 512;
char buf[bufsize];
ACE_OS::strcpy (buf, "EventChannel@");
ACE_OS::strcat (buf, process_name);
CosNaming::Name channel_name (1);
channel_name.length (1);
channel_name[0].id = CORBA::string_dup (buf);
CORBA::Object_var ec_ptr =
naming_context->resolve (channel_name ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (RtecEventChannelAdmin::EventChannel::_nil ());
if (CORBA::is_nil (ec_ptr.in ()))
return RtecEventChannelAdmin::EventChannel::_nil ();
return RtecEventChannelAdmin::EventChannel::_narrow (ec_ptr.in ()
ACE_ENV_ARG_PARAMETER);
}
void
Test_ECG::disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
for (int i = 0; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
{
this->suppliers_[i]->close (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
}
void
Test_ECG::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec
ACE_ENV_ARG_DECL)
{
int i;
for (i = 0; i < this->hp_suppliers_; ++i)
{
// Limit the number of messages sent by each supplier
int mc = this->hp_message_count_ / this->hp_suppliers_;
if (mc == 0)
mc = 1;
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "hp_supplier_%02d@%s", i, this->lcl_name_);
ACE_NEW (this->suppliers_[i],
Test_Supplier (this, this->suppliers_ + i));
this->suppliers_[i]->open (buf,
this->hps_event_a_,
this->hps_event_b_,
mc,
this->hp_interval_ * 10,
local_ec
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
for (; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
{
// Limit the number of messages sent by each supplier
int mc = this->lp_message_count_ / this->lp_suppliers_;
if (mc == 0)
mc = 1;
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "lp_supplier_%02d@%s",
i - this->hp_suppliers_, this->lcl_name_);
ACE_NEW (this->suppliers_[i],
Test_Supplier (this, this->suppliers_ + i));
this->suppliers_[i]->open (buf,
this->lps_event_a_,
this->lps_event_b_,
mc,
this->lp_interval_ * 10,
local_ec
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
void
Test_ECG::disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL)
{
for (int i = 0; i < this->hp_consumers_ + this->lp_consumers_; ++i)
{
this->consumers_[i]->close (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
}
void
Test_ECG::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec
ACE_ENV_ARG_DECL)
{
ACE_TRY
{
int i;
for (i = 0; i < this->hp_suppliers_; ++i)
{
// Limit the number of messages sent by each supplier
int mc = this->hp_message_count_ / this->hp_suppliers_;
if (mc == 0)
mc = 1;
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "hp_supplier_%02d@%s", i, this->lcl_name_);
this->suppliers_[i]->activate (buf,
this->hp_interval_ * 10,
local_ec
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
for (; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
{
// Limit the number of messages sent by each supplier
int mc = this->lp_message_count_ / this->lp_suppliers_;
if (mc == 0)
mc = 1;
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "lp_supplier_%02d@%s",
i - this->hp_suppliers_, this->lcl_name_);
this->suppliers_[i]->activate (buf,
this->lp_interval_ * 10,
local_ec
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
ACE_CATCHANY
{
ACE_RE_THROW;
}
ACE_ENDTRY;
}
void
Test_ECG::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec
ACE_ENV_ARG_DECL)
{
int i;
for (i = 0; i < this->hp_consumers_; ++i)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "hp_consumer_%02d@%s", i, this->lcl_name_);
ACE_NEW (this->consumers_[i],
Test_Consumer (this, this->consumers_ + i));
this->consumers_[i]->open (buf,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -