⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 throughput.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信需求,可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 2 页
字号:
    const ACE_TCHAR* current_arg = 0;
    while (arg_shifter.is_anything_left ())
    {
      if (arg_shifter.cur_arg_strncasecmp ("-collocated_ec") == 0)
        {
          this->collocated_ec_ = 1;
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-consumers")))
        {
          this->consumer_count_ = ACE_OS::atoi (current_arg);
          // The number of events to send/receive.
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-suppliers")))
        {
          this->supplier_count_ = ACE_OS::atoi (current_arg);
          // The number of events to send/receive.
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-burst_size")))
        {
          this->burst_size_ = ACE_OS::atoi (current_arg);
          // The number of events to send/receive.
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-burst_count")))
        {
          this->burst_count_ = ACE_OS::atoi (current_arg);
          //
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-burst_pause")))
        {
          this->burst_pause_ = ACE_OS::atoi (current_arg);
          //
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-payload")))
        {
          this->payload_size_ = ACE_OS::atoi (current_arg);
          ACE_NEW_RETURN (this->payload_,
                          char [this->payload_size_],
                          -1);
          //
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-EC")))
        {
          this->ec_name_ = current_arg;
          //
          arg_shifter.consume_arg ();
        }
      else if ((current_arg =
                arg_shifter.get_the_parameter ("-ExpectedCount")))
        {
          this->perconsumer_count_ = ACE_OS::atoi (current_arg);
          //
          arg_shifter.consume_arg ();
        }
      else if (arg_shifter.cur_arg_strncasecmp ("-?") == 0)
        {
          ACE_DEBUG((LM_DEBUG,
                     "usage: %s "
                     "-collocated_ec, "
                     "-consumers [count], "
                     "-suppliers [count], "
                     "-burst_size [size], "
                     "-burst_count [count], "
                     "-burst_pause [time(uS)], "
                     "-payload [size]"
                     "-EC [Channel Name]"
                     "-ExpectedCount [count]\n",
                     argv[0], argv[0]));

          arg_shifter.consume_arg ();

          return -1;
        }
      else
        {
          arg_shifter.ignore_arg ();
        }
    }
    // Recalculate.
    peer_done_count_ = consumer_count_ + supplier_count_;
    return 0;
}

// Obtain the notification channel factory and then the event channel
// (ec_) that the test will use.
//
// Factory: when collocated_ec_ is 1 the TAO_Notify_Service is looked up
// through ACE_Dynamic_Service and asked to create the factory on the root
// POA; otherwise resolve_naming_service() and resolve_Notify_factory()
// are called.
//
// Channel: when ec_name_ is non-empty the channel is resolved by that
// name through naming_context_; otherwise a new channel is created from
// the factory.
//
// Returns early (without asserting) if the collocated service cannot be
// found or if any ACE_CHECK detects a pending exception.
void
Notify_Throughput::create_EC (ACE_ENV_SINGLE_ARG_DECL)
{
  if (this->collocated_ec_ != 1)
    {
      // Not collocated: go through the naming service to find the factory.
      this->resolve_naming_service (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
      this->resolve_Notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // Collocated: the service object is expected to be loaded via the
      // service configurator.
      TAO_Notify_Service* const collocated_service =
        ACE_Dynamic_Service<TAO_Notify_Service>::instance (TAO_NOTIFICATION_SERVICE_NAME);

      if (collocated_service == 0)
        {
          ACE_DEBUG ((LM_DEBUG, "Service not found! check conf. file\n"));
          return;
        }

      // Let the service create (activate) the channel factory.
      this->notify_factory_ =
        collocated_service->create (this->root_poa_.in ()
                                    ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      ACE_ASSERT (!CORBA::is_nil (this->notify_factory_.in ()));
    }

  if (this->ec_name_.length () == 0)
    {
      // No channel name given: create a fresh channel from the factory.
      CosNotifyChannelAdmin::ChannelID channel_id;

      this->ec_ =
        this->notify_factory_->create_channel (initial_qos_,
                                               initial_admin_,
                                               channel_id
                                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // A channel name was given: look the channel up by that name.
      CosNaming::Name channel_name (1);
      channel_name.length (1);
      channel_name[0].id = CORBA::string_dup (this->ec_name_.c_str ());

      CORBA::Object_var resolved =
        this->naming_context_->resolve (channel_name
                                        ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      this->ec_ =
        CosNotifyChannelAdmin::EventChannel::_narrow (resolved.in ()
                                                      ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  ACE_ASSERT (!CORBA::is_nil (this->ec_.in ()));
}

// Run the throughput test from the main thread.
//
// Steps, in order:
//   1. Log the configured test parameters.
//   2. Initialise every supplier on the root POA and activate it as a task.
//   3. Block on condition_ until peer_done_count_ drops to zero.
//   4. Destroy the event channel, but only when no channel name was given.
//   5. Set the worker's done_ flag.
//
// Returns early if any ACE_CHECK detects a pending exception.
void
Notify_Throughput::run_test (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_DEBUG ((LM_DEBUG, "collocated_ec_ %d ,"
              "burst_count_ %d, "
              "burst_pause_ %d, "
              "burst_size_  %d, "
              "payload_size_ %d, "
              "consumer_count_ %d,  "
              "supplier_count_ %d "
              "expected count %d\n",
              collocated_ec_,
              burst_count_,
              burst_pause_,
              burst_size_,
              payload_size_,
              consumer_count_,
              supplier_count_,
              perconsumer_count_));

  int supplier_index = 0;
  while (supplier_index < this->supplier_count_)
    {
      // Explicitly invoke the base-class init.
      suppliers_[supplier_index]->
        TAO_Notify_Tests_StructuredPushSupplier::init (root_poa_.in ()
                                                 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      const int activation_status =
        suppliers_[supplier_index]->ACE_Task_Base::activate (THR_NEW_LWP | THR_JOINABLE);

      // An activation failure is reported but does not stop the test.
      if (activation_status != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "Cannot activate client threads\n"));
        }

      ++supplier_index;
    }

  // Sleep on the condition until peer_done_count_ reaches zero; the
  // guard is released at the end of this scope.
  {
    ACE_DEBUG ((LM_DEBUG, "(%t)Waiting for shutdown signal in main..\n"));
    ACE_GUARD (TAO_SYNCH_MUTEX, wait_guard, lock_);

    for (; this->peer_done_count_ != 0; )
      {
        condition_.wait ();
      }
  }

  // Only destroy the channel when no channel name was given
  // (i.e. we are not using a global EC).
  const bool using_private_channel = (this->ec_name_.length () == 0);
  if (using_private_channel)
    {
      this->ec_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }

  // Tell the worker thread(s) that they may stop.
  this->worker_.done_ = 1;
}

void
Notify_Throughput::peer_done (void)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_);

  if (--this->peer_done_count_ == 0)
    {
      ACE_DEBUG ((LM_DEBUG, "calling shutdown\n"));
      condition_.broadcast ();
    }
}

void
Notify_Throughput::dump_results (void)
{
  ACE_Throughput_Stats throughput;
  ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
  char buf[BUFSIZ];

  for (int j = 0; j < this->consumer_count_; ++j)
    {
      ACE_OS::sprintf (buf, "Consumer [%02d]", j);

      this->consumers_[j]->dump_stats (buf, gsf);
      this->consumers_[j]->accumulate_into (throughput);
    }

  ACE_DEBUG ((LM_DEBUG, "\n"));

  ACE_Throughput_Stats suppliers;

  for (int i = 0; i < this->supplier_count_; ++i)
    {
      ACE_OS::sprintf (buf, "Supplier [%02d]", i);

      this->suppliers_[i]->dump_stats (buf, gsf);
      this->suppliers_[i]->accumulate_into (suppliers);
    }

  ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
  throughput.dump_results ("Notify_Consumer/totals", gsf);

  ACE_DEBUG ((LM_DEBUG, "\n"));
  suppliers.dump_results ("Notify_Supplier/totals", gsf);
}

/***************************************************************************/

int
main (int argc, char* argv[])
{
  ACE_High_Res_Timer::calibrate ();

  Notify_Throughput events;

  if (events.parse_args (argc, argv) == -1)
    {
      return 1;
    }

  ACE_TRY_NEW_ENV
    {
      events.init (argc, argv
                      ACE_ENV_ARG_PARAMETER); //Init the Client
      ACE_TRY_CHECK;

      events.run_test (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      ACE_DEBUG ((LM_DEBUG, "Waiting for threads to exit...\n"));
      ACE_Thread_Manager::instance ()->wait ();
      events.dump_results();

      ACE_DEBUG ((LM_DEBUG, "ending main...\n"));

    }
  ACE_CATCH (CORBA::UserException, ue)
    {
      ACE_PRINT_EXCEPTION (ue,
                           "Events user error: ");
      return 1;
    }
  ACE_CATCH (CORBA::SystemException, se)
    {
      ACE_PRINT_EXCEPTION (se,
                           "Events system error: ");
      return 1;
    }
  ACE_ENDTRY;

  return 0;
}


// ****************************************************************

// Construct a worker with its done_ flag initialised to 0, so that
// svc() keeps looping until the flag is set to a non-zero value.
Worker::Worker (void)
  : done_ (0)
{
}

// Store a duplicated reference to the given ORB in orb_; svc() runs
// this ORB.
void
Worker::orb (CORBA::ORB_ptr orb)
{
  this->orb_ = CORBA::ORB::_duplicate (orb);
}

// Thread entry point: repeatedly run the ORB in 5 second slices until
// done_ is set to a non-zero value, then log and return 0.
//
// The timeout is constructed anew on every pass.  Per the TAO
// documentation, CORBA::ORB::run (ACE_Time_Value &) reduces the value it
// is given by the time spent in the call, so a single timeout object
// shared across iterations would be (close to) zero after the first pass
// and the loop would degenerate into a busy spin until done_ is set.
//
// Consequence: the worker may take up to one full slice (5 seconds) to
// notice done_ and exit.
int
Worker::svc (void)
{
  do
    {
      // Fresh 5 second budget for this pass (see function comment).
      ACE_Time_Value tv (5);
      this->orb_->run (tv);
    }
  while (!this->done_);

  ACE_DEBUG ((LM_DEBUG, "(%P) (%t) done\n"));

  return 0;
}

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -