dualec_sup.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 1,211 行 · 第 1/3 页
CPP
1,211 行
// DualEC_Sup.cpp,v 1.25 2003/11/04 05:21:30 dhinton Exp
// ============================================================================
//
// = FILENAME
// DualEC_Sup.cpp
//
// = DESCRIPTION
// Event Supplier for visualizing scheduling behavior, using arrival
// and dispatch data logged by an event channel dispatch command object
//
// = AUTHOR
// Chris Gill (cdgill@cs.wustl.edu)
//
// Adapted from the DOVE simulation event supplier
// originally
// David Levine (levine@cs.wustl.edu) and
// Tim Harrison (harrison@cs.wustl.edu)
// modified
// Michael Kircher (mk1@cs.wustl.edu)
//
// ============================================================================
#include "DualEC_Sup.h"
#include "NavWeapC.h"
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
#include "orbsvcs/orbsvcs/Sched/Config_Scheduler.h"
#include "orbsvcs/orbsvcs/Runtime_Scheduler.h"
#include "orbsvcs/RtecEventChannelAdminC.h"
#include "tao/PortableServer/ORB_Manager.h"
#include "tao/ORB_Core.h"
#include "ace/Get_Opt.h"
#include "ace/Sched_Params.h"
#include "ace/OS_NS_errno.h"
ACE_RCSID (Event_Supplier,
DualEC_Sup,
"DualEC_Sup.cpp,v 1.25 2003/11/04 05:21:30 dhinton Exp")
static const char usage [] =
"[[-?]\n"
" -f <name of input data file>\n"
" [-O[RBport] ORB port number]\n"
" [-m <count> of messages to send (2000)]\n"
" [-b <count> at which to break navigation event\n"
" stream out onto its own channel (1000)]\n"
" [-n <usec> pause between navigation events (100000)]\n"
" [-w <usec> pause between weapons events (100000)]\n"
" [-d to dump scheduler header files]\n"
" [-s to suppress data updates by EC]\n"
" [-r to use runtime schedulers]\n"
" [-p to suppress prioritization of operations]\n";
DualEC_Supplier::DualEC_Supplier (int argc, char** argv)
: nav_pause_ (0, 100000),
weap_pause_ (0, 100000),
channel_hi_name_ (1),
channel_lo_name_ (1),
sched_hi_name_ (1),
sched_lo_name_ (1),
sched_hi_impl_ (0),
sched_lo_impl_ (0),
ec_hi_impl_ (0),
ec_lo_impl_ (0),
argc_(argc),
argv_(argv),
total_messages_ (2000),
break_count_(-1),
input_file_name_(0),
update_data_ (1),
dump_schedule_headers_ (0),
use_runtime_schedulers_ (0),
suppress_priority_ (0),
hi_schedule_file_name_ ("DualEC_Runtime_Hi.h"),
lo_schedule_file_name_ ("DualEC_Runtime_Lo.h"),
nav_roll_ (0),
nav_pitch_ (0)
{
ACE_TRY_NEW_ENV
{
this->sched_hi_name_.length (1);
this->sched_hi_name_[0].id = CORBA::string_dup ("DUAL_SCHED_HI");
ACE_TRY_CHECK;
this->sched_lo_name_.length (1);
this->sched_lo_name_[0].id = CORBA::string_dup ("DUAL_SCHED_LO");
ACE_TRY_CHECK;
this->channel_hi_name_.length (1);
this->channel_hi_name_[0].id = CORBA::string_dup ("DUAL_EC_HI");
ACE_TRY_CHECK;
this->channel_lo_name_.length (1);
this->channel_lo_name_[0].id = CORBA::string_dup ("DUAL_EC_LO");
ACE_TRY_CHECK;
this->terminator_ = terminator_impl_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"DualEC_Supplier::DualEC_Supplier : could "
"not resolve reference to terminator");
}
ACE_ENDTRY;
// Initialize the high priority RT_Info data
rt_info_nav_hi_.entry_point = "DUALEC_NAV_HI";
rt_info_nav_hi_.criticality = RtecScheduler::VERY_HIGH_CRITICALITY;
rt_info_nav_hi_.worst_case_execution_time = ORBSVCS_Time::zero ();
rt_info_nav_hi_.typical_execution_time = ORBSVCS_Time::zero ();
rt_info_nav_hi_.cached_execution_time = ORBSVCS_Time::zero ();
rt_info_nav_hi_.period = 2500000;
rt_info_nav_hi_.importance = RtecScheduler::VERY_HIGH_IMPORTANCE;
rt_info_nav_hi_.quantum = ORBSVCS_Time::zero ();
rt_info_nav_hi_.threads = 1;
rt_info_nav_hi_.info_type = RtecScheduler::OPERATION;
rt_info_weap_hi_ = rt_info_nav_hi_;
rt_info_weap_hi_.entry_point = "DUALEC_WEAP_HI";
rt_info_dummy_hi_ = rt_info_nav_hi_;
rt_info_dummy_hi_.entry_point = "DUALEC_DUMMY_HI";
// Initialize the low priority RT_Info data
rt_info_nav_lo_.entry_point = "DUALEC_NAV_LO";
rt_info_nav_lo_.criticality = RtecScheduler::VERY_LOW_CRITICALITY;
rt_info_nav_lo_.worst_case_execution_time = ORBSVCS_Time::zero ();
rt_info_nav_lo_.typical_execution_time = ORBSVCS_Time::zero ();
rt_info_nav_lo_.cached_execution_time = ORBSVCS_Time::zero ();
rt_info_nav_lo_.period = 10000000;
rt_info_nav_lo_.importance = RtecScheduler::VERY_LOW_IMPORTANCE;
rt_info_nav_lo_.quantum = ORBSVCS_Time::zero ();
rt_info_nav_lo_.threads = 1;
rt_info_nav_lo_.info_type = RtecScheduler::OPERATION;
rt_info_weap_lo_ = rt_info_nav_lo_;
rt_info_weap_lo_.entry_point = "DUALEC_WEAP_LO";
rt_info_dummy_lo_ = rt_info_nav_lo_;
rt_info_dummy_lo_.entry_point = "DUALEC_DUMMY_LO";
}
DualEC_Supplier::~DualEC_Supplier ()
{
ACE_TRY_NEW_ENV
{
this->navigation_Supplier_.disconnect ();
this->weapons_Supplier_.disconnect ();
// Unbind the schedulers from the NS.
this->naming_context_->unbind (this->sched_hi_name_ ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->naming_context_->unbind (this->sched_lo_name_ ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Unbind the ECs from the NS.
this->naming_context_->unbind (this->channel_hi_name_ ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->naming_context_->unbind (this->channel_lo_name_ ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"DualEC_Supplier::~DualEC_Supplier");
}
ACE_ENDTRY;
// @@TBD - destroy the ECs
// @@TBD - destroy the schedulers
}
int
DualEC_Supplier::init ()
{
this->get_options (argc_, argv_);
ACE_TRY_NEW_ENV
{
// Connect to the RootPOA.
CORBA::Object_var poaObject_var =
TAO_ORB_Core_instance()->orb()->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (CORBA::is_nil (poaObject_var.in ()))
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Unable to initialize the POA.\n"),
1);
this->root_POA_var_ =
PortableServer::POA::_narrow (poaObject_var.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->poa_manager_ =
root_POA_var_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
poa_manager_->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// Get the Naming Service object reference.
CORBA::Object_var namingObj_var =
TAO_ORB_Core_instance()->orb()->resolve_initial_references (
"NameService"
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (CORBA::is_nil (namingObj_var.in ()))
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Unable to get the Naming Service.\n"),
-1);
this->naming_context_ =
CosNaming::NamingContext::_narrow (namingObj_var.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"DualEC_Supplier::init");
return -1;
}
ACE_ENDTRY;
// Create two scheduling service instances.
if (this->create_schedulers () == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Could not create schedulers"),
-1);
}
// Create two event channels.
if (this->create_event_channels () == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Could not create event channels"),
-1);
}
// Connect suppliers to the respective event channels.
ACE_Scheduler_Factory::POD_RT_Info * rt_info_nav_hi =
(suppress_priority_) ? 0 : &rt_info_nav_hi_;
ACE_Scheduler_Factory::POD_RT_Info * rt_info_weap_hi =
(suppress_priority_) ? 0 : &rt_info_weap_hi_;
ACE_Scheduler_Factory::POD_RT_Info * rt_info_nav_lo =
(suppress_priority_) ? 0 : &rt_info_nav_lo_;
ACE_Scheduler_Factory::POD_RT_Info * rt_info_weap_lo =
(suppress_priority_) ? 0 : &rt_info_weap_lo_;
if (this->navigation_Supplier_.connect ("MIB_unknown",
"DUAL_EC_HI",
"DUAL_SCHED_HI",
rt_info_nav_hi) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Could not connect navigation supplier to DUAL_EC_HI"),
-1);
}
if (this->navigation_Supplier_.connect ("MIB_unknown",
"DUAL_EC_LO",
"DUAL_SCHED_LO",
rt_info_nav_lo) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Could not connect navigation supplier to DUAL_EC_LO"),
-1);
}
if (this->weapons_Supplier_.connect ("MIB_unknown",
"DUAL_EC_HI",
"DUAL_SCHED_HI",
rt_info_weap_hi) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Could not connect weapons supplier to DUAL_EC_HI"),
-1);
}
if (this->weapons_Supplier_.connect ("MIB_unknown",
"DUAL_EC_LO",
"DUAL_SCHED_LO",
rt_info_weap_lo) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"Could not connect weapons supplier to DUAL_EC_LO"),
-1);
}
return 0;
}
// Private class that implements a termination servant.
void
DualEC_Supplier::Terminator::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
TAO_ORB_Core_instance ()->orb ()->shutdown ();
}
// Run the ORB event loop.
ACE_THR_FUNC_RETURN
DualEC_Supplier::run_orb (void *)
{
TAO_ORB_Core_instance ()->orb ()->run ();
return 0;
}
// Run navigation event generation thread.
ACE_THR_FUNC_RETURN
DualEC_Supplier::run_nav_thread (void *arg)
{
DualEC_Supplier * sup =
ACE_static_cast (DualEC_Supplier *, arg);
ACE_TRY_NEW_ENV
{
ACE_Unbounded_Queue_Iterator<Navigation *>
nav_iter (sup->navigation_data_);
if (nav_iter.done ())
{
ACE_ERROR_RETURN ((LM_ERROR,
"DualEC_Supplier::run_event_thread: "
"there is no navigation data\n"), 0);
}
CORBA::Any any;
long total_sent = 0;
do
{
// Insert the event data
Navigation **nav;
if ((nav_iter.next (nav)) && (nav) && (*nav))
{
any <<= *nav;
// Sleep briefly to avoid too much livelock (a little is good).
ACE_OS::sleep (sup->nav_pause_);
// If the break count has been reached, change the
// channel that is being used by the NAV supplier
if (total_sent == sup->break_count_)
{
ACE_DEBUG ((LM_DEBUG,
"breaking out nav at event: %d\n",
sup->break_count_));
sup->navigation_Supplier_.use_next_connection ();
}
sup->navigation_Supplier_.notify (any);
ACE_TRY_CHECK;
}
else
{
ACE_ERROR ((LM_ERROR,
"DualEC_Supplier::run_nav_thread:"
"Could Not access navigation data"));
}
if (total_sent < 5)
ACE_DEBUG ((LM_DEBUG,
"Pushing event data.\n"));
else if (total_sent == 5)
ACE_DEBUG ((LM_DEBUG,
"Everything is running. Going to be mute.\n"));
nav_iter.advance ();
if (nav_iter.done ())
nav_iter.first ();
}
while (++total_sent < sup->total_messages_);
}
ACE_CATCHANY
{
}
ACE_ENDTRY;
return 0;
}
// Run weapons event generation thread.
ACE_THR_FUNC_RETURN
DualEC_Supplier::run_weap_thread (void *arg)
{
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?