mif_scheduler.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 725 行 · 第 1/2 页

CPP
725
字号
//MIF_Scheduler.cpp,v 1.4 2003/09/22 15:56:34 yamuna Exp

#include "MIF_Scheduler.h"
#include "ace/Atomic_Op.h"
#include "tao/RTScheduling/Request_Interceptor.h"
#include "test.h"

ACE_Atomic_Op<TAO_SYNCH_MUTEX, long> server_guid_counter;

DT::DT (TAO_SYNCH_MUTEX &lock,
	int guid)
  : dt_cond_ (lock),
    guid_ (guid),
    eligible_ (0)
{
}

void
DT::suspend (void)
{
  eligible_ = 0;
  while (!eligible_)
    this->dt_cond_.wait ();
}

void
DT::resume (void)
{
  eligible_ = 1;
  this->dt_cond_.signal ();
}

CORBA::Short
Segment_Sched_Param_Policy::importance (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  return this->importance_;
}

void
Segment_Sched_Param_Policy::importance (CORBA::Short importance
                                        ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  this->importance_ = importance;
}

MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb)
  : wait_cond_ (lock_),
    wait_ (0)
{
  ACE_TRY_NEW_ENV
    {
      CORBA::Object_var object =
        orb->resolve_initial_references ("RTScheduler_Current"
                                         ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      
      this->current_ =
        RTScheduling::Current::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      
      wait_ = 0;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Caught exception:");
    }
  ACE_ENDTRY;
}

MIF_Scheduler::~MIF_Scheduler (void)
{
}

void
MIF_Scheduler::incr_thr_count (void)
{
  lock_.acquire ();
  wait_++;
  lock_.release ();
}

void
MIF_Scheduler::wait (void)
{
  lock_.acquire ();
  while (wait_ > 0)
    wait_cond_.wait ();

  ACE_DEBUG ((LM_DEBUG,
	      "After Wait %d\n",
	      wait_));

  lock_.release ();
}

void
MIF_Scheduler::resume_main (void)
{
  wait_--;
  wait_cond_.signal ();
}

MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr
MIF_Scheduler::create_segment_scheduling_parameter (CORBA::Short importance
                                                    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr segment_policy;
  ACE_NEW_THROW_EX (segment_policy,
                    Segment_Sched_Param_Policy,
                    CORBA::NO_MEMORY (
				      CORBA::SystemException::_tao_minor_code (
				       TAO_DEFAULT_MINOR_CODE,
				       ENOMEM),
				      CORBA::COMPLETED_NO));

  segment_policy->importance (importance);

  return segment_policy;
}


void
MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
					     const char *,
					     CORBA::Policy_ptr sched_policy,
					     CORBA::Policy_ptr
					     ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
  int count;
  ACE_OS::memcpy (&count,
		  this->current_->id ()->get_buffer (),
		  this->current_->id ()->length ());


  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);

  CORBA::Short desired_priority = sched_param->importance (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
		"%t MIF_Scheduler::begin_scheduling_segment - Importance %d\n",
		desired_priority));


  if (desired_priority != 100)
    {
      //NOT Main Thread
      DT* new_dt;
      ACE_NEW (new_dt,
	       DT (this->lock_,
		   count));

      new_dt->msg_priority (desired_priority);
      lock_.acquire ();
      ready_que_.enqueue_prio (new_dt);
      resume_main ();
      new_dt->suspend ();
      lock_.release ();
    }
}

void
MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
							   const char *name,
							   CORBA::Policy_ptr sched_param,
							   CORBA::Policy_ptr implicit_sched_param
							   ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
  this->begin_new_scheduling_segment (guid,
				      name,
				      sched_param,
				      implicit_sched_param
				      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
					  const char* /*name*/,
					  CORBA::Policy_ptr sched_policy,
					  CORBA::Policy_ptr /*implicit_sched_param*/
					  ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
  int count;
  ACE_OS::memcpy (&count,
		  this->current_->id ()->get_buffer (),
		  this->current_->id ()->length ());

  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);

  CORBA::Short desired_priority = sched_param->importance (ACE_ENV_SINGLE_ARG_PARAMETER);

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
		"%t MIF_Scheduler::update_scheduling_segment - Importance %d\n",
		desired_priority));

  DT* new_dt;
  ACE_NEW (new_dt,
	   DT (this->lock_,
	       count));

  new_dt->msg_priority (desired_priority);

  if (ready_que_.message_count () > 0)
    {
      DT* run_dt;
      ACE_Message_Block* msg;
      ready_que_.dequeue_head (msg);
      run_dt = ACE_dynamic_cast (DT*, msg);
      if ((desired_priority == 100) || run_dt->msg_priority () >= (unsigned int)desired_priority)
	{
	  ready_que_.enqueue_prio (new_dt);
	  lock_.acquire ();
	  run_dt->resume ();
	  new_dt->suspend ();
	  lock_.release ();
	}
      else
	{
	  ready_que_.enqueue_prio (run_dt);
	  delete new_dt;
	}
    }
  else delete new_dt;
}

void
MIF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
				       const char *
				       ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  int count;
  ACE_OS::memcpy (&count,
		  guid.get_buffer (),
		  guid.length ());

  ACE_DEBUG ((LM_DEBUG,
	      "MIF_Scheduler::end_scheduling_segment %d\n",
		  count));

  if (ready_que_.message_count () > 0)
    {
      DT* run_dt;
      ACE_Message_Block* msg;
      ready_que_.dequeue_head (msg);
      run_dt = ACE_dynamic_cast (DT*, msg);
      lock_.acquire ();
      run_dt->resume ();
      lock_.release ();
    }
}

void
MIF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
							 const char *,
							 CORBA::Policy_ptr
							 ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
}

void
MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr request_info
			     ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
  ACE_CHECK;

  IOP::ServiceContext* srv_con = new IOP::ServiceContext;
  srv_con->context_id = Client_Interceptor::SchedulingInfo;

  int guid_length = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER)->length ();
  ACE_CHECK;

  RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);

  CORBA::Octet *seq_buf = CORBA::OctetSeq::allocbuf (sizeof (guid_length));
  ACE_OS::memcpy (seq_buf,
		  guid->get_buffer (),
		  guid_length);

  int cxt_data_length = sizeof (int) + guid_length;
  srv_con->context_data.length (cxt_data_length);

  int i = 0;
  for (;i < guid_length;i++)
    {
      srv_con->context_data [i] = seq_buf [i];
    }

  int importance = sched_param_var->importance ();
  CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
  ACE_OS::memcpy (int_buf,
		  &importance,
		  sizeof (int));

  int j = 0;
  for (;i < cxt_data_length;i++)
    {
      srv_con->context_data [i] = int_buf [j++];
    }

  request_info->add_request_service_context (*srv_con,
					     0
					     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  lock_.acquire ();
  if (ready_que_.message_count () > 0)
    {
      int priority;
      ACE_hthread_t current;
      ACE_Thread::self (current);
      if (ACE_Thread::getprio (current, priority) == -1)
	return;
      
      ACE_DEBUG ((LM_DEBUG,
		  "Initial thread priority is %d %d\n",
		  priority,
		  ACE_DEFAULT_THREAD_PRIORITY));

      current_->the_priority (priority + 1
			      ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      ACE_Thread::self (current);
      if (ACE_Thread::getprio (current, priority) == -1)
	return;

      ACE_DEBUG ((LM_DEBUG,
		  "Bumped thread priority is %d\n",
		  priority));
      
      
      DT* run_dt;
      ACE_Message_Block* msg;
      ready_que_.dequeue_head (msg);
      run_dt = ACE_dynamic_cast (DT*, msg);
      run_dt->resume ();
    }
  lock_.release ();

}

void

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?