supplier.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 195 行

CPP
195
字号
// Supplier.cpp,v 1.4 2003/09/04 03:27:24 ossama Exp

#include "Supplier.h"

ACE_RCSID (Notify, TAO_Notify_ThreadPool_Supplier, "Supplier.cpp,v 1.4 2003/09/04 03:27:24 ossama Exp")

#include "tao/ORB_Core.h"

TAO_Notify_ThreadPool_Supplier::TAO_Notify_ThreadPool_Supplier  (TAO_Notify_ORB_Objects& orb_objects)
  : orb_objects_ (orb_objects)
    , proxy_consumer_id_ (0)
    , expected_consumer_count_ (2)
    , consumers_connected_ (lock_)
    , consumer_count_ (0)
    , max_events_ (10)
    , proxy_consumer_thread_count_ (0)
{
}

TAO_Notify_ThreadPool_Supplier::~TAO_Notify_ThreadPool_Supplier ()
{
}

void
TAO_Notify_ThreadPool_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var& admin, int expected_consumer_count ,int max_events,
                       int proxy_consumer_thread_count ACE_ENV_ARG_DECL)
{
  // First initialize the class members.
  this->admin_ = admin;
  this->expected_consumer_count_ = expected_consumer_count;
  this->max_events_ = max_events;
  this->proxy_consumer_thread_count_ = proxy_consumer_thread_count;

  this->connect (ACE_ENV_SINGLE_ARG_PARAMETER);
}

void
TAO_Notify_ThreadPool_Supplier::run (ACE_ENV_SINGLE_ARG_DECL)
{
  {
    ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);

    ACE_DEBUG ((LM_DEBUG, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_-1));

    // Wait till the consumers are ready to go.
    while (this->consumer_count_ != this->expected_consumer_count_)
      this->consumers_connected_.wait ();
  }

  ACE_DEBUG ((LM_DEBUG,
              "(%P, %t) Supplier is sending an events...\n"));

  // Send events to each consumer.
  for (int i = 0; i < this->max_events_; ++i)
    {
      for (int j = 0; j < this->expected_consumer_count_; ++j)
        {
          // send the event
          this->send_event (this->event_[j] ACE_ENV_ARG_PARAMETER);
        }
    }

  // Disconnect from the EC
  this->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);

  // Deactivate this object.
  this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);

  // we're done. shutdown the ORB to exit the process.
  this->orb_objects_.orb_->shutdown (1);
}

void
TAO_Notify_ThreadPool_Supplier::connect (ACE_ENV_SINGLE_ARG_DECL)
{
  // Activate the supplier object.
  CosNotifyComm::StructuredPushSupplier_var objref = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer;

  if (this->proxy_consumer_thread_count_ != 0)
    {
      // Narrow to the extended interface.
      NotifyExt::SupplierAdmin_var admin_ext = NotifyExt::SupplierAdmin::_narrow (this->admin_.in ()ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      NotifyExt::ThreadPoolParams tp_params = { NotifyExt::CLIENT_PROPAGATED, 0,
                                                0, this->proxy_consumer_thread_count_, 0, 0, 0, 0, 0 };

      CosNotification::QoSProperties qos (1);
      qos.length (1);
      qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
      qos[0].value <<= tp_params;

      // Obtain the proxy. The QoS is applied to the POA in which the Proxy is hosted.
      proxyconsumer = admin_ext->obtain_notification_push_consumer_with_qos (CosNotifyChannelAdmin::STRUCTURED_EVENT
                                                                                   , proxy_consumer_id_, qos ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // Obtain the proxy.
      proxyconsumer = this->admin_->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
                                                                       , proxy_consumer_id_ ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));

  // narrow
  this->proxy_consumer_ =
    CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));

  // connect to the proxyconsumer.
  proxy_consumer_->connect_structured_push_supplier (objref.in ()
                                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_DEBUG ((LM_DEBUG, "(%P,%t) Created Supplier %d with %d threads at the ProxyConsumer\n", proxy_consumer_id_,
              this->proxy_consumer_thread_count_));
}

void
TAO_Notify_ThreadPool_Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));

  this->proxy_consumer_->disconnect_structured_push_consumer(ACE_ENV_SINGLE_ARG_PARAMETER);
}

void
TAO_Notify_ThreadPool_Supplier::deactivate (ACE_ENV_SINGLE_ARG_DECL)
{
  PortableServer::POA_var poa (this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER));
  ACE_CHECK;

  PortableServer::ObjectId_var id (poa->servant_to_id (this
                                                       ACE_ENV_ARG_PARAMETER));
  ACE_CHECK;

  poa->deactivate_object (id.in()
                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
TAO_Notify_ThreadPool_Supplier::subscription_change (const CosNotification::EventTypeSeq & added,
                                      const CosNotification::EventTypeSeq & /*removed */
                                      ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((
                   CORBA::SystemException,
                   CosNotifyComm::InvalidEventType
                   ))
{
  ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);

  // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
  // Only 1 consumer connects at a time.
  if (added.length () > 0)
    {
      // Set the domain and type nams in the event's fixed header.
      this->event_[consumer_count_].header.fixed_header.event_type.domain_name = CORBA::string_dup(added[0].domain_name);
      this->event_[consumer_count_].header.fixed_header.event_type.type_name = CORBA::string_dup(added[0].type_name);

      ++this->consumer_count_;

      ACE_DEBUG ((LM_DEBUG, "(%P,%t) Received Type %d: (%s)\n", this->consumer_count_, added[0].type_name.in ()));

      if (this->consumer_count_ == this->expected_consumer_count_)
        this->consumers_connected_.signal ();
    }
}

void
TAO_Notify_ThreadPool_Supplier::send_event (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL)
{
  ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));

  proxy_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
TAO_Notify_ThreadPool_Supplier::disconnect_structured_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
  ACE_THROW_SPEC ((
                   CORBA::SystemException
                   ))
{
  this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?