📄 client.cpp
字号:
continuous_workers);
ACE_Throughput_Stats::dump_throughput ("Collective", gsf,
this->time_for_test_,
this->collective_stats_.samples_count ());
}
}
int
Continuous_Worker::setup (ACE_ENV_SINGLE_ARG_DECL)
{
if (priority_setting == AFTER_THREAD_CREATION)
{
this->current_->the_priority (continuous_worker_priority
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
}
this->CORBA_priority_ =
this->current_->the_priority (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::Boolean result =
this->priority_mapping_.to_native (this->CORBA_priority_,
this->native_priority_);
if (!result)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in converting CORBA priority %d to native priority\n",
this->CORBA_priority_),
-1);
return
start_synchronization (this->test_.in (),
this->synchronizers_);
}
int
Continuous_Worker::svc (void)
{
ACE_TRY_NEW_ENV
{
ACE_Sample_History history (this->iterations_);
int result =
this->setup (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
if (result != 0)
return result;
for (CORBA::ULong i = 0;
i != history.max_samples () && !done;
++i)
{
ACE_hrtime_t start = ACE_OS::gethrtime ();
this->test_->method (work,
prime_number
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
ACE_hrtime_t end = ACE_OS::gethrtime ();
history.sample (end - start);
}
ACE_hrtime_t test_end = ACE_OS::gethrtime ();
end_synchronization (this->synchronizers_);
this->print_stats (history,
test_end);
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Exception caught:");
return -1;
}
ACE_ENDTRY;
return 0;
}
class Task : public ACE_Task_Base
{
public:
Task (ACE_Thread_Manager &thread_manager,
CORBA::ORB_ptr orb);
int svc (void);
CORBA::ORB_var orb_;
};
Task::Task (ACE_Thread_Manager &thread_manager,
CORBA::ORB_ptr orb)
: ACE_Task_Base (&thread_manager),
orb_ (CORBA::ORB::_duplicate (orb))
{
}
int
Task::svc (void)
{
Synchronizers synchronizers;
gsf = ACE_High_Res_Timer::global_scale_factor ();
ACE_TRY_NEW_ENV
{
CORBA::Object_var object =
this->orb_->string_to_object (ior ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
test_var test =
test::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
object =
this->orb_->resolve_initial_references ("RTCurrent"
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
RTCORBA::Current_var current =
RTCORBA::Current::_narrow (object.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
object =
this->orb_->resolve_initial_references ("PriorityMappingManager"
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
RTCORBA::PriorityMappingManager_var mapping_manager =
RTCORBA::PriorityMappingManager::_narrow (object.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
RTCORBA::PriorityMapping &priority_mapping =
*mapping_manager->mapping ();
ULong_Array rates;
int result =
get_values ("client",
rates_file,
"rates",
rates,
1);
if (result != 0)
return result;
ULong_Array invocation_priorities;
result =
get_values ("client",
invocation_priorities_file,
"invocation priorities",
invocation_priorities,
1);
if (result != 0)
return result;
if (invocation_priorities.size () != 0 &&
invocation_priorities.size () != rates.size ())
ACE_ERROR_RETURN ((LM_ERROR,
"Number of invocation priorities (%d) != Number of rates (%d)\n",
invocation_priorities.size (),
rates.size ()),
-1);
synchronizers.number_of_workers_ =
rates.size () + continuous_workers;
CORBA::ULong max_rate = 0;
result =
max_throughput (test.in (),
current.in (),
priority_mapping,
max_rate);
if (result != 0)
return result;
CORBA::Short priority_range =
RTCORBA::maxPriority - RTCORBA::minPriority;
ACE_Thread_Manager paced_workers_manager;
CORBA::ULong i = 0;
Paced_Worker **paced_workers =
new Paced_Worker *[rates.size ()];
for (i = 0;
i < rates.size ();
++i)
{
CORBA::Short priority = 0;
if (invocation_priorities.size () == 0)
priority =
CORBA::Short ((priority_range /
double (rates.size ())) *
(i + 1));
else
priority =
invocation_priorities[i];
paced_workers[i] =
new Paced_Worker (paced_workers_manager,
test.in (),
rates[i],
time_for_test * rates[i],
priority,
current.in (),
priority_mapping,
synchronizers);
}
ACE_Thread_Manager continuous_workers_manager;
Continuous_Worker continuous_worker (continuous_workers_manager,
test.in (),
max_rate * time_for_test,
current.in (),
priority_mapping,
synchronizers);
long flags =
THR_NEW_LWP |
THR_JOINABLE |
this->orb_->orb_core ()->orb_params ()->thread_creation_flags ();
CORBA::Short CORBA_priority =
continuous_worker_priority;
CORBA::Short native_priority;
CORBA::Boolean convert_result =
priority_mapping.to_native (CORBA_priority,
native_priority);
if (!convert_result)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in converting CORBA priority %d to native priority\n",
CORBA_priority),
-1);
int force_active = 0;
if (priority_setting == AT_THREAD_CREATION)
{
result =
continuous_worker.activate (flags,
continuous_workers,
force_active,
native_priority);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Continuous_Worker::activate failed\n"),
result);
}
else
{
result =
continuous_worker.activate (flags,
continuous_workers);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Continuous_Worker::activate failed\n"),
result);
}
flags =
THR_NEW_LWP |
THR_JOINABLE |
this->orb_->orb_core ()->orb_params ()->thread_creation_flags ();
for (i = 0;
i < rates.size ();
++i)
{
if (priority_setting == AT_THREAD_CREATION)
{
if (set_priority)
{
CORBA_priority =
paced_workers[i]->priority_;
convert_result =
priority_mapping.to_native (CORBA_priority,
native_priority);
if (!convert_result)
ACE_ERROR_RETURN ((LM_ERROR,
"Error in converting CORBA priority %d to native priority\n",
CORBA_priority),
-1);
}
result =
paced_workers[i]->activate (flags,
1,
force_active,
native_priority);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Paced_Worker::activate failed\n"),
result);
}
else
{
result =
paced_workers[i]->activate (flags);
if (result != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Paced_Worker::activate failed\n"),
result);
}
}
if (rates.size () != 0)
{
paced_workers_manager.wait ();
}
continuous_workers_manager.wait ();
continuous_worker.print_collective_stats ();
for (i = 0;
i < rates.size ();
++i)
{
delete paced_workers[i];
}
delete[] paced_workers;
if (shutdown_server)
{
test->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Exception caught:");
return -1;
}
ACE_ENDTRY;
return 0;
}
int
main (int argc, char *argv[])
{
ACE_TRY_NEW_ENV
{
CORBA::ORB_var orb =
CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
int result =
parse_args (argc, argv);
if (result != 0)
return result;
// Make sure we can support multiple priorities that are required
// for this test.
check_supported_priorities (orb.in ());
// Thread Manager for managing task.
ACE_Thread_Manager thread_manager;
// Create task.
Task task (thread_manager,
orb.in ());
// Task activation flags.
long flags =
THR_NEW_LWP |
THR_JOINABLE |
orb->orb_core ()->orb_params ()->thread_creation_flags ();
// Activate task.
result =
task.activate (flags);
ACE_ASSERT (result != -1);
ACE_UNUSED_ARG (result);
// Wait for task to exit.
result =
thread_manager.wait ();
ACE_ASSERT (result != -1);
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Exception caught:");
return -1;
}
ACE_ENDTRY;
return 0;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Array_Base<CORBA::ULong>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Array_Base<CORBA::ULong>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -