⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ecg_mcast_eh.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
字号:
// ECG_Mcast_EH.cpp,v 1.26 2003/11/04 08:12:59 dhinton Exp

#include "orbsvcs/Event/ECG_Mcast_EH.h"
#include "orbsvcs/Event/ECG_UDP_Receiver.h"
#include "orbsvcs/Event_Service_Constants.h"
#include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
#include "ace/Unbounded_Set.h"
#include "ace/OS_NS_string.h"

#include "ace/Reactor.h"

#if !defined(__ACE_INLINE__)
#include "ECG_Mcast_EH.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID(Event, ECG_Mcast_EH, "ECG_Mcast_EH.cpp,v 1.26 2003/11/04 08:12:59 dhinton Exp")

TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
                                    const ACE_TCHAR *net_if,
                                    CORBA::ULong sz)
  : net_if_ (net_if?ACE_OS::strdup (net_if):0)
    , subscriptions_ ()
    , receiver_ (recv)
    , recvbuf_size_ (sz)
    , observer_ ()
    , auto_observer_disconnect_ ()
{
    ACE_ASSERT (this->receiver_);
}

TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
{
  ACE_OS::free (this->net_if_);
}

void
TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
                        ACE_ENV_ARG_DECL)
{
  if (!this->receiver_)
    {
      // We are shut down.
      ACE_THROW (CORBA::INTERNAL());
    }

  if (CORBA::is_nil (ec))
    {
      ACE_ERROR ((LM_ERROR,  "TAO_ECG_Mcast_EH::open(): "
                             "nil ec argument"));
      ACE_THROW (CORBA::INTERNAL ());
    }

  // Create and activate Event Channel Observer.
  ACE_NEW (this->observer_,
           Observer (this));

  if (!this->observer_.in ())
    {
      ACE_THROW (CORBA::NO_MEMORY ());
    }

  TAO_EC_Object_Deactivator observer_deactivator;
  RtecEventChannelAdmin::Observer_var observer_ref;
  PortableServer::POA_var poa =
    this->observer_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  activate (observer_ref,
            poa.in (),
            this->observer_.in (),
            observer_deactivator
            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventChannelAdmin::Observer_Handle handle =
    ec->append_observer (observer_ref.in ()
                         ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->observer_->set_deactivator (observer_deactivator);
  this->auto_observer_disconnect_.set_command
    (Observer_Disconnect_Command (handle, ec));
}

int
TAO_ECG_Mcast_EH::shutdown (void)
{
  // Already shut down.
  if (!this->receiver_)
    return -1;

  // Disconnect Observer from EC.
  this->auto_observer_disconnect_.execute ();

  // Shutdown the observer.
  if (this->observer_.in ())
    {
      this->observer_->shutdown ();
      this->observer_ = 0;
    }

  // Indicates that we are in a shutdown state.
  this->receiver_ = 0;

  // Deregister from reactor, close and clean up sockets.
  size_t subscriptions_size = this->subscriptions_.size ();
  for (size_t i = 0; i != subscriptions_size; ++i)
    {
      (void) this->reactor ()->remove_handler (
                               this->subscriptions_[i].dgram->get_handle (),
                               ACE_Event_Handler::READ_MASK);
      (void) this->subscriptions_[i].dgram->close();
      delete this->subscriptions_[i].dgram;
    }
  this->subscriptions_.size (0);

  return 0;
}

int
TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
{
  size_t subscriptions_size = this->subscriptions_.size ();
  for (size_t i = 0; i != subscriptions_size; ++i)
    {
      ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
      if (socket->get_handle () == fd)
        {
          return this->receiver_->handle_input (*socket);
        }
    }
  return -1;
}

void
TAO_ECG_Mcast_EH::update_consumer (
         const RtecEventChannelAdmin::ConsumerQOS& sub
    ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  Address_Set multicast_addresses;

  this->compute_required_subscriptions (sub,
                                        multicast_addresses
                                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->delete_unwanted_subscriptions (multicast_addresses);

  this->add_new_subscriptions (multicast_addresses);
}

void
TAO_ECG_Mcast_EH::compute_required_subscriptions (
    const RtecEventChannelAdmin::ConsumerQOS& sub,
    Address_Set& multicast_addresses
    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  CORBA::ULong count = sub.dependencies.length ();
  for (CORBA::ULong i = 0; i != count; ++i)
    {
      const RtecEventComm::EventHeader& header =
                           sub.dependencies[i].event.header;
      if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
        {
          continue;
        }
      RtecUDPAdmin::UDP_Addr addr;

      this->receiver_->get_addr (header, addr ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
      // Ignore errors, if the element is in the set we simply ignore
      // the problem...
      (void) multicast_addresses.insert (inet_addr);
    }
}

int
TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
    Address_Set& multicast_addresses)
{
  for (size_t i = 0; i < this->subscriptions_.size (); ++i)
    {
      ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
      if (multicast_addresses.find (multicast_group))
        {
          // Remove from the list of subscriptions to be added,
          // because we already subscribe to it...
          (void) multicast_addresses.remove (multicast_group);
          continue;
        }

      // This subscription is no longer needed - remove from reactor,
      // close and delete the socket.
      ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
      (void) this->reactor ()->remove_handler (socket->get_handle (),
                                               ACE_Event_Handler::READ_MASK);
      (void) socket->close();
      delete socket;
      // Move the deleted subscription out of the <subscriptions_>
      // array by moving the last subscription in array into its place.
      this->subscriptions_[i] =
        this->subscriptions_[this->subscriptions_.size () - 1];
      this->subscriptions_.size (this->subscriptions_.size () - 1);
      --i;
    }

    return 0;
}

void
TAO_ECG_Mcast_EH::add_new_subscriptions (Address_Set& multicast_addresses)
{
  typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
  for (Address_Iterator k = multicast_addresses.begin ();
       k != multicast_addresses.end ();
       ++k)
    {
      Subscription new_subscription;
      new_subscription.mcast_addr = *k;
      ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);

      size_t subscriptions_size = this->subscriptions_.size ();
      this->subscriptions_.size (subscriptions_size + 1);
      this->subscriptions_[subscriptions_size] = new_subscription;

      ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
      socket->enable (ACE_NONBLOCK);

      if (this->recvbuf_size_ != 0
          && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
                                                     SO_RCVBUF,
                                                     (void *) &this->recvbuf_size_,
                                                     sizeof (this->recvbuf_size_)) == -1)
          && errno != ENOTSUP )
        {
          ACE_ERROR ((LM_ERROR,
                      "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
                      errno,
                      this->recvbuf_size_));
        }
      socket->subscribe (new_subscription.mcast_addr, 1, this->net_if_);
      (void) this->reactor ()->register_handler (
                                         socket->get_handle (),
                                         this,
                                         ACE_Event_Handler::READ_MASK);
    }
}

// ****************************************************************

TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
  :  eh_ (eh)
{
}

void
TAO_ECG_Mcast_EH::Observer::update_consumer (
    const RtecEventChannelAdmin::ConsumerQOS& sub
    ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  if (this->eh_)
    this->eh_->update_consumer (sub ACE_ENV_ARG_PARAMETER);
}

void
TAO_ECG_Mcast_EH::Observer::update_supplier (
    const RtecEventChannelAdmin::SupplierQOS&
    ACE_ENV_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
}

void
TAO_ECG_Mcast_EH::Observer::shutdown (void)
{
  this->eh_ = 0;
  this->deactivator_.deactivate ();
}


#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template void activate<>(TAO_Objref_Var_T<RtecEventChannelAdmin::Observer, RtecEventChannelAdmin::tao_Observer_life>&, PortableServer::POA*, TAO_ServantBase*, TAO_EC_Object_Deactivator& ACE_ENV_ARG_DECL);
template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>;
template class ACE_Hash<ACE_INET_Addr>;
template class ACE_Node<ACE_INET_Addr>;
template class ACE_Equal_To<ACE_INET_Addr>;
template class ACE_Unbounded_Set<ACE_INET_Addr>;
template class ACE_Unbounded_Set_Iterator<ACE_INET_Addr>;
template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>;
template class ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>;
template class ACE_Array_Base<TAO_ECG_Mcast_EH::Subscription>;

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate void activate<>(TAO_Objref_Var_T<RtecEventChannelAdmin::Observer, RtecEventChannelAdmin::tao_Observer_life>&, PortableServer::POA*, TAO_ServantBase*, TAO_EC_Object_Deactivator& ACE_ENV_ARG_DECL)
#pragma instantiate ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Entry<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *>
#pragma instantiate ACE_Hash<ACE_INET_Addr>
#pragma instantiate ACE_Node<ACE_INET_Addr>
#pragma instantiate ACE_Equal_To<ACE_INET_Addr>
#pragma instantiate ACE_Unbounded_Set<ACE_INET_Addr>
#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_INET_Addr>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast *, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_INET_Addr,ACE_SOCK_Dgram_Mcast*,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr, ACE_SOCK_Dgram_Mcast*, ACE_Hash<ACE_INET_Addr>, ACE_Equal_To<ACE_INET_Addr>, ACE_Null_Mutex>
#pragma instantiate ACE_Array_Base<ACE_SOCK_Dgram_Mcast *>
#pragma instantiate ACE_Array_Base<TAO_ECG_Mcast_EH::Subscription>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -