⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ec_gateway_iiop.cpp

📁 这是一个被广泛使用的开源通信项目，完全能够胜任大容量、高并发的通讯要求，可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 2 页
字号:
        }
    }

  // Also build the subscriptions that are *not* by source when we use the
  // consumer proxy map, and all subscriptions when we don't use the map and
  // then connect to the default consumer proxy.
  RtecEventChannelAdmin::SupplierQOS pub;
  pub.publications.length (sub.dependencies.length () + 1);
  pub.is_gateway = 1;
  int c = 0;
  for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
    {
      const RtecEventComm::EventHeader& h =
        sub.dependencies[k].event.header;
      RtecEventComm::EventSourceID sid = h.source;

      // Skip all subscriptions with a specific source when we use the map
      if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
        continue;

      // Skip all the magic event types.
      if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
        continue;

      pub.publications[c].event.header = h;
      pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
      pub.publications[c].dependency_info.dependency_type =
        RtecBase::TWO_WAY_CALL;
      pub.publications[c].dependency_info.number_of_calls = 1;
      pub.publications[c].dependency_info.rt_info = this->consumer_info_;
      c++;
    }

  if (c > 0)
    {
      this->supplier_is_active_ = 1;

      // Obtain a reference to our supplier personality...
      RtecEventComm::PushSupplier_var supplier_ref =
        this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      // Obtain the consumer....
      this->default_consumer_proxy_ =
        supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      pub.publications.length (c);
      // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
      // ACE_SupplierQOS_Factory::debug (pub);
      this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
                                                            pub
                                                            ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
    this->supplier_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_ =
    consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_is_active_ = 1;
  RtecEventComm::PushConsumer_var consumer_ref =
    this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
  // ACE_ConsumerQOS_Factory::debug (sub);

  this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
                                                sub
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Accepts a SupplierQOS and deliberately ignores it: the parameter is
// left unnamed and the body is empty.  Declared as able to raise only
// CORBA::SystemException.
void
TAO_EC_Gateway_IIOP::update_supplier (
    const RtecEventChannelAdmin::SupplierQOS&
    ACE_ENV_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Do nothing...
}

// Disconnect notification for the gateway's consumer side.  No state is
// changed here; the only content is a disabled debug trace.
void
TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  // Intentionally empty.  Trace kept for debugging:
  // ACE_DEBUG ((LM_DEBUG,
  //             "ECG (%t): Supplier-consumer received "
  //            "disconnect from channel.\n"));
}

// Disconnect notification for the gateway's supplier side.  No state is
// changed here; the only content is a disabled debug trace.
void
TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
  // Intentionally empty.  Trace kept for debugging:
  // ACE_DEBUG ((LM_DEBUG,
  //            "ECG (%t): Supplier received "
  //            "disconnect from channel.\n"));
}

// Forward a set of events, one at a time, to the consumer proxy that
// matches each event's source (or to the default consumer proxy).
//
// The gateway is marked busy (busy_count_) for the duration of the
// forwarding loop.  When the last concurrent push finishes, any cleanup
// or consumer update that was posted meanwhile is executed.
//
// NOTE(review): if ACE_CHECK returns early inside the loop, busy_count_
// looks like it is never decremented again -- confirm whether that
// path is reachable in the supported build configurations.
void
TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events
                           ACE_ENV_ARG_DECL)
{
  const CORBA::ULong event_count = events.length ();
  if (event_count == 0)
    return;

  // Register this call as an in-flight push.
  {
    ACE_GUARD (TAO_SYNCH_MUTEX, enter_guard, this->lock_);

    ++this->busy_count_;
  }

  // @@ TODO, there is an extra data copy here, we should do the event
  // modification without it and only compact the necessary events.
  // Every event is forwarded through this one-element scratch set.
  RtecEventComm::EventSet single (1);
  single.length (1);

  for (CORBA::ULong idx = 0; idx < event_count; ++idx)
    {
      const RtecEventComm::Event &incoming = events[idx];

      // With TTL handling enabled, events whose TTL is already zero are
      // not forwarded.
      if (this->use_ttl_ == 1 && incoming.header.ttl == 0)
        continue;

      const RtecEventComm::EventSourceID source = incoming.header.source;
      RtecEventChannelAdmin::ProxyPushConsumer_ptr target = 0;

      // A source-specific proxy is used only when the event names a
      // source, the proxy map is enabled and the lookup succeeds.
      const bool found_in_map =
        source != ACE_ES_EVENT_SOURCE_ANY
        && this->use_consumer_proxy_map_ != 0
        && this->consumer_proxy_map_.find (source, target) != -1;

      // Otherwise fall back to the default consumer proxy.
      if (!found_in_map)
        target = this->default_consumer_proxy_.in ();

      if (CORBA::is_nil (target))
        continue;

      single[0] = incoming;

      if (this->use_ttl_ == 1)
        --single[0].header.ttl;

      this->push_to_consumer (target, single ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  // Unregister this call and run whatever work was deferred while the
  // gateway was busy.
  {
    ACE_GUARD (TAO_SYNCH_MUTEX, leave_guard, this->lock_);

    --this->busy_count_;

    if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
      {
        this->cleanup_posted_ = 0;
        this->cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
        ACE_CHECK;
      }

    if (this->busy_count_ == 0 && this->update_posted_ != 0)
      {
        this->update_posted_ = 0;
        this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER);
        ACE_CHECK;
      }
  }
}

// Push one event set to the given consumer proxy and route failures to
// the EC control object instead of letting them escape.
//
//   consumer - proxy that receives the events.
//   event    - event set handed to consumer->push ().
//
// CORBA::OBJECT_NOT_EXIST is reported through
// ec_control_->event_channel_not_exist (); any other
// CORBA::SystemException through ec_control_->system_exception ().
void
TAO_EC_Gateway_IIOP::push_to_consumer (
    RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
    const RtecEventComm::EventSet& event
    ACE_ENV_ARG_DECL)
{
  ACE_TRY
    {
      consumer->push (event ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  // The more specific exception is listed first so that it is not caught
  // by the general CORBA::SystemException handler below.
  ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
    {
      ec_control_->event_channel_not_exist (this ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCH (CORBA::SystemException, sysex)
    {
      ec_control_->system_exception (this,
                                     sysex
                                     ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Shouldn't happen.
      // NOTE(review): anything reaching this handler is dropped silently,
      // with no logging or notification to ec_control_.
    }
  ACE_ENDTRY;
}

// Shut the gateway down while holding lock_.
//
// Steps, in order: stop the EC control object, close the gateway
// connections (close_i), deactivate the supplier and consumer servants
// in their POAs if they were activated, and finally drop both event
// channel references.
//
// Returns 0 on success, -1 if the lock cannot be acquired or one of the
// checked calls fails.
int
TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, -1);

  ec_control_->shutdown ();

  this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // Supplier personality: remove it from its POA if it is active.
  if (this->supplier_is_active_ != 0)
    {
      PortableServer::POA_var supplier_poa =
        this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      PortableServer::ObjectId_var supplier_oid =
        supplier_poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      supplier_poa->deactivate_object (supplier_oid.in ()
                                       ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      this->supplier_is_active_ = 0;
    }

  // Consumer personality: same procedure.
  if (this->consumer_is_active_ != 0)
    {
      PortableServer::POA_var consumer_poa =
        this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      PortableServer::ObjectId_var consumer_oid =
        consumer_poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      consumer_poa->deactivate_object (consumer_oid.in ()
                                       ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      this->consumer_is_active_ = 0;
    }

  // The lock is already held, so use the unlocked (_i) variants.
  this->cleanup_consumer_ec_i ();
  this->cleanup_supplier_ec_i ();

  return 0;
}

int
TAO_EC_Gateway_IIOP::cleanup_consumer_ec (void)
{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);

  this->cleanup_consumer_ec_i ();

  return 0;
}

int
TAO_EC_Gateway_IIOP::cleanup_supplier_ec (void)
{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);

  this->cleanup_supplier_ec_i ();

  return 0;
}

// Reset consumer_ec_ to the nil event channel reference.  Takes no lock
// itself; the callers visible in this file acquire lock_ first.
void
TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i (void)
{
  this->consumer_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
}

// Reset supplier_ec_ to the nil event channel reference.  Takes no lock
// itself; the callers visible in this file acquire lock_ first.
void
TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i (void)
{
  this->supplier_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
}

// Report whether a consumer event channel reference is currently held,
// i.e. whether consumer_ec_ is non-nil.  Takes no lock itself.
CORBA::Boolean
TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i (void) const
{
  if (CORBA::is_nil (this->consumer_ec_.in ()))
    return 0;

  return 1;
}

// Check whether the consumer event channel object no longer exists.
//
//   disconnected - set to 1 when the gateway holds no consumer event
//                  channel reference (the function then returns 0
//                  without contacting anything), otherwise set to 0.
//
// Returns the result of _non_existent () on the consumer event channel.
// When TAO_HAS_MINIMUM_CORBA is non-zero that call is compiled out and
// 0 is returned.  Raises CORBA::INTERNAL if lock_ cannot be acquired.
CORBA::Boolean
TAO_EC_Gateway_IIOP::consumer_ec_non_existent (
      CORBA::Boolean_out disconnected
      ACE_ENV_ARG_DECL)
{
  // Copy the reference under the lock so the remote call below can be
  // made after the lock has been released.
  CORBA::Object_var ec_snapshot;
  {
    ACE_GUARD_THROW_EX (
        TAO_SYNCH_MUTEX, guard, this->lock_,
        CORBA::INTERNAL ());
    ACE_CHECK_RETURN (0);

    if (this->is_consumer_ec_connected_i () == 0)
      {
        disconnected = 1;
        return 0;
      }

    disconnected = 0;
    ec_snapshot = CORBA::Object::_duplicate (this->consumer_ec_.in ());
  }

#if (TAO_HAS_MINIMUM_CORBA == 0)
  return ec_snapshot->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
#else
  return 0;
#endif /* TAO_HAS_MINIMUM_CORBA */
}

// Suspend the connection on the supplier proxy and record that in
// supplier_ec_suspended_.  Does nothing when there is no supplier proxy
// or the flag is not 0.
void
TAO_EC_Gateway_IIOP::suspend_supplier_ec (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->supplier_proxy_.in ())
      || supplier_ec_suspended_ != 0)
    return;

  this->supplier_proxy_->suspend_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Only reached when the suspend call was not aborted by ACE_CHECK.
  supplier_ec_suspended_ = 1;
}

// Resume the connection on the supplier proxy and clear
// supplier_ec_suspended_.  Does nothing when there is no supplier proxy
// or the flag is not 1.
void
TAO_EC_Gateway_IIOP::resume_supplier_ec (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->supplier_proxy_.in ())
      || supplier_ec_suspended_ != 1)
    return;

  this->supplier_proxy_->resume_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Only reached when the resume call was not aborted by ACE_CHECK.
  supplier_ec_suspended_ = 0;
}

// Template instantiations for the adapter and map templates listed below,
// emitted only on platforms that request them.  The same set of templates
// is named in both branches: as `template class` declarations when
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION is defined, or as
// `#pragma instantiate` directives when
// ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA is defined.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>;
template class ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>;
template class ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>;
template class ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
template class ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
template class ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
template class ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#pragma instantiate ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>
#pragma instantiate ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>

#pragma instantiate ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>
#pragma instantiate ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -