ect_supplier.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 251 行

CPP
251
字号
// ECT_Supplier.cpp,v 1.36 2003/08/04 02:07:15 bala Exp

#include "ECT_Supplier.h"

#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
#include "orbsvcs/Time_Utilities.h"

#include "tao/Timeprobe.h"
#include "tao/debug.h"

#include "ace/Get_Opt.h"
#include "ace/Auto_Ptr.h"
#include "ace/Sched_Params.h"
#include "ace/High_Res_Timer.h"
#include "ace/ACE.h"

ACE_RCSID(EC_Throughput, ECT_Supplier, "ECT_Supplier.cpp,v 1.36 2003/08/04 02:07:15 bala Exp")

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_PushSupplier_Adapter<Test_Supplier>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_PushSupplier_Adapter<Test_Supplier>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Test_Supplier::Test_Supplier (ECT_Driver *driver)
  :  driver_ (driver),
     supplier_ (this),
     burst_count_ (0),
     burst_size_ (0),
     event_size_ (0),
     burst_pause_ (0),
     type_start_ (ACE_ES_EVENT_UNDEFINED),
     type_count_ (1)
{
}

void
Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler,
                        const char* name,
                        int burst_count,
                        int burst_size,
                        int event_size,
                        int burst_pause,
                        int type_start,
                        int type_count,
                        RtecEventChannelAdmin::EventChannel_ptr ec
                        ACE_ENV_ARG_DECL)
{
  this->burst_count_ = burst_count;
  this->burst_size_ = burst_size;
  this->event_size_ = event_size;
  this->burst_pause_ = burst_pause;
  this->type_start_ = type_start;
  this->type_count_ = type_count;

  RtecScheduler::handle_t rt_info =
    scheduler->create (name ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_Time_Value tv (0, burst_pause);
  RtecScheduler::Period_t rate = tv.usec () * 10;

  // The execution times are set to reasonable values, but
  // actually they are changed on the real execution, i.e. we
  // lie to the scheduler to obtain right priorities; but we
  // don't care if the set is schedulable.
  tv.set (0, 2000);
  TimeBase::TimeT time;
  ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
  scheduler->set (rt_info,
                  RtecScheduler::VERY_HIGH_CRITICALITY,
                  time, time, time,
                  rate,
                  RtecScheduler::VERY_LOW_IMPORTANCE,
                  time,
                  1,
                  RtecScheduler::OPERATION
                  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->supplier_id_ = ACE::crc32 (name);
  ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
              this->supplier_id_));

  ACE_SupplierQOS_Factory qos;
  for (int i = 0; i != type_count; ++i)
    {
      qos.insert (this->supplier_id_,
                  type_start + i,
                  rt_info, 1);
    }
  qos.insert (this->supplier_id_,
              ACE_ES_EVENT_SHUTDOWN,
              rt_info, 1);

  RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
    ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_ =
    supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  RtecEventComm::PushSupplier_var objref =
    this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_->connect_push_supplier (objref.in (),
                                                qos.get_SupplierQOS ()
                                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
Test_Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
  if (CORBA::is_nil (this->consumer_proxy_.in ()))
    return;

  this->consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  this->consumer_proxy_ =
    RtecEventChannelAdmin::ProxyPushConsumer::_nil ();

  // Deactivate the servant
  PortableServer::POA_var poa =
    this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  PortableServer::ObjectId_var id =
    poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
  poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

int
Test_Supplier::svc ()
{
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Initialize a time value to pace the test
      ACE_Time_Value tv (0, this->burst_pause_);

      // Pre-allocate a message to send
      ACE_Message_Block mb (this->event_size_);
      mb.wr_ptr (this->event_size_);

      RtecEventComm::EventSet event (1);
      event.length (1);
      event[0].header.source = this->supplier_id ();
      event[0].header.ttl = 1;

      ACE_hrtime_t t = ACE_OS::gethrtime ();
      ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);

      // We use replace to minimize the copies, this should result
      // in just one memory allocation;
      event[0].data.payload.replace (this->event_size_,
                                     &mb);

      ACE_hrtime_t test_start = ACE_OS::gethrtime ();

      for (int i = 0; i < this->burst_count_; ++i)
        {
          for (int j = 0; j < this->burst_size_; ++j)
            {
              event[0].header.type =
                this->type_start_ + j % this->type_count_;

              ACE_hrtime_t request_start = ACE_OS::gethrtime ();
              ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
                                             request_start);
              // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
              this->consumer_proxy ()->push (event ACE_ENV_ARG_PARAMETER);

              ACE_TRY_CHECK;
              ACE_hrtime_t end = ACE_OS::gethrtime ();
              this->throughput_.sample (end - test_start,
                                        end - request_start);
            }

          if (TAO_debug_level > 0
              && i % 100 == 0)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "ECT_Supplier (%P|%t): %d bursts sent\n",
                          i));
            }

          ACE_OS::sleep (tv);
        }

      // Send one event shutdown from each supplier
      event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
      ACE_hrtime_t request_start = ACE_OS::gethrtime ();
      ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
                                     request_start);
      this->consumer_proxy ()->push(event ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      ACE_hrtime_t end = ACE_OS::gethrtime ();
      this->throughput_.sample (end - test_start,
                                end - request_start);
    }
  ACE_CATCH (CORBA::SystemException, sys_ex)
    {
      ACE_PRINT_EXCEPTION (sys_ex, "SYS_EX");
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "NON SYS EX");
    }
  ACE_ENDTRY;

  ACE_DEBUG ((LM_DEBUG,
              "Supplier %4.4x completed\n",
              this->supplier_id_));
  return 0;
}

void
Test_Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
}

int Test_Supplier::supplier_id (void) const
{
  return this->supplier_id_;
}

RtecEventChannelAdmin::ProxyPushConsumer_ptr
Test_Supplier::consumer_proxy (void)
{
  return this->consumer_proxy_.in ();
}

void
Test_Supplier::dump_results (const char* name,
                             ACE_UINT32 gsf)
{
  this->throughput_.dump_results (name, gsf);
}

void
Test_Supplier::accumulate (ACE_Throughput_Stats& stats) const
{
  stats.accumulate (this->throughput_);
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?