muf_scheduler.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 759 行 · 第 1/2 页

CPP
759
字号
      CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
                                                 sc->context_data.length (),
                                                 sc->context_data.get_buffer (),
                                                 0);
      CORBA::Any sc_qos_as_any;
      sc_qos_as_any = *codec_->decode (oc_seq);
      //Don't store in a _var, since >>= returns a pointer to an
      //internal buffer and we are not supposed to free it.
      sc_qos_as_any >>= sc_qos_ptr;

      deadline  = sc_qos_ptr->deadline;
      criticality = sc_qos_ptr->criticality;
      exec_time = sc_qos_ptr->estimated_initial_execution_time;

      guid.length (sc_qos_ptr->guid.length ());
      guid_copy (guid, sc_qos_ptr->guid);

      ACE_NEW (guid_out.ptr (),
	       RTScheduling::Current::IdType);
      guid_out.ptr ()->length (guid.length ());
      *(guid_out.ptr ()) = guid;

#ifdef KOKYU_DSRT_LOGGING
      int int_guid;
      ACE_OS::memcpy (&int_guid,
                      guid.get_buffer (),
                      guid.length ());
      ACE_DEBUG ((LM_DEBUG,
                  "(%t|%T): Criticality = %d, guid = %d "
                  "in recvd service context\n",
                  criticality,
                  int_guid));
#endif
      MUF_Scheduling::SchedulingParameter sched_param;
      sched_param.criticality = criticality;
      sched_param.deadline = deadline;
      sched_param_out = this->create_scheduling_parameter (sched_param);
    }

  MUF_Scheduler_Traits::QoSDescriptor_t qos;
  qos.criticality_ = criticality;
  qos.deadline_ = deadline;
  qos.exec_time_ = exec_time;

  this->kokyu_dispatcher_->schedule (guid, qos);

#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG, "(%t|%T): receive_request interceptor done\n"));
#endif

}

// Client-side send_poll interception point.
// This implementation has an empty body; the ClientRequestInfo argument is
// unnamed and therefore not used.
void
MUF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
			  ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException,
		   PortableInterceptor::ForwardRequest))
{
}

// Server-side interception point run just before a reply is sent.
//
// When the current distributable thread has a scheduling-parameter policy,
// its deadline, estimated execution time and criticality are packed together
// with the thread's GUID into a Kokyu::Svc_Ctxt_DSRT_QoS, encoded through
// codec_ and added to the reply as the Server_Interceptor::SchedulingInfo
// service context.  In every case the dispatcher is then asked to update the
// schedule entry of the current GUID with Kokyu::BLOCK.
//
// ri - request info of the reply being sent; supplies the operation name and
//      receives the reply service context.
// Declared to raise only CORBA::SystemException.
void
MUF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri
                           ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{

  Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;

  CORBA::String_var operation = ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

#ifdef KOKYU_DSRT_LOGGING
  // Log the name fetched above; asking ri for it a second time would hand
  // back another string that nothing releases.
  ACE_DEBUG ((LM_DEBUG,
              "(%t|%T): send_reply from \"%s\"\n",
              operation.in ()));
#endif

  // Make the context to send the context to the target
  IOP::ServiceContext sc;
  sc.context_id = Server_Interceptor::SchedulingInfo;


  CORBA::Long criticality;
  TimeBase::TimeT deadline,exec_time;

  // Keep the returned policy reference in a _var so it is released when
  // this function returns.
  CORBA::Policy_var sched_policy =
    this->current_->scheduling_parameter(ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Fetch the GUID of the current distributable thread once.  The _var owns
  // the returned sequence, so it is freed instead of leaking on every call.
  RTScheduling::Current::IdType_var current_id = this->current_->id ();

  if (CORBA::is_nil (sched_policy.in ()))
  {
#ifdef KOKYU_DSRT_LOGGING
    ACE_DEBUG ((LM_DEBUG, 
                "(%t|%T): sched_policy nil.\n "));
#endif
    //24 hrs from now - infinity
    // NOTE: these defaults are not read again on this path; no service
    // context is added when there is no scheduling policy.
    ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
    // TimeBase::TimeT counts 100s of nanoseconds: one second is 10,000,000
    // units and one microsecond is 10 units.  Widen the seconds before
    // scaling so the multiplication is done in TimeT.
    TimeBase::TimeT deadline_secs = deadline_tv.sec ();
    deadline = deadline_secs * 10000000 + deadline_tv.usec () * 10;
    exec_time = 0;
    criticality = 0;
  }
  else
    {
#ifdef KOKYU_DSRT_LOGGING
      // Adjacent literals form one format string (a comma here would make
      // the second literal an ignored argument).
      ACE_DEBUG ((LM_DEBUG, 
                  "(%t|%T):sched_policy not nil. "
                  "sched params set\n"));
#endif
      MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
        MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
      // NOTE(review): the result of _narrow is not checked for nil before
      // use - confirm only MUF scheduling policies can reach this point.
      MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();


      sc_qos.guid.length (current_id->length ());
      guid_copy (sc_qos.guid, current_id.in ());

      deadline = sched_param->deadline;
      exec_time = sched_param->estimated_initial_execution_time;
      criticality = sched_param->criticality;
      sc_qos.deadline = deadline;
      sc_qos.estimated_initial_execution_time = exec_time;
      sc_qos.criticality = criticality;

      CORBA::Any sc_qos_as_any;
      sc_qos_as_any <<= sc_qos;

      // encode () returns a heap-allocated octet sequence; own it with a
      // _var and copy its contents into the service context.
      CORBA::OctetSeq_var encoded = codec_->encode (sc_qos_as_any);
      sc.context_data = encoded.in ();

      // Add this context to the service context list.
      ri->add_reply_service_context (sc, 1 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

#ifdef KOKYU_DSRT_LOGGING
      ACE_DEBUG ((LM_DEBUG, "(%t|%T):reply sc added\n"));
#endif
    }

  kokyu_dispatcher_->update_schedule (current_id.in (),
                                      Kokyu::BLOCK);

#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG, "(%t|%T): send_reply interceptor done\n"));
#endif
}

// Server-side send_exception interception point.
// An exceptional reply is handled exactly like a normal one: the work is
// forwarded to send_reply () with the same request info.
void
MUF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri
                               ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  this->send_reply (ri ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Server-side send_other interception point.
// Forwards to send_reply () with the same request info, so these replies
// get the same treatment as normal ones.
void
MUF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri
                           ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  this->send_reply (ri ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Client-side interception point run when a reply has been received.
//
// Reads the Client_Interceptor::SchedulingInfo reply service context, decodes
// the Kokyu::Svc_Ctxt_DSRT_QoS it carries and passes the resulting GUID and
// QoS (criticality, deadline, execution time) to the Kokyu dispatcher through
// schedule ().  When no service context is available the QoS falls back to
// criticality 0, execution time 0 and a deadline 24 hours from now, and the
// GUID is left as default-constructed.
//
// ri - request info of the reply; supplies the operation name, the target
//      and the reply service context.
// Declared to raise only CORBA::SystemException.
void
MUF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri
                              ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  RTScheduling::Current::IdType guid;

  CORBA::String_var operation = ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Object_var target = ri->target (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_CString opname = operation.in ();
#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG,
              "(%t|%T):receive_reply from "
              "\"%s\"\n",
              opname.c_str ()));
#endif

  // Check that the reply service context was received as
  // expected.
  IOP::ServiceContext_var sc =
    ri->get_reply_service_context (Client_Interceptor::SchedulingInfo 
                                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Long criticality;
  TimeBase::TimeT deadline,exec_time;

  if (sc.ptr () == 0)
    {
      ACE_DEBUG ((LM_DEBUG, "service context was not filled\n"));
      //24 hrs from now - infinity
      ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
      // TimeBase::TimeT counts 100s of nanoseconds: one second is 10,000,000
      // units and one microsecond is 10 units.  Widen the seconds before
      // scaling so the multiplication is done in TimeT.
      TimeBase::TimeT deadline_secs = deadline_tv.sec ();
      deadline = deadline_secs * 10000000 + deadline_tv.usec () * 10;
      exec_time = 0;
      criticality = 0;
    }
  else
    {
      // Wrap the context buffer without taking ownership (release flag 0).
      CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
                                                sc->context_data.length (),
                                                sc->context_data.get_buffer (),
                                                0);

      // decode () returns a heap-allocated Any; the _var frees it at the end
      // of this scope, after the extracted values have been copied out.
      CORBA::Any_var sc_qos_as_any = codec_->decode (oc_seq);

      //Don't store in a _var, since >>= returns a pointer to an internal buffer
      //and we are not supposed to free it.
      // NOTE(review): the result of >>= is not checked; if extraction fails
      // sc_qos_ptr stays null and is dereferenced below.
      Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
      sc_qos_as_any.in () >>= sc_qos_ptr;

      deadline  = sc_qos_ptr->deadline;
      criticality = sc_qos_ptr->criticality;
      exec_time = sc_qos_ptr->estimated_initial_execution_time;

      guid.length (sc_qos_ptr->guid.length ());
      guid_copy (guid, sc_qos_ptr->guid);

      ACE_DEBUG ((LM_DEBUG,
                  "(%t|%T): Criticality = %d in recvd service context\n",
                  criticality));
    }

  MUF_Scheduler_Traits::QoSDescriptor_t qos;
  qos.criticality_ = criticality;
  qos.deadline_ = deadline;
  qos.exec_time_ = exec_time;
  this->kokyu_dispatcher_->schedule (guid, qos);
}

// Client-side receive_exception interception point.
// Forwards to receive_reply () with the same request info, so an exceptional
// reply is processed the same way as a normal one.
void
MUF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri
                                  ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  this->receive_reply (ri ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Client-side receive_other interception point.
// Forwards to receive_reply () with the same request info.
void
MUF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri
                              ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  this->receive_reply (ri ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Not implemented: the id argument is ignored and CORBA::NO_IMPLEMENT is
// raised through ACE_THROW on every call.
void
MUF_Scheduler::cancel (const RTScheduling::Current::IdType &
                       ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW (CORBA::NO_IMPLEMENT ());
}

// Getter for the scheduling policies.
// Not implemented: raises CORBA::NO_IMPLEMENT through ACE_THROW_RETURN, with
// 0 supplied as the return value.
CORBA::PolicyList*
MUF_Scheduler::scheduling_policies (ACE_ENV_SINGLE_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (), 0);
}

// Setter for the scheduling policies.
// Not implemented: the policy list argument is ignored and
// CORBA::NO_IMPLEMENT is raised through ACE_THROW.
void
MUF_Scheduler::scheduling_policies (const CORBA::PolicyList &
                                    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW (CORBA::NO_IMPLEMENT ());
}

// Not implemented: raises CORBA::NO_IMPLEMENT through ACE_THROW_RETURN, with
// 0 supplied as the return value.
CORBA::PolicyList*
MUF_Scheduler::poa_policies (ACE_ENV_SINGLE_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (), 0);
}

// Not implemented: raises CORBA::NO_IMPLEMENT through ACE_THROW_RETURN, with
// 0 supplied as the return value.
char *
MUF_Scheduler::scheduling_discipline_name (ACE_ENV_SINGLE_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (), 0);
}

// Not implemented: both arguments are ignored and CORBA::NO_IMPLEMENT is
// raised through ACE_THROW_RETURN, with 0 supplied as the return value.
RTScheduling::ResourceManager_ptr
MUF_Scheduler::create_resource_manager (const char *,
                                        CORBA::Policy_ptr
                                        ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (), 0);
}

// Not implemented: all arguments are ignored and CORBA::NO_IMPLEMENT is
// raised through ACE_THROW.
void
MUF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
                                         const char *,
                                         CORBA::Policy_ptr
                                         ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_THROW (CORBA::NO_IMPLEMENT ());
}

// Template instantiations for the Kokyu DSRT dispatcher and the ACE
// containers it uses, all parameterized on MUF_Scheduler_Traits.
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION selects explicit "template class"
// instantiations; ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA selects the
// equivalent "#pragma instantiate" directives.  The two lists name the same
// set of types.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>;
template class Kokyu::DSRT_Dispatcher<MUF_Scheduler_Traits>;
template class Kokyu::DSRT_Dispatcher_Impl<MUF_Scheduler_Traits>;
template class Kokyu::DSRT_Direct_Dispatcher_Impl<MUF_Scheduler_Traits>;
template class Kokyu::DSRT_CV_Dispatcher_Impl<MUF_Scheduler_Traits>;
template class Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>;
template class Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>;
template class Kokyu::Sched_Ready_Queue<MUF_Scheduler_Traits, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>;

template class ACE_Hash_Map_Manager_Ex<MUF_Scheduler_Traits::Guid_t, ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> > *, Kokyu::Sched_Ready_Queue<MUF_Scheduler_Traits, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>::Guid_Hash, ACE_Equal_To<MUF_Scheduler_Traits::Guid_t>, ACE_Null_Mutex>;

template class ACE_RB_Tree<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>;

template class ACE_RB_Tree_Iterator<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>;

template class ACE_Hash_Map_Entry<MUF_Scheduler_Traits::Guid_t, ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> > *>;

template class ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> >;

template class Kokyu::MUF_Comparator<MUF_Scheduler_Traits::QoSDescriptor_t>;

template class ACE_Hash_Map_Iterator_Base_Ex<MUF_Scheduler_Traits::Guid_t, ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> > *, Kokyu::Sched_Ready_Queue<MUF_Scheduler_Traits, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>::Guid_Hash, ACE_Equal_To<MUF_Scheduler_Traits::Guid_t>, ACE_Null_Mutex>;

template class ACE_RB_Tree_Reverse_Iterator<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>;

template class ACE_RB_Tree_Iterator_Base<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>;

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>
#pragma instantiate Kokyu::DSRT_Dispatcher<MUF_Scheduler_Traits>
#pragma instantiate Kokyu::DSRT_Dispatcher_Impl<MUF_Scheduler_Traits>
#pragma instantiate Kokyu::DSRT_Direct_Dispatcher_Impl<MUF_Scheduler_Traits>
#pragma instantiate Kokyu::DSRT_CV_Dispatcher_Impl<MUF_Scheduler_Traits>
#pragma instantiate Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>
#pragma instantiate Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>

#pragma instantiate Kokyu::Sched_Ready_Queue<MUF_Scheduler_Traits, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>


#pragma instantiate ACE_Hash_Map_Manager_Ex<MUF_Scheduler_Traits::Guid_t, ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> > *, Kokyu::Sched_Ready_Queue<MUF_Scheduler_Traits, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>::Guid_Hash, ACE_Equal_To<MUF_Scheduler_Traits::Guid_t>, ACE_Null_Mutex>

#pragma instantiate ACE_RB_Tree<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>

#pragma instantiate ACE_RB_Tree_Iterator<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>

#pragma instantiate ACE_Hash_Map_Entry<MUF_Scheduler_Traits::Guid_t, ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> > *>

#pragma instantiate ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> >

#pragma instantiate Kokyu::MUF_Comparator<MUF_Scheduler_Traits::QoSDescriptor_t>

#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<MUF_Scheduler_Traits::Guid_t, ACE_RB_Tree_Node<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits> > *, Kokyu::Sched_Ready_Queue<MUF_Scheduler_Traits, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>::Guid_Hash, ACE_Equal_To<MUF_Scheduler_Traits::Guid_t>, ACE_Null_Mutex>

#pragma instantiate ACE_RB_Tree_Reverse_Iterator<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>

#pragma instantiate ACE_RB_Tree_Iterator_Base<Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::DSRT_Dispatch_Item_var<MUF_Scheduler_Traits>, Kokyu::Comparator_Adapter_Generator<MUF_Scheduler_Traits>::MoreEligible, ACE_Null_Mutex>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?