⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 observer.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信要求,可广泛用于网络游戏、医学图像网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
字号:
// Observer.cpp,v 1.15 2002/01/29 20:20:54 okellogg Exp

#include "Observer.h"
#include "Consumer.h"
#include "Supplier.h"
#include "orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/Event/EC_Default_Factory.h"
#include "ace/Arg_Shifter.h"
#include "ace/High_Res_Timer.h"

ACE_RCSID(EC_Tests_Basic, Observer, "Observer.cpp,v 1.15 2002/01/29 20:20:54 okellogg Exp")

int
main (int argc, char *argv [])
{
  TAO_EC_Default_Factory::init_svcs ();
  EC_Master master;
  return master.run (argc, argv);
}

// ****************************************************************

// Default state: seed of 0, four channels, and no observer array
// allocated yet (run() allocates it).
EC_Master::EC_Master (void)
  : seed_ (0)
  , n_channels_ (4)
  , channels_ (0)
{
  // Nothing else to do.
}

// Releases every observer held in channels_ and then the array itself.
EC_Master::~EC_Master (void)
{
  // Nothing was allocated; nothing to release.
  if (this->channels_ == 0)
    return;

  int slot = 0;
  while (slot < this->n_channels_)
    {
      delete this->channels_[slot];
      ++slot;
    }

  delete[] this->channels_;
}

// Runs the whole test: initializes the ORB and POA, parses the command
// line, creates n_channels_ EC_Observer instances, initializes and
// executes each one, waits for all threads, dumps results, and tears
// everything down.
//
// Returns 1 if argument parsing fails, if an allocation fails, or if
// waiting on the thread manager fails.  Returns 0 otherwise, including
// when an exception was caught and logged.
// NOTE(review): returning 0 after a caught exception hides failures from
// the caller -- confirm whether this is intentional.
int
EC_Master::run (int argc, char* argv[])
{
  ACE_TRY_NEW_ENV
    {
      // Calibrate the high resolution timer *before* starting the
      // test.
      ACE_High_Res_Timer::calibrate ();

      // Default seed; parse_args() may override it with -seed.
      this->seed_ = ACE_OS::time (0);

      this->initialize_orb_and_poa (argc, argv ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (this->parse_args (argc, argv))
        return 1;

      ACE_DEBUG ((LM_DEBUG,
                  "The seed value is %d\n", this->seed_));

      ACE_NEW_RETURN (this->channels_,
                      EC_Observer*[this->n_channels_],
                      1);

      // Null every slot before creating any observer.  If a later
      // allocation fails and we return early, the destructor walks the
      // whole array and must only see valid or null pointers.
      {
        for (int i = 0; i < this->n_channels_; ++i)
          this->channels_[i] = 0;
      }

      {
        for (int i = 0; i != this->n_channels_; ++i)
          {
            // Result is discarded; presumably this advances seed_ so
            // each observer receives a different seed -- TODO confirm.
            ACE_OS::rand_r (this->seed_);
            ACE_NEW_RETURN (this->channels_[i],
                            EC_Observer (this,
                                         this->seed_,
                                         this->orb_.in (),
                                         this->root_poa_.in (),
                                         i),
                            1);
          }
      }

      {
        // Each observer gets a fresh copy of the argument vector,
        // presumably because run_init() may rearrange the one it is
        // given.
        // NOTE(review): targv is not released if run_init() raises.
        char** targv;
        ACE_NEW_RETURN (targv, char*[argc], 1);

        for (int i = 0; i != this->n_channels_; ++i)
          {
            int targc = argc;
            for (int j = 0; j < targc; ++j)
              targv[j] = argv[j];
            this->channels_[i]->run_init (targc, targv ACE_ENV_ARG_PARAMETER);
            ACE_TRY_CHECK;
          }
        delete[] targv;
      }

      {
        for (int i = 0; i != this->n_channels_; ++i)
          {
            this->channels_[i]->execute_test (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
          }
      }

      // Block until every thread managed by the singleton has exited.
      if (ACE_Thread_Manager::instance ()->wait () == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "EC_Master (%P|%t) thread manager wait failed\n"));
          return 1;
        }

      {
        for (int i = 0; i != this->n_channels_; ++i)
          {
            this->channels_[i]->dump_results ();
          }
      }

      // All observers run their cleanup before any channel is torn
      // down in the next loop.
      {
        for (int i = 0; i != this->n_channels_; ++i)
          {
            this->channels_[i]->run_cleanup (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
          }
      }

      {
        for (int i = 0; i != this->n_channels_; ++i)
          {
            this->channels_[i]->disconnect_clients (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
            this->channels_[i]->shutdown_clients (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
            this->channels_[i]->destroy_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
            this->channels_[i]->deactivate_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
            this->channels_[i]->cleanup_tasks ();
            this->channels_[i]->cleanup_suppliers ();
            this->channels_[i]->cleanup_consumers ();
            this->channels_[i]->cleanup_ec ();
          }
      }

      this->root_poa_->destroy (1,
                                1
                                ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      this->orb_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
   }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "EC_Driver::run");
    }
  ACE_CATCHALL
    {
      ACE_ERROR ((LM_ERROR, "EC_Driver (%P|%t) non-corba exception raised\n"));
    }
  ACE_ENDTRY;
  return 0;
}

void
EC_Master::initialize_orb_and_poa (int &argc, char* argv[]
                                   ACE_ENV_ARG_DECL)
{
  this->orb_ =
    CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Object_var poa_object =
    this->orb_->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  if (CORBA::is_nil (poa_object.in ()))
    {
      ACE_ERROR ((LM_ERROR,
                  "EC_Driver (%P|%t) Unable to initialize the POA.\n"));
      return;
    }

  this->root_poa_ =
    PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  PortableServer::POAManager_var poa_manager =
    this->root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}

int
EC_Master::parse_args (int &argc, char *argv [])
{
  ACE_Arg_Shifter arg_shifter (argc, argv);

  while (arg_shifter.is_anything_left ())
    {
      const char *arg = arg_shifter.get_current ();

      if (ACE_OS::strcmp (arg, "-channels") == 0)
        {
          arg_shifter.consume_arg ();
          this->n_channels_ = ACE_OS::atoi (arg_shifter.get_current ());
        }
      else if (ACE_OS::strcmp (arg, "-seed") == 0)
        {
          arg_shifter.consume_arg ();
          this->seed_ = ACE_OS::atoi (arg_shifter.get_current ());
        }

      arg_shifter.ignore_arg ();
    }
  return 0;
}

// Number of channels this master is configured to manage.
int
EC_Master::channel_count (void) const
{
  const int count = this->n_channels_;
  return count;
}

// Returns the observer stored at index i.  No bounds checking is done;
// the caller must pass an index valid for the channels_ array.
EC_Observer*
EC_Master::channel (int i) const
{
  EC_Observer *const selected = this->channels_[i];
  return selected;
}

// ****************************************************************

// Records the owning master, the random seed and this observer's id, and
// keeps duplicated references to the ORB and root POA supplied by the
// caller.  The gateway array starts out unallocated.
EC_Observer::EC_Observer (EC_Master *master,
                          ACE_RANDR_TYPE seed,
                          CORBA::ORB_ptr orb,
                          PortableServer::POA_ptr root_poa,
                          int id)
  : master_ (master)
  , seed_ (seed)
  , id_ (id)
  , gwys_ (0)
{
  // Duplicate the references; the caller keeps its own.
  this->orb_ =
    CORBA::ORB::_duplicate (orb);
  this->root_poa_ =
    PortableServer::POA::_duplicate (root_poa);
}

// Frees the gateway array allocated by execute_test(), if any.
EC_Observer::~EC_Observer (void)
{
  // delete[] on a null pointer is a no-op, so no explicit test is needed.
  delete[] this->gwys_;
}

// Intentionally does nothing: the constructor already stores the ORB and
// root POA passed in by the caller, so there is nothing to initialize here.
void
EC_Observer::initialize_orb_and_poa (int&, char*[]
                                     ACE_ENV_ARG_DECL_NOT_USED)
{
}

int
EC_Observer::parse_args (int& argc, char* argv[])
{
  return this->EC_Driver::parse_args (argc, argv);
}

void
EC_Observer::print_args (void) const
{
  this->EC_Driver::print_args ();
}

void
EC_Observer::print_usage (void)
{
  this->EC_Driver::print_usage ();
}

// Connects this observer's event channel to every peer channel owned by
// the master and then starts the test tasks.
//
// For each peer (every channel except our own) a gateway is initialized
// between the peer's channel and the local one, registered with the peer
// as an observer, and given the handle the peer returned so that
// run_cleanup() can remove it again.
//
// Returns early, without starting any task, if the gateway array cannot
// be allocated, if a CORBA call fails, or if allocate_tasks() fails.
void
EC_Observer::execute_test (ACE_ENV_SINGLE_ARG_DECL)
{
  int peer_count = this->master_->channel_count ();

  // One slot per channel, indexed by channel id; the slot matching our
  // own id stays unused.
  ACE_NEW (this->gwys_, TAO_EC_Gateway_IIOP[peer_count]);

  for (int i = 0; i != peer_count; ++i)
    {
      if (i == this->id_)
        continue;

      RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
        this->master_->channel (i)->event_channel_.in ();

      this->gwys_[i].init (rmt_ec,
                           this->event_channel_.in ()
                           ACE_ENV_ARG_PARAMETER);
      // init() takes the environment, so its outcome is checked before
      // the gateway is used any further.
      ACE_CHECK;

      RtecEventChannelAdmin::Observer_var obs =
        this->gwys_[i]._this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      RtecEventChannelAdmin::Observer_Handle h =
        rmt_ec->append_observer (obs.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      // Keep the handle; run_cleanup() passes it to remove_observer().
      this->gwys_[i].observer_handle (h);
    }

  if (this->allocate_tasks () == -1)
    return;

  this->activate_tasks (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  if (this->verbose ())
    ACE_DEBUG ((LM_DEBUG, "EC_Observer[%d] (%P|%t) suppliers are active\n",
                this->id_));
}

// Undoes the peer connections made by execute_test(): for every peer
// channel, removes this observer's gateway from the peer using the stored
// handle and then shuts the gateway down.
//
// Does nothing if the gateway array was never allocated.
void
EC_Observer::run_cleanup (ACE_ENV_SINGLE_ARG_DECL)
{
  // execute_test() leaves gwys_ null when its allocation fails; in that
  // case no gateway was registered and there is nothing to undo.
  if (this->gwys_ == 0)
    return;

  for (int j = 0; j != this->master_->channel_count (); ++j)
    {
      // Our own slot was never used.
      if (j == this->id_)
        continue;

      RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
        this->master_->channel (j)->event_channel_.in ();
      rmt_ec->remove_observer (this->gwys_[j].observer_handle ()
                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      this->gwys_[j].shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
}

void
EC_Observer::dump_results (void)
{
  ACE_DEBUG ((LM_DEBUG, "===== Results for %d =====\n", this->id_));

  ACE_Throughput_Stats throughput;
  ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
  for (int j = 0; j < this->n_consumers_; ++j)
    {
      this->consumers_[j]->accumulate (throughput);
    }
  ACE_DEBUG ((LM_DEBUG, "\n"));

  ACE_Throughput_Stats suppliers;
  for (int i = 0; i < this->n_suppliers_; ++i)
    {
      this->suppliers_[i]->accumulate (suppliers);
    }

  ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
  throughput.dump_results ("EC_Consumer/totals", gsf);

  ACE_DEBUG ((LM_DEBUG, "\n"));
  suppliers.dump_results ("EC_Supplier/totals", gsf);
}

// Connects consumer i through the EC_Driver implementation.  Consumer 0
// is always connected; any other consumer is connected only when a
// pseudo-random draw falls below RAND_MAX / 8.
void
EC_Observer::connect_consumer (
    RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin,
    int i
    ACE_ENV_ARG_DECL)
{
  if (i != 0)
    {
      // The generator is only consulted for consumers other than 0.
      const unsigned int draw = ACE_OS::rand_r (this->seed_);
      if (!(draw < RAND_MAX / 8))
        return;
    }

  this->EC_Driver::connect_consumer (consumer_admin, i
                                     ACE_ENV_ARG_PARAMETER);
}

// Push callback.  When a pseudo-random draw falls below RAND_MAX / 64,
// flips the connection state of every consumer except consumer 0:
// connected ones are disconnected and disconnected ones are connected
// again.  Otherwise does nothing.  The event data itself is ignored.
void
EC_Observer::consumer_push (void*,
                            const RtecEventComm::EventSet&
                            ACE_ENV_ARG_DECL)
{
  const unsigned int draw = ACE_OS::rand_r (this->seed_);
  if (!(draw < (RAND_MAX / 64)))
    return;

  if (this->verbose ())
    ACE_DEBUG ((LM_DEBUG,
                "EC_Observer[%d] (%P|%t) reconnecting\n", this->id_));

  RtecEventChannelAdmin::ConsumerAdmin_var admin =
    this->event_channel_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Index 0 is deliberately skipped.
  for (int idx = 1; idx < this->n_consumers_; ++idx)
    {
      // The lock is taken anew for each consumer.
      ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

      if (!this->consumers_[idx]->connected ())
        {
          this->EC_Driver::connect_consumer (admin.in (),
                                             idx ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
        }
      else
        {
          this->consumers_[idx]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_CHECK;
        }
    }
}


#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -