mif_scheduler.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 725 行 · 第 1/2 页

CPP
725
字号
MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr request_info,
				RTScheduling::Current::IdType_out guid_out,
				CORBA::String_out,
				CORBA::Policy_out sched_param_out,
				CORBA::Policy_out /*implicit_sched_param*/
				ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
		"MIF_Scheduler::receive_request\n"));

  IOP::ServiceContext* serv_cxt =
    request_info->get_request_service_context (Server_Interceptor::SchedulingInfo);

  if (serv_cxt != 0)
    { 
      ACE_DEBUG ((LM_DEBUG,
		  "Got scheduling info\n"));
      
      RTScheduling::Current::IdType* guid;
      ACE_NEW (guid,
	       RTScheduling::Current::IdType);

      guid->length (sizeof(long));
      ACE_OS::memcpy (guid->get_buffer (),
		      serv_cxt->context_data.get_buffer (),
		      sizeof (long));

      int gu_id;
      ACE_OS::memcpy (&gu_id,
		      guid->get_buffer (),
		      guid->length ());

      ACE_DEBUG ((LM_DEBUG,
		  "MIF_Scheduler::receive_request %d\n",
		  gu_id));


      CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (long));
      int i = sizeof (long);
      for (unsigned int j = 0;j < sizeof (int);j++)
	{
	  int_buf [j] = serv_cxt->context_data [i++];
	}

      int importance;
      ACE_OS::memcpy (&importance,
		      int_buf,
		      sizeof (int));

      guid_out.ptr () = guid;
      sched_param_out.ptr () = DT_TEST::instance ()->scheduler ()->create_segment_scheduling_parameter (importance);

      if (TAO_debug_level > 0)
	ACE_DEBUG ((LM_DEBUG,
		    "%t The Guid is %d Importance is %d\n",
		    gu_id,
		    importance));

      DT* new_dt;
      ACE_NEW (new_dt,
	       DT (this->lock_,
		   gu_id));

      new_dt->msg_priority (importance);
      lock_.acquire ();
      ready_que_.enqueue_prio (new_dt);
      new_dt->suspend ();
      lock_.release ();
    }
}

void
MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr
			   ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{

  RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  int count;
  ACE_OS::memcpy (&count,
		  guid->get_buffer (),
		  guid->length ());

  ACE_DEBUG ((LM_DEBUG,
	      "MIF_Scheduler::send_reply %d\n",
              count));

  if (ready_que_.message_count () > 0)
    {
      DT* run_dt;
      ACE_Message_Block* msg;
      ready_que_.dequeue_head (msg);
      run_dt = ACE_dynamic_cast (DT*, msg);
      lock_.acquire ();
      run_dt->resume ();
      lock_.release ();
    }
}

void
MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr
			       ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
  if (ready_que_.message_count () > 0)
    {
      DT* run_dt;
      ACE_Message_Block* msg;
      ready_que_.dequeue_head (msg);
      run_dt = ACE_dynamic_cast (DT*, msg);
      lock_.acquire ();
      run_dt->resume ();
      lock_.release ();
    }
}

void
MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr
			   ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
  if (TAO_debug_level > 0)
    {
      RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      int count;
      ACE_OS::memcpy (&count,
		      guid->get_buffer (),
		      guid->length ());


      ACE_DEBUG ((LM_DEBUG,
		  "MIF_Scheduler::send_other %d\n",
		  count));
    }

  if (ready_que_.message_count () > 0)
    {
      DT* run_dt;
      ACE_Message_Block* msg;
      ready_que_.dequeue_head (msg);
      run_dt = ACE_dynamic_cast (DT*, msg);
      lock_.acquire ();
      run_dt->resume ();
      lock_.release ();
    }
}

void
MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
			  ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
}

void
MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr
			      ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{

  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
  ACE_CHECK;

  int importance = sched_param_var->importance ();
  CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
  ACE_OS::memcpy (int_buf,
		  &importance,
		  sizeof (int));

  RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);

  int gu_id;
  ACE_OS::memcpy (&gu_id,
		  guid->get_buffer (),
		  guid->length ());

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
		"MIF_Scheduler::receive_reply Guid = %d Imp = %d\n",
		gu_id,
		importance));


  DT* new_dt;
  ACE_NEW (new_dt,
	   DT (this->lock_,
	       gu_id));

  new_dt->msg_priority (importance);

  lock_.acquire ();
  ready_que_.enqueue_prio (new_dt);
  int priority;
  ACE_hthread_t current;
  ACE_Thread::self (current);
  if (ACE_Thread::getprio (current, priority) == -1)
    return;
  
  current_->the_priority (priority - 1
			  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
  new_dt->suspend ();
  lock_.release ();
}

void
MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr
				  ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
  ACE_CHECK;

  int importance = sched_param_var->importance ();
  CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
  ACE_OS::memcpy (int_buf,
		  &importance,
		  sizeof (int));

  RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);

  int gu_id;
  ACE_OS::memcpy (&gu_id,
		  guid->get_buffer (),
		  guid->length ());

  DT* new_dt;
  ACE_NEW (new_dt,
	   DT (this->lock_,
	       gu_id));

  new_dt->msg_priority (importance);

  lock_.acquire ();
  ready_que_.enqueue_prio (new_dt);
  
  int priority;
  ACE_hthread_t current;
  ACE_Thread::self (current);
  if (ACE_Thread::getprio (current, priority) == -1)
    return;
  
  current_->the_priority (priority - 1
			  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
  new_dt->suspend ();
  lock_.release ();
}

void
MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr
			      ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
  ACE_CHECK;

  int importance = sched_param_var->importance ();
  CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
  ACE_OS::memcpy (int_buf,
		  &importance,
		  sizeof (int));

  RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);

  int gu_id;
  ACE_OS::memcpy (&gu_id,
		  guid->get_buffer (),
		  guid->length ());

  DT* new_dt;
  ACE_NEW (new_dt,
	   DT (this->lock_,
	       gu_id));

  new_dt->msg_priority (importance);

  lock_.acquire ();
  ready_que_.enqueue_prio (new_dt);

  int priority;
  ACE_hthread_t current;
  ACE_Thread::self (current);
  if (ACE_Thread::getprio (current, priority) == -1)
    return;
  
  current_->the_priority (priority - 1
			  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
  new_dt->suspend ();
  lock_.release ();
}

void
MIF_Scheduler::cancel (const RTScheduling::Current::IdType &
		       ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
}

CORBA::PolicyList*
MIF_Scheduler::scheduling_policies (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  return 0;
}

void
MIF_Scheduler::scheduling_policies (const CORBA::PolicyList &
				    ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
}

CORBA::PolicyList*
MIF_Scheduler::poa_policies (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
	return 0;
}

char *
MIF_Scheduler::scheduling_discipline_name (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
	return 0;
}

RTScheduling::ResourceManager_ptr
MIF_Scheduler::create_resource_manager (const char *,
					CORBA::Policy_ptr
					ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
	return 0;
}

void
MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
					 const char *,
					 CORBA::Policy_ptr
					 ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?