ect_consumer.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 188 行

CPP
188
字号
// ECT_Consumer.cpp,v 1.29 2002/01/29 20:20:54 okellogg Exp

#include "ECT_Consumer.h"

#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
#include "orbsvcs/Time_Utilities.h"

#include "tao/Timeprobe.h"
#include "tao/debug.h"

#include "ace/Get_Opt.h"
#include "ace/Auto_Ptr.h"
#include "ace/Sched_Params.h"

ACE_RCSID(EC_Throughput, ECT_Consumer, "ECT_Consumer.cpp,v 1.29 2002/01/29 20:20:54 okellogg Exp")

// Constructs a consumer that remembers the driver to notify, the opaque
// cookie handed back to that driver, and the number of suppliers whose
// shutdown events must be seen.  Both the received-event counter and the
// shutdown-event counter start at zero.
Test_Consumer::Test_Consumer (ECT_Driver *driver,
                              void *cookie,
                              int n_suppliers)
  : driver_ (driver),
    cookie_ (cookie),
    n_suppliers_ (n_suppliers),
    recv_count_ (0),
    shutdown_count_ (0)
{
}

// Registers this consumer with the scheduler and connects it to the event
// channel as a push consumer.
//
// @param scheduler   Scheduler on which an entry named @a name is created
//                    and configured.
// @param name        Name passed to scheduler->create ().
// @param type_start  First event type to subscribe to.
// @param type_count  Number of consecutive event types, starting at
//                    @a type_start, to subscribe to.  A zero or negative
//                    count subscribes to nothing beyond
//                    ACE_ES_EVENT_SHUTDOWN.
// @param ec          Event channel from which the consumer admin and the
//                    supplier proxy are obtained.
//
// NOTE(review): each remote call is followed by ACE_CHECK, which
// presumably abandons the rest of the function when the call raised an
// exception (the macro is defined outside this file) -- confirm.
void
Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
                        const char* name,
                        int type_start,
                        int type_count,
                        RtecEventChannelAdmin::EventChannel_ptr ec
                        ACE_ENV_ARG_DECL)
{
  RtecScheduler::handle_t rt_info =
    scheduler->create (name ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // The worst case execution time is far less than 2
  // milliseconds, but that is a safe estimate....
  ACE_Time_Value tv (0, 2000);
  TimeBase::TimeT time;
  ORBSVCS_Time::Time_Value_to_TimeT (time, tv);

  // The same 2 ms estimate is used for every time argument of set ().
  scheduler->set (rt_info,
                  RtecScheduler::VERY_HIGH_CRITICALITY,
                  time, time, time,
                  0,
                  RtecScheduler::VERY_LOW_IMPORTANCE,
                  time,
                  0,
                  RtecScheduler::OPERATION
                  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Build the subscription: the shutdown event plus the requested range
  // of event types, all in one disjunction group.
  ACE_ConsumerQOS_Factory qos;
  qos.start_disjunction_group ();
  qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);

  // Use '<' rather than '!=' so that a negative type_count yields an
  // empty range instead of looping until the index overflows.
  for (int i = 0; i < type_count; ++i)
    {
      qos.insert_type (type_start + i, rt_info);
    }

  // = Connect as a consumer.
  RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
    ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_ =
    consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventComm::PushConsumer_var objref = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_proxy_->connect_push_consumer (objref.in (),
                                                qos.get_ConsumerQOS ()
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Disconnects from the supplier proxy obtained in connect () and then
// deactivates this servant in its default POA.  Does nothing when no
// supplier proxy is currently held.
void
Test_Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
  if (!CORBA::is_nil (this->supplier_proxy_.in ()))
    {
      // Tell the event channel side that we are going away.
      this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      // Forget the proxy so a second disconnect () is a no-op.
      this->supplier_proxy_ =
        RtecEventChannelAdmin::ProxyPushSupplier::_nil ();

      // Look up our object id in the default POA and deactivate it.
      PortableServer::POA_var default_poa =
        this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      PortableServer::ObjectId_var servant_id =
        default_poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      default_poa->deactivate_object (servant_id.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

void
Test_Consumer::dump_results (const char* name,
                             ACE_UINT32 gsf)
{
  this->throughput_.dump_results (name, gsf);
}

// Folds this consumer's throughput statistics into the caller-supplied
// @a stats object; this consumer itself is not modified.
void
Test_Consumer::accumulate (ACE_Throughput_Stats& stats) const
{
  stats.accumulate (throughput_);
}

// Handles a batch of events delivered to this consumer.
//
// An empty batch is logged and ignored.  Otherwise the receive counter
// is advanced by the batch size, and each event is either counted as a
// shutdown notification or sampled into the throughput statistics.  Once
// the number of shutdown events reaches the configured supplier count,
// the driver is told to shut this consumer down.
void
Test_Consumer::push (const RtecEventComm::EventSet& events
                     ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  if (events.length () == 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "ECT_Consumer (%P|%t) no events\n"));
      return;
    }

  // Everything below touches shared counters, so it runs under lock_.
  ACE_GUARD (TAO_SYNCH_MUTEX, monitor, this->lock_);

  // The clock starts ticking on the very first event we ever see.
  if (this->recv_count_ == 0)
    this->first_event_ = ACE_OS::gethrtime ();

  this->recv_count_ += events.length ();

  // Progress report, only when debugging is enabled.
  if (TAO_debug_level > 0)
    {
      if (this->recv_count_ % 100 == 0)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "ECT_Consumer (%P|%t): %d events received\n",
                      this->recv_count_));
        }
    }

  for (u_int idx = 0; idx < events.length (); ++idx)
    {
      const RtecEventComm::Event& event = events[idx];

      if (event.header.type != ACE_ES_EVENT_SHUTDOWN)
        {
          // Regular event: record elapsed time since the first event
          // and the time since the event was created.
          ACE_hrtime_t created_at;
          ORBSVCS_Time::TimeT_to_hrtime (created_at,
                                         event.header.creation_time);

          const ACE_hrtime_t current = ACE_OS::gethrtime ();
          this->throughput_.sample (current - this->first_event_,
                                    current - created_at);
          continue;
        }

      // Shutdown event: count it and wait until enough have arrived.
      ++this->shutdown_count_;
      if (this->shutdown_count_ < this->n_suppliers_)
        continue;

      // Enough shutdown events have been seen; notify the driver right
      // away so the timer is stopped as early as possible.
      this->driver_->shutdown_consumer (this->cookie_ ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

// Callback for being disconnected as a push consumer.  This
// implementation is a no-op: the body is empty and the environment
// argument is declared as unused.
void
Test_Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
}

// ****************************************************************

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?