⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatching_modules.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通讯要求,它可广泛用于网络游戏、医学图像网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Dispatching_Modules.cpp,v 1.37 2003/04/30 12:30:58 elliott_c Exp

#include "ace/Sched_Params.h"
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Time_Utilities.h"
#include "Memory_Pools.h"

#include "Dispatching_Modules.h"

#if !defined (__ACE_INLINE__)
#include "Dispatching_Modules.i"
#endif /* __ACE_INLINE__ */

#include "tao/Timeprobe.h"

ACE_RCSID (Event,
           Dispatching_Modules,
           "Dispatching_Modules.cpp,v 1.37 2003/04/30 12:30:58 elliott_c Exp")

#if defined (ACE_ENABLE_TIMEPROBES)

// Human-readable labels for the timeprobe events used in this file.
// The entries appear in the same order as the enumerators declared
// below, one label per enumerator.
static const char *TAO_Dispatching_Modules_Timeprobe_Description[] =
{
  "dispatch (dequeue) the event",
  "push_source_type: Correlation Module",
  "Priority_Dispatching::push - priority requested",
  "Priority_Dispatching::push - priority obtained"
};

// Keys handed to ACE_TIMEPROBE in this file.  Numbering starts at
// 5000 and continues sequentially.
enum
{
  // Timeprobe description table start key
  TAO_DISPATCHING_MODULES_DISPATCH_THE_EVENT = 5000,
  TAO_DISPATCHING_MODULES_PUSH_SOURCE_TYPE_CORRELATION_MODULE,
  TAO_DISPATCHING_MODULES_PRIORITY_DISPATCHING_PUSH_PRIORITY_REQUESTED,
  TAO_DISPATCHING_MODULES_PRIORITY_DISPATCHING_PUSH_PRIORITY_OBTAINED
};

// Setup Timeprobes: pass the description table together with the
// key of its first entry to the registration macro.
ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Dispatching_Modules_Timeprobe_Description,
                                  TAO_DISPATCHING_MODULES_DISPATCH_THE_EVENT);

#endif /* ACE_ENABLE_TIMEPROBES */

// ************************************************************

// Build a dispatch request that targets <consumer> and carries
// <event>, remembering <rt_info> as the request's scheduling handle.
//
// The priority, the dispatching module pointer and the single-event
// flag all start out as zero.  The event set is constructed with the
// argument 1 and <event> is then stored at index 0.
ACE_ES_Dispatch_Request::
ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer,
                         const TAO_EC_Event &event,
                         RtecScheduler::handle_t rt_info)
  : priority_ (0),
    rt_info_ (rt_info),
    dispatching_module_ (0),
    use_single_event_ (0),
    consumer_ (consumer),
    event_set_ (1)
{
  // Place the event into the first slot of the event set.
  this->event_set_.set (event, 0);
}

// Fill <dest> with the event(s) held by this request.
//
// - If the single-event flag is set, <dest> is pointed at the single
//   stored event through replace() with a final argument of 0.
// - If the event set holds exactly one entry, <dest> is pointed at
//   that entry in the same way.
// - Otherwise <dest> is first sized to the full event set, every
//   non-empty entry is copied into it in order, and <dest> is then
//   shortened to the number of entries actually copied.
void
ACE_ES_Dispatch_Request::make_copy (RtecEventComm::EventSet &dest) const
{
  if (this->use_single_event_)
    {
      // The RtecEventComm::EventSet will hold a pointer to the
      // buffer, without owning it, thus it is not removed!
      // @@ TODO Check what happens in the collocated case.
      dest.replace (1, 1,
                    ACE_const_cast(RtecEventComm::Event*,
                                   &this->single_event_.event ()),
                    0);
    }
  else if (this->event_set_.size () == 1)
    {
      // Same non-owning hand-off as above, using the only entry of
      // the event set.
      dest.replace (1, 1,
                    ACE_const_cast(RtecEventComm::Event*,
                                   &this->event_set_[0].event ()),
                    0);
    }
  else
    {
      // Evaluate the size once; it is reused as the loop bound.
      const CORBA::ULong total =
        ACE_static_cast (CORBA::ULong, this->event_set_.size ());
      dest.length (total);

      // Number of slots of <dest> filled so far.  Kept unsigned so
      // that it has the same type as the length passed to <dest>
      // above instead of mixing signed and unsigned values.
      CORBA::ULong filled = 0;
      for (CORBA::ULong i = 0; i < total; ++i)
        {
          // Empty entries are skipped and take no slot in <dest>.
          if (this->event_set_[i].empty ())
            continue;
          dest[filled] = this->event_set_[i].event ();
          ++filled;
        }

      // Drop the unused tail left behind by skipped entries.
      dest.length (filled);
    }
}

void
ACE_ES_Dispatch_Request::append_event (const TAO_EC_Event& event)
{
  size_t size = this->event_set_.size ();
  if (this->event_set_.size (size + 1) == 0)
    this->event_set_.set (event, size);
}

// Record the "dispatch the event" timeprobe and hand this request to
// the dispatching module it was associated with.  The module's
// result is returned unchanged; <command_action> is passed through
// for the module to fill in.
int
ACE_ES_Dispatch_Request::execute (u_long &command_action)
{
  ACE_TIMEPROBE (TAO_DISPATCHING_MODULES_DISPATCH_THE_EVENT);

  return this->dispatching_module_->dispatch_event (this,
                                                    command_action);
}

#if 0
// @@ Memory pools
void *
ACE_ES_Dispatch_Request::operator new (size_t nbytes)
{
  if (nbytes > sizeof (ACE_ES_Dispatch_Request))
    {
      ACE_ERROR ((LM_ERROR, "nbytes = %d, sizeof (ACE_ES_Dispatch_Request_Chunk) = %d.\n",
                  sizeof (ACE_ES_Dispatch_Request)));
      ACE_ASSERT (nbytes <= sizeof (ACE_ES_Dispatch_Request));
    }

  return ACE_ES_Memory_Pools::new_Dispatch_Request ();
}

// Class-specific deallocation: hands <mem> back to
// ACE_ES_Memory_Pools.
void
ACE_ES_Dispatch_Request::operator delete (void *mem)
{
  ACE_ES_Memory_Pools::delete_Dispatch_Request (mem);
}
#endif /* 0 */

// ************************************************************

// Base-class implementation: does nothing and ignores its (unnamed)
// integer argument.
void
ACE_ES_Dispatching_Base::activate (int)
{
}

// Base-class implementation: only emits a debug-level log message
// announcing the shutdown.
void
ACE_ES_Dispatching_Base::shutdown (void)
{
  ACE_DEBUG ((LM_DEBUG,
              "EC (%t) ACE_ES_Dispatching_Base module shutting down.\n"));
}

// Just forward the request.  This is basically a hook for the RTU
// stuff.
//
// <request> is pushed to the up_ module.  Any exception caught
// around that call is printed and otherwise ignored.
// <command_action> is always set to ACE_RT_Task_Command::RELEASE and
// the function always returns 0.
int
ACE_ES_Dispatching_Base::dispatch_event (ACE_ES_Dispatch_Request *request,
                                         u_long &command_action)
{
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // Forward the request.
      up_->push (request ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // No exceptions should be raised (push is a oneway) but we try
      // to print something useful anyway.
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "ACE_ES_Dispatching_Base::dispatch_event");
    }
  ACE_ENDTRY;

  // Tell our caller to release the request.
  command_action = ACE_RT_Task_Command::RELEASE;

  // Return zero so our calling thread does not exit.
  return 0;
}

// ************************************************************

// Construct the priority dispatching module for <channel>.
//
// The notification strategy is bound to this module and to the
// channel's timer module.  All per-priority slots start out empty:
// no queue is allocated here, that is left to initialize_queues ().
// The scheduler reference is taken from the channel.
ACE_ES_Priority_Dispatching::ACE_ES_Priority_Dispatching (ACE_EventChannel *channel)
  : ACE_ES_Dispatching_Base (channel),
    notification_strategy_ (this, channel->timer_module ()),
    highest_priority_ (0),
    shutdown_ (0),
    threads_per_queue_ (0)
{
  // If we're single threaded, then we need to use the notification
  // strategy.  threads_per_queue_ was initialized to 0 just above, so
  // the open is always attempted here.
  if ((threads_per_queue_ == 0) &&
      (notification_strategy_.open () == -1))
    ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ES_Priority_Dispatching"));

  // Initialize the queues and their usage counters.  The counters
  // are zeroed together with the queue pointers so that no slot is
  // left holding an indeterminate value.
  for (int x = 0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
    {
      this->queues_[x] = 0;
      this->queue_count_[x] = 0;
    }

  this->scheduler_ =
    this->channel_->scheduler ();
}

// Empty destructor body: no explicit cleanup is performed here.
ACE_ES_Priority_Dispatching::~ACE_ES_Priority_Dispatching (void)
{
}


// Create and open a dispatch queue for every priority slot that does
// not have one yet.
//
// For each empty slot the matching entry of ACE_Scheduler_Rates is
// converted to a period, a new ACE_ES_Dispatch_Queue is allocated,
// given this module's thread manager and opened.  If opening fails,
// an error is logged and the function returns at once, leaving the
// remaining slots and highest_priority_ untouched.  After a complete
// pass highest_priority_ is set to the last slot index.
void
ACE_ES_Priority_Dispatching::initialize_queues (void)
{
  for (int slot = 0; slot < ACE_Scheduler_MAX_PRIORITIES; ++slot)
    {
      // A non-null entry means this slot already has a queue.
      if (this->queues_[slot] != 0)
        continue;

      // ACE_Scheduler_Rates really holds periods, not rates.  Turn
      // this slot's entry into an ACE_Time_Value first so that it is
      // easy to work with.
      ACE_Time_Value period_tv;
      ORBSVCS_Time::TimeT_to_Time_Value (period_tv,
                                         ACE_Scheduler_Rates[slot]);

      // Fold seconds and microseconds into a single count:
      // seconds * 10^7 + microseconds * 10.
      RtecScheduler::Period_t period =
        period_tv.sec () * 10000000 + period_tv.usec () * 10;

      ACE_NEW (this->queues_[slot],
               ACE_ES_Dispatch_Queue (this,
                                      &this->notification_strategy_,
                                      this->scheduler_.in ()));
      this->queues_[slot]->thr_mgr (&this->thr_mgr_);

      const int open_failed =
        (this->queues_[slot]->open_queue (period,
                                          this->threads_per_queue_) == -1);
      if (open_failed)
        {
          ACE_ERROR ((LM_ERROR, "%p.\n",
                      "ACE_ES_Priority_Dispatching::initialize_queues"));
          return;
        }

      this->queue_count_[slot] = 1;
    }

  this->highest_priority_ = ACE_Scheduler_MAX_PRIORITIES - 1;
}

// Notification that <consumer> has connected.
//
// The only active code forwards the notification to the down_ module.
// Everything inside the "#if 0" region below is excluded from
// compilation; it is the former dynamic, per-priority queue
// allocation.
void
ACE_ES_Priority_Dispatching::connected (ACE_Push_Consumer_Proxy *consumer
                                        ACE_ENV_ARG_DECL)
{
  down_->connected (consumer ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // This code does dynamic allocation of channel dispatch threads.
  // It requires that consumer's priorities are known at connection
  // time and that threads can request priorities from the scheduler
  // at run-time.  These are both antithetical to static scheduling.
  // The constructor now allocates a thread per rate group.
#if 0
  // We have to tell the lower portions of the channel about the
  // consumer first.  This is so that any changes to the consumer's
  // qos will take effect when we get the dispatch priority.
  down_->connected (consumer ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  RtecScheduler::OS_Priority priority =
    ACE_Scheduler::instance ().preemption_priority (consumer->qos ().rt_info_);

  {
    ACE_ES_GUARD ace_mon (lock_);

    // If a queue has not been created for the consumer's priority,
    // create one.
    if (queues_[priority] == 0)
      {
        // Allocate a new dispatch queue.
        queues_[priority] = new ACE_ES_Dispatch_Queue (this, &notification_strategy_);
        if (queues_[priority] == 0)
          TAO_THROW (CORBA::NO_MEMORY (0, ,
                                          "ACE_ES_Priority_Dispatching::connected"));

        // Initialize the dispatch queue corresponding to the
        // consumer's priority.  With a full implementation of the
        // run-time scheduler, the dispatch queue can find its
        // scheduling qos online.  However, we pass the rate in case
        // one is not found.  The rate can be used to obtain the
        // proper priority.  If threads_per_queue_ == 0, then these
        // queues will be passive.  Otherwise, they will be active.
        // This switches us between MT_ORB and ST_ORB.  If we're
        // single-threaded, this registers us with the ReactorEx using
        // our notification_strategy_.  If we're multi-threaded, this
        // spawns the threads.
        if (queues_[priority]->open_queue (priority,
                                           threads_per_queue_) == -1)
          TAO_THROW (DISPATCH_ERROR (0, ,
                                     "ACE_ES_Priority_Dispatching::connected:"
                                     "queue open failed.\n"));

        // When this goes down to 0, we will shutdown the queue.
        queue_count_[priority] = 1;

        // Keep track of this to optimize handle_signal.
        if (priority > highest_priority_)
          highest_priority_ = priority;

        ACE_DEBUG ((LM_DEBUG,
                    "EC (%t) Created queue priority = %d.\n", priority));
      }
    else
      queue_count_[priority]++;
  }
#endif
}

// Notification that <consumer> has disconnected.
//
// The active code does nothing with <consumer>.  The "#if 0" region
// below is excluded from compilation; it is the former logic that
// decremented the per-priority usage count and shut the queue's task
// down when the count reached zero.
void
ACE_ES_Priority_Dispatching::disconnected (ACE_Push_Consumer_Proxy *consumer)
{
  // We'll not dynamically close down queues.
  ACE_UNUSED_ARG (consumer);

#if 0
  RtecScheduler::OS_Priority priority =
    ACE_Scheduler::instance ().preemption_priority (consumer->qos ().rt_info_);

  {
    ACE_ES_GUARD ace_mon (lock_);

    // If there are no more users of this queue, then we *could* shut
    // it down.  However, we will not.
    if (--queue_count_[priority] <= 0)
      {
        ACE_DEBUG ((LM_DEBUG, "EC (%t) unused dispatch queue priority = %d, "
                    "is_empty = %d.\n",
                    priority, queues_[priority]->msg_queue ()->is_empty ()));

        queues_[priority]->shutdown_task ();
      }
  }
#endif
}

// @@ This method could have a bypass optimization.
// <request> has been dynamically allocated by the filtering module.
void
ACE_ES_Priority_Dispatching::push (ACE_ES_Dispatch_Request *request
                                   ACE_ENV_ARG_DECL)
{
  ACE_TIMEPROBE (TAO_DISPATCHING_MODULES_PUSH_SOURCE_TYPE_CORRELATION_MODULE);

  RtecScheduler::OS_Priority thread_priority;
  RtecScheduler::Preemption_Subpriority_t subpriority;
  RtecScheduler::Preemption_Priority_t preemption_priority;

  if (request->rt_info () != 0)
    {
      ACE_TIMEPROBE (TAO_DISPATCHING_MODULES_PRIORITY_DISPATCHING_PUSH_PRIORITY_REQUESTED);
#if 1
      this->scheduler_->priority
        (request->rt_info (),
         thread_priority,
         subpriority,
         preemption_priority
          ACE_ENV_ARG_PARAMETER);
#else
      ACE_Scheduler_Factory::server ()->priority
        (request->rt_info (),
         thread_priority,
         subpriority,
         preemption_priority
          ACE_ENV_ARG_PARAMETER);
#endif
      ACE_TIMEPROBE (TAO_DISPATCHING_MODULES_PRIORITY_DISPATCHING_PUSH_PRIORITY_OBTAINED);
      ACE_CHECK;
    }
  else
    {
      thread_priority =
        ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
                                        ACE_SCOPE_PROCESS);
      subpriority = ACE_Scheduler_MIN_SUB_PRIORITY;
      preemption_priority = ACE_Scheduler_MIN_PREEMPTION_PRIORITY;
    }

  // If it's a request to forward an event, it needs a reference to us
  // to call dispatch_event.
  request->set (this, preemption_priority, subpriority);

  // Make sure that a queue exists for this priority.
  if (queues_[preemption_priority] == 0)
    {
      ACE_ERROR ((LM_ERROR, "EC (%t): Push to closed queue %d,"
                  " dropping event.\n", preemption_priority));
      return;
#if 0
      ACE_THROW (SYNC_ERROR (0, , "ACE_ES_Priority_Dispatching::push"));
#endif /* 0 */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -