⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ecg_udp_receiver.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通讯要求,可广泛用于网络游戏、医学图像网关等对 QoS 要求较高的场合。更详细的内容可阅读相应的材料。
💻 CPP
字号:
// ECG_UDP_Receiver.cpp,v 1.6 2003/08/02 01:15:54 bala Exp

#include "orbsvcs/Event/ECG_UDP_Receiver.h"
#include "orbsvcs/Event_Utilities.h"
#include "ace/SString.h"

#if !defined(__ACE_INLINE__)
#include "ECG_UDP_Receiver.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID(Event, ECG_UDP_Receiver, "ECG_UDP_Receiver.cpp,v 1.6 2003/08/02 01:15:54 bala Exp")


// ****************************************************************

// Destructor.  Resets <consumer_proxy_> to nil and, when a handler is
// still held in <handler_rptr_>, tells that handler to shut down.
TAO_ECG_UDP_Receiver::~TAO_ECG_UDP_Receiver (void)
{
  // Forget the consumer proxy.
  this->consumer_proxy_ = RtecEventChannelAdmin::ProxyPushConsumer::_nil ();

  // Only a non-empty handler pointer can be asked to shut down.
  if (this->handler_rptr_.get ())
    {
      this->handler_rptr_->shutdown ();
    }
}

// Initializes the receiver.
//
//   lcl_ec       Local event channel; must not be nil.  A duplicate of
//                the reference is stored in <lcl_ec_>.
//   ignore_from  Forwarded unchanged to <cdr_receiver_>.init ().
//   addr_server  May be nil.  A duplicate of the reference is stored in
//                <addr_server_>.
//
// Raises CORBA::INTERNAL (after logging an error) if <lcl_ec> is nil;
// in that case no member is modified.
void
TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
                            TAO_ECG_Refcounted_Endpoint ignore_from,
                            RtecUDPAdmin::AddrServer_ptr addr_server
                            ACE_ENV_ARG_DECL)
{
  // Verify arguments.
  // <addr_server> is allowed to be nil.  But then, if get_addr () method
  // is invoked, it throws an exception.
  if (CORBA::is_nil (lcl_ec))
    {
      // The message is newline-terminated so that it does not run into
      // whatever is logged next.
      ACE_ERROR ((LM_ERROR,
                  "TAO_ECG_UDP_Receiver::init(): "
                  "<lcl_ec> argument is nil.\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }

  this->cdr_receiver_.init (ignore_from);

  // Keep our own references to the event channel and address server.
  this->lcl_ec_ =
    RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);

  this->addr_server_ =
    RtecUDPAdmin::AddrServer::_duplicate (addr_server);
}

// Connects (or reconnects) this receiver to the local event channel as
// a supplier of the publications described by <pub>.
//
// If no consumer proxy is held yet, new_connect () is used; otherwise
// the existing proxy is reused through reconnect ().
//
// Raises CORBA::INTERNAL (after logging an error) if <lcl_ec_> is nil,
// i.e. init () has not stored an event channel, or if <pub> contains no
// publications.
void
TAO_ECG_UDP_Receiver::connect (const RtecEventChannelAdmin::SupplierQOS& pub
                               ACE_ENV_ARG_DECL)
{
  // Log messages are newline-terminated so that they do not run into
  // whatever is logged next.
  if (CORBA::is_nil (this->lcl_ec_.in ()))
    {
      ACE_ERROR ((LM_ERROR,
                  "Error initializing TAO_ECG_UDP_Receiver: "
                  "init() hasn't been called before connect().\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }

  if (pub.publications.length () == 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "TAO_ECG_UDP_Receiver::connect(): "
                  "0-length publications argument.\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }

  if (CORBA::is_nil (this->consumer_proxy_.in ()))
    {
      // First connection: no proxy has been obtained so far.
      this->new_connect (pub ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // A proxy already exists: connect again through it.
      this->reconnect (pub ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

// First-time connection to the local event channel.
//
// Activates this servant with its default POA, obtains a
// ProxyPushConsumer from <lcl_ec_> and connects to it as a push
// supplier with the publications in <pub>.  Member state is touched
// only after every remote step has completed; the local helper objects
// presumably undo their step if the function exits earlier.
void
TAO_ECG_UDP_Receiver::new_connect (const RtecEventChannelAdmin::SupplierQOS& pub
                                   ACE_ENV_ARG_DECL)
{
  // Step 1: register with the POA and obtain our own object reference.
  RtecEventComm::PushSupplier_var self_ref;
  PortableServer::POA_var default_poa = this->_default_POA ();

  TAO_EC_Object_Deactivator poa_deactivator;
  activate (self_ref, default_poa.in (), this, poa_deactivator
            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Step 2: ask the local EC for a consumer proxy to push events into.
  RtecEventChannelAdmin::SupplierAdmin_var admin =
    this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
    admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  ECG_Receiver_Auto_Proxy_Disconnect disconnect_guard (consumer.in ());

  // Step 3: connect ourselves to that proxy as a supplier.
  consumer->connect_push_supplier (self_ref.in (), pub
                                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Everything succeeded: hand the resources over to the members.
  this->consumer_proxy_ = consumer._retn ();
  this->auto_proxy_disconnect_.set_command (disconnect_guard);
  this->set_deactivator (poa_deactivator);
}

// Connects again through the consumer proxy already held in
// <consumer_proxy_>, using the publications in <pub>.
//
// The servant's object reference is looked up from its default POA
// instead of being activated anew.  Raises CORBA::INTERNAL if that
// reference cannot be narrowed to a PushSupplier.
void
TAO_ECG_UDP_Receiver::reconnect (const RtecEventChannelAdmin::SupplierQOS& pub
                                 ACE_ENV_ARG_DECL)
{
  // Look up the reference under which this servant is known to the POA.
  RtecEventComm::PushSupplier_var self_ref;
  PortableServer::POA_var default_poa = this->_default_POA ();

  CORBA::Object_var self_obj =
    default_poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  self_ref = RtecEventComm::PushSupplier::_narrow (self_obj.in ()
                                                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // A nil result means the narrow did not yield a PushSupplier.
  if (CORBA::is_nil (self_ref.in ()))
    ACE_THROW (CORBA::INTERNAL ());

  // Connect through the existing proxy with the new publications.
  this->consumer_proxy_->connect_push_supplier (self_ref.in (), pub
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Disconnect request for this push supplier.  Disallows the pending
// auto-disconnect command and then performs a full shutdown ().
void
TAO_ECG_UDP_Receiver::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Make sure shutdown () below does not attempt a disconnect of its own.
  this->auto_proxy_disconnect_.disallow_command ();

  // Release everything else.
  this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
}

// Releases the receiver's resources in a fixed order: handler, consumer
// proxy reference, proxy disconnect command, POA deactivation and
// finally the CDR receiver.
void
TAO_ECG_UDP_Receiver::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  // Shut the handler down if one is held, then replace it with an
  // empty handler pointer.
  if (this->handler_rptr_.get ())
    {
      this->handler_rptr_->shutdown ();
    }
  TAO_ECG_Refcounted_Handler no_handler;
  this->handler_rptr_ = no_handler;

  // Forget the consumer proxy.
  this->consumer_proxy_ = RtecEventChannelAdmin::ProxyPushConsumer::_nil ();

  // Run the stored disconnect command and the POA deactivator.
  this->auto_proxy_disconnect_.execute ();
  this->deactivator_.deactivate ();

  // Finally shut down the CDR receiver.
  this->cdr_receiver_.shutdown ();
}

// CDR processor handed to <cdr_receiver_> by handle_input ().  It
// decodes an incoming CDR stream into an event set that the caller
// reads back from <events>.
class TAO_ECG_Event_CDR_Decoder: public TAO_ECG_CDR_Processor
{
public:
  // Event set filled in by decode ().
  RtecEventComm::EventSet events;

  // Extracts <events> from <cdr>.  Returns 0 on success, -1 on failure.
  virtual int decode (TAO_InputCDR &cdr);
};

// Extracts an event set from <cdr> into <events>.
//
// Returns 0 on success.  If the extraction fails, an error is logged
// and -1 is returned.
int
TAO_ECG_Event_CDR_Decoder::decode (TAO_InputCDR &cdr)
{
  if (!(cdr >> this->events))
    {
      // Newline-terminated so the message does not run into whatever is
      // logged next.
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Error decoding events cdr.\n"),
                        -1);
    }
  return 0;
}

// Processes input available on <dgram>.
//
// The data is read through <cdr_receiver_>; once a complete event set
// has been decoded it is pushed to the consumer proxy.  If the receiver
// is not connected to an event channel, it shuts itself down instead.
//
// Always returns 0: receive errors and CORBA exceptions are logged and
// swallowed here rather than reported to the caller.
int
TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram)
{
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Make sure we are connected to the Event Channel before proceeding
      // any further.
      if (CORBA::is_nil (this->consumer_proxy_.in ()))
        {
          ACE_ERROR ((LM_ERROR,
                      "TAO_ECG_UDP_Receiver::handle_input() "
                      "called but the Receiver is not connected "
                      "to an event channel. Shutting down the Receiver.\n"));
          this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;

          return 0;
        }

      // Receive data.
      TAO_ECG_Event_CDR_Decoder cdr_decoder;
      int result = this->cdr_receiver_.handle_input (dgram, &cdr_decoder);

      if (result == 0)
        {
          // No data to act on.
          return 0;
        }
      if (result == -1)
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                            "Error receiving multicasted events.\n"),
                            0);
        }

      // Any other result: hand the decoded events to the event channel.
      this->consumer_proxy_->push (cdr_decoder.events ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }

  ACE_CATCHANY
    {
      // Reported with ACE_ERROR, like every other LM_ERROR record in
      // this file, and newline-terminated like the messages above.
      ACE_ERROR ((LM_ERROR,
                  "Caught and swallowed EXCEPTION in "
                  "ECG_UDP_Receiver::handle_input: %s\n",
                  ACE_ANY_EXCEPTION._info ().c_str ()));
    }
  ACE_ENDTRY;
  return 0;
}

// ****************************************************************

// Explicit instantiation of the activate<> helper template called from
// new_connect (), for the PushSupplier reference type.  It is emitted
// only for compilers that need explicit instantiation, either through
// the standard "template" syntax or through an instantiation pragma.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template void activate<>(TAO_Objref_Var_T<RtecEventComm::PushSupplier, RtecEventComm::tao_PushSupplier_life>&, PortableServer::POA*, TAO_ServantBase*, TAO_EC_Object_Deactivator& ACE_ENV_ARG_DECL);
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate void activate<>(TAO_Objref_Var_T<RtecEventComm::PushSupplier, RtecEventComm::tao_PushSupplier_life>&, PortableServer::POA*, TAO_ServantBase*, TAO_EC_Object_Deactivator& ACE_ENV_ARG_DECL)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -