ft_notifier_i.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 775 行 · 第 1/2 页

CPP
775
字号

  // find the channel administrator for consumers
  this->consumer_admin_ = this->notify_channel_->default_consumer_admin (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN(-1);
  if (CORBA::is_nil (this->consumer_admin_.in ()))
  {
    ACE_ERROR ((LM_ERROR,
      "%T %n (%P|%t) NIL consumer admin\n"
      ));
    result = -1;
  }
  // everything else happens when subscriber shows up

  ///////////////////////////////
  // Register with ReplicationManager
  if (this->rm_register_)
  {
    ACE_TRY_NEW_ENV
    {
      CORBA::Object_var rm_obj = orb->resolve_initial_references("ReplicationManager" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      this->replication_manager_ = ::FT::ReplicationManager::_narrow(rm_obj.in() ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      if (!CORBA::is_nil (replication_manager_.in ()))
      {
        // @@: should we check to see if there's already one registered?
        FT::FaultNotifier_var notifier = FT::FaultNotifier::_narrow (this_obj.in ());
        if (! CORBA::is_nil (notifier.in ()))
        {
          this->replication_manager_->register_fault_notifier(notifier.in ());
          ACE_DEBUG ((LM_DEBUG,
            "FaultNotifier registered with ReplicationManager.\n"
            ));
          this->registered_ = 1;
        }
        else
        {
          ACE_ERROR ((LM_ERROR,
            "Error: Registration failed.  This is not a FaultNotifier (should not occur.)\n"
            ));
        }
      }
      else
      {
        ACE_ERROR ((LM_ERROR,"FaultNotifier: Can't resolve ReplicationManager, It will not be registered.\n" ));
      }
    }
    ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
        "FaultNotifier: Exception resolving ReplicationManager.  Notifier will not be registered.\n" );
    }
    ACE_ENDTRY;
  }
  else
  {
    ACE_DEBUG ((LM_DEBUG,
      "FaultNotifier: ReplicationManager registration disabled.\n"
      ));
  }
  ///////////////////////////////
  // Set up and ready for action
  // publish our IOR

  if(result == 0)
  {
    if (this->ior_output_file_ != 0)
    {
      this->identity_ = "file:";
      this->identity_ += this->ior_output_file_;
      result = write_ior();
    }
  }

  if (result == 0)
  {
    if (this->ns_name_ != 0)
    {
      this->identity_ = "name:";
      this->identity_ += this->ns_name_;

      CORBA::Object_var naming_obj =
        this->orb_->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      if (CORBA::is_nil(naming_obj.in ())){
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%T %n (%P|%t) Unable to find the Naming Service\n"),
                          1);
      }

      this->naming_context_ =
        CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
      if (CORBA::is_nil(this->naming_context_.in ()))
      {
        ACE_ERROR_RETURN ((LM_ERROR,
           "%T %n (%P|%t) Should not occur: Can't narrow initial reference to naming context.\n"),
          1);
      }
      this->this_name_.length (1);
      this->this_name_[0].id = CORBA::string_dup (this->ns_name_);

      this->naming_context_->rebind (this->this_name_, this_obj.in()  //CORBA::Object::_duplicate(this_obj)
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
    }
  }

  return result;
}

///////////////////
// CORBA METHODS

// Publish a single structured fault event.
//
// The event is handed unchanged to structured_proxy_push_consumer_ via
// push_structured_event(); this method adds nothing but entry/exit tracing.
//
// @param event  the structured event to push.
// Declared to raise only CORBA::SystemException.
void TAO::FT_FaultNotifier_i::push_structured_fault (
    const CosNotification::StructuredEvent & event
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::push_structured_fault);

  // Forward the event, passing along the CORBA environment argument.
  this->structured_proxy_push_consumer_->push_structured_event (event
    ACE_ENV_ARG_PARAMETER);

  METHOD_RETURN(TAO::FT_FaultNotifier_i::push_structured_fault);
}

// Publish a batch of fault events.
//
// The batch is handed unchanged to sequence_proxy_push_consumer_ via
// push_structured_events(); this method adds nothing but entry/exit tracing.
//
// @param events  the event batch to push.
// Declared to raise only CORBA::SystemException.
void TAO::FT_FaultNotifier_i::push_sequence_fault (
    const CosNotification::EventBatch & events
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::push_sequence_fault);

  // Forward the batch, passing along the CORBA environment argument.
  this->sequence_proxy_push_consumer_->push_structured_events (events
    ACE_ENV_ARG_PARAMETER);

  METHOD_RETURN(TAO::FT_FaultNotifier_i::push_sequence_fault);
}

// Create a filter that a consumer can later pass to one of the
// connect_*_fault_consumer methods.
//
// @param constraint_grammar  requested grammar name.  NOTE(review): this
//        argument is currently ignored -- the filter is always created with
//        the literal grammar "ETCL" (see the @@todo below).
// @return the new filter; ownership is released to the caller via _retn().
// Declared to raise CORBA::SystemException and
// CosNotifyFilter::InvalidGrammar.
::CosNotifyFilter::Filter_ptr TAO::FT_FaultNotifier_i::create_subscription_filter (
    const char * constraint_grammar
    ACE_ENV_ARG_DECL_NOT_USED
  )
  ACE_THROW_SPEC ((CORBA::SystemException, CosNotifyFilter::InvalidGrammar))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::create_subscription_filter);
  ACE_UNUSED_ARG (constraint_grammar); //@@todo

  // The grammar is hard-coded; the caller's request is not consulted.
  CosNotifyFilter::Filter_var filter = this->filter_factory_->create_filter ("ETCL");
  // The expression on the line after METHOD_RETURN is the value returned
  // (the macro is defined outside this chunk).
  METHOD_RETURN(TAO::FT_FaultNotifier_i::create_subscription_filter)
    filter._retn ();
}

// Connect a structured-event push consumer to the fault notifier.
//
// A ProxyInfo slot is chosen (the first slot whose proxy reference is nil,
// otherwise a newly appended slot), a STRUCTURED_EVENT proxy push supplier is
// obtained from consumer_admin_ and stored in that slot, @a push_consumer is
// connected to it, and @a filter is attached when it is non-nil.
//
// @param push_consumer  consumer that will receive structured fault events.
// @param filter         optional filter; skipped when nil.
// @return the index of the ProxyInfo slot used.  disconnect_consumer()
//         interprets its ConsumerId argument as this index.
// Declared to raise only CORBA::SystemException.
//
// Every call that is handed the CORBA environment is followed by
// ACE_CHECK_RETURN, as is done elsewhere in this file.  In builds that
// emulate exceptions through the environment this stops the method from
// carrying on (and counting a connection) after a failed call; with native
// C++ exceptions the call has already thrown by then.
FT::FaultNotifier::ConsumerId TAO::FT_FaultNotifier_i::connect_structured_fault_consumer (
    CosNotifyComm::StructuredPushConsumer_ptr push_consumer,
    CosNotifyFilter::Filter_ptr filter
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::connect_structured_fault_consumer);

  /////////////////////////
  // find a ProxyInfo entry
  // use the first nil entry or a new entry if no nils found

  const size_t slotCount = this->proxy_infos_.size ();
  size_t infoPos = 0;
  while (infoPos < slotCount
         && ! CORBA::is_nil (this->proxy_infos_[infoPos].proxyVar_.in ()))
  {
    ++infoPos;
  }
  if (infoPos == slotCount)
  {
    // no free slot: append one (a default ProxyInfo holds a nil proxy)
    this->proxy_infos_.push_back (ProxyInfo ());
  }

  ///////////////////////////////////////
  // Assign an ID, populate the ProxyInfo
  FT::FaultNotifier::ConsumerId result = infoPos;
  ProxyInfo & info = this->proxy_infos_[infoPos];
  info.proxyVar_
      = this->consumer_admin_->obtain_notification_push_supplier (
      ::CosNotifyChannelAdmin::STRUCTURED_EVENT,
      info.proxyId_
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  this->consumer_connects_ += 1;

  ::CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier
    = ::CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (info.proxyVar_.in ()
        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  if (CORBA::is_nil (proxySupplier.in ()))
  {
    // this is a should-not-occur situation.  The consumer admin returned
    // the wrong kind of object.
    // NOTE(review): the slot keeps the unusable proxy and the id is still
    // returned to the caller -- confirm whether this should raise instead.
    ACE_ERROR(( LM_ERROR,
       "%T %n (%P|%t) Unexpected result: Wrong type of object returned from obtain_notification_push_supplier\n"
       ));
  }
  else
  {
    proxySupplier->connect_structured_push_consumer ( push_consumer
      ACE_ENV_ARG_PARAMETER);
    ACE_CHECK_RETURN (0);

    if (! CORBA::is_nil (filter))
    {
      proxySupplier->add_filter (filter);
    }
  }

  METHOD_RETURN(TAO::FT_FaultNotifier_i::connect_structured_fault_consumer) result;
}

// Connect a sequence (batch) push consumer to the fault notifier.
//
// A ProxyInfo slot is chosen (the first slot whose proxy reference is nil,
// otherwise a newly appended slot), a SEQUENCE_EVENT proxy push supplier is
// obtained from consumer_admin_ and stored in that slot, @a push_consumer is
// connected to it, and @a filter is attached when it is non-nil.
//
// @param push_consumer  consumer that will receive batches of fault events.
// @param filter         optional filter; skipped when nil.
// @return the index of the ProxyInfo slot used.  disconnect_consumer()
//         interprets its ConsumerId argument as this index.
// Declared to raise only CORBA::SystemException.
//
// Every call that is handed the CORBA environment is followed by
// ACE_CHECK_RETURN, as is done elsewhere in this file.  In builds that
// emulate exceptions through the environment this stops the method from
// carrying on (and counting a connection) after a failed call; with native
// C++ exceptions the call has already thrown by then.
FT::FaultNotifier::ConsumerId TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer (
    CosNotifyComm::SequencePushConsumer_ptr push_consumer,
    CosNotifyFilter::Filter_ptr filter
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer);
  /////////////////////////
  // find a ProxyInfo entry
  // use the first nil entry or a new entry if no nils found

  const size_t slotCount = this->proxy_infos_.size ();
  size_t infoPos = 0;
  while (infoPos < slotCount
         && ! CORBA::is_nil (this->proxy_infos_[infoPos].proxyVar_.in ()))
  {
    ++infoPos;
  }
  if (infoPos == slotCount)
  {
    // no free slot: append one (a default ProxyInfo holds a nil proxy)
    this->proxy_infos_.push_back (ProxyInfo ());
  }

  ///////////////////////////////////////
  // Assign an ID, populate the ProxyInfo
  FT::FaultNotifier::ConsumerId result = infoPos;
  ProxyInfo & info = this->proxy_infos_[infoPos];
  info.proxyVar_
    = this->consumer_admin_->obtain_notification_push_supplier (
      ::CosNotifyChannelAdmin::SEQUENCE_EVENT,
      info.proxyId_
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  this->consumer_connects_ += 1;

  ::CosNotifyChannelAdmin::SequenceProxyPushSupplier_var proxySupplier
    = ::CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow (info.proxyVar_.in ()
      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  if (CORBA::is_nil (proxySupplier.in ()))
  {
    // this is a should-not-occur situation.  The consumer admin returned
    // the wrong kind of object.
    // NOTE(review): the slot keeps the unusable proxy and the id is still
    // returned to the caller -- confirm whether this should raise instead.
    ACE_ERROR(( LM_ERROR,
       "%T %n (%P|%t) Unexpected result: Wrong type of object returned from obtain_notification_push_supplier\n"
       ));
  }
  else
  {
    proxySupplier->connect_sequence_push_consumer ( push_consumer
      ACE_ENV_ARG_PARAMETER);
    ACE_CHECK_RETURN (0);

    if (! CORBA::is_nil (filter))
    {
      proxySupplier->add_filter (filter);
    }
  }
  METHOD_RETURN(TAO::FT_FaultNotifier_i::connect_sequence_fault_consumer) result;
}

// Disconnect a previously connected consumer.
//
// @param connection  id returned by one of the connect_*_fault_consumer
//        methods; it is used as an index into proxy_infos_.
//
// The stored proxy is narrowed first to a structured and then to a sequence
// proxy push supplier; whichever matches is disconnected and the slot is
// reset to nil so it can be reused.  CosEventComm::Disconnected is raised
// when the id is out of range, the slot is already nil, or the stored proxy
// is of neither kind.
//
// After a successful disconnect the disconnect counter is bumped.  When
// quit_on_idle_ is set and the connect and disconnect counters match, the
// servant is deactivated in its POA and quitting_ is set so this happens
// only once.
//
// ACE_CHECK follows each call that is handed the CORBA environment so that
// builds which emulate exceptions through the environment stop here after a
// failed call, just as a native C++ exception would.
void TAO::FT_FaultNotifier_i::disconnect_consumer (
    FT::FaultNotifier::ConsumerId connection
    ACE_ENV_ARG_DECL
  )
  ACE_THROW_SPEC ((CORBA::SystemException, CosEventComm::Disconnected))
{
  METHOD_ENTRY(TAO::FT_FaultNotifier_i::disconnect_consumer);

  size_t index = ACE_static_cast ( size_t, connection);
  if (index < this->proxy_infos_.size())
  {
    ProxyInfo & info = this->proxy_infos_[index];
    if (CORBA::is_nil(info.proxyVar_.in ()) )
    {
      // slot is free: nothing is connected under this id
      ACE_THROW(CosEventComm::Disconnected());
    }
    else
    {
      ::CosNotifyChannelAdmin::StructuredProxyPushSupplier_var structuredSupplier
        = ::CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(info.proxyVar_.in ()
            ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      if (! CORBA::is_nil (structuredSupplier.in ()))
      {
        structuredSupplier->disconnect_structured_push_supplier ();
        info.proxyVar_ = ::CosNotifyChannelAdmin::ProxySupplier::_nil();
      }
      else
      {
        // not a structured proxy; try the sequence flavor
        ::CosNotifyChannelAdmin::SequenceProxyPushSupplier_var sequenceSupplier
          = ::CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow(info.proxyVar_.in ()
              ACE_ENV_ARG_PARAMETER);
        ACE_CHECK;
        if (! CORBA::is_nil (sequenceSupplier.in ()))
        {
          sequenceSupplier->disconnect_sequence_push_supplier ();
          info.proxyVar_ = ::CosNotifyChannelAdmin::ProxySupplier::_nil();
        }
        else
        {
          ACE_ERROR((LM_ERROR,
            "%T %n (%P|%t) Unexpected proxy supplier type\n"
            ));
          ACE_THROW(CosEventComm::Disconnected());
        }
      }
    }
  }
  else
  {
    // id does not refer to any slot
    ACE_THROW(CosEventComm::Disconnected());
  }

  // Only reached after a successful disconnect: every failure path above
  // leaves the function through ACE_THROW.
  this->consumer_disconnects_ += 1;
  if (this->quit_on_idle_
      && ! this->quitting_
      && this->consumer_connects_ == this->consumer_disconnects_)
  {
    ACE_ERROR((LM_ERROR,
      "FaultNotifier (%P|%t) quit on idle: connects %d, disconnects %d\n",
      ACE_static_cast (unsigned int, this->consumer_connects_),
      ACE_static_cast (unsigned int, this->consumer_disconnects_)
      ));
    this->poa_->deactivate_object (this->object_id_.in ()
                 ACE_ENV_ARG_PARAMETER);
    ACE_CHECK;
    this->quitting_ = 1;
  }

  METHOD_RETURN(TAO::FT_FaultNotifier_i::disconnect_consumer);
}

// Liveness probe.
//
// Performs no checks of its own: the value written after METHOD_RETURN is
// the constant 1 (the macro is defined outside this chunk; presumably it
// traces the exit and then returns that value).
// Declared to raise only CORBA::SystemException.
CORBA::Boolean TAO::FT_FaultNotifier_i::is_alive (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  METHOD_RETURN(TAO::FT_FaultNotifier_i::is_alive) 1;
}

//////////////
// ProxyInfo

// Default constructor: proxy id 0 and a nil proxy supplier reference.
// A nil proxyVar_ is what the connect_*_fault_consumer methods treat as a
// free, reusable slot.
TAO::FT_FaultNotifier_i::ProxyInfo::ProxyInfo ()
  : proxyId_ (0)
  , proxyVar_ (::CosNotifyChannelAdmin::ProxySupplier::_nil())
{
}

// Copy constructor: copies both the proxy id and the proxy supplier
// reference from @a rhs.
TAO::FT_FaultNotifier_i::ProxyInfo::ProxyInfo (const ProxyInfo & rhs)
  : proxyId_ (rhs.proxyId_)
  , proxyVar_ (rhs.proxyVar_)
{
}
// Instantiate ACE_Guard<ACE_SYNCH_MUTEX> explicitly when the ACE build
// configuration asks for it: with a `template class` declaration if
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION is defined, or with an
// instantiation pragma if ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA is defined.
// Nothing is emitted when neither macro is defined.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
  template class ACE_Guard<ACE_SYNCH_MUTEX>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
# pragma instantiate ACE_Guard<ACE_SYNCH_MUTEX>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?