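// muf_scheduler.cpp
// From: "This is a widely used open-source communications project; it is fully
// capable of handling high-volume, high-concurrency communication requirements
// and is widely available" - C++ code - 759 lines in total - page 1 of 2
// Language: CPP
// 759 lines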
//MUF_Scheduler.cpp,v 1.5 2003/12/08 00:33:42 bala Exp
#include "MUF_Scheduler.h"
#include "Kokyu_qosC.h"
#include "utils.h"
#include "tao/RTScheduling/Request_Interceptor.h"
#include "tao/ORB_Constants.h"
// Accessor: hands back the scheduling parameter held by this policy object.
MUF_Scheduling::SchedulingParameter
MUF_Sched_Param_Policy::value (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  return value_;
}
// Mutator: replaces the scheduling parameter held by this policy object
// with a copy of the supplied one.
void
MUF_Sched_Param_Policy::value (const MUF_Scheduling::SchedulingParameter& value ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  value_ = value;
}
// Constructs the scheduler:
//   1. creates the Kokyu DSRT dispatcher from the given implementation type,
//      scheduling policy and scheduling scope;
//   2. resolves and narrows the "RTScheduler_Current" initial reference;
//   3. resolves the "CodecFactory" initial reference and creates a
//      CDR-encapsulation (version 1.2) codec from it.
//
// If the codec factory cannot be obtained, the error is logged and
// construction stops there, leaving codec_ unset.  Previously the code
// logged the error and then still called through the nil factory reference.
MUF_Scheduler::MUF_Scheduler (CORBA::ORB_ptr orb,
                              Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type,
                              int ace_sched_policy,
                              int ace_sched_scope)
  : orb_ (orb),
    disp_impl_type_ (disp_impl_type),
    ace_sched_policy_ (ace_sched_policy),
    ace_sched_scope_ (ace_sched_scope)
{
  ACE_DECLARE_NEW_ENV;

  // Describe the dispatcher we want and let the factory build it.
  Kokyu::DSRT_ConfigInfo config;
  config.impl_type_ = this->disp_impl_type_;
  config.sched_policy_ = ace_sched_policy_;
  config.sched_scope_ = ace_sched_scope_;

  Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>::DSRT_Dispatcher_Auto_Ptr
    tmp (Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>::
         create_DSRT_dispatcher (config));
  kokyu_dispatcher_ = tmp;

  // RTScheduler current object.
  CORBA::Object_var object =
    orb->resolve_initial_references ("RTScheduler_Current"
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->current_ =
    RTScheduling::Current::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Codec factory, used to build the codec for the scheduling service context.
  CORBA::Object_var obj =
    orb->resolve_initial_references ("CodecFactory"
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  IOP::CodecFactory_var codec_factory;
  if (!CORBA::is_nil (obj.in ()))
    {
      codec_factory =
        IOP::CodecFactory::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  if (CORBA::is_nil (codec_factory.in ()))
    {
      // Without a factory there is nothing to create the codec from;
      // do not dereference the nil reference.
      ACE_ERROR ((LM_ERROR, "Nil Codec factory\n"));
      return;
    }

  IOP::Encoding encoding;
  encoding.format = IOP::ENCODING_CDR_ENCAPS;
  encoding.major_version = 1;
  encoding.minor_version = 2;

  codec_ = codec_factory->create_codec (encoding ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Destructor: performs no explicit work.
MUF_Scheduler::~MUF_Scheduler (void)
{
}
void
MUF_Scheduler::shutdown (void)
{
kokyu_dispatcher_->shutdown ();
ACE_DEBUG ((LM_DEBUG, "kokyu DSRT dispatcher shutdown\n"));
}
// Allocates a new MUF_Sched_Param_Policy, stores a copy of <value> in it and
// returns it to the caller.
//
// Raises CORBA::NO_MEMORY (ENOMEM minor code, COMPLETED_NO) if the policy
// object cannot be allocated; in that case a nil reference is returned in
// builds that use emulated exceptions.
MUF_Scheduling::SchedulingParameterPolicy_ptr
MUF_Scheduler::create_scheduling_parameter (const MUF_Scheduling::SchedulingParameter & value ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  MUF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy =
    MUF_Scheduling::SchedulingParameterPolicy::_nil ();

  ACE_NEW_THROW_EX (sched_param_policy,
                    MUF_Sched_Param_Policy,
                    CORBA::NO_MEMORY (
                      CORBA::SystemException::_tao_minor_code (
                        TAO_DEFAULT_MINOR_CODE,
                        ENOMEM),
                      CORBA::COMPLETED_NO));
  // With emulated exceptions the macro above does not leave the function,
  // so bail out here instead of dereferencing a null pointer below.
  ACE_CHECK_RETURN (MUF_Scheduling::SchedulingParameterPolicy::_nil ());

  sched_param_policy->value (value);

  return sched_param_policy;
}
// Starts a scheduling segment for <guid>: reads deadline, estimated
// execution time and criticality from the MUF scheduling-parameter policy
// and hands them to the Kokyu dispatcher's schedule().
//
// The segment name and the implicit scheduling parameter are ignored.
//
// Raises RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE when
// <sched_policy> is nil or is not a MUF SchedulingParameterPolicy (the
// original code dereferenced the nil narrowed reference in that case).
void
MUF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &guid,
                                             const char *,
                                             CORBA::Policy_ptr sched_policy,
                                             CORBA::Policy_ptr
                                             ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG,
              "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment enter\n"));

  // Only for display: copy at most sizeof (int) bytes of the guid so a
  // longer guid cannot overrun the local, and start from zero so a shorter
  // one does not leave it partly uninitialised.
  int int_guid = 0;
  const size_t guid_bytes =
    guid.length () < sizeof (int_guid) ? guid.length () : sizeof (int_guid);
  ACE_OS::memcpy (&int_guid,
                  guid.get_buffer (),
                  guid_bytes);
  ACE_DEBUG ((LM_DEBUG, "(%t|%T): guid is %d\n", int_guid));
#endif

  MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
    MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
                                                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  if (CORBA::is_nil (sched_param_policy.in ()))
    {
      ACE_THROW (RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE ());
    }

  MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();

  MUF_Scheduler_Traits::QoSDescriptor_t qos;
  qos.deadline_ = sched_param->deadline;
  qos.exec_time_ = sched_param->estimated_initial_execution_time;
  qos.criticality_ = sched_param->criticality;

  kokyu_dispatcher_->schedule (guid, qos);

#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG,
              "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment exit\n"));
#endif
}
// A nested segment is handled exactly like a new one: every argument is
// forwarded unchanged to begin_new_scheduling_segment().
void
MUF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
                                                const char *name,
                                                CORBA::Policy_ptr sched_param,
                                                CORBA::Policy_ptr implicit_sched_param
                                                ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
  this->begin_new_scheduling_segment (guid,
                                      name,
                                      sched_param,
                                      implicit_sched_param
                                      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
// Updates the scheduling segment identified by <guid>: reads deadline,
// estimated execution time and criticality from the MUF
// scheduling-parameter policy and passes them to the Kokyu dispatcher's
// update_schedule().
//
// <name> and <implicit_sched_param> are not used.
//
// Raises RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE when
// <sched_policy> is nil or is not a MUF SchedulingParameterPolicy (the
// original code dereferenced the nil narrowed reference in that case).
void
MUF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType& guid,
                                          const char* name,
                                          CORBA::Policy_ptr sched_policy,
                                          CORBA::Policy_ptr implicit_sched_param
                                          ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
  ACE_UNUSED_ARG ((name));
  ACE_UNUSED_ARG ((implicit_sched_param));

#ifdef KOKYU_DSRT_LOGGING
  // Only for display: copy at most sizeof (int) bytes of the guid so a
  // longer guid cannot overrun the local, and start from zero so a shorter
  // one does not leave it partly uninitialised.
  int int_guid = 0;
  const size_t guid_bytes =
    guid.length () < sizeof (int_guid) ? guid.length () : sizeof (int_guid);
  ACE_OS::memcpy (&int_guid,
                  guid.get_buffer (),
                  guid_bytes);
  ACE_DEBUG ((LM_DEBUG, "(%t|%T): update_sched_seg::guid is %d\n", int_guid));
#endif

  MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
    MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
                                                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  if (CORBA::is_nil (sched_param_policy.in ()))
    {
      ACE_THROW (RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE ());
    }

  MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();

  MUF_Scheduler_Traits::QoSDescriptor_t qos;
  qos.deadline_ = sched_param->deadline;
  qos.exec_time_ = sched_param->estimated_initial_execution_time;
  qos.criticality_ = sched_param->criticality;

  kokyu_dispatcher_->update_schedule (guid, qos);
}
// Ends the scheduling segment identified by <guid> by cancelling its
// schedule in the Kokyu dispatcher.  The segment name is ignored.
void
MUF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
                                       const char *
                                       ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
#ifdef KOKYU_DSRT_LOGGING
  // Only for display: copy at most sizeof (int) bytes of the guid so a
  // longer guid cannot overrun the local, and start from zero so a shorter
  // one does not leave it partly uninitialised.
  int int_guid = 0;
  const size_t guid_bytes =
    guid.length () < sizeof (int_guid) ? guid.length () : sizeof (int_guid);
  ACE_OS::memcpy (&int_guid,
                  guid.get_buffer (),
                  guid_bytes);
  ACE_DEBUG ((LM_DEBUG, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid));
#endif

  kokyu_dispatcher_->cancel_schedule (guid);
}
// Ending a nested segment is a no-op in this scheduler: all arguments are
// ignored and nothing is done.
void
MUF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
                                              const char *,
                                              CORBA::Policy_ptr
                                              ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
}
// Client-side hook invoked with the outgoing request's ClientRequestInfo.
//
// When the current scheduling segment has a scheduling-parameter policy,
// its guid, deadline, estimated execution time and criticality are packed
// into a Kokyu::Svc_Ctxt_DSRT_QoS, encoded with codec_ and attached to the
// request as the Client_Interceptor::SchedulingInfo service context.
// When there is no policy, no service context is added.
// In both cases the dispatcher is then told that the current guid is
// about to block (Kokyu::BLOCK).
//
// Ownership: the policy, the guid sequence and the encoded octet sequence
// are all returned to us as caller-owned values, so they are held in _var
// types here.  The original code kept raw pointers / dereferenced the
// temporaries and never released them.
void
MUF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri
                             ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;

  CORBA::String_var operation = ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG,
              "(%t|%T): send_request "
              "from \"%s\"\n",
              operation.in ()));
#endif

  // Service context used to carry the scheduling information to the target.
  IOP::ServiceContext sc;
  sc.context_id = Client_Interceptor::SchedulingInfo;

  CORBA::Policy_var sched_policy =
    this->current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Fetch the guid once; every id () call hands back a new sequence that
  // we have to release.
  RTScheduling::Current::IdType_var guid_holder =
    this->current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  RTScheduling::Current::IdType &current_guid = guid_holder.inout ();

  CORBA::Long criticality;
  TimeBase::TimeT deadline, exec_time;

  if (CORBA::is_nil (sched_policy.in ()))
    {
      // 24 hrs from now - infinity.
      // TimeBase::TimeT counts 100s of nanoseconds: 10^7 per second and
      // 10 per microsecond.  Widen before multiplying to avoid overflow.
      // NOTE(review): these defaults are computed but not sent, because the
      // service context is only added when a policy is present.
      ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
      deadline =
        ACE_static_cast (TimeBase::TimeT, deadline_tv.sec ()) * 10000000
        + ACE_static_cast (TimeBase::TimeT, deadline_tv.usec ()) * 10;
      exec_time = 0;
      criticality = 0;
    }
  else
    {
      MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
        MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());

      MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
      deadline = sched_param->deadline;
      exec_time = sched_param->estimated_initial_execution_time;
      criticality = sched_param->criticality;

#ifdef KOKYU_DSRT_LOGGING
      // Only for display: copy at most sizeof (int) bytes of the guid so a
      // longer guid cannot overrun the local, and start from zero so a
      // shorter one does not leave it partly uninitialised.
      int int_guid = 0;
      const size_t guid_bytes =
        current_guid.length () < sizeof (int_guid)
        ? current_guid.length ()
        : sizeof (int_guid);
      ACE_OS::memcpy (&int_guid,
                      current_guid.get_buffer (),
                      guid_bytes);
      ACE_DEBUG ((LM_DEBUG,
                  "(%t|%T): send_request guid = %d\n",
                  int_guid));
#endif

      // Fill the guid and QoS values in the service-context QoS struct.
      sc_qos.guid.length (current_guid.length ());
      guid_copy (sc_qos.guid, current_guid);
      sc_qos.deadline = deadline;
      sc_qos.estimated_initial_execution_time = exec_time;
      sc_qos.criticality = criticality;

      CORBA::Any sc_qos_as_any;
      sc_qos_as_any <<= sc_qos;

      // encode () returns a caller-owned octet sequence; keep it in a _var
      // so it is freed once its contents have been copied into the context.
      CORBA::OctetSeq_var encoded =
        codec_->encode (sc_qos_as_any ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      sc.context_data =
        ACE_reinterpret_cast(CORBA::OctetSeq &,
                             encoded.inout ());

#ifdef KOKYU_DSRT_LOGGING
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
#endif

      // Add this context to the service context list.
      ri->add_request_service_context (sc, 0 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG,
              ACE_LIB_TEXT ("(%t|%T): send_request : ")
              ACE_LIB_TEXT ("about to call scheduler to inform block\n")
              ));
#endif

  kokyu_dispatcher_->update_schedule (current_guid,
                                      Kokyu::BLOCK);

#ifdef KOKYU_DSRT_LOGGING
  ACE_DEBUG ((LM_DEBUG,
              ACE_LIB_TEXT ("(%t|%T): send_request interceptor done\n")));
#endif
}
void
MUF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri,
RTScheduling::Current::IdType_out guid_out,
CORBA::String_out /*name*/,
CORBA::Policy_out sched_param_out,
CORBA::Policy_out /*implicit_sched_param_out*/
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t|%T):entered MUF_Scheduler::receive_request\n"));
#endif
RTScheduling::Current::IdType guid;
CORBA::String_var operation = ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T): receive_request from "
"\"%s\"\n",
operation.in ()));
#endif
// Ignore the "_is_a" operation since it may have been invoked
// locally on the server side as a side effect of another call,
// meaning that the client hasn't added the service context yet.
if (ACE_OS::strcmp ("_is_a", operation.in ()) == 0)
return;
IOP::ServiceContext_var sc =
ri->get_request_service_context (Server_Interceptor::SchedulingInfo
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
CORBA::Long criticality;
TimeBase::TimeT deadline,exec_time;
if (sc.ptr () == 0)
{
//24 hrs from now - infinity
ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
exec_time = 0;
criticality = 0;
}
else
{
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?