⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 throughput.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Throughput.cpp,v 1.20 2003/07/06 21:41:58 pradeep Exp

#include "Throughput.h"

#include "ace/Arg_Shifter.h"
#include "ace/Get_Opt.h"
#include "ace/Synch.h"
#include "ace/OS.h"
#include "ace/Dynamic_Service.h"
#include "tao/Strategies/advanced_resource.h"
#include "tao/Messaging/Messaging.h"
#include "orbsvcs/Notify/Service.h"
#include "orbsvcs/Time_Utilities.h"

ACE_RCSID (Notify_Tests, Throughput, "Throughput.cpp,v 1.20 2003/07/06 21:41:58 pradeep Exp")

/***************************************************************************/

Throughput_StructuredPushConsumer::Throughput_StructuredPushConsumer (
    Notify_Throughput *test_client
  )
  : test_client_ (test_client),
    push_count_ (0)
{
}

void
Throughput_StructuredPushConsumer::accumulate_into (
    ACE_Throughput_Stats &throughput
  ) const
{
  throughput.accumulate (this->throughput_);
}

void
Throughput_StructuredPushConsumer::dump_stats (const char* msg,
                                               ACE_UINT32 gsf)
{
  this->throughput_.dump_results (msg, gsf);
}

void
Throughput_StructuredPushConsumer::push_structured_event (
    const CosNotification::StructuredEvent & notification
    ACE_ENV_ARG_DECL_NOT_USED
  )
  ACE_THROW_SPEC ((CORBA::SystemException,
                   CosEventComm::Disconnected))
{
  // Extract payload.
  const char* msg;
  CORBA::Boolean ok = (notification.remainder_of_body >>= msg);

  if (!ok)
    ACE_DEBUG ((LM_DEBUG, "(%t) Error extracting message body\n"));

  TimeBase::TimeT Throughput_base_recorded;
  ACE_hrtime_t Throughput_base;

  notification.filterable_data[0].value >>= Throughput_base_recorded;

  ORBSVCS_Time::TimeT_to_hrtime (Throughput_base,
                                 Throughput_base_recorded);

  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  if (this->push_count_ == 0)
    {
    this->throughput_start_ = ACE_OS::gethrtime ();
    }

  ++this->push_count_;

  // Grab timestamp again.
  ACE_hrtime_t now = ACE_OS::gethrtime ();

  // Record statistics.
  this->throughput_.sample (now - this->throughput_start_,
                            now - Throughput_base);

  if (this->push_count_%1000 == 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%P)(%t) event count = %d\n",
                  this->push_count_));
    }

  if (push_count_ == test_client_->perconsumer_count_)
  {
    ACE_DEBUG ((LM_DEBUG,
                "(%t)expected count reached\n"));
    test_client_->peer_done ();
  }
}

/***************************************************************************/

Throughput_StructuredPushSupplier::Throughput_StructuredPushSupplier (
    Notify_Throughput* test_client
  )
  :test_client_ (test_client),
   push_count_ (0)
{
}

Throughput_StructuredPushSupplier::~Throughput_StructuredPushSupplier ()
{
}

void
Throughput_StructuredPushSupplier::accumulate_into (
    ACE_Throughput_Stats &throughput
  ) const
{
  throughput.accumulate (this->throughput_);
}

void
Throughput_StructuredPushSupplier::dump_stats (const char* msg,
                                               ACE_UINT32 gsf)
{
  this->throughput_.dump_results (msg, gsf);
}

int
Throughput_StructuredPushSupplier::svc (void)
{
  // Initialize a time value to pace the test.
  ACE_Time_Value tv (0, test_client_->burst_pause_);

  // Operations.
  CosNotification::StructuredEvent event;

  // EventHeader

  // FixedEventHeader
  // EventType
  // string
  event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
  // string
  event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
  // string
  event.header.fixed_header.event_name = CORBA::string_dup("myevent");

  // OptionalHeaderFields
  // PropertySeq
  // sequence<Property>: string name, any value
  CosNotification::PropertySeq& qos =  event.header.variable_header;
  qos.length (0); // put nothing here

  // FilterableEventBody
  // PropertySeq
  // sequence<Property>: string name, any value
  event.filterable_data.length (1);
  event.filterable_data[0].name = CORBA::string_dup("Throughput_base");

  event.remainder_of_body <<= test_client_->payload_;

  ACE_DECLARE_NEW_CORBA_ENV;

  this->throughput_start_ = ACE_OS::gethrtime ();

  for (int i = 0; i < test_client_->burst_count_; ++i)
    {
      for (int j = 0; j < test_client_->burst_size_; ++j)
        {
          // Record current time.
          ACE_hrtime_t start = ACE_OS::gethrtime ();
          TimeBase::TimeT Throughput_base;
          ORBSVCS_Time::hrtime_to_TimeT (Throughput_base,
                                         start);
          // Any.
          event.filterable_data[0].value <<= Throughput_base;

          this->proxy_->push_structured_event (event
                                                        ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          ACE_hrtime_t end = ACE_OS::gethrtime ();
          this->throughput_.sample (end - this->throughput_start_,
                                    end - start);
        }

      ACE_OS::sleep (tv);
    }

  ACE_DEBUG ((LM_DEBUG, "(%P) (%t) Supplier done\n"));
  test_client_->peer_done ();
  return 0;
}

/***************************************************************************/
Notify_Throughput::Notify_Throughput (void)
  : collocated_ec_ (0),
    burst_count_ (1),
    burst_pause_ (10000),
    burst_size_ (1000),
    payload_size_ (1024),
    payload_ (0),
    consumer_count_ (1),
    supplier_count_ (1),
    perconsumer_count_ (burst_size_*burst_count_*supplier_count_),
    consumers_ (0),
    suppliers_ (0),
    nthreads_ (1),
    peer_done_count_ (consumer_count_ + supplier_count_),
    condition_ (lock_)
{
}

Notify_Throughput::~Notify_Throughput ()
{
  ACE_DECLARE_NEW_CORBA_ENV;
  this->orb_->shutdown (0
                        ACE_ENV_ARG_PARAMETER);

  delete payload_;
}

int
Notify_Throughput::init (int argc, char* argv [] ACE_ENV_ARG_DECL)
{
  // Initialize base class.
  Notify_Test_Client::init_ORB (argc,
                                argv
                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

#if (TAO_HAS_CORBA_MESSAGING == 1)
  CORBA::Object_var manager_object =
    orb_->resolve_initial_references ("ORBPolicyManager"
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::PolicyManager_var policy_manager =
    CORBA::PolicyManager::_narrow (manager_object.in ()
                                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::Any sync_scope;
  sync_scope <<= Messaging::SYNC_WITH_TARGET;

  CORBA::PolicyList policy_list (1);
  policy_list.length (1);
  policy_list[0] =
    orb_->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
                        sync_scope
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  policy_manager->set_policy_overrides (policy_list,
                                        CORBA::SET_OVERRIDE
                                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
#else
  ACE_DEBUG ((LM_DEBUG,
              "CORBA Messaging disabled in this configuration,"
              " test may not be optimally configured\n"));
#endif /* TAO_HAS_MESSAGING */

  worker_.orb (this->orb_.in ());

  if (worker_.activate (THR_NEW_LWP | THR_JOINABLE,
                        this->nthreads_) != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "Cannot activate client threads\n"));
    }

  // Create all participents ...
  this->create_EC (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CosNotifyChannelAdmin::AdminID adminid;

  supplier_admin_ =
    ec_->new_for_suppliers (this->ifgop_, adminid ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));

  consumer_admin_ =
    ec_->new_for_consumers (this->ifgop_, adminid ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));

  ACE_NEW_RETURN (consumers_,
                  Throughput_StructuredPushConsumer*[this->consumer_count_],
                  -1);
  ACE_NEW_RETURN (suppliers_,
                  Throughput_StructuredPushSupplier*[this->supplier_count_],
                  -1);

  // ----

  int i = 0;

  for (i = 0; i < this->consumer_count_; ++i)
    {
      ACE_NEW_RETURN (consumers_[i],
                      Throughput_StructuredPushConsumer (this),
                      -1);
      consumers_[i]->init (root_poa_.in ()
                           ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      consumers_[i]->connect (this->consumer_admin_.in ()
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
    }

  for (i = 0; i < this->supplier_count_; ++i)
    {
      ACE_NEW_RETURN (suppliers_[i],
                      Throughput_StructuredPushSupplier (this),
                      -1);
      suppliers_[i]->TAO_Notify_Tests_StructuredPushSupplier::init (
                         root_poa_.in ()
                         ACE_ENV_ARG_PARAMETER
                       );
      ACE_CHECK_RETURN (-1);
      suppliers_[i]->connect (this->supplier_admin_.in ()
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
    }

  return 0;
}

int
Notify_Throughput::parse_args(int argc, char *argv[])
{
    ACE_Arg_Shifter arg_shifter (argc, argv);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -