consumer_handler.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 383 行

CPP
383
字号
// Consumer_Handler.cpp,v 1.21 2002/01/29 20:20:40 okellogg Exp

// ===========================================================
//
//
// = LIBRARY
//    TAO/examples/Callback_Quoter
//
// = FILENAME
//    Consumer_Handler.cpp
//
// = DESCRIPTION
//    Implementation of the Consumer_Handler class.
//
// = AUTHOR
//    Kirthika Parameswaran <kirthika@cs.wustl.edu>
//
// ===========================================================

#include "Consumer_Handler.h"

#include "tao/ORB.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"

#include "ace/Read_Buffer.h"
#include "ace/Get_Opt.h"
#include "ace/Read_Buffer.h"
#include "ace/OS.h"
#include "ace/Reactor.h"
#include "ace/Event_Handler.h"

Consumer_Handler::Consumer_Handler ()
  : stock_name_ ("Unknown"),   // placeholder until <-a> supplies a name
    threshold_value_ (0),
    server_ (),
    registered_ (0),           // nothing registered with the server yet
    unregistered_ (0),
    ior_ (0),                  // no IOR known until <-k> or <-f> is parsed
    shutdown_ (0),
    use_naming_service_ (1),   // naming service is used unless <-s> is given
    interactive_ (1)           // interactive unless <-a> is given
{
  // All state is set up in the initializer list above; the ORB and
  // the event handlers are created later by <init>.
}

Consumer_Handler::~Consumer_Handler (void)
{
  // Make sure to cleanup the STDIN handler.
  //
  // The handler is removed through the ORB's reactor and thread
  // manager, so this can only be done when an ORB was actually
  // obtained.  <interactive_> defaults to 1, which means that without
  // the nil check a Consumer_Handler that is destroyed before <init>
  // ran (or after <CORBA::ORB_init> failed) would dereference a nil
  // ORB reference here.

  if (this->interactive_ == 1
      && !CORBA::is_nil (this->orb_.in ()))
    {
      if (ACE_Event_Handler::remove_stdin_handler
          (this->orb_->orb_core ()->reactor (),
           this->orb_->orb_core ()->thr_mgr ()) == -1)
        ACE_ERROR ((LM_ERROR,
                    "%p\n",
                    "remove_stdin_handler"));
    }
}

// Reads the Server factory IOR from a file.

int
Consumer_Handler::read_ior (char *filename)
{
  // Open the file for reading.
  ACE_HANDLE f_handle = ACE_OS::open (filename, 0);

  if (f_handle == ACE_INVALID_HANDLE)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "Unable to open %s for reading: %p\n",
                       filename),
                      -1);

  ACE_Read_Buffer ior_buffer (f_handle);
  char *data = ior_buffer.read ();

  if (data == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "Unable to read ior: %p\n"),
                      -1);

  this->ior_ = ACE_OS::strdup (data);
  ior_buffer.alloc ()->free (data);

  ACE_OS::close (f_handle);

  return 0;
}

// Parses the command line arguments and returns an error status.

int
Consumer_Handler::parse_args (void)
{
  // Parses <argc_>/<argv_> and updates the corresponding members.
  // Returns 0 on success, -1 on an unknown option or when the IOR
  // file given with <-f> cannot be read.
  //
  // Recognized options (matching the usage message below):
  //   -a stock_name   stock to use; also switches off interactive mode
  //   -t threshold    threshold value (converted with atoi)
  //   -d              increase TAO_debug_level
  //   -f ior-file     read the IOR from a file
  //   -k ior          IOR given on the command line
  //   -x              set the shutdown flag
  //   -s              don't use the naming service
  //
  // <-d> is a plain flag, so it must not be followed by ':' in the
  // option string; otherwise it would swallow the next command line
  // word (e.g. the <-s> in "-d -s") as its argument.
  ACE_Get_Opt get_opts (argc_, argv_, "a:t:df:k:xs");
  int c;
  int result;

  while ((c = get_opts ()) != -1)
    switch (c)
      {
      case 'd':  // debug flag
        TAO_debug_level++;
        break;

      case 'k':  // ior provide on command line
        this->ior_ = ACE_OS::strdup (get_opts.opt_arg ());
        break;

      case 'f': // read the IOR from the file.
        result = this->read_ior (get_opts.opt_arg ());
        if (result < 0)
          // <%p> needs a string argument of its own in addition to
          // the file name consumed by <%s>.
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Unable to read ior from %s : %p\n",
                             get_opts.opt_arg (),
                             "read_ior"),
                            -1);
        break;

      case 's': // don't use the naming service
        this->use_naming_service_ = 0;
        break;

      case 'a': // to be given only on using run_test.pl
        this->stock_name_ = get_opts.opt_arg ();
        this->interactive_ = 0;
        break;

      case 't':
        this->threshold_value_ = ACE_OS::atoi (get_opts.opt_arg ());
        break;

      case 'x':
        this->shutdown_ = 1;
        break;

      case '?':
      default:
        ACE_ERROR_RETURN ((LM_ERROR,
                           "usage:  %s"
                           " [-d]"
                           " [-f ior-file]"
                           " [-k ior]"
                           " [-x]"
                           " [-s]"
                           " [-a stock_name]"
                           " [-t threshold]"
                           "\n",
                           this->argv_ [0]),
                          -1);
      }

  // Indicates successful parsing of command line.
  return 0;
}

// This method uses the naming service to obtain the server object reference.

int
Consumer_Handler::via_naming_service (void)
{
  // Looks up the name "Notifier" through the naming client and
  // narrows the result into <server_>.  Returns 0 on success, -1 if
  // the naming client cannot be initialized or an exception is
  // caught.
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Bring up the naming client on top of our ORB.
      const int init_status =
        this->naming_services_client_.init (this->orb_.in ());

      if (init_status != 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           " (%P|%t) Unable to initialize "
                           "the TAO_Naming_Client. \n"),
                          -1);

      // Build the single-component name under which the notifier is
      // looked up.
      CosNaming::Name lookup_name (1);
      lookup_name.length (1);
      lookup_name[0].id = CORBA::string_dup ("Notifier");

      CORBA::Object_var resolved =
        this->naming_services_client_->resolve (lookup_name
                                                ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Downcast the generic object reference to a Notifier.
      this->server_ =
        Notifier::_narrow (resolved.in ()
                           ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Consumer_Handler::via_naming_service\n");
      return -1;
    }
  ACE_ENDTRY;
  ACE_CHECK_RETURN (-1);

  return 0;
}

// Init function.
int
Consumer_Handler::init (int argc, char **argv)
{
  // Initializes the ORB, parses the command line, registers the
  // STDIN and SIGINT handlers when running interactively, and
  // obtains the Notifier reference into <server_> (through the naming
  // service, or from the IOR when <-s> was given).
  //
  // Returns 0 on success and -1 on any failure, including a caught
  // CORBA exception.

  this->argc_ = argc;
  this->argv_ = argv;

  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Retrieve the ORB.
      this->orb_ = CORBA::ORB_init (this->argc_,
                                    this->argv_,
                                    0
                                    ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;


      // Parse command line and verify parameters.
      if (this->parse_args () == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "parse_args failed\n"),
                          -1);

      if (this->interactive_ == 1)
        {
          ACE_DEBUG ((LM_DEBUG,
                      " Services provided:\n "
                      " * Registration <type 'r'>\n "
                      " * Unregistration <type 'u'>\n "
                      " * Quit <type 'q'>\n "));

          // Register our <Input_Handler> to handle STDIN events, which
          // will trigger the <handle_input> method to process these
          // events.
          ACE_NEW_RETURN (consumer_input_handler_,
                          Consumer_Input_Handler (this),
                          -1);

          if (ACE_Event_Handler::register_stdin_handler
              (consumer_input_handler_,
               this->orb_->orb_core ()->reactor (),
               this->orb_->orb_core ()->thr_mgr ()) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "%p\n",
                               "register_stdin_handler"),
                              -1);

          // Register the signal event handler for ^C
          ACE_NEW_RETURN (consumer_signal_handler_,
                          Consumer_Signal_Handler (this),
                          -1);

          if (this->reactor_used ()->register_handler
              (SIGINT,
               consumer_signal_handler_) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "%p\n",
                               "register_handler for SIGINT"),
                              -1);
        }

      // use the naming service.
      if (this->use_naming_service_)
        {
          if (via_naming_service () == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                                "via_naming_service failed\n"),
                              -1);
        }
      else
        {
          if (this->ior_ == 0)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "%s: no ior specified\n",
                               this->argv_[0]),
                              -1);

          CORBA::Object_var server_object =
            this->orb_->string_to_object (this->ior_
                                          ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;

          if (CORBA::is_nil (server_object.in ()))
            ACE_ERROR_RETURN ((LM_ERROR,
                               "invalid ior <%s>\n",
                               this->ior_),
                              -1);

          // The downcasting from CORBA::Object_var to Notifier_var is
          // done using the <_narrow> method.
          this->server_ = Notifier::_narrow (server_object.in ()
                                             ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }

      // <_narrow> hands back a nil reference when the object is not a
      // Notifier.  Report that here, for either lookup path, instead
      // of letting the first invocation on <server_> go through a nil
      // reference.
      if (CORBA::is_nil (this->server_.in ()))
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%s: object reference is not a Notifier\n",
                           this->argv_[0]),
                          -1);
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "Consumer_Handler::init");
      return -1;
    }
  ACE_ENDTRY;
  ACE_CHECK_RETURN (-1);

  return 0;
}

int
Consumer_Handler::run (void)
{

  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Obtain and activate the RootPOA.
     CORBA::Object_var obj =
            this->orb_->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
     ACE_TRY_CHECK;

     PortableServer::POA_var root_poa =
        PortableServer::POA::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
     ACE_TRY_CHECK;

     PortableServer::POAManager_var poa_manager=
       root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
     ACE_TRY_CHECK;

     poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
     ACE_TRY_CHECK;

     ACE_NEW_RETURN (this->consumer_servant_,
                     Consumer_i (),
                     -1);
     // Set the orb in the consumer_ object.
     this->consumer_servant_->orb (this->orb_.in ());

     // Get the consumer stub (i.e consumer object) pointer.
     this->consumer_var_ =
       this->consumer_servant_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (this->interactive_ == 0)
        {

          // Register with the server.
          this->server_->register_callback (this->stock_name_,
                                            this->threshold_value_,
                                            this->consumer_var_.in ()
                                            ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;

          // Note the registration.
          this->registered_ = 1;
          this->unregistered_ = 0;

          ACE_DEBUG ((LM_DEBUG,
                      "registeration done!\n"));
        }

      // Run the ORB.
      this->orb_->run (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Consumer_Handler::init");
      return -1;
    }
  ACE_ENDTRY;

  return 0;
}

ACE_Reactor *
Consumer_Handler::reactor_used (void) const
{
  // Hand out the reactor owned by the ORB core of our ORB.
  TAO_ORB_Core *core = this->orb_->orb_core ();
  return core->reactor ();
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?