dualec_sup.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 1,211 行 · 第 1/3 页

CPP
1,211
字号
// DualEC_Sup.cpp,v 1.25 2003/11/04 05:21:30 dhinton Exp

// ============================================================================
//
// = FILENAME
//    DualEC_Sup.cpp
//
// = DESCRIPTION
//   Event Supplier for visualizing scheduling behavior, using arrival
//   and dispatch data logged by an event channel dispatch command object
//
// = AUTHOR
//    Chris Gill (cdgill@cs.wustl.edu)
//
//    Adapted from the DOVE simulation event supplier,
//    originally written by
//    David Levine (levine@cs.wustl.edu) and
//    Tim Harrison (harrison@cs.wustl.edu),
//    and later modified by
//    Michael Kircher (mk1@cs.wustl.edu)
//
// ============================================================================

#include "DualEC_Sup.h"
#include "NavWeapC.h"

#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
#include "orbsvcs/orbsvcs/Sched/Config_Scheduler.h"
#include "orbsvcs/orbsvcs/Runtime_Scheduler.h"
#include "orbsvcs/RtecEventChannelAdminC.h"

#include "tao/PortableServer/ORB_Manager.h"
#include "tao/ORB_Core.h"

#include "ace/Get_Opt.h"
#include "ace/Sched_Params.h"
#include "ace/OS_NS_errno.h"

ACE_RCSID (Event_Supplier, 
           DualEC_Sup, 
           "DualEC_Sup.cpp,v 1.25 2003/11/04 05:21:30 dhinton Exp")

static const char usage [] =
"[[-?]\n"
"                 -f <name of input data file>\n"
"                 [-O[RBport] ORB port number]\n"
"                 [-m <count> of messages to send (2000)]\n"
"                 [-b <count> at which to break navigation event\n"
"                             stream out onto its own channel (1000)]\n"
"                 [-n <usec> pause between navigation events (100000)]\n"
"                 [-w <usec> pause between weapons events (100000)]\n"
"                 [-d to dump scheduler header files]\n"
"                 [-s to suppress data updates by EC]\n"
"                 [-r to use runtime schedulers]\n"
"                 [-p to suppress prioritization of operations]\n";

DualEC_Supplier::DualEC_Supplier (int argc, char** argv)
: nav_pause_ (0, 100000),
  weap_pause_ (0, 100000),
  channel_hi_name_ (1),
  channel_lo_name_ (1),
  sched_hi_name_ (1),
  sched_lo_name_ (1),
  sched_hi_impl_ (0),
  sched_lo_impl_ (0),
  ec_hi_impl_ (0),
  ec_lo_impl_ (0),
  argc_(argc),
  argv_(argv),
  total_messages_ (2000),
  break_count_(-1),
  input_file_name_(0),
  update_data_ (1),
  dump_schedule_headers_ (0),
  use_runtime_schedulers_ (0),
  suppress_priority_ (0),
  hi_schedule_file_name_ ("DualEC_Runtime_Hi.h"),
  lo_schedule_file_name_ ("DualEC_Runtime_Lo.h"),
  nav_roll_ (0),
  nav_pitch_ (0)
{
  ACE_TRY_NEW_ENV
    {
      this->sched_hi_name_.length (1);
      this->sched_hi_name_[0].id = CORBA::string_dup ("DUAL_SCHED_HI");
      ACE_TRY_CHECK;

      this->sched_lo_name_.length (1);
      this->sched_lo_name_[0].id = CORBA::string_dup ("DUAL_SCHED_LO");
      ACE_TRY_CHECK;

      this->channel_hi_name_.length (1);
      this->channel_hi_name_[0].id = CORBA::string_dup ("DUAL_EC_HI");
      ACE_TRY_CHECK;

      this->channel_lo_name_.length (1);
      this->channel_lo_name_[0].id = CORBA::string_dup ("DUAL_EC_LO");
      ACE_TRY_CHECK;

      this->terminator_ = terminator_impl_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
  {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "DualEC_Supplier::DualEC_Supplier : could "
                           "not resolve reference to terminator");
  }
  ACE_ENDTRY;

  // Initialize the high priority RT_Info data
  rt_info_nav_hi_.entry_point = "DUALEC_NAV_HI";
  rt_info_nav_hi_.criticality = RtecScheduler::VERY_HIGH_CRITICALITY;
  rt_info_nav_hi_.worst_case_execution_time = ORBSVCS_Time::zero ();
  rt_info_nav_hi_.typical_execution_time = ORBSVCS_Time::zero ();
  rt_info_nav_hi_.cached_execution_time = ORBSVCS_Time::zero ();
  rt_info_nav_hi_.period = 2500000;
  rt_info_nav_hi_.importance = RtecScheduler::VERY_HIGH_IMPORTANCE;
  rt_info_nav_hi_.quantum = ORBSVCS_Time::zero ();
  rt_info_nav_hi_.threads = 1;
  rt_info_nav_hi_.info_type = RtecScheduler::OPERATION;
  rt_info_weap_hi_ = rt_info_nav_hi_;
  rt_info_weap_hi_.entry_point = "DUALEC_WEAP_HI";
  rt_info_dummy_hi_ = rt_info_nav_hi_;
  rt_info_dummy_hi_.entry_point = "DUALEC_DUMMY_HI";

  // Initialize the low priority RT_Info data
  rt_info_nav_lo_.entry_point = "DUALEC_NAV_LO";
  rt_info_nav_lo_.criticality = RtecScheduler::VERY_LOW_CRITICALITY;
  rt_info_nav_lo_.worst_case_execution_time = ORBSVCS_Time::zero ();
  rt_info_nav_lo_.typical_execution_time = ORBSVCS_Time::zero ();
  rt_info_nav_lo_.cached_execution_time = ORBSVCS_Time::zero ();
  rt_info_nav_lo_.period = 10000000;
  rt_info_nav_lo_.importance = RtecScheduler::VERY_LOW_IMPORTANCE;
  rt_info_nav_lo_.quantum = ORBSVCS_Time::zero ();
  rt_info_nav_lo_.threads = 1;
  rt_info_nav_lo_.info_type = RtecScheduler::OPERATION;
  rt_info_weap_lo_ = rt_info_nav_lo_;
  rt_info_weap_lo_.entry_point = "DUALEC_WEAP_LO";
  rt_info_dummy_lo_ = rt_info_nav_lo_;
  rt_info_dummy_lo_.entry_point = "DUALEC_DUMMY_LO";
}

DualEC_Supplier::~DualEC_Supplier ()
{
  ACE_TRY_NEW_ENV
    {
      this->navigation_Supplier_.disconnect ();
      this->weapons_Supplier_.disconnect ();

      // Unbind the schedulers from the NS.
      this->naming_context_->unbind (this->sched_hi_name_ ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      this->naming_context_->unbind (this->sched_lo_name_ ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Unbind the ECs from the NS.
      this->naming_context_->unbind (this->channel_hi_name_ ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      this->naming_context_->unbind (this->channel_lo_name_ ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "DualEC_Supplier::~DualEC_Supplier");
    }
  ACE_ENDTRY;

  // @@TBD - destroy the ECs
  // @@TBD - destroy the schedulers
}

int
DualEC_Supplier::init ()
{
  this->get_options (argc_, argv_);

  ACE_TRY_NEW_ENV
  {
    // Connect to the RootPOA.
    CORBA::Object_var poaObject_var =
      TAO_ORB_Core_instance()->orb()->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

    if (CORBA::is_nil (poaObject_var.in ()))
      ACE_ERROR_RETURN ((LM_ERROR,
                        " (%P|%t) Unable to initialize the POA.\n"),
                        1);

    this->root_POA_var_ =
      PortableServer::POA::_narrow (poaObject_var.in () ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

    this->poa_manager_ =
       root_POA_var_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
    ACE_TRY_CHECK;

    poa_manager_->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
    ACE_TRY_CHECK;

    // Get the Naming Service object reference.
    CORBA::Object_var namingObj_var =
      TAO_ORB_Core_instance()->orb()->resolve_initial_references (
          "NameService"
          ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

    if (CORBA::is_nil (namingObj_var.in ()))
      ACE_ERROR_RETURN ((LM_ERROR,
                        " (%P|%t) Unable to get the Naming Service.\n"),
                        -1);

    this->naming_context_ =
      CosNaming::NamingContext::_narrow (namingObj_var.in ()
                                       ACE_ENV_ARG_PARAMETER);
    ACE_TRY_CHECK;

  }
  ACE_CATCHANY
  {
    ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                         "DualEC_Supplier::init");
    return -1;
  }
  ACE_ENDTRY;

  // Create two scheduling service instances.
  if (this->create_schedulers () == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Could not create schedulers"),
                        -1);
    }

  // Create two event channels.
  if (this->create_event_channels () == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Could not create event channels"),
                        -1);
    }

  // Connect suppliers to the respective event channels.
  ACE_Scheduler_Factory::POD_RT_Info * rt_info_nav_hi =
    (suppress_priority_) ? 0 : &rt_info_nav_hi_;
  ACE_Scheduler_Factory::POD_RT_Info * rt_info_weap_hi =
    (suppress_priority_) ? 0 : &rt_info_weap_hi_;
  ACE_Scheduler_Factory::POD_RT_Info * rt_info_nav_lo =
    (suppress_priority_) ? 0 : &rt_info_nav_lo_;
  ACE_Scheduler_Factory::POD_RT_Info * rt_info_weap_lo =
    (suppress_priority_) ? 0 : &rt_info_weap_lo_;

  if (this->navigation_Supplier_.connect ("MIB_unknown",
                                          "DUAL_EC_HI",
                                          "DUAL_SCHED_HI",
                                           rt_info_nav_hi) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Could not connect navigation supplier to DUAL_EC_HI"),
                        -1);
    }

 if (this->navigation_Supplier_.connect ("MIB_unknown",
                                          "DUAL_EC_LO",
                                          "DUAL_SCHED_LO",
                                           rt_info_nav_lo) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Could not connect navigation supplier to DUAL_EC_LO"),
                        -1);
    }

  if (this->weapons_Supplier_.connect ("MIB_unknown",
                                       "DUAL_EC_HI",
                                       "DUAL_SCHED_HI",
                                       rt_info_weap_hi) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Could not connect weapons supplier to DUAL_EC_HI"),
                        -1);
    }

 if (this->weapons_Supplier_.connect ("MIB_unknown",
                                      "DUAL_EC_LO",
                                      "DUAL_SCHED_LO",
                                      rt_info_weap_lo) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Could not connect weapons supplier to DUAL_EC_LO"),
                        -1);
    }

  return 0;
}

// Private class that implements a termination servant.

void
DualEC_Supplier::Terminator::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  TAO_ORB_Core_instance ()->orb ()->shutdown ();
}


// Run the ORB event loop.

ACE_THR_FUNC_RETURN
DualEC_Supplier::run_orb (void *)
{
  TAO_ORB_Core_instance ()->orb ()->run ();
  return 0;
}


// Run navigation event generation thread.

ACE_THR_FUNC_RETURN
DualEC_Supplier::run_nav_thread (void *arg)
{
  DualEC_Supplier * sup =
    ACE_static_cast (DualEC_Supplier *, arg);

  ACE_TRY_NEW_ENV
    {
      ACE_Unbounded_Queue_Iterator<Navigation *>
        nav_iter (sup->navigation_data_);

      if (nav_iter.done ())
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "DualEC_Supplier::run_event_thread: "
                             "there is no navigation data\n"), 0);
        }

      CORBA::Any any;

      long total_sent = 0;

      do
      {
        // Insert the event data
        Navigation **nav;

        if ((nav_iter.next (nav)) && (nav) && (*nav))
          {
            any <<= *nav;

            // Sleep briefly to avoid too much livelock (a little is good).
            ACE_OS::sleep (sup->nav_pause_);

            // If the break count has been reached, change the
            // channel that is being used by the NAV supplier
            if (total_sent == sup->break_count_)
              {
               ACE_DEBUG ((LM_DEBUG,
                           "breaking out nav at event: %d\n",
                           sup->break_count_));

                sup->navigation_Supplier_.use_next_connection ();
              }

            sup->navigation_Supplier_.notify (any);
            ACE_TRY_CHECK;
          }
        else
          {
            ACE_ERROR ((LM_ERROR,
                        "DualEC_Supplier::run_nav_thread:"
                        "Could Not access navigation data"));
          }

        if (total_sent < 5)
          ACE_DEBUG ((LM_DEBUG,
                      "Pushing event data.\n"));
        else if (total_sent == 5)
          ACE_DEBUG ((LM_DEBUG,
                      "Everything is running. Going to be mute.\n"));

        nav_iter.advance ();

        if (nav_iter.done ())
          nav_iter.first ();

      }
      while (++total_sent < sup->total_messages_);

    }
  ACE_CATCHANY
    {
    }
  ACE_ENDTRY;

  return 0;
}


// Run weapons event generation thread.

ACE_THR_FUNC_RETURN
DualEC_Supplier::run_weap_thread (void *arg)
{

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?