supplier_client.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 273 行

CPP
273
字号
// Supplier_Client.cpp,v 1.6 2003/11/02 23:27:22 dhinton Exp

#include "Supplier_Client.h"

#include "ORB_Run_Task.h"
#include "ace/Arg_Shifter.h"
#include "tao/ORB_Core.h"
#include "ace/Sched_Params.h"
#include "Supplier.h"
#include "orbsvcs/orbsvcs/NotifyExtC.h"
#include "orbsvcs/orbsvcs/CosNamingC.h"
#include "ace/OS_NS_errno.h"

ACE_RCSID (Notify, TAO_Notify_Lanes_Supplier_Client, "Supplier_Client.cpp,v 1.6 2003/11/02 23:27:22 dhinton Exp")

// Build the client around the shared holder of ORB objects.  The supplier
// pointer starts out null (it is allocated in initialize ()) and the consumer
// count defaults to 2 until parse_args () sees a -Consumers option.
TAO_Notify_Lanes_Supplier_Client::TAO_Notify_Lanes_Supplier_Client (TAO_Notify_ORB_Objects& orb_objects)
  : orb_objects_ (orb_objects),
    supplier_ (0),
    consumer_count_ (2)
{
}

// The destructor performs no explicit cleanup.
// NOTE(review): supplier_ is allocated with new in initialize () and is not
// deleted here -- presumably the supplier is reference counted; confirm.
TAO_Notify_Lanes_Supplier_Client::~TAO_Notify_Lanes_Supplier_Client ()
{
}

int
TAO_Notify_Lanes_Supplier_Client::parse_args (int argc, char *argv[])
{
  ACE_Arg_Shifter arg_shifter (argc, argv);

  const ACE_TCHAR *current_arg = 0;

  while (arg_shifter.is_anything_left ())
    {
      if ((current_arg = arg_shifter.get_the_parameter (ACE_LIB_TEXT("-Consumers")))) // Number of consumers that we need to send an event to.
        {
          if (current_arg != 0)
            {
              this->consumer_count_ = ACE_OS::atoi (current_arg);
            }

          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter (ACE_LIB_TEXT("-IORoutput")))) // The file to output the supplier ior to.
        {
          if (current_arg != 0)
            {
              this->ior_file_name_ = current_arg;
            }

          arg_shifter.consume_arg ();
        }
      else
        {
          arg_shifter.ignore_arg ();
        }
    }

  return 0;
}

/**
 * Bring the client up: activate the POA manager of the root POA, create the
 * event channel, obtain a supplier admin from it and finally create and
 * initialize the supplier.
 */
void
TAO_Notify_Lanes_Supplier_Client::initialize (ACE_ENV_SINGLE_ARG_DECL)
{
  // Activate the POA manager that belongs to the root POA.
  PortableServer::POAManager_var manager =
    this->orb_objects_.root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // create_ec () returns the channel with the lanes QoS already applied.
  CosNotifyChannelAdmin::EventChannel_var channel = this->create_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Ask the channel for a supplier admin.
  CosNotifyChannelAdmin::AdminID admin_id = 0;

  CosNotifyChannelAdmin::SupplierAdmin_var admin =
    channel->new_for_suppliers (CosNotifyChannelAdmin::AND_OP, admin_id ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_ASSERT (!CORBA::is_nil (admin.in ()));

  // Allocate the supplier ...
  this->supplier_ = new TAO_Notify_Lanes_Supplier (this->orb_objects_);

  // ... and hand it the admin together with the expected consumer count.
  this->supplier_->init (admin, this->consumer_count_ ACE_ENV_ARG_PARAMETER);
}

/**
 * Create an event channel through the notify factory and set a
 * ThreadPoolLanes QoS property on it that holds one lane per consumer plus
 * one extra lane at priority 0.
 *
 * @return The channel, released from the local _var with _retn ().
 */
CosNotifyChannelAdmin::EventChannel_ptr
TAO_Notify_Lanes_Supplier_Client::create_ec (ACE_ENV_SINGLE_ARG_DECL)
{
  CosNotifyChannelAdmin::EventChannel_var channel;

  CosNotifyChannelAdmin::EventChannelFactory_var factory =
    this->orb_objects_.notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (channel._retn ());

  // The channel itself is created with empty QoS and admin properties.
  CosNotification::QoSProperties channel_qos;
  CosNotification::AdminProperties admin_props;
  CosNotifyChannelAdmin::ChannelID channel_id;

  channel = factory->create_channel (channel_qos, admin_props, channel_id ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (channel._retn ());

  // Describe the thread pool with lanes that the channel should use.
  NotifyExt::ThreadPoolLanesParams lanes_params;

  lanes_params.priority_model = NotifyExt::CLIENT_PROPAGATED;
  lanes_params.server_priority = 0;
  lanes_params.stacksize = 0;
  lanes_params.allow_borrowing = 0;
  lanes_params.allow_request_buffering = 0;
  lanes_params.max_buffered_requests = 0;
  lanes_params.max_request_buffer_size = 0;

  /*
   * One lane more than the number of consumers is created.
   * The extra lane at priority 0 matches the priority 0 of the supplier thread.
   * Because the ProxyConsumer is activated in an RT POA with lanes, each invocation must match some lane.
   * Higher priorities are typically reserved for making requests and the lowest priority 0 for administrative
   * calls, e.g. <subscription_change>. Without a lane at priority 0, an invocation made from the supplier at
   * priority 0 would fail.
   */
  lanes_params.lanes.length (this->consumer_count_ + 1);

  lanes_params.lanes[0].lane_priority = 0; // The administrative lane.
  lanes_params.lanes[0].static_threads = 1;
  lanes_params.lanes[0].dynamic_threads = 0;

  // Lanes 1..consumer_count_ get priorities 1, 2, ... : the priorities at which the events are sent.
  RTCORBA::Priority event_priority = 1;

  for (int lane = 1; lane <= this->consumer_count_; ++lane)
    {
      lanes_params.lanes[lane].lane_priority = event_priority;
      lanes_params.lanes[lane].static_threads = 1;
      lanes_params.lanes[lane].dynamic_threads = 0;
      ++event_priority;
    }

  // Wrap the lane description into a single QoS property.
  channel_qos.length (1);
  channel_qos[0].name = CORBA::string_dup (NotifyExt::ThreadPoolLanes);
  channel_qos[0].value <<= lanes_params;

  // Instead of calling <set_qos>, the <qos> could also be passed while creating the channel.
  channel->set_qos (channel_qos ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (channel._retn ());

  return channel._retn ();
}

/// Write the supplier IOR to its output file and then run the supplier.
void
TAO_Notify_Lanes_Supplier_Client::run (ACE_ENV_SINGLE_ARG_DECL)
{
  /// First, signal that the supplier is ready by writing out its IOR.
  this->write_ior (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  /// Then hand control to the supplier's own run ().
  this->supplier_->run (ACE_ENV_SINGLE_ARG_PARAMETER);
}

/**
 * Write the stringified object reference of the supplier to the file named
 * by the -IORoutput option, to signal waiting consumers.
 *
 * The reference is stringified before the file is opened, so that a failure
 * in the ORB call cannot leave the file handle open.  If the file cannot be
 * opened an error is logged, because without the file the consumers have
 * nothing to pick up.
 */
void
TAO_Notify_Lanes_Supplier_Client::write_ior (ACE_ENV_SINGLE_ARG_DECL)
{
  CosNotifyComm::StructuredPushSupplier_var objref = this->supplier_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Do all the work that can raise an exception before acquiring the file.
  CORBA::String_var str =
    this->orb_objects_.orb_->object_to_string (objref.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Write the ior to a file to signal waiting consumers.
  FILE *ior_output_file = ACE_OS::fopen (this->ior_file_name_.c_str (), ACE_LIB_TEXT("w"));

  if (ior_output_file == 0)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("Supplier_Client::Unable to open IOR output file '%s'\n"),
                  this->ior_file_name_.c_str ()));
      return;
    }

  ACE_OS::fprintf (ior_output_file,
                   "%s",
                   str.in ());
  ACE_OS::fclose (ior_output_file);
}

/**
 * Body of the client: set the priority to 0 on orb_objects_.current_,
 * initialize the client and run it.  Any exception raised on the way is
 * printed and swallowed.
 *
 * @return Always 0.
 */
int
TAO_Notify_Lanes_Supplier_Client::svc (void)
{
  ACE_TRY_NEW_ENV
    {
      // Set priority 0 through current_ before doing anything else.
      this->orb_objects_.current_->the_priority (0 ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      this->initialize (ACE_ENV_SINGLE_ARG_PARAMETER); // Initialize the client.
      ACE_TRY_CHECK;

      // Write the IOR and run the supplier.
      this->run (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Report the exception; it is not propagated to the caller.
      ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION,
                          ACE_TEXT ("Supplier error "));

    }
  ACE_ENDTRY;

  return 0;
}

/**
 * Program entry point.
 *
 * Initializes the ORB and the holder of common ORB objects, parses the
 * command line, then activates two tasks -- one running the ORB and one
 * running the supplier client -- and waits for both to finish.
 *
 * @return 0 on normal completion (including when a CORBA exception was
 *         caught and printed), -1 if the options could not be parsed or a
 *         task could not be activated.
 */
int
main (int argc, char *argv [])
{
  ACE_TRY_NEW_ENV
    {
      // Initialize an ORB
      CORBA::ORB_var orb = CORBA::ORB_init (argc,
                                            argv,
                                            ""
                                            ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Create a holder for the common ORB Objects.
      TAO_Notify_ORB_Objects orb_objects;

      orb_objects.init (orb ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      /* Run the ORB in a separate thread */
      TAO_Notify_ORB_Run_Task orb_run_task (orb_objects);

      /* Create a Client */
      TAO_Notify_Lanes_Supplier_Client client (orb_objects);

      if (client.parse_args (argc, argv) != 0)
        {
          ACE_DEBUG ((LM_DEBUG, "Supplier_Client::Error parsing options\n"));
          return -1;
        }

      // Combine the default flags with the thread creation flags configured in the ORB.
      long flags = THR_NEW_LWP | THR_JOINABLE;

      flags |=
        orb->orb_core ()->orb_params ()->thread_creation_flags ();

      /* Both the tasks initialize themselves at Priority 0*/
      if (orb_run_task.activate (flags) == -1 || client.activate (flags) == -1)
        {
          if (ACE_OS::last_error () == EPERM)
            ACE_ERROR_RETURN ((LM_ERROR,
                               ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
                              -1);

          // Any other activation failure is just as fatal: waiting below on a
          // task that never started would be pointless.  The message carries
          // no conversion specifier without a matching argument.
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("(%t) Task activation failed.\n")),
                            -1);
        }

      // Block until the threads of both tasks have exited.
      orb_run_task.thr_mgr ()->wait ();
      client.thr_mgr ()->wait ();
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION,
                          ACE_TEXT ("Supplier Client error "));
    }
  ACE_ENDTRY;

  return 0;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?