fp_scheduler.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 718 行 · 第 1/2 页
CPP
718 行
//FP_Scheduler.cpp,v 1.6 2003/12/08 00:33:41 bala Exp
#include "FP_Scheduler.h"
#include "Kokyu_qosC.h"
#include "utils.h"
#include "ORB_Constants.h"
#include "tao/RTScheduling/Request_Interceptor.h"
FP_Scheduling::SegmentSchedulingParameter
FP_Segment_Sched_Param_Policy::value (
ACE_ENV_SINGLE_ARG_DECL_NOT_USED
)
ACE_THROW_SPEC ((
CORBA::SystemException
))
{
return value_;
}
void
FP_Segment_Sched_Param_Policy::value (
const FP_Scheduling::SegmentSchedulingParameter & value
ACE_ENV_ARG_DECL_NOT_USED
)
ACE_THROW_SPEC ((
CORBA::SystemException
))
{
this->value_ = value;
}
Fixed_Priority_Scheduler::Fixed_Priority_Scheduler (CORBA::ORB_ptr orb,
Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type,
int ace_sched_policy,
int ace_sched_scope)
: orb_ (orb),
disp_impl_type_ (disp_impl_type),
ace_sched_policy_ (ace_sched_policy),
ace_sched_scope_ (ace_sched_scope)
{
ACE_DECLARE_NEW_ENV;
Kokyu::DSRT_ConfigInfo config;
config.impl_type_ = this->disp_impl_type_;
config.sched_policy_ = ace_sched_policy_;
config.sched_scope_ = ace_sched_scope_;
Kokyu::DSRT_Dispatcher_Factory<FP_Scheduler_Traits>::DSRT_Dispatcher_Auto_Ptr
tmp( Kokyu::DSRT_Dispatcher_Factory<FP_Scheduler_Traits>::
create_DSRT_dispatcher (config) );
kokyu_dispatcher_ = tmp;
CORBA::Object_var object =
orb->resolve_initial_references ("RTScheduler_Current"
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->current_ =
RTScheduling::Current::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
IOP::CodecFactory_var codec_factory;
CORBA::Object_var obj = orb->resolve_initial_references ("CodecFactory"
ACE_ENV_ARG_PARAMETER);
if (CORBA::is_nil(obj.in ()))
{
ACE_ERROR ((LM_ERROR, "Nil Codec factory\n"));
}
else
{
codec_factory = IOP::CodecFactory::_narrow (obj.in ());
}
IOP::Encoding encoding;
encoding.format = IOP::ENCODING_CDR_ENCAPS;
encoding.major_version = 1;
encoding.minor_version = 2;
codec_ = codec_factory->create_codec (encoding);
}
Fixed_Priority_Scheduler::~Fixed_Priority_Scheduler (void)
{
// delete kokyu_dispatcher_;
}
void
Fixed_Priority_Scheduler::shutdown (void)
{
kokyu_dispatcher_->shutdown ();
ACE_DEBUG ((LM_DEBUG, "kokyu DSRT dispatcher shutdown\n"));
}
FP_Scheduling::SegmentSchedulingParameterPolicy_ptr
Fixed_Priority_Scheduler::create_segment_scheduling_parameter (
const FP_Scheduling::SegmentSchedulingParameter & value
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
CORBA::SystemException
))
{
FP_Scheduling::SegmentSchedulingParameterPolicy_ptr
segment_sched_param_policy;
ACE_NEW_THROW_EX (segment_sched_param_policy,
FP_Segment_Sched_Param_Policy,
CORBA::NO_MEMORY (
CORBA::SystemException::_tao_minor_code (
TAO_DEFAULT_MINOR_CODE,
ENOMEM),
CORBA::COMPLETED_NO));
segment_sched_param_policy->value (value);
return segment_sched_param_policy;
}
void
Fixed_Priority_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType& guid,
const char *,
CORBA::Policy_ptr sched_policy,
CORBA::Policy_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):FP_Scheduler::begin_new_scheduling_segment enter\n"));
#endif
#ifdef KOKYU_DSRT_LOGGING
int int_guid;
ACE_OS::memcpy (&int_guid,
guid.get_buffer (),
guid.length ());
ACE_DEBUG ((LM_DEBUG, "(%t|%T): guid is %d\n", int_guid));
#endif
FP_Scheduler_Traits::QoSDescriptor_t qos;
FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
FP_Scheduling::SegmentSchedulingParameter sched_param = sched_param_policy->value ();
RTCORBA::Priority desired_priority = sched_param.base_priority;
qos.priority_ = desired_priority;
kokyu_dispatcher_->schedule (guid, qos);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):FP_Scheduler::begin_new_scheduling_segment exit\n"));
#endif
}
void
Fixed_Priority_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
const char *name,
CORBA::Policy_ptr sched_param,
CORBA::Policy_ptr implicit_sched_param
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
this->begin_new_scheduling_segment (guid,
name,
sched_param,
implicit_sched_param
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
void
Fixed_Priority_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType &guid,
const char* name,
CORBA::Policy_ptr sched_policy,
CORBA::Policy_ptr implicit_sched_param
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
ACE_UNUSED_ARG ((name));
ACE_UNUSED_ARG ((implicit_sched_param));
#ifdef KOKYU_DSRT_LOGGING
int int_guid ;
ACE_OS::memcpy (&int_guid,
guid.get_buffer (),
guid.length ());
ACE_DEBUG ((LM_DEBUG, "(%t|%T): update_sched_seg::guid is %d\n", int_guid));
#endif
FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
FP_Scheduling::SegmentSchedulingParameter sched_param =
sched_param_policy->value ();
RTCORBA::Priority desired_priority = sched_param.base_priority;
FP_Scheduler_Traits::QoSDescriptor_t qos;
qos.priority_ = desired_priority;
kokyu_dispatcher_->update_schedule (guid, qos);
}
void
Fixed_Priority_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
const char *
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
#ifdef KOKYU_DSRT_LOGGING
int int_guid;
ACE_OS::memcpy (&int_guid,
guid.get_buffer (),
guid.length ());
ACE_DEBUG ((LM_DEBUG, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid));
#endif
kokyu_dispatcher_->cancel_schedule (guid);
}
void
Fixed_Priority_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
const char *,
CORBA::Policy_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
}
void
Fixed_Priority_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
CORBA::String_var operation = ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T): send_request "
"from \"%s\"\n",
operation.in ()));
#endif
// Make the context to send the context to the target
IOP::ServiceContext sc;
sc.context_id = Client_Interceptor::SchedulingInfo;
CORBA::Policy_ptr sched_policy =
this->current_->scheduling_parameter(ACE_ENV_SINGLE_ARG_PARAMETER);
/*
int guid;
ACE_OS::memcpy (&guid,
this->current_->id ()->get_buffer (),
this->current_->id ()->length ());
*/
RTCORBA::Priority desired_priority;
if (CORBA::is_nil (sched_policy))
{
desired_priority = 0;
}
else
{
FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
FP_Scheduling::SegmentSchedulingParameter sched_param =
sched_param_policy->value ();
desired_priority = sched_param.base_priority;
#ifdef KOKYU_DSRT_LOGGING
int int_guid;
ACE_OS::memcpy (&int_guid,
this->current_->id ()->get_buffer (),
this->current_->id ()->length ());
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t): send_request desired_priority from current = %d, guid = %d\n"),
desired_priority, int_guid));
#endif
//Fill the guid in the SC Qos struct
sc_qos.guid.length (this->current_->id ()->length ());
guid_copy (sc_qos.guid, *(this->current_->id ()));
sc_qos.desired_priority = desired_priority;
CORBA::Any sc_qos_as_any;
sc_qos_as_any <<= sc_qos;
sc.context_data =
ACE_reinterpret_cast (CORBA::OctetSeq &,
*codec_->encode (sc_qos_as_any));
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
#endif
// Add this context to the service context list.
ri->add_request_service_context (sc, 0 ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
ACE_LIB_TEXT ("(%t|%T): send_request : ")
ACE_LIB_TEXT ("about to call scheduler to inform block\n")
));
#endif
kokyu_dispatcher_->update_schedule (*(this->current_->id ()),
Kokyu::BLOCK);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
ACE_LIB_TEXT ("(%t|%T): send_request interceptor done\n")));
#endif
}
void
Fixed_Priority_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri,
RTScheduling::Current::IdType_out guid_out,
CORBA::String_out /*name*/,
CORBA::Policy_out sched_param_out,
CORBA::Policy_out /*implicit_sched_param_out*/
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t|%T):entered FP_Scheduler::receive_request\n"));
#endif
RTScheduling::Current::IdType guid;
CORBA::String_var operation = ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T): receive_request from "
"\"%s\"\n",
operation.in ()));
#endif
// Ignore the "_is_a" operation since it may have been invoked
// locally on the server side as a side effect of another call,
// meaning that the client hasn't added the service context yet.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?