⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ec_gateway_iiop.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// EC_Gateway_IIOP.cpp,v 1.14 2003/10/28 18:34:19 bala Exp

#include "orbsvcs/Event/EC_Gateway_IIOP.h"
#include "orbsvcs/Event/ECG_Defaults.h"
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Time_Utilities.h"

#include "EC_Gateway_IIOP_Factory.h"
#include "ECG_ConsumerEC_Control.h"

#include "ace/Dynamic_Service.h"

ACE_RCSID (Event, 
           EC_Gateway_IIOP, 
           "EC_Gateway_IIOP.cpp,v 1.14 2003/10/28 18:34:19 bala Exp")

TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
  :  busy_count_ (0),
     update_posted_ (0),
     cleanup_posted_ (0),
     supplier_ec_suspended_ (0),
     supplier_info_ (0),
     consumer_info_ (0),
     consumer_ (this),
     consumer_is_active_ (0),
     supplier_ (this),
     supplier_is_active_ (0),
     ec_control_ (0),
     factory_ (0),
     use_ttl_ (1),
     use_consumer_proxy_map_ (1)
{
  if (this->factory_ == 0)
    {
      this->factory_ =
             ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");

      if (this->factory_ == 0)
        {
          TAO_EC_Gateway_IIOP_Factory *f = 0;
          ACE_NEW (f,
                   TAO_EC_Gateway_IIOP_Factory);
          this->factory_ = f;
        }
    }

  if (this->factory_ != 0)
    {
      this->use_ttl_ = this->factory_->use_ttl();
      this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
    }
}

TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
{
   delete ec_control_;
   ec_control_ = 0;
}

int
TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
                           RtecEventChannelAdmin::EventChannel_ptr consumer_ec
                           ACE_ENV_ARG_DECL)
{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);

  return this->init_i (supplier_ec, consumer_ec ACE_ENV_ARG_PARAMETER);
}

int
TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
                             RtecEventChannelAdmin::EventChannel_ptr consumer_ec
                             ACE_ENV_ARG_DECL_NOT_USED)
{
  if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
  {
    this->supplier_ec_ =
      RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
    this->consumer_ec_ =
      RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);

	if (ec_control_ == 0)
     {
        ec_control_ = factory_->create_consumerec_control(this);
        ec_control_->activate();
     }

    return 0;
  }
  else
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO_EC_Gateway_IIOP - init_i "
                       "Supplier and consumer event channel reference "
                       "should be nil.\n"), -1);
}

void
TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
}

void
TAO_EC_Gateway_IIOP::cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  // In case we are still pushing, don't cleanup the proxies
  if (this->busy_count_ != 0)
    {
      this->cleanup_posted_ = 1;
      return;
    }

  this->cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
}

void
TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL)
{
  // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
  this->disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}

void
TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL)
{
  if (this->consumer_proxy_map_.current_size () > 0)
    {
      for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
           j != this->consumer_proxy_map_.end ();
           ++j)
        {
          RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
          if (CORBA::is_nil (consumer))
            continue;
          ACE_TRY
            {
              consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
              ACE_TRY_CHECK;
            }
          ACE_CATCHANY
            {
            }
          ACE_ENDTRY;
          CORBA::release (consumer);
        }
      // Remove all the elements on the map.  Calling close() does not
      // work because the map is left in an inconsistent state.
      this->consumer_proxy_map_.open ();
    }

  if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
    {
      this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      this->default_consumer_proxy_ =
        RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
    }
}

void
TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_DECL)
{
  if (!CORBA::is_nil (this->supplier_proxy_.in ()))
    {
      this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      this->supplier_proxy_ =
        RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
    }
}

void
TAO_EC_Gateway_IIOP::reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  if (this->busy_count_ != 0)
    {
      this->update_posted_ = 1;
      return;
    }

  this->update_consumer_i (c_qos_ ACE_ENV_ARG_PARAMETER);
}

void
TAO_EC_Gateway_IIOP::update_consumer (
    const RtecEventChannelAdmin::ConsumerQOS& c_qos
    ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  if (c_qos.dependencies.length () == 0)
    return;

  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  this->c_qos_ = c_qos;

  if (this->busy_count_ != 0)
    {
      this->update_posted_ = 1;
      return;
    }

  this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER);
}

void
TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  if (this->consumer_proxy_map_.current_size () > 0)
    {
      for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
           j != this->consumer_proxy_map_.end ();
           ++j)
        {
          RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
          if (CORBA::is_nil (consumer))
            continue;

          CORBA::release (consumer);
        }
      // Remove all the elements on the map.  Calling close() does not
      // work because the map is left in an inconsistent state.
      this->consumer_proxy_map_.open ();
    }

  if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
    {
      this->default_consumer_proxy_ =
        RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
    }
}

void
TAO_EC_Gateway_IIOP::update_consumer_i (
    const RtecEventChannelAdmin::ConsumerQOS& c_qos
    ACE_ENV_ARG_DECL)
{
  this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  if (CORBA::is_nil (this->consumer_ec_.in ())
      || CORBA::is_nil (this->supplier_ec_.in ()))
    return;

  // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));

  this->open_i (c_qos ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
TAO_EC_Gateway_IIOP::open_i (
    const RtecEventChannelAdmin::ConsumerQOS& c_qos
    ACE_ENV_ARG_DECL)
{
  // = Connect as a supplier to the consumer EC
  RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
    this->consumer_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
  sub.is_gateway = 1;

  // Change the RT_Info in the consumer QoS.
  // On the same loop we discover the subscriptions by event source,
  // and fill the consumer proxy map if we have to use this map.
  for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
    {
      sub.dependencies[i].rt_info = this->supplier_info_;

      RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
      const RtecEventComm::EventHeader &h =
        sub.dependencies[i].event.header;

      RtecEventComm::EventSourceID sid = h.source;

      //ACE_DEBUG ((LM_DEBUG,
      //            "ECG (%t)    trying (%d,%d)\n",
      //           sid, h.type));

      // Skip all subscriptions that do not require an specific source
      // id or skip all subscriptions when we don't need to use the consumer
      // proxy map.
      if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
        continue;

      // Skip all the magic event types.
      if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
        continue;

      if (this->consumer_proxy_map_.find (sid, proxy) == -1)
        {
          //ACE_DEBUG ((LM_DEBUG,
          //            "ECG (%t)    binding source %d\n",
          //            sid));
          proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_CHECK;
          this->consumer_proxy_map_.bind (sid, proxy);
        }
    }
  //ACE_DEBUG ((LM_DEBUG,
  //            "ECG (%t)    consumer map computed (%d entries)\n",
  //            this->consumer_proxy_map_.current_size ()));

  if (this->consumer_proxy_map_.current_size () > 0)
    {
      this->supplier_is_active_ = 1;

      // Obtain a reference to our supplier personality...
      RtecEventComm::PushSupplier_var supplier_ref =
        this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      // For each subscription by source build the set of publications
      // (they may several, by type, for instance) and connect to the
      // consumer proxy.
      for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
           j != this->consumer_proxy_map_.end ();
           ++j)
        {
          RtecEventChannelAdmin::SupplierQOS pub;
          pub.publications.length (sub.dependencies.length () + 1);
          pub.is_gateway = 1;

          int c = 0;

          RtecEventComm::EventSourceID sid = (*j).ext_id_;
          for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
            {
              const RtecEventComm::EventHeader& h =
                sub.dependencies[k].event.header;
              if (h.source != sid
                  || (ACE_ES_EVENT_ANY < h.type
                      && h.type < ACE_ES_EVENT_UNDEFINED))
                continue;
              pub.publications[c].event.header = h;
              pub.publications[c].dependency_info.dependency_type =
                RtecBase::TWO_WAY_CALL;
              pub.publications[c].dependency_info.number_of_calls = 1;
              pub.publications[c].dependency_info.rt_info = this->consumer_info_;
              c++;
            }
          //ACE_DEBUG ((LM_DEBUG,
          //            "ECG (%t) supplier id %d has %d elements\n",
          //            sid, c));
          if (c == 0)
            continue;

          pub.publications.length (c);

          // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
          // ACE_SupplierQOS_Factory::debug (pub);
          (*j).int_id_->connect_push_supplier (supplier_ref.in (),
                                               pub
                                               ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -