// mif_scheduler.cpp
// (Code-sharing-site banner converted to a comment so the file
// compiles.  Original Chinese banner, translated: "From a widely used
// open-source communication project, fully capable of high-volume,
// high-concurrency communication requirements, widely usable" --
// C++ code, 725 lines total, page 1 of 2.)
//MIF_Scheduler.cpp,v 1.4 2003/09/22 15:56:34 yamuna Exp
#include "MIF_Scheduler.h"
#include "ace/Atomic_Op.h"
#include "tao/RTScheduling/Request_Interceptor.h"
#include "test.h"
// Monotonically increasing counter for assigning guids to server-side
// distributable threads.  Atomic because it may be bumped concurrently
// from multiple request threads.
ACE_Atomic_Op<TAO_SYNCH_MUTEX, long> server_guid_counter;
// Construct a suspendable "distributable thread" record.
// The condition variable is built over the caller-supplied mutex
// (the scheduler passes its own lock_ -- see begin_new_scheduling_segment),
// so suspend()/resume() synchronize through the scheduler's lock.
// A new DT starts out not eligible to run (eligible_ == 0).
DT::DT (TAO_SYNCH_MUTEX &lock,
int guid)
: dt_cond_ (lock),
guid_ (guid),
eligible_ (0)
{
}
// Block the calling distributable thread until some other thread marks
// it runnable via resume().  The mutex handed to the constructor must
// be held by the caller; dt_cond_.wait () releases it while blocked.
void
DT::suspend (void)
{
  this->eligible_ = 0;

  // Re-check after every wake-up: only proceed once resume () has
  // actually flagged this thread as eligible (guards against spurious
  // wake-ups).
  while (this->eligible_ == 0)
    {
      this->dt_cond_.wait ();
    }
}
// Make this distributable thread runnable again: flip the eligibility
// flag first, then wake it out of dt_cond_.wait () in suspend ().
void
DT::resume (void)
{
  this->eligible_ = 1;
  this->dt_cond_.signal ();
}
// Accessor: report the importance currently stored on this policy.
CORBA::Short
Segment_Sched_Param_Policy::importance (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  return importance_;
}
// Mutator: record a new importance value on this policy.
void
Segment_Sched_Param_Policy::importance (CORBA::Short importance
                                        ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  importance_ = importance;
}
// Construct the scheduler: resolve and cache the "RTScheduler_Current"
// object so later scheduling upcalls can query the active distributable
// thread's id and scheduling parameter.
// Any exception during resolution/narrowing is printed and swallowed,
// leaving current_ possibly nil -- NOTE(review): callers of current_
// do not appear to check for nil; confirm this is acceptable for the test.
MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb)
: wait_cond_ (lock_),
wait_ (0)
{
ACE_TRY_NEW_ENV
{
CORBA::Object_var object =
orb->resolve_initial_references ("RTScheduler_Current"
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
this->current_ =
RTScheduling::Current::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Redundant: wait_ is already zeroed by the member initializer above.
wait_ = 0;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Caught exception:");
}
ACE_ENDTRY;
}
// Destructor: nothing to release explicitly; members clean up through
// their own destructors.
MIF_Scheduler::~MIF_Scheduler (void)
{
}
void
MIF_Scheduler::incr_thr_count (void)
{
lock_.acquire ();
wait_++;
lock_.release ();
}
void
MIF_Scheduler::wait (void)
{
lock_.acquire ();
while (wait_ > 0)
wait_cond_.wait ();
ACE_DEBUG ((LM_DEBUG,
"After Wait %d\n",
wait_));
lock_.release ();
}
void
MIF_Scheduler::resume_main (void)
{
wait_--;
wait_cond_.signal ();
}
// Factory for a SegmentSchedulingParameter policy carrying the given
// importance.  Raises CORBA::NO_MEMORY if allocation fails; the caller
// assumes ownership of the returned policy reference.
MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr
MIF_Scheduler::create_segment_scheduling_parameter (CORBA::Short importance
                                                    ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr segment_policy;
  ACE_NEW_THROW_EX (segment_policy,
                    Segment_Sched_Param_Policy,
                    CORBA::NO_MEMORY (
                      CORBA::SystemException::_tao_minor_code (
                        TAO_DEFAULT_MINOR_CODE,
                        ENOMEM),
                      CORBA::COMPLETED_NO));
  // In emulated-exception builds ACE_NEW_THROW_EX only *records* the
  // exception in the environment; without this check we would fall
  // through and dereference a null segment_policy.
  ACE_CHECK_RETURN (MIF_Scheduling::SegmentSchedulingParameterPolicy::_nil ());

  segment_policy->importance (importance);
  return segment_policy;
}
// Upcall made when a thread enters a new scheduling segment.  Threads
// whose importance is not the distinguished value 100 (the "main"
// thread marker used by this test) are queued by priority and
// suspended until a dispatching point resumes them.
void
MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
                                             const char *,
                                             CORBA::Policy_ptr sched_policy,
                                             CORBA::Policy_ptr
                                             ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
  // Decode the current DT's guid for use as the DT id.  Fetch the id
  // sequence once into a _var (the original called current_->id ()
  // twice and leaked both returned sequences) and copy at most
  // sizeof (count) bytes -- the sequence length is not guaranteed to
  // fit in a 4-byte int, and the original memcpy could overrun the
  // stack variable.
  int count = 0;
  RTScheduling::Current::IdType_var id = this->current_->id ();
  size_t copy_len = id->length ();
  if (copy_len > sizeof (count))
    copy_len = sizeof (count);
  ACE_OS::memcpy (&count,
                  id->get_buffer (),
                  copy_len);

  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
  CORBA::Short desired_priority = sched_param->importance (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "%t MIF_Scheduler::begin_scheduling_segment - Importance %d\n",
                desired_priority));

  if (desired_priority != 100)
    {
      // NOT the main thread: queue it by priority and block it until a
      // dispatching point (send_request / end_scheduling_segment)
      // resumes it.
      DT* new_dt;
      ACE_NEW (new_dt,
               DT (this->lock_,
                   count));
      new_dt->msg_priority (desired_priority);

      lock_.acquire ();
      ready_que_.enqueue_prio (new_dt);
      resume_main ();      // expects lock_ held -- let main continue
      new_dt->suspend ();  // releases lock_ while blocked
      lock_.release ();
    }
}
// Nested segments get no special treatment in this scheduler: simply
// delegate to begin_new_scheduling_segment with the same arguments,
// propagating any exception via ACE_CHECK.
void
MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
const char *name,
CORBA::Policy_ptr sched_param,
CORBA::Policy_ptr implicit_sched_param
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
this->begin_new_scheduling_segment (guid,
name,
sched_param,
implicit_sched_param
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
// Upcall made when a thread changes its scheduling parameters
// mid-segment.  If a DT is waiting whose priority is at least the new
// desired priority (or the caller is the "main" marker 100), the
// caller is queued and the waiting DT resumed; otherwise the waiting
// DT is put back and the caller continues.
// NOTE(review): unlike send_request, ready_que_ is inspected and
// dequeued here WITHOUT holding lock_ -- confirm the test only
// exercises this from one thread at a time.
void
MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
const char* /*name*/,
CORBA::Policy_ptr sched_policy,
CORBA::Policy_ptr /*implicit_sched_param*/
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException,
RTScheduling::Current::UNSUPPORTED_SCHEDULING_DISCIPLINE))
{
// Decode the current DT's guid into an int.
// NOTE(review): copies id()->length () bytes into a 4-byte int --
// overruns count if the guid is longer; also each id () call returns
// a sequence that is never released.
int count;
ACE_OS::memcpy (&count,
this->current_->id ()->get_buffer (),
this->current_->id ()->length ());
MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
CORBA::Short desired_priority = sched_param->importance (ACE_ENV_SINGLE_ARG_PARAMETER);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"%t MIF_Scheduler::update_scheduling_segment - Importance %d\n",
desired_priority));
// Stand-in DT representing the caller at its new priority.
DT* new_dt;
ACE_NEW (new_dt,
DT (this->lock_,
count));
new_dt->msg_priority (desired_priority);
if (ready_que_.message_count () > 0)
{
DT* run_dt;
ACE_Message_Block* msg;
ready_que_.dequeue_head (msg);
run_dt = ACE_dynamic_cast (DT*, msg);
if ((desired_priority == 100) || run_dt->msg_priority () >= (unsigned int)desired_priority)
{
// Waiting DT wins: queue the caller, wake the waiter, and block
// the caller until it is dispatched again (suspend releases lock_).
ready_que_.enqueue_prio (new_dt);
lock_.acquire ();
run_dt->resume ();
new_dt->suspend ();
lock_.release ();
}
else
{
// Caller keeps running: put the waiter back, discard the stand-in.
ready_que_.enqueue_prio (run_dt);
delete new_dt;
}
}
else delete new_dt;
}
// Upcall made when the current DT leaves its scheduling segment:
// log the departing guid and wake the highest-priority waiting DT,
// if any.
void
MIF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
                                       const char *
                                       ACE_ENV_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // Decode the guid for logging.  Copy at most sizeof (count) bytes:
  // the original copied guid.length () bytes into a 4-byte int, which
  // overruns the stack variable for longer guids.
  int count = 0;
  size_t copy_len = guid.length ();
  if (copy_len > sizeof (count))
    copy_len = sizeof (count);
  ACE_OS::memcpy (&count,
                  guid.get_buffer (),
                  copy_len);

  ACE_DEBUG ((LM_DEBUG,
              "MIF_Scheduler::end_scheduling_segment %d\n",
              count));

  // Hold lock_ across BOTH the queue inspection and the dequeue so a
  // concurrent dispatch point (e.g. send_request, which does lock)
  // cannot race us for the same DT; the original locked only around
  // resume ().  The RAII guard also releases on every path.
  ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->lock_);
  if (this->ready_que_.message_count () > 0)
    {
      ACE_Message_Block* msg = 0;
      this->ready_que_.dequeue_head (msg);
      DT* run_dt = ACE_dynamic_cast (DT*, msg);
      run_dt->resume ();
    }
}
// Ending a nested segment is a no-op for this scheduler: only the
// outermost end_scheduling_segment dispatches a waiting DT.
void
MIF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
const char *,
CORBA::Policy_ptr
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
}
// Client-side dispatching point: piggy-back the current DT's guid and
// importance on the outgoing request as a service context (guid bytes
// first, then sizeof(int) importance bytes -- wire layout unchanged),
// then, if other DTs are waiting, bump this thread's priority and
// release the next waiting DT.
void
MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr request_info
                             ACE_ENV_ARG_DECL)
  ACE_THROW_SPEC ((CORBA::SystemException,
                   PortableInterceptor::ForwardRequest))
{
  MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
    MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (
      current_->scheduling_parameter (ACE_ENV_SINGLE_ARG_PARAMETER));
  ACE_CHECK;

  // Stack-allocated context: add_request_service_context copies its
  // argument, and the original heap-allocated one was never freed.
  IOP::ServiceContext srv_con;
  srv_con.context_id = Client_Interceptor::SchedulingInfo;

  // Fetch the guid once into a _var (the original fetched it twice
  // and leaked both sequences).  Copy straight from the sequence:
  // the original staged it through a buffer sized with
  // sizeof (guid_length) -- i.e. 4 bytes -- while memcpy'ing
  // guid_length bytes into it, a heap overflow for longer guids.
  RTScheduling::Current::IdType_var guid =
    current_->id (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  CORBA::ULong const guid_length = guid->length ();

  CORBA::ULong const cxt_data_length = sizeof (int) + guid_length;
  srv_con.context_data.length (cxt_data_length);

  // Guid bytes occupy [0, guid_length).
  CORBA::ULong i = 0;
  for (; i < guid_length; ++i)
    srv_con.context_data[i] = guid->get_buffer ()[i];

  // Importance bytes occupy the trailing sizeof (int) slots.
  int importance = sched_param_var->importance ();
  CORBA::Octet int_buf[sizeof (int)];
  ACE_OS::memcpy (int_buf,
                  &importance,
                  sizeof (int));
  for (CORBA::ULong j = 0; i < cxt_data_length; ++i, ++j)
    srv_con.context_data[i] = int_buf[j];

  request_info->add_request_service_context (srv_con,
                                             0
                                             ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // RAII guard: the original's early returns after a failed
  // getprio () left lock_ held forever (deadlock); the guard releases
  // it on every exit path, including the ACE_CHECK below.
  ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->lock_);
  if (ready_que_.message_count () > 0)
    {
      int priority;
      ACE_hthread_t current;
      ACE_Thread::self (current);
      if (ACE_Thread::getprio (current, priority) == -1)
        return;

      ACE_DEBUG ((LM_DEBUG,
                  "Initial thread priority is %d %d\n",
                  priority,
                  ACE_DEFAULT_THREAD_PRIORITY));

      // Raise this thread above the DT we are about to release.
      current_->the_priority (priority + 1
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      ACE_Thread::self (current);
      if (ACE_Thread::getprio (current, priority) == -1)
        return;

      ACE_DEBUG ((LM_DEBUG,
                  "Bumped thread priority is %d\n",
                  priority));

      ACE_Message_Block* msg = 0;
      ready_que_.dequeue_head (msg);
      DT* run_dt = ACE_dynamic_cast (DT*, msg);
      run_dt->resume ();
    }
}
void
// [Listing truncated: this is page 1 of 2 of the original capture; the
// function starting above continues on page 2.  The site's
// keyboard-shortcut help (copy Ctrl+C, search Ctrl+F, full screen F11,
// font size Ctrl+= / Ctrl+-, show shortcuts ?) has been converted to
// this comment so it cannot be mistaken for source code.]