mif_scheduler.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 725 行 · 第 1/2 页
CPP
725 行
MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr request_info,
RTScheduling::Current::IdType_out guid_out,
CORBA::String_out,
CORBA::Policy_out sched_param_out,
CORBA::Policy_out /*implicit_sched_param*/
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"MIF_Scheduler::receive_request\n"));
IOP::ServiceContext* serv_cxt =
request_info->get_request_service_context (Server_Interceptor::SchedulingInfo);
if (serv_cxt != 0)
{
ACE_DEBUG ((LM_DEBUG,
"Got scheduling info\n"));
RTScheduling::Current::IdType* guid;
ACE_NEW (guid,
RTScheduling::Current::IdType);
guid->length (sizeof(long));
ACE_OS::memcpy (guid->get_buffer (),
serv_cxt->context_data.get_buffer (),
sizeof (long));
int gu_id;
ACE_OS::memcpy (&gu_id,
guid->get_buffer (),
guid->length ());
ACE_DEBUG ((LM_DEBUG,
"MIF_Scheduler::receive_request %d\n",
gu_id));
CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (long));
int i = sizeof (long);
for (unsigned int j = 0;j < sizeof (int);j++)
{
int_buf [j] = serv_cxt->context_data [i++];
}
int importance;
ACE_OS::memcpy (&importance,
int_buf,
sizeof (int));
guid_out.ptr () = guid;
sched_param_out.ptr () = DT_TEST::instance ()->scheduler ()->create_segment_scheduling_parameter (importance);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"%t The Guid is %d Importance is %d\n",
gu_id,
importance));
DT* new_dt;
ACE_NEW (new_dt,
DT (this->lock_,
gu_id));
new_dt->msg_priority (importance);
lock_.acquire ();
ready_que_.enqueue_prio (new_dt);
new_dt->suspend ();
lock_.release ();
}
}
void
MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
int count;
ACE_OS::memcpy (&count,
guid->get_buffer (),
guid->length ());
ACE_DEBUG ((LM_DEBUG,
"MIF_Scheduler::send_reply %d\n",
count));
if (ready_que_.message_count () > 0)
{
DT* run_dt;
ACE_Message_Block* msg;
ready_que_.dequeue_head (msg);
run_dt = ACE_dynamic_cast (DT*, msg);
lock_.acquire ();
run_dt->resume ();
lock_.release ();
}
}
void
MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
if (ready_que_.message_count () > 0)
{
DT* run_dt;
ACE_Message_Block* msg;
ready_que_.dequeue_head (msg);
run_dt = ACE_dynamic_cast (DT*, msg);
lock_.acquire ();
run_dt->resume ();
lock_.release ();
}
}
void
MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
if (TAO_debug_level > 0)
{
RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
int count;
ACE_OS::memcpy (&count,
guid->get_buffer (),
guid->length ());
ACE_DEBUG ((LM_DEBUG,
"MIF_Scheduler::send_other %d\n",
count));
}
if (ready_que_.message_count () > 0)
{
DT* run_dt;
ACE_Message_Block* msg;
ready_que_.dequeue_head (msg);
run_dt = ACE_dynamic_cast (DT*, msg);
lock_.acquire ();
run_dt->resume ();
lock_.release ();
}
}
void
MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
}
void
MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
ACE_CHECK;
int importance = sched_param_var->importance ();
CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
ACE_OS::memcpy (int_buf,
&importance,
sizeof (int));
RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
int gu_id;
ACE_OS::memcpy (&gu_id,
guid->get_buffer (),
guid->length ());
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"MIF_Scheduler::receive_reply Guid = %d Imp = %d\n",
gu_id,
importance));
DT* new_dt;
ACE_NEW (new_dt,
DT (this->lock_,
gu_id));
new_dt->msg_priority (importance);
lock_.acquire ();
ready_que_.enqueue_prio (new_dt);
int priority;
ACE_hthread_t current;
ACE_Thread::self (current);
if (ACE_Thread::getprio (current, priority) == -1)
return;
current_->the_priority (priority - 1
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
new_dt->suspend ();
lock_.release ();
}
void
MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
ACE_CHECK;
int importance = sched_param_var->importance ();
CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
ACE_OS::memcpy (int_buf,
&importance,
sizeof (int));
RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
int gu_id;
ACE_OS::memcpy (&gu_id,
guid->get_buffer (),
guid->length ());
DT* new_dt;
ACE_NEW (new_dt,
DT (this->lock_,
gu_id));
new_dt->msg_priority (importance);
lock_.acquire ();
ready_que_.enqueue_prio (new_dt);
int priority;
ACE_hthread_t current;
ACE_Thread::self (current);
if (ACE_Thread::getprio (current, priority) == -1)
return;
current_->the_priority (priority - 1
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
new_dt->suspend ();
lock_.release ();
}
void
MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
PortableInterceptor::ForwardRequest))
{
MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
ACE_CHECK;
int importance = sched_param_var->importance ();
CORBA::Octet *int_buf = CORBA::OctetSeq::allocbuf (sizeof (int));
ACE_OS::memcpy (int_buf,
&importance,
sizeof (int));
RTScheduling::Current::IdType* guid = current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
int gu_id;
ACE_OS::memcpy (&gu_id,
guid->get_buffer (),
guid->length ());
DT* new_dt;
ACE_NEW (new_dt,
DT (this->lock_,
gu_id));
new_dt->msg_priority (importance);
lock_.acquire ();
ready_que_.enqueue_prio (new_dt);
int priority;
ACE_hthread_t current;
ACE_Thread::self (current);
if (ACE_Thread::getprio (current, priority) == -1)
return;
current_->the_priority (priority - 1
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
new_dt->suspend ();
lock_.release ();
}
void
MIF_Scheduler::cancel (const RTScheduling::Current::IdType &
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
}
CORBA::PolicyList*
MIF_Scheduler::scheduling_policies (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
return 0;
}
void
MIF_Scheduler::scheduling_policies (const CORBA::PolicyList &
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
}
CORBA::PolicyList*
MIF_Scheduler::poa_policies (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
return 0;
}
char *
MIF_Scheduler::scheduling_discipline_name (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
return 0;
}
RTScheduling::ResourceManager_ptr
MIF_Scheduler::create_resource_manager (const char *,
CORBA::Policy_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
return 0;
}
void
MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
const char *,
CORBA::Policy_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?