dt_creator.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 677 行 · 第 1/2 页
CPP
677 行
} /* while */
ACE_DEBUG ((LM_DEBUG,
"Activated Job List\n"));
}
// Binds every distributed task in dt_list_ to its Job object.
//
// For each task whose dist () is true, the task's job name is resolved
// through naming_, the result is narrowed to a Job, the Job is checked
// for an exposed RTCORBA PriorityModelPolicy (a debug message is
// printed when none is found), and the Job reference is handed to the
// task through job ().  Tasks whose dist () is false are skipped.
void
DT_Creator::activate_schedule (ACE_ENV_SINGLE_ARG_DECL)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "Activating schedule, task count = %d\n",
                dt_count_));

  for (int idx = 0; idx < dt_count_; ++idx)
    {
      Thread_Task* entry = dt_list_[idx];

      // Only distributed tasks need a Job reference.
      if (!entry->dist ())
        continue;

      // Look the job up in the naming service under the task's job name.
      CosNaming::Name job_name (1);
      job_name.length (1);
      job_name[0].id = CORBA::string_dup (entry->job ());

      CORBA::Object_var resolved =
        this->naming_->resolve (job_name ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      Job_var job_ref = Job::_narrow (resolved.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      // Verify that the object is configured with some
      // PriorityModelPolicy.  This check runs regardless of
      // TAO_debug_level.
      CORBA::Policy_var raw_policy =
        job_ref->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      RTCORBA::PriorityModelPolicy_var model_policy =
        RTCORBA::PriorityModelPolicy::_narrow (raw_policy.in ()
                                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      // Nothing is done when the policy is present; reporting of the
      // actual priority model (CLIENT_PROPAGATED vs. SERVER_DECLARED)
      // is disabled.
      if (CORBA::is_nil (model_policy.in ()))
        ACE_DEBUG ((LM_DEBUG,
                    "ERROR: Priority Model Policy not exposed!\n"));

      entry->job (job_ref.in ());
    }

  if (TAO_debug_level > 0 && dt_count_ > 0)
    ACE_DEBUG ((LM_DEBUG,
                "Activated schedule, task count = %d\n",
                dt_count_));
}
// Obtains the "NameService" initial reference from the ORB, narrows it
// to a CosNaming::NamingContextExt and stores it in naming_.
//
// Returns 0 on success and -1 when the initial reference is nil (an
// error is logged in that case) or when one of the ACE_CHECK_RETURN
// checks fires.
int
DT_Creator::resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL)
{
  CORBA::Object_var name_service_ref =
    this->orb_->resolve_initial_references ("NameService"
                                            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // A nil reference means the Naming Service could not be located.
  if (CORBA::is_nil (name_service_ref.in ()))
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         " (%P|%t) Unable to resolve the Naming Service.\n"),
                        -1);
    }

  this->naming_ =
    CosNaming::NamingContextExt::_narrow (name_service_ref.in ()
                                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // Temporary hack kept from the original author: an extra reference
  // is taken on purpose and its result discarded, because the program
  // otherwise crashes on exit.
  CosNaming::NamingContextExt::_duplicate (this->naming_.in ());

  return 0;
}
// Starts every task in dt_list_ as a distributable thread.
//
// Steps, in order:
//   1. Keep a duplicated reference to `current` in current_.
//   2. Build the thread creation flags from THR_NEW_LWP, THR_JOINABLE
//      and the ORB's configured scope and scheduling policies.
//   3. Run ORB work until synch ()->synched () becomes true.
//   4. Open a scheduling segment (unnamed, sched_param (100)).
//   5. Copy the synchronised base time into base_time_.
//   6. For each task: log timing information, call yield () for the
//      remaining seconds if the task's start time has not been reached,
//      then activate the task with a policy built from its importance.
//   7. wait (), close the scheduling segment and call check_ifexit ().
//
// @param current  Scheduling "Current" object used for the segment and
//                 passed to every activated task.
void
DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current
                                          ACE_ENV_ARG_DECL)
{
  current_ = RTScheduling::Current::_duplicate (current);

  long flags = THR_NEW_LWP | THR_JOINABLE;
  flags |=
    orb_->orb_core ()->orb_params ()->scope_policy () |
    orb_->orb_core ()->orb_params ()->sched_policy ();

  ACE_DEBUG ((LM_DEBUG,
              "Waiting to Synch\n"));

  // Keep servicing the ORB until the synch object reports that it has
  // been synchronised.
  while (!this->synch ()->synched ())
    {
      this->orb_->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }

  CORBA::Policy_var sched_param;
  sched_param = CORBA::Policy::_duplicate (this->sched_param (100));

  // The scheduling segment is opened (and later closed) without a name.
  const char * name = 0;
  current_->begin_scheduling_segment (name,
                                      sched_param.in (),
                                      sched_param.in ()
                                      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  ACE_NEW (base_time_,
           ACE_Time_Value (*(this->synch ()->base_time ())));

  for (int i = 0; i < this->dt_count_; i++)
    {
      ACE_Time_Value now (ACE_OS::gettimeofday ());
      ACE_Time_Value elapsed_time = now - *base_time_;

      char buf [BUFSIZ];
      ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
                       (int) elapsed_time.sec (),
                       (int) now.sec (),
                       (int) base_time_->sec());
      log [log_index++] = ACE_OS::strdup (buf) ;

      // The task's start time is compared against the whole seconds
      // elapsed since base_time_; when it has not been reached yet,
      // yield () is called with the remaining number of seconds.
      if (dt_list_ [i]->start_time () != 0 && (elapsed_time.sec () < dt_list_[i]->start_time ()))
        {
          int suspension_time = dt_list_[i]->start_time () - elapsed_time.sec ();
          ACE_OS::sprintf (buf,"suspension_time = %d\n",
                           suspension_time);
          log [log_index++] = ACE_OS::strdup (buf);
          yield (suspension_time,
                 dt_list_[i]);
        }

      // The policy handed to the task is built from its importance.
      sched_param = CORBA::Policy::_duplicate (this->sched_param (dt_list_ [i]->importance ()));
      dt_list_ [i]->activate_task (current,
                                   sched_param.in (),
                                   flags,
                                   base_time_
                                   ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  this->wait ();

  current_->end_scheduling_segment (name
                                    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->check_ifexit ();
}
void
DT_Creator::dt_ended (void)
{
{
ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
--active_dt_count_;
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "Active dt count = %d\n",active_dt_count_));
char buf [BUFSIZ];
ACE_OS::sprintf (buf,"Active dt count = %d\n",active_dt_count_);
log [log_index++] = ACE_OS::strdup (buf);
}
this->check_ifexit ();
}
void
DT_Creator::job_ended (void)
{
{
ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
--active_job_count_;
char buf [BUFSIZ];
ACE_OS::sprintf (buf,"Active job count = %d\n",active_job_count_);
log [log_index++] = ACE_OS::strdup (buf);
}
this->check_ifexit ();
}
void
DT_Creator::check_ifexit (void)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"Checking exit status Job# = %d DT# = %d\n",
active_job_count_,
active_dt_count_));
static int shutdown = 0;
{
ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_);
if (!shutdown)
{
// All tasks have finished and all jobs have been shutdown.
if (active_dt_count_ == 0 && active_job_count_ == 0)
{
ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));
/*
for (int i = 0; i < dt_count_; i++)
{
dt_list_[i]->dump_stats ();
}
for (int i = 0; i < job_count_; i ++)
{
job_list_[i]->dump_stats ();
}
*/
TASK_STATS::instance ()->dump_samples (file_name_,
"#Schedule Output",
ACE_High_Res_Timer::global_scale_factor ());
shutdown = 1;
FILE* log_file = ACE_OS::fopen (log_file_name_, "w");
if (log_file != NULL)
{
// first dump what the caller has to say.
ACE_OS::fprintf (log_file, "Log File\n");
for (int i = 0; i < log_index; i++)
{
ACE_OS::fprintf (log_file, "%s\n", log [i]);
}
ACE_OS::fclose (log_file);
}
ACE_DEBUG ((LM_DEBUG,
"Log File Ready\n"));
}
}
}
}
int
DT_Creator::dt_count (void)
{
return dt_count_;
}
// Releases the storage owned by the creator: the log strings and the
// log array, the task / POA / job arrays, the base time and both locks.
DT_Creator::~DT_Creator (void)
{
  // Every log entry written in this file comes from ACE_OS::strdup, so
  // each string has to be freed individually; deleting the array alone
  // would leak them.  Only the first log_index slots were filled.
  for (int i = 0; i < log_index; ++i)
    ACE_OS::free (log [i]);

  delete[] log;
  delete[] dt_list_;
  delete[] poa_list_;
  delete[] job_list_;
  delete base_time_;
  delete state_lock_;
  delete shutdown_lock_;
}
// Appends a copy of `msg` to the in-memory log.
//
// @param msg  NUL-terminated text to record; it is duplicated with
//             ACE_OS::strdup, so the caller keeps ownership of its
//             buffer.  A null pointer is ignored.
void
DT_Creator::log_msg (char* msg)
{
  // Duplicating a null pointer is not valid, so skip such a request
  // rather than crash.
  if (msg == 0)
    return;

  log [log_index++] = ACE_OS::strdup (msg);
}
// Returns the ORB reference held in orb_, as given by orb_.in (); no
// _duplicate is performed here.
CORBA::ORB_ptr
DT_Creator::orb (void)
{
  return orb_.in ();
}
// Stores a duplicated reference to `orb` in orb_.
//
// @param orb  ORB reference to duplicate and keep.
void
DT_Creator::orb (CORBA::ORB_ptr orb)
{
  orb_ = CORBA::ORB::_duplicate (orb);
}
// Returns the stored base_time_ pointer.
ACE_Time_Value*
DT_Creator::base_time (void)
{
  return base_time_;
}
// Replaces the stored base_time_ pointer with `base_time`.
//
// The previous value is not released here.  The destructor in this
// file deletes base_time_.
// NOTE(review): this suggests the creator takes ownership of the
// pointer passed in -- confirm with callers.
//
// @param base_time  New base time pointer to store.
void
DT_Creator::base_time (ACE_Time_Value* base_time)
{
  base_time_ = base_time;
}
// Returns the scheduling Current reference held in current_, as given
// by current_.in (); no _duplicate is performed here.
RTScheduling::Current_ptr
DT_Creator::current (void)
{
  return this->current_.in ();
}
// Returns the stored synch_ pointer.
Synch_i*
DT_Creator::synch (void)
{
  return synch_;
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?