ft_notifier_i.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 775 行 · 第 1/2 页

CPP
775
字号
// -*- C++ -*-
//=============================================================================
/**
 *  @file    FT_Notifier_i.cpp
 *
 *  FT_Notifier_i.cpp,v 1.5 2004/01/08 17:03:54 bala Exp
 *
 *  This file is part of Fault Tolerant CORBA.
 *
 *  @author Dale Wilson <wilson_d@ociweb.com>
 */
//=============================================================================

#include "FT_Notifier_i.h"

#include <ace/Get_Opt.h>
#include <ace/OS_NS_stdio.h>
#include <tao/debug.h>

// Debug-trace helper: use this macro at the beginning of CORBA methods.
// When TAO_debug_level is greater than 6 it logs "Enter <name>" at
// LM_DEBUG; otherwise it does nothing.  The argument is stringized
// (#name), so pass the bare method name, not a string literal.
// NOTE: the expansion is a bare `if` statement (it is not wrapped in
// do { } while (0)), so do not use it as the unbraced body of an if/else.
#define METHOD_ENTRY(name)    \
  if (TAO_debug_level > 6)    \
  {                           \
    ACE_DEBUG (( LM_DEBUG,    \
    "Enter %s\n", #name       \
      ));                     \
  }

// Debug-trace helper: use this macro to return from CORBA methods.
// When TAO_debug_level is greater than 6 it logs "Leave <name>" at
// LM_DEBUG, and then always expands to a trailing `return`.
// The return value (if any) is written after the macro, for example:
//   METHOD_RETURN(Plugh::plover) xyzzy;   is equivalent to   return xyzzy;
//   METHOD_RETURN(Plugh::troll);          is equivalent to   return;
// WARNING: THIS GENERATES TWO STATEMENTS (the `if` and the `return`)!
// The following will not do what you want it to, because only the `if`
// becomes the body of the outer condition and the `return` always runs:
//   if (cave_is_closing) METHOD_RETURN(Plugh::pirate) aarrggh;
// Moral:  Always use braces.
#define METHOD_RETURN(name)   \
  if (TAO_debug_level > 6)    \
  {                           \
    ACE_DEBUG (( LM_DEBUG,    \
      "Leave %s\n", #name     \
      ));                     \
  }                           \
  return /* value goes here */


// Default constructor.
// Puts every member into its "not yet initialized" state: references,
// pointers, counters and flags start at 0, identity_ starts as the empty
// string, and this_name_ / rm_register_ start at 1 (rm_register_ == 1
// means the "-r" option of parse_args() has not toggled it off).
// The constructor body itself does no work.
TAO::FT_FaultNotifier_i::FT_FaultNotifier_i ()
  : orb_ (0),
    poa_ (0),
    object_id_ (0),
    ior_output_file_(0),
    ns_name_(0),
    naming_context_ (0),
    this_name_ (1),
    rm_register_ (1),
    replication_manager_ (0),
    registered_ (0),
    identity_ (""),
    proxy_infos_ (0),
    consumer_connects_(0),
    consumer_disconnects_(0),
    notify_channel_ (0),
    filter_factory_ (0),
    supplier_admin_ (0),
    consumer_admin_ (0),
    structured_proxy_push_consumer_ (0),
    sequence_proxy_push_consumer_ (0),
    quit_on_idle_(0),
    quitting_(0),
    gone_(0)
{
}

// Destructor.
// Runs fini() so the clean-up it performs also happens when the owner
// never called fini() explicitly.  The call is made inside a fresh
// ACE try/environment block, and the catch-all handler is intentionally
// empty: failures during clean-up are ignored here rather than being
// allowed to leave the destructor.
TAO::FT_FaultNotifier_i::~FT_FaultNotifier_i ()
{
  ACE_TRY_NEW_ENV
  {
    fini (ACE_ENV_SINGLE_ARG_PARAMETER);
    ACE_TRY_CHECK;
  }
  ACE_CATCHALL
  {
    // Deliberately empty: nothing useful can be done with the error here.
  }
  ACE_ENDTRY;
}


/**
 * Idle-time hook: reports whether the notifier is finished.
 *
 * Once gone_ has been set, the first few calls still answer 0 (a short
 * "linger" period); only after more than five such calls is gone_
 * itself returned.
 *
 * @param result  Not used by this implementation.
 * @return 0 while gone_ is not set or while lingering; otherwise the
 *         value of gone_.
 *
 * @note The linger counter is a function-local static, so it is shared
 *       by every instance and is never reset.
 */
int TAO::FT_FaultNotifier_i::idle(int &result ACE_ENV_ARG_DECL_NOT_USED)
{
  static unsigned long linger = 0;
  ACE_UNUSED_ARG(result);

  if (this->gone_)
  {
    // Announce the start of the linger period exactly once.
    if (linger == 0)
    {
      ACE_ERROR ((LM_ERROR,
        "FaultNotifier (%P|%t) Begin linger.\n"
      ));
    }

    ++linger;
    if (linger <= 5)
    {
      // Still lingering: keep answering "not done yet".
      return 0;
    }

    ACE_ERROR ((LM_ERROR,
      "FaultNotifier (%P|%t) idle returnning gone\n"
    ));
  }

  return this->gone_;
}

////////////////////////////////////////////
// FT_FaultNotifier_i private methods


// TODO: find this a common home
int TAO::FT_FaultNotifier_i::write_ior()
{
  int result = -1;
  FILE* out = ACE_OS::fopen (this->ior_output_file_, "w");
  if (out)
  {
    ACE_OS::fprintf (out, "%s", ACE_static_cast(const char *, this->ior_));
    ACE_OS::fclose (out);
    result = 0;
  }
  else
  {
    ACE_ERROR ((LM_ERROR,
      "%T %n (%P|%t) Open failed for %s\n", ior_output_file_
    ));
  }
  return result;
}

//////////////////////////////////////////////////////
// FT_FaultNotifier_i public, non-CORBA methods

/**
 * Parse the command-line options understood by the fault notifier.
 *
 *   -o <iorfile>  remember <iorfile> in ior_output_file_
 *   -r            toggle rm_register_ (it starts enabled, so a single -r
 *                 disables registration with the ReplicationManager)
 *   -q            set quit_on_idle_
 *
 * @param argc  Number of entries in @a argv.
 * @param argv  Argument vector; argv[0] is used in the usage message.
 * @return 0 when every option was recognized; -1 (after logging a usage
 *         message) as soon as an unrecognized option is met.
 */
int TAO::FT_FaultNotifier_i::parse_args (int argc, char * argv[])
{
  ACE_Get_Opt get_opts (argc, argv, "o:rq");

  for (int opt = get_opts (); opt != -1; opt = get_opts ())
  {
    if (opt == 'o')
    {
      this->ior_output_file_ = get_opts.opt_arg ();
    }
    else if (opt == 'r')
    {
      this->rm_register_ = ! this->rm_register_;
    }
    else if (opt == 'q')
    {
      this->quit_on_idle_ = 1;
    }
    else
    {
      // '?' and anything else unexpected: print usage and give up.
      ACE_ERROR_RETURN ((LM_ERROR,
                         "usage:  %s"
                         " -o <iorfile>"
                         " -r disable registration with ReplicationManager"
                         " -q(uit on idle)"
                         "\n",
                         argv [0]),
                        -1);
    }
  }

  // Indicates successful parsing of the command line
  return 0;
}

/**
 * Accessor for the notifier's identity string.
 *
 * @return Pointer to the character data owned by identity_; the caller
 *         must not free it.
 */
const char * TAO::FT_FaultNotifier_i::identity () const
{
  const char * const text = this->identity_.c_str ();
  return text;
}

/**
 * Return the POA this servant is associated with (the one stored in
 * poa_).
 *
 * Under the CORBA C++ mapping an object reference returned from an
 * operation is owned by the caller, who releases it (typically by
 * storing it in a POA_var).  The reference is therefore duplicated here;
 * handing out poa_.in() directly would let every caller drop a reference
 * that still belongs to poa_.
 *
 * @return A new reference to the POA (nil if poa_ is nil); the caller
 *         must release it.
 */
PortableServer::POA_ptr TAO::FT_FaultNotifier_i::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  return PortableServer::POA::_duplicate (this->poa_.in ());
}


/**
 * Reference-count hook, used here as the shutdown signal: destroys the
 * notification channel (when one exists) and sets gone_, which idle()
 * later reports.
 *
 * If destroying the channel raises a CORBA exception, gone_ is left
 * unchanged.
 */
void TAO::FT_FaultNotifier_i::_remove_ref (ACE_ENV_SINGLE_ARG_DECL)
{
  // notify_channel_ starts out nil and is only assigned part-way through
  // init(); guard the call so an uninitialized (or partially initialized)
  // notifier does not dereference a nil reference.
  if (! CORBA::is_nil (this->notify_channel_.in ()))
  {
    notify_channel_->destroy(ACE_ENV_SINGLE_ARG_PARAMETER);
    ACE_CHECK;
  }

  ACE_ERROR ((LM_ERROR,
    "FaultNotifier (%P|%t) _remove_ref setting gone\n"
  ));
  this->gone_ = 1;
}

/**
 * Undo the externally visible registrations of this notifier:
 *   - remove the IOR output file, if one was named;
 *   - unbind this_name_ from the naming context, if a name-service name
 *     and a naming context are set;
 *   - replace the registration at the ReplicationManager with a nil
 *     FaultNotifier, if registered_ is set.
 *
 * Each step clears the member that triggered it, so a second call does
 * not repeat a step that already completed.
 *
 * @return 0 on completion; -1 if unbinding from the naming context
 *         raised a CORBA exception.  A failure to unregister from the
 *         ReplicationManager is logged and otherwise ignored.
 */
int TAO::FT_FaultNotifier_i::fini (ACE_ENV_SINGLE_ARG_DECL)
{
  if (this->ior_output_file_ != 0)
  {
    ACE_OS::unlink (this->ior_output_file_);
    this->ior_output_file_ = 0;
  }
  if (this->ns_name_ != 0 && this->naming_context_.in() != 0)
  {
    this->naming_context_->unbind (this_name_
                            ACE_ENV_ARG_PARAMETER);
    // Check the call like every other CORBA invocation in this file, so
    // builds using emulated exceptions do not silently drop a failure.
    ACE_CHECK_RETURN (-1);
    this->ns_name_ = 0;
  }

  if (this->registered_)
  {
    ACE_TRY
    {
      // Pass the environment explicitly so that ACE_TRY_CHECK below
      // inspects the environment this call actually reports into.
      this->replication_manager_->register_fault_notifier(::FT::FaultNotifier::_nil ()
                                                          ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      ACE_DEBUG ((LM_DEBUG,
        "FaultNotifier unregistered from ReplicationManager.\n"
        ));
    }
    ACE_CATCHANY
    {
      ACE_DEBUG ((LM_DEBUG,
        "FaultNotifier Can't unregister from ReplicationManager.\n"
        ));
      // complain, but otherwise ignore this error
      // RM may be down.
    }
    ACE_ENDTRY;

    this->registered_ = 0;
  }
  return 0;
}

int TAO::FT_FaultNotifier_i::init (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL )
{
  int result = 0;
  this->orb_ = CORBA::ORB::_duplicate (orb);

  // Use the ROOT POA for now
  CORBA::Object_var poa_object =
    this->orb_->resolve_initial_references (TAO_OBJID_ROOTPOA
                                            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (CORBA::is_nil (poa_object.in ()))
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" (%P|%t) Unable to initialize the POA.\n")),
                      -1);

  // Get the POA object.
  this->poa_ =
    PortableServer::POA::_narrow (poa_object.in ()
                                  ACE_ENV_ARG_PARAMETER);

  ACE_CHECK_RETURN (-1);

  if (CORBA::is_nil(this->poa_.in ()))
  {
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT (" (%P|%t) Unable to narrow the POA.\n")),
                      -1);
  }

  PortableServer::POAManager_var poa_manager =
    this->poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  // Register with the POA.

  this->object_id_ = this->poa_->activate_object (this ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  // find my IOR

  CORBA::Object_var this_obj =
    this->poa_->id_to_reference (object_id_.in ()
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  this->ior_ = this->orb_->object_to_string (this_obj.in ()
                                  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);


  ////////////////////////////////////////////////
  // Register with coresident Notification Channel
  CosNotifyChannelAdmin::EventChannelFactory_var notify_factory =
    TAO_Notify_EventChannelFactory_i::create (poa_.in ()
    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);
  CosNotification::QoSProperties initial_qos;
  CosNotification::AdminProperties initial_admin;
  this->notify_channel_ =
    notify_factory->create_channel (initial_qos,
    initial_admin,
    channel_id_
    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  this->filter_factory_ = this->notify_channel_->default_filter_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  ///////////////////////////
  // Producer registration

  this->supplier_admin_ = this->notify_channel_->default_supplier_admin (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  ::CosNotifyChannelAdmin::ProxyID proxyId = 0;

  //////////////////////
  // structured producer
  ::CosNotifyChannelAdmin::ProxyConsumer_var consumer
    = this->supplier_admin_->obtain_notification_push_consumer (
      ::CosNotifyChannelAdmin::STRUCTURED_EVENT,
      proxyId
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  structured_proxy_push_consumer_
    = ::CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(consumer.in ()
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);
  if (CORBA::is_nil (this->structured_proxy_push_consumer_.in ()))
  {
    ACE_ERROR_RETURN ((LM_ERROR,
       "%T %n (%P|%t) Should not occur: Unable to narrow Structured Proxy Push Consumer\n"),
      1);
  }

  // todo: implement a push supplier if we want to receive disconnect notice
  CosNotifyComm::StructuredPushSupplier_var stubPushSupplier =
    CosNotifyComm::StructuredPushSupplier::_nil();

  this->structured_proxy_push_consumer_->connect_structured_push_supplier (stubPushSupplier.in()
     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);

  ////////////////////
  // Sequence producer
  consumer
    = this->supplier_admin_->obtain_notification_push_consumer (
      ::CosNotifyChannelAdmin::SEQUENCE_EVENT,
      proxyId
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  this->sequence_proxy_push_consumer_
    = ::CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow(consumer.in ()
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  if (CORBA::is_nil (this->sequence_proxy_push_consumer_.in ()))
  {
    ACE_ERROR_RETURN ((LM_ERROR,
       "%T %n (%P|%t) Should not occur: Unable to narrow Sequence Proxy Push Consumer\n"),
      1);
  }

  // todo: implement this if we want to receive disconnect notice
  CosNotifyComm::SequencePushSupplier_var stubSeqPushSupplier =
    CosNotifyComm::SequencePushSupplier::_nil();

  this->sequence_proxy_push_consumer_->connect_sequence_push_supplier (stubSeqPushSupplier.in()
    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);
  ///////////////////////////
  // Consumer registration

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?