⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 task_client.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信要求,可广泛用于网络游戏、医学图像网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 3 页
字号:
// Task_Client.cpp,v 1.124 2003/11/04 05:21:33 dhinton Exp

#include "Task_Client.h"
#include "Timer.h"
#include "ace/Stats.h"
#include "tao/TAO_Internal.h"
#include "ace/Barrier.h"
#include "ace/Thread_Semaphore.h"
#include "ace/OS_NS_unistd.h"

#if defined (ACE_HAS_QUANTIFY)
# include "quantify.h"
#endif /* ACE_HAS_QUANTIFY */

// Converts a timer value to an ACE_UINT32.  When floating point is
// available the value is converted with an explicit static cast;
// otherwise it is returned through the implicit conversion.
inline ACE_UINT32
ACE_round (ACE_timer_t t)
{
#if !defined (ACE_LACKS_FLOATING_POINT)
  const ACE_UINT32 converted = ACE_static_cast (ACE_UINT32, t);
  return converted;
#else
  return t;
#endif /* !ACE_LACKS_FLOATING_POINT */
}

ACE_RCSID(MT_Cubit, Task_Client, "Task_Client.cpp,v 1.124 2003/11/04 05:21:33 dhinton Exp")

// Default constructor.  Only establishes default option values and
// null pointers; the option values are overridden and the arrays,
// barrier and semaphore are allocated later by parse_args().
Task_State::Task_State (void)
  : barrier_ (0),               // allocated in parse_args()
    key_ ("Cubit"),
    loop_count_ (1000),         // default; overridden by -n / -u
    thread_count_ (2),          // default; overridden by -t
    latency_ (0),               // allocated in parse_args()
    ave_latency_ (0),
    datatype_ (CB_OCTET),       // default; overridden by -d
    thread_per_rate_ (0),       // set by -r
    global_jitter_array_ (0),   // allocated in parse_args()
    count_ (0),                 // allocated in parse_args()
    shutdown_ (0),              // set by -x
    oneway_ (0),                // set by -o
    one_ior_ (0),
    one_to_n_test_ (0),         // set by -1
    context_switch_test_ (0),   // set by -c
    iors_ (0),                  // allocated in parse_args()
    iors_count_ (0),            // number of entries filled in iors_
    ior_file_ (0),              // strdup'ed from the -f argument
    granularity_ (1),           // parse_args() keeps this >= 1
    use_utilization_test_ (0),  // set by -u
    high_priority_loop_count_ (0),
    semaphore_ (0),             // allocated in parse_args()
    use_multiple_priority_ (0), // set by -m
    ready_ (0),
    ready_cnd_ (ready_mtx_),    // condition variable bound to ready_mtx_
    remote_invocations_ (1),    // cleared by -l
    util_test_time_ (0)
{
}

// Parses the test's command-line options, optionally loads object
// references (IORs) from the file named by -f, and allocates the
// barrier, semaphore and per-thread arrays sized by <thread_count_>.
//
// Returns 0 on success.  Returns -1 if an unknown option is seen
// (after printing the usage message), if the IOR file cannot be
// opened, or if one of the ACE_NEW_RETURN allocations fails.
int
Task_State::parse_args (int argc,char *argv[])
{
  ACE_Get_Opt opts (argc, argv, "mu:n:t:d:rxof:g:1cl");
  int c;

  while ((c = opts ()) != -1)
    switch (c) {
    case 'g':
      granularity_ = ACE_OS::atoi (opts.opt_arg ());
      // A granularity below 1 is meaningless; clamp it.
      if (granularity_ < 1)
        granularity_ = 1;
      break;
    case 'l':
      remote_invocations_ = 0;
      break;
    case 'c':
      context_switch_test_ = 1;
      break;
    case 'm':
      use_multiple_priority_ = 1;
      break;
    case '1':
      one_to_n_test_ = 1;
      break;
    case 'u':
      use_utilization_test_ = 1;
      loop_count_ = ACE_OS::atoi (opts.opt_arg ());
      break;
    case 'f':
      // Release any name stored by an earlier -f so that repeating
      // the option does not leak the previous copy.
      if (ior_file_ != 0)
        ACE_OS::free (ior_file_);
      ior_file_ = ACE_OS::strdup (opts.opt_arg ());
      break;
    case 'o':
      oneway_ = 1;
      break;
    case 'x':
      shutdown_ = 1;
      break;
    case 'r':
      thread_per_rate_ = 1;
      break;
    case 'd':
      {
        int datatype = ACE_OS::atoi (opts.opt_arg ());
        switch (datatype)
          {
          case CB_OCTET:
            ACE_DEBUG ((LM_DEBUG,
                        "Testing Octets\n"));
            datatype_ = CB_OCTET;
            break;
          case CB_LONG:
            ACE_DEBUG ((LM_DEBUG,
                        "Testing Longs\n"));
            datatype_ = CB_LONG;
            break;
          case CB_STRUCT:
            ACE_DEBUG ((LM_DEBUG,
                        "Testing Structs\n"));
            datatype_ = CB_STRUCT;
            break;
          case CB_SHORT:
          default:
            // Any unrecognized value falls back to shorts.
            ACE_DEBUG ((LM_DEBUG,
                        "Testing Shorts\n"));
            datatype_ = CB_SHORT;
            break;
          }
      }
      break;
    case 'n':                   // loop count
      loop_count_ = (u_int) ACE_OS::atoi (opts.opt_arg ());
      break;
    case 't':
      thread_count_ = (u_int) ACE_OS::atoi (opts.opt_arg ());
      break;
    case '?':
    default:
      ACE_DEBUG ((LM_DEBUG, "usage:  %s\t"
                  "[<ORB OPTIONS>]        // ORB options, e.g., \"-ORBobjrefstyle url\"                               \n\t\t\t"
                  "[-d <datatype>]        // what datatype to use for calls:  Octet=0, Short=1, Long=2, Struct=3      \n\t\t\t"
                  "[-n <num_calls>]       // number of CORBA calls to make.                                           \n\t\t\t"
                  "[-t <num_of_clients>]  // number of client threads to create                                       \n\t\t\t"
                  "[-f <ior_file>]        // specify the file from which we read the object references (iors), if any.\n\t\t\t"
                  "[-r]                   // run thread-per-rate test.                                                \n\t\t\t"
                  "[-o]                   // makes client use oneway calls.  By default, twoway calls are used.       \n\t\t\t"
                  "[-x]                   // makes a call to servant to shutdown                                      \n\t\t\t"
                  "[-u <requests> ]       // run the client utilization test for a number of <requests>               \n\t\t\t"
                  "[-1]                   // run the one-to-n test.                                                   \n\t\t\t"
                  "[-g <granularity>]     // choose the granularity of the timing of CORBA calls                      \n\t\t\t"
                  "[-c]                   // run the number of context switches test.                                 \n\t\t\t"
                  "[-l]                   // use direct function calls, as opposed to CORBA requests.  ONLY to be used with -u option.\n\t\t\t"
                  "[-m]                   // use multiple priorities for the low priority clients.                    \n"
                  ,argv [0]));
      return -1;
    }

  // The thread-per-rate test dictates its own thread count.
  if (thread_per_rate_ == 1)
    thread_count_ = THREAD_PER_RATE_OBJS;

  // The utilization test forces a single client, a shutdown call and
  // octet data regardless of the other options given.
  if (use_utilization_test_ == 1)
    {
      thread_count_ = 1;
      shutdown_ = 1;
      datatype_ = CB_OCTET;
    }

  // Allocate the array of character pointers.
  ACE_NEW_RETURN (iors_,
                  char *[thread_count_],
                  -1);

  if (ior_file_ != 0)
    {
      FILE *ior_file =
        ACE_OS::fopen (ior_file_, "r");

      if (ior_file == 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Task_State::parse_args; "
                           "unable to open IOR file \"%s\"\n",
                           ior_file_),
                          -1);
      char buf[BUFSIZ];
      u_int i;

      // Read at most <thread_count_> lines.  The bound is tested
      // before fgets() so no line is consumed once iors_ is full.
      for (i = 0;
           i < thread_count_
             && ACE_OS::fgets (buf, BUFSIZ, ior_file) != 0;
           i++)
        {
          // The line comes from a file, so it must be printed as data
          // and never interpreted as a format string.
          ACE_DEBUG ((LM_DEBUG,
                      "%s",
                      buf));
          size_t len = ACE_OS::strlen (buf);

          // Strip the trailing '\n' only when one was actually read;
          // the last line of the file may not end with a newline.
          if (len > 0 && buf[len - 1] == '\n')
            buf[len - 1] = 0;
          iors_[i] = ACE_OS::strdup (buf);
        }

      // Remember how many entries were strdup'ed so that the
      // destructor frees exactly those.
      this->iors_count_ = i;
      ACE_OS::fclose (ior_file);
    }

  // thread_count_ + 2 because there is one utilization thread also
  // wanting to begin at the same time the clients begin && the main
  // thread wants to know when clients will start running to get
  // accurate context switch numbers.

  if (thread_per_rate_ == 0)
    {
      if (use_utilization_test_ == 1)
        // If we are to use the utilization test, include it in the
        // barrier count.  See description of this variable in header
        // file.
        ACE_NEW_RETURN (barrier_,
                        ACE_Barrier (thread_count_ + 2),
                        -1);
      else
        ACE_NEW_RETURN (barrier_,
                        ACE_Barrier (thread_count_ + 1),
                        -1);
    }
  else
    ACE_NEW_RETURN (this->barrier_,
                    ACE_Barrier (thread_count_),
                    -1);
  ACE_NEW_RETURN (this->semaphore_,
                  ACE_SYNCH_SEMAPHORE (0),
                  -1);
  // Per-thread result slots, indexed by client thread id.
  ACE_NEW_RETURN (this->latency_,
                  ACE_timer_t [thread_count_],
                  -1);
  ACE_NEW_RETURN (this->global_jitter_array_,
                  JITTER_ARRAY *[this->thread_count_],
                  -1);
  ACE_NEW_RETURN (this->count_,
                  u_int [thread_count_],
                  -1);
  return 0;
}

// Destructor.  Frees the strdup'ed strings with ACE_OS::free() and
// the objects and arrays held through the member pointers with
// delete / delete [].
Task_State::~Task_State (void)
{
  // The IOR file name, if one was given.
  if (ior_file_ != 0)
    ACE_OS::free (ior_file_);

  // Free each IOR string that was actually stored, then the array
  // that holds them.
  int idx = 0;
  while (idx < iors_count_)
    {
      ACE_OS::free (iors_ [idx]);
      idx++;
    }
  delete [] iors_;

  // Synchronization objects.
  delete barrier_;
  delete semaphore_;

  // Per-thread arrays.
  delete [] latency_;
  delete [] ave_latency_;
  delete [] global_jitter_array_;
  delete [] count_;
}

// Constructs a client task.
//
// <thread_manager> is handed to the ACE_Task base class, <ts> is the
// Task_State this client records its results in, <argc>/<argv> are
// stored as given (the argv pointer is kept, not copied), and <id> is
// stored in <id_>.  Counters start at zero and the jitter array and
// timer pointers start null.
Client::Client (ACE_Thread_Manager *thread_manager,
                Task_State *ts,
                int argc,
                char **argv,
                u_int id)
  : ACE_Task<ACE_SYNCH> (thread_manager),
    // Built with a nil ORB and a nil POA.
    cubit_impl_ (CORBA::ORB::_nil (),
                 PortableServer::POA::_nil ()),
    ts_ (ts),
    num_ (0),
    id_ (id),
    call_count_ (0),
    error_count_ (0),
    my_jitter_array_ (0),       // deleted in the destructor
    timer_ (0),                 // deleted in the destructor
    frequency_ (0),
    latency_ (0),
    argc_ (argc),
    argv_ (argv)
{
}

// Destructor.  Deletes the jitter array and the timer held by this
// client; both pointers are null unless something assigned them.
Client::~Client (void)
{
  delete my_jitter_array_;
  delete timer_;
}

// Returns <i> minus 117, converted to int.
int
Client::func (u_int i)
{
  const u_int shifted = i - 117;
  return shifted;
}

// Records one thread's results in the shared Task_State: <latency>,
// the <jitter> array pointer and the call <count> are stored in the
// slots indexed by <thread_id>, then the latency is logged.
void
Client::put_latency (JITTER_ARRAY *jitter,
                     ACE_timer_t latency,
                     u_int thread_id,
                     u_int count)
{
  // Hold the Task_State lock for the rest of this function.
  ACE_MT (ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->ts_->lock_));

  Task_State *const state = this->ts_;

  state->count_[thread_id] = count;
  state->global_jitter_array_[thread_id] = jitter;
  state->latency_[thread_id] = latency;

  ACE_DEBUG ((LM_DEBUG,
              "(%t) My latency was %A msec\n",
              latency/ACE_ONE_SECOND_IN_MSECS));
}

// Returns the latency, in usecs, recorded in slot 0 of the shared
// latency array.
ACE_timer_t
Client::get_high_priority_latency (void)
{
  return ACE_static_cast (ACE_timer_t,
                          this->ts_->latency_ [0]);
}

// Returns the mean latency, in usecs, over slots 1 .. thread_count_-1
// of the shared latency array.  Returns 0 when there is exactly one
// thread, i.e., when there are no such slots.
ACE_timer_t
Client::get_low_priority_latency (void)
{
  const u_int total_threads = this->ts_->thread_count_;

  if (total_threads == 1)
    return 0;

  // Sum every slot except slot 0.
  ACE_timer_t sum = 0;
  u_int slot = 1;

  while (slot < total_threads)
    {
      sum += (ACE_timer_t) this->ts_->latency_[slot];
      ++slot;
    }

  // Average over the number of slots that were summed.
  return sum / (ACE_timer_t) (total_threads - 1);
}

// Returns the latency recorded for <thread_id> in the shared latency
// array.  No bounds check is made on <thread_id>.
ACE_timer_t
Client::get_latency (u_int thread_id)
{
  const ACE_timer_t recorded =
    ACE_static_cast (ACE_timer_t, this->ts_->latency_ [thread_id]);
  return recorded;
}

// Returns the jitter in usecs.
ACE_timer_t
Client::get_high_priority_jitter (void)
{
  ACE_timer_t jitter = 0.0;
  ACE_timer_t average = get_high_priority_latency ();
  u_int number_of_samples = 0;

  // Compute the standard deviation, i.e., jitter, from the values
  // stored in the global_jitter_array_.

  ACE_Stats stats;

  // We first compute the sum of the squares of the differences each
  // latency has from the average.

  JITTER_ARRAY_ITERATOR iterator  =
    this->ts_->global_jitter_array_[0]->begin ();

  // latency in usecs.
  ACE_timer_t *latency = 0;

  for (iterator.first ();
       iterator.next (latency) == 1;
       iterator.advance ())
    {
      ++number_of_samples;

      ACE_timer_t difference = *latency - average;
      jitter += difference * difference;

      if (stats.sample (ACE_round (*latency)) == -1)
        ACE_DEBUG ((LM_DEBUG, "Error: stats.sample returned -1\n"));

    }

  // Return the square root of the sum of the differences computed
  // above, i.e., jitter.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -