⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ec_multiple.cpp

📁 这是一个被广泛使用的开源通信项目，完全能够胜任大容量、高并发的通信需求，可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场合。更详细的内容请参阅相应的资料。
💻 CPP
📖 第 1 页 / 共 4 页
字号:
                                 this->hpc_event_a_,
                                 this->hpc_event_b_,
                                 local_ec
                                 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      this->stats_[i].total_time_ = 0;
      this->stats_[i].lcl_count_ = 0;
      this->stats_[i].rmt_count_ = 0;
    }

  for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
    {
      char buf[BUFSIZ];
      ACE_OS::sprintf (buf, "lp_consumer_%02d@%s",
                       i - this->hp_consumers_, this->lcl_name_);

      ACE_NEW (this->consumers_[i],
               Test_Consumer (this, this->consumers_ + i));

      this->consumers_[i]->open (buf,
                                 this->lpc_event_a_,
                                 this->lpc_event_b_,
                                 local_ec
                                 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      this->stats_[i].total_time_ = 0;
      this->stats_[i].lcl_count_ = 0;
      this->stats_[i].rmt_count_ = 0;
    }
  this->running_consumers_ = this->hp_consumers_ + this->lp_consumers_;
}

// Initialise the event channel gateway (ecg_) between the remote and
// the local event channel.
//
// local_ec / remote_ec   - the two event channels handed to ecg_.init().
// remote_sch             - scheduler passed alongside the remote channel;
//                          the local one is obtained from
//                          ACE_Scheduler_Factory::server ().
//
// The two names passed to the gateway are "ecg_<lcl>@<rmt>" and
// "ecg_<lcl>@<lcl>".  They are built with bounded formatting: the
// names originate from the command line, so an over-long name is
// truncated instead of overrunning the stack buffers.
//
// NOTE(review): shutdown() compares rmt_name_ against 0, so it can be
// unset; this function presumably is only reached when a remote name
// was supplied -- confirm against the caller.
void
Test_ECG::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec,
                       RtecEventChannelAdmin::EventChannel_ptr remote_ec,
                       RtecScheduler::Scheduler_ptr remote_sch
                       ACE_ENV_ARG_DECL)
{
  RtecScheduler::Scheduler_ptr local_sch =
    ACE_Scheduler_Factory::server ();

  const int bufsize = 512;

  // We could use the same name on the local and remote scheduler,
  // but that fails when using a global scheduler.
  char rmt[BUFSIZ];
  ACE_OS::snprintf (rmt, sizeof (rmt), "ecg_%s@%s",
                    this->lcl_name_, this->rmt_name_);
  // Terminate explicitly in case the underlying snprintf does not on
  // truncation.
  rmt[sizeof (rmt) - 1] = '\0';

  char lcl[bufsize];
  ACE_OS::snprintf (lcl, sizeof (lcl), "ecg_%s@%s",
                    this->lcl_name_, this->lcl_name_);
  lcl[sizeof (lcl) - 1] = '\0';

  this->ecg_.init (remote_ec, local_ec, remote_sch, local_sch,
                   rmt, lcl ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Deliver an event set produced by a supplier.
//
// Blocks in wait_until_ready () first.  In the normal case the events
// go to the proxy <consumer>; when short_circuit_ is set they are
// pushed directly into every local consumer instead (high-priority
// slots first, then low-priority ones).
void
Test_ECG::push_supplier (void * /* cookie */,
                         RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
                         const RtecEventComm::EventSet &events
                         ACE_ENV_ARG_DECL)
{
  this->wait_until_ready ();
  // @@ TODO we could keep somekind of stats here...

  // Regular path: hand the events to the proxy and we are done.
  if (!this->short_circuit_)
    {
      consumer->push (events ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      return;
    }

  // Short-circuit path: walk the consumer table ourselves.
  int slot = 0;

  // High-priority consumers occupy the first hp_consumers_ slots.
  while (slot < this->hp_consumers_)
    {
      this->consumers_[slot]->push (events ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      ++slot;
    }

  // Low-priority consumers follow immediately after.
  while (slot < this->hp_consumers_ + this->lp_consumers_)
    {
      this->consumers_[slot]->push (events ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      ++slot;
    }
}

// Record statistics for an event set received by one consumer.
//
// consumer_cookie - pointer into consumers_; its offset from the start
//                   of the array is used as the consumer index (ID).
//                   Presumably this is the slot address handed to
//                   Test_Consumer at construction -- confirm.
// arrival         - high-resolution timestamp of the delivery.
// events          - the delivered events; an empty set is ignored.
//
// For every event: a shutdown event triggers shutdown_consumer (ID);
// an event from a local supplier records latency, burns the configured
// workload, and records elapsed time, completion time and laxity; any
// other event only records its latency as remote.
//
// NOTE(review): the per-consumer sample arrays are indexed by the
// running counts without a bound check here -- confirm their capacity
// against the Stats declaration.
void
Test_ECG::push_consumer (void *consumer_cookie,
                         ACE_hrtime_t arrival,
                         const RtecEventComm::EventSet &events
                         ACE_ENV_ARG_DECL_NOT_USED)
{
  int ID =
    (ACE_reinterpret_cast(Test_Consumer**,consumer_cookie)
     - this->consumers_);

  // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));

  if (events.length () == 0)
    {
      // ACE_DEBUG ((LM_DEBUG, "no events\n"));
      return;
    }

  // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));

#if 0
  const int bufsize = 128;
  char buf[bufsize];
  ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID);
  print_priority_info (buf);
#endif

  for (u_int i = 0; i < events.length (); ++i)
    {
      const RtecEventComm::Event& e = events[i];

      if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
        {
          this->shutdown_consumer (ID);
          continue;
        }

      // Latency: arrival time minus the creation time stamped in the
      // event header.
      ACE_hrtime_t s;
      ORBSVCS_Time::TimeT_to_hrtime (s, e.header.creation_time);
      ACE_hrtime_t nsec = arrival - s;
      if (this->local_source (e.header.source))
        {
          int& count = this->stats_[ID].lcl_count_;

          this->stats_[ID].lcl_latency_[count] = nsec;

          // Consumers with an index at or past hp_consumers_ are the
          // low-priority ones and use the low-priority settings.
          int workload = this->hp_workload_;
          int interval = this->hp_interval_;
          if (ID >= this->hp_consumers_)
            {
              workload = this->lp_workload_;
              interval = this->lp_interval_;
            }

          for (int j = 0; j < workload; ++j)
            {
              // Eat a little CPU so the Utilization test can measure the
              // consumed time....
              /* takes about 40.2 usecs on a 167 MHz Ultra2 */
              u_long n = 1279UL;
              ACE::is_prime (n, 2, n / 2);
            }
          // Increment the elapsed time on this consumer.
          ACE_hrtime_t now = ACE_OS::gethrtime ();
          this->stats_[ID].total_time_ += now - arrival;
          this->stats_[ID].end_[count] = now;

          // We estimate our laxity based on the event creation
          // time... it may not be very precise, but will do; other
          // strategies include:
          // + Keep track of the "current frame", then then deadline
          // is the end of the frame.
          // + Use the start of the test to keep the current frame.
          // + Use the last execution.

          // laxity = 1 + (s - now) / interval, with the time difference
          // converted from nanoseconds to the units of <interval>.
          // <now> is normally later than <s>, so (s - now) is negative;
          // computing it directly wraps around once narrowed to an
          // unsigned 32-bit value.  Take the magnitude first and apply
          // the sign explicitly.
          float slack;
          if (now >= s)
            {
              ACE_hrtime_t late = now - s;
              slack = -(ACE_U64_TO_U32 (late) / 1000.0F);
            }
          else
            {
              ACE_hrtime_t early = s - now;
              slack = ACE_U64_TO_U32 (early) / 1000.0F;
            }
          this->stats_[ID].laxity_[count] = 1 + slack / interval;
          count++;
        }
      else
        {
          int& count = this->stats_[ID].rmt_count_;
          this->stats_[ID].rmt_latency_[count] = nsec;
          count++;
        }
    }
}

void
Test_ECG::wait_until_ready (void)
{
  ACE_GUARD (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_);
  while (!this->ready_)
    this->ready_cnd_.wait ();
}

void
Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
                             RtecEventComm::PushConsumer_ptr consumer
                             ACE_ENV_ARG_DECL)
{

  this->running_suppliers_--;
  if (this->running_suppliers_ != 0)
    return;

  // We propagate a shutdown event through the system...
  RtecEventComm::EventSet shutdown (1);
  shutdown.length (1);
  RtecEventComm::Event& s = shutdown[0];

  s.header.source = 0;
  s.header.ttl = 1;

  ACE_hrtime_t t = ACE_OS::gethrtime ();
  ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
  s.header.type = ACE_ES_EVENT_SHUTDOWN;
  consumer->push (shutdown ACE_ENV_ARG_PARAMETER);
}

// Account for consumer <id> having received its shutdown event.  When
// the last running consumer is gone the ORB is shut down; if no ORB is
// available an error is logged instead.
void
Test_ECG::shutdown_consumer (int id)
{
  ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id));

  --this->running_consumers_;

  // Other consumers are still running: nothing more to do yet.
  if (this->running_consumers_ != 0)
    {
      return;
    }

  if (TAO_ORB_Core_instance ()->orb () != 0)
    {
      TAO_ORB_Core_instance ()->orb ()->shutdown ();
      return;
    }

  ACE_ERROR ((LM_ERROR,
              "(%P|%t) Test_ECG::shutdown_consumer, "
              "ORB instance is 0\n"));
}

// Shut the test down: close the gateway (only when a remote name was
// configured) and then shut down the ORB.
//
// Returns 0 on success, -1 if the gateway shutdown raised an exception
// or if no ORB instance is available.  The ORB pointer is checked
// before use, matching the guard in shutdown_consumer ().
int
Test_ECG::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n"));

  if (this->rmt_name_ != 0)
    {
      this->ecg_.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
    }

  if (TAO_ORB_Core_instance ()->orb () == 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "(%P|%t) Test_ECG::shutdown, "
                  "ORB instance is 0\n"));
      return -1;
    }

  TAO_ORB_Core_instance ()->orb ()->shutdown ();
  return 0;
}

void
Test_ECG::dump_results (void)
{
  const int bufsize = 512;
  char buf[bufsize];

  int i;
  for (i = 0; i < this->hp_consumers_; ++i)
    {
      ACE_OS::sprintf (buf, "HP%02d", i);
      this->dump_results (buf, this->stats_[i]);
    }
  for (i = 0; i < this->lp_consumers_; ++i)
    {
      ACE_OS::sprintf (buf, "LP%02d", i);
      this->dump_results (buf, this->stats_[i + this->hp_consumers_]);
    }
  CORBA::ULong tmp = ACE_U64_TO_U32 (this->test_stop_ - this->test_start_);
  double usec =  tmp / 1000.0;
  ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec));
}

void
Test_ECG::dump_results (const char* name, Stats& stats)
{
  // @@ We are reporting the information without specifics about
  double usec = ACE_U64_TO_U32 (stats.total_time_) / 1000.0;
  ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec));
  int i;
  for (i = 1; i < stats.lcl_count_ - 1; ++i)
    {
      usec = ACE_U64_TO_U32 (stats.lcl_latency_[i]) / 1000.0;
      ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec));

      double percent = stats.laxity_[i] * 100.0;
      ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent));

      usec = ACE_U64_TO_U32 (stats.end_[i] - this->test_start_) / 1000.0;
      ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec));
    }
  for (i = 1; i < stats.rmt_count_ - 1; ++i)
    {
      double usec = ACE_U64_TO_U32 (stats.rmt_latency_[i]) / 1000.0;
      ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec));
    }
}

// Return 1 if <id> matches the supplier id of any of the
// hp_suppliers_ + lp_suppliers_ entries in suppliers_, 0 otherwise.
int
Test_ECG::local_source (RtecEventComm::EventSourceID id) const
{
  int slot = 0;
  while (slot < this->hp_suppliers_ + this->lp_suppliers_)
    {
      if (this->suppliers_[slot]->supplier_id () == id)
        {
          return 1;
        }
      ++slot;
    }

  // No local supplier carries this id.
  return 0;
}

int
Test_ECG::parse_args (int argc, char *argv [])
{
  ACE_Get_Opt get_opt (argc, argv, "l:r:s:i:xh:w:p:d:");
  int opt;

  while ((opt = get_opt ()) != EOF)
    {
      switch (opt)
        {
        case 'l':
          this->lcl_name_ = get_opt.opt_arg ();
          break;

        case 'r':
          this->rmt_name_ = get_opt.opt_arg ();
          break;

        case 's':
          if (ACE_OS::strcasecmp (get_opt.opt_arg (), "global") == 0)
            {
              this->scheduling_type_ = Test_ECG::ss_global;
            }
          else if (ACE_OS::strcasecmp (get_opt.opt_arg (), "local") == 0)
            {
              this->scheduling_type_ = Test_ECG::ss_local;
            }
          else if (ACE_OS::strcasecmp (get_opt.opt_arg (), "runtime") == 0)
            {
              this->scheduling_type_ = Test_ECG::ss_runtime;
            }
          else
            {
              ACE_DEBUG ((LM_DEBUG,
                          "Unknown scheduling type <%s> "
                          "defaulting to local\n",
                          get_opt.opt_arg ()));
              this->scheduling_type_ = Test_ECG::ss_local;
            }
          break;

        case 'x':
          this->short_circuit_ = 1;
          break;

        case 'i':
          {
            char* aux = 0;
            char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);
            this->consumer_disconnects_ = ACE_OS::atoi (arg);
            arg = ACE_OS::strtok_r (0, ",", &aux);
            this->supplier_disconnects_ = ACE_OS::atoi (arg);
          }
          break;

        case 'h':
          {
            char* aux = 0;
                char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);

            this->hp_suppliers_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hp_consumers_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hp_workload_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hp_interval_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hp_message_count_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->hpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
          }
          break;

        case 'w':
          {
            char* aux = 0;
                char* arg = ACE_OS::strtok_r (get_opt.opt_arg (), ",", &aux);

            this->lp_suppliers_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lp_consumers_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lp_workload_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lp_interval_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lp_message_count_ = ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
                arg = ACE_OS::strtok_r (0, ",", &aux);
            this->lpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
          }
          break;

        case 'p':
          this->pid_file_name_ = get_opt.opt_arg ();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -